# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Wrapper for cloud ops structured logging.
Get an instance of the logger:
logger() -> CloudLoggingClient
Set a signature dict that is appended to all logs:
logger().log_signature = dict to append to logs
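Example usage (a minimal sketch; assumes the enclosing logging_lib module
exposes the logger() accessor described above):
logger().log_signature = {'dicom_store': 'store1'}
logger().info('Ingest started', {'slide_id': '123'})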
"""
import collections
import copy
import enum
import inspect
import logging
import math
import os
import sys
import threading
import time
import traceback
from typing import Any, Mapping, MutableMapping, Optional, Tuple, Union
from absl import logging as absl_logging
import google.auth
from google.cloud import logging as cloud_logging
# Debug/testing option to log to absl.logging. Automatically set when
# running unit tests.
DEBUG_LOGGING_USE_ABSL_LOGGING = bool(
'UNITTEST_ON_FORGE' in os.environ or 'unittest' in sys.modules
)
class _LogSeverity(enum.Enum):
CRITICAL = logging.CRITICAL
ERROR = logging.ERROR
WARNING = logging.WARNING
INFO = logging.INFO
DEBUG = logging.DEBUG
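# Max total size of a structured log entry. Assumption: chosen to stay under
# Cloud Logging's approximately 256 KB LogEntry size limit.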
MAX_LOG_SIZE = 246000
class CloudLoggerInstanceExceptionError(Exception):
pass
def _merge_struct(
dict_tuple: Tuple[Union[Mapping[str, Any], Exception, None], ...],
) -> Optional[MutableMapping[str, str]]:
"""Merges a list of dict and ordered dicts.
* for dict adds item in key sorted order
* preserves order for ordered dicts.
Args:
dict_tuple: dicts and ordered dicts to merge
Returns:
merged dict.
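For example:
_merge_struct(({'b': 1, 'a': 2}, collections.OrderedDict(c=3)))
returns OrderedDict([('a', '2'), ('b', '1'), ('c', '3')]).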
"""
if not dict_tuple:
return None
return_dict = collections.OrderedDict()
for dt in dict_tuple:
if dt is None:
continue
if isinstance(dt, Exception):
# Log exception text and exception stack trace.
exception_str = str(dt)
if exception_str:
exception_str = f'{exception_str}\n'
return_dict['exception'] = f'{exception_str}{traceback.format_exc()}'
else:
keylist = list(dt)
if not isinstance(dt, collections.OrderedDict):
keylist = sorted(keylist)
for key in keylist:
return_dict[key] = str(dt[key])
return return_dict
def _absl_log(msg: str, severity: _LogSeverity = _LogSeverity.INFO) -> None:
"""Logs using absl logging.
Args:
msg: Message to log.
severity: Severity of message.
"""
if severity == _LogSeverity.DEBUG:
absl_logging.debug(msg)
elif severity == _LogSeverity.WARNING:
absl_logging.warning(msg)
elif severity == _LogSeverity.INFO:
absl_logging.info(msg)
else:
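# ERROR, CRITICAL, and any unrecognized severities map to absl error logging.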
absl_logging.error(msg)
def _py_log(
dpas_logger: logging.Logger,
msg: str,
extra: Mapping[str, Any],
severity: _LogSeverity,
) -> None:
"""Logs msg and structured logging using python logger."""
if severity == _LogSeverity.DEBUG:
dpas_logger.debug(msg, extra=extra)
elif severity == _LogSeverity.WARNING:
dpas_logger.warning(msg, extra=extra)
elif severity == _LogSeverity.INFO:
dpas_logger.info(msg, extra=extra)
elif severity == _LogSeverity.CRITICAL:
dpas_logger.critical(msg, extra=extra)
elif severity == _LogSeverity.ERROR:
dpas_logger.error(msg, extra=extra)
else:
raise CloudLoggerInstanceExceptionError(
f'Unsupported logging severity level; Severity="{severity}"'
)
def _get_source_location_to_log(stack_frames_back: int) -> Mapping[str, Any]:
"""Adds Python source location information to cloud structured logs.
The source location is added by adding (and overwriting if present) a
"source_location" key to the provided additional_parameters.
The value corresponding to that key is a dict mapping:
"file" to the name of the file (str) containing the logging statement,
"function" python function/method calling logging method
"line" to the line number where the log was recorded (int).
Args:
stack_frames_back: Additional stack frames back to log source_location.
Returns:
Source location formatted for structured logging.
Raises:
ValueError: If stack frame cannot be found for specified position.
"""
source_location = {}
current_frame = inspect.currentframe()
for _ in range(stack_frames_back + 1):
if current_frame is None:
raise ValueError('Cannot get stack frame for specified position.')
current_frame = current_frame.f_back
try:
frame_info = inspect.getframeinfo(current_frame)
source_location['source_location'] = dict(
file=frame_info.filename,
function=frame_info.function,
line=frame_info.lineno,
)
finally:
# https://docs.python.org/3/library/inspect.html
del current_frame # explicitly deleting
return source_location
def _add_trace_to_log(
project_id: str, trace_key: str, struct: Mapping[str, Any]
) -> Mapping[str, Any]:
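"""Returns a Cloud Logging trace reference if struct contains a trace id.
For example, project_id='proj' and struct={trace_key: 'abc'} yield
{'trace': 'projects/proj/traces/abc'}; otherwise an empty dict is returned.
"""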
if not project_id or not trace_key:
return {}
trace_id = struct.get(trace_key, '')
if trace_id:
return {'trace': f'projects/{project_id}/traces/{trace_id}'}
return {}
class CloudLoggingClientInstance:
"""Wrapper for cloud ops structured logging.
Automatically adds a signature to structured logs to make them traceable.
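Example usage (a minimal sketch; parameter values are illustrative):
client = CloudLoggingClientInstance(
log_name='my_log', pod_hostname='pod-1', pod_uid='uid-1'
)
client.info('Processing slide', {'slide_id': '123'})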
"""
# global state to prevent duplicate initialization of cloud logging interfaces
# within a process.
_global_lock = threading.Lock()
# Cloud logging handler init at process level.
_cloud_logging_handler: Optional[
cloud_logging.handlers.CloudLoggingHandler
] = None
_cloud_logging_handler_init_params = ''
@classmethod
def _init_fork_module_state(cls) -> None:
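"""Resets class-level logging state in a forked child process."""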
cls._global_lock = threading.Lock()
cls._cloud_logging_handler = None
cls._cloud_logging_handler_init_params = ''
@classmethod
def fork_shutdown(cls) -> None:
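"""Stops the cloud logging transport and removes the handler before a fork."""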
with cls._global_lock:
cls._cloud_logging_handler_init_params = ''
handler = cls._cloud_logging_handler
if handler is None:
return
handler.transport.worker.stop()
logging.getLogger().removeHandler(handler)
handler.close()
cls._cloud_logging_handler = None
@classmethod
def _set_absl_skip_frames(cls) -> None:
"""Sets absl logging attribution to skip over internal logging frames."""
for function_name in (
    '_log',
    '_absl_log',
    'debug',
    'timed_debug',
    'info',
    'warning',
    'error',
    'critical',
):
  absl_logging.ABSLLogger.register_frame_to_skip(
      __file__, function_name=function_name
  )
def __init__(
self,
log_name: str = 'python',
gcp_project_to_write_logs_to: str = '',
gcp_credentials: Optional[google.auth.credentials.Credentials] = None,
pod_hostname: str = '',
pod_uid: str = '',
enable_structured_logging: bool = True,
use_absl_logging: bool = DEBUG_LOGGING_USE_ABSL_LOGGING,
log_all_python_logs_to_cloud: bool = False,
enabled: bool = True,
log_error_level: int = _LogSeverity.DEBUG.value,
per_thread_log_signatures: bool = True,
trace_key: str = '',
build_version: str = '',
):
"""Constructor.
Args:
log_name: Log name to write logs to.
gcp_project_to_write_logs_to: GCP project name to write logs to; empty
string uses the default project.
gcp_credentials: The OAuth2 Credentials to use for this client
(None=default).
pod_hostname: Host name of GKE pod. Should be empty if not running in GKE.
pod_uid: UID of GKE pod. Should be empty if not running in GKE.
enable_structured_logging: Enable structured logging.
use_absl_logging: Send logs to absl logging instead of cloud_logging.
log_all_python_logs_to_cloud: Logs everything to cloud.
enabled: If disabled, logging is not initialized and logging operations
are no-ops.
log_error_level: Minimum severity level at which the logger will log.
per_thread_log_signatures: If True, log signatures are tracked per thread.
trace_key: Log key whose value contains a trace id.
build_version: Build version of the container to embed in logs.
"""
# The log lock makes access to the singleton safe across threads; logging
# is used in the main thread and ack_timeout_mon.
CloudLoggingClientInstance._set_absl_skip_frames()
self._build_version = build_version
self._enabled = enabled
self._trace_key = trace_key
self._log_error_level = log_error_level
self._log_lock = threading.RLock()
self._log_name = log_name.strip()
self._pod_hostname = pod_hostname.strip()
self._pod_uid = pod_uid.strip()
self._per_thread_log_signatures = per_thread_log_signatures
self._thread_local_storage = threading.local()
self._shared_log_signature = self._signature_defaults(0)
self._enable_structured_logging = enable_structured_logging
self._debug_log_time = time.time()
self._gcp_project_name = gcp_project_to_write_logs_to.strip()
self._use_absl_logging = use_absl_logging
self._log_all_python_logs_to_cloud = log_all_python_logs_to_cloud
self._gcp_credentials = gcp_credentials
absl_logging.set_verbosity(absl_logging.INFO)
self._python_logger = self._init_cloud_handler()
@property
def trace_key(self) -> str:
return self._trace_key
@trace_key.setter
def trace_key(self, val: str) -> None:
self._trace_key = val
@property
def per_thread_log_signatures(self) -> bool:
return self._per_thread_log_signatures
@per_thread_log_signatures.setter
def per_thread_log_signatures(self, val: bool) -> None:
with self._log_lock:
self._per_thread_log_signatures = val
@property
def python_logger(self) -> logging.Logger:
if (
self._enabled
and not self._use_absl_logging
and CloudLoggingClientInstance._cloud_logging_handler is None
):
self._python_logger = self._init_cloud_handler()
return self._python_logger
def _get_python_logger_name(self) -> Optional[str]:
return None if self._log_all_python_logs_to_cloud else 'DPASLogger'
def _get_cloud_logging_handler_init_params(self) -> str:
return (
f'GCP_PROJECT_NAME: {self._gcp_project_name}; LOG_NAME:'
f' {self._log_name}; LOG_ALL: {self._log_all_python_logs_to_cloud}'
)
def _init_cloud_handler(self) -> logging.Logger:
"""Initializes cloud logging handler and returns python logger."""
# Instantiates a cloud logging client to generate text logs for cloud
# operations
with CloudLoggingClientInstance._global_lock:
if not self._enabled or self._use_absl_logging:
return logging.getLogger() # Default PY logger
handler_instance_init_params = (
self._get_cloud_logging_handler_init_params()
)
if CloudLoggingClientInstance._cloud_logging_handler is not None:
running_handler_init_params = (
CloudLoggingClientInstance._cloud_logging_handler_init_params
)
if running_handler_init_params != handler_instance_init_params:
# Call fork_shutdown to shutdown the process's named logging handler.
raise CloudLoggerInstanceExceptionError(
'Cloud logging handler is running with parameters that do not'
' match instance defined parameters. Running handler parameters:'
f' {running_handler_init_params}; Instance parameters:'
f' {handler_instance_init_params}'
)
return logging.getLogger(self._get_python_logger_name())
log_name = self.log_name
struct_log = {}
struct_log['log_name'] = log_name
struct_log['log_all_python_logs'] = self._log_all_python_logs_to_cloud
try:
# Attach default python & absl logger to also write to named log.
logging_client = cloud_logging.Client(
project=self._gcp_project_name if self._gcp_project_name else None,
credentials=self._gcp_credentials,
)
logging_client.project = (
self._gcp_project_name if self._gcp_project_name else None
)
handler = cloud_logging.handlers.CloudLoggingHandler(
client=logging_client,
name=log_name,
)
CloudLoggingClientInstance._cloud_logging_handler = handler
CloudLoggingClientInstance._cloud_logging_handler_init_params = (
handler_instance_init_params
)
cloud_logging.handlers.setup_logging(
handler,
log_level=logging.DEBUG
if self._log_all_python_logs_to_cloud
else logging.INFO,
)
dpas_python_logger = logging.getLogger(self._get_python_logger_name())
dpas_python_logger.setLevel(
logging.DEBUG
) # pytype: disable=attribute-error
return dpas_python_logger
except google.auth.exceptions.DefaultCredentialsError as exp:
self._use_absl_logging = True
self.error('Error initializing logging.', struct_log, exp)
return logging.getLogger()
except Exception as exp:
self._use_absl_logging = True
self.error('Unexpected error initializing logging.', struct_log, exp)
raise
def __getstate__(self) -> MutableMapping[str, Any]:
"""Returns log state for pickle removes lock."""
dct = copy.copy(self.__dict__)
del dct['_log_lock']
del dct['_python_logger']
del dct['_thread_local_storage']
return dct
def __setstate__(self, dct: MutableMapping[str, Any]):
"""Un-pickles class and re-creates log lock."""
self.__dict__ = dct
self._log_lock = threading.RLock()
self._thread_local_storage = threading.local()
# Re-init logging in process.
self._python_logger = self._init_cloud_handler()
def use_absl_logging(self) -> bool:
return self._use_absl_logging
@property
def enable_structured_logging(self) -> bool:
return self._enable_structured_logging
@enable_structured_logging.setter
def enable_structured_logging(self, val: bool) -> None:
with self._log_lock:
self._enable_structured_logging = val
@property
def enable(self) -> bool:
return self._enabled
@enable.setter
def enable(self, val: bool) -> None:
with self._log_lock:
self._enabled = val
@property
def gcp_project_name(self) -> str:
return self._gcp_project_name
@property
def log_name(self) -> str:
if not self._log_name:
raise ValueError('Undefined Log Name')
return self._log_name
@property
def hostname(self) -> str:
if not self._pod_hostname:
raise ValueError('POD_HOSTNAME is not defined.')
return self._pod_hostname
@property
def build_version(self) -> str:
"""Returns build version # for container."""
return self._build_version
@build_version.setter
def build_version(self, version: str) -> None:
"""Returns build version # for container."""
self._build_version = version
with self._log_lock:
if self._per_thread_log_signatures:
if not hasattr(self._thread_local_storage, 'signature'):
self._thread_local_storage.signature = self._signature_defaults(
threading.get_native_id()
)
log_sig = self._thread_local_storage.signature
else:
log_sig = self._shared_log_signature
if not version and 'BUILD_VERSION' in log_sig:
del log_sig['BUILD_VERSION']
else:
log_sig['BUILD_VERSION'] = str(version)
@property
def pod_uid(self) -> str:
if not self._pod_uid:
raise ValueError('Undefined POD UID')
return self._pod_uid
def _get_thread_signature(self) -> MutableMapping[str, Any]:
if not self._per_thread_log_signatures:
return self._shared_log_signature
if not hasattr(self._thread_local_storage, 'signature'):
self._thread_local_storage.signature = self._signature_defaults(
threading.get_native_id()
)
return self._thread_local_storage.signature
@property
def log_signature(self) -> MutableMapping[str, Any]:
"""Returns log signature.
The returned log signature may not match what is currently being logged
if the thread is set to log using another thread's log signature.
"""
with self._log_lock:
return copy.copy(self._get_thread_signature())
def _signature_defaults(self, thread_id: int) -> MutableMapping[str, str]:
"""Returns default log signature."""
log_signature = collections.OrderedDict()
if self._pod_hostname:
log_signature['HOSTNAME'] = str(self._pod_hostname)
if self._pod_uid:
log_signature['POD_UID'] = str(self._pod_uid)
if self.build_version:
log_signature['BUILD_VERSION'] = str(self.build_version)
if self._per_thread_log_signatures:
log_signature['THREAD_ID'] = str(thread_id)
return log_signature
@log_signature.setter
def log_signature(self, sig: Mapping[str, Any]) -> None:
"""Sets log signature.
The log signature of a thread may not be altered if the thread is set to
log using another thread's log signature.
Args:
sig: Signature for the thread's logs to use.
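For example, setting {'dicom_instance': '1.2.3'} yields a signature
containing dicom_instance plus the HOSTNAME/POD_UID/BUILD_VERSION/
THREAD_ID defaults; defaults take precedence on key collisions.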
"""
with self._log_lock:
if self._per_thread_log_signatures:
thread_id = threading.get_native_id()
if not hasattr(self._thread_local_storage, 'signature'):
self._thread_local_storage.signature = collections.OrderedDict()
log_sig = self._thread_local_storage.signature
else:
thread_id = 0
log_sig = self._shared_log_signature
log_sig.clear()
if sig is not None:
for key in sorted(sig):
log_sig[str(key)] = str(sig[key])
log_sig.update(self._signature_defaults(thread_id))
def clear_log_signature(self) -> None:
"""Clears thread log signature."""
with self._log_lock:
if self._per_thread_log_signatures:
self._thread_local_storage.signature = self._signature_defaults(
threading.get_native_id()
)
else:
self._shared_log_signature = self._signature_defaults(0)
def _clip_struct_log(
self, log: MutableMapping[str, Any], max_log_size: int
) -> None:
"""Clip log if structed log exceeds structured log size limits.
Clipping logic:
log size = total sum of key + value sizes of log structure
log['message'] and signature components are not clipped to keep
log message text un-altered and message traceability preserved.
log structure keys are not altered.
Structured logs exceeding size typically have a massive component which.
First try to clip the log by just clipping the largest clippable value.
If log still exceeds size. Proportionally clip log values.
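For example (a worked sketch with no signature keys): with
max_log_size=20 and log = {'message': 'hi', 'a': 'x' * 30, 'b': 'yy'},
the total size is 43, so 'a' (the largest clippable value) is trimmed by
23 characters to 'xxxxxxx', after which the log fits.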
Args:
log: Structured log.
max_log_size: Max_size of the log.
Returns:
None
Raises:
ValueError: If log cannot be clipped to max_log_size.
"""
# determine length of total message key + value
total_size = 0
total_value_size = 0
for key, value in log.items():
total_size += len(key)
total_value_size += len(value)
total_size += total_value_size
exceeds_log = total_size - max_log_size
if exceeds_log <= 0:
return
# Remove keys for log values not being adjusted:
# the message is not adjusted,
# the message signature is not adjusted.
log_keys = set(log)
excluded_key_msg_length = 0
excluded_key_msg_value_length = 0
excluded_keys = list(self._get_thread_signature())
excluded_keys.append('message')
for excluded_key in excluded_keys:
if excluded_key not in log_keys:
continue
value_len = len(str(log[excluded_key]))
excluded_key_msg_length += len(excluded_key) + value_len
excluded_key_msg_value_length += value_len
log_keys.remove(excluded_key)
if excluded_key_msg_length >= max_log_size:
raise ValueError('Message exceeds logging msg length limit.')
total_value_size -= excluded_key_msg_value_length
# Message exceeded length limits due to components in the structured log.
self._log(
'Next log message exceeds cloud ops length limit and was clipped.',
severity=_LogSeverity.WARNING,
struct=tuple(),
stack_frames_back=0,
)
# First clip the largest entry. In most cases a message will have one
# very large tag that causes the length issue; first clip the single
# largest entry so it is at most as large as the second largest entry.
if len(log_keys) > 1:
key_size_list = []
for key in log_keys:
key_size_list.append((key, len(log[key])))
key_size_list = sorted(key_size_list, key=lambda x: x[1])
largest_key = key_size_list[-1][0]
# difference in size between largest and second largest entry
largest_key_size_delta = key_size_list[-1][1] - key_size_list[-2][1]
clip_len = min(largest_key_size_delta, exceeds_log)
if clip_len > 0:
log[largest_key] = log[largest_key][:-clip_len]
# adjust length that needs to be trimmed
exceeds_log -= clip_len
if exceeds_log == 0:
return
# adjust total size of trimmable value component
total_value_size -= clip_len
# Proportionally clip all tags
new_exceeds_log = exceeds_log
# iterate over a sorted list to make clipping deterministic
for key in sorted(list(log_keys)):
entry_size = len(log[key])
clip_len = math.ceil(entry_size * exceeds_log / total_value_size)
clip_len = min(min(clip_len, new_exceeds_log), entry_size)
if clip_len > 0:
log[key] = log[key][:-clip_len]
new_exceeds_log -= clip_len
if new_exceeds_log == 0:
return
raise ValueError('Message exceeds logging msg length limit.')
def _merge_signature(
self, struct: Optional[MutableMapping[str, Any]]
) -> MutableMapping[str, Any]:
"""Adds signature to logging struct.
Args:
struct: logging struct.
Returns:
Dict to log
"""
if struct is None:
struct = collections.OrderedDict()
struct.update(self._get_thread_signature())
return struct
def _log(
self,
msg: str,
severity: _LogSeverity,
struct: Tuple[Union[Mapping[str, Any], Exception, None], ...],
stack_frames_back: int = 0,
):
"""Posts structured log message, adds current_msg id to log structure.
Args:
msg: Message to log.
severity: Severity level of message.
struct: Structure to log.
stack_frames_back: Additional stack frames back to log source_location.
"""
if not self._enabled:
return
with self._log_lock:
if severity.value < self._log_error_level:
return
struct = self._merge_signature(_merge_struct(struct))
if not self.use_absl_logging() and self._enable_structured_logging:
# Log using structured logs
source_location = _get_source_location_to_log(stack_frames_back + 1)
trace = _add_trace_to_log(
self._gcp_project_name, self._trace_key, struct
)
self._clip_struct_log(struct, MAX_LOG_SIZE)
_py_log(
self.python_logger,
msg,
extra={'json_fields': struct, **source_location, **trace},
severity=severity,
)
return
# Log using unstructured logs.
structure_str = [msg]
for key in struct:
structure_str.append(f'{key}: {struct[key]}')
_absl_log('; '.join(structure_str), severity=severity)
def debug(
self,
msg: str,
*struct: Union[Mapping[str, Any], Exception, None],
stack_frames_back: int = 0,
) -> None:
"""Logs with debug severity.
Args:
msg: message to log (string).
*struct: zero or more dict or exception to log in structured log.
stack_frames_back: Additional stack frames back to log source_location.
"""
self._log(msg, _LogSeverity.DEBUG, struct, 1 + stack_frames_back)
def timed_debug(
self,
msg: str,
*struct: Union[Mapping[str, Any], Exception, None],
stack_frames_back: int = 0,
) -> None:
"""Logs with debug severity and elapsed time since last timed debug log.
Args:
msg: message to log (string).
*struct: zero or more dict or exception to log in structured log.
stack_frames_back: Additional stack frames back to log source_location.
"""
time_now = time.time()
elapsed_time = '%.3f' % (time_now - self._debug_log_time)
self._debug_log_time = time_now
msg = f'[{elapsed_time}] {msg}'
self._log(msg, _LogSeverity.DEBUG, struct, 1 + stack_frames_back)
def info(
self,
msg: str,
*struct: Union[Mapping[str, Any], Exception, None],
stack_frames_back: int = 0,
) -> None:
"""Logs with info severity.
Args:
msg: message to log (string).
*struct: zero or more dict or exception to log in structured log.
stack_frames_back: Additional stack frames back to log source_location.
"""
self._log(msg, _LogSeverity.INFO, struct, 1 + stack_frames_back)
def warning(
self,
msg: str,
*struct: Union[Mapping[str, Any], Exception, None],
stack_frames_back: int = 0,
) -> None:
"""Logs with warning severity.
Args:
msg: Message to log (string).
*struct: Zero or more dict or exception to log in structured log.
stack_frames_back: Additional stack frames back to log source_location.
"""
self._log(msg, _LogSeverity.WARNING, struct, 1 + stack_frames_back)
def error(
self,
msg: str,
*struct: Union[Mapping[str, Any], Exception, None],
stack_frames_back: int = 0,
) -> None:
"""Logs with error severity.
Args:
msg: Message to log (string).
*struct: Zero or more dict or exception to log in structured log.
stack_frames_back: Additional stack frames back to log source_location.
"""
self._log(msg, _LogSeverity.ERROR, struct, 1 + stack_frames_back)
def critical(
self,
msg: str,
*struct: Union[Mapping[str, Any], Exception, None],
stack_frames_back: int = 0,
) -> None:
"""Logs with critical severity.
Args:
msg: Message to log (string).
*struct: Zero or more dict or exception to log in structured log.
stack_frames_back: Additional stack frames back to log source_location.
"""
self._log(msg, _LogSeverity.CRITICAL, struct, 1 + stack_frames_back)
@property
def log_error_level(self) -> int:
return self._log_error_level
@log_error_level.setter
def log_error_level(self, level: int) -> None:
with self._log_lock:
self._log_error_level = level
# Logging interfaces are used from processes which are forked (gunicorn,
# DICOM Proxy, Orchestrator, Refresher). In Python, forked processes do not
# copy threads running within parent processes or re-initialize global/module
# state. This can result in forked modules being executed with invalid global
# state, e.g., acquired locks that will not release or references to invalid
# state. The cloud logging library utilizes a background thread to transport
# logs to the cloud. The background thread is not compatible with forking and
# will seg-fault (Python queue wait). This can be avoided by stopping the
# background transport prior to forking and then restarting the transport
# following the fork.
os.register_at_fork(
before=CloudLoggingClientInstance.fork_shutdown,
after_in_child=CloudLoggingClientInstance._init_fork_module_state, # pylint: disable=protected-access
)
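# Note: after a fork the child starts with _cloud_logging_handler reset to
# None; the next access to python_logger lazily re-creates the cloud logging
# handler via _init_cloud_handler.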