in mysqloperator/controller/backup/meb/meb_main.py [0:0]
def do_POST(self):
# We don't do do access cheking here as Python http.server will
# verify the client certificate before reaching this point
if self.path.startswith('/backup/'):
try:
backup_name = urlunquote(self.path.rsplit('/', 1)[-1])
content_length = int(self.headers.get('Content-Length', 0))
post_body = self.rfile.read(content_length)
info = json.loads(post_body)
backup_profile = info["spec"]
username = info["source"]["user"]
password = info["source"]["password"]
options = ["-u", username, f"-p{password}", f"--host=127.0.0.1"]
if 'extra_options' in backup_profile and backup_profile['extra_options']:
options += backup_profile['extra_options']
storage_opts = backup_profile['storage']
if "s3" in storage_opts:
storage = meb.MebStorageS3(
storage_opts["s3"]["objectKeyPrefix"] + backup_name,
storage_opts["s3"]["region"], storage_opts["s3"]["bucket"],
info["secret"]["accessKeyId"], info["secret"]["secretAccessKey"],
storage_opts["s3"]["host"] if "host" in storage_opts["s3"] else None)
elif "oci" in storage_opts:
storage = meb.MebStorageOCIPAR(storage_opts['oci']['prefix']+backup_name,
meb.OCIObjectStorage(
meb.OCIRequest(info['secret']),
storage_opts['oci']['bucketName'],
storage_opts['oci']['namespace']))
else:
raise Exception("Need either meb or s3 storage specification")
incremental = info["incremental"]
incremental_base = info["incremental_base"]
backup = meb.MySQLEnterpriseBackup(
storage,
options,
"/tmp/backup-tmp",
)
try:
backup.backup(incremental, 'history:'+incremental_base)
except Exception as exc:
self.send_response(500)
self.end_headers()
response = backup.log + "\n" + exc.__str__()
self.wfile.write(response.encode('utf-8', errors='replace'))
return
finally:
backup.cleanup()
self.send_response(200)
self.end_headers()
self.wfile.write(backup.log.encode('utf-8', errors='replace'))
return
except Exception as exc:
self.send_response(500)
self.end_headers()
self.wfile.write(b"Failure\n")
raise
self.send_response(404)
self.end_headers()
self.wfile.write(b"false\n")