def sync()

in providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py [0:0]


    def sync(self) -> None:
        """
        Synchronize task state.

        A single pass performs three steps, in order:

        1. Call ``self.kube_scheduler.sync()`` and drain ``self.result_queue``, applying every reported
           state through ``self._change_state``. A result whose state change raises is logged and put
           back on the queue.
        2. Copy the last resource version seen per namespace onto the ``ResourceVersion`` instance.
        3. Take up to ``kube_config.worker_pods_creation_batch_size`` tasks from ``self.task_queue`` and
           hand each one to ``self.kube_scheduler.run_next``. A 403 "exceeded quota" ``ApiException`` puts
           the task back on the queue until ``self.task_publish_max_retries`` is reached (``-1`` disables
           the limit); any other handled failure fails the task via ``self.fail``.
        """
        if TYPE_CHECKING:
            assert self.scheduler_job_id
            assert self.kube_scheduler
            assert self.kube_config
            assert self.result_queue
            assert self.task_queue

        if self.running:
            self.log.debug("self.running: %s", self.running)
        if self.queued_tasks:
            self.log.debug("self.queued: %s", self.queued_tasks)
        self.kube_scheduler.sync()

        last_resource_version: dict[str, str] = defaultdict(lambda: "0")
        # get_nowait() raises Empty once the queue has been drained, which ends the loop.
        with contextlib.suppress(Empty):
            while True:
                results = self.result_queue.get_nowait()
                try:
                    key, state, pod_name, namespace, resource_version = results
                    last_resource_version[namespace] = resource_version
                    self.log.info("Changing state of %s to %s", results, state)
                    try:
                        self._change_state(key, state, pod_name, namespace)
                    except Exception as e:
                        self.log.exception(
                            "Exception: %s when attempting to change state of %s to %s, re-queueing.",
                            e,
                            results,
                            state,
                        )
                        self.result_queue.put(results)
                finally:
                    # Every get_nowait() must be matched by a task_done(), including on failure.
                    self.result_queue.task_done()

        from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion

        resource_instance = ResourceVersion()
        # NOTE(review): ``last_resource_version`` defaults to the truthy string "0", so the ``or``
        # fallback below never applies and namespaces without a result in this pass are set to "0".
        # Behaviour is kept as-is; confirm whether that reset is intended.
        for ns in resource_instance.resource_version:
            resource_instance.resource_version[ns] = (
                last_resource_version[ns] or resource_instance.resource_version[ns]
            )

        from kubernetes.client.rest import ApiException

        # Empty from get_nowait() ends the batch early when fewer tasks are queued than the batch size.
        with contextlib.suppress(Empty):
            for _ in range(self.kube_config.worker_pods_creation_batch_size):
                task = self.task_queue.get_nowait()

                try:
                    key, command, kube_executor_config, pod_template_file = task
                    self.kube_scheduler.run_next(task)
                    self.task_publish_retries.pop(key, None)
                except PodReconciliationError as e:
                    self.log.exception(
                        "Pod reconciliation failed, likely due to kubernetes library upgrade. "
                        "Try clearing the task to re-run.",
                    )
                    self.fail(key, e)
                except ApiException as e:
                    # ``e.body`` is not guaranteed to be a JSON object carrying a "message" key (it can
                    # be empty or plain text). Parsing it unguarded would raise out of sync() after the
                    # task was already taken off the queue, leaving it neither re-queued nor failed.
                    try:
                        body = json.loads(e.body) if e.body else None
                    except (TypeError, ValueError):
                        body = None
                    if isinstance(body, dict) and body.get("message") is not None:
                        message = str(body["message"])
                    else:
                        # No usable JSON message: fall back to the raw body, then to the reason.
                        message = str(e.body or e.reason or "")
                    retries = self.task_publish_retries[key]
                    # In case of exceeded quota errors, requeue the task as per the task_publish_max_retries
                    if (
                        str(e.status) == "403"
                        and "exceeded quota" in message
                        and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries)
                    ):
                        self.log.warning(
                            "[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
                            retries + 1,
                            self.task_publish_max_retries,
                            key,
                            e.reason,
                            message,
                        )
                        self.task_queue.put(task)
                        self.task_publish_retries[key] = retries + 1
                    else:
                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
                        self.fail(key, e)
                        self.task_publish_retries.pop(key, None)
                except PodMutationHookException as e:
                    self.log.error(
                        "Pod Mutation Hook failed for the task %s. Failing task. Details: %s",
                        key,
                        e.__cause__,
                    )
                    self.fail(key, e)
                finally:
                    # Every get_nowait() must be matched by a task_done(), including on failure.
                    self.task_queue.task_done()