def _get_mwaa_cloudwatch_integration_config()

in images/airflow/2.9.2/python/mwaa/config/airflow.py [0:0]


def _get_mwaa_cloudwatch_integration_config() -> Dict[str, str]:
    """
    Retrieve the environment variables required to enable CloudWatch Metrics integration.

    :returns A dictionary containing the environment variables.
    """
    enabled = (
        os.environ.get("MWAA__CLOUDWATCH_METRICS_INTEGRATION__ENABLED", "false").lower()
        == "true"
    )
    if not enabled:
        # MWAA CloudWatch Metrics integration isn't enabled.
        logging.info("MWAA CloudWatch Metrics integration is NOT enabled.")
        return {}

    logging.info("MWAA CloudWatch Metrics integration is enabled.")

    metrics_section = conf.getsection("metrics")
    if metrics_section is None:
        raise RuntimeError(
            "Unexpected error: couldn't find 'metrics' section in Airflow configuration."
        )
    metrics_defaults = {
        f"AIRFLOW__METRICS__{option.upper()}": conf.get_default_value("metrics", option)  # type: ignore
        for option in metrics_section.keys()
    }

    # In MWAA, we use the metrics for monitoring purposes, hence we don't allow the user
    # to override the Airflow configurations for metrics. However, we still give the
    # customer the ability to control metrics via the options below, which we process in
    # the sidecar. Hence, we save the customer-provided values for these metrics in a
    # volume that the sidecar has access to, but then force enable them in Airflow so
    # the latter always publish metrics.
    customer_config_path = os.environ.get(
        "MWAA__CLOUDWATCH_METRICS_INTEGRATION__CUSTOMER_CONFIG_PATH"
    )
    if customer_config_path:
        user_config = get_user_airflow_config()
        for option, default_value in [
            ("statsd_on", "True"),
            ("metrics_block_list", ""),
            ("metrics_allow_list", ""),
        ]:
            c = user_config.get(f"AIRFLOW__METRICS__{option.upper()}", default_value)
            config_path = os.path.join(customer_config_path, f"{option}.txt")
            try:
                with open(config_path, "w") as f:
                    print(c, file=f)  # type: ignore
            except:
                logger.error(
                    f"Failed to write {option} to {config_path}. This might "
                    f"result in metrics misconfiguration."
                )

    return {
        # We don't allow the user to change the metrics configuration as that can break
        # the integration with CloudWatch Metrics.
        **metrics_defaults,
        "AIRFLOW__METRICS__STATSD_ON": "True",
        "AIRFLOW__METRICS__STATSD_HOST": "localhost",
        "AIRFLOW__METRICS__STATSD_PORT": "8125",
        "AIRFLOW__METRICS__STATSD_PREFIX": "airflow",
        "AIRFLOW__METRICS__METRICS_BLOCK_LIST": "",
        "AIRFLOW__METRICS__METRICS_ALLOW_LIST": "",
    }