operators/gcp_container_operator.py

import logging

import kubernetes.client as k8s
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodPhase
from airflow.providers.google.cloud.links.kubernetes_engine import KubernetesEnginePodLink
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKEStartPodOperator as UpstreamGKEPodOperator,
)
from airflow.utils.context import Context

logger = logging.getLogger(__name__)


class GKEPodOperatorCallbacks(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_completion(
        *, pod: k8s.V1Pod, client: k8s.CoreV1Api, mode: str, **kwargs
    ) -> None:
        # Allow eviction of completed pods so they don't prevent the cluster from scaling down.
        pod_patch = k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(
                annotations={"cluster-autoscaler.kubernetes.io/safe-to-evict": "true"}
            )
        )
        client.patch_namespaced_pod(
            pod.metadata.name, pod.metadata.namespace, pod_patch
        )


class GKEPodOperator(UpstreamGKEPodOperator):
    """
    Based on GKEStartPodOperator.

    - In 1.10.x this inherited from the upstream GKEPodOperator rather than
      GKEStartPodOperator (v2).
    - In 1.10.x we needed to override the execute and helper methods to set an
      environment variable (CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE) for
      authentication to work. Fixed in v2.
    - We keep this class and call the upstream GKEStartPodOperator, because numerous
      places in our code still reference it.
    - Overrides __init__ to default image_pull_policy="Always", in_cluster=False,
      do_xcom_push=False, and the GKE connection/cluster parameters.
    - Defaults reattach_on_restart=False to address a 1.10.12 regression where
      GKEPodOperator reruns would simply attach to an existing pod and not perform
      any new work.
    - Hard sets reattach_on_restart=False when do_xcom_push=True to address an error:
      retrying a failed task with do_xcom_push=True causes Airflow to reattach to the
      pod, eventually causing a "Handshake status 500 Internal Server Error". Logs
      will indicate "found a running pod with ... different try_number. Will attach
      to this pod and monitor instead of starting new one".
    """

    def __init__(
        self,
        image_pull_policy="Always",
        in_cluster=False,
        startup_timeout_seconds=240,
        do_xcom_push=False,
        reattach_on_restart=False,
        on_finish_action="keep_pod",
        # Defined in Airflow's UI -> Admin -> Connections
        gcp_conn_id="google_cloud_airflow_gke",
        project_id="moz-fx-data-airflow-gke-prod",
        location="us-west1",
        cluster_name="workloads-prod-v1",
        namespace="default",
        *args,
        **kwargs,
    ):
        # Hard set reattach_on_restart = False when do_xcom_push is enabled.
        if do_xcom_push:
            reattach_on_restart = False

        super().__init__(
            *args,
            image_pull_policy=image_pull_policy,
            in_cluster=in_cluster,
            startup_timeout_seconds=startup_timeout_seconds,
            do_xcom_push=do_xcom_push,
            reattach_on_restart=reattach_on_restart,
            on_finish_action=on_finish_action,
            gcp_conn_id=gcp_conn_id,
            project_id=project_id,
            location=location,
            cluster_name=cluster_name,
            namespace=namespace,
            callbacks=GKEPodOperatorCallbacks,
            **kwargs,
        )

    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) -> k8s.V1Pod:
        """Set GKE pod link during pod creation.

        Workaround for https://github.com/apache/airflow/issues/46658
        This is meant for apache-airflow-providers-google==14.0.0
        """
        pod = super().get_or_create_pod(pod_request_obj, context)
        self.pod = pod
        KubernetesEnginePodLink.persist(context=context, task_instance=self)
        return pod

    def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
        if pod is None:
            return
        # Since we default to on_finish_action="keep_pod" the pod may be left running if the task
        # was stopped for a reason other than the pod succeeding/failing, like a pod startup timeout
        # or a task execution timeout (this could be considered a bug in the Kubernetes provider).
        # As a workaround we delete the pod during cleanup if it's still running.
        try:
            remote_pod: k8s.V1Pod = self.client.read_namespaced_pod(
                pod.metadata.name, pod.metadata.namespace
            )
            if (
                remote_pod.status.phase not in PodPhase.terminal_states
                and self.on_finish_action != OnFinishAction.DELETE_POD
            ):
                logger.info(
                    f"Deleting {remote_pod.status.phase.lower()} pod: {pod.metadata.name}"
                )
                self.pod_manager.delete_pod(remote_pod)
            else:
                super().process_pod_deletion(pod, reraise=reraise)
        except Exception as e:
            if isinstance(e, k8s.ApiException) and e.status == 404:
                # Ignore "404 Not Found" errors.
                logger.warning(f'Pod "{pod.metadata.name}" not found.')
            elif reraise:
                raise
            else:
                logger.exception(e)
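

# Usage sketch (not part of this module): a minimal example of how a DAG might
# instantiate GKEPodOperator. The DAG id, task id, image, and arguments below are
# hypothetical placeholders; the GKE project/cluster/connection defaults come from
# __init__ above, so only the pod-specific arguments need to be supplied.
#
#     from airflow import DAG
#     from operators.gcp_container_operator import GKEPodOperator
#
#     with DAG(dag_id="example_gke_dag", schedule=None) as dag:  # hypothetical DAG
#         example_task = GKEPodOperator(
#             task_id="example_task",                             # hypothetical task id
#             name="example-pod",
#             image="us-docker.pkg.dev/example/image:latest",     # hypothetical image
#             arguments=["python", "-m", "example_job"],
#         )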