images/airflow/2.10.1/python/mwaa/logging/config.py [40:185]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
}


def _get_kms_key_arn():
    return os.environ.get("MWAA__CORE__KMS_KEY_ARN", None)


def get_mwaa_logging_env_vars(source: str):
    """
    Retrieve the environment variables used to configure logging from a certain source.

    Each logging source, e.g. scheduler, DAG processing, is configurable via three
    environment variables:

    - MWAA__LOGGING__AIRFLOW_<source>_LOG_GROUP_ARN: The ARN of the log group to
      publish logs to. If this is not set, CloudWatch Logs integration will not be
      enabled.
    - MWAA__LOGGING__AIRFLOW_<source>_LOG_LEVEL: The level of logging, e.g. INFO.
    - MWAA__LOGGING__AIRFLOW_<source>_LOGS_ENABLED: Whether logging for this source is
      enabled or not.

    Given a certain source, this method returns a tuple of the values of these three
    variables.

    :param source: The source of logging. Can be any of the following: dagprocessor,
      scheduler, triggerer, task, webserver, or worker.

    :returns A tuple of (log_group_arn, log_level, logging_enabled)
    """
    log_group_arn = os.environ.get(
        f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOG_GROUP_ARN", None
    )
    log_level = os.environ.get(
        f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOG_LEVEL",
        logging.getLevelName(logging.INFO),
    )
    logging_enabled = os.environ.get(
        f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOGS_ENABLED", "false"
    )

    return (
        log_group_arn,
        log_level,
        logging_enabled.lower() == "true",
    )


def _configure_task_logging():
    log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars("task")
    if log_group_arn:
        # Setup CloudWatch logging.
        LOGGING_CONFIG["handlers"]["task"] = {
            "class": qualified_name(cloudwatch_handlers.TaskLogHandler),
            "formatter": "airflow",
            "filters": ["mask_secrets"],
            "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "enabled": logging_enabled,
        }
        LOGGING_CONFIG["loggers"]["airflow.task"].update(
            {
                "level": log_level,
                "propagate": False,
            }
        )


def _configure_dag_processing_logging():
    log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars(
        "dagprocessor"
    )
    if log_group_arn:
        # Setup CloudWatch logging for DAG Processor Manager.
        LOGGING_CONFIG["handlers"]["processor_manager"] = {
            "class": qualified_name(cloudwatch_handlers.DagProcessorManagerLogHandler),
            "formatter": "airflow",
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "stream_name": os.path.basename(DAG_PROCESSOR_MANAGER_LOG_LOCATION),
            "enabled": logging_enabled,
        }
        LOGGING_CONFIG["loggers"]["airflow.processor_manager"] = {
            "handlers": ["processor_manager"],
            "level": log_level,
            "propagate": False,
        }

        # Setup CloudWatch logging for DAG processing.
        LOGGING_CONFIG["handlers"]["processor"] = {
            "class": qualified_name(cloudwatch_handlers.DagProcessingLogHandler),
            "formatter": "airflow",
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "stream_name_template": PROCESSOR_FILENAME_TEMPLATE,
            "enabled": logging_enabled,
        }
        LOGGING_CONFIG["loggers"]["airflow.processor"] = {
            "handlers": ["processor"],
            "level": log_level,
            "propagate": False,
        }


def _configure_subprocesses_logging(
    subprocess_name: str,
    log_group_arn: str | None,
    log_stream_name_prefix: str,
    log_level: str,
    logging_enabled: bool,
    log_formatter: logging.Formatter | None = None,
):
    logger_name = MWAA_LOGGERS[subprocess_name.lower()]
    handler_name = logger_name.replace(".", "_")
    if log_group_arn:
        LOGGING_CONFIG["handlers"][handler_name] = {
            "class": qualified_name(cloudwatch_handlers.SubprocessLogHandler),
            "formatter": "airflow",
            "filters": ["mask_secrets"],
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "stream_name_prefix": log_stream_name_prefix,
            "logs_source": subprocess_name,
            "enabled": logging_enabled,
            "log_formatter": log_formatter,
        }
        # Setup CloudWatch logging.
        LOGGING_CONFIG["loggers"][logger_name] = {
            "handlers": [handler_name],
            "level": log_level,
            "propagate": False,
        }


def _configure():
    _configure_task_logging()
    _configure_dag_processing_logging()
    # We run a standalone DAG Processor but we don't create a special logger for it
    # because Airflow already has a dedicated logger for it, so we just use that when
    # we run the "dag-processor" Airflow command.
    for comp in ["Worker", "Scheduler", "WebServer", "Triggerer"]:
        log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars(comp)
        _configure_subprocesses_logging(
            comp,
            log_group_arn=log_group_arn,
            log_stream_name_prefix=comp.lower(),
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



images/airflow/2.9.2/python/mwaa/logging/config.py [46:191]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
}


def _get_kms_key_arn():
    return os.environ.get("MWAA__CORE__KMS_KEY_ARN", None)


def get_mwaa_logging_env_vars(source: str):
    """
    Retrieve the environment variables used to configure logging from a certain source.

    Each logging source, e.g. scheduler, DAG processing, is configurable via three
    environment variables:

    - MWAA__LOGGING__AIRFLOW_<source>_LOG_GROUP_ARN: The ARN of the log group to
      publish logs to. If this is not set, CloudWatch Logs integration will not be
      enabled.
    - MWAA__LOGGING__AIRFLOW_<source>_LOG_LEVEL: The level of logging, e.g. INFO.
    - MWAA__LOGGING__AIRFLOW_<source>_LOGS_ENABLED: Whether logging for this source is
      enabled or not.

    Given a certain source, this method returns a tuple of the values of these three
    variables.

    :param source: The source of logging. Can be any of the following: dagprocessor,
      scheduler, triggerer, task, webserver, or worker.

    :returns A tuple of (log_group_arn, log_level, logging_enabled)
    """
    log_group_arn = os.environ.get(
        f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOG_GROUP_ARN", None
    )
    log_level = os.environ.get(
        f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOG_LEVEL",
        logging.getLevelName(logging.INFO),
    )
    logging_enabled = os.environ.get(
        f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOGS_ENABLED", "false"
    )

    return (
        log_group_arn,
        log_level,
        logging_enabled.lower() == "true",
    )


def _configure_task_logging():
    log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars("task")
    if log_group_arn:
        # Setup CloudWatch logging.
        LOGGING_CONFIG["handlers"]["task"] = {
            "class": qualified_name(cloudwatch_handlers.TaskLogHandler),
            "formatter": "airflow",
            "filters": ["mask_secrets"],
            "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "enabled": logging_enabled,
        }
        LOGGING_CONFIG["loggers"]["airflow.task"].update(
            {
                "level": log_level,
                "propagate": False,
            }
        )


def _configure_dag_processing_logging():
    log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars(
        "dagprocessor"
    )
    if log_group_arn:
        # Setup CloudWatch logging for DAG Processor Manager.
        LOGGING_CONFIG["handlers"]["processor_manager"] = {
            "class": qualified_name(cloudwatch_handlers.DagProcessorManagerLogHandler),
            "formatter": "airflow",
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "stream_name": os.path.basename(DAG_PROCESSOR_MANAGER_LOG_LOCATION),
            "enabled": logging_enabled,
        }
        LOGGING_CONFIG["loggers"]["airflow.processor_manager"] = {
            "handlers": ["processor_manager"],
            "level": log_level,
            "propagate": False,
        }

        # Setup CloudWatch logging for DAG processing.
        LOGGING_CONFIG["handlers"]["processor"] = {
            "class": qualified_name(cloudwatch_handlers.DagProcessingLogHandler),
            "formatter": "airflow",
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "stream_name_template": PROCESSOR_FILENAME_TEMPLATE,
            "enabled": logging_enabled,
        }
        LOGGING_CONFIG["loggers"]["airflow.processor"] = {
            "handlers": ["processor"],
            "level": log_level,
            "propagate": False,
        }


def _configure_subprocesses_logging(
    subprocess_name: str,
    log_group_arn: str | None,
    log_stream_name_prefix: str,
    log_level: str,
    logging_enabled: bool,
    log_formatter: logging.Formatter | None = None,
):
    logger_name = MWAA_LOGGERS[subprocess_name.lower()]
    handler_name = logger_name.replace(".", "_")
    if log_group_arn:
        LOGGING_CONFIG["handlers"][handler_name] = {
            "class": qualified_name(cloudwatch_handlers.SubprocessLogHandler),
            "formatter": "airflow",
            "filters": ["mask_secrets"],
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "stream_name_prefix": log_stream_name_prefix,
            "logs_source": subprocess_name,
            "enabled": logging_enabled,
            "log_formatter": log_formatter,
        }
        # Setup CloudWatch logging.
        LOGGING_CONFIG["loggers"][logger_name] = {
            "handlers": [handler_name],
            "level": log_level,
            "propagate": False,
        }


def _configure():
    _configure_task_logging()
    _configure_dag_processing_logging()
    # We run a standalone DAG Processor but we don't create a special logger for it
    # because Airflow already has a dedicated logger for it, so we just use that when
    # we run the "dag-processor" Airflow command.
    for comp in ["Worker", "Scheduler", "WebServer", "Triggerer"]:
        log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars(comp)
        _configure_subprocesses_logging(
            comp,
            log_group_arn=log_group_arn,
            log_stream_name_prefix=comp.lower(),
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



