# pathology/shared_libs/logging_lib/cloud_logging_client.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Wrapper for cloud ops structured logging."""
from __future__ import annotations
import os
import sys
import threading
from typing import Any, Mapping, Optional, Union
from absl import flags
from absl import logging
import google.auth
import psutil
from pathology.shared_libs.flags import secret_flag_utils
from pathology.shared_libs.logging_lib import cloud_logging_client_instance
# Name of the Cloud Ops log to write to; sourced from the
# CLOUD_OPS_LOG_NAME secret/env var, default 'python'.
CLOUD_OPS_LOG_NAME_FLG = flags.DEFINE_string(
    'ops_log_name',
    secret_flag_utils.get_secret_or_env('CLOUD_OPS_LOG_NAME', 'python'),
    'Cloud ops log name to write logs to.',
)
# GCP project to write logs to; None means use the default credential project.
CLOUD_OPS_LOG_PROJECT_FLG = flags.DEFINE_string(
    'ops_log_project',
    secret_flag_utils.get_secret_or_env('CLOUD_OPS_LOG_PROJECT', None),
    'GCP project name to write log to. Undefined = default',
)
# GKE pod host name; normally injected by the container environment.
POD_HOSTNAME_FLG = flags.DEFINE_string(
    'pod_hostname',
    secret_flag_utils.get_secret_or_env('HOSTNAME', None),
    'Host name of GKE pod. Set by container ENV. '
    'Set to mock value in unit test.',
)
# GKE pod UID; normally injected by the container environment.
POD_UID_FLG = flags.DEFINE_string(
    'pod_uid',
    secret_flag_utils.get_secret_or_env('MY_POD_UID', None),
    'UID of GKE pod. Do not set unless in test.',
)
# Structured logging is on unless either ENABLE_STRUCTURED_LOGGING is set
# falsy or legacy DISABLE_STRUCTURED_LOGGING is set truthy.
ENABLE_STRUCTURED_LOGGING_FLG = flags.DEFINE_boolean(
    'enable_structured_logging',
    secret_flag_utils.get_bool_secret_or_env(
        'ENABLE_STRUCTURED_LOGGING',
        not secret_flag_utils.get_bool_secret_or_env(
            'DISABLE_STRUCTURED_LOGGING'
        ),
    ),
    'Enable structured logging.',
)
# Master on/off switch for logging; defaults to enabled.
ENABLE_LOGGING_FLG = flags.DEFINE_boolean(
    'enable_logging',
    secret_flag_utils.get_bool_secret_or_env('ENABLE_LOGGING_FLG', True),
    'Enable logging.',
)
_DEBUG_LOGGING_USE_ABSL_LOGGING_FLG = flags.DEFINE_boolean(
    'debug_logging_use_absl_logging',
    # The double negative lets the external env var be the positive
    # statement ENABLE_CLOUD_LOGGING while still overriding the
    # module default when unset.
    not secret_flag_utils.get_bool_secret_or_env(
        'ENABLE_CLOUD_LOGGING',
        not cloud_logging_client_instance.DEBUG_LOGGING_USE_ABSL_LOGGING,
    ),
    'Debug/testing option to logs to absl.logger. Automatically set when '
    'running unit tests.',
)
# When set, every Python module's logs are forwarded to Cloud Ops, not just
# this wrapper's.
LOG_ALL_PYTHON_LOGS_TO_CLOUD_FLG = flags.DEFINE_boolean(
    'log_all_python_logs_to_cloud',
    secret_flag_utils.get_bool_secret_or_env('LOG_ALL_PYTHON_LOGS_TO_CLOUD'),
    'Logs every modules log to Cloud Ops.',
)
# Controls whether each thread gets its own log signature or all threads in
# the process share one.
PER_THREAD_LOG_SIGNATURES_FLG = flags.DEFINE_boolean(
    'per_thread_log_signatures',
    secret_flag_utils.get_bool_secret_or_env('PER_THREAD_LOG_SIGNATURES', True),
    'If True Log signatures are not shared are across threads if false '
    'Process threads share a common log signature',
)
def _are_flags_initialized() -> bool:
  """Returns True if flags are initialized.

  NOTE(review): this also returns False when flags are parsed but the
  project flag's value is its None default; callers then re-parse known
  flags — presumably harmless, verify against __init__.
  """
  try:
    project = CLOUD_OPS_LOG_PROJECT_FLG.value
  except (flags.UnparsedFlagAccessError, AttributeError):
    return False
  return project is not None
def _get_flags() -> Mapping[str, str]:
  """Returns a mapping of flag name -> flag value for all defined flags.

  Flags whose values cannot yet be read are collected under the
  'unparsed_flags' key as a comma-separated list of flag names.
  """
  load_flags = {}
  unparsed_flags = []
  for flag_name in flags.FLAGS:
    try:
      # getattr is the idiomatic (descriptor-correct) spelling of the
      # original flags.FLAGS.__getattr__(flag_name) dunder call.
      load_flags[flag_name] = getattr(flags.FLAGS, flag_name)
    except flags.UnparsedFlagAccessError:
      unparsed_flags.append(flag_name)
  if unparsed_flags:
    load_flags['unparsed_flags'] = ', '.join(unparsed_flags)
  return load_flags
def _default_gcp_project() -> str:
  """Returns project of the application-default credentials, or ''."""
  scopes = ['https://www.googleapis.com/auth/cloud-platform']
  try:
    unused_credentials, project = google.auth.default(scopes=scopes)
  except google.auth.exceptions.DefaultCredentialsError:
    # No usable default credentials; fall back to empty project name.
    return ''
  return '' if project is None else project
class CloudLoggingClient(
    cloud_logging_client_instance.CloudLoggingClientInstance
):
  """Wrapper for cloud ops structured logging.

  Automatically adds signature to structured logs to make traceable.
  """

  # Lock makes access to the singleton safe across threads. Logging is used
  # in the main thread and ack_timeout_mon.
  _singleton_instance: Optional[CloudLoggingClient] = None
  _startup_message_logged = False
  _singleton_lock = threading.RLock()

  # Module-level logging entry points whose stack frames absl should skip so
  # log records are attributed to the real caller.
  _LOG_FUNCTION_NAMES = (
      'debug',
      'timed_debug',
      'info',
      'warning',
      'error',
      'critical',
  )

  @classmethod
  def _init_fork_module_state(cls) -> None:
    """Re-initializes class state in a freshly forked child process."""
    cls._singleton_instance = None
    # NOTE(review): set True (unlike the class-level False default),
    # presumably so forked children do not re-log the startup message.
    cls._startup_message_logged = True
    cls._singleton_lock = threading.RLock()

  @classmethod
  def _fork_shutdown(cls) -> None:
    """Drops the singleton prior to a fork so the child starts clean."""
    with cls._singleton_lock:
      cls._singleton_instance = None

  @classmethod
  def _set_absl_skip_frames(cls) -> None:
    """Sets absl logging attribution to skip over internal logging frames."""
    # Replaces six copy-pasted register_frame_to_skip calls with one loop.
    for function_name in cls._LOG_FUNCTION_NAMES:
      logging.ABSLLogger.register_frame_to_skip(
          __file__,
          function_name=function_name,
      )

  def __init__(self):
    """Initializes the singleton logging client from flag values.

    Raises:
      cloud_logging_client_instance.CloudLoggerInstanceExceptionError: if a
        singleton instance has already been created.
    """
    with CloudLoggingClient._singleton_lock:
      if not _are_flags_initialized():
        # Flags were not parsed (e.g. module used outside absl.app.run);
        # parse known flags from argv so the flag values below are readable.
        flags.FLAGS(sys.argv, known_only=True)
      if CloudLoggingClient._singleton_instance is not None:
        raise cloud_logging_client_instance.CloudLoggerInstanceExceptionError(
            'Singleton already initialized.'
        )
      CloudLoggingClient._set_absl_skip_frames()
      # Fall back to the default-credential project when no flag is set.
      gcp_project = (
          _default_gcp_project()
          if CLOUD_OPS_LOG_PROJECT_FLG.value is None
          else CLOUD_OPS_LOG_PROJECT_FLG.value
      )
      pod_host_name = (
          '' if POD_HOSTNAME_FLG.value is None else POD_HOSTNAME_FLG.value
      )
      pod_uid = '' if POD_UID_FLG.value is None else POD_UID_FLG.value
      super().__init__(
          log_name=CLOUD_OPS_LOG_NAME_FLG.value,
          gcp_project_to_write_logs_to=gcp_project,
          gcp_credentials=None,
          pod_hostname=pod_host_name,
          pod_uid=pod_uid,
          enable_structured_logging=ENABLE_STRUCTURED_LOGGING_FLG.value,
          use_absl_logging=_DEBUG_LOGGING_USE_ABSL_LOGGING_FLG.value,
          log_all_python_logs_to_cloud=LOG_ALL_PYTHON_LOGS_TO_CLOUD_FLG.value,
          per_thread_log_signatures=PER_THREAD_LOG_SIGNATURES_FLG.value,
          enabled=ENABLE_LOGGING_FLG.value,
      )
      CloudLoggingClient._singleton_instance = self

  def startup_msg(self) -> None:
    """Logs default messages after logger fully initialized.

    No-op when logging to absl or when the startup message was already
    logged (at most one startup message per process).
    """
    if self.use_absl_logging() or CloudLoggingClient._startup_message_logged:
      return
    CloudLoggingClient._startup_message_logged = True
    pid = os.getpid()
    process_name = psutil.Process(pid).name()
    self.debug(
        'Container process started.',
        {'process_name': process_name, 'process_id': pid},
    )
    self.debug(
        'Container environmental variables.', os.environ
    )  # pytype: disable=wrong-arg-types  # kwargs-checking
    vm = psutil.virtual_memory()
    self.debug(
        'Compute instance',
        {
            'processors(count)': os.cpu_count(),
            'total_system_mem_(bytes)': vm.total,
            'available_system_mem_(bytes)': vm.available,
        },
    )
    self.debug('Initalized flags', _get_flags())
    project_name = self.gcp_project_name if self.gcp_project_name else 'DEFAULT'
    self.debug(f'Logging to GCP project: {project_name}')

  @classmethod
  def logger(cls, show_startup_msg: bool = True) -> CloudLoggingClient:
    """Returns the process-wide singleton, creating it on first use.

    Args:
      show_startup_msg: if False, suppresses the one-time startup message.
    """
    if cls._singleton_instance is None:
      with cls._singleton_lock:  # makes instance creation thread safe.
        if cls._singleton_instance is None:
          cls._singleton_instance = CloudLoggingClient()
    if not show_startup_msg:
      cls._startup_message_logged = True
    else:
      cls._singleton_instance.startup_msg()  # pytype: disable=attribute-error
    return cls._singleton_instance  # pytype: disable=bad-return-type
def logger() -> CloudLoggingClient:
  """Returns the process-wide CloudLoggingClient singleton."""
  return CloudLoggingClient.logger()
def do_not_log_startup_msg() -> None:
  """Initializes the singleton with its startup message suppressed."""
  CloudLoggingClient.logger(show_startup_msg=False)
def debug(
    msg: str,
    *struct: Union[Mapping[str, Any], Exception, None],
    stack_frames_back: int = 0,
) -> None:
  """Logs with debug severity.

  Args:
    msg: Message to log (string).
    *struct: Zero or more dict or exception to log in structured log.
    stack_frames_back: Additional stack frames back to log source_location.
  """
  client = logger()
  # +1 skips this wrapper frame when attributing the log's source location.
  client.debug(msg, *struct, stack_frames_back=stack_frames_back + 1)
def timed_debug(
    msg: str,
    *struct: Union[Mapping[str, Any], Exception, None],
    stack_frames_back: int = 0,
) -> None:
  """Logs with debug severity and elapsed time since last timed debug log.

  Args:
    msg: Message to log (string).
    *struct: Zero or more dict or exception to log in structured log.
    stack_frames_back: Additional stack frames back to log source_location.
  """
  client = logger()
  # +1 skips this wrapper frame when attributing the log's source location.
  client.timed_debug(msg, *struct, stack_frames_back=stack_frames_back + 1)
def info(
    msg: str,
    *struct: Union[Mapping[str, Any], Exception, None],
    stack_frames_back: int = 0,
) -> None:
  """Logs with info severity.

  Args:
    msg: Message to log (string).
    *struct: Zero or more dict or exception to log in structured log.
    stack_frames_back: Additional stack frames back to log source_location.
  """
  client = logger()
  # +1 skips this wrapper frame when attributing the log's source location.
  client.info(msg, *struct, stack_frames_back=stack_frames_back + 1)
def warning(
    msg: str,
    *struct: Union[Mapping[str, Any], Exception, None],
    stack_frames_back: int = 0,
) -> None:
  """Logs with warning severity.

  Args:
    msg: Message to log (string).
    *struct: Zero or more dict or exception to log in structured log.
    stack_frames_back: Additional stack frames back to log source_location.
  """
  client = logger()
  # +1 skips this wrapper frame when attributing the log's source location.
  client.warning(msg, *struct, stack_frames_back=stack_frames_back + 1)
def error(
    msg: str,
    *struct: Union[Mapping[str, Any], Exception, None],
    stack_frames_back: int = 0,
) -> None:
  """Logs with error severity.

  Args:
    msg: Message to log (string).
    *struct: Zero or more dict or exception to log in structured log.
    stack_frames_back: Additional stack frames back to log source_location.
  """
  client = logger()
  # +1 skips this wrapper frame when attributing the log's source location.
  client.error(msg, *struct, stack_frames_back=stack_frames_back + 1)
def critical(
    msg: str,
    *struct: Union[Mapping[str, Any], Exception, None],
    stack_frames_back: int = 0,
) -> None:
  """Logs with critical severity.

  Args:
    msg: Message to log (string).
    *struct: Zero or more dict or exception to log in structured log.
    stack_frames_back: Additional stack frames back to log source_location.
  """
  client = logger()
  # +1 skips this wrapper frame when attributing the log's source location.
  client.critical(msg, *struct, stack_frames_back=stack_frames_back + 1)
def clear_log_signature() -> None:
  """Clears the singleton logger's structured log signature."""
  logger().clear_log_signature()
def get_log_signature() -> Mapping[str, Any]:
  """Returns the singleton logger's current structured log signature."""
  return logger().log_signature
def set_log_signature(sig: Mapping[str, Any]) -> None:
  """Sets the singleton logger's structured log signature."""
  logger().log_signature = sig
def set_per_thread_log_signatures(val: bool) -> None:
  """Sets whether log signatures are tracked per thread or per process."""
  logger().per_thread_log_signatures = val
def get_build_version(clip_length: Optional[int] = None) -> str:
  """Returns build version, optionally clipped to clip_length characters.

  Args:
    clip_length: Maximum length of the returned string; None or a negative
      value returns the full version string.
  """
  version = logger().build_version
  if clip_length is None or clip_length < 0:
    return version
  return version[:clip_length]
def set_build_version(build_version: str) -> None:
  """Sets the build version reported by the singleton logger."""
  logger().build_version = build_version
def set_log_trace_key(key: str) -> None:
  """Sets the key used for the singleton logger's trace field."""
  logger().trace_key = key
# Logging interfaces are used from processes which are forked (gunicorn,
# DICOM Proxy, Orchestrator, Refresher). In Python, forked child processes
# do not inherit running parent threads and do not re-initialize
# global/module state. This can result in forked modules executing with
# invalid global state, e.g., acquired locks that will never be released, or
# references to invalid state. The cloud logging library uses a background
# thread to transport logs to the cloud. That background thread is not
# compatible with forking and will seg-fault (python queue wait). This is
# avoided by stopping the background transport prior to forking and
# re-initializing the logging state in the child after the fork.
os.register_at_fork(
    before=CloudLoggingClient._fork_shutdown,  # pylint: disable=protected-access
    after_in_child=CloudLoggingClient._init_fork_module_state,  # pylint: disable=protected-access
)