operators/gcp_container_operator.py
import logging
import kubernetes.client as k8s
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodPhase
from airflow.providers.google.cloud.links.kubernetes_engine import KubernetesEnginePodLink
from airflow.providers.google.cloud.operators.kubernetes_engine import (
GKEStartPodOperator as UpstreamGKEPodOperator,
)
from airflow.utils.context import Context
logger = logging.getLogger(__name__)
class GKEPodOperatorCallbacks(KubernetesPodOperatorCallback):
@staticmethod
def on_pod_completion(
*, pod: k8s.V1Pod, client: k8s.CoreV1Api, mode: str, **kwargs
) -> None:
# Allow eviction of completed pods so they don't prevent the cluster from scaling down.
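        # Roughly equivalent to the following kubectl command (illustrative only; not run by this operator):
        #   kubectl annotate pod <pod-name> -n <namespace> "cluster-autoscaler.kubernetes.io/safe-to-evict=true" --overwrite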
pod_patch = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
annotations={"cluster-autoscaler.kubernetes.io/safe-to-evict": "true"}
)
)
client.patch_namespaced_pod(
pod.metadata.name, pod.metadata.namespace, pod_patch
)
class GKEPodOperator(UpstreamGKEPodOperator):
"""
    Based on GKEStartPodOperator.
    - In 1.10.x this inherited from the upstream GKEPodOperator rather than GKEStartPodOperator (v2).
    - In 1.10.x we needed to override the execute and helper methods to set an environment
      variable (CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE) for authentication to work. Fixed in v2.
    - We keep this class and call the upstream GKEStartPodOperator because numerous places
      in our code still reference it.
    - Overrides __init__ to default image_pull_policy=Always, in_cluster=False,
      do_xcom_push=False, and the GKE connection/cluster parameters.
    - Defaults reattach_on_restart=False to address a 1.10.12 regression where GKEPodOperator
      reruns would simply attach to an existing pod and not perform any new work.
    - Hard sets reattach_on_restart=False when do_xcom_push=True to address an error:
      retrying a failed task with do_xcom_push=True causes Airflow to reattach to the pod,
      eventually causing a 'Handshake status 500 Internal Server Error'. Logs will indicate
      'found a running pod with ... different try_number. Will attach to this pod and monitor
      instead of starting new one'.
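    Example (an illustrative sketch; the task_id, name, image, and arguments shown
    here are hypothetical, not values defined in this module):

        GKEPodOperator(
            task_id="example_task",
            name="example-task",
            image="gcr.io/example-project/example-image:latest",
            arguments=["--date", "{{ ds }}"],
        )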
"""
def __init__(
self,
image_pull_policy="Always",
in_cluster=False,
startup_timeout_seconds=240,
do_xcom_push=False,
reattach_on_restart=False,
on_finish_action="keep_pod",
# Defined in Airflow's UI -> Admin -> Connections
gcp_conn_id="google_cloud_airflow_gke",
project_id="moz-fx-data-airflow-gke-prod",
location="us-west1",
cluster_name="workloads-prod-v1",
namespace="default",
*args,
**kwargs,
):
# Hard set reattach_on_restart = False when do_xcom_push is enabled.
if do_xcom_push:
reattach_on_restart = False
super().__init__(
*args,
image_pull_policy=image_pull_policy,
in_cluster=in_cluster,
startup_timeout_seconds=startup_timeout_seconds,
do_xcom_push=do_xcom_push,
reattach_on_restart=reattach_on_restart,
on_finish_action=on_finish_action,
gcp_conn_id=gcp_conn_id,
project_id=project_id,
location=location,
cluster_name=cluster_name,
namespace=namespace,
callbacks=GKEPodOperatorCallbacks,
**kwargs,
)
def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) -> k8s.V1Pod:
"""Set GKE pod link during pod creation.
Workaround for https://github.com/apache/airflow/issues/46658
This is meant for apache-airflow-providers-google==14.0.0
"""
pod = super().get_or_create_pod(pod_request_obj, context)
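        # The GKE pod link is built from attributes on the task instance (including self.pod),
        # so assign the pod before persisting the link.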
self.pod = pod
KubernetesEnginePodLink.persist(context=context, task_instance=self)
return pod
def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
if pod is None:
return
# Since we default to on_finish_action="keep_pod" the pod may be left running if the task
# was stopped for a reason other than the pod succeeding/failing, like a pod startup timeout
# or a task execution timeout (this could be considered a bug in the Kubernetes provider).
# As a workaround we delete the pod during cleanup if it's still running.
try:
remote_pod: k8s.V1Pod = self.client.read_namespaced_pod(
pod.metadata.name, pod.metadata.namespace
)
if (
remote_pod.status.phase not in PodPhase.terminal_states
and self.on_finish_action != OnFinishAction.DELETE_POD
):
logger.info(
f"Deleting {remote_pod.status.phase.lower()} pod: {pod.metadata.name}"
)
self.pod_manager.delete_pod(remote_pod)
else:
super().process_pod_deletion(pod, reraise=reraise)
except Exception as e:
if isinstance(e, k8s.ApiException) and e.status == 404:
# Ignore "404 Not Found" errors.
logger.warning(f'Pod "{pod.metadata.name}" not found.')
elif reraise:
raise
else:
logger.exception(e)