in airflow-core/src/airflow/jobs/scheduler_job_runner.py [0:0]
def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running state."""
# added all() to save runtime, otherwise query is executed more than once
dag_runs: Collection[DagRun] = DagRun.get_queued_dag_runs_to_set_running(session).all()
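
# Count dag runs that are already RUNNING, grouped by (dag_id, backfill_id), so the
# max_active_runs limits can be enforced below.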
query = (
select(
DagRun.dag_id,
DagRun.backfill_id,
func.count(DagRun.id).label("num_running"),
)
.where(DagRun.state == DagRunState.RUNNING)
.group_by(DagRun.dag_id, DagRun.backfill_id)
)
active_runs_of_dags = Counter({(dag_id, br_id): num for dag_id, br_id, num in session.execute(query)})
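
# Helper that moves a single queued run to RUNNING, stamps its start date, and emits
# schedule-delay metrics for runs created by a periodic timetable.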
@add_span
def _update_state(dag: DAG, dag_run: DagRun):
span = Trace.get_current_span()
span.set_attributes(
{
"state": str(DagRunState.RUNNING),
"run_id": dag_run.run_id,
"type": dag_run.run_type,
"dag_id": dag_run.dag_id,
}
)
dag_run.state = DagRunState.RUNNING
dag_run.start_date = timezone.utcnow()
if (
dag.timetable.periodic
and dag_run.run_type != DagRunType.MANUAL
and dag_run.triggered_by != DagRunTriggeredByType.ASSET
and dag_run.clear_number < 1
):
# TODO: Logically, this should be DagRunInfo.run_after, but the
# information is not stored on a DagRun, only before the actual
# execution on DagModel.next_dagrun_create_after. We should add
# a field on DagRun for this instead of relying on the run
# always happening immediately after the data interval.
# We only publish these metrics for scheduled dag runs, i.e. when ``run_type`` is
# not MANUAL, the run was not asset-triggered, and ``clear_number`` is 0.
expected_start_date = dag.get_run_data_interval(dag_run).end
schedule_delay = dag_run.start_date - expected_start_date
# Publish the metric twice: once under the backward-compatible per-dag name, then once with tags
Stats.timing(f"dagrun.schedule_delay.{dag.dag_id}", schedule_delay)
Stats.timing(
"dagrun.schedule_delay",
schedule_delay,
tags={"dag_id": dag.dag_id},
)
if span.is_recording():
span.add_event(
name="schedule_delay",
attributes={"dag_id": dag.dag_id, "schedule_delay": str(schedule_delay)},
)
# Caching the DAG lookup saves time when scheduling many dag runs for the same dag
cached_get_dag: Callable[[DagRun], DAG | None] = lru_cache()(
partial(self.scheduler_dag_bag.get_dag, session=session)
)
span = Trace.get_current_span()
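# Walk the queued runs and start each one that is not blocked by a max_active_runs limit.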
for dag_run in dag_runs:
dag_id = dag_run.dag_id
run_id = dag_run.run_id
backfill_id = dag_run.backfill_id
backfill = dag_run.backfill
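# Resolve the DAG for this run via the cached lookup; skip the run if its serialized
# DAG can no longer be found.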
dag = dag_run.dag = cached_get_dag(dag_run)
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
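# Enforce max_active_runs: backfill runs are capped by the backfill's own limit,
# all other runs by the dag-level setting.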
active_runs = active_runs_of_dags[(dag_id, backfill_id)]
if backfill_id is not None:
if active_runs >= backfill.max_active_runs:
# todo: delete all "candidate dag runs" from list for this dag right now
self.log.info(
"dag cannot be started due to backfill max_active_runs constraint; "
"active_runs=%s max_active_runs=%s dag_id=%s run_id=%s",
active_runs,
backfill.max_active_runs,
dag_id,
run_id,
)
continue
elif dag.max_active_runs:
if active_runs >= dag.max_active_runs:
# TODO: delete all remaining candidate dag runs for this dag from the list right now
self.log.info(
"dag cannot be started due to dag max_active_runs constraint; "
"active_runs=%s max_active_runs=%s dag_id=%s run_id=%s",
active_runs,
dag.max_active_runs,
dag_id,
run_id,
)
continue
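# Record the run being started as an event on the active trace span.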
if span.is_recording():
span.add_event(
name="dag_run",
attributes={
"run_id": dag_run.run_id,
"dag_id": dag_run.dag_id,
"conf": str(dag_run.conf),
},
)
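# Count this run toward the active total right away so later candidates in this batch
# see it, then move it to RUNNING and signal the state change.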
active_runs_of_dags[(dag_run.dag_id, backfill_id)] += 1
_update_state(dag, dag_run)
dag_run.notify_dagrun_state_changed()