def _run_scheduler_loop()

in airflow-core/src/airflow/jobs/scheduler_job_runner.py [0:0]


    def _run_scheduler_loop(self) -> None:
        """
        Drive the scheduler: register periodic maintenance timers, then loop until told to stop.

        Before the loop starts, orphaned tasks are adopted or reset once and a set of
        recurring callbacks is registered on an ``EventScheduler``.

        Every loop iteration then performs, in order:
            #. End spans of externally ended operations and run scheduling in one session
            #. Heartbeat every executor
            #. Process executor events for every executor
            #. Persist each executor's task event logs (failures are logged, not raised)
            #. Heartbeat the scheduler job itself if necessary
            #. Run any timed events that are due

        An iteration that queued nothing and saw no finished events is followed by a short
        sleep (skipped in unit test mode). The loop exits once ``self.num_runs`` iterations
        have completed, provided ``self.num_runs`` is positive; this method has no other
        exit condition.
        """
        unit_test_mode: bool = conf.getboolean("core", "unit_test_mode")

        periodic = EventScheduler()

        # Do one orphan sweep immediately; the timer registered right below repeats it.
        self.adopt_or_reset_orphaned_tasks()

        orphan_check_every = conf.getfloat("scheduler", "orphaned_tasks_check_interval", fallback=300.0)
        periodic.call_regular_interval(orphan_check_every, self.adopt_or_reset_orphaned_tasks)

        trigger_timeout_every = conf.getfloat("scheduler", "trigger_timeout_check_interval", fallback=15.0)
        periodic.call_regular_interval(trigger_timeout_every, self.check_trigger_timeouts)

        periodic.call_regular_interval(30, self._mark_backfills_complete)

        pool_metrics_every = conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0)
        periodic.call_regular_interval(pool_metrics_every, self._emit_pool_metrics)

        running_metrics_every = conf.getfloat("scheduler", "running_metrics_interval", fallback=30.0)
        periodic.call_regular_interval(running_metrics_every, self._emit_running_ti_metrics)

        heartbeat_timeout_every = conf.getfloat(
            "scheduler", "task_instance_heartbeat_timeout_detection_interval", fallback=10.0
        )
        periodic.call_regular_interval(
            heartbeat_timeout_every, self._find_and_purge_task_instances_without_heartbeats
        )

        periodic.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)

        # These two intervals are read without a fallback value.
        queued_timeout_every = conf.getfloat("scheduler", "task_queued_timeout_check_interval")
        periodic.call_regular_interval(queued_timeout_every, self._handle_tasks_stuck_in_queued)

        asset_orphanage_every = conf.getfloat("scheduler", "parsing_cleanup_interval")
        periodic.call_regular_interval(asset_orphanage_every, self._update_asset_orphanage)

        # Stale bundle cleanup is only considered when at least one executor is local,
        # and only scheduled when the configured interval is positive.
        if any(candidate.is_local for candidate in self.job.executors):
            bundle_cleanup_mgr = BundleUsageTrackingManager()
            stale_bundle_every = conf.getint(
                section="dag_processor",
                key="stale_bundle_cleanup_interval",
            )
            if stale_bundle_every > 0:
                periodic.call_regular_interval(
                    delay=stale_bundle_every,
                    action=bundle_cleanup_mgr.remove_stale_bundle_versions,
                )

        loop_count = 0
        while True:
            loop_count += 1
            with (
                Trace.start_span(span_name="scheduler_job_loop", component="SchedulerJobRunner") as span,
                Stats.timer("scheduler.scheduler_loop_duration") as timer,
            ):
                loop_attributes = {
                    "category": "scheduler",
                    "loop_count": loop_count,
                }
                span.set_attributes(loop_attributes)

                with create_session() as session:
                    self._end_spans_of_externally_ended_ops(session)

                    # Schedules for as many executors as possible.
                    num_queued_tis = self._do_scheduling(session)
                    # Drop every ORM object from the session so nothing stays alive after
                    # a scheduling pass that may have touched hundreds of them.
                    session.expunge_all()

                # Every executor gets a heartbeat, including ones that received no new tasks
                # this iteration: for those it is either a no-op or a check-in on running
                # tasks that produces the events consumed just below.
                for current_executor in self.job.executors:
                    current_executor.heartbeat()

                with create_session() as session:
                    num_finished_events = sum(
                        self._process_executor_events(executor=current_executor, session=session)
                        for current_executor in self.job.executors
                    )

                # Each executor's task event logs are saved in their own session so that a
                # failure for one executor is logged and does not stop the others.
                for current_executor in self.job.executors:
                    try:
                        with create_session() as session:
                            self._process_task_event_logs(current_executor._task_event_logs, session)
                    except Exception:
                        self.log.exception("Something went wrong when trying to save task event logs.")

                # Heartbeat the scheduler job, but only when one is actually due.
                perform_heartbeat(
                    job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
                )

                # Fire whatever timed events are due, without blocking.
                until_next_event = periodic.run(blocking=False)
                self.log.debug("Next timed event is in %f", until_next_event)

            self.log.debug("Ran scheduling loop in %.2f seconds", timer.duration)
            if span.is_recording():
                span.add_event(
                    name="Ran scheduling loop",
                    attributes={
                        "duration in seconds": timer.duration,
                    },
                )

            # Only sleep when this iteration did no work: a busy scheduler loops again
            # immediately, an idle one backs off to reduce CPU usage. Never sleep in
            # unit test mode.
            if not (unit_test_mode or num_queued_tis or num_finished_events):
                idle_for = min(self._scheduler_idle_sleep_time, until_next_event or 0)
                time.sleep(idle_for)

            # A non-positive num_runs never triggers this exit.
            run_limit = self.num_runs
            if run_limit > 0 and loop_count >= run_limit:
                self.log.info(
                    "Exiting scheduler loop as requested number of runs (%d - got to %d) has been reached",
                    run_limit,
                    loop_count,
                )
                if span.is_recording():
                    span.add_event("Exiting scheduler loop as requested number of runs has been reached")
                break