mysqloperator/controller/innodbcluster/logs/logs_api.py (113 lines of code) (raw):
# Copyright (c) 2023, 2024, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
from enum import Enum
from typing import Optional, Union, List, Callable, Dict, Tuple
from logging import Logger
from ...api_utils import dget_dict, dget_str, dget_list, ApiSpecError
from ...kubeutils import client as api_client
from .logs_collector_fluentd_api import FluentdSpec
from .logs_types_api import ServerLogType, GeneralLogSpec, ErrorLogSpec, SlowQueryLogSpec, MySQLLogSpecBase
#from ..cluster_api import AddToStsHandler
# Container name used for the log collector when the spec does not supply
# (or supplies an empty) "containerName".
lc_default_container_name = "logcollector"
class LogCollectorSpec:
    """Parsed settings of the log collector container for one cluster.

    Stores the collector image, container name, extra environment entries and
    the backend specific spec (``fluentd`` is the only backend recognised by
    :meth:`parse`).  StatefulSet and ConfigMap work is delegated to that
    backend object, and only happens when at least one log handler asks for
    its log to be collected.
    """

    def __init__(self, namespace: str, cluster_name: str):
        self.namespace: str = namespace
        self.cluster_name: str = cluster_name
        self.image: Optional[str] = None
        self.container: Optional[str] = lc_default_container_name
        # NOTE(review): the default is an empty dict while parse() stores the
        # list returned by dget_list(); confirm which shape the collector
        # backend expects before normalising the default.
        self.envs: Union[dict, List[dict]] = {}
        self.collector: Optional[FluentdSpec] = None
        # Remembered by parse() only when a non-empty spec was given;
        # validate() uses it to report where a backend section is missing.
        self._prefix: Optional[str] = None

    def parse(self, spec: dict, prefix: str, logger: Logger) -> None:
        """Fill this object from the collector section of the cluster spec.

        An empty or missing ``spec`` leaves every default untouched.

        :param spec: the collector sub-dictionary of the spec
        :param prefix: path of ``spec`` inside the whole document, used by the
                       ``dget_*`` helpers and in error messages
        :param logger: logger of the caller (not used here)
        """
        if not spec:
            return

        self._prefix = prefix

        if "image" in spec:
            self.image = dget_str(spec, "image", prefix)

        if "containerName" in spec:
            self.container = dget_str(spec, "containerName", prefix)

        if "env" in spec:
            self.envs = dget_list(spec, "env", prefix, content_type=dict)

        if "fluentd" in spec:
            self.collector = FluentdSpec(self.namespace, self.cluster_name)
            self.collector.parse(dget_dict(spec, "fluentd", prefix, {}), f"{prefix}.fluentd")

    def validate(self, logHandlers: Dict[ServerLogType, MySQLLogSpecBase]) -> None:
        """Check that the parsed collector settings are usable.

        :raises ApiSpecError: if collection is requested by a handler but no
                              image is set, or if a collector section was
                              parsed but it configured no backend
        """
        if self.collect(logHandlers) and self.image is None:
            raise ApiSpecError("No collector image set")

        if self.collector is None and self._prefix:
            raise ApiSpecError(f"No collector configured under {self._prefix}")

    @property
    def image_name(self) -> str:
        """Image configured for the collector container (may be ``None``)."""
        return self.image

    @property
    def container_name(self) -> str:
        """Configured container name, or the module default when unset/empty."""
        return self.container if self.container else lc_default_container_name

    def collect(self, logHandlers: Dict[ServerLogType, MySQLLogSpecBase]) -> bool:
        """Return ``True`` when at least one handler has ``collect`` set."""
        return any(handler.collect for handler in logHandlers.values())

    def remove_from_sts_spec(self, sts: Union[dict, api_client.V1StatefulSet], logHandlers: Dict[ServerLogType, MySQLLogSpecBase], logger: Logger) -> None:
        """Ask the backend to remove the collector from ``sts``.

        Does nothing when no handler requests collection.

        :raises ApiSpecError: if collection is requested but no backend is
                              configured
        """
        if not self.collect(logHandlers):
            return
        if self.collector is None:
            raise ApiSpecError("No collector configured")
        self.collector.remove_from_sts_spec(sts, self.container_name, self.image_name, self.envs, logger)

    def add_to_sts_spec(self, sts: Union[dict, api_client.V1StatefulSet], patcher: 'InnoDBClusterObjectModifier', logHandlers: Dict[ServerLogType, MySQLLogSpecBase], add: bool, logger: Logger) -> None:
        """Ask the backend to apply the collector settings to ``sts``.

        Does nothing when no handler requests collection.  ``add`` is passed
        through to the backend unchanged.

        :raises ApiSpecError: if collection is requested but no backend is
                              configured
        """
        if not self.collect(logHandlers):
            return
        if self.collector is None:
            raise ApiSpecError("No collector configured")
        self.collector.add_to_sts_spec(sts, patcher, self.container_name, self.image_name, self.envs, add, logger)

    def get_config_maps(self, logHandlers: Dict[ServerLogType, MySQLLogSpecBase]) -> List[Dict]:
        """Return the backend's config maps, or ``[]`` when nothing is collected
        or no backend is configured."""
        if self.collect(logHandlers) and self.collector:
            return self.collector.get_config_maps(logHandlers)
        return []
class LogsSpec:
    """Log configuration of one cluster: the per-log handlers plus the collector.

    Besides parsing and validating, the object hands out callbacks that apply
    the log settings to the StatefulSet and that build the logs ConfigMap.
    """

    def __init__(self, namespace: str, cluster_name: str):
        # NOTE(review): the keys are the ServerLogType *values* (they are
        # looked up directly in the spec dictionary by parse()), not the enum
        # members the annotation names - confirm and adjust the annotation.
        self.logs: Dict[ServerLogType, MySQLLogSpecBase] = {
            ServerLogType.GENERAL.value: GeneralLogSpec(),
            ServerLogType.ERROR.value: ErrorLogSpec(),
            ServerLogType.SLOW_QUERY.value: SlowQueryLogSpec(),
        }
        self.cluster_name: str = cluster_name
        self.collector: LogCollectorSpec = LogCollectorSpec(namespace, cluster_name)
        self.cm_name = self.cluster_name + "-logs-config"

    def parse(self, spec: dict, prefix: str, logger: Logger) -> None:
        """Parse the logs section of the cluster spec.

        Every known log whose key is present in ``spec`` is handed to its
        handler; a ``collector`` key is handed to the collector spec.  An
        empty or missing ``spec`` is a no-op, mirroring
        ``LogCollectorSpec.parse``.

        :param spec: the logs sub-dictionary of the spec
        :param prefix: path of ``spec`` inside the whole document
        :param logger: logger forwarded to the handlers
        """
        if not spec:
            return

        for log_name, handler in self.logs.items():
            if log_name in spec:
                log_spec = dget_dict(spec, log_name, prefix, None)
                handler.parse(log_spec, f"{prefix}.{log_name}", logger)

        if "collector" in spec:
            self.collector.parse(dget_dict(spec, "collector", prefix, {}), prefix + ".collector", logger)

    def validate(self) -> None:
        """Validate every log handler first, then the collector settings."""
        for handler in self.logs.values():
            handler.validate()
        self.collector.validate(self.logs)

    @property
    def enabled(self) -> bool:
        """``True`` when at least one log handler is enabled."""
        return any(handler.enabled for handler in self.logs.values())

    @property
    def collect(self) -> bool:
        """``True`` when at least one log handler wants its log collected."""
        return any(handler.collect for handler in self.logs.values())

    def get_add_to_initconf_cb(self) -> Optional[Callable[[Dict, str, Logger], None]]:
        """Return the initconf callback; it currently does nothing."""
        def cb(configmap: dict, prefix: str, logger: Logger) -> None:
            pass

        return cb

    def get_remove_from_sts_cb(self) -> Optional[Callable[[Union[dict, api_client.V1StatefulSet], Logger], None]]:
        """Return a callback that removes the collector from a StatefulSet."""
        return (lambda sts, logger: self.collector.remove_from_sts_spec(sts, self.logs, logger))

    def get_add_to_sts_cb(self) -> Optional['AddToStsHandler']:
        """Return a callback applying all log settings to a StatefulSet.

        The callback calls every log handler first and the collector last.
        """
        def cb(sts: Union[dict, api_client.V1StatefulSet], patcher: 'InnoDBClusterObjectModifier', logger: Logger) -> None:
            # Evaluated once so that all handlers and the collector receive
            # the same flag.
            enabled = self.enabled
            container_name = "mysql"
            for handler in self.logs.values():
                handler.add_to_sts_spec(sts, patcher, container_name, self.cm_name, enabled, logger)
            self.collector.add_to_sts_spec(sts, patcher, self.logs, enabled, logger)

        return cb

    def get_configmaps_cb(self) -> Optional['GetConfigMapHandler']:
        """Return a callback building the logs ConfigMap and collector maps.

        The callback returns a list whose first element is the
        ``(name, manifest)`` pair of the logs ConfigMap, followed by whatever
        the collector returns from ``get_config_maps``.  Each key delivered by
        a handler's ``get_cm_data`` is stored under ``prefix + key``.
        """
        def cb(prefix: str, logger: Logger) -> Optional[List[Tuple[str, Optional[Dict]]]]:
            data = {}
            for handler in self.logs.values():
                for cm_key, cm_value in handler.get_cm_data(logger).items():
                    data[f"{prefix}{cm_key}"] = cm_value

            logs_configmap = {
                'apiVersion': "v1",
                'kind': 'ConfigMap',
                'metadata': {
                    'name': self.cm_name,  # must be the same as in get_config_maps_names
                },
                'data': data,
            }

            return [(self.cm_name, logs_configmap)] + self.collector.get_config_maps(self.logs)

        return cb