def _start_queued_dagruns()

in airflow-core/src/airflow/jobs/scheduler_job_runner.py [0:0]


    def _start_queued_dagruns(self, session: Session) -> None:
        """
        Find DagRuns in queued state and decide moving them to running state.

        The candidates come from ``DagRun.get_queued_dag_runs_to_set_running``.
        For each one, the number of runs already in the running state for the
        same ``(dag_id, backfill_id)`` pair is compared against the applicable
        limit: ``backfill.max_active_runs`` when the run belongs to a backfill,
        otherwise ``dag.max_active_runs`` when that value is truthy. Runs that
        would exceed the limit, or whose DAG cannot be resolved, are skipped and
        left untouched. Every other run is switched to ``DagRunState.RUNNING``,
        gets its ``start_date`` set, and has its state-change notification fired.

        :param session: Database session used to fetch the queued runs, count
            the running ones and resolve the DAG of each run.
        """
        # added all() to save runtime, otherwise query is executed more than once
        dag_runs: Collection[DagRun] = DagRun.get_queued_dag_runs_to_set_running(session).all()

        # Number of runs currently in the running state, per (dag_id, backfill_id).
        query = (
            select(
                DagRun.dag_id,
                DagRun.backfill_id,
                func.count(DagRun.id).label("num_running"),
            )
            .where(DagRun.state == DagRunState.RUNNING)
            .group_by(DagRun.dag_id, DagRun.backfill_id)
        )
        # A Counter yields 0 for pairs that have no running run yet, and lets the
        # loop below account for the runs it starts during this very call.
        active_runs_of_dags = Counter({(dag_id, br_id): num for dag_id, br_id, num in session.execute(query)})

        @add_span
        def _update_state(dag: DAG, dag_run: DagRun) -> None:
            """Mark ``dag_run`` as running and publish its schedule delay when applicable."""
            span = Trace.get_current_span()
            span.set_attributes(
                {
                    "state": str(DagRunState.RUNNING),
                    "run_id": dag_run.run_id,
                    "type": dag_run.run_type,
                    "dag_id": dag_run.dag_id,
                }
            )

            dag_run.state = DagRunState.RUNNING
            dag_run.start_date = timezone.utcnow()
            if (
                dag.timetable.periodic
                and dag_run.run_type != DagRunType.MANUAL
                and dag_run.triggered_by != DagRunTriggeredByType.ASSET
                and dag_run.clear_number < 1
            ):
                # TODO: Logically, this should be DagRunInfo.run_after, but the
                # information is not stored on a DagRun, only before the actual
                # execution on DagModel.next_dagrun_create_after. We should add
                # a field on DagRun for this instead of relying on the run
                # always happening immediately after the data interval.
                # These metrics are only published for runs of a periodic
                # timetable whose ``run_type`` is not *MANUAL*, that were not
                # triggered by an asset, and whose ``clear_number`` is 0.
                expected_start_date = dag.get_run_data_interval(dag_run).end
                schedule_delay = dag_run.start_date - expected_start_date
                # Publish metrics twice with backward compatible name, and then with tags
                Stats.timing(f"dagrun.schedule_delay.{dag.dag_id}", schedule_delay)
                Stats.timing(
                    "dagrun.schedule_delay",
                    schedule_delay,
                    tags={"dag_id": dag.dag_id},
                )
                if span.is_recording():
                    span.add_event(
                        name="schedule_delay",
                        attributes={"dag_id": dag.dag_id, "schedule_delay": str(schedule_delay)},
                    )

        # cache saves time during scheduling of many dag_runs for same dag
        # NOTE(review): the cache key is the DagRun object passed in, so a hit
        # needs two runs that hash and compare equal -- confirm this really
        # deduplicates the lookups per dag.
        cached_get_dag: Callable[[DagRun], DAG | None] = lru_cache()(
            partial(self.scheduler_dag_bag.get_dag, session=session)
        )

        span = Trace.get_current_span()
        for dag_run in dag_runs:
            dag_id = dag_run.dag_id
            run_id = dag_run.run_id
            backfill_id = dag_run.backfill_id
            backfill = dag_run.backfill
            dag = dag_run.dag = cached_get_dag(dag_run)
            if not dag:
                self.log.error("DAG '%s' not found in serialized_dag table", dag_id)
                continue
            active_runs = active_runs_of_dags[(dag_id, backfill_id)]
            if backfill_id is not None:
                # Runs of a backfill are limited by the backfill, not by the dag.
                if active_runs >= backfill.max_active_runs:
                    # todo: delete all "candidate dag runs" from list for this dag right now
                    self.log.info(
                        "dag cannot be started due to backfill max_active_runs constraint; "
                        "active_runs=%s max_active_runs=%s dag_id=%s run_id=%s",
                        active_runs,
                        backfill.max_active_runs,
                        dag_id,
                        run_id,
                    )
                    continue
            elif dag.max_active_runs:
                if active_runs >= dag.max_active_runs:
                    # todo: delete all candidate dag runs for this dag from list right now
                    # Report the limit that was actually compared against.
                    self.log.info(
                        "dag cannot be started due to dag max_active_runs constraint; "
                        "active_runs=%s max_active_runs=%s dag_id=%s run_id=%s",
                        active_runs,
                        dag.max_active_runs,
                        dag_id,
                        run_id,
                    )
                    continue
            if span.is_recording():
                span.add_event(
                    name="dag_run",
                    attributes={
                        "run_id": run_id,
                        "dag_id": dag_id,
                        "conf": str(dag_run.conf),
                    },
                )
            # Count this run immediately so that later candidates of the same
            # (dag_id, backfill_id) pair are checked against the updated total.
            active_runs_of_dags[(dag_id, backfill_id)] += 1
            _update_state(dag, dag_run)
            dag_run.notify_dagrun_state_changed()