def _run_airflow_command()

in images/airflow/2.9.2/python/mwaa/execute_command.py [0:0]


def _run_airflow_command(cmd: str, environ: Dict[str, str]):
    """
    Run the given Airflow command in a subprocess.

    :param cmd - The command to run, e.g. "worker".
    :param environ: A dictionary containing the environment variables.
    """

    match cmd:
        case "scheduler":
            conditions = _create_airflow_process_conditions('scheduler')
            subprocesses = _create_airflow_scheduler_subprocesses(environ, conditions)
            # Schedulers, triggers, and DAG processors are all essential processes and
            # if any fails, we want to exit the container and let it restart.
            run_subprocesses(subprocesses)
        case "worker":
            run_subprocesses(_create_airflow_worker_subprocesses(environ))
        case "webserver":
            run_subprocesses(_create_airflow_webserver_subprocesses(environ))
        # Hybrid runs the scheduler and celery worker processes is a single container.
        case "hybrid":
            # The Sidecar healthcheck is currently limited to one healthcheck per port
            # so the hybrid container can only include healthchecks for one subprocess.
            # Only the worker healcheck conditions are enabled to monitor container health, so
            # we pass an empty list of conditions to the scheduler process and make worker
            # process essential.
            scheduler_subprocesses = _create_airflow_scheduler_subprocesses(environ, [])

            # Since both scheduler and workers are launched together as essential
            # the default patience interval for worker needs to be increased to better
            # allow for in-flight tasks to complete in case of scheduler failure.
            try:
                worker_patience_interval_seconds = os.getenv('MWAA__HYBRID_CONTAINER__SIGTERM_PATIENCE_INTERVAL', None)
                if worker_patience_interval_seconds is not None:
                    worker_patience_interval = timedelta(seconds=int(worker_patience_interval_seconds))
                else:
                    worker_patience_interval = HYBRID_WORKER_SIGTERM_PATIENCE_INTERVAL_DEFAULT
            except (ValueError, TypeError):
                worker_patience_interval = HYBRID_WORKER_SIGTERM_PATIENCE_INTERVAL_DEFAULT

            worker_subprocesses = _create_airflow_worker_subprocesses(environ,
                                                                   sigterm_patience_interval=worker_patience_interval)
            run_subprocesses(scheduler_subprocesses + worker_subprocesses)

        case _:
            raise ValueError(f"Unexpected command: {cmd}")