gui/backend/gui_plugin/core/Logger.py (176 lines of code) (raw):

# Copyright (c) 2021, 2024, Oracle and/or its affiliates. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License, version 2.0, # as published by the Free Software Foundation. # # This program is designed to work with certain software (including # but not limited to OpenSSL) that is licensed under separate terms, as # designated in a particular file or component or in included license # documentation. The authors of MySQL hereby grant you an additional # permission to link the program and your derivative works with the # separately licensed software that they have either included with # the program or referenced in the documentation. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See # the GNU General Public License, version 2.0, for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import mysqlsh import datetime from contextlib import contextmanager import traceback import sys import os import json from enum import IntEnum from gui_plugin.core.Error import MSGException from mysqlsh.plugin_manager import plugin_function # pylint: disable=no-name-in-module from gui_plugin.core.Protocols import Response from gui_plugin.core.BackendDbLogger import BackendDbLogger from gui_plugin.core import Filtering _db_files_count = 0 _files_count = 0 _process_count = 0 _thread_count = 0 SENSITIVE_KEYWORDS = ["password"] SENSITIVE_DATA_REPLACEMENT = "****" class LogLevel(IntEnum): NONE = 1 INTERNAL_ERROR = 2 ERROR = 3 WARNING = 4 INFO = 5 DEBUG = 6 DEBUG2 = 7 DEBUG3 = 8 MAX_LEVEL = 8 @staticmethod def from_string(log_level: str) -> 'LogLevel': try: return LogLevel[log_level] except KeyError: return LogLevel.INFO class BackendLogger: __instance = None __log_level = None __log_filters = [] @staticmethod def get_instance() -> 'BackendLogger': if BackendLogger.__instance == None: BackendLogger() return BackendLogger.__instance def __init__(self): if BackendLogger.__instance != None: raise Exception( "This class is a singleton, use get_instance function to get an instance.") else: BackendLogger.__instance = self log_level = os.environ.get('LOG_LEVEL', LogLevel.INFO.name) self.set_log_level(LogLevel.from_string(log_level)) self.add_filter({ "type": "key", "key": "password", "expire": Filtering.FilterExpire.Never }) def message_logger(self, log_type, message, tags=[], sensitive=False, prefix=""): now = datetime.datetime.now() message = prefix + str(self._filter(message) if sensitive else message) if 'session' in tags: BackendDbLogger.log(event_type=log_type.name, message=message) if 'shell' in tags: mysqlsh.globals.shell.log( log_type.name, f"[MSG] {message}") # pylint: disable=no-member if 'stdout' in tags: if self.__log_level >= log_type: print( f"{now.hour}:{now.minute}:{now.second}.{now.microsecond} {log_type.name}: {message}", file=sys.real_stdout, flush=True) def add_filter(self, options): if "type" in options: if options["type"] == "key": self.__log_filters.append(Filtering.KeyFilter(options["key"], options["expire"])) elif options["type"] == "substring": self.__log_filters.append(Filtering.SubstringFilter(options["start"], options["end"], options["expire"])) def set_log_level(self, log_level: LogLevel): BackendLogger.__log_level = log_level def get_log_level(self): return BackendLogger.__log_level def _filter(self, data): for filter in self.__log_filters: if not filter.expired(): data = filter.apply(data) self.__log_filters = [ filter for filter in self.__log_filters if not filter.expired()] return data def debug(message, tags=[], sensitive=False, prefix=""): # TODO: tweak these tags according the environment settings tags = tags + ['stdout'] BackendLogger.get_instance().message_logger( LogLevel.DEBUG, message, tags, sensitive, prefix) def info(message, tags=[], sensitive=False, prefix=""): # TODO: tweak these tags according the environment settings if 'session' not in tags: tags = tags + ['stdout', 'shell'] BackendLogger.get_instance().message_logger( LogLevel.INFO, message, tags, sensitive, prefix) def warning(message, tags=[], sensitive=False, prefix=""): # TODO: tweak these tags according the environment settings tags = tags + ['stdout', 'shell'] BackendLogger.get_instance().message_logger( LogLevel.WARNING, message, tags, sensitive, prefix) def error(message, tags=[], sensitive=False, prefix=""): # TODO: tweak these tags according the environment settings tags = tags + ['stdout', 'shell'] # convert to string in case we're logging an exception BackendLogger.get_instance().message_logger( LogLevel.ERROR, str(message), tags, sensitive, prefix) def exception(e, msg=None, tags=[]): if msg: error(msg, tags) if isinstance(e, MSGException): error(e, tags) else: exc_type, exc_value, exc_traceback = sys.exc_info() if exc_type is not None: exception_info = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback)) # the shell seems to be stripping initial spaces on every log line, # but I want indentation on exception reports exception_info = exception_info.strip().replace('\n', '\n-') error(f"Exception information:\n{exception_info}") else: error(e, tags) def debug2(message, tags=[], sensitive=False, prefix=""): tags = tags + ['stdout'] BackendLogger.get_instance().message_logger( LogLevel.DEBUG2, message, tags, sensitive, prefix) def debug3(message, tags=[], sensitive=False, prefix=""): tags = tags + ['stdout'] BackendLogger.get_instance().message_logger( LogLevel.DEBUG3, message, tags, sensitive, prefix) def internal_error(message, tags=[], sensitive=False, prefix=""): tags = tags + ['stdout'] BackendLogger.get_instance().message_logger( LogLevel.INTERNAL_ERROR, message, tags, sensitive, prefix) def add_filter(options): BackendLogger.get_instance().add_filter(options) def track_print(type): pass # import psutil # import threading # debug3( # f"{type} opened, open db files: {_db_files_count}, open files: {_files_count}, open processes: {_process_count}, handles: {len(psutil.Process().open_files())}, threads: {threading.active_count()} [{_thread_count}]") # for f in psutil.Process().open_files(): # debug3(f"--> {f}") def track_open(type): global _db_files_count global _files_count global _process_count global _thread_count if type == "db": _db_files_count += 1 # traceback.print_stack(limit=10) elif type == "file": _files_count += 1 elif type == "process": _process_count += 1 elif type == "thread": _thread_count += 1 # traceback.print_stack(limit=10) track_print(type) def track_close(type): global _db_files_count global _files_count global _process_count global _thread_count if type == "db": _db_files_count -= 1 if _db_files_count < 0: debug3("Trying closing db file that is already closed.") # traceback.print_stack(limit=10) elif type == "file": _files_count -= 1 if _files_count < 0: debug3("Trying closing file that is already closed.") traceback.print_stack(limit=10) elif type == "process": _process_count -= 1 if _process_count < 0: debug3("Trying closing process that is already closed.") # traceback.print_stack(limit=10) elif type == "thread": _thread_count -= 1 track_print(type) @plugin_function('gui.core.setLogLevel', shell=False, web=True) def set_log_level(log_level=LogLevel.INFO.name): """Sets the log level Change the logging level for the Backend Server, or disable logging. The 'log_level' argument can be one of: - none, - internal_error, - error, - warning, - info, - debug, - debug2, - debug3 Specifying 'none' disables logging. Level `info` is the default if you do not specify this option. Args: log_level (str): Level of logging Returns: None """ BackendLogger.get_instance().set_log_level(LogLevel[log_level.upper()]) @plugin_function('gui.core.getLogLevel', shell=False, web=True) def get_log_level(): """Gets the current log level Returns: str: log level """ return BackendLogger.get_instance().get_log_level().name