ecs_logging/_stdlib.py (188 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import collections.abc import logging import sys import time from functools import lru_cache from traceback import format_tb from ._meta import ECS_VERSION from ._utils import ( de_dot, flatten_dict, json_dumps, merge_dicts, ) from typing import Any, Callable, Dict, Optional, Sequence, Union try: from typing import Literal # type: ignore except ImportError: from typing_extensions import Literal # type: ignore # Load the attributes of a LogRecord so if some are # added in the future we won't mistake them for 'extra=...' try: _LOGRECORD_DIR = set(dir(logging.LogRecord("", 0, "", 0, "", (), None))) except Exception: # LogRecord signature changed? _LOGRECORD_DIR = set() class StdlibFormatter(logging.Formatter): """ECS Formatter for the standard library ``logging`` module""" _LOGRECORD_DICT = { "name", "msg", "args", "asctime", "levelname", "levelno", "pathname", "filename", "module", "exc_info", "exc_text", "stack_info", "lineno", "funcName", "created", "msecs", "relativeCreated", "thread", "threadName", "processName", "process", "message", } | _LOGRECORD_DIR converter = time.gmtime def __init__( self, fmt: Optional[str] = None, datefmt: Optional[str] = None, style: Union[Literal["%"], Literal["{"], Literal["$"]] = "%", validate: Optional[bool] = None, stack_trace_limit: Optional[int] = None, extra: Optional[Dict[str, Any]] = None, exclude_fields: Sequence[str] = (), ) -> None: """Initialize the ECS formatter. :param int stack_trace_limit: Specifies the maximum number of frames to include for stack traces. Defaults to ``None`` which includes all available frames. Setting this to zero will suppress stack traces. This setting doesn't affect ``LogRecord.stack_info`` because this attribute is typically already pre-formatted. :param Optional[Dict[str, Any]] extra: Specifies the collection of meta-data fields to add to all records. :param Sequence[str] exclude_fields: Specifies any fields that should be suppressed from the resulting fields, expressed with dot notation:: exclude_keys=["error.stack_trace"] You can also use field prefixes to exclude whole groups of fields:: exclude_keys=["error"] """ _kwargs = {} if validate is not None: # validate was introduced in py3.8 so we need to only provide it if the user provided it _kwargs["validate"] = validate super().__init__( # type: ignore[call-arg] fmt=fmt, datefmt=datefmt, style=style, **_kwargs # type: ignore[arg-type] ) if stack_trace_limit is not None: if not isinstance(stack_trace_limit, int): raise TypeError( "'stack_trace_limit' must be None, or a non-negative integer" ) elif stack_trace_limit < 0: raise ValueError( "'stack_trace_limit' must be None, or a non-negative integer" ) if ( not isinstance(exclude_fields, collections.abc.Sequence) or isinstance(exclude_fields, str) or any(not isinstance(item, str) for item in exclude_fields) ): raise TypeError("'exclude_fields' must be a sequence of strings") self._extra = extra self._exclude_fields = frozenset(exclude_fields) self._stack_trace_limit = stack_trace_limit def _record_error_type(self, record: logging.LogRecord) -> Optional[str]: exc_info = record.exc_info if not exc_info: # exc_info is either an iterable or bool. If it doesn't # evaluate to True, then no error type is used. return None if isinstance(exc_info, bool): # if it is a bool, then look at sys.exc_info exc_info = sys.exc_info() if isinstance(exc_info, (list, tuple)) and exc_info[0] is not None: return exc_info[0].__name__ return None def _record_error_message(self, record: logging.LogRecord) -> Optional[str]: exc_info = record.exc_info if not exc_info: # exc_info is either an iterable or bool. If it doesn't # evaluate to True, then no error message is used. return None if isinstance(exc_info, bool): # if it is a bool, then look at sys.exc_info exc_info = sys.exc_info() if isinstance(exc_info, (list, tuple)) and exc_info[1]: return str(exc_info[1]) return None def format(self, record: logging.LogRecord) -> str: result = self.format_to_ecs(record) return json_dumps(result) def format_to_ecs(self, record: logging.LogRecord) -> Dict[str, Any]: """Function that can be overridden to add additional fields to (or remove fields from) the JSON before being dumped into a string. .. code-block: python class MyFormatter(StdlibFormatter): def format_to_ecs(self, record): result = super().format_to_ecs(record) del result["log"]["original"] # remove unwanted field(s) result["my_field"] = "my_value" # add custom field return result """ extractors: Dict[str, Callable[[logging.LogRecord], Any]] = { "@timestamp": self._record_timestamp, "ecs.version": lambda _: ECS_VERSION, "log.level": lambda r: (r.levelname.lower() if r.levelname else None), "log.origin.function": self._record_attribute("funcName"), "log.origin.file.line": self._record_attribute("lineno"), "log.origin.file.name": self._record_attribute("filename"), "log.original": lambda r: r.getMessage(), "log.logger": self._record_attribute("name"), "process.pid": self._record_attribute("process"), "process.name": self._record_attribute("processName"), "process.thread.id": self._record_attribute("thread"), "process.thread.name": self._record_attribute("threadName"), "error.type": self._record_error_type, "error.message": self._record_error_message, "error.stack_trace": self._record_error_stack_trace, } result: Dict[str, Any] = {} for field in set(extractors.keys()).difference(self._exclude_fields): if self._is_field_excluded(field): continue value = extractors[field](record) if value is not None: # special case ecs.version that should not be de-dotted if field == "ecs.version": field_dict = {field: value} else: field_dict = de_dot(field, value) merge_dicts(field_dict, result) available = record.__dict__ # This is cleverness because 'message' is NOT a member # key of ``record.__dict__`` the ``getMessage()`` method # is effectively ``msg % args`` (actual keys) By manually # adding 'message' to ``available``, it simplifies the code available["message"] = record.getMessage() # Pull all extras and flatten them to be sent into '_is_field_excluded' # since they can be defined as 'extras={"http": {"method": "GET"}}' extra_keys = set(available).difference(self._LOGRECORD_DICT) extras = flatten_dict({key: available[key] for key in extra_keys}) # Merge in any global extra's if self._extra is not None: for field, value in self._extra.items(): merge_dicts(de_dot(field, value), extras) # Pop all Elastic APM extras and add them # to standard tracing ECS fields. extras.setdefault("span.id", extras.pop("elasticapm_span_id", None)) extras.setdefault( "transaction.id", extras.pop("elasticapm_transaction_id", None) ) extras.setdefault("trace.id", extras.pop("elasticapm_trace_id", None)) extras.setdefault("service.name", extras.pop("elasticapm_service_name", None)) extras.setdefault( "service.environment", extras.pop("elasticapm_service_environment", None) ) # Merge in any keys that were set within 'extra={...}' for field, value in extras.items(): if field.startswith("elasticapm_labels."): continue # Unconditionally remove, we don't need this info. if value is None or self._is_field_excluded(field): continue merge_dicts(de_dot(field, value), result) # The following is mostly for the ecs format. You can't have 2x # 'message' keys in _WANTED_ATTRS, so we set the value to # 'log.original' in ecs, and this code block guarantees it # still appears as 'message' too. if not self._is_field_excluded("message"): result.setdefault("message", available["message"]) return result @lru_cache() def _is_field_excluded(self, field: str) -> bool: field_path = [] for path in field.split("."): field_path.append(path) if ".".join(field_path) in self._exclude_fields: return True return False def _record_timestamp(self, record: logging.LogRecord) -> str: return "%s.%03dZ" % ( self.formatTime(record, datefmt="%Y-%m-%dT%H:%M:%S"), record.msecs, ) def _record_attribute( self, attribute: str ) -> Callable[[logging.LogRecord], Optional[Any]]: return lambda r: getattr(r, attribute, None) def _record_error_stack_trace(self, record: logging.LogRecord) -> Optional[str]: # Using stack_info=True will add 'error.stack_trace' even # if the type is not 'error', exc_info=True only gathers # when there's an active exception. if ( record.exc_info and record.exc_info[2] is not None and (self._stack_trace_limit is None or self._stack_trace_limit > 0) ): return ( "".join(format_tb(record.exc_info[2], limit=self._stack_trace_limit)) or None ) # LogRecord only has 'stack_info' if it's passed via .log(..., stack_info=True) stack_info = getattr(record, "stack_info", None) if stack_info: return str(stack_info) return None