mysqloperator/controller/backup/meb/restore_main.py (105 lines of code) (raw):
# Copyright (c) 2025, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
import argparse
import json
import logging
import os
import re
import shutil
import sys
import traceback
from . import meb_controller as meb
def _get_storage(mebinfo: dict):
if 'ociObjectStorage' in mebinfo['spec']['storage']:
return meb.MebStorageOCIRestore(mebinfo['credentials']['parBaseUrl'])
elif 's3' in mebinfo['spec']['storage']:
s3 = mebinfo['spec']['storage']['s3']
secret = mebinfo['secret']
return meb.MebStorageS3(s3['objectKeyPrefix'], s3['bucket'],
secret['key_id'], secret['secret_access_key'])
print("No storage config for restore found, aborting")
sys.exit(1)
def do_restore(datadir: str, cluster_name: str, mebinfo: dict,
logger: logging.Logger):
storage = _get_storage(mebinfo)
mebspec = mebinfo['spec']
options = []
if "extraOptions" in mebspec:
options = mebspec["extraOptions"]
m = meb.MySQLEnterpriseBackup(
storage,
options,
"/tmp/backup-tmp"
)
incrementals = []
if 'incrementalBackups' in mebspec:
incrementals = mebspec['incrementalBackups']
m.restore(mebspec['fullBackup'], incrementals)
m.cleanup()
logger.info("Restored full backup %s", mebspec['fullBackup'])
if 'incrementalBackups' in mebspec:
logger.info("Restored incremental backups %s", incrementals)
def prepare_binlogs_as_relay_logs_for_pitr(datadir: str,
cluster_name: str,
mebinfo: dict,
logger: logging.Logger):
storage = _get_storage(mebinfo)
mebspec = mebinfo['spec']
options = []
if "extraOptions" in mebspec:
options = mebspec["extraOptions"]
if not "pitr" in mebspec or not "backupFile" in mebspec["pitr"]:
return
logger.info("Preparing relay logs for PITR")
tmpdir = "/tmp/backup-tmp"
m = meb.MySQLEnterpriseBackup(
storage,
options,
tmpdir
)
try:
m.extract(mebspec['pitr']['backupFile'])
binlog_base = mebspec["pitr"]["binlogName"] if len(mebspec["pitr"]["binlogName"]) else "binlog"
pattern = re.compile(rf'^{re.escape(binlog_base)}\.\d{{6}}$')
binlogs = [l for l in os.listdir(f"{tmpdir}/datadir") if pattern.match(l)]
binlogs.sort()
logger.info("Trying to prepare binlogs from backup as relay logs")
i = 0
with open(f"{datadir}/{cluster_name}-0-relay-bin-pitr.index", "wt") as relay_index:
for logfile in binlogs:
i += 1
logger.info(f"Preparing for PITR: COPY {tmpdir}/datadir/{logfile.strip()} TO {datadir}/{cluster_name}-0-relay-bin.{i:06}")
shutil.copy(f"{tmpdir}/datadir/{logfile.strip()}", f"{datadir}/{cluster_name}-0-relay-bin-pitr.{i:06}")
relay_index.write(f"./{cluster_name}-0-relay-bin-pitr.{i:06}\n")
finally:
m.cleanup()
def main(argv):
try:
with open("/tmp/meb_restore.json") as restore_file:
meb_restore_info = json.load(restore_file)
except FileNotFoundError:
print("No restore needed, skipping (check initconf for reason)")
return 0
parser = argparse.ArgumentParser(description = "MySQL InnoDB Cluster Instance Restore Container")
parser.add_argument('--logging-level', type = int, nargs="?", default = logging.INFO, help = "Logging Level")
parser.add_argument('--pod-name', type = str, nargs=1, default=None, help = "Pod Name")
parser.add_argument('--pod-namespace', type = str, nargs=1, default=None, help = "Pod Namespace")
parser.add_argument('--cluster-name', type = str, nargs=1, default=None, help = "InniDBCluster Name")
parser.add_argument('--datadir', type = str, default = "/var/lib/mysql", help = "Path do data directory")
args = parser.parse_args(argv[1:])
datadir = args.datadir
logging.basicConfig(level=args.logging_level,
format='%(asctime)s - [%(levelname)s] [%(name)s] %(message)s',
datefmt="%Y-%m-%dT%H:%M:%S")
logger = logging.getLogger("restore")
name = args.pod_name[0] # nargs returns a list
namespace = args.pod_namespace[0] # nargs returns a list
cluster_name = args.cluster_name[0]
try:
logger.info(f"Restoreing MySQL Enterprise Backup in Pod {namespace}/{name}, datadir={datadir}")
do_restore(datadir, cluster_name, meb_restore_info, logger)
prepare_binlogs_as_relay_logs_for_pitr(datadir, cluster_name, meb_restore_info, logger)
except Exception as exc:
traceback.print_exc()
logger.critical(f"Unhandled exception while restoring: {exc}")
sys.exit(1)
if __name__ == '__main__':
main(sys.argv)