mysqloperator/controller/backup/backup_api.py (320 lines of code) (raw):

# Copyright (c) 2020, 2025, Oracle and/or its affiliates. # # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ import yaml from os import execl from logging import Logger from typing import List, Optional, cast from .. import consts from .. api_utils import dget_dict, dget_str, dget_int, dget_bool, dget_list, ApiSpecError from .. kubeutils import api_core, api_apps, api_customobj, ApiException from .. storage_api import StorageSpec from .. utils import merge_patch_object from .. innodbcluster import cluster_api class Snapshot: def __init__(self): self.storage: Optional[StorageSpec] = None def add_to_pod_spec(self, pod_spec: dict, container_name: str) -> None: self.storage.add_to_pod_spec(pod_spec, container_name) def parse(self, spec: dict, prefix: str) -> None: storage = dget_dict(spec, "storage", prefix) self.storage = StorageSpec( ["ociObjectStorage", "s3", "azure", "persistentVolumeClaim"]) self.storage.parse(storage, prefix+".storage") def __str__(self) -> str: return f"Object Snapshot: storage={self.storage}" def __eq__(self, other : 'Snapshot') -> bool: assert other is None or isinstance(other, Snapshot) return (other is not None \ and self.storage == other.storage) class MEB: def __init__(self): self.storage: Optional[str] = None self.extra_options: Optional[list[str]] = None def parse(self, spec: dict, prefix: str) -> None: self.storage = dget_dict(spec, "storage", prefix) self.extra_options = dget_list(spec, "extraOptions", prefix, []) def __str__(self) -> str: return f"Object MEBInstance: storage={self.storage}" def __eq__(self, other: 'MEB') -> bool: assert other is None or isinstance(other, DumpInstance) return other is not None \ and self.storage == other.storage \ and self.extra_options == other.extra_options def add_to_pod_spec(self, pod_spec: dict, container_name: str) -> None: cluster_name = pod_spec['metadata']['labels']['mysql.oracle.com/cluster'] patch = f""" spec: containers: - name: {container_name} volumeMounts: - mountPath: /tls name: mebtlsclient volumes: - name: mebtlsclient secret: defaultMode: 200 secretName: {cluster_name}-meb-tls """ merge_patch_object(pod_spec, yaml.safe_load(patch)) class DumpInstance: def __init__(self): self.dumpOptions: dict = {} # dict with options for dumpInstance() self.storage: Optional[StorageSpec] = None # StorageSpec self.options = {} def add_to_pod_spec(self, pod_spec: dict, container_name: str) -> None: self.storage.add_to_pod_spec(pod_spec, container_name) def parse(self, spec: dict, prefix: str) -> None: self.dumpOptions = dget_dict(spec, "dumpOptions", prefix, {}) storage = dget_dict(spec, "storage", prefix) self.storage = StorageSpec() self.storage.parse(storage, prefix+".storage") def __str__(self) -> str: return f"Object DumpInstance: storage={self.storage}" def __eq__(self, other : 'DumpInstance') -> bool: assert other is None or isinstance(other, DumpInstance) return (other is not None \ and self.dumpOptions == other.dumpOptions \ and self.storage == other.storage) class BackupProfile: def __init__(self): self.name: str = "" self.dumpInstance: Optional[DumpInstance] = None self.snapshot: Optional[Snapshot] = None self.meb: Optional[MEB] = None self.podAnnotations: Optional[dict] = None self.podLabels: Optional[dict] = None def add_to_pod_spec(self, pod_spec: dict, container_name: str) -> None: assert self.snapshot or self.dumpInstance or self.meb if self.snapshot: return self.snapshot.add_to_pod_spec(pod_spec, container_name) if self.dumpInstance: return self.dumpInstance.add_to_pod_spec(pod_spec, container_name) if self.meb: return self.meb.add_to_pod_spec(pod_spec, container_name) def parse(self, spec: dict, prefix: str, name_required: bool = True) -> None: self.name = dget_str(spec, "name", prefix, default_value= None if name_required else "") if "podAnnotations" in spec: self.podAnnotations = dget_dict(spec, "podAnnotations", prefix) if "podLabels" in spec: self.podLabels = dget_dict(spec, "podLabels", prefix) prefix += "." + self.name method_spec = dget_dict(spec, "dumpInstance", prefix, {}) if method_spec: self.dumpInstance = DumpInstance() self.dumpInstance.parse(method_spec, prefix+".dumpInstance") method_spec = dget_dict(spec, "snapshot", prefix, {}) if method_spec: self.snapshot = Snapshot() self.snapshot.parse(method_spec, prefix+".snapshot") method_spec = dget_dict(spec, "meb", prefix, {}) if method_spec: self.meb = MEB() self.meb.parse(method_spec, prefix+".meb") if self.dumpInstance and self.snapshot: # TODO MEB! raise ApiSpecError( f"Only one of dumpInstance or snapshot may be set in {prefix}") if not self.dumpInstance and not self.snapshot and not self.meb: raise ApiSpecError( f"One of dumpInstance, snapshot or meb must be set in a {prefix}") def __str__(self) -> str: return f"Object BackupProfile name={self.name} dumpInstance={self.dumpInstance} snapshot={self.snapshot} podAnnotations={self.podAnnotations} podLabels={self.podLabels}" def __eq__(self, other: 'BackupProfile') -> bool: assert other is None or isinstance(other, BackupProfile) return (other is not None \ and self.name == other.name \ and self.dumpInstance == other.dumpInstance \ and self.snapshot == other.snapshot \ and self.meb == other.meb) class BackupSchedule: def __init__(self, cluster_spec): self.cluster_spec: cluster_api.InnoDBClusterSpec = cluster_spec self.name: str = "" self.backupProfileName: Optional[str] = None self.backupProfile: Optional[BackupProfile] = None self.schedule: str = "" self.enabled: bool = False self.timeZone: str = "" self.deleteBackupData: bool = False # unused def add_to_pod_spec(self, pod_spec: dict, container_name: str) -> None: assert self.backupProfile if self.backupProfile: return self.backupProfile.add_to_pod_spec(pod_spec, container_name) def parse(self, spec: dict, prefix: str, load_profile: bool = True) -> None: self.name = dget_str(spec, "name", prefix, default_value= "") self.deleteBackupData = dget_bool(spec, "deleteBackupData", prefix, default_value=False) self.enabled = dget_bool(spec, "enabled", prefix, default_value=False) self.backupProfileName = dget_str(spec, "backupProfileName", prefix, default_value= "") self.timeZone = dget_str(spec, "timeZone", prefix, default_value="") # marking timeZone with default_value None will make it non-optional self.schedule = dget_str(spec, "schedule", prefix) if not self.schedule: raise ApiSpecError(f"schedule not set in in a {prefix}") backup_profile = dget_dict(spec, "backupProfile", prefix, {}) if self.backupProfileName and backup_profile: print(f"Only one of backupProfileName or backupProfile must be set in {prefix}") raise ApiSpecError(f"Only one of backupProfileName or backupProfile must be set in {prefix}") if not self.backupProfileName and not backup_profile: print(f"One of backupProfileName or backupProfile must be set in {prefix}") raise ApiSpecError(f"One of backupProfileName or backupProfile must be set in {prefix}") if backup_profile: self.backupProfile = BackupProfile() self.backupProfile.parse(backup_profile, prefix + ".backupProfile", name_required= False) elif load_profile: self.backupProfile = self.cluster_spec.get_backup_profile(self.backupProfileName) if not self.backupProfile: print(f"Invalid backupProfileName '{self.backupProfileName}' in cluster {self.cluster_spec.namespace}/{self.cluster_spec.name}") raise ApiSpecError(f"Invalid backupProfileName '{self.backupProfileName}' in cluster {self.cluster_spec.namespace}/{self.cluster_spec.name}") def __str__(self) -> str: return f"Object BackupSchedule scheduleName={self.name} deleteBackupData={self.deleteBackupData} enabled={self.enabled} backupProfileName={self.backupProfileName} schedule={self.schedule} profile={self.backupProfile} timeZone={self.timeZone}" def __eq__(self, other : 'BackupSchedule') -> bool: assert other is None or isinstance(other, BackupSchedule) return (other is not None \ and self.cluster_spec.namespace == other.cluster_spec.namespace \ and self.cluster_spec.name == other.cluster_spec.name \ and self.name == other.name \ and self.backupProfileName == other.backupProfileName \ and self.backupProfile == other.backupProfile \ and self.schedule == other.schedule \ and self.deleteBackupData == other.deleteBackupData \ and self.timeZone == other.timeZone \ and self.enabled == other.enabled) class MySQLBackupSpec: def __init__(self, namespace: str, name: str, spec: dict): self.namespace = namespace self.name = name self.clusterName: str = "" self.backupProfileName: str = "" self.backupProfile: BackupProfile = None self.deleteBackupData: bool = False # unused self.timeZone: str = "" self.addTimestampToBackupDirectory: bool = True self.operator_image: str = "" self.operator_image_pull_policy: str = "" self.serviceAccountName: Optional[str] = None self.incremental: bool = False self.incrementalBase: str = "" self.parse(spec) def add_to_pod_spec(self, pod_spec: dict, container_name: str) -> None: assert self.backupProfile return self.backupProfile.add_to_pod_spec(pod_spec, container_name) def parse(self, spec: dict) -> Optional[ApiSpecError]: self.clusterName = dget_str(spec, "clusterName", "spec") self.backupProfileName = dget_str(spec, "backupProfileName", "spec", default_value="") self.backupProfile = self.parse_backup_profile(dget_dict(spec, "backupProfile", "spec", {}), "spec.backupProfile") self.deleteBackupData = dget_bool(spec, "deleteBackupData", "spec", default_value=False) self.timeZone = dget_str(spec, "timeZone", "spec", default_value="") #marking timeZone with default_value None will make it non-optional self.addTimestampToBackupDirectory = dget_bool(spec, "addTimestampToBackupDirectory", "spec", default_value=True) self.incremental = dget_bool(spec, "incremental", "spec", default_value=False) self.incrementalBase = dget_str(spec, "incrementalBase", "spec", default_value="last_backup") if self.backupProfileName and self.backupProfile: raise ApiSpecError("Only one of spec.backupProfileName or spec.backupProfile must be set") if not self.backupProfileName and not self.backupProfile: raise ApiSpecError("One of spec.backupProfileName or spec.backupProfile must be set") try: cluster = cluster_api.InnoDBCluster.read(self.namespace, self.clusterName) except ApiException as e: if e.status == 404: return ApiSpecError(f"Invalid clusterName {self.namespace}/{self.clusterName}") raise self.operator_image = cluster.parsed_spec.operator_image self.operator_image_pull_policy = cluster.parsed_spec.operator_image_pull_policy self.serviceAccountName = cluster.parsed_spec.serviceAccountName if self.backupProfileName: self.backupProfile = cluster.parsed_spec.get_backup_profile(self.backupProfileName) if not self.backupProfile: err_msg = f"Invalid backupProfileName '{self.backupProfileName}' in cluster {self.namespace}/{self.clusterName}" raise ApiSpecError(err_msg) if self.incremental: if not self.backupProfile.meb: raise ApiSpecError("Incremental Backup is only supported with a profile using MySQL Enterprise Backup") if self.incrementalBase not in ('last_backup', 'last_full_backup'): raise ApiSpecError("BackupBase must be last_backup or last_full_backup") return None def parse_backup_profile(self, profile: dict, prefix: str) -> Optional[BackupProfile]: if profile: profile_object = BackupProfile() profile_object.parse(profile, prefix) return profile_object return None class MySQLBackup: def __init__(self, backup: dict): self.obj: dict = backup # self.namespace and self.name here will call the getters, which in turn will # look into self.obj['metadata'] self.parsed_spec = MySQLBackupSpec( self.namespace, self.name, self.spec) def __str__(self) -> str: return f"{self.namespace}/{self.name}" def __repr__(self) -> str: return f"<MySQLBackup {self.name}>" def get_cluster(self): try: cluster = cluster_api.InnoDBCluster.read(self.namespace, self.cluster_name) except ApiException as e: if e.status == 404: return ApiSpecError(f"Invalid clusterName {self.namespace}/{self.cluster_name}") raise return cluster @classmethod def read(cls, name: str, namespace: str) -> 'MySQLBackup': return MySQLBackup(cast(dict, api_customobj.get_namespaced_custom_object( consts.GROUP, consts.VERSION, namespace, consts.MYSQLBACKUP_PLURAL, name))) @classmethod def create(cls, namespace: str, body: dict) -> Optional[dict]: try: return cast(dict, api_customobj.create_namespaced_custom_object( consts.GROUP, consts.VERSION, namespace, consts.MYSQLBACKUP_PLURAL, body)) except ApiException as exc: print(f"Exception {exc} when calling create_namespaced_custom_object({consts.GROUP}, {consts.VERSION}, {namespace}, {consts.MYSQLBACKUP_PLURAL} body={body}") return None assert 0 # "Uncaught exception/wrong code flow" @property def metadata(self) -> dict: return self.obj["metadata"] @property def spec(self) -> dict: return self.obj["spec"] @property def status(self) -> dict: if "status" in self.obj: return self.obj["status"] return {} @property def name(self) -> str: return self.metadata["name"] @property def namespace(self) -> str: return self.metadata["namespace"] @property def cluster_name(self) -> str: return self.parsed_spec.clusterName def get_profile(self) -> BackupProfile: if self.parsed_spec.backupProfile: return self.parsed_spec.backupProfile cluster = self.get_cluster() profile = cluster.parsed_spec.get_backup_profile(self.parsed_spec.backupProfileName) if not profile: raise Exception( f"Unknown backup profile {self.parsed_spec.backupProfileName} in cluster {self.namespace}/{self.parsed_spec.clusterName}") return profile def set_started(self, backup_name: str, start_time: str) -> None: patch = {"status": { "status": "Running", "startTime": start_time, "output": backup_name }} self.obj = cast(dict, api_customobj.patch_namespaced_custom_object_status( consts.GROUP, consts.VERSION, self.namespace, consts.MYSQLBACKUP_PLURAL, self.name, body=patch)) def set_succeeded(self, backup_name: str, start_time: str, end_time: str, info: dict) -> None: import dateutil.parser as dtp elapsed = dtp.isoparse(end_time) - dtp.isoparse(start_time) hours, seconds = divmod(elapsed.total_seconds(), 3600) minutes, seconds = divmod(seconds, 60) patch = {"status": { "status": "Completed", "startTime": start_time, "completionTime": end_time, "elapsedTime": f"{int(hours):02}:{int(minutes):02}:{int(seconds):02}", "output": backup_name }} patch["status"].update(info) self.obj = cast(dict, api_customobj.patch_namespaced_custom_object_status( consts.GROUP, consts.VERSION, self.namespace, consts.MYSQLBACKUP_PLURAL, self.name, body=patch)) def set_failed(self, backup_name: str, start_time: str, end_time: str, error: Exception) -> None: import dateutil.parser as dtp elapsed = dtp.isoparse(end_time) - dtp.isoparse(start_time) hours, seconds = divmod(elapsed.total_seconds(), 3600) minutes, seconds = divmod(seconds, 60) patch = {"status": { "status": "Error", "startTime": start_time, "completionTime": end_time, "elapsedTime": f"{int(hours):02}:{int(minutes):02}:{int(seconds):02}", "message": str(error), "output": backup_name }} self.obj = cast(dict, api_customobj.patch_namespaced_custom_object_status( consts.GROUP, consts.VERSION, self.namespace, consts.MYSQLBACKUP_PLURAL, self.name, body=patch))