Source code for airflow.providers.cncf.kubernetes.pod_launcher_deprecated

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Launches pods."""
from __future__ import annotations

import json
import math
import time
import warnings
from typing import TYPE_CHECKING

import pendulum
import tenacity
from kubernetes import client, watch
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
from requests.exceptions import HTTPError

from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
from airflow.providers.cncf.kubernetes.pod_generator import PodDefaults
from airflow.settings import pod_mutation_hook
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State

if TYPE_CHECKING:
    from kubernetes.client.models.v1_pod import V1Pod

# Emitted once at import time: this module is deprecated and callers are
# pointed at the pod_manager replacement. stacklevel=2 asks the warnings
# machinery to attribute the warning one frame above this module.
warnings.warn(
    """
    This module is deprecated. Please use `airflow.providers.cncf.kubernetes.utils.pod_manager` instead.

    To use this module install the provider package by installing this pip package:

    https://pypi.org/project/apache-airflow-providers-cncf-kubernetes/

    """,
    RemovedInAirflow3Warning,
    stacklevel=2,
)


class PodStatus:
    """
    Lower-case names of the pod phases recognised by this module.

    ``PodLauncher.process_status`` lower-cases the phase it is given before
    comparing it against these constants.
    """

    # Pod has not started yet; process_status maps this to State.QUEUED.
    PENDING = "pending"
    # Pod is currently executing.
    RUNNING = "running"
    # Pod terminated unsuccessfully.
    FAILED = "failed"
    # Pod terminated successfully.
    SUCCEEDED = "succeeded"
class PodLauncher(LoggingMixin):
    """
    Deprecated class for launching pods.

    Please use airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager instead.
    """

    def __init__(
        self,
        kube_client: client.CoreV1Api | None = None,
        in_cluster: bool = True,
        cluster_context: str | None = None,
        extract_xcom: bool = False,
    ):
        """
        Launch pods; DEPRECATED.

        Please use airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager
        instead to create the launcher.

        :param kube_client: kubernetes client; when falsy, one is built with
            ``get_kube_client`` from ``in_cluster`` and ``cluster_context``
        :param in_cluster: whether we are in cluster
        :param cluster_context: context of the cluster
        :param extract_xcom: whether we should extract xcom
        """
        super().__init__()
        self._client = kube_client or get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context)
        self._watch = watch.Watch()
        self.extract_xcom = extract_xcom

    def run_pod_async(self, pod: V1Pod, **kwargs):
        """
        Run pod asynchronously.

        Applies ``pod_mutation_hook`` to the pod, serializes it and submits it
        with ``create_namespaced_pod``.

        :param pod: pod to create; its ``metadata.namespace`` selects the namespace
        :param kwargs: extra keyword arguments forwarded to ``create_namespaced_pod``
        :return: the response returned by ``create_namespaced_pod``
        """
        pod_mutation_hook(pod)

        sanitized_pod = self._client.api_client.sanitize_for_serialization(pod)
        json_pod = json.dumps(sanitized_pod, indent=2)

        self.log.debug("Pod Creation Request: \n%s", json_pod)
        try:
            resp = self._client.create_namespaced_pod(
                body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
            )
            self.log.debug("Pod Creation Response: %s", resp)
        except Exception:
            self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod)
            # Bare raise keeps the original traceback intact.
            raise
        return resp

    def delete_pod(self, pod: V1Pod):
        """
        Delete pod.

        :param pod: pod to delete, identified by ``metadata.name`` / ``metadata.namespace``
        :raises ApiException: for any API error other than status 404
        """
        try:
            self._client.delete_namespaced_pod(
                pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions()
            )
        except ApiException as e:
            # If the pod is already deleted
            if e.status != 404:
                raise

    def start_pod(self, pod: V1Pod, startup_timeout: int = 120):
        """
        Launch the pod and wait until it is no longer pending.

        :param pod: pod to launch
        :param startup_timeout: Timeout (in seconds) for startup of the pod
            (if pod is pending for too long, fails task)
        :raises AirflowException: if the pod is still not started once
            ``startup_timeout`` has elapsed
        :return: None
        """
        resp = self.run_pod_async(pod)
        start_time = time.monotonic()
        if resp.status.start_time is None:
            while self.pod_not_started(pod):
                self.log.warning("Pod not yet started: %s", pod.metadata.name)
                if time.monotonic() >= start_time + startup_timeout:
                    raise AirflowException("Pod took too long to start")
                time.sleep(1)

    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> tuple[State, str | None]:
        """
        Monitor a pod and return the final state.

        :param pod: pod spec that will be monitored
        :param get_logs: whether to read the logs locally
        :return: tuple of the final task state and the xcom result; the result
            is the ``json.loads``-decoded sidecar output when ``extract_xcom``
            is set, otherwise ``None``
        """
        if get_logs:
            read_logs_since_sec = None
            last_log_time = None
            while True:
                logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
                for line in logs:
                    timestamp, message = self.parse_log_line(line.decode("utf-8"))
                    if timestamp:
                        last_log_time = pendulum.parse(timestamp)
                    self.log.info(message)
                time.sleep(1)

                # The log stream ended; stop only if the base container is done.
                if not self.base_container_is_running(pod):
                    break

                self.log.warning("Pod %s log read interrupted", pod.metadata.name)
                if last_log_time:
                    delta = pendulum.now() - last_log_time
                    # Prefer logs duplication rather than loss
                    read_logs_since_sec = math.ceil(delta.total_seconds())
        result = None
        if self.extract_xcom:
            # Wait for the base container to finish before reading the xcom file.
            while self.base_container_is_running(pod):
                self.log.info("Container %s has state %s", pod.metadata.name, State.RUNNING)
                time.sleep(2)
            result = self._extract_xcom(pod)
            self.log.info(result)
            result = json.loads(result)
        while self.pod_is_running(pod):
            self.log.info("Pod %s has state %s", pod.metadata.name, State.RUNNING)
            time.sleep(2)
        return self._task_status(self.read_pod(pod)), result

    def parse_log_line(self, line: str) -> tuple[str | None, str]:
        """
        Split a K8s log line into its timestamp and its message.

        The line is stripped and split at the first space. If there is no
        space, an error is logged and the unmodified line is returned as the
        message with ``None`` as the timestamp.

        :param line: k8s log line
        :return: timestamp and log message
        """
        timestamp, sep, message = line.strip().partition(" ")
        if not sep:
            self.log.error(
                "Error parsing timestamp (no timestamp in message: %r). "
                "Will continue execution but won't update timestamp",
                line,
            )
            return None, line
        return timestamp, message

    def _task_status(self, event):
        """Log the phase of ``event`` and translate it with ``process_status``."""
        self.log.info("Event: %s had an event of type %s", event.metadata.name, event.status.phase)
        status = self.process_status(event.metadata.name, event.status.phase)
        return status

    def pod_not_started(self, pod: V1Pod):
        """Test if pod has not started."""
        state = self._task_status(self.read_pod(pod))
        return state == State.QUEUED

    def pod_is_running(self, pod: V1Pod):
        """Test if pod is running, i.e. it has neither succeeded nor failed."""
        state = self._task_status(self.read_pod(pod))
        return state not in (State.SUCCESS, State.FAILED)

    def base_container_is_running(self, pod: V1Pod):
        """
        Test if base container is running.

        :return: ``True`` if a container status named ``"base"`` exists and its
            ``state.running`` is set; ``False`` otherwise, including when the
            pod reports no container statuses at all
        """
        event = self.read_pod(pod)
        # container_statuses can be None before any container is reported;
        # treat that as "no base container yet" instead of raising TypeError.
        container_statuses = event.status.container_statuses or []
        status = next((s for s in container_statuses if s.name == "base"), None)
        if not status:
            return False
        return status.state.running is not None

    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
    def read_pod_logs(
        self,
        pod: V1Pod,
        tail_lines: int | None = None,
        timestamps: bool = False,
        since_seconds: int | None = None,
    ):
        """
        Read log from the pod.

        :param pod: pod whose ``"base"`` container log is read
        :param tail_lines: forwarded as ``tail_lines`` when truthy
        :param timestamps: whether each log line should carry a timestamp
        :param since_seconds: forwarded as ``since_seconds`` when truthy
        :return: the response of ``read_namespaced_pod_log`` (requested with
            ``follow=True`` and ``_preload_content=False``)
        :raises AirflowException: if the request fails with ``HTTPError``
        """
        additional_kwargs = {}
        if since_seconds:
            additional_kwargs["since_seconds"] = since_seconds

        if tail_lines:
            additional_kwargs["tail_lines"] = tail_lines

        try:
            return self._client.read_namespaced_pod_log(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                container="base",
                follow=True,
                timestamps=timestamps,
                _preload_content=False,
                **additional_kwargs,
            )
        except HTTPError as e:
            raise AirflowException(f"There was an error reading the kubernetes API: {e}") from e

    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
    def read_pod_events(self, pod):
        """
        Read events from the pod.

        :param pod: pod whose events are listed (matched on ``involvedObject.name``)
        :return: the response of ``list_namespaced_event``
        :raises AirflowException: if the request fails with ``HTTPError``
        """
        try:
            return self._client.list_namespaced_event(
                namespace=pod.metadata.namespace, field_selector=f"involvedObject.name={pod.metadata.name}"
            )
        except HTTPError as e:
            raise AirflowException(f"There was an error reading the kubernetes API: {e}") from e

    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
    def read_pod(self, pod: V1Pod):
        """
        Read pod information.

        :param pod: pod to look up by ``metadata.name`` / ``metadata.namespace``
        :return: the response of ``read_namespaced_pod``
        :raises AirflowException: if the request fails with ``HTTPError``
        """
        try:
            return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
        except HTTPError as e:
            raise AirflowException(f"There was an error reading the kubernetes API: {e}") from e

    def _extract_xcom(self, pod: V1Pod):
        """
        Read ``return.json`` from the xcom sidecar container of ``pod``.

        Opens a shell in the sidecar, cats the xcom file, then sends SIGINT to
        PID 1 in that container. The stream is always closed afterwards.

        :raises AirflowException: if no output could be read
        """
        resp = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=["/bin/sh"],
            stdin=True,
            stdout=True,
            stderr=True,
            tty=False,
            _preload_content=False,
        )
        try:
            result = self._exec_pod_command(resp, f"cat {PodDefaults.XCOM_MOUNT_PATH}/return.json")
            self._exec_pod_command(resp, "kill -s SIGINT 1")
        finally:
            resp.close()
        if result is None:
            raise AirflowException(f"Failed to extract xcom from pod: {pod.metadata.name}")
        return result

    def _exec_pod_command(self, resp, command):
        """
        Write ``command`` to the open stream and return the first stdout chunk.

        Returns ``None`` if the stream is closed, or if stderr output arrives
        before any stdout (the stderr text is logged).
        """
        if resp.is_open():
            self.log.info("Running command... %s\n", command)
            resp.write_stdin(command + "\n")
            while resp.is_open():
                resp.update(timeout=1)
                if resp.peek_stdout():
                    return resp.read_stdout()
                if resp.peek_stderr():
                    self.log.info(resp.read_stderr())
                    break
        return None

    def process_status(self, job_id, status):
        """
        Process status information for the job.

        :param job_id: identifier used in log messages
        :param status: pod phase string; compared case-insensitively with ``PodStatus``
        :return: the matching ``State``; an unrecognised phase is logged as an
            error and reported as ``State.FAILED``
        """
        status = status.lower()
        if status == PodStatus.PENDING:
            return State.QUEUED
        elif status == PodStatus.FAILED:
            self.log.error("Event with job id %s Failed", job_id)
            return State.FAILED
        elif status == PodStatus.SUCCEEDED:
            self.log.info("Event with job id %s Succeeded", job_id)
            return State.SUCCESS
        elif status == PodStatus.RUNNING:
            return State.RUNNING
        else:
            self.log.error("Event: Invalid state %s on job %s", status, job_id)
            return State.FAILED

Was this entry helpful?