def _create_airflow_worker_subprocesses()

in images/airflow/2.10.3/python/mwaa/entrypoint.py [0:0]


def _create_airflow_worker_subprocesses(environ: Dict[str, str], sigterm_patience_interval: timedelta | None = None):
    """
    Build the subprocess list for the Airflow Celery worker.

    :param environ: Environment mapping forwarded to the worker subprocess. The
        idleness verification interval is also read from this mapping.
    :param sigterm_patience_interval: Forwarded unchanged to
        ``create_airflow_subprocess``.

    :returns: A single-element list holding the object returned by
        ``create_airflow_subprocess`` for the ``celery worker`` command.

    :raises ValueError: If MWAA__CORE__WORKER_IDLENESS_VERIFICATION_INTERVAL is
        present in ``environ`` but cannot be parsed as an integer (parsing
        happens even when task monitoring is disabled).
    """

    def _is_flag_set(variable_name: str) -> bool:
        # A flag counts as set only when its value is 'true' (case-insensitive).
        # NOTE(review): flags are read from the process-wide os.environ, whereas
        # the idleness interval below is read from the `environ` argument --
        # confirm that this difference is intentional.
        return os.environ.get(variable_name, "false").lower() == "true"

    conditions = _create_airflow_process_conditions('worker')

    # MWAA__CORE__TASK_MONITORING_ENABLED is 'true' on workers where the number of
    # tasks currently executing should be monitored. The two flags below only take
    # effect when this one is on.
    task_monitoring_enabled = _is_flag_set("MWAA__CORE__TASK_MONITORING_ENABLED")

    # MWAA__CORE__TERMINATE_IF_IDLE: as part of task monitoring, terminate the
    # worker once its task count reaches zero.
    terminate_if_idle = task_monitoring_enabled and _is_flag_set("MWAA__CORE__TERMINATE_IF_IDLE")

    # MWAA__CORE__MWAA_SIGNAL_HANDLING_ENABLED: the monitor will expect signals from
    # MWAA representing service side events, such as the start of an environment
    # update.
    mwaa_signal_handling_enabled = task_monitoring_enabled and _is_flag_set(
        "MWAA__CORE__MWAA_SIGNAL_HANDLING_ENABLED"
    )

    raw_interval = environ.get("MWAA__CORE__WORKER_IDLENESS_VERIFICATION_INTERVAL", "20")
    mwaa_worker_idleness_verification_interval = int(raw_interval)

    if task_monitoring_enabled:
        logger.info(
            f"Worker task monitoring is enabled with idleness verification interval: {mwaa_worker_idleness_verification_interval}"
        )
        # The monitor responsible for performing the idle worker checks.
        worker_task_monitor = WorkerTaskMonitor(
            mwaa_signal_handling_enabled,
            mwaa_worker_idleness_verification_interval,
        )
    else:
        logger.info("Worker task monitoring is NOT enabled.")
        worker_task_monitor = None

    if worker_task_monitor:
        monitoring_condition = TaskMonitoringCondition(worker_task_monitor, terminate_if_idle)
        conditions.append(monitoring_condition)

    def on_sigterm() -> None:
        # Without a monitor there is nothing to pause.
        if not worker_task_monitor:
            return
        # On SIGTERM, stop consuming Airflow tasks and then wait 5 seconds so that
        # any messages already in flight in the SQS broker layer get processed and
        # their Airflow task instances get created before the graceful shutdown of
        # the worker begins. Otherwise the SQS broker may consume messages from the
        # queue, terminate before creating the corresponding task instances, and
        # abandon those SQS messages in-flight.
        worker_task_monitor.pause_task_consumption()
        time.sleep(5)

    # Finally, assemble and return the worker subprocesses.
    worker_subprocess = create_airflow_subprocess(
        ["celery", "worker"],
        environ=environ,
        logger_name=WORKER_LOGGER_NAME,
        friendly_name="worker",
        conditions=conditions,
        on_sigterm=on_sigterm,
        sigterm_patience_interval=sigterm_patience_interval,
    )
    return [worker_subprocess]