in airflow-core/src/airflow/jobs/scheduler_job_runner.py [0:0]
def _run_scheduler_loop(self) -> None:
    """
    Run the scheduler's main loop.

    Before looping, orphaned tasks are adopted/reset once, and periodic maintenance callbacks are
    registered on an ``EventScheduler``. These cover:

    * orphaned-task adoption
    * trigger timeouts
    * backfill completion
    * pool and running-TI metrics
    * task-instance heartbeat-timeout purging
    * paused-DAG run state
    * tasks stuck in queued
    * asset orphanage
    * stale bundle cleanup, only when any executor is local

    Each loop iteration:

    #. Ends spans of externally ended operations and runs ``_do_scheduling`` (which schedules for
       as many executors as possible)
    #. Heartbeats every executor
    #. Processes each executor's events
    #. Saves each executor's task event logs (failures are logged, not raised)
    #. Heartbeats the scheduler job, only if necessary
    #. Runs any timed maintenance callbacks that are due
    #. Sleeps, when idle, for at most the idle sleep time or until the next timed event.
       Idle means nothing was queued and no executor events were processed; this step is
       skipped in unit-test mode.

    The loop stops once ``loop_count`` reaches ``self.num_runs``, provided ``num_runs`` is
    positive; otherwise it runs indefinitely.
    """
    is_unit_test: bool = conf.getboolean("core", "unit_test_mode")

    timers = EventScheduler()

    # Check on start up, then every configured interval
    self.adopt_or_reset_orphaned_tasks()

    timers.call_regular_interval(
        conf.getfloat("scheduler", "orphaned_tasks_check_interval", fallback=300.0),
        self.adopt_or_reset_orphaned_tasks,
    )
    timers.call_regular_interval(
        conf.getfloat("scheduler", "trigger_timeout_check_interval", fallback=15.0),
        self.check_trigger_timeouts,
    )
    timers.call_regular_interval(
        30,
        self._mark_backfills_complete,
    )
    timers.call_regular_interval(
        conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0),
        self._emit_pool_metrics,
    )
    timers.call_regular_interval(
        conf.getfloat("scheduler", "running_metrics_interval", fallback=30.0),
        self._emit_running_ti_metrics,
    )
    timers.call_regular_interval(
        conf.getfloat("scheduler", "task_instance_heartbeat_timeout_detection_interval", fallback=10.0),
        self._find_and_purge_task_instances_without_heartbeats,
    )
    timers.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)
    timers.call_regular_interval(
        conf.getfloat("scheduler", "task_queued_timeout_check_interval"),
        self._handle_tasks_stuck_in_queued,
    )
    timers.call_regular_interval(
        conf.getfloat("scheduler", "parsing_cleanup_interval"),
        self._update_asset_orphanage,
    )

    # Stale bundle versions are only cleaned up when at least one executor is local.
    if any(x.is_local for x in self.job.executors):
        bundle_cleanup_mgr = BundleUsageTrackingManager()
        check_interval = conf.getint(
            section="dag_processor",
            key="stale_bundle_cleanup_interval",
        )
        # A non-positive interval disables the cleanup.
        if check_interval > 0:
            timers.call_regular_interval(
                delay=check_interval,
                action=bundle_cleanup_mgr.remove_stale_bundle_versions,
            )

    for loop_count in itertools.count(start=1):
        # The timer is nested *inside* the span instead of sharing one ``with`` statement.
        # ``timer.duration`` is only populated once the timer stops, and span events can only
        # be recorded while the span is still open. Nesting lets the timer stop first, so the
        # duration can be logged and attached to the span before the span ends.
        with Trace.start_span(span_name="scheduler_job_loop", component="SchedulerJobRunner") as span:
            with Stats.timer("scheduler.scheduler_loop_duration") as timer:
                span.set_attributes(
                    {
                        "category": "scheduler",
                        "loop_count": loop_count,
                    }
                )

                with create_session() as session:
                    self._end_spans_of_externally_ended_ops(session)

                    # This will schedule for as many executors as possible.
                    num_queued_tis = self._do_scheduling(session)
                    # Don't keep any objects alive -- we've possibly just looked at 500+ ORM objects!
                    session.expunge_all()

                # Heartbeat all executors, even if they're not receiving new tasks this loop. It will be
                # either a no-op, or they will check-in on currently running tasks and send out new
                # events to be processed below.
                for executor in self.job.executors:
                    executor.heartbeat()

                with create_session() as session:
                    num_finished_events = 0
                    for executor in self.job.executors:
                        num_finished_events += self._process_executor_events(
                            executor=executor, session=session
                        )

                # Saving task event logs is best-effort: a failure for one executor is logged
                # and does not stop the loop or the remaining executors.
                for executor in self.job.executors:
                    try:
                        with create_session() as session:
                            self._process_task_event_logs(executor._task_event_logs, session)
                    except Exception:
                        self.log.exception("Something went wrong when trying to save task event logs.")

                # Heartbeat the scheduler periodically
                perform_heartbeat(
                    job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
                )

                # Run any pending timed events; the return value is the time until the next one.
                next_event = timers.run(blocking=False)
                self.log.debug("Next timed event is in %f", next_event)

            # The timer has stopped here (so ``timer.duration`` is set), but the span is still open.
            self.log.debug("Ran scheduling loop in %.2f seconds", timer.duration)
            if span.is_recording():
                span.add_event(
                    name="Ran scheduling loop",
                    attributes={
                        "duration in seconds": timer.duration,
                    },
                )

            # Decide on exit while the span is still open, so the exit event is recorded on it.
            reached_num_runs = loop_count >= self.num_runs > 0
            if reached_num_runs and span.is_recording():
                span.add_event("Exiting scheduler loop as requested number of runs has been reached")

        if not is_unit_test and not num_queued_tis and not num_finished_events:
            # If the scheduler is doing things, don't sleep. This means when there is work to do, the
            # scheduler will run "as quick as possible", but when it's stopped, it can sleep, dropping CPU
            # usage when "idle"
            time.sleep(min(self._scheduler_idle_sleep_time, next_event or 0))

        if reached_num_runs:
            self.log.info(
                "Exiting scheduler loop as requested number of runs (%d - got to %d) has been reached",
                self.num_runs,
                loop_count,
            )
            break