mysqloperator/controller/backup/meb/meb_controller.py (276 lines of code) (raw):

# Copyright (c) 2025, Oracle and/or its affiliates. # # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ import base64 import datetime import json import shutil import subprocess import sys import urllib.parse import http.client from urllib.parse import urlparse from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.hashes import SHA256 import threading from datetime import timedelta MYSQLBACKUP_BINARY = '/usr/bin/mysqlbackup' MYCNF_PATH = '/etc/my.cnf' class MebStorageLocalImage: def __init__(self, target_path): self.target_path = target_path def get_backup_args(self): return ['--backup-image='+self.target_path] class OCIRequest: def __init__(self, credentials): self.tenancy_id = credentials['tenancy'] self.user_id = credentials['user'] self.fingerprint = credentials['fingerprint'] self.private_key = credentials['privatekey'] self.region = credentials['region'] def request(self, host: str, request_target: str, method: str, request_body: dict) -> dict: private_key = serialization.load_pem_private_key( self.private_key.encode('utf-8'), password=None ) endpoint = f'https://{host}{request_target}' date_header = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') headers = { "Content-Type": "application/json", "date": date_header, "host": host, "opc-client-info": "public-objectstorage:0.1.0" } signing_string = f"(request-target): {method.lower()} {request_target}\n" \ f"host: {host}\n" \ f"date: {headers['date']}\n" \ f"opc-client-info: {headers['opc-client-info']}" # Sign the string signature = base64.b64encode( private_key.sign( signing_string.encode('utf-8'), padding.PKCS1v15(), SHA256() ) ).decode('utf-8') authorization_header = ( f'Signature version="1",' f'headers="(request-target) host date opc-client-info",' f'keyId="{self.tenancy_id}/{self.user_id}/{self.fingerprint}",' f'algorithm="rsa-sha256",' f'signature="{signature}"' ) headers['authorization'] = authorization_header data = json.dumps(request_body) if request_body else None parsed_url = urlparse(endpoint) conn = http.client.HTTPSConnection(parsed_url.netloc) path = parsed_url.path if parsed_url.query: path += "?" + parsed_url.query headers = headers or {} if data: headers['Content-Type'] = 'application/json' conn.request(method, path, body=data, headers=headers) response = conn.getresponse() if response.status not in (200, 204): print(f"Failed to do OCI request: {response.status}") print(response.read().decode()) raise Exception("Failed OCI API call") response_data = response.read() if response_data: return json.loads(response_data) return True class OCIObjectStorage: def __init__(self, oci: OCIRequest, bucket_name: str, namespace_name: str): self.oci = oci self.bucket_name = bucket_name self.namespace_name = namespace_name def create_par(self, name:str, object_name: str, access_type: str, expiry: timedelta) -> str: host = f'objectstorage.{self.oci.region}.oraclecloud.com' request_target = f"/n/{self.namespace_name}/b/{self.bucket_name}/p" expiration_time = (datetime.datetime.utcnow() + expiry).strftime('%Y-%m-%dT%H:%M:%SZ') request_body = { "name": name, "objectName": object_name, "accessType": access_type, "timeExpires": expiration_time } response = self.oci.request(host, request_target, "POST", request_body) return (f'https://{host}{response["accessUri"]}', response['id']) def delete_par(self, id: str) -> None: id = urllib.parse.quote(id) host = f'objectstorage.{self.oci.region}.oraclecloud.com' request_target = f"/n/{self.namespace_name}/b/{self.bucket_name}/p/{id}" self.oci.request(host, request_target, "DELETE", None) class MebStorageOCIPAR: def __init__(self, target_path, ocios: OCIObjectStorage): self.target_path = target_path self.ocios = ocios self.pars_to_delete = [] self.hidden = [] def clean(self): for par_id in self.pars_to_delete: try: self.ocios.delete_par(par_id) except Exception as exc: # Don't fail during cleanup print(f"Failed to delete PAR {par_id}: {exc}") def get_args(self): (par, par_id) = self.ocios.create_par("backup-par", self.target_path, "ObjectWrite", timedelta(hours=1)) par_url=par.replace(self.target_path, "") self.pars_to_delete.append(par_id) self.hidden.append(par_url) return [ '--backup-image=-', '--cloud-service=OCI', '--cloud-object='+self.target_path, '--cloud-par-url='+par.replace(self.target_path, "") ] def get_filter(self): return self.hidden class MebStorageOCIRestore: par_base_url: str = "" def __init__(self, par_base_url: str): self.par_base_url = par_base_url def get_args(self, source): return [ '--backup-image=-', '--cloud-service=OCI', '--cloud-par-url='+self.par_base_url+source, ] def get_filter(self): return [self.par_base_url] class MebStorageS3: def __init__(self, object_key:str, region:str, bucket:str, key_id:str, secret_access_key:str, host:str=None): self.opts = [ "--cloud-service=s3", "--cloud-aws-region="+region, "--cloud-access-key-id="+key_id, "--cloud-secret-access-key="+secret_access_key, "--cloud-bucket="+bucket, "--cloud-object-key="+object_key, "--backup-image=-" ] if host: self.opts.append("--cloud-host="+host) def get_args(self): return self.opts def get_filter(self): return None class MySQLEnterpriseBackup: def __init__(self, storage, auth_options: list, tmppath: str): self.auth_options = auth_options self.storage = storage self.tmppath = tmppath self.log = "" def _run_cmd(self, cmd: str, args=None): command = [ MYSQLBACKUP_BINARY, '--backup-dir='+self.tmppath ] if args: command += args if cmd != "copy-back-and-apply-log": command += self.storage.get_args() + self.auth_options command.append(cmd) def print_output(pipe, stream): hidden = self.storage.get_filter() try: for line in iter(pipe.readline, ''): if line: if hidden: # Avoid printing PAR URLs and similar into logs for term in hidden: line = line.replace(term, 'X' * len(term)) self.log += line stream.write(line) stream.flush() finally: pipe.close() # in order to pass password via stdin we got to also pipe stdout/stderr # through our code, if we end up using localroot (passwordless) we # might use the simpler subprocess.run ... process = subprocess.Popen( command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1, text=True ) process.stdin.close() stdout_thread = threading.Thread(target=print_output, args=(process.stdout, sys.stdout)) stderr_thread = threading.Thread(target=print_output, args=(process.stderr, sys.stderr)) stdout_thread.start() stderr_thread.start() stdout_thread.join() stderr_thread.join() error_code = process.wait() if error_code: raise RuntimeError(f"Running Backup failed: {error_code}") return True def backup(self, incremental=False, incremental_base='history:last_backup'): if incremental: args = [ '--incremental', '--incremental-base='+incremental_base, ] return self._run_cmd('backup-to-image', args) return self._run_cmd('backup-to-image') def extract(self, image: str): command = [ MYSQLBACKUP_BINARY, '--backup-dir='+self.tmppath, '--datadir=/var/lib/mysql' ] subprocess.run(command + self.storage.get_args(image) + ['image-to-backup-dir'], check=True) def restore(self, full_image: str, incrementals: list = ()): command = [ #'--defaults-file='+MYCNF_PATH, '--backup-dir='+self.tmppath, '--datadir=/var/lib/mysql' ] try: self._run_cmd('copy-back-and-apply-log', args=command+self.storage.get_args(full_image)) except FileNotFoundError as exc: if exc.filename == MYSQLBACKUP_BINARY: raise NotImplementedError("The MySQL Operator image does " \ "not contain MySQL Enterprise " \ "Backup, please verify you are " \ "using MySQL Enterprise Edition" ) from exc raise except RuntimeError: # subprocess.CalledProcessError: sys.exit(1) # /var/run/mysqld is a location shared between restore initContainer, # which creates this file and sidecar container, which wants to read it shutil.copy(self.tmppath+'/meta/backup_variables.txt', '/var/run/mysqld/backup_variables.txt') shutil.rmtree(self.tmppath) for incremental in incrementals: self._run_cmd('copy-back-and-apply-log', args=command+['--incremental']+self.storage.get_args(incremental)) # This will override hte previous - we are only interested to keep the last shutil.copy(self.tmppath+'/meta/backup_variables.txt', '/var/run/mysqld/backup_variables.txt') shutil.rmtree(self.tmppath) def cleanup(self): if hasattr(self.storage, 'clean'): self.storage.clean() try: shutil.rmtree(self.tmppath) except Exception as exc: print(f"Failed to delete temp directory, this is likely okay: {exc}") def testmain(): import argparse parser = argparse.ArgumentParser( prog='Backup Test Interrface', description='Create MySQL Enterprise Backup') parser.add_argument("target") parser.add_argument("-u", "--user") parser.add_argument("-p", "--password") parser.add_argument("-t", "--host") # argparse wants -h for itself ... parser.add_argument("-i", "--incremental", action='store_true') args = parser.parse_args() storage = MebStorageOCIPAR(args.target, OCIObjectStorage(OCIRequest(), 'jschluet', 'idylsdbcgx0d')) meb = MySQLEnterpriseBackup( storage, [f"-u{args.user}", f"-p{args.password}", f"--host={args.host}"], "/tmp/backup-tmp" ) try: meb.backup(args.incremental) finally: meb.cleanup() if __name__ == "__main__": testmain()