mysqloperator/controller/innodbcluster/cluster_api.py (1,811 lines of code) (raw):
# Copyright (c) 2020, 2024, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import PurePosixPath
import typing, abc
from typing import Optional, Union, List, Tuple, Dict, Callable, Any, cast, overload
import kopf
from kopf._cogs.structs.bodies import Body
from logging import getLogger, Logger
from .. import fqdn
from ..k8sobject import K8sInterfaceObject
from .. import utils, config, consts
from ..backup.backup_api import BackupProfile, BackupSchedule
from ..storage_api import StorageSpec
from ..api_utils import Edition, dget_bool, dget_dict, dget_enum, dget_str, dget_int, dget_float, dget_list, ApiSpecError, ImagePullPolicy
from ..kubeutils import api_core, api_apps, api_customobj, api_policy, api_rbac, api_batch, api_cron_job
from ..kubeutils import client as api_client, ApiException
from ..kubeutils import k8s_cluster_domain
from .logs.logs_api import LogsSpec
from .logs.logs_types_api import ConfigMapMountBase, get_object_name, patch_sts_spec_template_complex_attribute
import json
import yaml
import datetime
from cryptography import x509
from kubernetes import client
# Callback type aliases used by the spec classes in this module.

# Takes (dict, str, Logger); returns nothing.
AddToInitconfHandler = Callable[[dict, str, Logger], None]
# Takes a StatefulSet (plain dict or V1StatefulSet) and a Logger; returns nothing.
RemoveFromStsHandler = Callable[[Union[dict, api_client.V1StatefulSet], Logger], None]
# NOTE(review): 'InnoDBClusterObjectModifier' is listed inside the Union of the
# first argument, while the callback built by MetriscSpec.get_add_to_sts_cb
# takes (sts, patcher, logger) - confirm the intended signature.
AddToStsHandler = Callable[[Union[dict, 'InnoDBClusterObjectModifier', api_client.V1StatefulSet], Logger], None]
# Takes (str, Logger); returns an optional list of (name, optional dict) pairs.
GetConfigMapHandler = Callable[[str, Logger], Optional[list[tuple[str, Optional[dict]]]]]
# Takes a Service (plain dict or V1Service) and a Logger; returns nothing.
AddToSvcHandler = Callable[[Union[dict, api_client.V1Service], Logger], None]
# Takes a Logger; returns a (name, optional dict) pair.
GetSvcMonitorHandler = Callable[[Logger], tuple[str, Optional[dict]]]

# Upper bound for the length of a cluster name.
# NOTE(review): where this limit is enforced is not visible in this part of the file.
MAX_CLUSTER_NAME_LEN = 28
def escape_value_for_mycnf(value: str) -> str:
    """Return *value* enclosed in double quotes with special characters escaped.

    Every backslash is doubled and every double quote is prefixed with a
    backslash, so the result can be embedded as a quoted option value.
    """
    # One pass over the input: each source character is mapped independently,
    # which gives the same result as escaping backslashes first and quotes second.
    escapes = str.maketrans({"\\": "\\\\", '"': '\\"'})
    return f'"{value.translate(escapes)}"'
class SecretData:
    """Plain holder for a reference to a key inside a secret."""
    # Name of the secret; None until assigned.
    secret_name: Optional[str] = None
    # Key inside the secret; None until assigned.
    # NOTE(review): users of this class are not visible here - confirm semantics.
    key: Optional[str] = None
class MetricsConfigMap(ConfigMapMountBase):
    """Config map mount used for the metrics sidecar configuration file."""

    def __init__(self):
        # The base class is initialised with "metrics", the file name
        # "metrics.cnf" and the path "/tmp/metrics".
        # NOTE(review): the exact meaning of each positional argument is
        # defined by ConfigMapMountBase, which is not visible here.
        super().__init__("metrics", "metrics.cnf", "/tmp/metrics")

    def parse(self, spec: dict, prefix: str) -> None:
        """Do nothing; no values are read from *spec*."""
        pass

    def validate(self) -> None:
        """Do nothing; there is nothing to validate."""
        pass
class MetriscSpec:
    """Settings of the optional metrics sidecar and the objects it needs.

    The values are read from a spec dict by parse(). The get_*_cb() methods
    return callbacks which add the sidecar container, its volumes, its config
    map, the service port and the ServiceMonitor - or remove them again when
    metrics are disabled.
    """
    enable: bool = False
    image: str = ""
    # Command line arguments for the sidecar; set by parse().
    options: Optional[list] = None
    # Name of the config map holding the web config ("" when unset after parse()).
    web_config: Optional[str] = None
    # Name of the secret holding TLS material ("" when unset after parse()).
    tls_secret: Optional[str] = None
    dbuser_name: str = "mysqlmetrics"
    dbuser_grants: list = ['PROCESS', 'REPLICATION CLIENT', 'SELECT']
    dbuser_max_connections: int = 3
    monitor: bool = False
    monitor_spec: dict = {}
    port = 9104
    # Shared by all instances, as it is created once at class definition time.
    config: MetricsConfigMap = MetricsConfigMap()
    container_name = "metrics"
    svc_port_name = "metrics"

    def __init__(self, namespace: str, cluster_name: str):
        self.namespace = namespace
        self.cluster_name = cluster_name
        # Name of the config map carrying the generated metrics.cnf
        self.cm_name = f"{self.cluster_name}-metricsconf"

    def parse(self, spec: dict, prefix: str) -> None:
        """Read the metrics settings from *spec*.

        *prefix* is the path of *spec* inside the whole document and is passed
        on to the dget_* helpers.
        """
        self.enable = dget_bool(spec, "enable", prefix)
        self.image = dget_str(spec, "image", prefix)
        user_options = dget_list(spec, "options", prefix, default_value=[],
                                 content_type=str)
        self.web_config = dget_str(spec, "webConfig", prefix, default_value="")
        self.tls_secret = dget_str(spec, "tlsSecret", prefix, default_value="")
        self.monitor = dget_bool(spec, "monitor", prefix,
                                 default_value=False)
        self.monitor_spec = dget_dict(spec, "monitorSpec", prefix,
                                      default_value={})

        user_spec = dget_dict(spec, "dbUser", prefix, default_value={})
        user_prefix = prefix+".dbUser"
        self.dbuser_name = dget_str(user_spec, "name", user_prefix,
                                    default_value="mysqlmetrics")
        # NOTE(review): the grants are read from the key "options" of dbUser -
        # confirm this is the key the CRD defines.
        self.dbuser_grants = dget_list(user_spec, "options", user_prefix,
                                       default_value=['PROCESS', 'REPLICATION CLIENT', 'SELECT'],
                                       content_type=str)
        self.dbuser_max_connections = dget_int(user_spec, "maxConnections",
                                               user_prefix, default_value=3)

        # Build a new list instead of appending in place, so that the list
        # handed out by dget_list (possibly the caller's spec) is left alone.
        # The path matches the directory and file name MetricsConfigMap uses.
        self.options = [*user_options, "--config.my-cnf=/tmp/metrics/metrics.cnf"]

    def validate(self) -> None:
        """Do nothing; there is nothing to validate."""
        pass

    def _add_container_to_sts_spec(self, sts: Union[dict, api_client.V1StatefulSet], patcher: 'InnoDBClusterObjectModifier', add: bool, logger: Logger) -> None:
        """Add the sidecar container to *sts* (or remove it when *add* is false)."""
        # Work on a copy: extending self.options itself would add the web
        # config argument once more on every call.
        options = [] if self.options is None else list(self.options)
        if self.web_config:
            options.append("--web.config.file=/config/web.config")

        mounts = [
            {
                "name": "rundir",
                "mountPath": "/var/run/mysqld",
            }
        ]
        if self.web_config:
            mounts.append(
                {
                    "name": f"{self.container_name}-web-config",
                    "mountPath" : "/config",
                    "readOnly": True,
                }
            )
        if self.tls_secret:
            mounts.append(
                {
                    "name": f"{self.container_name}-tls",
                    "mountPath" : "/tls",
                    "readOnly": True,
                }
            )

        patch = {
            "containers" : [
                {
                    "name": self.container_name,
                    "image": self.image,
                    "imagePullPolicy": "IfNotPresent", # TODO: should be self.sidecar_image_pull_policy
                    "args": options,
                    # These can't go to spec.template.spec.securityContext
                    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
                    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
                    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
                    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
                    "securityContext": {
                        "allowPrivilegeEscalation": False,
                        "privileged": False,
                        "readOnlyRootFilesystem": True,
                        "capabilities": {
                            "drop": ["ALL"]
                        },
                        # We must use same user id as auth_socket expects
                        "runAsUser": 2,
                        "runAsGroup": 27,
                    },
                    "env": [
                        # For BC with pre-0.15.0. 0.15.0+ will use the configuration file and skip the env totally
                        {
                            "name": "DATA_SOURCE_NAME",
                            "value": f"{self.dbuser_name}:@unix(/var/run/mysqld/mysql.sock)/"
                        }
                    ],
                    "ports": [
                        {
                            "name": self.container_name,
                            "containerPort": self.port,
                            "protocol": "TCP",
                        }
                    ],
                    "volumeMounts": mounts,
                }
            ]
        }
        patch_sts_spec_template_complex_attribute(sts, patcher, patch, "containers", add)

    def _add_volumes_to_sts_spec(self, sts: Union[dict, api_client.V1StatefulSet], patcher: 'InnoDBClusterObjectModifier', add: bool, logger: Logger) -> None:
        """Add the web config and TLS volumes to *sts* (or remove them)."""
        volumes = []
        # if there is web_config and we have to add it - add it
        # if we don't have to add, the patch won't be added but is still used
        # to clean up remnants of a previous add
        if (self.web_config and add) or (not add):
            volumes.append(
                {
                    "name": f"{self.container_name}-web-config",
                    "configMap": {
                        "name" : self.web_config,
                        "defaultMode": 0o444,
                    }
                }
            )
        if (self.tls_secret and add) or (not add):
            volumes.append(
                {
                    "name": f"{self.container_name}-tls",
                    "secret": {
                        "name" : self.tls_secret,
                        "defaultMode": 0o444,
                    }
                }
            )

        patch = {
            "volumes" : volumes
        }
        patch_sts_spec_template_complex_attribute(sts, patcher, patch, "volumes", add)

    def get_add_to_sts_cb(self) -> Optional[AddToStsHandler]:
        """Return the callback which applies all metrics changes to a StatefulSet."""
        def cb(sts: Union[dict, api_client.V1StatefulSet], patcher: 'InnoDBClusterObjectModifier', logger: Logger) -> None:
            self._add_container_to_sts_spec(sts, patcher, self.enable, logger)
            self._add_volumes_to_sts_spec(sts, patcher, self.enable, logger)
            self.config.add_to_sts_spec(sts, patcher, self.container_name, self.cm_name, self.enable, logger)

        return cb

    def get_configmaps_cb(self) -> Optional[GetConfigMapHandler]:
        """Return the callback which produces the metrics config map.

        The callback returns a one element list of (name, manifest); the
        manifest is None when metrics are disabled.
        """
        def cb(prefix: str, logger: Logger) -> Optional[list[tuple[str, Optional[dict]]]]:
            if not self.enable:
                return [(self.cm_name, None)]

            config = f"""[client]
user={self.dbuser_name}
socket=unix:///var/run/mysqld/mysql.sock
"""
            return [
                (
                    self.cm_name,
                    {
                        'apiVersion' : "v1",
                        'kind': 'ConfigMap',
                        'metadata': {
                            'name': self.cm_name,
                        },
                        'data' : {
                            self.config.config_file_name: config
                        }
                    }
                )
            ]

        return cb

    def get_add_to_svc_cb(self) -> Optional[AddToSvcHandler]:
        """Return the callback which adds or removes the metrics port of a Service."""
        def cb(svc: Union[dict, api_client.V1Service], logger: Logger) -> None:
            patch = {
                "ports" : [
                    {
                        "name" : self.svc_port_name,
                        "port": self.port, # ToDo : should be cluster.mysql_metrics_port
                        "targetPort": self.port, # ToDo : should be cluster.mysql_metrics_port
                    }
                ]
            }

            svc_name = "n/a"
            my_port_names = [port["name"] for port in patch["ports"]]
            if isinstance(svc, dict):
                svc_name = svc["metadata"]["name"]
                # first filter out the old `port`
                svc["spec"]["ports"] = [port for port in svc["spec"]["ports"] if port and get_object_name(port) not in my_port_names]
                # Then add, if needed
                if self.enable:
                    utils.merge_patch_object(svc["spec"], patch)
            elif isinstance(svc, api_client.V1Service):
                svc_name = svc.metadata.name
                # first filter out the old `port`
                svc.spec.ports = [port for port in svc.spec.ports if port and get_object_name(port) not in my_port_names]
                # Then add, if needed
                if self.enable:
                    svc.spec.ports += patch["ports"]

            print(f"\t\t\t{'A' if self.enable else 'Not a'}dding port {self.svc_port_name} to Service {svc_name}")

        return cb

    # Can't include the type for the MonitoringCoreosComV1Api.ServiceMonitor
    def get_svc_monitor_cb(self) -> Optional[GetSvcMonitorHandler]:
        """Return the callback which produces the ServiceMonitor manifest.

        The callback returns (name, manifest); the manifest is None unless
        both metrics and monitoring are enabled.
        """
        def cb(logger: Logger) -> tuple[str, Optional[dict]]:
            monitor = None
            requested = self.enable and self.monitor
            monitor_name = self.cluster_name
            if requested:
                # NOTE(review): the nesting of this YAML depends on the
                # leading whitespace of the template lines - verify.
                monitor = f"""
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {monitor_name}
spec:
selector:
matchLabels:
mysql.oracle.com/cluster: {self.cluster_name}
tier: mysql
endpoints:
- port: metrics
path: /metrics
"""
                monitor = yaml.safe_load(monitor)

            return (monitor_name, monitor)

        return cb
class CloneInitDBSpec:
    """initDB settings for initialising from a donor given by URL."""
    uri: str = ""
    password_secret_name: Optional[str] = None
    root_user: Optional[str] = None

    def parse(self, spec: dict, prefix: str) -> None:
        """Read donor URL, root user and the password secret name from *spec*."""
        self.uri = dget_str(spec, "donorUrl", prefix)
        self.root_user = dget_str(spec, "rootUser", prefix, default_value="root")

        secret_ref = dget_dict(spec, "secretKeyRef", prefix)
        secret_name = dget_str(secret_ref, "name", prefix+".secretKeyRef")
        self.password_secret_name = secret_name

    def get_password(self, ns: str) -> str:
        """Return the decoded "rootPassword" entry of the secret in namespace *ns*."""
        fetched = api_core.read_namespaced_secret(self.password_secret_name, ns)
        secret = cast(api_client.V1Secret, fetched)
        encoded_password = secret.data["rootPassword"]
        return utils.b64decode(encoded_password)
class ClusterSetInitDBSpec:
    """initDB settings for initialising from a target given by URL."""
    uri: str = ""
    password_secret_name: Optional[str] = None
    root_user: Optional[str] = None

    def parse(self, spec: dict, prefix: str) -> None:
        """Read target URL, root user and the password secret name from *spec*."""
        self.uri = dget_str(spec, "targetUrl", prefix)
        self.root_user = dget_str(spec, "rootUser", prefix, default_value="root")

        secret_ref = dget_dict(spec, "secretKeyRef", prefix)
        secret_name = dget_str(secret_ref, "name", prefix+".secretKeyRef")
        self.password_secret_name = secret_name

    def get_password(self, ns: str) -> str:
        """Return the decoded "rootPassword" entry of the secret in namespace *ns*."""
        fetched = api_core.read_namespaced_secret(self.password_secret_name, ns)
        secret = cast(api_client.V1Secret, fetched)
        encoded_password = secret.data["rootPassword"]
        return utils.b64decode(encoded_password)
class SnapshotInitDBSpec:
    """initDB settings for initialising from a snapshot kept in some storage."""
    storage: Optional[StorageSpec] = None

    def parse(self, spec: dict, prefix: str) -> None:
        """Parse the mandatory "storage" entry of *spec* into a StorageSpec."""
        storage = StorageSpec()
        # Assigned before parsing, so the attribute is set even if parsing raises.
        self.storage = storage
        storage_dict = dget_dict(spec, "storage", prefix)
        storage.parse(storage_dict, prefix+".storage")
class DumpInitDBSpec:
    """initDB settings for initialising from a dump kept in some storage."""
    path: Optional[str] = None
    storage: Optional[StorageSpec] = None
    loadOptions: dict = {}

    def parse(self, spec: dict, prefix: str) -> None:
        """Read path, storage and the load options from *spec*."""
        # path can be "" if we're loading from a bucket
        self.path = dget_str(spec, "path", prefix, default_value="")

        storage = StorageSpec()
        # Assigned before parsing, so the attribute is set even if parsing raises.
        self.storage = storage
        storage_dict = dget_dict(spec, "storage", prefix)
        storage.parse(storage_dict, prefix+".storage")

        self.loadOptions = dget_dict(spec, "options", prefix, default_value={})
class MebInitDBSpec:
    """initDB settings for a restore from a MEB backup.

    Exactly one of the storage kinds "s3" or "ociObjectStorage" has to be
    given. Optionally a point in time recovery ("pitr") section is read.
    """
    s3_region: Optional[str] = None
    s3_bucket: Optional[str] = None
    s3_object_key_prefix: Optional[str] = None
    s3_credentials: Optional[str] = None
    s3_host: Optional[str] = None

    oci_credentials: Optional[str] = None

    full_backup: Optional[str] = None
    incremental_backups: Optional[List[str]] = []

    pitr_backup_file: Optional[str] = None
    pitr_binlog_name: Optional[str] = None
    pitr_gtid_purge: Optional[str] = None
    # Either "SQL_AFTER_GTIDS" or "SQL_BEFORE_GTIDS" once parsed, else None.
    pitr_end_term: Optional[str] = None
    pitr_end_value: Optional[str] = None

    def parse(self, spec: dict, prefix: str) -> None:
        """Read the MEB restore settings from *spec*.

        Raises:
            kopf.TemporaryError: if neither or both of the s3 and
                ociObjectStorage storage kinds are configured.
        """
        # TODO other storage types ....
        storagespec = dget_dict(spec, "storage", prefix)

        if "ociObjectStorage" in storagespec:
            ocistorage = dget_dict(storagespec, "ociObjectStorage", prefix+".storage")
            self.oci_credentials = dget_str(ocistorage, "credentials", prefix+".storage.credentials")

        if "s3" in storagespec:
            s3storage = dget_dict(storagespec, "s3", prefix+".storage")
            # Each setting is read from its own key. Previously every one of
            # them was read from "region".
            # NOTE(review): key names other than "region" are assumed to be
            # bucket/objectKeyPrefix/credentials/host - confirm against the CRD.
            self.s3_region = dget_str(s3storage, "region", prefix+".storage.s3.region")
            self.s3_bucket = dget_str(s3storage, "bucket", prefix+".storage.s3.bucket")
            self.s3_object_key_prefix = dget_str(s3storage, "objectKeyPrefix", prefix+".storage.s3.object_key_prefix")
            self.s3_credentials = dget_str(s3storage, "credentials", prefix+".storage.s3.credentials")
            self.s3_host = dget_str(s3storage, "host", prefix+".storage.s3.host", default_value="")

        # Exactly one storage kind must be configured
        if (not self.oci_credentials and not self.s3_bucket) or (self.oci_credentials and self.s3_bucket):
            raise kopf.TemporaryError("Need one of either s3 or ociObjectStorage for MEB Restore")

        self.full_backup = dget_str(spec, "fullBackup", prefix)
        self.incremental_backups = dget_list(spec, "incrementalBackups", prefix, default_value = [])

        if "pitr" not in spec:
            return

        pitr_spec = dget_dict(spec, "pitr", prefix)
        self.pitr_backup_file = dget_str(pitr_spec, "backupFile", prefix+".pitr", default_value="")
        self.pitr_binlog_name = dget_str(pitr_spec, "binlogName", prefix+".pitr", default_value="")
        self.pitr_gtid_purge = dget_str(pitr_spec, "gtidPurge", prefix+".pitr", default_value="")

        if "end" not in pitr_spec:
            return

        pitr_end_spec = dget_dict(pitr_spec, "end", prefix+".pitr.end")
        # "afterGtids" wins if both end conditions are given
        if "afterGtids" in pitr_end_spec:
            self.pitr_end_term = "SQL_AFTER_GTIDS"
            self.pitr_end_value = dget_str(pitr_end_spec, "afterGtids",
                                           prefix+".pitr.end.afterGtids")
        elif "beforeGtids" in pitr_end_spec:
            self.pitr_end_term = "SQL_BEFORE_GTIDS"
            self.pitr_end_value = dget_str(pitr_end_spec, "beforeGtids",
                                           prefix+".pitr.end.beforeGtids")
class SQLInitDB:
    """Placeholder; only declares an untyped storage attribute."""
    storage = None  # TODO type
class InitDB:
    """The initDB section: exactly one source to initialise the database from.

    After parse() exactly one of the attributes below is set; the others
    stay None.
    """
    clone: Optional[CloneInitDBSpec] = None
    snapshot: Optional[SnapshotInitDBSpec] = None
    dump: Optional[DumpInitDBSpec] = None
    cluster_set: Optional[ClusterSetInitDBSpec] = None
    meb: Optional[MebInitDBSpec] = None

    def parse(self, spec: dict, prefix: str) -> None:
        """Parse the one configured source out of *spec*.

        Note that *prefix* is not used; the path "spec.initDB" is hard-coded.

        Raises:
            ApiSpecError: if none or more than one of dump, snapshot, meb,
                clone and clusterSet is specified.
        """
        dump = dget_dict(spec, "dump", "spec.initDB", {})
        clone = dget_dict(spec, "clone", "spec.initDB", {})
        snapshot = dget_dict(spec, "snapshot", "spec.initDB", {})
        meb = dget_dict(spec, "meb", "spec.initDB", {})
        cluster_set = dget_dict(spec, "clusterSet", "spec.initDB", {})

        given = [x for x in (dump, clone, snapshot, meb, cluster_set) if x]
        if len(given) > 1:
            raise ApiSpecError(
                "Only one of dump, snapshot, meb, clone, or clusterSet may be specified in spec.initDB")
        if not given:
            raise ApiSpecError(
                "One of dump, snapshot, meb, clone, or clusterSet must be specified in spec.initDB")

        if clone:
            self.clone = CloneInitDBSpec()
            self.clone.parse(clone, "spec.initDB.clone")
        elif dump:
            self.dump = DumpInitDBSpec()
            self.dump.parse(dump, "spec.initDB.dump")
        elif snapshot:
            self.snapshot = SnapshotInitDBSpec()
            self.snapshot.parse(snapshot, "spec.initDB.snapshot")
        elif meb:
            self.meb = MebInitDBSpec()
            self.meb.parse(meb, "spec.initDB.meb")
        elif cluster_set:
            self.cluster_set = ClusterSetInitDBSpec()
            self.cluster_set.parse(cluster_set, "spec.initDB.clusterSet")
class KeyringConfigStorage(Enum):
    """Kind of object a keyring component configuration is stored in."""
    # Configuration goes into a ConfigMap
    CONFIGMAP = 1
    # Configuration goes into a Secret
    SECRET = 2
class KeyringSpecBase(ABC):
    """Interface implemented by every keyring backend specification."""

    @abstractmethod
    def parse(self, spec: dict, prefix: str) -> None:
        """Read the backend settings from *spec*."""
        ...

    @abstractmethod
    def add_to_sts_spec(self, statefulset: dict) -> None:
        """Add what the backend needs to the StatefulSet manifest, in place."""
        ...

    @abstractmethod
    def add_to_global_manifest(self, manifest: dict) -> None:
        """Register the backend's component in *manifest*, in place."""
        ...

    @abstractmethod
    def add_component_manifest(self, data: dict, storage_type: KeyringConfigStorage) -> None:
        """Add the component configuration to *data* if it belongs to *storage_type*."""
        ...

    @property
    @abstractmethod
    def component_manifest_name(self) -> str:
        """File name of the backend's component configuration."""
        ...

    # TODO [compat8.3.0] remove this when compatibility pre 8.3.0 isn't needed anymore
    def upgrade_to_component(self, sts: api_client.V1StatefulSet, spec, logger: Logger) -> Optional[tuple[dict, dict]]:
        """Return nothing by default; subclasses may override."""
        # only exists for OCI keyring
        pass
class KeyringNoneSpec(KeyringSpecBase):
    """Null object used when no keyring is configured; all methods do nothing."""

    def parse(self, spec: dict, prefix: str) -> None:
        """Do nothing."""
        ...

    def add_to_sts_spec(self, statefulset: dict) -> None:
        """Do nothing."""
        ...

    def add_to_global_manifest(self, manifest: dict) -> None:
        """Do nothing."""
        ...

    def add_component_manifest(self, data: dict, storage_type: KeyringConfigStorage) -> None:
        """Do nothing."""
        ...

    @property
    def component_manifest_name(self) -> str:
        """Return a marker string; there is no real manifest for this type."""
        return "invalid-component-manifest-name-call"
class KeyringFileSpec(KeyringSpecBase):
    """Keyring backend using the component_keyring_file component."""
    # Name of the keyring file below keyring_mount_path
    fileName: Optional[str] = None
    readOnly: Optional[bool] = False
    # Volume source merged into the storage volume definition; see
    # add_storage_to_sts_spec()
    storage: Optional[dict] = None

    def __init__(self, namespace: str, global_manifest_name: str, component_config_configmap_name: str, keyring_mount_path: str):
        self.namespace = namespace
        self.global_manifest_name = global_manifest_name
        self.component_config_configmap_name = component_config_configmap_name
        self.keyring_mount_path = keyring_mount_path

    def parse(self, spec: dict, prefix: str) -> None:
        """Read fileName, readOnly (default False) and storage from *spec*."""
        self.fileName = dget_str(spec, "fileName", prefix)
        self.readOnly = dget_bool(spec, "readOnly", prefix, default_value=False)
        self.storage = dget_dict(spec, "storage", prefix)

    def add_to_sts_spec(self, statefulset: dict) -> None:
        """Add the configuration and the storage mounts to *statefulset*."""
        self.add_conf_to_sts_spec(statefulset)
        self.add_storage_to_sts_spec(statefulset)

    def add_conf_to_sts_spec(self, statefulset: dict) -> None:
        """Mount the component configuration from the config map.

        The patch addresses the "initmysql" init container and the "mysql"
        container and is merged into statefulset["spec"]["template"].
        """
        cm_mount_name = "keyringfile-conf"
        # NOTE(review): the YAML nesting depends on the leading whitespace of
        # the template lines - verify.
        mounts = f"""
- name: {cm_mount_name}
mountPath: "/usr/lib64/mysql/plugin/{self.component_manifest_name}"
subPath: "{self.component_manifest_name}" #should be the same as the volume.items.path
"""
        volumes = f"""
- name: {cm_mount_name}
configMap:
name: {self.component_config_configmap_name}
items:
- key: "{self.component_manifest_name}"
path: "{self.component_manifest_name}"
"""
        patch = f"""
spec:
initContainers:
- name: initmysql
volumeMounts:
{utils.indent(mounts, 4)}
containers:
- name: mysql
volumeMounts:
{utils.indent(mounts, 4)}
volumes:
{utils.indent(volumes, 2)}
"""
        utils.merge_patch_object(statefulset["spec"]["template"], yaml.safe_load(patch))

    def add_storage_to_sts_spec(self, statefulset: dict) -> None:
        """Mount the keyring storage at keyring_mount_path, if storage is set."""
        if not self.storage:
            return
        storage_mount_name = "keyringfile-storage"
        mounts = f"""
- name: {storage_mount_name}
mountPath: {self.keyring_mount_path}
"""
        patch = f"""
spec:
initContainers:
- name: initmysql
volumeMounts:
{utils.indent(mounts, 6)}
containers:
- name: mysql
volumeMounts:
{utils.indent(mounts, 6)}
"""
        utils.merge_patch_object(statefulset["spec"]["template"], yaml.safe_load(patch))
        # The volume is the user supplied storage dict plus the volume name
        statefulset["spec"]["template"]["spec"]["volumes"].append({"name" : storage_mount_name, **self.storage})

    def add_to_global_manifest(self, manifest: dict) -> None:
        """Append the component URI to the comma separated "components" entry."""
        component_name = "file://component_keyring_file"
        if not manifest.get("components"):
            manifest["components"] = f"{component_name}"
        else:
            manifest["components"] = manifest["components"] + f",{component_name}" # no space allowed after comma

    def add_component_manifest(self, data: dict, storage_type: KeyringConfigStorage) -> None:
        """Add the component configuration to *data*; config map storage only."""
        if storage_type == KeyringConfigStorage.CONFIGMAP:
            data[self.component_manifest_name] = {
                "path": str(PurePosixPath(self.keyring_mount_path)
                            .joinpath(self.fileName)),
                "read_only": self.readOnly,
            }

    @property
    def component_manifest_name(self) -> str:
        """File name of the component configuration."""
        return "component_keyring_file.cnf"
class KeyringEncryptedFileSpec(KeyringSpecBase):
    """Keyring backend using the component_keyring_encrypted_file component."""
    # TODO: Storage must be mounted
    # Name of the keyring file below keyring_mount_path
    fileName: Optional[str] = None
    readOnly: Optional[bool] = False
    # Decoded keyring password, read from a secret in parse()
    password: Optional[str] = None
    storage: Optional[dict] = None

    def __init__(self, namespace: str, global_manifest_name: str, component_config_configmap_name: str, keyring_mount_path: str):
        self.namespace = namespace
        self.global_manifest_name = global_manifest_name
        self.component_config_configmap_name = component_config_configmap_name
        self.keyring_mount_path = keyring_mount_path

    def parse(self, spec: dict, prefix: str) -> None:
        """Read the settings from *spec* and fetch the password from its secret.

        The "password" entry of *spec* names a secret in self.namespace which
        must have a non-empty "keyring_password" key.

        Raises:
            ApiSpecError: if the secret is missing, lacks the key or the
                value is empty.
            ApiException: for any other API failure while reading the secret.
        """
        def get_password_from_secret(secret_name: str) -> str:
            expected_key = 'keyring_password'
            try:
                password = cast(api_client.V1Secret,
                                api_core.read_namespaced_secret(secret_name, self.namespace))
                if not expected_key in password.data:
                    raise ApiSpecError(f"Secret {secret_name} has no key {expected_key}")
                # From here on `password` is the encoded value, not the secret
                password = password.data[expected_key]
                if not password:
                    raise ApiSpecError(f"Secret {secret_name}'s {expected_key} is empty")
                return utils.b64decode(password)
            except ApiException as e:
                # Only "not found" is turned into a spec error
                if e.status == 404:
                    raise ApiSpecError(f"Secret {secret_name} is missing")
                raise

        self.fileName = dget_str(spec, "fileName", prefix)
        self.readOnly = dget_bool(spec, "readOnly", prefix, default_value=False)
        self.password = get_password_from_secret(dget_str(spec, "password", prefix))
        self.storage = dget_dict(spec, "storage", prefix)

    def add_to_sts_spec(self, statefulset: dict) -> None:
        """Add the configuration and the storage mounts to *statefulset*."""
        self.add_conf_to_sts_spec(statefulset)
        self.add_storage_to_sts_spec(statefulset)

    def add_conf_to_sts_spec(self, statefulset: dict) -> None:
        """Mount the component configuration from a secret.

        The patch addresses the "initmysql" init container and the "mysql"
        container and is merged into statefulset["spec"]["template"].
        """
        cm_mount_name = "keyringencfile-conf"
        # NOTE(review): the YAML nesting depends on the leading whitespace of
        # the template lines - verify.
        mounts = f"""
- name: {cm_mount_name}
mountPath: "/usr/lib64/mysql/plugin/{self.component_manifest_name}"
subPath: "{self.component_manifest_name}" #should be the same as the volume.items.path
"""
        # The secret carries the same name as the component config map
        volumes = f"""
- name: {cm_mount_name}
secret:
secretName: {self.component_config_configmap_name}
items:
- key: "{self.component_manifest_name}"
path: "{self.component_manifest_name}"
"""
        patch = f"""
spec:
initContainers:
- name: initmysql
volumeMounts:
{utils.indent(mounts, 4)}
containers:
- name: mysql
volumeMounts:
{utils.indent(mounts, 4)}
volumes:
{utils.indent(volumes, 2)}
"""
        utils.merge_patch_object(statefulset["spec"]["template"], yaml.safe_load(patch))

    def add_storage_to_sts_spec(self, statefulset: dict) -> None:
        """Mount the keyring storage at keyring_mount_path, if storage is set."""
        if not self.storage:
            return
        storage_mount_name = "keyringencfile-storage"
        mounts = f"""
- name: {storage_mount_name}
mountPath: {self.keyring_mount_path}
"""
        patch = f"""
spec:
initContainers:
- name: initmysql
volumeMounts:
{utils.indent(mounts, 6)}
containers:
- name: mysql
volumeMounts:
{utils.indent(mounts, 6)}
"""
        utils.merge_patch_object(statefulset["spec"]["template"], yaml.safe_load(patch))
        # The volume is the user supplied storage dict plus the volume name
        statefulset["spec"]["template"]["spec"]["volumes"].append({"name" : storage_mount_name, **self.storage})

    def add_to_global_manifest(self, manifest: dict) -> None:
        """Append the component URI to the comma separated "components" entry."""
        component_name = "file://component_keyring_encrypted_file"
        if not manifest.get("components"):
            manifest["components"] = f"{component_name}"
        else:
            manifest["components"] = manifest["components"] + f",{component_name}" # no space allowed after comma

    def add_component_manifest(self, data: dict, storage_type: KeyringConfigStorage) -> None:
        """Add the component configuration to *data*; secret storage only.

        The configuration contains the password, hence it is only produced
        for KeyringConfigStorage.SECRET.
        """
        if storage_type == KeyringConfigStorage.SECRET:
            data[self.component_manifest_name] = {
                "path": str(PurePosixPath(self.keyring_mount_path)
                            .joinpath(self.fileName)),
                "read_only": self.readOnly,
                "password": self.password,
            }

    @property
    def component_manifest_name(self) -> str:
        """File name of the component configuration."""
        return "component_keyring_encrypted_file.cnf"
class KeyringOciSpec(KeyringSpecBase):
    """Keyring backend using the component_keyring_oci component."""
    user: Optional[str] = None
    # Name of the secret mounted at /.oci; see add_to_sts_spec()
    keySecret: Optional[str] = None
    keyFingerprint: Optional[str] = None
    tenancy: Optional[str] = None
    compartment: Optional[str] = None
    virtualVault: Optional[str] = None
    masterKey: Optional[str] = None
    # Name of the secret mounted at /etc/mysql-keyring-ca ("" when unset)
    caCertificate: Optional[str] = None
    endpointEncryption: Optional[str] = None
    endpointManagement: Optional[str] = None
    endpointVaults: Optional[str] = None
    endpointSecrets: Optional[str] = None

    def __init__(self, namespace: str, global_manifest_name: str, component_config_configmap_name: str, keyring_mount_path: str):
        self.namespace = namespace
        self.global_manifest_name = global_manifest_name
        self.component_config_configmap_name = component_config_configmap_name
        self.keyring_mount_path = keyring_mount_path

    def parse(self, spec: dict, prefix: str) -> None:
        """Read the OCI keyring settings from *spec*.

        The endpoint values are only read when a non-empty "endpoints" dict
        is given; in that case all four are read without a default.
        """
        self.user = dget_str(spec, "user", prefix)
        self.keySecret = dget_str(spec, "keySecret", prefix)
        self.keyFingerprint = dget_str(spec, "keyFingerprint", prefix)
        self.tenancy= dget_str(spec, "tenancy", prefix)
        self.compartment = dget_str(spec, "compartment", prefix)
        self.virtualVault = dget_str(spec, "virtualVault", prefix)
        self.masterKey = dget_str(spec, "masterKey", prefix)
        self.caCertificate = dget_str(spec, "caCertificate", prefix, default_value="")
        endpoints = dget_dict(spec, "endpoints", prefix, {})
        if endpoints:
            # NOTE(review): the prefix passed here is the one of the parent,
            # not of "endpoints" - confirm this is intended.
            self.endpointEncryption = dget_str(endpoints, "encryption", prefix)
            self.endpointManagement = dget_str(endpoints, "management", prefix)
            self.endpointVaults = dget_str(endpoints, "vaults", prefix)
            self.endpointSecrets = dget_str(endpoints, "secrets", prefix)

    @property
    def component_manifest_name(self) -> str:
        """File name of the component configuration."""
        return "component_keyring_oci.cnf"

    def add_to_sts_spec(self, statefulset: dict):
        """Mount the OCI key, the component configuration and, if set, the CA.

        The patches address the "initmysql" init container and the "mysql"
        container and are merged into statefulset["spec"]["template"].
        """
        cm_mount_name = "keyringfile-conf"
        # NOTE(review): the YAML nesting depends on the leading whitespace of
        # the template lines - verify.
        patch = f"""
spec:
initContainers:
- name: initmysql
volumeMounts:
- name: ocikey
mountPath: /.oci
- name: {cm_mount_name}
mountPath: "/usr/lib64/mysql/plugin/{self.component_manifest_name}"
subPath: "{self.component_manifest_name}" #should be the same as the volume.items.path
containers:
- name: mysql
volumeMounts:
- name: ocikey
mountPath: /.oci
- name: {cm_mount_name}
mountPath: "/usr/lib64/mysql/plugin/{self.component_manifest_name}"
subPath: "{self.component_manifest_name}" #should be the same as the volume.items.path
volumes:
- name: ocikey
secret:
secretName: {self.keySecret}
- name: {cm_mount_name}
configMap:
name: {self.component_config_configmap_name}
items:
- key: "{self.component_manifest_name}"
path: "{self.component_manifest_name}"
"""
        utils.merge_patch_object(statefulset["spec"]["template"], yaml.safe_load(patch))
        # The CA certificate is optional and gets its own patch
        if self.caCertificate:
            patch = f"""
spec:
initContainers:
- name: initmysql
volumeMounts:
- name: oci-keyring-ca
mountPath: /etc/mysql-keyring-ca
containers:
- name: mysql
volumeMounts:
- name: oci-keyring-ca
mountPath: /etc/mysql-keyring-ca
volumes:
- name: oci-keyring-ca
secret:
secretName: {self.caCertificate}
"""
            utils.merge_patch_object(statefulset["spec"]["template"], yaml.safe_load(patch))

    def add_to_global_manifest(self, manifest: dict) -> None:
        """Append the component URI to the comma separated "components" entry."""
        component_name = "file://component_keyring_oci"
        if not manifest.get("components"):
            manifest["components"] = f"{component_name}"
        else:
            manifest["components"] = manifest["components"] + f",{component_name}" # no space allowed after comma

    def add_component_manifest(self, data: dict, storage_type: KeyringConfigStorage) -> None:
        """Add the component configuration to *data*; config map storage only."""
        if storage_type == KeyringConfigStorage.CONFIGMAP:
            data[self.component_manifest_name] = {
                "user": self.user,
                "tenancy": self.tenancy,
                "compartment": self.compartment,
                "virtual_vault": self.virtualVault,
                "master_key": self.masterKey,
                "encryption_endpoint": self.endpointEncryption,
                "management_endpoint": self.endpointManagement,
                "vaults_endpoint": self.endpointVaults,
                "secrets_endpoint": self.endpointSecrets,
                # Paths below the mount points set up in add_to_sts_spec()
                "key_file": "/.oci/privatekey",
                "key_fingerprint": self.keyFingerprint
            }
            if self.caCertificate:
                data[self.component_manifest_name]["keyring_oci_ca_certificate"] = "/etc/mysql-keyring-ca/certificate"

    # TODO [compat8.3.0] remove this when compatibility pre 8.3.0 isn't needed anymore
    def upgrade_to_component(self, sts: api_client.V1StatefulSet, spec, logger: Logger) -> Optional[tuple[dict, dict]]:
        """Build what is needed to switch an existing StatefulSet to the component.

        Returns None when *sts* already has the component config volume,
        otherwise a tuple of (component config map manifest, StatefulSet
        patch). Nothing is applied here; *sts* is only inspected.
        """
        cm_mount_name = "keyringfile-conf"
        if any(v.name == cm_mount_name for v in sts.spec.template.spec.volumes):
            # we already mount config map for component - nothing to do
            return
        # NOTE(review): the YAML nesting depends on the leading whitespace of
        # the template lines - verify.
        patch = f"""
spec:
initContainers:
- name: initmysql
volumeMounts:
- name: ocikey
mountPath: /.oci
- name: globalcomponentconf
mountPath: /usr/sbin/{self.global_manifest_name}
subPath: {self.global_manifest_name} #should be the same as the volume.items.path
- name: {cm_mount_name}
mountPath: "/usr/lib64/mysql/plugin/{self.component_manifest_name}"
subPath: "{self.component_manifest_name}" #should be the same as the volume.items.path
containers:
- name: mysql
volumeMounts:
- name: ocikey
mountPath: /.oci
- name: globalcomponentconf
mountPath: /usr/sbin/{self.global_manifest_name}
subPath: {self.global_manifest_name} #should be the same as the volume.items.path
- name: {cm_mount_name}
mountPath: "/usr/lib64/mysql/plugin/{self.component_manifest_name}"
subPath: "{self.component_manifest_name}" #should be the same as the volume.items.path
volumes:
- name: ocikey
secret:
secretName: {self.keySecret}
- name: globalcomponentconf
configMap:
name: {self.component_config_configmap_name}
items:
- key: {self.global_manifest_name}
path: {self.global_manifest_name}
- name: {cm_mount_name}
configMap:
name: {self.component_config_configmap_name}
items:
- key: "{self.component_manifest_name}"
path: "{self.component_manifest_name}"
"""
        sts_patch = yaml.safe_load(patch)
        if self.caCertificate:
            patch = f"""
spec:
initContainers:
- name: initmysql
volumeMounts:
- name: oci-keyring-ca
mountPath: /etc/mysql-keyring-ca
containers:
- name: mysql
volumeMounts:
- name: oci-keyring-ca
mountPath: /etc/mysql-keyring-ca
volumes:
- name: oci-keyring-ca
secret:
secretName: {self.caCertificate}
"""
            utils.merge_patch_object(sts_patch, yaml.safe_load(patch))
        # The config map manifest comes from the keyring attribute of `spec`
        cm = spec.keyring.get_component_config_configmap_manifest()
        return (cm, sts_patch)
# TODO: merge this with KeyringSpecBase
class KeyringSpec:
    """The keyring section: selects one backend and builds the shared objects."""
    # Backend in use; a KeyringNoneSpec until parse() selects a real one.
    # The default instance is shared by all KeyringSpec objects.
    keyring: Optional[KeyringSpecBase] = KeyringNoneSpec()
    # File name of the global component manifest, mounted below /usr/sbin
    global_manifest_name: str = 'mysqld.my'
    keyring_mount_path: str = '/keyring'

    def __init__(self, namespace: str, cluster_name: str):
        self.namespace = namespace
        self.cluster_name = cluster_name

    def parse(self, spec: dict, prefix: str) -> None:
        """Create and parse the backend configured in *spec*.

        Note that *prefix* is not used; the path "spec.keyring" is hard-coded.

        Raises:
            ApiSpecError: if none or more than one of file, encryptedFile
                and oci is specified.
        """
        krFile = dget_dict(spec, "file", "spec.keyring", {})
        krEncryptedFile = dget_dict(spec, "encryptedFile", "spec.keyring", {})
        krOci = dget_dict(spec, "oci", "spec.keyring", {})
        if len([x for x in [krFile, krEncryptedFile, krOci] if x]) > 1:
            raise ApiSpecError(
                "Only one of file, encryptedFile or oci may be specified in spec.keyring")
        if not krFile and not krEncryptedFile and not krOci:
            raise ApiSpecError(
                "One of file, encryptedFile or oci must be specified in spec.keyring")
        if krFile:
            self.keyring = KeyringFileSpec(self.namespace, self.global_manifest_name, self.component_config_configmap_name, self.keyring_mount_path)
            self.keyring.parse(krFile, "spec.keyring.file")
        elif krEncryptedFile:
            self.keyring = KeyringEncryptedFileSpec(self.namespace, self.global_manifest_name, self.component_config_configmap_name, self.keyring_mount_path)
            self.keyring.parse(krEncryptedFile, "spec.keyring.encryptedFile")
        elif krOci:
            self.keyring = KeyringOciSpec(self.namespace, self.global_manifest_name, self.component_config_configmap_name, self.keyring_mount_path)
            self.keyring.parse(krOci, "spec.keyring.oci")
        else:
            # Not reachable: the check above raises when no backend is given
            self.keyring = KeyringNoneSpec()

    @property
    def component_config_configmap_name(self) -> str:
        """Name of the config map holding the component configuration."""
        return f"{self.cluster_name}-componentconf"

    @property
    def component_config_secret_name(self) -> str:
        """Name of the secret holding the component configuration.

        This is the same string as component_config_configmap_name.
        """
        return f"{self.cluster_name}-componentconf"

    def get_component_config_configmap_manifest(self) -> dict:
        """Return the ConfigMap manifest with the global and component configs.

        Every entry is serialised with utils.dict_to_json_string.
        """
        data = {
            self.global_manifest_name : {}
        }
        self.keyring.add_to_global_manifest(data[self.global_manifest_name])
        self.keyring.add_component_manifest(data, KeyringConfigStorage.CONFIGMAP)
        cm = {
            'apiVersion' : "v1",
            'kind': 'ConfigMap',
            'metadata': {
                'name': self.component_config_configmap_name
            },
            'data' : { k: utils.dict_to_json_string(data[k]) for k in data }
        }
        return cm

    def get_component_config_secret_manifest(self) -> Optional[Dict]:
        """Return the Secret manifest, or None if the backend stores nothing in a secret."""
        data = {
        }
        self.keyring.add_component_manifest(data, KeyringConfigStorage.SECRET)
        if len(data) == 0:
            return None
        cm = {
            'apiVersion' : "v1",
            'kind': 'Secret',
            'metadata': {
                'name': self.component_config_secret_name
            },
            'data' : { k: utils.b64encode(utils.dict_to_json_string(data[k])) for k in data }
        }
        return cm

    def add_to_sts_spec_component_global_manifest(self, statefulset: dict):
        """Mount the global component manifest, unless no keyring is configured."""
        if isinstance(self.keyring, KeyringNoneSpec):
            # this is slight misuse of a NullObject type ...
            return
        # NOTE(review): the YAML nesting depends on the leading whitespace of
        # the template lines - verify.
        mounts = f"""
- name: globalcomponentconf
mountPath: /usr/sbin/{self.global_manifest_name}
subPath: {self.global_manifest_name} #should be the same as the volume.items.path
"""
        volumes = f"""
- name: globalcomponentconf
configMap:
name: {self.component_config_configmap_name}
items:
- key: {self.global_manifest_name}
path: {self.global_manifest_name}
"""
        patch = f"""
spec:
initContainers:
- name: initmysql
volumeMounts:
{utils.indent(mounts, 6)}
containers:
- name: mysql
volumeMounts:
{utils.indent(mounts, 6)}
volumes:
{utils.indent(volumes, 4)}
"""
        utils.merge_patch_object(statefulset["spec"]["template"], yaml.safe_load(patch))

    def add_to_sts_spec(self, statefulset: dict):
        """Add the global manifest mount, then the backend's own changes."""
        self.add_to_sts_spec_component_global_manifest(statefulset)
        self.keyring.add_to_sts_spec(statefulset)

    # TODO [compat8.3.0] remove this when compatibility pre 8.3.0 isn't needed anymore
    def upgrade_to_component(self, sts: api_client.V1StatefulSet, spec, logger: Logger) -> Optional[tuple[dict, dict]]:
        """Delegate to the backend; returns None implicitly if no keyring is set."""
        # only exists for OCI keyring
        if self.keyring:
            return self.keyring.upgrade_to_component(sts, spec, logger)
class RouterSpec:
    """Parsed ``router`` section of the cluster spec.

    The class-level values are the documented defaults. Every instance gets
    its own copies of the mutable ones (see ``__init__``), so mutating one
    spec can never leak into another spec or into the class defaults.
    """
    # number of Router instances (optional)
    instances: int = 1

    # Router version, if user wants to override it (latest by default)
    version: Optional[str] = None  # config.DEFAULT_ROUTER_VERSION_TAG

    podSpec: dict = {}
    podAnnotations: Optional[dict] = None
    podLabels: Optional[dict] = None

    bootstrapOptions: list = []
    options: list = []
    routingOptions: dict = {}

    tlsSecretName: str = ""

    def __init__(self) -> None:
        # Shadow the mutable class-level defaults with per-instance objects;
        # otherwise all instances would share (and could corrupt) the same
        # dict/list objects.
        self.podSpec = {}
        self.bootstrapOptions = []
        self.options = []
        self.routingOptions = {}

    def parse(self, spec: dict, prefix: str) -> None:
        """Read the keys present in ``spec``; absent keys keep their default.

        ``prefix`` is handed to the ``dget_*`` helpers as the location of
        ``spec`` (presumably for error messages - confirm in api_utils).
        """
        if "instances" in spec:
            self.instances = dget_int(spec, "instances", prefix)

        if "version" in spec:
            self.version = dget_str(spec, "version", prefix)

        if "tlsSecretName" in spec:
            self.tlsSecretName = dget_str(spec, "tlsSecretName", prefix)

        if "podSpec" in spec:  # TODO - replace with something more specific
            self.podSpec = dget_dict(spec, "podSpec", prefix)

        if "podAnnotations" in spec:
            self.podAnnotations = dget_dict(spec, "podAnnotations", prefix)

        if "podLabels" in spec:
            self.podLabels = dget_dict(spec, "podLabels", prefix)

        if "bootstrapOptions" in spec:
            self.bootstrapOptions = dget_list(spec, "bootstrapOptions", prefix)

        if "options" in spec:
            self.options = dget_list(spec, "options", prefix)

        if "routingOptions" in spec:
            self.routingOptions = dget_dict(spec, "routingOptions", prefix)
class InstanceServiceSpec:
    """Parsed ``instanceService`` section: extra annotations and labels."""
    annotations: dict = {}
    labels: dict = {}

    def __init__(self) -> None:
        # Per-instance dicts: the class-level defaults would otherwise be
        # shared (and mutated) across all instances.
        self.annotations = {}
        self.labels = {}

    def parse(self, spec: dict, prefix: str) -> None:
        """Read the keys present in ``spec``; absent keys keep their default."""
        if "annotations" in spec:
            self.annotations = dget_dict(spec, "annotations", prefix)

        if "labels" in spec:
            self.labels = dget_dict(spec, "labels", prefix)
class ServiceSpec:
    """Parsed ``service`` section of the cluster spec."""
    type: str = "ClusterIP"
    annotations: dict = {}
    labels: dict = {}
    defaultPort: str = "mysql-rw"

    def __init__(self) -> None:
        # Per-instance dicts: the class-level defaults would otherwise be
        # shared (and mutated) across all instances.
        self.annotations = {}
        self.labels = {}

    def parse(self, spec: dict, prefix: str) -> None:
        """Read the keys present in ``spec``; absent keys keep their default."""
        if "type" in spec:
            self.type = dget_str(spec, "type", prefix)

        if "annotations" in spec:
            self.annotations = dget_dict(spec, "annotations", prefix)

        if "labels" in spec:
            self.labels = dget_dict(spec, "labels", prefix)

        if "defaultPort" in spec:
            self.defaultPort = dget_str(spec, "defaultPort", prefix)

    def get_default_port_number(self, spec: 'InnoDBClusterSpec') -> int:
        """Translate ``defaultPort`` into the matching router port of ``spec``.

        Raises KeyError when ``defaultPort`` is not one of ``mysql-ro``,
        ``mysql-rw`` or ``mysql-rw-split``.
        """
        ports = {
            "mysql-ro": spec.router_roport,
            "mysql-rw": spec.router_rwport,
            "mysql-rw-split": spec.router_rwsplitport
        }

        return ports[self.defaultPort]
class DataDirPermissionsSpec:
    """Parsed ``datadirPermissions`` section of the cluster spec."""
    setRightsUsingInitContainer: bool = True
    fsGroupChangePolicy: Optional[str] = ""

    def parse(self, spec: dict, prefix: str) -> None:
        """Take over the keys present in ``spec``; absent keys keep their default."""
        key = "setRightsUsingInitContainer"
        if key in spec:
            self.setRightsUsingInitContainer = dget_bool(spec, key, prefix)

        key = "fsGroupChangePolicy"
        if key in spec:
            self.fsGroupChangePolicy = dget_str(spec, key, prefix)
# Must correspond to the names in the CRD
class InnoDBClusterSpecProperties(Enum):
    """Keys of the top-level sections of the InnoDBCluster ``spec``.

    Each member's value is the key as it is spelled in the spec dictionary.
    The spec loaders in this file use the values both to look a section up
    and to register per-section callbacks.
    """
    SERVICE = "service"
    INSTANCE_SERVICE = "instanceService"
    LOGS = "logs"
    ROUTER = "router"
    BACKUP_PROFILES = "backupProfiles"
    BACKUP_SCHEDULES = "backupSchedules"
    METRICS = "metrics"
    INITDB = "initDB"
class AbstractServerSetSpec(abc.ABC):
    """Parsed spec fields shared by every set of MySQL server pods.

    Subclasses populate an instance by calling ``_load()`` from their own
    ``load()``. The class-level values below are the defaults that stay in
    effect for everything ``_load()`` does not override.
    """
    # name of user-provided secret containing root password (optional)
    secretName: Optional[str] = None

    # name of secret with CA for SSL
    tlsCASecretName: str = ""
    # name of secret with certificate and private key (server and router)
    tlsSecretName: str = ""
    # whether to allow use of self-signed TLS certificates
    tlsUseSelfSigned: bool = False

    # MySQL server version
    version: str = config.DEFAULT_VERSION_TAG
    # Sidecar version: used for initconf, sidecar, batchjob (backup)
    sidecarVersion: str = config.DEFAULT_OPERATOR_VERSION_TAG

    edition: Edition = config.OPERATOR_EDITION

    imagePullPolicy: ImagePullPolicy = config.default_image_pull_policy
    imagePullSecrets: Optional[List[dict]] = None
    imageRepository: str = config.DEFAULT_IMAGE_REPOSITORY

    serviceAccountName: Optional[str] = None
    roleBindingName: Optional[str] = None

    # number of MySQL instances (required)
    instances: int = 1
    # base value for server_id
    baseServerId: int
    # override volumeClaimTemplates for datadir in MySQL pods (optional)
    datadirVolumeClaimTemplate = None

    dataDirPermissions: Optional[DataDirPermissionsSpec] = DataDirPermissionsSpec()

    # additional MySQL configuration options
    mycnf: str = ""
    # override pod template for MySQL (optional)
    podSpec: dict = {}
    podAnnotations: Optional[dict] = None
    podLabels: Optional[dict] = None

    backupSchedules: List[BackupSchedule] = []

    metrics: Optional[MetriscSpec] = None

    logs: Optional[LogsSpec] = None

    # (currently) non-configurable constants
    mysql_port: int = 3306
    mysql_xport: int = 33060
    mysql_grport: int = 33061
    mysql_metrics_port: int = 9104

    router_rwport: int = 6446
    router_roport: int = 6447
    router_rwxport: int = 6448
    router_roxport: int = 6449
    router_rwsplitport: int = 6450
    router_httpport: int = 8443

    serviceFqdnTemplate = None

    # TODO resource allocation for server, router and sidecar
    # TODO recommendation is that sidecar has 500MB RAM if MEB is used

    def __init__(self, namespace: str, name: str, cluster_name: str, spec: dict):
        """Store the identity of the server set; ``spec`` is not parsed here."""
        self.namespace = namespace
        self.name = name
        self.cluster_name = cluster_name
        # per-instance list, so schedules never end up in the class-level default
        self.backupSchedules: List[BackupSchedule] = []

    # Meant to be abstract, but deliberately left undecorated: subclasses
    # provide the real loader.
    def load(self, spec: dict) -> None:
        ...

    def _load(self, spec_root: dict, spec_specific: dict, where_specific: str) -> None:
        """Populate this object from the custom resource spec.

        :param spec_root: the cluster-wide ``spec`` dictionary
        :param spec_specific: the dictionary holding the per-server-set keys
            (instances, version, podSpec, ...)
        :param where_specific: location of ``spec_specific``, handed to the
            ``dget_*`` helpers
        """
        # initialize now or all instances will share the same list initialized before the ctor
        self.add_to_initconf_cbs: dict[str, List[AddToInitconfHandler]] = {}
        self.remove_from_sts_cbs: dict[str, List[RemoveFromStsHandler]] = {}
        self.add_to_sts_cbs: dict[str, List[AddToStsHandler]] = {}
        self.get_configmaps_cbs: dict[str, List[GetConfigMapHandler]] = {}
        self.get_add_to_svc_cbs: dict[str, List[AddToSvcHandler]] = {}
        self.get_svc_monitor_cbs: dict[str, List[GetSvcMonitorHandler]] = {}

        self.secretName = dget_str(spec_root, "secretName", "spec")

        # TLS: secret names default to names derived from this server set's name
        if "tlsCASecretName" in spec_root:
            self.tlsCASecretName = dget_str(spec_root, "tlsCASecretName", "spec")
        else:
            self.tlsCASecretName = f"{self.name}-ca"

        if "tlsSecretName" in spec_root:
            self.tlsSecretName = dget_str(spec_root, "tlsSecretName", "spec")
        else:
            self.tlsSecretName = f"{self.name}-tls"

        if "tlsUseSelfSigned" in spec_root:
            self.tlsUseSelfSigned = dget_bool(spec_root, "tlsUseSelfSigned", "spec")

        self.instances = dget_int(spec_specific, "instances", where_specific)

        if "version" in spec_specific:
            self.version = dget_str(spec_specific, "version", where_specific)

        if "edition" in spec_root:
            self.edition = dget_enum(
                spec_root, "edition", "spec", default_value=config.OPERATOR_EDITION,
                enum_type=Edition)

        if "imagePullPolicy" in spec_root:
            self.imagePullPolicy = dget_enum(
                spec_root, "imagePullPolicy", "spec",
                default_value=config.default_image_pull_policy,
                enum_type=ImagePullPolicy)

        if "imagePullSecrets" in spec_root:
            self.imagePullSecrets = dget_list(
                spec_root, "imagePullSecrets", "spec", content_type=dict)

        self.serviceAccountName = dget_str(spec_root, "serviceAccountName", "spec", default_value=f"{self.name}-sidecar-sa")
        self.roleBindingName = f"{self.name}-sidecar-rb"
        self.switchoverServiceAccountName = "mysql-switchover-sa"
        self.switchoverRoleBindingName = "mysql-switchover-rb"

        # single decision point for the repository (was split over two ifs)
        if "imageRepository" in spec_root:
            self.imageRepository = dget_str(spec_root, "imageRepository", "spec")
        else:
            self.imageRepository = config.DEFAULT_IMAGE_REPOSITORY

        if "podSpec" in spec_specific:  # TODO - replace with something more specific
            self.podSpec = dget_dict(spec_specific, "podSpec", where_specific)

        if "podAnnotations" in spec_specific:
            self.podAnnotations = dget_dict(spec_specific, "podAnnotations", where_specific)

        if "podLabels" in spec_specific:
            self.podLabels = dget_dict(spec_specific, "podLabels", where_specific)

        if "datadirVolumeClaimTemplate" in spec_specific:
            self.datadirVolumeClaimTemplate = dget_dict(spec_specific, "datadirVolumeClaimTemplate", where_specific)

        self.keyring = KeyringSpec(self.namespace, self.name)
        if "keyring" in spec_root:
            self.keyring.parse(dget_dict(spec_root, "keyring", "spec"), "spec.keyring")

        if "mycnf" in spec_root:
            self.mycnf = dget_str(spec_root, "mycnf", "spec")

        # Metrics: always instantiated; callbacks are registered only when
        # the metrics spec hands one out.
        self.metrics = MetriscSpec(self.namespace, self.cluster_name)
        if "metrics" in spec_root:
            self.metrics.parse(dget_dict(spec_root, "metrics", "spec"), "spec.metrics")

        metrics_key = InnoDBClusterSpecProperties.METRICS.value
        if cb := self.metrics.get_configmaps_cb():
            self.get_configmaps_cbs[metrics_key] = [cb]

        if cb := self.metrics.get_add_to_sts_cb():
            self.add_to_sts_cbs[metrics_key] = [cb]

        if cb := self.metrics.get_add_to_svc_cb():
            self.get_add_to_svc_cbs[metrics_key] = [cb]

        if cb := self.metrics.get_svc_monitor_cb():
            self.get_svc_monitor_cbs[metrics_key] = [cb]

        # Logs: same scheme as metrics
        self.logs = LogsSpec(self.namespace, self.cluster_name)
        if (section := InnoDBClusterSpecProperties.LOGS.value) in spec_root:
            self.logs.parse(dget_dict(spec_root, section, "spec"), f"spec.{section}", getLogger())

        logs_key = InnoDBClusterSpecProperties.LOGS.value
        if cb := self.logs.get_configmaps_cb():
            self.get_configmaps_cbs[logs_key] = [cb]

        if cb := self.logs.get_add_to_initconf_cb():
            self.add_to_initconf_cbs[logs_key] = [cb]

        if cb := self.logs.get_remove_from_sts_cb():
            self.remove_from_sts_cbs[logs_key] = [cb]

        if cb := self.logs.get_add_to_sts_cb():
            self.add_to_sts_cbs[logs_key] = [cb]

        # Initialization Options
        section = InnoDBClusterSpecProperties.INITDB.value
        if section in spec_root:
            self.load_initdb(dget_dict(spec_root, section, "spec"))

        # TODO keep a list of base_server_id in the operator to keep things globally unique?
        if "baseServerId" in spec_specific:
            self.baseServerId = dget_int(spec_specific, "baseServerId", "spec")

        self.serviceFqdnTemplate = None
        if "serviceFqdnTemplate" in spec_root:
            self.serviceFqdnTemplate = dget_str(spec_root, "serviceFqdnTemplate",
                                                "spec",
                                                default_value="{service}.{namespace}.svc.{domain}")

        self.dataDirPermissions = DataDirPermissionsSpec()
        section = "datadirPermissions"
        if section in spec_root:
            self.dataDirPermissions.parse(dget_dict(spec_root, section, "spec"), f"spec.{section}")

        self.instanceService = InstanceServiceSpec()
        section = InnoDBClusterSpecProperties.INSTANCE_SERVICE.value
        if section in spec_root:
            self.instanceService.parse(dget_dict(spec_root, section, "spec"), f"spec.{section}")

    def print_backup_schedules(self) -> None:
        """Print every backup schedule to stdout."""
        for schedule in self.backupSchedules:
            print(f"schedule={schedule}")

    def load_initdb(self, spec: dict) -> None:
        """Parse the ``initDB`` section into ``self.initDB``."""
        self.initDB = InitDB()
        self.initDB.parse(spec, "spec.initDB")

    def get_backup_profile(self, name: str) -> Optional[BackupProfile]:
        """Return the backup profile called ``name`` or None.

        ``backupProfiles`` is not defined by this base class, so a spec that
        never set it is treated as having no profiles instead of raising
        AttributeError.
        """
        for profile in getattr(self, "backupProfiles", None) or ():
            if profile.name == name:
                return profile
        return None

    def validate(self, logger: Logger) -> None:
        """Check the loaded values; raises ApiSpecError on an invalid spec."""
        # TODO see if we can move some of these to a schema in the CRD

        if len(self.name) > MAX_CLUSTER_NAME_LEN:
            # a name of exactly MAX_CLUSTER_NAME_LEN characters is accepted
            raise ApiSpecError(
                f"Cluster name {self.name} is too long. Must be <= {MAX_CLUSTER_NAME_LEN}")

        if not self.instances:
            raise ApiSpecError(
                f"spec.instances must be set and > 0. Got {self.instances!r}")

        # baseServerId has no class-level default; when the spec did not
        # provide it, report a spec error rather than an AttributeError.
        base_server_id = getattr(self, "baseServerId", None)
        if (not base_server_id or
                base_server_id < config.MIN_BASE_SERVER_ID or
                base_server_id > config.MAX_BASE_SERVER_ID):
            raise ApiSpecError(
                f"spec.baseServerId is {base_server_id} but must be between "
                f"{config.MIN_BASE_SERVER_ID} and {config.MAX_BASE_SERVER_ID}")

        # TODO validate against downgrades, invalid version jumps

        # TODO validate podSpec through the Kubernetes API

        if self.tlsSecretName and not self.tlsCASecretName:
            logger.info("spec.tlsSecretName is set but will be ignored because self.tlsCASecretName is not set")

        # TODO ensure that if version is set, then image and routerImage are not
        # TODO should we support upgrading router only?

        # validate version
        if self.version:
            # note: format of the version string is defined in the CRD
            valid_version, version_error = utils.version_in_range(self.version)

            if not valid_version:
                raise ApiSpecError(version_error)

        self.logs.validate()

    def format_image(self, image, version):
        """Return ``image:version``, prefixed with the repository if one is set."""
        if self.imageRepository:
            return f"{self.imageRepository}/{image}:{version}"
        return f"{image}:{version}"

    @property
    def mysql_image(self) -> str:
        # server image version is the one given by the user or latest by default
        image = config.MYSQL_SERVER_IMAGE if self.edition == Edition.community else config.MYSQL_SERVER_EE_IMAGE
        return self.format_image(image, self.version)

    @property
    def operator_image(self) -> str:
        # version is the same as ours (operator)
        if self.edition == Edition.community:
            image = config.MYSQL_OPERATOR_IMAGE
        else:
            image = config.MYSQL_OPERATOR_EE_IMAGE

        return self.format_image(image, self.sidecarVersion)

    @property
    def mysql_image_pull_policy(self) -> str:
        return self.imagePullPolicy.value

    @property
    def sidecar_image_pull_policy(self) -> str:
        return self.imagePullPolicy.value

    @property
    def operator_image_pull_policy(self) -> str:
        return self.imagePullPolicy.value

    # NOTE(review): the properties below return YAML snippets; whitespace
    # inside those literals is significant and must be kept as it is.

    @property
    def extra_env(self) -> str:
        """YAML env entry enabling operator debugging, or "" when debug is off."""
        if config.debug:
            return f"""
- name: MYSQL_OPERATOR_DEBUG
value: "{config.debug}"
"""
        else:
            return ""

    @property
    def extra_volumes(self) -> str:
        """YAML for the ``ssldata`` volume, or "" when self-signed TLS is used.

        The CA secret is only projected separately when it differs from the
        TLS secret.
        """
        volumes = []
        same_secret = self.tlsCASecretName == self.tlsSecretName
        if not self.tlsUseSelfSigned:
            volume = f"""
- name: ssldata
projected:
sources:
- secret:
name: {self.tlsSecretName}
"""
            if not same_secret:
                volume += f"""
- secret:
name: {self.tlsCASecretName}
"""
            volumes.append(volume)

        return "\n".join(volumes)

    @property
    def extra_volume_mounts(self) -> str:
        """YAML mounting ``ssldata``, or "" when self-signed TLS is used."""
        mounts = []
        if not self.tlsUseSelfSigned:
            mounts.append(f"""
- mountPath: /etc/mysql-ssl
name: ssldata
""")
        return "\n".join(mounts)

    @property
    def extra_sidecar_volume_mounts(self) -> str:
        """Sidecar variant of ``extra_volume_mounts`` (currently the same content)."""
        mounts = []
        if not self.tlsUseSelfSigned:
            mounts.append("""
- mountPath: /etc/mysql-ssl
name: ssldata
""")
        return "\n".join(mounts)

    @property
    def image_pull_secrets(self) -> str:
        """YAML ``imagePullSecrets`` section, or "" when none are configured."""
        if self.imagePullSecrets:
            return f"imagePullSecrets:\n{yaml.safe_dump(self.imagePullSecrets)}"
        return ""
class ReadReplicaSpec(AbstractServerSetSpec):
    """Spec of one read replica set; its name is ``{cluster_name}-{replica name}``."""

    def __init__(self, namespace: str, cluster_name: str, spec_root: dict,
                 spec_specific: dict, where_specific: str):
        """Derive the prefixed name from ``spec_specific`` and load right away."""
        replica_name = dget_str(spec_specific, 'name', where_specific)
        full_name = f"{cluster_name}-{replica_name}"
        super().__init__(namespace, full_name, cluster_name, spec_root)
        self.load(spec_root, spec_specific, where_specific)

    def load(self, spec_root: dict, spec_specific: dict, where_specific: str):
        """Load the common fields; a read replica has none of its own."""
        self._load(spec_root, spec_specific, where_specific)

    @property
    def headless_service_name(self) -> str:
        # self.name already is "{cluster_name}-{rr_name}"
        base = self.name
        return f"{base}-instances"
class InnoDBClusterSpec(AbstractServerSetSpec):
    """Parsed spec of the cluster itself: common server set fields plus
    service, router, backup and read replica settings."""
    service: ServiceSpec = ServiceSpec()

    # Initialize DB
    initDB: Optional[InitDB] = None

    router: RouterSpec = RouterSpec()

    # Backup info
    backupProfiles: List[BackupProfile] = []

    # ReadReplica
    readReplicas: List[ReadReplicaSpec] = []

    def __init__(self, namespace: str, name: str, spec: dict):
        """The cluster's own name doubles as the cluster name of the server set."""
        super().__init__(namespace, name, name, spec)
        self.load(spec)

    def load(self, spec: dict) -> None:
        """Parse ``spec``: common fields first, then the cluster-only sections."""
        self._load(spec, spec, "spec")
        props = InnoDBClusterSpecProperties

        self.service = ServiceSpec()
        if (key := props.SERVICE.value) in spec:
            self.service.parse(dget_dict(spec, key, "spec"), f"spec.{key}")

        # Router Options
        self.router = RouterSpec()
        if (key := props.ROUTER.value) in spec:
            self.router.parse(dget_dict(spec, key, "spec"), f"spec.{key}")

        if not self.router.tlsSecretName:
            self.router.tlsSecretName = f"{self.name}-router-tls"

        self.backupProfiles = []
        if (key := props.BACKUP_PROFILES.value) in spec:
            for profile_spec in dget_list(spec, key, "spec", [], content_type=dict):
                self.backupProfiles.append(self.parse_backup_profile(profile_spec))

        self.backupSchedules = []
        if (key := props.BACKUP_SCHEDULES.value) in spec:
            for schedule_spec in dget_list(spec, key, "spec", [], content_type=dict):
                self.backupSchedules.append(self.parse_backup_schedule(schedule_spec))

        self.readReplicas = []
        key = "readReplicas"
        if key in spec:
            replica_specs = dget_list(spec, key, "spec", [], content_type=dict)
            for position, replica_spec in enumerate(replica_specs):
                where = f"spec.{key}[{position}]"
                self.readReplicas.append(self.parse_read_replica(spec, replica_spec, where))

    def validate(self, logger: Logger) -> None:
        """Run the common validation, then warn about a suspicious ``mycnf``."""
        super().validate(logger)

        # TODO check that the secret exists and it contains rootPassword

        if self.mycnf and "[mysqld]" not in self.mycnf:
            logger.warning(
                "spec.mycnf data does not contain a [mysqld] line")

    def parse_backup_profile(self, spec: dict) -> BackupProfile:
        parsed = BackupProfile()
        parsed.parse(spec, "spec.backupProfiles")
        return parsed

    def parse_backup_schedule(self, spec: dict) -> BackupSchedule:
        parsed = BackupSchedule(self)
        parsed.parse(spec, "spec.backupSchedules")
        return parsed

    def parse_read_replica(self, spec_root: dict, spec_specific: dict, where_specific: str):
        return ReadReplicaSpec(self.namespace, self.name, spec_root,
                               spec_specific, where_specific)

    def print_backup_schedules(self) -> None:
        """Print every backup schedule to stdout."""
        for entry in self.backupSchedules:
            print(f"schedule={entry}")

    def load_initdb(self, spec: dict) -> None:
        """Parse the ``initDB`` section into ``self.initDB``."""
        init_db = InitDB()
        self.initDB = init_db
        init_db.parse(spec, "spec.initDB")

    def get_backup_profile(self, name: str) -> Optional[BackupProfile]:
        """Return the backup profile called ``name`` or None."""
        for candidate in self.backupProfiles or ():
            if candidate.name == name:
                return candidate
        return None

    def get_read_replica(self, name: str) -> Optional[ReadReplicaSpec]:
        """Return the read replica given by its short (unprefixed) name or None."""
        for replica in self.readReplicas or ():
            if replica.name == replica.cluster_name + '-' + name:
                return replica
        return None

    # NOTE(review): the properties below return YAML snippets; whitespace
    # inside those literals is significant and must be kept as it is.

    @property
    def extra_router_volumes_no_cert(self) -> str:
        """YAML for the router CA volume only; "" when self-signed TLS is used."""
        if self.tlsUseSelfSigned:
            return "\n".join([])
        return "\n".join([f"""
- name: ssl-ca-data
projected:
sources:
- secret:
name: {self.tlsCASecretName}
"""])

    @property
    def extra_router_volumes(self) -> str:
        """YAML for the router CA and key volumes; "" when self-signed TLS is used."""
        if self.tlsUseSelfSigned:
            return "\n".join([])
        return "\n".join([f"""
- name: ssl-ca-data
projected:
sources:
- secret:
name: {self.tlsCASecretName}
- name: ssl-key-data
projected:
sources:
- secret:
name: {self.router.tlsSecretName}
"""])

    @property
    def extra_router_volume_mounts_no_cert(self) -> str:
        """YAML mounting the router CA volume; "" when self-signed TLS is used."""
        if self.tlsUseSelfSigned:
            return "\n".join([])
        return "\n".join([f"""
- mountPath: /router-ssl/ca/
name: ssl-ca-data
"""])

    @property
    def extra_router_volume_mounts(self) -> str:
        """YAML mounting the router CA and key volumes; "" when self-signed TLS is used."""
        if self.tlsUseSelfSigned:
            return "\n".join([])
        return "\n".join([f"""
- mountPath: /router-ssl/ca/
name: ssl-ca-data
- mountPath: /router-ssl/key/
name: ssl-key-data
"""])

    @property
    def router_image(self) -> str:
        """Router image; version preference: router's own, the server's, the default."""
        version = (self.router.version
                   or self.version
                   or config.DEFAULT_ROUTER_VERSION_TAG)

        if self.edition == Edition.community:
            image = config.MYSQL_ROUTER_IMAGE
        else:
            image = config.MYSQL_ROUTER_EE_IMAGE

        return self.format_image(image, version)

    @property
    def router_image_pull_policy(self) -> str:
        """Pull policy from the router podSpec, else the cluster-wide one."""
        fallback = self.imagePullPolicy.value
        return self.router.podSpec.get("imagePullPolicy", fallback)

    @property
    def service_fqdn_template(self) -> Optional[str]:
        return self.serviceFqdnTemplate

    @property
    def headless_service_name(self) -> str:
        base = self.name
        return f"{base}-instances"
class InnoDBCluster(K8sInterfaceObject):
def __init__(self, cluster: Body) -> None:
    """Wrap the raw custom object; the spec is parsed lazily on first use."""
    super().__init__()

    # no parsed spec yet, see parsed_spec
    self._parsed_spec: Optional[InnoDBClusterSpec] = None
    self.obj: Body = cluster
def __str__(self):
return f"{self.namespace}/{self.name}"
def __repr__(self):
return f"<InnoDBCluster {self.name}>"
@classmethod
def _get(cls, ns: str, name: str) -> Body:
    """Fetch the raw InnoDBCluster custom object.

    An ApiException raised by the API call propagates to the caller unchanged.
    """
    raw = api_customobj.get_namespaced_custom_object(
        consts.GROUP, consts.VERSION, ns,
        consts.INNODBCLUSTER_PLURAL, name)
    return cast(Body, raw)
@classmethod
def _patch(cls, ns: str, name: str, patch: dict) -> Body:
    """Patch the InnoDBCluster custom object and return the API's answer."""
    patched = api_customobj.patch_namespaced_custom_object(
        consts.GROUP, consts.VERSION, ns,
        consts.INNODBCLUSTER_PLURAL, name, body=patch)
    return cast(Body, patched)
@classmethod
def _patch_status(cls, ns: str, name: str, patch: dict) -> Body:
    """Patch the status subresource of the InnoDBCluster custom object."""
    patched = api_customobj.patch_namespaced_custom_object_status(
        consts.GROUP, consts.VERSION, ns,
        consts.INNODBCLUSTER_PLURAL, name, body=patch)
    return cast(Body, patched)
@classmethod
def read(cls, ns: str, name: str) -> 'InnoDBCluster':
    """Fetch the custom object and wrap it in an InnoDBCluster."""
    body = cls._get(ns, name)
    return InnoDBCluster(body)
@property
def metadata(self) -> dict:
    """The ``metadata`` section of the wrapped object."""
    section = self.obj["metadata"]
    return section
@property
def annotations(self) -> dict:
    """The object's annotations, or an empty dict when it has none.

    ``metadata`` does not necessarily carry an ``annotations`` key. Callers
    such as ``get_cluster_info`` test the result for truthiness, so a missing
    (or null) entry yields ``{}`` instead of raising KeyError.
    """
    return self.metadata.get("annotations") or {}
@property
def spec(self) -> dict:
    """The raw (unparsed) ``spec`` section of the wrapped object."""
    section = self.obj["spec"]
    return section
@property
def status(self) -> dict:
    """The ``status`` section, or an empty dict when the object has none."""
    return self.obj["status"] if "status" in self.obj else {}
@property
def name(self) -> str:
    """Object name taken from the metadata."""
    meta = self.metadata
    return meta["name"]
@property
def namespace(self) -> str:
    """Object namespace taken from the metadata."""
    meta = self.metadata
    return meta["namespace"]
@property
def uid(self) -> str:
    """Object uid taken from the metadata."""
    meta = self.metadata
    return meta["uid"]
@property
def deleting(self) -> bool:
    """True when the metadata carries a non-null ``deletionTimestamp``."""
    meta = self.metadata
    if "deletionTimestamp" not in meta:
        return False
    return meta["deletionTimestamp"] is not None
def self_ref(self, field_path: Optional[str] = None) -> dict:
    """Build an object reference dict pointing at this cluster.

    ``fieldPath`` is only included when a non-empty ``field_path`` is given.
    """
    ref = dict(
        apiVersion=consts.API_VERSION,
        kind=consts.INNODBCLUSTER_KIND,
        name=self.name,
        namespace=self.namespace,
        resourceVersion=self.metadata["resourceVersion"],
        uid=self.uid,
    )
    if not field_path:
        return ref
    ref["fieldPath"] = field_path
    return ref
@property
def parsed_spec(self) -> InnoDBClusterSpec:
    """The parsed spec; parsing happens on first access and is then reused."""
    cached = self._parsed_spec
    if not cached:
        self.parse_spec()
        cached = self._parsed_spec
    assert cached
    return cached
def parse_spec(self) -> None:
    """(Re)parse the raw spec and cache the result."""
    parsed = InnoDBClusterSpec(self.namespace, self.name, self.spec)
    self._parsed_spec = parsed
def reload(self) -> None:
    """Re-fetch the wrapped object; an already parsed spec is left untouched."""
    ns, cluster_name = self.namespace, self.name
    self.obj = self._get(ns, cluster_name)
def owns_pod(self, pod) -> bool:
    """Tell whether ``pod`` belongs to the StatefulSet named like this cluster.

    :param pod: object offering ``owner_reference(api_version, kind)``
    :return: True only when the pod has a StatefulSet owner whose name equals
        this cluster's name
    """
    owner_sts = pod.owner_reference("apps/v1", "StatefulSet")
    # A pod without a StatefulSet owner is simply not ours (presumably
    # owner_reference() returns None in that case - confirm against MySQLPod).
    if owner_sts is None:
        return False
    return owner_sts.name == self.name
def get_pod(self, index) -> 'MySQLPod':
    """Read the pod named ``<cluster name>-<index>`` and wrap it in a MySQLPod."""
    pod_name = "%s-%i" % (self.name, index)
    raw = api_core.read_namespaced_pod(pod_name, self.namespace)
    return MySQLPod(cast(api_client.V1Pod, raw))
def get_pods(self) -> typing.List['MySQLPod']:
    """Return the pods owned by this cluster, ordered by their index.

    Candidates are all pods in the namespace labelled ``component=mysqld``;
    ``owns_pod`` decides which of them are kept.
    """
    listing = cast(api_client.V1PodList, api_core.list_namespaced_pod(
        self.namespace, label_selector="component=mysqld"))

    wrapped = (MySQLPod(item) for item in listing.items)
    owned = [pod for pod in wrapped if self.owns_pod(pod)]
    return sorted(owned, key=lambda pod: pod.index)
def get_routers(self) -> typing.List[str]:
    """Return the names of the router pods labelled as belonging to this cluster."""
    selector = f"component=mysqlrouter,mysql.oracle.com/cluster={self.name}"
    listing = cast(api_client.V1PodList, api_core.list_namespaced_pod(
        self.namespace, label_selector=selector))

    names = []
    for item in listing.items:
        names.append(item.metadata.name)
    return names
def get_service(self) -> typing.Optional[api_client.V1Service]:
    """Read the cluster's headless Service; None when it does not exist (404)."""
    try:
        found = api_core.read_namespaced_service(
            self.parsed_spec.headless_service_name, self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1Service, found)
def get_read_replica_service(self, rr: ReadReplicaSpec) -> typing.Optional[api_client.V1Service]:
    """Read the headless Service of read replica ``rr``; None when it does not exist (404)."""
    try:
        found = api_core.read_namespaced_service(
            rr.headless_service_name, self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1Service, found)
# As of K8s 1.21 PodDisruptionBudget is no longer beta.
# NOTE(review): the types here already are V1PodDisruptionBudget; whether api_policy
# is a PolicyV1Api cannot be seen from this file - confirm in kubeutils.
def get_disruption_budget(self) -> typing.Optional[api_client.V1PodDisruptionBudget]:
    """Read the ``<name>-pdb`` PodDisruptionBudget; None when it does not exist (404)."""
    try:
        found = api_policy.read_namespaced_pod_disruption_budget(
            self.name + "-pdb", self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1PodDisruptionBudget, found)
def get_stateful_set(self) -> typing.Optional[api_client.V1StatefulSet]:
    """Read the StatefulSet named like the cluster; None when it does not exist (404)."""
    try:
        found = api_apps.read_namespaced_stateful_set(self.name, self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1StatefulSet, found)
def get_read_replica_stateful_set(self, name: str) -> typing.Optional[api_client.V1StatefulSet]:
    """Read the StatefulSet ``name`` in the cluster's namespace; None on 404."""
    try:
        found = api_apps.read_namespaced_stateful_set(name, self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1StatefulSet, found)
def get_router_service(self) -> typing.Optional[api_client.V1Service]:
    """Read the Service named like the cluster; None when it does not exist (404)."""
    try:
        found = api_core.read_namespaced_service(self.name, self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1Service, found)
def get_router_deployment(self) -> typing.Optional[api_client.V1Deployment]:
    """Read the ``<name>-router`` Deployment; None when it does not exist (404)."""
    try:
        found = api_apps.read_namespaced_deployment(
            self.name+"-router", self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1Deployment, found)
def get_cron_job(self, schedule_name: str) -> typing.Callable:
    """Return a zero-argument getter for the CronJob ``schedule_name``.

    Nothing is read here; the returned callable performs the lookup when
    invoked and yields None when the CronJob does not exist (404).
    """
    def get_cron_job_inner() -> typing.Optional[api_client.V1CronJob]:
        try:
            found = api_cron_job.read_namespaced_cron_job(schedule_name, self.namespace)
        except ApiException as e:
            if e.status != 404:
                raise
            return None
        return cast(api_client.V1CronJob, found)

    return get_cron_job_inner
def get_router_account(self) -> Tuple[str, str]:
    """Return (username, password) decoded from the ``<name>-router`` Secret."""
    secret_name = f"{self.name}-router"
    secret = cast(api_client.V1Secret,
                  api_core.read_namespaced_secret(secret_name, self.namespace))

    username = utils.b64decode(secret.data["routerUsername"])
    password = utils.b64decode(secret.data["routerPassword"])
    return username, password
def get_backup_account(self) -> Tuple[str, str]:
    """Return (username, password) decoded from the ``<name>-backup`` Secret."""
    secret_name = f"{self.name}-backup"
    secret = cast(api_client.V1Secret,
                  api_core.read_namespaced_secret(secret_name, self.namespace))

    username = utils.b64decode(secret.data["backupUsername"])
    password = utils.b64decode(secret.data["backupPassword"])
    return username, password
def get_private_secrets(self) -> api_client.V1Secret:
    """Read the ``<name>-privsecrets`` Secret; API errors propagate."""
    secret_name = f"{self.name}-privsecrets"
    found = api_core.read_namespaced_secret(secret_name, self.namespace)
    return cast(api_client.V1Secret, found)
def get_user_secrets(self) -> typing.Optional[api_client.V1Secret]:
    """Read the Secret named by ``spec.secretName``; None when it does not exist (404)."""
    name = self.spec.get("secretName")
    try:
        found = api_core.read_namespaced_secret(f"{name}", self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1Secret, found)
def get_ca_and_tls(self) -> Dict:
    """Collect the TLS material configured for the cluster from its Secrets.

    Returns an empty dict when self-signed TLS is in use or when the server
    TLS Secret does not exist. Otherwise the dict may contain:

    - ``tls.crt`` / ``tls.key``: decoded server certificate and key, each
      only when present in the server TLS Secret
    - ``CA``: name of the CA entry found (``ca.pem`` preferred over
      ``ca.crt``) or None, plus an entry under that name with the decoded
      content
    - ``same_secret_for_ca_and_tls``: whether CA and server TLS share a Secret
    - ``router_tls.crt`` / ``router_tls.key``: decoded router certificate and
      key, when the router TLS Secret exists

    When the CA Secret is missing, only what was gathered from the server TLS
    Secret is returned. API errors other than 404 propagate.
    """
    if self.parsed_spec.tlsUseSelfSigned:
        return {}
    ca_secret = None
    server_tls_secret = None
    same_secret_for_ca_and_tls = False
    ret = {}
    # server certificate and key
    try:
        server_tls_secret = cast(api_client.V1Secret, api_core.read_namespaced_secret(
            self.parsed_spec.tlsSecretName, self.namespace))
    except ApiException as e:
        if e.status == 404:
            return {}
        raise
    if "tls.crt" in server_tls_secret.data:
        ret["tls.crt"] = utils.b64decode(server_tls_secret.data["tls.crt"])
    if "tls.key" in server_tls_secret.data:
        ret["tls.key"] = utils.b64decode(server_tls_secret.data["tls.key"])
    # CA: reuse the Secret already read when both names are equal
    if self.parsed_spec.tlsSecretName == self.parsed_spec.tlsCASecretName:
        ca_secret = server_tls_secret
        same_secret_for_ca_and_tls = True
    else:
        try:
            ca_secret = cast(api_client.V1Secret, api_core.read_namespaced_secret(
                self.parsed_spec.tlsCASecretName, self.namespace))
        except ApiException as e:
            if e.status == 404:
                # no CA secret: hand back only the server cert/key found so far
                return ret
            raise
    ca_file_name = None
    if "ca.pem" in ca_secret.data:
        ca_file_name = "ca.pem"
    elif "ca.crt" in ca_secret.data:
        ca_file_name = "ca.crt"
    # "CA" tells the caller under which key (if any) the CA content is stored
    ret["CA"] = ca_file_name
    if ca_file_name:
        ret[ca_file_name] = utils.b64decode(ca_secret.data[ca_file_name])
    ret['same_secret_for_ca_and_tls'] = same_secret_for_ca_and_tls
    # When using HELM a secret should exist, when using bare manifests the secret might
    # not exist (not mentioned directly or using the default name) and so it is not mounted
    # in the router pod, thus not passed to the router.
    try:
        router_tls_secret = cast(api_client.V1Secret, api_core.read_namespaced_secret(
            self.parsed_spec.router.tlsSecretName, self.namespace))
        ret["router_tls.crt"] = utils.b64decode(router_tls_secret.data["tls.crt"])
        ret["router_tls.key"] = utils.b64decode(router_tls_secret.data["tls.key"])
    except ApiException as e:
        # a missing router secret is fine, anything else is not
        if e.status != 404:
            raise
    return ret
def get_tls_issuer_and_subject_rdns(self) -> Dict[str, str]:
    """Return issuer and subject of the server certificate as ``/``-separated RDNs.

    The certificate is the ``tls.crt`` entry of ``get_ca_and_tls()``; a
    KeyError is raised when that entry is absent.
    """
    pem = self.get_ca_and_tls()["tls.crt"]
    tls_cert = x509.load_pem_x509_certificate(pem.encode('ascii'))

    def slash_separated(x509_name) -> str:
        # See RFC 4514, 2.1. Converting the RDNSequence:
        # the string form lists the RelativeDistinguishedNames starting with
        # the last element of the sequence and moving backwards toward the
        # first, separated by a comma (',' U+002C).
        # That is the reverse of the order needed here, so the pieces are
        # reversed again after splitting and before joining with a `/`.
        pieces = x509_name.rfc4514_string().split(",")
        return "/" + "/".join(reversed(pieces))

    issuer_rdns = slash_separated(tls_cert.issuer)
    subject_rdns = slash_separated(tls_cert.subject)
    return {
        "issuer": issuer_rdns,
        "subject": subject_rdns,
    }
def get_admin_account(self) -> Tuple[str, str]:
    """Return (username, password) of the cluster admin from the private secrets."""
    data = self.get_private_secrets().data

    username = utils.b64decode(data["clusterAdminUsername"])
    password = utils.b64decode(data["clusterAdminPassword"])
    return (username, password)
@classmethod
def get_service_account_sidecar(cls, spec: AbstractServerSetSpec) -> api_client.V1ServiceAccount:
    """Read the ServiceAccount named by ``spec.serviceAccountName``; API errors propagate."""
    found = api_core.read_namespaced_service_account(
        spec.serviceAccountName, spec.namespace)
    return cast(api_client.V1ServiceAccount, found)
@classmethod
def get_service_account_switchover(cls, spec: AbstractServerSetSpec) -> api_client.V1ServiceAccount:
    """Read the ServiceAccount named by ``spec.switchoverServiceAccountName``."""
    # Not user configurable for now due to limited use. Also, only once per namespace, thus doesn't include cluster name
    found = api_core.read_namespaced_service_account(
        spec.switchoverServiceAccountName, spec.namespace)
    return cast(api_client.V1ServiceAccount, found)
@classmethod
def get_role_binding_sidecar(cls, spec: AbstractServerSetSpec) -> api_client.V1RoleBinding:
    """Read the RoleBinding named by ``spec.roleBindingName``; API errors propagate."""
    found = api_rbac.read_namespaced_role_binding(
        spec.roleBindingName, spec.namespace)
    return cast(api_client.V1RoleBinding, found)
@classmethod
def get_role_binding_switchover(cls, spec: AbstractServerSetSpec) -> api_client.V1RoleBinding:
    """Read the RoleBinding named by ``spec.switchoverRoleBindingName``; API errors propagate."""
    found = api_rbac.read_namespaced_role_binding(
        spec.switchoverRoleBindingName, spec.namespace)
    return cast(api_client.V1RoleBinding, found)
def delete_configmap(self, cm_name: str) -> typing.Optional[api_client.V1Status]:
    """Delete ConfigMap ``cm_name`` with a zero grace period.

    Returns the API's status object, or None when the ConfigMap does not
    exist (404). Other API errors propagate.
    """
    try:
        options = api_client.V1DeleteOptions(grace_period_seconds=0)
        outcome = api_core.delete_namespaced_config_map(
            cm_name, self.namespace, body=options)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1Status, outcome)
def get_configmap(self, cm_name: str) -> typing.Optional[api_client.V1ConfigMap]:
    """Read ConfigMap ``cm_name``; None when it does not exist (404)."""
    try:
        found = api_core.read_namespaced_config_map(cm_name, self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1ConfigMap, found)
def get_secret(self, s_name: str) -> typing.Optional[api_client.V1Secret]:
    """Read Secret ``s_name``; None when it does not exist (404)."""
    try:
        found = api_core.read_namespaced_secret(s_name, self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1Secret, found)
@classmethod
def get_initconf(cls, spec: AbstractServerSetSpec) -> typing.Optional[api_client.V1ConfigMap]:
    """Read the ``<spec.name>-initconf`` ConfigMap; None when it does not exist (404)."""
    try:
        found = api_core.read_namespaced_config_map(
            f"{spec.name}-initconf", spec.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1ConfigMap, found)
def get_initmysql(self) -> typing.Optional[api_client.V1ConfigMap]:
    """Read the ``<name>-initmysql`` ConfigMap; None when it does not exist (404)."""
    try:
        found = api_core.read_namespaced_config_map(
            f"{self.name}-initmysql", self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return cast(api_client.V1ConfigMap, found)
def get_metrics_monitor(self) :#-> typing.Optional[api_customobj.___]:
    """Read the ServiceMonitor named like the cluster; None when it does not exist (404)."""
    try:
        monitor = api_customobj.get_namespaced_custom_object(
            "monitoring.coreos.com", "v1", self.namespace, "servicemonitors", self.name)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return monitor
def get_service_monitor(self, name: str) :#-> typing.Optional[api_customobj.___]:
    """Read the ServiceMonitor ``name``; None when it does not exist (404)."""
    try:
        monitor = api_customobj.get_namespaced_custom_object(
            "monitoring.coreos.com", "v1", self.namespace, "servicemonitors", name)
    except ApiException as e:
        if e.status != 404:
            raise
        return None
    return monitor
def _get_status_field(self, field: str) -> typing.Any:
return cast(str, self.status.get(field))
def _set_status_field(self, field: str, value: typing.Any) -> None:
obj = self._get(self.namespace, self.name)
if "status" not in obj:
patch = {"status": {}}
else:
patch = {"status": obj["status"]}
patch["status"][field] = value
self.obj = self._patch_status(self.namespace, self.name, patch)
def set_cluster_status(self, cluster_status) -> None:
    """Store `cluster_status` under the "cluster" key of the object's status."""
    status_key = "cluster"
    self._set_status_field(status_key, cluster_status)
def get_cluster_status(self, field=None):  # TODO -> dict, remove field
    """
    Return the "cluster" entry of the status.

    When both the entry and `field` are truthy, only `entry.get(field)` is
    returned; otherwise the entry itself (possibly None) is returned.
    """
    cluster_status = self._get_status_field("cluster")
    if not (cluster_status and field):
        return cluster_status
    return cluster_status.get(field)
def set_status(self, status) -> None:
    """
    Write `status` into the object's status.

    The object is re-read via `_get`. If it already has a "status" key the
    new value is combined with it through `utils.merge_patch_object`,
    otherwise `status` is used as is. The whole object is then passed to
    `_patch_status` and the result stored in `self.obj`.
    """
    current = cast(dict, self._get(self.namespace, self.name))
    if "status" in current:
        new_status = utils.merge_patch_object(current["status"], status)
    else:
        new_status = status
    current["status"] = new_status
    self.obj = self._patch_status(self.namespace, self.name, current)
def update_cluster_fqdn(self) -> None:
    """
    Record the service FQDN template as an annotation on the object.

    The template comes from `fqdn.idc_service_fqdn_template(self.parsed_spec)`
    and is stored under the `fqdn.FQDN_ANNOTATION_NAME` annotation key.
    """
    template = fqdn.idc_service_fqdn_template(self.parsed_spec)
    annotations = {fqdn.FQDN_ANNOTATION_NAME: template}
    self.obj = self._patch(
        self.namespace, self.name, {"metadata": {"annotations": annotations}})
def update_cluster_info(self, info: dict) -> None:
    """
    Set metadata about the cluster as an annotation.

    `info` is serialized to JSON and stored under the
    "mysql.oracle.com/cluster-info" annotation.

    Information consumed by ourselves to manage the cluster should go here.
    Information consumed by external observers should go in status.
    """
    serialized = json.dumps(info)
    annotations = {"mysql.oracle.com/cluster-info": serialized}
    self.obj = self._patch(
        self.namespace, self.name, {"metadata": {"annotations": annotations}})
# TODO remove field
def get_cluster_info(self, field: typing.Optional[str] = None) -> typing.Optional[dict]:
    """
    Decode the "mysql.oracle.com/cluster-info" annotation.

    Returns None when there are no annotations or the annotation is missing
    or empty. Otherwise the JSON payload is parsed; with a truthy `field`
    only `payload.get(field)` is returned, else the whole payload.
    """
    annotations = self.annotations
    if not annotations:
        return None
    raw = annotations.get("mysql.oracle.com/cluster-info", None)
    if not raw:
        return None
    info = json.loads(raw)
    return info.get(field) if field else info
def set_create_time(self, time: datetime.datetime) -> None:
    """
    Store `time` in the "createTime" status field.

    Microseconds are dropped and the value is written as its ISO format
    string with a trailing "Z" appended.
    """
    truncated = time.replace(microsecond=0)
    self._set_status_field("createTime", truncated.isoformat() + "Z")
def get_create_time(self) -> Optional[datetime.datetime]:
    """
    Parse the "createTime" status field back into a datetime.

    Trailing "Z" characters are stripped before parsing. Returns None when
    the field is unset or empty.
    """
    raw = self._get_status_field("createTime")
    if not raw:
        return None
    return datetime.datetime.fromisoformat(raw.rstrip("Z"))
@property
def ready(self) -> bool:
    """
    True once a creation time has been recorded in the status.

    Returns a real boolean; callers that only test truthiness see the same
    result as before, since `get_create_time()` yields either a datetime
    or None.
    """
    return self.get_create_time() is not None
def set_last_known_quorum(self, members):
    """Placeholder: not implemented yet, `members` is ignored."""
    # TODO
    return None
def get_last_known_quorum(self):
    """Placeholder: not implemented yet, always returns None."""
    # TODO
    last_known = None
    return last_known
def _add_finalizer(self, fin: str) -> None:
"""
Add the named token to the list of finalizers for the cluster object.
The cluster object will be blocked from deletion until that token is
removed from the list (remove_finalizer).
"""
patch = {
"metadata": {
"finalizers": [fin]
}
}
self.obj = self._patch(self.namespace, self.name, patch)
def _remove_finalizer(self, fin: str) -> None:
# TODO strategic merge patch not working here??
#patch = { "metadata": { "$deleteFromPrimitiveList/finalizers": [fin] }}
patch = {"metadata": {"finalizers": [
f for f in self.metadata["finalizers"] if f != fin]}}
self.obj = self._patch(self.namespace, self.name, patch)
def add_cluster_finalizer(self) -> None:
    """Add the "mysql.oracle.com/cluster" finalizer to the cluster object."""
    finalizer = "mysql.oracle.com/cluster"
    self._add_finalizer(finalizer)
def remove_cluster_finalizer(self, cluster_body: Optional[dict] = None) -> None:
    """
    Remove the "mysql.oracle.com/cluster" finalizer from the cluster object.

    When `cluster_body` is given, the finalizer is also dropped from its
    in-memory `metadata.finalizers` list. A body whose list is missing or
    does not contain the finalizer is left untouched rather than raising.
    """
    finalizer = "mysql.oracle.com/cluster"
    print("remove_cluster_finalizer")
    self._remove_finalizer(finalizer)
    if cluster_body:
        # modify the JSON data used internally by kopf to update its finalizer list
        finalizers = cluster_body["metadata"].get("finalizers")
        if finalizers and finalizer in finalizers:
            finalizers.remove(finalizer)
def set_operator_version(self, version: str) -> None:
    """
    Record `version` in the "mysql.oracle.com/mysql-operator-version"
    annotation, unless `operator_version` already equals it.
    """
    if self.operator_version == version:
        return
    annotations = {"mysql.oracle.com/mysql-operator-version": version}
    # TODO store the current server/router version + timestamp
    # store previous versions in a version history log
    self.obj = self._patch(
        self.namespace, self.name, {"metadata": {"annotations": annotations}})
@property
def operator_version(self) -> Optional[str]:
    """
    Operator version recorded on the object, or None if none is recorded.

    The value is read from the "mysql.oracle.com/mysql-operator-version"
    annotation, which is where `set_operator_version` writes it. Looking
    the key up directly in the metadata never finds that annotation.
    """
    annotations = self.metadata.get("annotations") or {}
    return annotations.get("mysql.oracle.com/mysql-operator-version")
def set_current_version(self, version: str) -> None:
    """
    Record `version` under the "version" key of the status, unless the
    status already holds that value.
    """
    if self.status.get("version") == version:
        return
    # TODO store the current server/router version + timestamp
    # store previous versions in a version history log
    self.obj = self._patch_status(
        self.namespace, self.name, {"status": {"version": version}})
# TODO store last known majority and use it for diagnostics when there are
# unconnectable pods
def tls_has_crl(self) -> bool:
    """
    Report whether a CRL is available for TLS.

    Currently always False: both the self-signed case and the (unfinished)
    general case return False.
    """
    self_signed = self.parsed_spec.tlsUseSelfSigned
    if self_signed:
        return False
    # XXX TODO fixme
    return False
def router_tls_exists(self) -> bool:
    """
    Tell whether the router TLS secret can be read.

    Returns False without any API call when the spec uses self-signed TLS.
    Otherwise the secret named by `parsed_spec.router.tlsSecretName` is read
    from this object's namespace: a 404 yields False, any other ApiException
    propagates, and a successful read yields True.
    """
    if self.parsed_spec.tlsUseSelfSigned:
        return False
    try:
        api_core.read_namespaced_secret(
            self.parsed_spec.router.tlsSecretName, self.namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        return False
    else:
        return True
def log_cluster_info(self, logger: Logger) -> None:
    """
    Write a summary of the cluster's parsed spec to `logger` at INFO level.

    Covers identity and edition, the images and pull policies, server id,
    log collection, router instances, metrics (when configured), the number
    of backup profiles and schedules, and finally the TLS settings via
    `log_tls_info`.
    """
    logger.info(f"InnoDB Cluster {self.namespace}/{self.name} Edition({self.parsed_spec.edition}) Edition")
    logger.info(f"\tServer Image:\t{self.parsed_spec.mysql_image} / {self.parsed_spec.mysql_image_pull_policy}")
    logger.info(f"\tRouter Image:\t{self.parsed_spec.router_image} / {self.parsed_spec.router_image_pull_policy}")
    logger.info(f"\tSidecar Image:\t{self.parsed_spec.operator_image} / {self.parsed_spec.operator_image_pull_policy}")
    logger.info(f"\tImagePullPolicy:\t{self.parsed_spec.imagePullPolicy}")
    logger.info(f"\tImageRepository:\t{self.parsed_spec.imageRepository}")
    logger.info(f"\tBase ServerId:\t{self.parsed_spec.baseServerId}")
    logger.info(f"\tLog collection:\t{self.parsed_spec.logs.collect}")
    logger.info(f"\tRouter instances:\t{self.parsed_spec.router.instances}")
    # The collector image is only reported when log collection is on
    if self.parsed_spec.logs.collect:
        logger.info(f"\tLog collector image:\t{self.parsed_spec.logs.collector.image_name}")
    # Metrics details are only reported when a metrics section is present
    if self.parsed_spec.metrics:
        logger.info(f"\tMetrics enabled:\t{self.parsed_spec.metrics.enable}")
        logger.info(f"\tMetrics monitor:\t{self.parsed_spec.metrics.monitor}")
        if self.parsed_spec.metrics.enable:
            logger.info(f"\tMetrics image:\t{self.parsed_spec.metrics.image}")
    logger.info(f"\tBackup profiles:\t{len(self.parsed_spec.backupProfiles)}")
    logger.info(f"\tBackup schedules:\t{len(self.parsed_spec.backupSchedules)}")
    self.log_tls_info(logger)
def log_tls_info(self, logger: Logger) -> None:
    """
    Write the TLS-related settings of the cluster to `logger` at INFO level.

    The self-signed flag is always logged. The secret names, the keys
    returned by `get_ca_and_tls()` and the router TLS details are logged
    only when self-signed TLS is not in use.
    """
    logger.info(f"\tServer.TLS.useSelfSigned:\t{self.parsed_spec.tlsUseSelfSigned}")
    if not self.parsed_spec.tlsUseSelfSigned:
        logger.info(f"\tServer.TLS.tlsCASecretName:\t{self.parsed_spec.tlsCASecretName}")
        logger.info(f"\tServer.TLS.tlsSecretName:\t{self.parsed_spec.tlsSecretName}")
        logger.info(f"\tTLS.keys :\t{list(self.get_ca_and_tls().keys())}")
        # Queried once and reused for both the log line and the condition below
        router_tls_exists = self.router_tls_exists()
        logger.info(f"\tRouter.TLS exists :\t{router_tls_exists}")
        if router_tls_exists:
            logger.info(f"\tRouter.TLS.tlsSecretName:\t{self.parsed_spec.router.tlsSecretName}")
def get_all_clusters(ns: str = None) -> typing.List[InnoDBCluster]:
    """
    List InnoDBCluster custom objects and wrap each one in InnoDBCluster.

    With `ns` set, only that namespace is listed; with `ns` left as None the
    listing is cluster-wide.
    """
    if ns is not None:
        listing = api_customobj.list_namespaced_custom_object(
            consts.GROUP, consts.VERSION, ns, consts.INNODBCLUSTER_PLURAL)
    else:
        listing = api_customobj.list_cluster_custom_object(
            consts.GROUP, consts.VERSION, consts.INNODBCLUSTER_PLURAL)
    items = cast(dict, listing)["items"]
    return [InnoDBCluster(item) for item in items]
class MySQLPod(K8sInterfaceObject):
    # Wraps a Kubernetes V1Pod object together with the ports and the
    # (lazily looked up) admin account used to build connection endpoints.

    # Class-level logger, None unless assigned; handed to fqdn.pod_fqdn()
    logger: Optional[Logger] = None

    def __init__(self, pod: client.V1Pod):
        """Wrap `pod`, using port 3306 and X port 33060 for its endpoints."""
        super().__init__()
        self.pod: client.V1Pod = pod
        self.port = 3306
        self.xport = 33060
        # Filled in on first use by the endpoint_co-style properties
        self.admin_account = None
@overload
@classmethod
def from_json(cls, pod: str) -> 'MySQLPod':
    ...

@overload
@classmethod
def from_json(cls, pod: Body) -> 'MySQLPod':
    ...

@classmethod
def from_json(cls, pod) -> 'MySQLPod':
    """
    Build a MySQLPod from a pod given as a string or as a mapping body.

    A non-string `pod` is converted into plain Python containers by parsing
    its string form, then serialized to JSON and run through the API
    client's deserializer to obtain a V1Pod.
    """
    import ast

    class Wrapper:
        # Minimal stand-in for an HTTP response: carries the JSON text in
        # the `data` attribute handed to deserialize()
        def __init__(self, data):
            self.data = json.dumps(data)

    if not isinstance(pod, str):
        # Security: the string form is parsed with ast.literal_eval, which
        # only accepts Python literals, instead of eval(), which would run
        # any expression found in the object's representation.
        pod = ast.literal_eval(str(pod))
    # NOTE(review): a str `pod` is passed to json.dumps() unchanged, i.e. it
    # is encoded as a JSON string rather than parsed - confirm this is the
    # intended handling for the `str` overload.
    return MySQLPod(cast(client.V1Pod, api_core.api_client.deserialize(
        Wrapper(pod), client.V1Pod)))
def __str__(self) -> str:
return self.name
def __repr__(self) -> str:
return f"<MySQLPod {self.name}>"
@classmethod
def read(cls, name: str, ns: str) -> 'MySQLPod':
    """Read the pod `name` in namespace `ns` from the API and wrap it."""
    raw_pod = api_core.read_namespaced_pod(name, ns)
    return MySQLPod(cast(client.V1Pod, raw_pod))
@property
def metadata(self) -> api_client.V1ObjectMeta:
    """The wrapped pod's `metadata` attribute."""
    pod_metadata = self.pod.metadata
    return cast(api_client.V1ObjectMeta, pod_metadata)
def self_ref(self, field_path: Optional[str] = None) -> dict:
    """
    Build a dict that references this pod.

    It holds apiVersion, kind, name, namespace, resourceVersion and uid
    taken from the wrapped pod; a truthy `field_path` is added as
    "fieldPath".
    """
    meta = self.metadata
    ref = dict(
        apiVersion=self.pod.api_version,
        kind=self.pod.kind,
        name=self.name,
        namespace=self.namespace,
        resourceVersion=meta.resource_version,
        uid=meta.uid,
    )
    if not field_path:
        return ref
    ref["fieldPath"] = field_path
    return ref
@property
def status(self) -> api_client.V1PodStatus:
    """The wrapped pod's `status` attribute."""
    pod_status = self.pod.status
    return cast(api_client.V1PodStatus, pod_status)
@property
def phase(self) -> str:
    """The `phase` attribute of the pod's status."""
    current_phase = self.status.phase
    return cast(str, current_phase)
@property
def deleting(self) -> bool:
    """True when the pod's metadata carries a deletion timestamp."""
    deletion_timestamp = self.metadata.deletion_timestamp
    return deletion_timestamp is not None
@property
def spec(self) -> api_client.V1PodSpec:
    """The wrapped pod's `spec` attribute."""
    pod_spec = self.pod.spec
    return cast(api_client.V1PodSpec, pod_spec)
@property
def name(self) -> str:
    """The pod's name, taken from its metadata."""
    pod_name = self.metadata.name
    return cast(str, pod_name)
@property
def index(self) -> int:
    """Integer parsed from the part of the pod name after its last "-"."""
    suffix = self.name.rsplit("-", 1)[-1]
    return int(suffix)
@property
def namespace(self) -> str:
    """The pod's namespace, taken from its metadata."""
    pod_namespace = self.metadata.namespace
    return cast(str, pod_namespace)
@property
def cluster_name(self) -> str:
    """Value of the pod's "mysql.oracle.com/cluster" label."""
    labels = self.pod.metadata.labels
    return labels["mysql.oracle.com/cluster"]
@property
def instance_type(self) -> str:
    """
    Value of the pod's "mysql.oracle.com/instance-type" label, falling back
    to "group-member" when the label is not set.
    """
    try:
        return self.pod.metadata.labels["mysql.oracle.com/instance-type"]
    except KeyError:
        # With old clusters the label may be missing
        return "group-member"
@property
def read_replica_name(self) -> str:
    """Value of the pod's "mysql.oracle.com/read-replica" label."""
    labels = self.pod.metadata.labels
    return labels["mysql.oracle.com/read-replica"]
@property
def address(self) -> str:
    """The pod name and the pod spec's subdomain joined by a dot."""
    pod_name = self.name
    subdomain = cast(str, self.spec.subdomain)
    return pod_name + "." + subdomain
@property
def address_fqdn(self) -> str:
    """The result of `fqdn.pod_fqdn` for this pod, called with the logger."""
    resolved = fqdn.pod_fqdn(self, self.logger)
    return resolved
@property
def pod_ip_address(self) -> str:
    """The `pod_ip` attribute of the wrapped pod's status."""
    pod_status = self.pod.status
    return pod_status.pod_ip
@property
def endpoint(self) -> str:
    """`address_fqdn` and `port` joined as "host:port"."""
    host = self.address_fqdn
    return host + ":" + str(self.port)
@property
def xendpoint(self) -> str:
    """`address_fqdn` and `xport` joined as "host:port"."""
    host = self.address_fqdn
    return host + ":" + str(self.xport)
@property
def endpoint_co(self) -> dict:
    """
    Connection options dict for the "mysql" scheme.

    Holds scheme, user, password, host (`address_fqdn`) and port (`port`).
    The admin account is fetched from the owning cluster on first use and
    kept on the instance.
    """
    if not self.admin_account:
        self.admin_account = self.get_cluster().get_admin_account()
    account = self.admin_account
    options = {"scheme": "mysql"}
    options["user"] = account[0]
    options["password"] = account[1]
    options["host"] = self.address_fqdn
    options["port"] = self.port
    return options
@property
def endpoint_url_safe(self) -> dict:
    """
    Same shape as `endpoint_co`, but with the password replaced by "****".

    The admin account is fetched from the owning cluster on first use and
    kept on the instance.
    """
    if not self.admin_account:
        self.admin_account = self.get_cluster().get_admin_account()
    options = {"scheme": "mysql"}
    options["user"] = self.admin_account[0]
    options["password"] = "****"
    options["host"] = self.address_fqdn
    options["port"] = self.port
    return options
@property
def xendpoint_co(self) -> dict:
    """
    Connection options dict for the "mysqlx" scheme.

    Holds scheme, user, password, host (`address_fqdn`) and port (`xport`).
    The admin account is fetched from the owning cluster on first use and
    kept on the instance.
    """
    if not self.admin_account:
        self.admin_account = self.get_cluster().get_admin_account()
    account = self.admin_account
    options = {"scheme": "mysqlx"}
    options["user"] = account[0]
    options["password"] = account[1]
    options["host"] = self.address_fqdn
    options["port"] = self.xport
    return options
def reload(self) -> None:
    """Re-read this pod from the API and replace the wrapped pod object."""
    fresh = api_core.read_namespaced_pod(self.name, self.namespace)
    self.pod = cast(api_client.V1Pod, fresh)
def owner_reference(self, api_version, kind) -> typing.Optional[api_client.V1OwnerReference]:
    """
    Return the first owner reference matching `api_version` and `kind`.

    Returns None when no reference matches, including when the pod's
    metadata carries no owner references at all.
    """
    # owner_references may be None rather than an empty list
    for owner in self.metadata.owner_references or []:
        if owner.api_version == api_version and owner.kind == kind:
            return owner
    return None
def get_cluster(self) -> typing.Optional[InnoDBCluster]:
    """
    Read the InnoDBCluster named by this pod's cluster label.

    Any ApiException is reported with print(); a 404 then yields None and
    every other status is re-raised.
    """
    try:
        return InnoDBCluster.read(self.namespace, self.cluster_name)
    except ApiException as e:
        print(f"Could not get cluster {self.namespace}/{self.cluster_name}: {e}")
        if e.status != 404:
            raise
        return None
def check_condition(self, cond_type: str) -> typing.Optional[bool]:
    """
    Look up the pod condition of type `cond_type`.

    Returns whether the first matching condition's status equals "True",
    or None when there is no status, no conditions or no matching entry.
    """
    pod_status = self.status
    conditions = pod_status.conditions if pod_status else None
    for condition in conditions or ():
        if condition.type == cond_type:
            return condition.status == "True"
    return None
def check_containers_ready(self) -> typing.Optional[bool]:
    """State of the pod's "ContainersReady" condition (see check_condition)."""
    condition_type = "ContainersReady"
    return self.check_condition(condition_type)
def check_container_ready(self, container_name: str) -> typing.Optional[bool]:
    """
    Return the `ready` flag of the first container status whose name is
    `container_name`, or None when there is no such entry.
    """
    statuses = self.status.container_statuses or ()
    return next(
        (entry.ready for entry in statuses if entry.name == container_name),
        None)
def get_container_restarts(self, container_name: str) -> typing.Optional[int]:
    """
    Return the `restart_count` of the first container status whose name is
    `container_name`, or None when there is no such entry.
    """
    statuses = self.status.container_statuses or ()
    return next(
        (entry.restart_count for entry in statuses if entry.name == container_name),
        None)
def get_member_readiness_gate(self, gate: str) -> typing.Optional[bool]:
    """State of the pod condition "mysql.oracle.com/<gate>" (see check_condition)."""
    condition_type = f"mysql.oracle.com/{gate}"
    return self.check_condition(condition_type)
def update_member_readiness_gate(self, gate: str, value: bool) -> None:
    """
    Patch the pod status condition "mysql.oracle.com/<gate>" to `value`.

    The condition's status is written as "True"/"False" and the probe time
    is set to the current time. The transition time is set to the current
    time only when the stored condition differs from `value`; otherwise it
    is sent as None. The pod returned by the API replaces the wrapped pod.
    """
    now = utils.isotime()
    changed = self.check_condition(f"mysql.oracle.com/{gate}") != value
    # NOTE(review): an unchanged gate sends lastTransitionTime as null -
    # confirm whether that clears the previously stored transition time.
    condition = {
        "type": f"mysql.oracle.com/{gate}",
        "status": "True" if value else "False",
        "lastProbeTime": '%s' % now,
        "lastTransitionTime": '%s' % now if changed else None
    }
    patched = api_core.patch_namespaced_pod_status(
        self.name, self.namespace, body={"status": {"conditions": [condition]}})
    self.pod = cast(api_client.V1Pod, patched)
# TODO remove field
def get_membership_info(self, field: str = None) -> typing.Optional[dict]:
    """
    Decode the pod's "mysql.oracle.com/membership-info" annotation.

    Returns None when the pod has no annotations or the annotation is
    missing or empty. Otherwise the JSON payload is parsed; when both the
    payload and `field` are truthy only `payload.get(field)` is returned,
    else the whole payload.
    """
    annotations = self.metadata.annotations
    if not annotations:
        return None
    raw = annotations.get("mysql.oracle.com/membership-info", None)
    if not raw:
        return None
    info = json.loads(raw)
    if info and field:
        return info.get(field)
    return info
def update_membership_status(self, member_id: str, role: str, status: str,
                             view_id: str, version: str,
                             joined: bool = False) -> None:
    """
    Refresh the pod's membership annotation and cluster-role label.

    The stored membership info is updated with the given values and the
    current probe time. The transition time is set to the current time when
    there was no stored info or when role, status, group view id or member
    id differ from it; otherwise the stored transition time is kept. With
    `joined` set, "joinTime" is set to the current time as well.

    The "mysql.oracle.com/cluster-role" label is sent as `role` when
    `status` is "ONLINE" and as None otherwise. The pod returned by the API
    replaces the wrapped pod.
    """
    now = utils.isotime()
    info = self.get_membership_info() or {}
    tracked = (
        ("role", role),
        ("status", status),
        ("groupViewId", view_id),
        ("memberId", member_id),
    )
    if not info or any(info.get(key) != current for key, current in tracked):
        last_transition_time = now
    else:
        last_transition_time = info.get("lastTransitionTime")
    info.update({
        "memberId": member_id,
        "lastTransitionTime": last_transition_time,
        "lastProbeTime": now,
        "groupViewId": view_id,
        "status": status,
        "version": version,
        "role": role
    })
    if joined:
        info["joinTime"] = now
    labels = {"mysql.oracle.com/cluster-role": role if status == "ONLINE" else None}
    annotations = {"mysql.oracle.com/membership-info": json.dumps(info)}
    patch = {"metadata": {"labels": labels, "annotations": annotations}}
    patched = api_core.patch_namespaced_pod(self.name, self.namespace, patch)
    self.pod = cast(api_client.V1Pod, patched)
def add_member_finalizer(self) -> None:
    """Add the "mysql.oracle.com/membership" finalizer to the pod."""
    finalizer = "mysql.oracle.com/membership"
    self._add_finalizer(finalizer)
def remove_member_finalizer(self, pod_body: Body = None) -> None:
    """
    Remove the "mysql.oracle.com/membership" finalizer from the pod,
    forwarding `pod_body` to `_remove_finalizer`.
    """
    finalizer = "mysql.oracle.com/membership"
    self._remove_finalizer(finalizer, pod_body)
def _add_finalizer(self, fin: str) -> None:
    """
    Add the named token to the list of finalizers for the Pod.

    The Pod will be blocked from deletion until that token is
    removed from the list (remove_finalizer).

    The pod returned by the patch call replaces the wrapped pod, so the
    accessors of this object reflect the new finalizer list.
    """
    patch = {"metadata": {"finalizers": [fin]}}
    patched = api_core.patch_namespaced_pod(
        self.name, self.namespace, body=patch)
    # Every accessor of this class reads self.pod; self.obj is still
    # assigned as before in case other code looks at it.
    self.pod = self.obj = patched
def _remove_finalizer(self, fin: str, pod_body: Body = None) -> None:
    """
    Remove the named token from the Pod's list of finalizers.

    The pod returned by the patch call replaces the wrapped pod. When
    `pod_body` is given, the token is also dropped from its in-memory
    `metadata.finalizers` list; a body without that list, or without the
    token in it, is left untouched.
    """
    patch = {"metadata": {"$deleteFromPrimitiveList/finalizers": [fin]}}
    patched = api_core.patch_namespaced_pod(
        self.name, self.namespace, body=patch)
    # Every accessor of this class reads self.pod; self.obj is still
    # assigned as before in case other code looks at it.
    self.pod = self.obj = patched
    if pod_body:
        # modify the JSON data used internally by kopf to update its finalizer list
        finalizers = pod_body["metadata"].get("finalizers")
        if finalizers and fin in finalizers:
            finalizers.remove(fin)