images/airflow/2.10.3/python/mwaa/logging/config.py

""" A module which provides the logging configuration we use. All loggers and handlers we use are defined here, both for Airflow and for our code. For Airflow logging, we use their defaults and make the necessary changes to publish logs to CloudWatch. Airflow has a predefined list of loggers that are used throughout their code. For example, a task logger called "airflow.task" is expected to be available for logs related to tasks. However, not everything being generated by Airflow uses the the standard Python logging mechanism. For example, there is no single logger defined by Airflow that we can use to capture all logs generated by the scheduler. Hence, to standardize the process, we run sub-processes for cases like the schedulers, workers, etc., and then capture their stdout and stderr and send them to a Python logger of our defining, e.g. "mwaa.schedulers" for scheduler. """ # Python imports import logging import os # 3rd party imports from airflow.config_templates.airflow_local_settings import ( BASE_LOG_FOLDER, DAG_PROCESSOR_MANAGER_LOG_LOCATION, DEFAULT_LOGGING_CONFIG, PROCESSOR_FILENAME_TEMPLATE, ) # Our imports from mwaa.logging import cloudwatch_handlers from mwaa.utils import qualified_name # We adopt the default logging configuration from Airflow and do the necessary changes # to setup logging with CloudWatch Logs. LOGGING_CONFIG = { **DEFAULT_LOGGING_CONFIG, } def _get_kms_key_arn(): return os.environ.get("MWAA__CORE__KMS_KEY_ARN", None) def get_mwaa_logging_env_vars(source: str): """ Retrieve the environment variables used to configure logging from a certain source. Each logging source, e.g. scheduler, DAG processing, is configurable via three environment variables: - MWAA__LOGGING__AIRFLOW_<source>_LOG_GROUP_ARN: The ARN of the log group to publish logs to. If this is not set, CloudWatch Logs integration will not be enabled. - MWAA__LOGGING__AIRFLOW_<source>_LOG_LEVEL: The level of logging, e.g. INFO. - MWAA__LOGGING__AIRFLOW_<source>_LOGS_ENABLED: Whether logging for this source is enabled or not. Given a certain source, this method returns a tuple of the values of these three variables. :param source: The source of logging. Can be any of the following: dagprocessor, scheduler, triggerer, task, webserver, or worker. :returns A tuple of (log_group_arn, log_level, logging_enabled) """ log_group_arn = os.environ.get( f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOG_GROUP_ARN", None ) log_level = os.environ.get( f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOG_LEVEL", logging.getLevelName(logging.INFO), ) logging_enabled = os.environ.get( f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOGS_ENABLED", "false" ) return ( log_group_arn, log_level, logging_enabled.lower() == "true", ) def _configure_task_logging(): log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars("task") if log_group_arn: # Setup CloudWatch logging. LOGGING_CONFIG["handlers"]["task"] = { "class": qualified_name(cloudwatch_handlers.TaskLogHandler), "formatter": "airflow", "filters": ["mask_secrets"], "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), "log_group_arn": log_group_arn, "kms_key_arn": _get_kms_key_arn(), "enabled": logging_enabled, } LOGGING_CONFIG["loggers"]["airflow.task"].update( { "level": log_level, "propagate": False, } ) def _configure_dag_processing_logging(): log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars( "dagprocessor" ) if log_group_arn: # Setup CloudWatch logging for DAG Processor Manager. 
LOGGING_CONFIG["handlers"]["processor_manager"] = { "class": qualified_name(cloudwatch_handlers.DagProcessorManagerLogHandler), "formatter": "airflow", "log_group_arn": log_group_arn, "kms_key_arn": _get_kms_key_arn(), "stream_name": os.path.basename(DAG_PROCESSOR_MANAGER_LOG_LOCATION), "enabled": logging_enabled, } LOGGING_CONFIG["loggers"]["airflow.processor_manager"] = { "handlers": ["processor_manager"], "level": log_level, "propagate": False, } # Setup CloudWatch logging for DAG processing. LOGGING_CONFIG["handlers"]["processor"] = { "class": qualified_name(cloudwatch_handlers.DagProcessingLogHandler), "formatter": "airflow", "log_group_arn": log_group_arn, "kms_key_arn": _get_kms_key_arn(), "stream_name_template": PROCESSOR_FILENAME_TEMPLATE, "enabled": logging_enabled, } LOGGING_CONFIG["loggers"]["airflow.processor"] = { "handlers": ["processor"], "level": log_level, "propagate": False, } def _configure_subprocesses_logging( subprocess_name: str, log_group_arn: str | None, log_stream_name_prefix: str, log_level: str, logging_enabled: bool, log_formatter: logging.Formatter | None = None, ): logger_name = MWAA_LOGGERS[subprocess_name.lower()] handler_name = logger_name.replace(".", "_") if log_group_arn: LOGGING_CONFIG["handlers"][handler_name] = { "class": qualified_name(cloudwatch_handlers.SubprocessLogHandler), "formatter": "airflow", "filters": ["mask_secrets"], "log_group_arn": log_group_arn, "kms_key_arn": _get_kms_key_arn(), "stream_name_prefix": log_stream_name_prefix, "logs_source": subprocess_name, "enabled": logging_enabled, "log_formatter": log_formatter, } # Setup CloudWatch logging. LOGGING_CONFIG["loggers"][logger_name] = { "handlers": [handler_name], "level": log_level, "propagate": False, } def _configure(): _configure_task_logging() _configure_dag_processing_logging() # We run a standalone DAG Processor but we don't create a special logger for it # because Airflow already has a dedicated logger for it, so we just use that when # we run the "dag-processor" Airflow command. for comp in ["Worker", "Scheduler", "WebServer", "Triggerer"]: log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars(comp) _configure_subprocesses_logging( comp, log_group_arn=log_group_arn, log_stream_name_prefix=comp.lower(), log_level=log_level, logging_enabled=logging_enabled, ) _configure_subprocesses_logging( f"{comp}_requirements", log_group_arn=log_group_arn, log_stream_name_prefix="requirements_install", log_level="INFO", # We always want to publish requirements logs. logging_enabled=logging_enabled, log_formatter=logging.Formatter('[%(levelname)s] - %(message)s') ) _configure_subprocesses_logging( f"{comp}_startup", log_group_arn=log_group_arn, log_stream_name_prefix="startup_script_execution", log_level="INFO", # We always want to publish startup script logs. 


def _configure():
    _configure_task_logging()
    _configure_dag_processing_logging()
    # We run a standalone DAG Processor but we don't create a special logger for it
    # because Airflow already has a dedicated logger for it, so we just use that when
    # we run the "dag-processor" Airflow command.
    for comp in ["Worker", "Scheduler", "WebServer", "Triggerer"]:
        log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars(comp)
        _configure_subprocesses_logging(
            comp,
            log_group_arn=log_group_arn,
            log_stream_name_prefix=comp.lower(),
            log_level=log_level,
            logging_enabled=logging_enabled,
        )
        _configure_subprocesses_logging(
            f"{comp}_requirements",
            log_group_arn=log_group_arn,
            log_stream_name_prefix="requirements_install",
            log_level="INFO",  # We always want to publish requirements logs.
            logging_enabled=logging_enabled,
            log_formatter=logging.Formatter("[%(levelname)s] - %(message)s"),
        )
        _configure_subprocesses_logging(
            f"{comp}_startup",
            log_group_arn=log_group_arn,
            log_stream_name_prefix="startup_script_execution",
            log_level="INFO",  # We always want to publish startup script logs.
            logging_enabled=logging_enabled,
            log_formatter=logging.Formatter("[%(levelname)s] - %(message)s"),
        )


# Airflow has a dedicated logger for the DAG Processor Manager.
DAG_PROCESSOR_LOGGER_NAME = "airflow.processor_manager"

SCHEDULER_LOGGER_NAME = "mwaa.scheduler"
SCHEDULER_REQUIREMENTS_LOGGER_NAME = "mwaa.scheduler_requirements"
SCHEDULER_STARTUP_LOGGER_NAME = "mwaa.scheduler_startup"
TRIGGERER_LOGGER_NAME = "mwaa.triggerer"
TRIGGERER_REQUIREMENTS_LOGGER_NAME = "mwaa.triggerer_requirements"
TRIGGERER_STARTUP_LOGGER_NAME = "mwaa.triggerer_startup"
WEBSERVER_LOGGER_NAME = "mwaa.webserver"
WEBSERVER_REQUIREMENTS_LOGGER_NAME = "mwaa.webserver_requirements"
WEBSERVER_STARTUP_LOGGER_NAME = "mwaa.webserver_startup"
WORKER_LOGGER_NAME = "mwaa.worker"
WORKER_REQUIREMENTS_LOGGER_NAME = "mwaa.worker_requirements"
WORKER_STARTUP_LOGGER_NAME = "mwaa.worker_startup"

MWAA_LOGGERS = {
    "scheduler": SCHEDULER_LOGGER_NAME,
    "scheduler_requirements": SCHEDULER_REQUIREMENTS_LOGGER_NAME,
    "scheduler_startup": SCHEDULER_STARTUP_LOGGER_NAME,
    "triggerer": TRIGGERER_LOGGER_NAME,
    "triggerer_requirements": TRIGGERER_REQUIREMENTS_LOGGER_NAME,
    "triggerer_startup": TRIGGERER_STARTUP_LOGGER_NAME,
    "webserver": WEBSERVER_LOGGER_NAME,
    "webserver_requirements": WEBSERVER_REQUIREMENTS_LOGGER_NAME,
    "webserver_startup": WEBSERVER_STARTUP_LOGGER_NAME,
    "worker": WORKER_LOGGER_NAME,
    "worker_requirements": WORKER_REQUIREMENTS_LOGGER_NAME,
    "worker_startup": WORKER_STARTUP_LOGGER_NAME,
}

_configure()
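

# A minimal usage sketch rather than part of the configuration proper: Airflow is
# expected to load LOGGING_CONFIG through its [logging] logging_config_class option,
# and our wrapper processes then route captured subprocess output through the loggers
# named above. Running this module directly only demonstrates that flow.
if __name__ == "__main__":
    import logging.config

    logging.config.dictConfig(LOGGING_CONFIG)
    logging.getLogger(SCHEDULER_LOGGER_NAME).info("Captured a scheduler output line.")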