def cleanup()

in providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py [0:0]


    def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
        """
        Finalize a finished pod and surface its outcome as an exception when needed.

        In order, this method: optionally patches the pod as "already checked",
        optionally reads the pod events on failure, runs the pod deletion
        handling, and finally raises if the pod should skip or fail the task.

        :param pod: the pod object the operator worked with; used for reading
            events and for the pod name in error messages.
        :param remote_pod: the pod as last read back from the cluster; used to
            decide the outcome. When falsy (e.g. pod creation failed) nothing
            is done.
        :raises AirflowSkipException: if ``skip_on_exit_code`` is set and the
            base container's exit code is one of its values.
        :raises AirflowException: if the pod is considered failed.
        """
        # Skip cleaning the pod in the following scenarios.
        # 1. If a task got marked as failed, "on_kill" method would be called and the pod will be
        #    cleaned up there. Cleaning it up again will raise an exception (which might cause retry).
        # 2. remote pod is null (ex: pod creation failed)
        if self._killed or not remote_pod:
            return

        istio_enabled = self.is_istio_enabled(remote_pod)

        # The ``status`` attribute can be present but set to None, so guard on its
        # value rather than on the attribute's mere existence.
        pod_status = getattr(remote_pod, "status", None)
        pod_phase = pod_status.phase if pod_status else None

        # If the pod did not succeed, or it succeeded but we want to keep it around,
        # mark it as already checked.
        if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD:
            self.patch_already_checked(remote_pod, reraise=False)

        # With istio enabled the outcome is judged from the base container alone;
        # otherwise it is judged from the pod phase.
        if istio_enabled:
            failed = not container_is_succeeded(remote_pod, self.base_container_name)
        else:
            failed = pod_phase != PodPhase.SUCCEEDED

        if failed and self.log_events_on_failure:
            self._read_pod_events(pod, reraise=False)

        # Deletion handling runs before any exception below is raised, so the
        # raise never prevents it.
        self.process_pod_deletion(remote_pod, reraise=False)

        if self.skip_on_exit_code:
            container_statuses = (pod_status.container_statuses if pod_status else None) or []
            base_container_status = next(
                (x for x in container_statuses if x.name == self.base_container_name), None
            )
            exit_code = (
                base_container_status.state.terminated.exit_code
                if base_container_status
                and base_container_status.state
                and base_container_status.state.terminated
                else None
            )
            # The skip check comes before the failure check, so a matching exit
            # code yields a skip rather than a failure.
            if exit_code in self.skip_on_exit_code:
                raise AirflowSkipException(
                    f"Pod {pod and pod.metadata.name} returned exit code {exit_code}. Skipping."
                )

        if failed:
            error_message = get_container_termination_message(remote_pod, self.base_container_name)
            # ``filter(None, ...)`` drops the optional parts that are not available.
            raise AirflowException(
                "\n".join(
                    filter(
                        None,
                        [
                            f"Pod {pod and pod.metadata.name} returned a failure.",
                            error_message if isinstance(error_message, str) else None,
                            f"remote_pod: {remote_pod}" if self.log_pod_spec_on_failure else None,
                        ],
                    )
                )
            )