mysqloperator/controller/backup/meb/meb_main.py (119 lines of code) (raw):

# Copyright (c) 2025, Oracle and/or its affiliates. # # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ # import argparse import json import logging import ssl import sys from http.server import HTTPServer, SimpleHTTPRequestHandler from urllib.parse import unquote as urlunquote from . import meb_controller as meb class CustomHandler(SimpleHTTPRequestHandler): datadir = None def do_GET(self): if self.path == "/ping": self.send_response(200) self.end_headers() self.wfile.write(b"true\n") return if self.path == "/restart": # Debug hook to quickly restart container self.send_response(200) self.end_headers() self.wfile.write(b"true\n") sys.exit() def do_POST(self): # We don't do do access cheking here as Python http.server will # verify the client certificate before reaching this point if self.path.startswith('/backup/'): try: backup_name = urlunquote(self.path.rsplit('/', 1)[-1]) content_length = int(self.headers.get('Content-Length', 0)) post_body = self.rfile.read(content_length) info = json.loads(post_body) backup_profile = info["spec"] username = info["source"]["user"] password = info["source"]["password"] options = ["-u", username, f"-p{password}", f"--host=127.0.0.1"] if 'extra_options' in backup_profile and backup_profile['extra_options']: options += backup_profile['extra_options'] storage_opts = backup_profile['storage'] if "s3" in storage_opts: storage = meb.MebStorageS3( storage_opts["s3"]["objectKeyPrefix"] + backup_name, storage_opts["s3"]["region"], storage_opts["s3"]["bucket"], info["secret"]["accessKeyId"], info["secret"]["secretAccessKey"], storage_opts["s3"]["host"] if "host" in storage_opts["s3"] else None) elif "oci" in storage_opts: storage = meb.MebStorageOCIPAR(storage_opts['oci']['prefix']+backup_name, meb.OCIObjectStorage( meb.OCIRequest(info['secret']), storage_opts['oci']['bucketName'], storage_opts['oci']['namespace'])) else: raise Exception("Need either meb or s3 storage specification") incremental = info["incremental"] incremental_base = info["incremental_base"] backup = meb.MySQLEnterpriseBackup( storage, options, "/tmp/backup-tmp", ) try: backup.backup(incremental, 'history:'+incremental_base) except Exception as exc: self.send_response(500) self.end_headers() response = backup.log + "\n" + exc.__str__() self.wfile.write(response.encode('utf-8', errors='replace')) return finally: backup.cleanup() self.send_response(200) self.end_headers() self.wfile.write(backup.log.encode('utf-8', errors='replace')) return except Exception as exc: self.send_response(500) self.end_headers() self.wfile.write(b"Failure\n") raise self.send_response(404) self.end_headers() self.wfile.write(b"false\n") def serve_http(sslopts: dict, datadir: str): server_address = ('0.0.0.0', 4443) handler_class = CustomHandler handler_class.datadir = datadir httpd = HTTPServer(server_address, handler_class) context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.load_cert_chain(certfile=sslopts["cert"], keyfile=sslopts["key"]) context.load_verify_locations(cafile='/tls/ca.pem') context.verify_mode = ssl.CERT_REQUIRED httpd.socket = context.wrap_socket(httpd.socket, server_side=True) httpd.serve_forever() def main(argv): parser = argparse.ArgumentParser(description="MySQL InnoDB Cluster MySQL Enterprise Backup Daemon") parser.add_argument('--logging-level', type=int, nargs="?", default = logging.INFO, help="Logging Level") parser.add_argument('--pod-name', type=str, nargs=1, default=None, help="Pod Name") parser.add_argument('--pod-namespace', type=str, nargs=1, default=None, help="Pod Namespace") parser.add_argument('--datadir', type=str, default="/var/lib/mysql", help="Path do data directory") parser.add_argument('--ssl-cert', type=str, help="Path do TLS server cert") parser.add_argument('--ssl-key', type=str, help="Path do TLS server key") args = parser.parse_args(argv[1:]) datadir = args.datadir logging.basicConfig(level=args.logging_level, format='%(asctime)s - [%(levelname)s] [%(name)s] %(message)s', datefmt="%Y-%m-%dT%H:%M:%S") logger = logging.getLogger("meb") name = args.pod_name[0] namespace = args.pod_namespace[0] sslopts = { "cert": args.ssl_cert, "key": args.ssl_key, } try: logger.info("Starting MEB Daemon for %s/%s", namespace, name) serve_http(sslopts, datadir) except Exception as e: import traceback traceback.print_exc() logger.critical(f"Unhandled exception while restoring: {e}") return 1 if __name__ == '__main__': main(sys.argv)