esrally/log.py
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import logging
import logging.config
import logging.handlers
import os
import time
import typing
import ecs_logging
from esrally import paths
from esrally.utils import collections, io
# pylint: disable=unused-argument
def configure_utc_formatter(*args: typing.Any, **kwargs: typing.Any) -> logging.Formatter:
"""
    Logging formatter that renders timestamps in UTC, or in the local system time zone when the user requests it.
"""
formatter = logging.Formatter(fmt=kwargs["format"], datefmt=kwargs["datefmt"])
user_tz = kwargs.get("timezone", None)
if user_tz == "localtime":
formatter.converter = time.localtime
else:
formatter.converter = time.gmtime
return formatter
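# Hedged example of how this factory is typically wired up: logging's dictConfig supports a "()" key
# in a formatter definition that names a callable; the remaining keys are passed to it as keyword
# arguments. The stanza below is an illustrative sketch, not necessarily the shipped logging.json.
#
#   "formatters": {
#     "normal": {
#       "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
#       "datefmt": "%Y-%m-%d %H:%M:%S",
#       "timezone": "localtime",
#       "()": "esrally.log.configure_utc_formatter"
#     }
#   }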
MutatorType = typing.Callable[[logging.LogRecord, dict[str, typing.Any]], None]
class RallyEcsFormatter(ecs_logging.StdlibFormatter):
def __init__(
self,
*args: typing.Any,
mutators: typing.Optional[list[MutatorType]] = None,
**kwargs: typing.Any,
):
super().__init__(*args, **kwargs)
self.mutators = mutators or []
def format_to_ecs(self, record: logging.LogRecord) -> dict[str, typing.Any]:
log_dict = super().format_to_ecs(record)
self.apply_mutators(record, log_dict)
return log_dict
def apply_mutators(self, record: logging.LogRecord, log_dict: dict[str, typing.Any]) -> None:
for mutator in self.mutators:
mutator(record, log_dict)
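# Hedged sketch of a custom mutator matching MutatorType; the function name and the extra record
# attribute are made up for illustration and are not part of Rally:
#
#   def tag_race_id(record: logging.LogRecord, log_dict: dict[str, typing.Any]) -> None:
#       # copy a custom attribute attached via logging's `extra` into the ECS "labels" object
#       log_dict.setdefault("labels", {})["race_id"] = getattr(record, "race_id", "unknown")
#
#   formatter = RallyEcsFormatter(mutators=[tag_race_id])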
def rename_actor_fields(record: logging.LogRecord, log_dict: dict[str, typing.Any]) -> None:
fields = {}
if log_dict.get("actorAddress"):
fields["address"] = log_dict.pop("actorAddress")
if fields:
collections.deep_update(log_dict, {"rally": {"thespian": fields}})
# Special case for asyncio fields as they are not part of the standard ECS log dict
def rename_async_fields(record: logging.LogRecord, log_dict: dict[str, typing.Any]) -> None:
fields = {}
if hasattr(record, "taskName") and record.taskName is not None:
fields["task"] = record.taskName
if fields:
collections.deep_update(log_dict, {"python": {"asyncio": fields}})
def configure_ecs_formatter(*args: typing.Any, **kwargs: typing.Any) -> ecs_logging.StdlibFormatter:
"""
ECS Logging formatter
"""
fmt = kwargs.pop("format", None)
configurator = logging.config.BaseConfigurator({})
mutators = kwargs.pop("mutators", [rename_actor_fields, rename_async_fields])
mutators = [fn if callable(fn) else configurator.resolve(fn) for fn in mutators]
    formatter = RallyEcsFormatter(*args, fmt=fmt, mutators=mutators, **kwargs)
return formatter
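# Illustrative sketch of a formatter stanza using this factory. "mutators" is optional (both renamers
# above are applied by default) and may list dotted names, which are resolved to callables via
# BaseConfigurator.resolve. This is an example, not necessarily the shipped logging.json.
#
#   "formatters": {
#     "json": {
#       "()": "esrally.log.configure_ecs_formatter",
#       "mutators": ["esrally.log.rename_actor_fields", "esrally.log.rename_async_fields"]
#     }
#   }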
def log_config_path():
"""
:return: The absolute path to Rally's log configuration file.
"""
return os.path.join(paths.rally_confdir(), "logging.json")
def add_missing_loggers_to_config():
"""
    Ensures that any top-level loggers present in resources/logging.json but
    missing from an existing log configuration are appended to it.
"""
def _missing_loggers(source, target):
"""
Returns any top-level loggers present in 'source', but not in 'target'
:return: A dict of all loggers present in 'source', but not in 'target'
"""
missing_loggers = {}
        for logger in source:
            if logger not in target:
                missing_loggers[logger] = source[logger]
return missing_loggers
source_path = io.normalize_path(os.path.join(os.path.dirname(__file__), "resources", "logging.json"))
with open(log_config_path(), encoding="UTF-8") as target:
with open(source_path, encoding="UTF-8") as src:
template = json.load(src)
existing_logging_config = json.load(target)
if missing_loggers := _missing_loggers(source=template["loggers"], target=existing_logging_config["loggers"]):
existing_logging_config["loggers"].update(missing_loggers)
updated_config = json.dumps(existing_logging_config, indent=2)
if missing_loggers:
with open(log_config_path(), "w", encoding="UTF-8") as target:
target.write(updated_config)
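# Hedged illustration of the merge performed above, using hypothetical logger names:
#
#   template["loggers"]                 -> {"elasticsearch": {...}, "rally.profile": {...}}
#   existing_logging_config["loggers"]  -> {"elasticsearch": {...}}
#
# "rally.profile" is appended to the user's configuration; the user's existing "elasticsearch"
# settings are left untouched.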
def install_default_log_config():
"""
Ensures a log configuration file is present on this machine. The default
log configuration is based on the template in resources/logging.json.
It also ensures that the default log path has been created so log files
can be successfully opened in that directory.
"""
log_config = log_config_path()
if not io.exists(log_config):
io.ensure_dir(io.dirname(log_config))
source_path = io.normalize_path(os.path.join(os.path.dirname(__file__), "resources", "logging.json"))
with open(log_config, "w", encoding="UTF-8") as target:
with open(source_path, encoding="UTF-8") as src:
contents = src.read()
target.write(contents)
add_missing_loggers_to_config()
io.ensure_dir(paths.logs())
# pylint: disable=unused-argument
def configure_file_handler(*args, **kwargs) -> logging.Handler:
"""
    Configures the WatchedFileHandler, expanding `${LOG_PATH}` in the configured filename to Rally's log path.
"""
filename = kwargs.pop("filename").replace("${LOG_PATH}", paths.logs())
return logging.handlers.WatchedFileHandler(filename=filename, encoding=kwargs["encoding"], delay=kwargs.get("delay", False))
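# Illustrative sketch of a handler stanza wired to this factory; "${LOG_PATH}" in the (hypothetical)
# filename is replaced with Rally's log directory before the WatchedFileHandler is created.
#
#   "handlers": {
#     "rally_log_handler": {
#       "()": "esrally.log.configure_file_handler",
#       "filename": "${LOG_PATH}/rally.log",
#       "encoding": "UTF-8",
#       "delay": false
#     }
#   }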
def configure_profile_file_handler(*args, **kwargs) -> logging.Handler:
"""
    Configures the FileHandler, expanding `${LOG_PATH}` in the configured filename to Rally's log path.
"""
filename = kwargs.pop("filename").replace("${LOG_PATH}", paths.logs())
return logging.FileHandler(filename=filename, encoding=kwargs["encoding"], delay=kwargs.get("delay", False))
def load_configuration():
"""
Loads the logging configuration. This is a low-level method and usually
`configure_logging()` should be used instead.
    :return: The logging configuration as a `dict` instance.
"""
    with open(log_config_path(), encoding="UTF-8") as f:
return json.load(f)
def post_configure_actor_logging():
"""
Reconfigures all loggers in actor processes.
See https://groups.google.com/forum/#!topic/thespianpy/FntU9umtvhc for the rationale.
"""
# see configure_logging()
logging.captureWarnings(True)
# at this point we can assume that a log configuration exists. It has been created already during startup.
logger_configuration = load_configuration()
if "root" in logger_configuration and "level" in logger_configuration["root"]:
root_logger = logging.getLogger()
root_logger.setLevel(logger_configuration["root"]["level"])
if "loggers" in logger_configuration:
        for lgr, cfg in logger_configuration["loggers"].items():
if "level" in cfg:
logging.getLogger(lgr).setLevel(cfg["level"])
def configure_logging():
"""
Configures logging for the current process.
"""
logging.config.dictConfig(load_configuration())
# Avoid failures such as the following (shortened a bit):
#
# ---------------------------------------------------------------------------------------------
# "esrally/driver/driver.py", line 220, in create_client
# "thespian-3.8.0-py3.5.egg/thespian/actors.py", line 187, in createActor
# [...]
# "thespian-3.8.0-py3.5.egg/thespian/system/multiprocCommon.py", line 348, in _startChildActor
# "python3.5/multiprocessing/process.py", line 105, in start
# "python3.5/multiprocessing/context.py", line 267, in _Popen
# "python3.5/multiprocessing/popen_fork.py", line 18, in __init__
# sys.stderr.flush()
#
# OSError: [Errno 5] Input/output error
# ---------------------------------------------------------------------------------------------
#
# This is caused by urllib3 wanting to send warnings about insecure SSL connections to stderr when we disable them (in client.py) with:
#
# urllib3.disable_warnings()
#
# The filtering functionality of the warnings module causes the error above on some systems. If we instead redirect the warning output
# to our logs instead of stderr (which is the warnings module's default), we can disable warnings safely.
logging.captureWarnings(True)
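# Minimal usage sketch for this module (the logger name is illustrative): ensure a configuration file
# and the log directory exist, then apply the configuration to the current process.
#
#   from esrally import log
#
#   log.install_default_log_config()
#   log.configure_logging()
#   logging.getLogger("rally").info("logging is ready")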