in providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py [0:0]
def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
    """
    Finalize a finished pod and report its outcome through Airflow exceptions.

    In order, this method: patches the pod as "already checked" when it did not succeed or
    must be kept, reads the pod events when the run failed and ``log_events_on_failure`` is
    set, hands the pod to ``process_pod_deletion``, and finally raises depending on the base
    container's exit code and the computed failure state.

    :param pod: pod object used to read events and to name the pod in error messages.
    :param remote_pod: pod object whose status drives every decision here; may be ``None``
        (e.g. pod creation failed), in which case nothing is done.
    :raises AirflowSkipException: if ``skip_on_exit_code`` is set and contains the exit code
        of the base container.
    :raises AirflowException: if the pod is considered failed.
    """
    # Skip cleaning the pod in the following scenarios.
    # 1. If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up
    # there. Cleaning it up again will raise an exception (which might cause retry).
    # 2. remote pod is null (ex: pod creation failed)
    if self._killed or not remote_pod:
        return

    istio_enabled = self.is_istio_enabled(remote_pod)

    # ``status`` can be absent *or* ``None``; a bare ``hasattr`` check only covers the former
    # and would crash on ``None.phase``. Either way the phase is treated as unknown.
    status = getattr(remote_pod, "status", None)
    pod_phase = status.phase if status else None
    not_succeeded = pod_phase != PodPhase.SUCCEEDED

    # if the pod fails or success, but we don't want to delete it
    if not_succeeded or self.on_finish_action == OnFinishAction.KEEP_POD:
        self.patch_already_checked(remote_pod, reraise=False)

    # Without istio the pod phase decides; with istio only the base container's result counts.
    failed = (not_succeeded and not istio_enabled) or (
        istio_enabled and not container_is_succeeded(remote_pod, self.base_container_name)
    )

    if failed and self.log_events_on_failure:
        self._read_pod_events(pod, reraise=False)

    # Called for every outcome; errors are not re-raised so they cannot mask the result below.
    self.process_pod_deletion(remote_pod, reraise=False)

    if self.skip_on_exit_code:
        container_statuses = (status.container_statuses if status else None) or []
        base_container_status = next(
            (x for x in container_statuses if x.name == self.base_container_name), None
        )
        # The exit code is only available once the base container has a terminated state.
        exit_code = (
            base_container_status.state.terminated.exit_code
            if base_container_status
            and base_container_status.state
            and base_container_status.state.terminated
            else None
        )
        # The skip check runs before the failure check, so a matching exit code wins.
        if exit_code in self.skip_on_exit_code:
            raise AirflowSkipException(
                f"Pod {pod and pod.metadata.name} returned exit code {exit_code}. Skipping."
            )

    if failed:
        error_message = get_container_termination_message(remote_pod, self.base_container_name)
        # ``filter(None, ...)`` drops the optional parts that are not available or not enabled.
        raise AirflowException(
            "\n".join(
                filter(
                    None,
                    [
                        f"Pod {pod and pod.metadata.name} returned a failure.",
                        error_message if isinstance(error_message, str) else None,
                        f"remote_pod: {remote_pod}" if self.log_pod_spec_on_failure else None,
                    ],
                )
            )
        )