images/airflow/2.9.2/python/mwaa/logging/cloudwatch_handlers.py

""" This module contain multiple log handlers to support integration with CloudWatch Logs. It contains the BaseLogHandler class, which has some common functionality needed by all of the handlers. It also contains a couple other handlers for different log categories, e.g. TaskLogHandler for handling task logs, SubprocessLogHandler for handling scheduler/worker/etc logs, and so on. """ # Python imports import logging import os import re import sys import traceback # 3rd party imports from airflow.models.taskinstance import TaskInstance from airflow.providers.amazon.aws.log.cloudwatch_task_handler import ( CloudwatchTaskHandler, ) from airflow.utils.helpers import parse_template_string from mypy_boto3_logs.client import CloudWatchLogsClient from typing import Dict import boto3 import socket import time import watchtower # Our imports from mwaa.logging.utils import parse_arn, throttle from mwaa.utils.statsd import get_statsd LOG_GROUP_INIT_WAIT_SECONDS = 900 ERROR_REPORTING_WAIT_SECONDS = 60 # fmt: off # IMPORTANT NOTE: The time complexity of log inspection is O(M*N) where M is the number # of logs and N is the number of log patterns. As such, each new pattern added will # increase the time complexity by O(M), which could be substantial adition for noisy # environments. Care should, thus, be taken to only add the really important patterns. # Additionally, there is no reason why we shouldn't add those _PATTERNS = [ # psycopg2.OperationalError (re.compile(r"psycopg2\.OperationalError"), "psycopg2"), # Timeout errors (re.compile( r"airflow\.exceptions\.AirflowTaskTimeout: DagBag import timeout for .+ after"), "DagImportTimeout"), (re.compile(r"airflow\.exceptions\.AirflowTaskTimeout"), "TaskTimeout"), # base_executor.py (re.compile(r"could not queue task"), "TaskQueueingFailure"), # celery_executor.py (re.compile(r"Adopted tasks were still pending after"), "AdoptedTaskStillPending"), (re.compile(r"Celery command failed on host:"), "CeleryCommandFailure"), (re.compile(r"Failed to execute task"), "CeleryTaskExecutionFailure"), (re.compile(r"execute_command encountered a CalledProcessError"), "ExecuteCommandCalledProcessError"), # dag_processing.py (re.compile( r"DagFileProcessorManager \(PID=.+\) last sent a heartbeat .+ seconds ago! Restarting it"), "DagFileProcessorManagerNoHeartbeat"), # dagrun.py (re.compile(r"Marking run .+ failed"), "DagRunFailure"), (re.compile(r"Deadlock; marking run .+ failed"), "DagRunDeadlock"), # taskinstance.py (re.compile(r"Recording the task instance as FAILED"), "TaskInstanceFailure"), # taskinstance.py and local_task_job.py (re.compile(r"Received SIGTERM\. Terminating subprocesses."), "SIGTERM"), # scheduler_job.py (re.compile(r"Couldn\'t find dag .+ in DagBag/DB!"), "DagNotFound"), (re.compile(r"Execution date is in future:"), "ExecutionDateInFuture"), # standard_task_runner.py (re.compile( r"Job .+ was killed before it finished (likely due to running out of memory)"), "JobKilled"), ] # fmt: on class BaseLogHandler(logging.Handler): """Shared functionality across our internal CloudWatch log handlers.""" def __init__(self, log_group_arn: str, kms_key_arn: str | None, enabled: bool): """ Initialize the instance. Arguments: log_group_arn - The ARN of the log group where logs will be published. kms_key_arn - The ARN of the KMS key to use when creating the log group if necessary. enabled - Whether this handler is actually enabled, or just does nothing. This makes it easier to control enabling and disabling logging without much changes to the logging configuration. 
""" self.log_group_arn = log_group_arn self.kms_key_arn = kms_key_arn self.enabled = enabled if not self.enabled: self._print( "CloudWatch logging is disabled for %s" % self.__class__.__name__ ) self.log_group_name, self.region_name = parse_arn(log_group_arn) self.handler = None self.logs_source = "Unknown" # TODO Find a nice and unambiguous solution to the craziness of super() and MRO. logging.Handler.__init__(self) self.stats = get_statsd() def create_watchtower_handler( self, stream_name: str, logs_source: str, send_interval_seconds: int = 10, use_queues: bool = True, ): """ Create the underlying Watchtower handler that we use to publish logs. Arguments: stream_name - The name of the log stream to publish logs under. logs_source - A string identifying the source the logs are coming from, e.g. "scheduler". This is used when publishing metrics about logging. send_interval_seconds - The interval at which to send logs to CloudWatch. use_queues - Whether to use batching to publish logs or not. This is usually desired for efficiency, but can have certain problems when used with multiprocessing. Use with extra care. """ logs_client: CloudWatchLogsClient = boto3.client("logs") # type: ignore if self.enabled: self.handler = watchtower.CloudWatchLogHandler( log_group_name=self.log_group_name, log_stream_name=stream_name, boto3_client=logs_client, use_queues=use_queues, send_interval=send_interval_seconds, create_log_group=False, ) if self.formatter: self.handler.setFormatter(self.formatter) self.logs_source = logs_source def close(self): """Close the log handler (by closing the underlying log handler).""" if self.handler is not None: self.handler.close() self.handler = None @throttle(ERROR_REPORTING_WAIT_SECONDS) def _report_logging_error(self, msg: str): """ Report an error related to logging. This method is used to report an error related to logging, along with the stack information to aid with debugging. This method is throttled to avoid logs pollution. Arguments: msg - The error message to report. """ self._print(f"MWAA logging error: {msg}") exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_exception(exc_type, exc_value, exc_traceback) def _print(self, msg: str): print(msg) def emit(self, record: logging.LogRecord): """ Emit log records. Arguments: record - The log record to emit. """ if self.handler: # This is a potentially noisy warning that we started seeing because we # are still not using pattern matching for metrics allow/block-listing. # As a temporary work-around, we are dropping these messages at the handler # level. We should, however, fix this issue by setting to True the # `metrics_use_pattern_match` flag. # More context: https://github.com/aws/amazon-mwaa-docker-images/issues/98 if "The basic metric validator will be deprecated" in record.getMessage(): return try: self.handler.emit(record) # type: ignore self.sniff_errors(record) except Exception: self.stats.incr(f"mwaa.logging.{self.logs_source}.emit_error", 1) self._report_logging_error("Failed to emit log record.") def sniff_errors(self, record: logging.LogRecord): """ Check the content of the logs for known errors and report them as metrics. Privacy Note: The logs here are customer logs and, thus, we cannot store them, so we only check them against a predefined list of Airflow errors and report a metric. :param record: The log record being sniffed. 
""" if not hasattr(record, "message"): return for pattern, metric_dim in _PATTERNS: if pattern.search(record.message): self.stats.incr(f"mwaa.error_log.{self.logs_source}.{metric_dim}") break def flush(self): """Flush remaining log records.""" if not self.handler: return try: self.handler.flush() except Exception: self.stats.incr(f"mwaa.logging.{self.logs_source}.flush_error", 1) self._report_logging_error("Failed to flush log records.") class TaskLogHandler(BaseLogHandler, CloudwatchTaskHandler): """A log handler used for Airflow task logs.""" # this option is required to be able to serve triggerer logs trigger_should_wrap = True def __init__( self, base_log_folder: str, log_group_arn: str, kms_key_arn: str | None, enabled: bool, ): """ Initialize the instance. :param log_group_arn - The ARN of the log group where logs will be published. :param kms_key_arn - The ARN of the KMS key to use when creating the log group if necessary. :param enabled - Whether this handler is actually enabled, or just does nothing. This makes it easier to control enabling and disabling logging without much changes to the logging configuration. """ BaseLogHandler.__init__(self, log_group_arn, kms_key_arn, enabled) CloudwatchTaskHandler.__init__( self, log_group_arn=log_group_arn, base_log_folder="", # We only push to CloudWatch Logs. ) def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: """ Provide context to the logger. This method is called by Airflow to provide the necessary context to configure the handler. In this case, Airflow is passing us the task instance the logs are for. :param ti: The task instance generating the logs. :param identifier: Airflow uses this when relaying exceptional messages to task logs from a context other than task itself. We ignore this parameter in this handler, i.e. those exceptional messages will go to the same log stream. """ # TODO Consider making use of the 'identifier' argument: # https://github.com/aws/amazon-mwaa-docker-images/issues/57 logs_client: CloudWatchLogsClient = boto3.client("logs") # type: ignore if self.enabled: # identical to open-source implementation, except create_log_group set to False self.handler = watchtower.CloudWatchLogHandler( log_group_name=self.log_group_name, log_stream_name=self._render_filename(ti, ti.try_number), # type: ignore boto3_client=logs_client, use_queues=True, create_log_group=False, ) if self.formatter: self.handler.setFormatter(self.formatter) else: self.handler = None def _event_to_str(self, event: Dict[str, str]) -> str: # When rendering logs in the UI, the open-source implementation prefixes the # logs by their timestamp metadata from the Cloudwatch response. Since the # default log format already includes a timestamp within the message, this # causes duplicate timestamps to appear and result in "invalid date" rendering # in the UI # # Open-source code: https://github.com/apache/airflow/blob/v2-7-stable/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py#L120 # # TODO: explore option to condition this function by the log_format. i.e. if it # does not include a timestamp, add it here. Since customers can freely change # the log_format, we should work towards accommodating both cases, but the # default behavior is more important return event["message"] class DagProcessorManagerLogHandler(BaseLogHandler): """ A log handler for logs generated by Airflow's DAG Processor Manager. 


class DagProcessorManagerLogHandler(BaseLogHandler):
    """
    A log handler for logs generated by Airflow's DAG Processor Manager.

    The DAG Processor Manager is represented by the DagFileProcessorManager class from
    Airflow ([1]), and shouldn't be confused with the DagFileProcessorProcess class
    [2], which is responsible for processing a single DAG file.

    [1] https://github.com/apache/airflow/blob/2.9.2/airflow/dag_processing/manager.py#L331
    [2] https://github.com/apache/airflow/blob/2.9.2/airflow/dag_processing/processor.py#L69
    """

    def __init__(
        self, log_group_arn: str, kms_key_arn: str, stream_name: str, enabled: bool
    ):
        """
        Initialize the instance.

        Arguments:
            log_group_arn - The ARN of the log group where logs will be published.
            kms_key_arn - The ARN of the KMS key to use when creating the log group if
              necessary.
            stream_name - The name of the stream under which logs will be published.
            enabled - Whether this handler is actually enabled, or just does nothing.
              This makes it easier to control enabling and disabling logging without
              many changes to the logging configuration.
        """
        super().__init__(log_group_arn, kms_key_arn, enabled)
        self.create_watchtower_handler(stream_name, "DAGProcessorManager")

    def _print(self, msg: str):
        # The DAG processing loggers are not started in the same way that the Web
        # Server, Scheduler, and Worker are (which are standalone processes we start
        # and control). Instead, the DAG Processor is started from within Airflow
        # code. All of the output from that process is captured and fed to logging.
        # So if the logger itself emits logs, it creates a cycle.
        pass


class DagProcessingLogHandler(BaseLogHandler):
    """
    A log handler for logs generated during processing of a certain DAG.

    This shouldn't be confused with the DagProcessorManagerLogHandler class. See the
    documentation on the latter class for more information.
    """

    def __init__(
        self,
        log_group_arn: str,
        kms_key_arn: str | None,
        stream_name_template: str,
        enabled: bool,
    ):
        """
        Initialize the instance.

        Arguments:
            log_group_arn - The ARN of the log group where logs will be published.
            kms_key_arn - The ARN of the KMS key to use when creating the log group if
              necessary.
            stream_name_template - The template to use for generating the stream name.
              Currently, in the config.py file, we pass the "[logging]
              LOG_PROCESSOR_FILENAME_TEMPLATE" Airflow configuration [1].
            enabled - Whether this handler is actually enabled, or just does nothing.
              This makes it easier to control enabling and disabling logging without
              many changes to the logging configuration.

        [1] https://airflow.apache.org/docs/apache-airflow/2.9.2/configurations-ref.html#config-logging-log-processor-filename-template
        """
        super().__init__(log_group_arn, kms_key_arn, enabled)
        self.stream_name_template, self.filename_jinja_template = parse_template_string(
            stream_name_template
        )

    def set_context(self, filename: str):
        """
        Provide context to the logger.

        This method is called by Airflow to provide the necessary context to configure
        the handler. In this case, Airflow is passing us the name of the DAG file
        being processed.

        :param filename: The name of the DAG file being processed.
        """
        stream_name = self._render_filename(filename)
        self.create_watchtower_handler(
            stream_name,
            logs_source="DAGProcessing",
            # We cannot use queues/batching with DAG processing, since the DAG
            # processor process gets terminated without properly calling the flush()
            # method on the handler (most probably due to some multi-threading-related
            # complexity), resulting in losing logs.
            use_queues=False,
        )
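
    # For illustration (assuming Airflow's default "[logging]
    # log_processor_filename_template" value of "{{ filename }}.log"): processing the
    # DAG file "/usr/local/airflow/dags/example_dag.py" would render the log stream
    # name "scheduler_example_dag.py.log" via _render_filename() below.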

    def _render_filename(self, filename: str) -> str:
        filename = os.path.basename(filename)
        if self.filename_jinja_template:
            formatted_filename = self.filename_jinja_template.render(filename=filename)
        elif self.stream_name_template:
            formatted_filename = self.stream_name_template.format(filename=filename)
        else:
            # Not expected to be run, but covering all bases.
            formatted_filename = filename
        return "scheduler_" + formatted_filename

    def _print(self, msg: str):
        # The DAG processing loggers are not started in the same way that the Web
        # Server, Scheduler, and Worker are (which are standalone processes we start
        # and control). Instead, the DAG Processor is started from within Airflow
        # code. All of the output from that process is captured and fed to logging.
        # So if the logger itself emits logs, it creates a cycle.
        pass


class SubprocessLogHandler(BaseLogHandler):
    """
    A log handler for logs generated by subprocesses we run.

    This handler is used when standard Python logging mechanisms are not directly
    applicable, such as for logs from the scheduler, worker, and other similar
    components. In such scenarios, we create a sub-process, capture its stdout and
    stderr, and push them to CloudWatch Logs. In contrast, for logs related to task
    execution or DAG processing, Airflow has a dedicated logger name that we can just
    define a logger for and we are all set.

    Hence, in summary, if we want to capture logs which are known to be generated via
    some Python loggers, then we shouldn't use this class, and should instead opt for
    one of the other classes in this module, or create a new one. If, however, we want
    to capture logs for a subprocess, e.g. the scheduler, then we need to use this
    handler.
    """

    def __init__(
        self,
        log_group_arn: str,
        kms_key_arn: str,
        stream_name_prefix: str,
        logs_source: str,
        enabled: bool,
        log_formatter: logging.Formatter | None = None,
    ):
        """
        Initialize the instance.

        Arguments:
            log_group_arn - The ARN of the log group where logs will be published.
            kms_key_arn - The ARN of the KMS key to use when creating the log group if
              necessary.
            stream_name_prefix - The prefix to use when generating the stream name.
            logs_source - A string identifying the source the logs are coming from,
              e.g. "scheduler". This is used when publishing metrics about logging.
            enabled - Whether this handler is actually enabled, or just does nothing.
              This makes it easier to control enabling and disabling logging without
              many changes to the logging configuration.
            log_formatter - An optional formatter to apply to the published log
              records.
        """
        super().__init__(log_group_arn, kms_key_arn, enabled)
        self.formatter = log_formatter
        hostname = socket.gethostname()
        epoch = time.time()
        # Use hostname and epoch timestamp as a combined primary key for the stream
        # name. The hostname is very helpful for mapping between task logs and the
        # worker that executed the task. But the ECS Fargate hostnames (which are just
        # an IP) are not guaranteed unique and may be reused, so we include an epoch
        # timestamp for uniqueness and easy chronological sorting.
        _stream_name = "%s_%s_%s.log" % (stream_name_prefix, hostname, epoch)
        self.create_watchtower_handler(_stream_name, logs_source)
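

# Usage sketch (illustrative only; not executed as part of this module). A hypothetical
# way to attach SubprocessLogHandler to a logger that receives the captured
# stdout/stderr of a subprocess such as the scheduler. The ARN and logger name below
# are placeholders; the real wiring happens in MWAA's logging configuration.
#
#   import logging
#   from mwaa.logging.cloudwatch_handlers import SubprocessLogHandler
#
#   handler = SubprocessLogHandler(
#       log_group_arn="arn:aws:logs:us-east-1:111122223333:log-group:example-Scheduler",
#       kms_key_arn="",  # no KMS key in this sketch
#       stream_name_prefix="scheduler",
#       logs_source="Scheduler",
#       enabled=True,
#   )
#   logger = logging.getLogger("mwaa.example_subprocess")  # hypothetical logger name
#   logger.addHandler(handler)
#   logger.info("This line would be published to the CloudWatch log stream.")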