images/airflow/2.10.1/python/mwaa/config/airflow.py [18:243]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
logger = logging.getLogger(__name__)


def _get_essential_airflow_executor_config(executor_type: str) -> Dict[str, str]:
    """
    Retrieve the environment variables required for the executor. Currently, two executors
    are supported:
        - LocalExecutor: All tasks are run on a local process
        - CeleryExecutor (Default): All tasks are run on Celery worker processes

    :param executor_type A string indicating the type of executor to use.

    :returns A dictionary containing the environment variables.
    """

    match executor_type.lower():
        case 'localexecutor':
            return {
                "AIRFLOW__CORE__EXECUTOR": "LocalExecutor",
            }
        case 'celeryexecutor':
            celery_config_module_path = "mwaa.config.celery.MWAA_CELERY_CONFIG"
            return {
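                # Visibility timeout (in seconds) for the SQS broker: 43200 seconds
                # (12 hours) before an unacknowledged Celery task becomes visible
                # again and can be redelivered.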
                "AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT": "43200",
                "AIRFLOW__CELERY__BROKER_URL": get_sqs_endpoint(),
                "AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS": celery_config_module_path,
                "AIRFLOW__CELERY__RESULT_BACKEND": f"db+{get_db_connection_string()}",
                "AIRFLOW__CELERY__WORKER_ENABLE_REMOTE_CONTROL": "False",
                "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
                "AIRFLOW__OPERATORS__DEFAULT_QUEUE": get_sqs_queue_name(),
            }
        case _:
            raise ValueError(f"Executor type {executor_type} is not supported.")


def _get_essential_airflow_core_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "core" configuration section.

    :returns A dictionary containing the environment variables.
    """

    fernet_key = {}

    fernet_secret_json = os.environ.get("MWAA__CORE__FERNET_KEY")
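    # The secret, when present, is expected to be a JSON object with a "FernetKey"
    # field, e.g. (illustrative value only): {"FernetKey": "<url-safe base64 key>"}.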
    if fernet_secret_json:
        try:
            fernet_key = {
                "AIRFLOW__CORE__FERNET_KEY": json.loads(fernet_secret_json)["FernetKey"]
            }
        except Exception:
            logger.warning(
                "Invalid value for fernet secret. Value not printed for security reasons.",
            )

    return {
        "AIRFLOW__CORE__LOAD_EXAMPLES": "False",
        **fernet_key,
    }


def _get_opinionated_airflow_core_config() -> Dict[str, str]:
    """
    Retrieve the opinionated environment variables for Airflow's "core" configuration
    section. Unlike the essential "core" configuration, the values set here can be
    overridden by the user.

    :returns A dictionary containing the environment variables.
    """

    return {
        "AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER": "True",
    }


def get_user_airflow_config() -> Dict[str, str]:
    """
    Retrieve the user-defined environment variables for Airflow configuration.

    The user is able to specify additional Airflow configuration by using the
    `MWAA__CORE__CUSTOM_AIRFLOW_CONFIGS`, which should be a JSON object, with key-value
    pairs pertaining to what Airflow configuration environment variables the user wants
    to set.
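
    For example (illustrative value only), setting the variable to
    '{"AIRFLOW__WEBSERVER__DAG_DEFAULT_VIEW": "grid"}' results in a dictionary
    containing that single entry.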

    :returns A dictionary containing the environment variables.
    """

    airflow_config = {}

    airflow_config_secret = os.environ.get("MWAA__CORE__CUSTOM_AIRFLOW_CONFIGS")
    if airflow_config_secret:
        try:
            airflow_config = json.loads(airflow_config_secret)
        except Exception:
            logger.warning(
                "Invalid value for Airflow config secret. Value not printed for security reasons.",
            )

    return {**airflow_config}


def _get_essential_airflow_db_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "database" configuration section.

    :returns A dictionary containing the environment variables.
    """
    conn_string = get_db_connection_string()
    return {
        "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN": conn_string,
    }


def _get_essential_airflow_logging_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "logging" configuration section.

    :returns A dictionary containing the environment variables.
    """
    return {
        "AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS": "mwaa.logging.config.LOGGING_CONFIG",
    }


@cache
def _get_mwaa_cloudwatch_integration_config() -> Dict[str, str]:
    """
    Retrieve the environment variables required to enable CloudWatch Metrics integration.

    :returns A dictionary containing the environment variables.
    """
    enabled = (
        os.environ.get("MWAA__CLOUDWATCH_METRICS_INTEGRATION__ENABLED", "false").lower()
        == "true"
    )
    if not enabled:
        # MWAA CloudWatch Metrics integration isn't enabled.
        logger.info("MWAA CloudWatch Metrics integration is NOT enabled.")
        return {}

    logger.info("MWAA CloudWatch Metrics integration is enabled.")

    metrics_section = conf.getsection("metrics")
    if metrics_section is None:
        raise RuntimeError(
            "Unexpected error: couldn't find 'metrics' section in Airflow configuration."
        )
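    # Map every option in the [metrics] section to its Airflow default value,
    # expressed as AIRFLOW__METRICS__<OPTION> environment variables.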
    metrics_defaults = {
        f"AIRFLOW__METRICS__{option.upper()}": conf.get_default_value("metrics", option)  # type: ignore
        for option in metrics_section.keys()
    }

    # In MWAA, we use these metrics for monitoring purposes, so we don't allow the
    # user to override the Airflow metrics configuration. However, we still give the
    # customer the ability to control metrics via the options below, which we process
    # in the sidecar. We therefore save the customer-provided values for these options
    # in a volume that the sidecar has access to, but force-enable metrics in Airflow
    # so that it always publishes them.
    customer_config_path = os.environ.get(
        "MWAA__CLOUDWATCH_METRICS_INTEGRATION__CUSTOMER_CONFIG_PATH"
    )
    if customer_config_path:
        user_config = get_user_airflow_config()
        for option, default_value in [
            ("statsd_on", "True"),
            ("metrics_block_list", ""),
            ("metrics_allow_list", ""),
        ]:
            c = user_config.get(f"AIRFLOW__METRICS__{option.upper()}", default_value)
            config_path = os.path.join(customer_config_path, f"{option}.txt")
            try:
                with open(config_path, "w") as f:
                    print(c, file=f)  # type: ignore
            except Exception:
                logger.error(
                    f"Failed to write {option} to {config_path}. This might "
                    f"result in metrics misconfiguration."
                )

    return {
        # We don't allow the user to change the metrics configuration as that can break
        # the integration with CloudWatch Metrics.
        **metrics_defaults,
        "AIRFLOW__METRICS__STATSD_ON": "True",
        "AIRFLOW__METRICS__STATSD_HOST": "localhost",
        "AIRFLOW__METRICS__STATSD_PORT": "8125",
        "AIRFLOW__METRICS__STATSD_PREFIX": "airflow",
        "AIRFLOW__METRICS__METRICS_BLOCK_LIST": "",
        "AIRFLOW__METRICS__METRICS_ALLOW_LIST": "",
    }


def _get_essential_airflow_scheduler_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "scheduler" configuration section.

    :returns A dictionary containing the environment variables.
    """
    return {
        "AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR": "True",
    }


def _get_opinionated_airflow_scheduler_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "scheduler" configuration section.

    The difference between this and _get_essential_airflow_scheduler_config is that
    the config set here can be overridden by the user.

    :returns A dictionary containing the environment variables.
    """
    return {
        "AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION": "False",
        "AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT": "1800.0",
    }


def _get_opinionated_airflow_secrets_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "secrets" configuration section.

    :returns A dictionary containing the environment variables.
    """
    connection_lookup_pattern = {"connections_lookup_pattern": "^(?!aws_default$).*$"}
    return {
        "AIRFLOW__SECRETS__BACKEND_KWARGS": json.dumps(connection_lookup_pattern),
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



images/airflow/2.9.2/python/mwaa/config/airflow.py [18:243]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
logger = logging.getLogger(__name__)


def _get_essential_airflow_executor_config(executor_type: str) -> Dict[str, str]:
    """
    Retrieve the environment variables required for the executor. Currently, two executors
    are supported:
        - LocalExecutor: All tasks are run on a local process
        - CeleryExecutor (Default): All tasks are run on Celery worker processes

    :param executor_type A string indicating the type of executor to use.

    :returns A dictionary containing the environment variables.
    """

    match executor_type.lower():
        case 'localexecutor':
            return {
                "AIRFLOW__CORE__EXECUTOR": "LocalExecutor",
            }
        case 'celeryexecutor':
            celery_config_module_path = "mwaa.config.celery.MWAA_CELERY_CONFIG"
            return {
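                # Visibility timeout (in seconds) for the SQS broker: 43200 seconds
                # (12 hours) before an unacknowledged Celery task becomes visible
                # again and can be redelivered.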
                "AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT": "43200",
                "AIRFLOW__CELERY__BROKER_URL": get_sqs_endpoint(),
                "AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS": celery_config_module_path,
                "AIRFLOW__CELERY__RESULT_BACKEND": f"db+{get_db_connection_string()}",
                "AIRFLOW__CELERY__WORKER_ENABLE_REMOTE_CONTROL": "False",
                "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
                "AIRFLOW__OPERATORS__DEFAULT_QUEUE": get_sqs_queue_name(),
            }
        case _:
            raise ValueError(f"Executor type {executor_type} is not supported.")


def _get_essential_airflow_core_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "core" configuration section.

    :returns A dictionary containing the environment variables.
    """

    fernet_key = {}

    fernet_secret_json = os.environ.get("MWAA__CORE__FERNET_KEY")
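    # The secret, when present, is expected to be a JSON object with a "FernetKey"
    # field, e.g. (illustrative value only): {"FernetKey": "<url-safe base64 key>"}.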
    if fernet_secret_json:
        try:
            fernet_key = {
                "AIRFLOW__CORE__FERNET_KEY": json.loads(fernet_secret_json)["FernetKey"]
            }
        except Exception:
            logger.warning(
                "Invalid value for fernet secret. Value not printed for security reasons.",
            )

    return {
        "AIRFLOW__CORE__LOAD_EXAMPLES": "False",
        **fernet_key,
    }


def _get_opinionated_airflow_core_config() -> Dict[str, str]:
    """
    Retrieve the opinionated environment variables for Airflow's "core" configuration
    section. Unlike the essential "core" configuration, the values set here can be
    overridden by the user.

    :returns A dictionary containing the environment variables.
    """

    return {
        "AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER": "True",
    }


def get_user_airflow_config() -> Dict[str, str]:
    """
    Retrieve the user-defined environment variables for Airflow configuration.

    The user is able to specify additional Airflow configuration by using the
    `MWAA__CORE__CUSTOM_AIRFLOW_CONFIGS`, which should be a JSON object, with key-value
    pairs pertaining to what Airflow configuration environment variables the user wants
    to set.
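
    For example (illustrative value only), setting the variable to
    '{"AIRFLOW__WEBSERVER__DAG_DEFAULT_VIEW": "grid"}' results in a dictionary
    containing that single entry.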

    :returns A dictionary containing the environment variables.
    """

    airflow_config = {}

    airflow_config_secret = os.environ.get("MWAA__CORE__CUSTOM_AIRFLOW_CONFIGS")
    if airflow_config_secret:
        try:
            airflow_config = json.loads(airflow_config_secret)
        except Exception:
            logger.warning(
                "Invalid value for Airflow config secret. Value not printed for security reasons.",
            )

    return {**airflow_config}


def _get_essential_airflow_db_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "database" configuration section.

    :returns A dictionary containing the environment variables.
    """
    conn_string = get_db_connection_string()
    return {
        "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN": conn_string,
    }


def _get_essential_airflow_logging_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "logging" configuration section.

    :returns A dictionary containing the environment variables.
    """
    return {
        "AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS": "mwaa.logging.config.LOGGING_CONFIG",
    }


@cache
def _get_mwaa_cloudwatch_integration_config() -> Dict[str, str]:
    """
    Retrieve the environment variables required to enable CloudWatch Metrics integration.

    :returns A dictionary containing the environment variables.
    """
    enabled = (
        os.environ.get("MWAA__CLOUDWATCH_METRICS_INTEGRATION__ENABLED", "false").lower()
        == "true"
    )
    if not enabled:
        # MWAA CloudWatch Metrics integration isn't enabled.
        logger.info("MWAA CloudWatch Metrics integration is NOT enabled.")
        return {}

    logger.info("MWAA CloudWatch Metrics integration is enabled.")

    metrics_section = conf.getsection("metrics")
    if metrics_section is None:
        raise RuntimeError(
            "Unexpected error: couldn't find 'metrics' section in Airflow configuration."
        )
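    # Map every option in the [metrics] section to its Airflow default value,
    # expressed as AIRFLOW__METRICS__<OPTION> environment variables.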
    metrics_defaults = {
        f"AIRFLOW__METRICS__{option.upper()}": conf.get_default_value("metrics", option)  # type: ignore
        for option in metrics_section.keys()
    }

    # In MWAA, we use these metrics for monitoring purposes, so we don't allow the
    # user to override the Airflow metrics configuration. However, we still give the
    # customer the ability to control metrics via the options below, which we process
    # in the sidecar. We therefore save the customer-provided values for these options
    # in a volume that the sidecar has access to, but force-enable metrics in Airflow
    # so that it always publishes them.
    customer_config_path = os.environ.get(
        "MWAA__CLOUDWATCH_METRICS_INTEGRATION__CUSTOMER_CONFIG_PATH"
    )
    if customer_config_path:
        user_config = get_user_airflow_config()
        for option, default_value in [
            ("statsd_on", "True"),
            ("metrics_block_list", ""),
            ("metrics_allow_list", ""),
        ]:
            c = user_config.get(f"AIRFLOW__METRICS__{option.upper()}", default_value)
            config_path = os.path.join(customer_config_path, f"{option}.txt")
            try:
                with open(config_path, "w") as f:
                    print(c, file=f)  # type: ignore
            except Exception:
                logger.error(
                    f"Failed to write {option} to {config_path}. This might "
                    f"result in metrics misconfiguration."
                )

    return {
        # We don't allow the user to change the metrics configuration as that can break
        # the integration with CloudWatch Metrics.
        **metrics_defaults,
        "AIRFLOW__METRICS__STATSD_ON": "True",
        "AIRFLOW__METRICS__STATSD_HOST": "localhost",
        "AIRFLOW__METRICS__STATSD_PORT": "8125",
        "AIRFLOW__METRICS__STATSD_PREFIX": "airflow",
        "AIRFLOW__METRICS__METRICS_BLOCK_LIST": "",
        "AIRFLOW__METRICS__METRICS_ALLOW_LIST": "",
    }


def _get_essential_airflow_scheduler_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "scheduler" configuration section.

    :returns A dictionary containing the environment variables.
    """
    return {
        "AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR": "True",
    }


def _get_opinionated_airflow_scheduler_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "scheduler" configuration section.

    The difference between this and _get_essential_airflow_scheduler_config is that
    the config set here can be overridden by the user.

    :returns A dictionary containing the environment variables.
    """
    return {
        "AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION": "False",
        "AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT": "1800.0",
    }


def _get_opinionated_airflow_secrets_config() -> Dict[str, str]:
    """
    Retrieve the environment variables for Airflow's "secrets" configuration section.

    :returns A dictionary containing the environment variables.
    """
    connection_lookup_pattern = {"connections_lookup_pattern": "^(?!aws_default$).*$"}
    return {
        "AIRFLOW__SECRETS__BACKEND_KWARGS": json.dumps(connection_lookup_pattern),
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



