images/airflow/2.10.3/python/mwaa/config/celery.py (23 lines of code) (raw):

"""Contain functions for retrieving Airflow Celery-related configuration."""

# Python imports
from typing import Any
import copy

# 3rd party imports
from airflow.providers.celery.executors.default_celery import DEFAULT_CELERY_CONFIG

# Our imports
from mwaa.config.aws import get_aws_region
from mwaa.config.sqs import get_sqs_queue_name, get_sqs_queue_url, should_use_ssl
from mwaa.celery.sqs_broker import Transport
from mwaa.utils import qualified_name


def create_celery_config() -> dict[str, Any]:
    """
    Build the Celery configuration dictionary.

    The result starts as a deep copy of Airflow's ``DEFAULT_CELERY_CONFIG`` (so
    the shared default object is never mutated) and is then overlaid with our
    broker transport, broker transport options, and pool settings. It is meant
    to be used in the "celery" section of the Airflow configuration.

    :returns: A dictionary containing the Celery configuration.
    """
    # Work on a private copy of Airflow's defaults.
    config: dict[str, Any] = copy.deepcopy(DEFAULT_CELERY_CONFIG)

    # Point Celery at our transport class via its qualified name.
    config["broker_transport"] = qualified_name(Transport)

    # Keep whatever transport options the defaults carry, then layer the
    # queue, SSL, and region settings on top in a fresh dictionary.
    transport_options: dict[str, Any] = {**config["broker_transport_options"]}
    queue_name = get_sqs_queue_name()
    transport_options["predefined_queues"] = {
        queue_name: {"url": get_sqs_queue_url()},
    }
    transport_options["is_secure"] = should_use_ssl()
    transport_options["region"] = get_aws_region()
    config["broker_transport_options"] = transport_options

    # NOTE(review): these look like connection-pool settings; confirm which
    # consumer reads them from the top level of this dictionary.
    config["pool_pre_ping"] = True
    config["pool_recycle"] = 1200

    return config


# Built once, when this module is first imported.
MWAA_CELERY_CONFIG = create_celery_config()