mysqloperator/backup_main.py

# Copyright (c) 2020, 2025, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at
# https://oss.oracle.com/licenses/upl/
#

import json
import sys
import os
import multiprocessing
import requests
import argparse
from urllib.parse import quote as urlquote

import mysqlsh

from .controller import consts, utils, config, shellutils
from .controller import storage_api
from .controller.backup.backup_api import MySQLBackup, DumpInstance, Snapshot, MEB, BackupProfile
from .controller.backup import backup_objects
from .controller.kubeutils import client as api_client, api_core, ApiException
from .controller.innodbcluster.cluster_api import InnoDBCluster
import logging
from typing import Optional, cast

BACKUP_OCI_USER_NAME = "OCI_USER_NAME"
BACKUP_OCI_FINGERPRINT = "OCI_FINGERPRINT"
BACKUP_OCI_TENANCY = "OCI_TENANCY"
BACKUP_OCI_REGION = "OCI_REGION"
BACKUP_OCI_PASSPHRASE = "OCI_PASSPHRASE"
OCI_CONFIG_NAME = "OCI_CONFIG_NAME"
OCI_API_KEY_NAME = "OCI_API_KEY_NAME"
OCI_CONFIG_FILE_NAME = "config"


def get_secret(secret_name: str, namespace: str, logger: logging.Logger) -> dict:
    if not secret_name:
        raise Exception("No secret provided")

    ret = {}
    try:
        secret = cast(api_client.V1Secret,
                      api_core.read_namespaced_secret(secret_name, namespace))
        for k, v in secret.data.items():
            ret[k] = utils.b64decode(v)
    except Exception:
        raise Exception(f"Secret {secret_name} in namespace {namespace} cannot be found")

    return ret


def get_dir_size(d):
    size = 0
    for dirpath, dirnames, filenames in os.walk(d):
        for f in filenames:
            size += os.path.getsize(os.path.join(dirpath, f))
    return size


def execute_dump_instance(backup_source: dict, profile: DumpInstance, backupdir: Optional[str], backup_name: str, logger: logging.Logger):
    shell = mysqlsh.globals.shell
    util = mysqlsh.globals.util

    start = utils.isotime()

    options = profile.dumpOptions.copy()
    if "threads" not in options:
        options["threads"] = multiprocessing.cpu_count()

    if profile.storage.ociObjectStorage:
        oci_config = create_oci_config_file_from_envs(os.environ, logger)
        options["osBucketName"] = profile.storage.ociObjectStorage.bucketName
        options["ociConfigFile"] = oci_config["config"]
        options["ociProfile"] = oci_config["profile"]
        logger.info(f"options={options}")
        if profile.storage.ociObjectStorage.prefix:
            output = os.path.join(profile.storage.ociObjectStorage.prefix, backup_name)
        else:
            output = backup_name
    elif profile.storage.s3:
        options["s3BucketName"] = profile.storage.s3.bucketName
        options["s3Profile"] = profile.storage.s3.profile
        if profile.storage.s3.endpoint:
            options["s3EndpointOverride"] = profile.storage.s3.endpoint
        if profile.storage.s3.prefix:
            output = os.path.join(profile.storage.s3.prefix, backup_name)
        else:
            output = backup_name
    elif profile.storage.azure:
        options["azureContainerName"] = profile.storage.azure.containerName
        if profile.storage.azure.prefix:
            output = os.path.join(profile.storage.azure.prefix, backup_name)
        else:
            output = backup_name
    else:
        output = os.path.join(backupdir, backup_name)

    logger.info(
        f"dump_instance starting: output={output} options={options} source={backup_source['user']}@{backup_source['host']}:{backup_source['port']}")

    try:
        shell.connect(backup_source)
    except mysqlsh.Error as e:
        logger.error(
            f"Could not connect to {backup_source['host']}:{backup_source['port']}: {e}")
        raise

    try:
        util.dump_instance(output, options)
    except mysqlsh.Error as e:
        logger.error(f"dump_instance failed: {e}")
        raise

    # TODO get backup size and other stats from the dump cmd itself

    if profile.storage.ociObjectStorage:
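        # Report the tenancy OCID recorded in the OCI config file written by
        # create_oci_config_file_from_envs() above ("tenancy = <ocid>" line).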
        with open(options["ociConfigFile"], "r") as f:
            tenancy = [line.split("=")[1].strip()
                       for line in f.readlines() if line.startswith("tenancy")][0]

        info = {
            "method": "dump-instance/oci-bucket",
            "source": f"{backup_source['user']}@{backup_source['host']}:{backup_source['port']}",
            "bucket": profile.storage.ociObjectStorage.bucketName,
            "ociTenancy": tenancy
        }
    elif profile.storage.s3:
        info = {
            "method": "dump-instance/s3",
            "source": f"{backup_source['user']}@{backup_source['host']}:{backup_source['port']}",
            "bucket": profile.storage.s3.bucketName,
        }
    elif profile.storage.azure:
        info = {
            "method": "dump-instance/azure-blob-storage",
            "source": f"{backup_source['user']}@{backup_source['host']}:{backup_source['port']}",
            "container": profile.storage.azure.containerName,
        }
    elif profile.storage.persistentVolumeClaim:
        fsinfo = os.statvfs(backupdir)
        gb_avail = (fsinfo.f_frsize * fsinfo.f_bavail) / (1024*1024*1024)
        backup_size = get_dir_size(output) / (1024*1024*1024)
        info = {
            "method": "dump-instance/volume",
            "source": f"{backup_source['user']}@{backup_source['host']}:{backup_source['port']}",
            "spaceAvailable": f"{gb_avail:.4}G",
            "size": f"{backup_size:.4}G"
        }
    else:
        assert False

    logger.info("dump_instance finished successfully")

    return info


def execute_clone_snapshot(backup_source: dict, profile: Snapshot, backupdir: Optional[str], backup_name: str, logger: logging.Logger) -> dict:
    ...


def execute_meb(backup: MySQLBackup, backup_source: dict, backup_name: str, logger: logging.Logger) -> dict:
    cert = ("/tls/client.pem", "/tls/client.key")
    ca = "/tls/ca.pem"

    backup_obj = MySQLBackup.read(backup.name, backup.namespace)
    profile = backup_obj.get_profile().meb

    request = {
        "spec": profile,
        "source": backup_source
    }

    if "s3" in profile.storage:
        request["secret"] = get_secret(profile.storage["s3"]["credentials"], backup.namespace, logger)
    elif "oci" in profile.storage:
        request["secret"] = get_secret(profile.storage["oci"]["credentials"], backup.namespace, logger)
    else:
        raise Exception("Need either an oci or an s3 storage specification")

    request["incremental"] = backup.parsed_spec.incremental
    request["incremental_base"] = backup.parsed_spec.incrementalBase

    request_s = json.dumps(request, default=lambda o: o.__dict__, indent=4)

    name = urlquote(backup_name, safe="")
    url = f"https://{backup_source['host']}:4443/backup/{name}"
    logger.info(f"Triggering {url}")

    response = requests.post(url, data=request_s, cert=cert, verify=False)  # , verify=ca)
    print(response.content.decode())
    if response.status_code != 200:
        raise Exception(f"Failed to take backup. Backup daemon returned {response.status_code}: {response.content}")

    info = {
        "method": "meb",
        "source": f"{backup_source['user']}@{backup_source['host']}:{backup_source['port']}",
        #"spaceAvailable": f"{gb_avail:.4}G",
        #"size": f"{backup_size:.4}G"
    }

    return info


def pick_source_instance(cluster: InnoDBCluster, backup: MySQLBackup, profile: BackupProfile, logger: logging.Logger) -> dict:
    mysql = mysqlsh.mysql

    primary = None
    best_secondary = None
    best_secondary_applier_queue_size = None

    for pod in cluster.get_pods():
        if pod.deleting:
            continue

        try:
            with shellutils.DbaWrap(shellutils.connect_dba(pod.endpoint_co, logger, max_tries=3)) as dba:
                if backup.parsed_spec.incremental and profile.meb:
                    # If an incremental backup is requested we schedule on the
                    # same host as last full backup. This may be a busy primary.
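                    # The query below consults mysql.backup_history (maintained
                    # by MySQL Enterprise Backup) to determine whether this
                    # member produced the most recent successful FULL backup.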
sql = """WITH last_full AS ( SELECT MAX(consistency_time_utc) AS last_full_time FROM mysql.backup_history WHERE backup_type = "FULL" AND exit_state = "SUCCESS" ) SELECT gm.MEMBER_HOST backup_host, bh.server_uuid backup_uuid, @@server_uuid this_uuid, MEMBER_STATE, exit_state FROM mysql.backup_history bh JOIN last_full ON bh.consistency_time_utc >= last_full.last_full_time LEFT JOIN performance_schema.replication_group_members gm ON gm.MEMBER_ID = bh.server_uuid WHERE bh.exit_state = "SUCCESS" ORDER BY bh.consistency_time_utc DESC""" try: # TODO - if we are in a cronjob we may want to check # amount of incremental backups or have a # fallback for taking full backups incremental_info = dba.session.run_sql(sql).fetch_one_object() if incremental_info['backup_uuid'] == incremental_info['this_uuid']: # By definition we are online, thus not checking MEMBER_STATE return pod.endpoint_co else: continue except mysqlsh.Error as e: logger.warning( f"Could not get MEB backup status from {pod}: {e}") continue try: tmp = dba.get_cluster().status({"extended": 1})["defaultReplicaSet"] cluster_status = tmp["status"] self_uuid = dba.session.run_sql("select @@server_uuid").fetch_one()[0] member_status = [x for x in tmp["topology"].values() if x["memberId"] == self_uuid][0] except mysqlsh.Error as e: logger.warning( f"Could not get cluster status from {pod}: {e}") continue applier_queue_size = dba.session.run_sql( "SELECT COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE" " FROM performance_schema.replication_group_member_stats" " WHERE member_id = @@server_uuid").fetch_one()[0] except mysqlsh.Error as e: logger.warning(f"Could not connect to {pod}: {e}") continue if backup.parsed_spec.incremental and profile.meb: raise Exception( f"No instances available to backup from in cluster {cluster.name}. 
Inremental backup has to be taken from the same host as the last full backup.") logger.info( f"Cluster status from {pod} is {cluster_status}, member_status={member_status} applier_queue_size={applier_queue_size}") if not cluster_status.startswith("OK") or member_status["memberState"] != "ONLINE": continue if member_status["memberRole"] == "SECONDARY": if not best_secondary or applier_queue_size < best_secondary_applier_queue_size: best_secondary = pod.endpoint_co best_secondary_applier_queue_size = applier_queue_size if applier_queue_size == 0: break else: primary = pod.endpoint_co if best_secondary: return best_secondary elif primary: return primary raise Exception( f"No instances available to backup from in cluster {cluster.name}") def do_backup(backup : MySQLBackup, job_name: str, start, backupdir: Optional[str], logger: logging.Logger) -> dict: logger.info( f"Starting backup of {backup.namespace}/{backup.parsed_spec.clusterName} profile={backup.parsed_spec.backupProfileName} backupdir={backupdir}") cluster = backup.get_cluster() profile = backup.get_profile() # select bh.backup_type, MEMBER_HOST from performance_schema.replication_group_members gm join mysql.backup_history bh on gm.MEMBER_ID = bh.server_uuid; # backup_source = pick_source_instance(cluster, backup, profile, logger) if profile.meb: return execute_meb(backup, backup_source, job_name, logger) elif profile.dumpInstance: return execute_dump_instance(backup_source, profile.dumpInstance, backupdir, job_name, logger) elif profile.snapshot: return execute_clone_snapshot(backup_source, profile.snapshot, backupdir, job_name, logger) else: raise Exception(f"Invalid backup method in profile {profile.name}") def create_oci_config_file_from_envs(env_vars: dict, logger : logging.Logger) -> dict: backup_oci_user_name = env_vars.get(BACKUP_OCI_USER_NAME) backup_oci_fingerprint = env_vars.get(BACKUP_OCI_FINGERPRINT) backup_oci_tenancy = env_vars.get(BACKUP_OCI_TENANCY) backup_oci_region = env_vars.get(BACKUP_OCI_REGION) backup_oci_passphrase = env_vars.get(BACKUP_OCI_PASSPHRASE) oci_config_name = env_vars.get(OCI_CONFIG_NAME) oci_api_key_name = env_vars.get(OCI_API_KEY_NAME) if backup_oci_user_name is None: raise Exception(f"No env var {BACKUP_OCI_USER_NAME} passed") elif not backup_oci_user_name: raise Exception(f"Empty value for {BACKUP_OCI_USER_NAME} passed") if backup_oci_fingerprint is None: raise Exception(f"No env var {BACKUP_OCI_FINGERPRINT} passed") elif not backup_oci_fingerprint: raise Exception(f"Empty value for {BACKUP_OCI_FINGERPRINT} passed") if backup_oci_tenancy is None: raise Exception(f"No env var {BACKUP_OCI_TENANCY} passed") elif not backup_oci_tenancy: raise Exception(f"Empty value for {BACKUP_OCI_TENANCY} passed") if backup_oci_region is None: raise Exception(f"No env var {BACKUP_OCI_REGION} passed") elif not backup_oci_region: raise Exception(f"Empty value for {BACKUP_OCI_REGION} passed") if backup_oci_passphrase is None: raise Exception(f"No env var {BACKUP_OCI_PASSPHRASE} passed") if oci_config_name is None: raise Exception(f"No env var {OCI_CONFIG_NAME} passed") elif not oci_config_name: raise Exception(f"Empty value for {OCI_CONFIG_NAME} passed") elif os.path.isfile(oci_config_name): raise Exception(f"{oci_api_key_name} already exists, won't overwrite") if oci_api_key_name is None: raise Exception(f"No env var {OCI_API_KEY_NAME} passed") elif not oci_api_key_name: raise Exception(f"Empty value for {OCI_API_KEY_NAME} passed") elif not os.path.isfile(oci_api_key_name): raise Exception(f"{oci_api_key_name} is not 
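    # The file written below follows the standard OCI SDK/CLI config format,
    # e.g. (illustrative values only):
    #
    #   [DEFAULT]
    #   user = ocid1.user.oc1..example
    #   fingerprint = 20:3b:97:13:55:1c:...
    #   tenancy = ocid1.tenancy.oc1..example
    #   region = us-ashburn-1
    #   passphrase = <private key passphrase>
    #   key_file = /path/to/api_key.pem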
a file") import configparser config_profile = "DEFAULT" config = configparser.ConfigParser() config[config_profile] = { "user" : backup_oci_user_name, "fingerprint" : backup_oci_fingerprint, "tenancy": backup_oci_tenancy, "region": backup_oci_region, "passphrase": backup_oci_passphrase, "key_file" : oci_api_key_name, } with open(oci_config_name, 'w') as configfile: config.write(configfile) return { "config": oci_config_name, "profile" : config_profile, } def command_do_create_backup(namespace, name, job_name: str, backup_dir: str, logger: logging.Logger, debug) -> bool: start = utils.isotime() if logger: logger.info(f"Loading up MySQLBackup object {namespace}/{name}") backup = None try: backup = MySQLBackup.read(name=name, namespace=namespace) backup.set_started(job_name, start) info = do_backup(backup, job_name, start, backup_dir, logger) backup.set_succeeded(job_name, start, utils.isotime(), info) except Exception as e: import traceback traceback.print_exc() logger.error(f"Backup failed with an exception: {e}") if backup: backup.set_failed(job_name, start, utils.isotime(), e) if debug: import time logger.info("Waiting for 1h...") time.sleep(60*60) return False return True def command_create_backup_object(namespace, cluster_name, schedule_name: str, logger: logging.Logger) -> bool: cluster = InnoDBCluster.read(namespace, cluster_name) if not cluster: print(f"Could not load cluster object {namespace}/{cluster_name}") return False for schedule in cluster.parsed_spec.backupSchedules: if schedule.name == schedule_name: backup_object = None backup_job_name = backup_objects.backup_job_name(cluster_name, schedule_name) if schedule.backupProfileName: backup_profile_name = schedule.backupProfileName backup_object = backup_objects.prepare_mysql_backup_object_by_profile_name(backup_job_name, cluster_name, backup_profile_name) elif cluster.spec['backupSchedules']: for raw_schedule in cluster.spec['backupSchedules']: if raw_schedule['name'] == schedule_name: backup_profile = raw_schedule['backupProfile'] backup_object = backup_objects.prepare_mysql_backup_object_by_profile_object(backup_job_name, cluster_name, backup_profile) if backup_object: logger.info(f"Creating backup job {backup_job_name} : {utils.dict_to_json_string(backup_object)}") return MySQLBackup.create(namespace, backup_object) is not None logger.error(f"Could not find schedule named {schedule_name} of cluster {cluster_name} in namespace {namespace}") return False def main(argv): import datetime, time parser = argparse.ArgumentParser(description = "MySQL InnoDB Cluster Instance Sidecar Container") parser.add_argument('--debug', type = int, nargs="?", const = 1, default = 0, help = "Debug") parser.add_argument('--namespace', type = str, default = "", help = "Namespace") parser.add_argument('--command', type = str, default = "", help = "Command") parser.add_argument('--backup-object-name', type = str, default = "", help = "Backup Object Name") parser.add_argument('--job-name', type = str, default = "", help = "Job name") parser.add_argument('--backup-dir', type = str, default = os.environ.get('DUMP_MOUNT_PATH', ""), help = "Backup Directory") parser.add_argument('--cluster-name', type = str, default = "", help = "Cluster Name") parser.add_argument('--schedule-name', type = str, default = "", help = "Schedule Name") args = parser.parse_args(argv) logging.basicConfig(level=logging.INFO, format='%(asctime)s - [%(levelname)s] [%(name)s] %(message)s', datefmt="%Y-%m-%dT%H:%M:%S") debug = args.debug # suppress logging from other libs for name in 
['kubernetes']: logger = logging.getLogger(name) logger.propagate = debug if not debug: logger.handlers[:] = [logging.NullHandler()] logger = logging.getLogger("backup") ts = datetime.datetime.fromtimestamp( os.stat(__file__).st_mtime).isoformat() command = args.command logger.info(f"[BACKUP] command={command} version={config.OPERATOR_VERSION} timestamp={ts}") print(f"Command is {command}") ret = False if command == "execute-backup": import subprocess subprocess.run(["ls", "-la", "/"]) subprocess.run(["ls", "-l", "/.oci"]) namespace = args.namespace backup_object_name = args.backup_object_name job_name = args.job_name backup_dir = args.backup_dir logger.info(f"backupdir={backup_dir}") ret = command_do_create_backup(namespace, backup_object_name, job_name, backup_dir, logger, debug) elif command == "create-backup-object": namespace = args.namespace cluster_name = args.cluster_name schedule_name = args.schedule_name ret = command_create_backup_object(namespace, cluster_name, schedule_name, logger) else: raise Exception(f"Unknown command {command}") logger.info(f"Command {command} finished with code {ret}") return 0 if ret == True else 1
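

# Illustrative invocations of this entrypoint (argument names are taken from
# the parser above; namespaces and object names are hypothetical, and in
# practice the arguments are supplied by the Job spec the operator generates):
#
#   backup_main.py --command execute-backup --namespace mysql \
#       --backup-object-name mybackup --job-name mybackup-job1 \
#       --backup-dir /mnt/storage
#
#   backup_main.py --command create-backup-object --namespace mysql \
#       --cluster-name mycluster --schedule-name daily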