ecs_logging/_utils.py (99 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import collections.abc
import json
import functools
from typing import Any, Dict, Mapping

__all__ = [
    "normalize_dict",
    "de_dot",
    "merge_dicts",
    "json_dumps",
]


def flatten_dict(value: Mapping[str, Any]) -> Dict[str, Any]:
    """Collapse nested mappings into a single level with dotted keys.

    ``{"a": {"b": 1}}`` becomes ``{"a.b": 1}``. Non-mapping values are
    copied over unchanged under their (possibly dotted) key.

    :param value: Mapping to flatten; it is not modified.
    :returns: A new flat dictionary.
    :raises ValueError: If the same dotted name is produced twice through
        different forms of nesting, e.g. ``{"a": {"b": 1}, "a.b": 2}``.
    """
    top_level = {}
    for key, val in value.items():
        if not isinstance(val, collections.abc.Mapping):
            if key in top_level:
                raise ValueError(f"Duplicate entry for '{key}' with different nesting")
            top_level[key] = val
        else:
            # Flatten the sub-mapping first, then prefix each of its keys
            # with the parent key.
            val = flatten_dict(val)
            for vkey, vval in val.items():
                vkey = f"{key}.{vkey}"
                if vkey in top_level:
                    raise ValueError(
                        f"Duplicate entry for '{vkey}' with different nesting"
                    )
                top_level[vkey] = vval

    return top_level


def normalize_dict(value: Dict[str, Any]) -> Dict[str, Any]:
    """Expand all dotted names to nested dictionaries, in place.

    ``{"a.b": 1}`` becomes ``{"a": {"b": 1}}``. Nested dictionaries and
    the elements of lists are normalized recursively. Keys that are not
    strings cannot be dotted names and are left untouched.

    :param value: Dictionary to normalize. Anything that is not a ``dict``
        is returned unchanged.
    :returns: The same ``value`` object, after mutation.
    :raises TypeError: Propagated from :func:`merge_dicts` when expanding a
        dotted key would overwrite an existing non-dict value.
    """
    if not isinstance(value, dict):
        return value

    # Iterate over a snapshot of the keys because the dict is mutated
    # (dotted keys popped, nested keys merged in) inside the loop.
    for key in list(value):
        # Only strings can contain a dot; guarding avoids a TypeError from
        # the ``in`` test on keys such as ints.
        if isinstance(key, str) and "." in key:
            merge_dicts(de_dot(key, value.pop(key)), value)

    for val in value.values():
        if isinstance(val, dict):
            normalize_dict(val)
        elif isinstance(val, list):
            # Slice-assign so the caller's list object is updated in place.
            val[:] = [normalize_dict(x) for x in val]

    return value


def de_dot(dot_string: str, msg: Any) -> Dict[str, Any]:
    """Turn a dotted string key and a value into a nested dictionary.

    ``de_dot("a.b.c", 1)`` returns ``{"a": {"b": {"c": 1}}}``.

    :param dot_string: Key whose ``.``-separated parts become nesting levels.
    :param msg: Value stored at the innermost level.
    :returns: A new nested dictionary.
    """
    arr = dot_string.split(".")
    ret = {arr[-1]: msg}
    # Wrap from the innermost part outwards.
    for i in range(len(arr) - 2, -1, -1):
        ret = {arr[i]: ret}
    return ret


def merge_dicts(from_: Dict[Any, Any], into: Dict[Any, Any]) -> Dict[Any, Any]:
    """Merge deeply nested dictionary structures.

    Calling this has side effects on ``into``: it is updated in place.

    :param from_: Dictionary whose entries are merged.
    :param into: Dictionary that receives the entries.
    :returns: ``into``, after merging.
    :raises TypeError: If a key already holds a value in ``into`` (other
        than an empty dict) and the two values are not both dictionaries.
    """
    for key, value in from_.items():
        # Missing keys get an empty-dict placeholder so the branches below
        # can treat "absent" and "empty dict" identically.
        into.setdefault(key, {})
        if isinstance(value, dict) and isinstance(into[key], dict):
            merge_dicts(value, into[key])
        elif into[key] != {}:
            raise TypeError(
                "Type mismatch at key `{}`: merging dicts would replace value `{}` with `{}`. This is likely due to "
                "dotted keys in the event dict being turned into nested dictionaries, causing a conflict.".format(
                    key, into[key], value
                )
            )
        else:
            into[key] = value
    return into


def json_dumps(value: Dict[str, Any]) -> str:
    """Serialize an event dictionary to compact JSON.

    The fields ``@timestamp``, ``log.level`` and ``message`` are emitted
    first, in that order; all remaining keys follow in sorted order.

    The ordered fields are popped from ``value``, so the passed dictionary
    is modified. An emptied ``log`` dictionary is removed as well.

    :param value: Event dictionary to serialize.
    :returns: The JSON document as a string.
    """
    # Ensure that the first three fields are '@timestamp',
    # 'log.level', and 'message' per ECS spec
    ordered_fields = []
    try:
        ordered_fields.append(("@timestamp", value.pop("@timestamp")))
    except KeyError:
        pass

    # log.level can either be nested or not nested so we have to try both.
    # Only pop from 'log' when it really is a mapping: a string, None or a
    # list stored under 'log' must not crash serialization.
    log_field = value.get("log")
    if isinstance(log_field, collections.abc.MutableMapping) and "level" in log_field:
        ordered_fields.append(("log.level", log_field.pop("level")))
        if not log_field:  # Remove the 'log' dictionary if it's now empty
            value.pop("log", None)
    else:
        try:
            ordered_fields.append(("log.level", value.pop("log.level")))
        except KeyError:
            pass

    try:
        ordered_fields.append(("message", value.pop("message")))
    except KeyError:
        pass

    dumps = functools.partial(
        json.dumps, sort_keys=True, separators=(",", ":"), default=_json_dumps_fallback
    )

    # If there are no fields with ordering requirements we
    # pass everything into json.dumps()
    if not ordered_fields:
        return dumps(value)

    # Because we want to use 'sort_keys=True' we manually build
    # the first three keys and then build the rest with json.dumps().
    # Need to call json.dumps() on values just in
    # case the given values aren't strings (even though
    # they should be according to the spec)
    ordered_json = ",".join(f'"{k}":{dumps(v)}' for k, v in ordered_fields)
    if value:
        # Drop the leading '{' of the remaining object and splice it
        # after the ordered fields.
        return f"{{{ordered_json},{dumps(value)[1:]}"
    return f"{{{ordered_json}}}"


def _json_dumps_fallback(value: Any) -> Any:
    """
    Fallback handler for json.dumps to handle objects json doesn't know how
    to serialize.

    Returns ``value.__structlog__()`` when that call succeeds, otherwise
    ``repr(value)``.
    """
    try:
        # This is what structlog's json fallback does
        return value.__structlog__()
    except AttributeError:
        return repr(value)