in images/airflow/2.9.2/python/mwaa/execute_command.py [0:0]
def _create_airflow_worker_subprocesses(environ: Dict[str, str], sigterm_patience_interval: timedelta | None = None):
conditions = _create_airflow_process_conditions('worker')
# MWAA__CORE__TASK_MONITORING_ENABLED is set to 'true' for workers where we want to monitor count of tasks currently getting
# executed on the worker. This will be used to determine if idle worker checks are to be enabled.
task_monitoring_enabled = (
os.environ.get("MWAA__CORE__TASK_MONITORING_ENABLED", "false").lower() == "true"
)
# If MWAA__CORE__TERMINATE_IF_IDLE is set to 'true', then as part of the task monitoring if the task count reaches zero, then the
# worker will be terminated.
terminate_if_idle = (
os.environ.get("MWAA__CORE__TERMINATE_IF_IDLE", "false").lower() == "true" and task_monitoring_enabled
)
# If MWAA__CORE__MWAA_SIGNAL_HANDLING_ENABLED is set to 'true', then as part of the task monitoring, the monitor will expect certain
# signals to be sent from MWAA. These signals will represent MWAA service side events such as start of an environment update.
mwaa_signal_handling_enabled = (
os.environ.get("MWAA__CORE__MWAA_SIGNAL_HANDLING_ENABLED", "false").lower() == "true" and task_monitoring_enabled
)
mwaa_worker_idleness_verification_interval = int(environ.get("MWAA__CORE__WORKER_IDLENESS_VERIFICATION_INTERVAL",
"20"))
if task_monitoring_enabled:
logger.info(f"Worker task monitoring is enabled with idleness verification interval: "
f"{mwaa_worker_idleness_verification_interval}")
# Initializing the monitor responsible for performing idle worker checks if enabled.
worker_task_monitor = WorkerTaskMonitor(mwaa_signal_handling_enabled, mwaa_worker_idleness_verification_interval)
else:
logger.info("Worker task monitoring is NOT enabled.")
worker_task_monitor = None
if worker_task_monitor:
conditions.append(TaskMonitoringCondition(worker_task_monitor, terminate_if_idle))
def on_sigterm() -> None:
# When a SIGTERM is caught, we pause the Airflow Task consumption and wait 5 seconds in order
# for any in-flight messages in the SQS broker layer to be processed and
# corresponding Airflow task instance to be created. Once that is done, we can
# start gracefully shutting down the worker. Without this, the SQS broker may
# consume messages from the queue, terminate before creating the corresponding
# Airflow task instance and abandon SQS messages in-flight.
if worker_task_monitor:
worker_task_monitor.pause_task_consumption()
time.sleep(5)
# Finally, return the worker subprocesses.
return [
_create_airflow_subprocess(
["celery", "worker"],
environ=environ,
logger_name=WORKER_LOGGER_NAME,
friendly_name="worker",
conditions=conditions,
on_sigterm=on_sigterm,
sigterm_patience_interval=sigterm_patience_interval
)
]