in providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py [0:0]
def sync(self) -> None:
    """
    Synchronize task state.

    One pass does the following, in order:

    1. Calls ``self.kube_scheduler.sync()``.
    2. Drains ``self.result_queue`` and applies every reported state through
       ``self._change_state``; a result whose state change raises is logged and
       put back on the queue.
    3. Writes the last resource version seen per namespace into the
       ``ResourceVersion`` holder.
    4. Takes at most ``self.kube_config.worker_pods_creation_batch_size`` tasks
       from ``self.task_queue`` and hands each to ``self.kube_scheduler.run_next``.
       A task is failed on ``PodReconciliationError``, ``PodMutationHookException``
       or a non-retryable ``ApiException``; a 403 "exceeded quota" ``ApiException``
       is re-queued while ``self.task_publish_max_retries`` allows it
       (``-1`` disables the limit).
    """
    if TYPE_CHECKING:
        assert self.scheduler_job_id
        assert self.kube_scheduler
        assert self.kube_config
        assert self.result_queue
        assert self.task_queue

    if self.running:
        self.log.debug("self.running: %s", self.running)
    if self.queued_tasks:
        self.log.debug("self.queued: %s", self.queued_tasks)
    self.kube_scheduler.sync()

    # Namespaces that report nothing during this pass read back as "0".
    last_resource_version: dict[str, str] = defaultdict(lambda: "0")
    # get_nowait() raising Empty is the normal way out of the drain loop.
    with contextlib.suppress(Empty):
        while True:
            results = self.result_queue.get_nowait()
            try:
                key, state, pod_name, namespace, resource_version = results
                last_resource_version[namespace] = resource_version
                self.log.info("Changing state of %s to %s", results, state)
                try:
                    self._change_state(key, state, pod_name, namespace)
                except Exception as e:
                    self.log.exception(
                        "Exception: %s when attempting to change state of %s to %s, re-queueing.",
                        e,
                        results,
                        state,
                    )
                    # Put the result back so it is picked up again instead of being lost.
                    self.result_queue.put(results)
            finally:
                # Every successful get must be matched by task_done(), even on failure.
                self.result_queue.task_done()

    from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion

    resource_instance = ResourceVersion()
    for ns in resource_instance.resource_version:
        # NOTE(review): the defaultdict above yields the truthy string "0" for a
        # namespace without results, so the ``or`` fallback is never taken and the
        # stored version is overwritten with "0" - confirm this is intended.
        resource_instance.resource_version[ns] = (
            last_resource_version[ns] or resource_instance.resource_version[ns]
        )

    from kubernetes.client.rest import ApiException

    # Empty here simply means there are fewer queued tasks than the batch size.
    with contextlib.suppress(Empty):
        for _ in range(self.kube_config.worker_pods_creation_batch_size):
            task = self.task_queue.get_nowait()

            try:
                # Unpacking also validates that the task is a 4-tuple; only the key is used here.
                key, _, _, _ = task
                self.kube_scheduler.run_next(task)
                self.task_publish_retries.pop(key, None)
            except PodReconciliationError as e:
                self.log.exception(
                    "Pod reconciliation failed, likely due to kubernetes library upgrade. "
                    "Try clearing the task to re-run.",
                )
                self.fail(key, e)
            except ApiException as e:
                # The response body is not guaranteed to be a JSON object: it may be
                # missing, empty, or plain text. Decoding it blindly would raise from
                # inside this handler and abort the whole sync, so fall back to the
                # raw body (or the reason) as the message instead.
                try:
                    body = json.loads(e.body) if e.body else None
                except (ValueError, TypeError):
                    body = None
                if isinstance(body, dict) and "message" in body:
                    message = str(body["message"])
                else:
                    message = str(e.body or e.reason)

                retries = self.task_publish_retries[key]
                # In case of exceeded quota errors, requeue the task as per the task_publish_max_retries
                if (
                    str(e.status) == "403"
                    and "exceeded quota" in message
                    and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries)
                ):
                    self.log.warning(
                        "[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
                        retries + 1,
                        self.task_publish_max_retries,
                        key,
                        e.reason,
                        message,
                    )
                    self.task_queue.put(task)
                    self.task_publish_retries[key] = retries + 1
                else:
                    self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
                    self.fail(key, e)
                    self.task_publish_retries.pop(key, None)
            except PodMutationHookException as e:
                self.log.error(
                    "Pod Mutation Hook failed for the task %s. Failing task. Details: %s",
                    key,
                    e.__cause__,
                )
                self.fail(key, e)
            finally:
                self.task_queue.task_done()