in images/airflow/2.10.3/python/mwaa/entrypoint.py [0:0]
def run_airflow_command(cmd: str, environ: Dict[str, str]):
"""
Run the given Airflow command in a subprocess.
:param cmd - The command to run, e.g. "worker".
:param environ: A dictionary containing the environment variables.
"""
match cmd:
case "scheduler":
conditions = _create_airflow_process_conditions('scheduler')
subprocesses = _create_airflow_scheduler_subprocesses(environ, conditions)
# Schedulers, triggers, and DAG processors are all essential processes and
# if any fails, we want to exit the container and let it restart.
run_subprocesses(subprocesses)
case "worker":
run_subprocesses(_create_airflow_worker_subprocesses(environ))
case "webserver":
run_subprocesses(_create_airflow_webserver_subprocesses(environ))
# Hybrid runs the scheduler and celery worker processes is a single container.
case "hybrid":
# The Sidecar healthcheck is currently limited to one healthcheck per port
# so the hybrid container can only include healthchecks for one subprocess.
# Only the worker healcheck conditions are enabled to monitor container health, so
# we pass an empty list of conditions to the scheduler process and make worker
# process essential.
scheduler_subprocesses = _create_airflow_scheduler_subprocesses(environ, [])
# Since both scheduler and workers are launched together as essential
# the default patience interval for worker needs to be increased to better
# allow for in-flight tasks to complete in case of scheduler failure.
try:
worker_patience_interval_seconds = os.getenv('MWAA__HYBRID_CONTAINER__SIGTERM_PATIENCE_INTERVAL', None)
if worker_patience_interval_seconds is not None:
worker_patience_interval = timedelta(seconds=int(worker_patience_interval_seconds))
else:
worker_patience_interval = HYBRID_WORKER_SIGTERM_PATIENCE_INTERVAL_DEFAULT
except (ValueError, TypeError):
worker_patience_interval = HYBRID_WORKER_SIGTERM_PATIENCE_INTERVAL_DEFAULT
worker_subprocesses = _create_airflow_worker_subprocesses(environ,
sigterm_patience_interval=worker_patience_interval)
run_subprocesses(scheduler_subprocesses + worker_subprocesses)
case _:
raise ValueError(f"Unexpected command: {cmd}")