# images/airflow/2.10.1/python/mwaa/logging/config.py
"""
A module which provides the logging configuration we use.
All loggers and handlers we use are defined here, both for Airflow and for our code.
For Airflow logging, we use their defaults and make the necessary changes to publish
logs to CloudWatch.
Airflow has a predefined list of loggers that are used throughout their code. For
example, a task logger called "airflow.task" is expected to be available for logs
related to tasks.
However, not everything being generated by Airflow uses the the standard Python logging
mechanism. For example, there is no single logger defined by Airflow that we can use to
capture all logs generated by the scheduler. Hence, to standardize the process, we run
sub-processes for cases like the schedulers, workers, etc., and then capture their
stdout and stderr and send them to a Python logger of our defining, e.g.
"mwaa.schedulers" for scheduler.
"""

# Python imports
import logging
import os

# 3rd party imports
from airflow.config_templates.airflow_local_settings import (
    BASE_LOG_FOLDER,
    DAG_PROCESSOR_MANAGER_LOG_LOCATION,
    DEFAULT_LOGGING_CONFIG,
    PROCESSOR_FILENAME_TEMPLATE,
)

# Our imports
from mwaa.logging import cloudwatch_handlers
from mwaa.utils import qualified_name

# We adopt the default logging configuration from Airflow and make the necessary
# changes to set up logging with CloudWatch Logs.
LOGGING_CONFIG = {
    **DEFAULT_LOGGING_CONFIG,
}
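
# Note: Airflow does not pick up this dictionary automatically. In a setup like this
# one, the `[logging] logging_config_class` Airflow option (or, equivalently, the
# AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS environment variable) is expected to point
# at it, after which Airflow applies it via `logging.config.dictConfig`. A sketch of
# that wiring, assuming this module is on PYTHONPATH:
#
#     export AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS=mwaa.logging.config.LOGGING_CONFIG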


def _get_kms_key_arn():
    return os.environ.get("MWAA__CORE__KMS_KEY_ARN", None)


def get_mwaa_logging_env_vars(source: str):
    """
    Retrieve the environment variables used to configure logging for a certain source.

    Each logging source, e.g. scheduler, DAG processing, is configurable via three
    environment variables:

    - MWAA__LOGGING__AIRFLOW_<source>_LOG_GROUP_ARN: The ARN of the log group to
      publish logs to. If this is not set, CloudWatch Logs integration will not be
      enabled.
    - MWAA__LOGGING__AIRFLOW_<source>_LOG_LEVEL: The level of logging, e.g. INFO.
    - MWAA__LOGGING__AIRFLOW_<source>_LOGS_ENABLED: Whether logging for this source is
      enabled or not.

    Given a certain source, this function returns a tuple of the values of these three
    variables.

    :param source: The source of logging. Can be any of the following: dagprocessor,
        scheduler, triggerer, task, webserver, or worker.

    :returns: A tuple of (log_group_arn, log_level, logging_enabled).
    """
    log_group_arn = os.environ.get(
        f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOG_GROUP_ARN", None
    )
    log_level = os.environ.get(
        f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOG_LEVEL",
        logging.getLevelName(logging.INFO),
    )
    logging_enabled = os.environ.get(
        f"MWAA__LOGGING__AIRFLOW_{source.upper()}_LOGS_ENABLED", "false"
    )
    return (
        log_group_arn,
        log_level,
        logging_enabled.lower() == "true",
    )
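
# A minimal usage sketch of the helper above, with hypothetical values (the ARN and
# environment name below are made up for illustration):
#
#     os.environ["MWAA__LOGGING__AIRFLOW_WORKER_LOG_GROUP_ARN"] = (
#         "arn:aws:logs:us-east-1:111122223333:log-group:my-env-Worker"
#     )
#     os.environ["MWAA__LOGGING__AIRFLOW_WORKER_LOGS_ENABLED"] = "true"
#     arn, level, enabled = get_mwaa_logging_env_vars("worker")
#     # arn     -> "arn:aws:logs:...:log-group:my-env-Worker"
#     # level   -> "INFO" (the default when the LOG_LEVEL variable is unset)
#     # enabled -> True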


def _configure_task_logging():
    log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars("task")
    if log_group_arn:
        # Setup CloudWatch logging.
        LOGGING_CONFIG["handlers"]["task"] = {
            "class": qualified_name(cloudwatch_handlers.TaskLogHandler),
            "formatter": "airflow",
            "filters": ["mask_secrets"],
            "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "enabled": logging_enabled,
        }
        LOGGING_CONFIG["loggers"]["airflow.task"].update(
            {
                "level": log_level,
                "propagate": False,
            }
        )


def _configure_dag_processing_logging():
    log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars(
        "dagprocessor"
    )
    if log_group_arn:
        # Setup CloudWatch logging for DAG Processor Manager.
        LOGGING_CONFIG["handlers"]["processor_manager"] = {
            "class": qualified_name(cloudwatch_handlers.DagProcessorManagerLogHandler),
            "formatter": "airflow",
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "stream_name": os.path.basename(DAG_PROCESSOR_MANAGER_LOG_LOCATION),
            "enabled": logging_enabled,
        }
        LOGGING_CONFIG["loggers"]["airflow.processor_manager"] = {
            "handlers": ["processor_manager"],
            "level": log_level,
            "propagate": False,
        }

        # Setup CloudWatch logging for DAG processing.
        LOGGING_CONFIG["handlers"]["processor"] = {
            "class": qualified_name(cloudwatch_handlers.DagProcessingLogHandler),
            "formatter": "airflow",
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "stream_name_template": PROCESSOR_FILENAME_TEMPLATE,
            "enabled": logging_enabled,
        }
        LOGGING_CONFIG["loggers"]["airflow.processor"] = {
            "handlers": ["processor"],
            "level": log_level,
            "propagate": False,
        }
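
# With Airflow's default PROCESSOR_FILENAME_TEMPLATE ("{{ filename }}.log"), the
# "processor" handler above would publish the processing logs of a DAG file such as
# my_dag.py to a stream named "my_dag.py.log" (assuming the handler renders the
# template once per DAG file, in the same way Airflow's own FileProcessorHandler
# uses this template).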


def _configure_subprocesses_logging(
    subprocess_name: str,
    log_group_arn: str | None,
    log_stream_name_prefix: str,
    log_level: str,
    logging_enabled: bool,
    log_formatter: logging.Formatter | None = None,
):
    # MWAA_LOGGERS is defined further down in this module. That is fine because this
    # function only runs when _configure() is called at the bottom of the module, at
    # which point the mapping already exists.
    logger_name = MWAA_LOGGERS[subprocess_name.lower()]
    handler_name = logger_name.replace(".", "_")
    if log_group_arn:
        LOGGING_CONFIG["handlers"][handler_name] = {
            "class": qualified_name(cloudwatch_handlers.SubprocessLogHandler),
            "formatter": "airflow",
            "filters": ["mask_secrets"],
            "log_group_arn": log_group_arn,
            "kms_key_arn": _get_kms_key_arn(),
            "stream_name_prefix": log_stream_name_prefix,
            "logs_source": subprocess_name,
            "enabled": logging_enabled,
            "log_formatter": log_formatter,
        }
        # Setup CloudWatch logging.
        LOGGING_CONFIG["loggers"][logger_name] = {
            "handlers": [handler_name],
            "level": log_level,
            "propagate": False,
        }
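
# For context, a rough sketch of how a subprocess's output ends up in one of the
# loggers configured above (the actual process management lives elsewhere in this
# package; the command and names below are illustrative):
#
#     import subprocess
#
#     logger = logging.getLogger(WORKER_LOGGER_NAME)
#     proc = subprocess.Popen(
#         ["airflow", "celery", "worker"],
#         stdout=subprocess.PIPE,
#         stderr=subprocess.STDOUT,
#     )
#     assert proc.stdout is not None
#     for line in proc.stdout:
#         logger.info(line.decode("utf-8", errors="replace").rstrip())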


def _configure():
    _configure_task_logging()
    _configure_dag_processing_logging()
    # We run a standalone DAG Processor, but we don't create a special logger for it
    # because Airflow already has a dedicated one, which we use when we run the
    # "dag-processor" Airflow command.
    for comp in ["Worker", "Scheduler", "WebServer", "Triggerer"]:
        log_group_arn, log_level, logging_enabled = get_mwaa_logging_env_vars(comp)
        _configure_subprocesses_logging(
            comp,
            log_group_arn=log_group_arn,
            log_stream_name_prefix=comp.lower(),
            log_level=log_level,
            logging_enabled=logging_enabled,
        )
        _configure_subprocesses_logging(
            f"{comp}_requirements",
            log_group_arn=log_group_arn,
            log_stream_name_prefix="requirements_install",
            log_level="INFO",  # We always want to publish requirements logs.
            logging_enabled=logging_enabled,
            log_formatter=logging.Formatter("[%(levelname)s] - %(message)s"),
        )
        _configure_subprocesses_logging(
            f"{comp}_startup",
            log_group_arn=log_group_arn,
            log_stream_name_prefix="startup_script_execution",
            log_level="INFO",  # We always want to publish startup script logs.
            logging_enabled=logging_enabled,
            log_formatter=logging.Formatter("[%(levelname)s] - %(message)s"),
        )


# Airflow has a dedicated logger for the DAG Processor Manager.
DAG_PROCESSOR_LOGGER_NAME = "airflow.processor_manager"

SCHEDULER_LOGGER_NAME = "mwaa.scheduler"
SCHEDULER_REQUIREMENTS_LOGGER_NAME = "mwaa.scheduler_requirements"
SCHEDULER_STARTUP_LOGGER_NAME = "mwaa.scheduler_startup"
TRIGGERER_LOGGER_NAME = "mwaa.triggerer"
TRIGGERER_REQUIREMENTS_LOGGER_NAME = "mwaa.triggerer_requirements"
TRIGGERER_STARTUP_LOGGER_NAME = "mwaa.triggerer_startup"
WEBSERVER_LOGGER_NAME = "mwaa.webserver"
WEBSERVER_REQUIREMENTS_LOGGER_NAME = "mwaa.webserver_requirements"
WEBSERVER_STARTUP_LOGGER_NAME = "mwaa.webserver_startup"
WORKER_LOGGER_NAME = "mwaa.worker"
WORKER_REQUIREMENTS_LOGGER_NAME = "mwaa.worker_requirements"
WORKER_STARTUP_LOGGER_NAME = "mwaa.worker_startup"

MWAA_LOGGERS = {
    "scheduler": SCHEDULER_LOGGER_NAME,
    "scheduler_requirements": SCHEDULER_REQUIREMENTS_LOGGER_NAME,
    "scheduler_startup": SCHEDULER_STARTUP_LOGGER_NAME,
    "triggerer": TRIGGERER_LOGGER_NAME,
    "triggerer_requirements": TRIGGERER_REQUIREMENTS_LOGGER_NAME,
    "triggerer_startup": TRIGGERER_STARTUP_LOGGER_NAME,
    "webserver": WEBSERVER_LOGGER_NAME,
    "webserver_requirements": WEBSERVER_REQUIREMENTS_LOGGER_NAME,
    "webserver_startup": WEBSERVER_STARTUP_LOGGER_NAME,
    "worker": WORKER_LOGGER_NAME,
    "worker_requirements": WORKER_REQUIREMENTS_LOGGER_NAME,
    "worker_startup": WORKER_STARTUP_LOGGER_NAME,
}


# Apply the configuration changes above at import time. This call must come after the
# MWAA_LOGGERS definition, which _configure_subprocesses_logging relies on.
_configure()
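
# Elsewhere in the package, code can then obtain one of these loggers in the usual
# way (a sketch; the message is illustrative):
#
#     worker_logger = logging.getLogger(WORKER_LOGGER_NAME)
#     worker_logger.info("Starting worker subprocess...")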