mysqloperator/controller/innodbcluster/initdb.py (251 lines of code) (raw):

# Copyright (c) 2020, 2024, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at
# https://oss.oracle.com/licenses/upl/
#

from typing import TYPE_CHECKING, cast
from .cluster_api import DumpInitDBSpec, MySQLPod, InitDB, CloneInitDBSpec, InnoDBCluster, ClusterSetInitDBSpec
from ..shellutils import SessionWrap, DbaWrap, connect_dba
from .. import mysqlutils, utils
from ..kubeutils import api_core, api_apps, api_customobj
from ..kubeutils import client as api_client, ApiException
from .. import fqdn
from abc import ABC, abstractmethod
import mysqlsh
import time
import os
from logging import Logger

if TYPE_CHECKING:
    from mysqlsh.mysql import ClassicSession


def start_clone_seed_pod(session: 'ClassicSession',
                         cluster: InnoDBCluster,
                         seed_pod: MySQLPod,
                         clone_spec: CloneInitDBSpec,
                         logger: Logger) -> bool:
    """Initialize the seed instance of a cluster by cloning a donor server.

    First makes sure the donor has the CLONE plugin installed (cloning is
    impossible without it), then clones the donor's data into the local
    instance behind ``session``.

    Args:
        session: classic session to the local (recipient) instance.
        cluster: the InnoDBCluster being initialized.
        seed_pod: the pod being seeded (used for logging only).
        clone_spec: donor URI and credentials.
        logger: operator logger.

    Returns:
        The result of mysqlutils.clone_server() on success.

    Raises:
        mysqlsh.Error: if connecting to the donor or cloning fails.
    """
    logger.info(
        f"Initializing seed instance. method=clone pod={seed_pod} source={clone_spec.uri}")

    donor_root_co = dict(mysqlsh.globals.shell.parse_uri(clone_spec.uri))
    # Here we get only the password from the cluster secret. The secret
    # might contain also rootUser and rootHost (mask from where the user connects)
    # shouldn't we respect rootUser and not ask for rootUser in clone_spec?
    # Or...this is a different kind of secret?
    donor_root_co["user"] = clone_spec.root_user or "root"
    donor_root_co["password"] = clone_spec.get_password(cluster.namespace)

    logger.info(f"CONNECTING {donor_root_co['user']}@{donor_root_co['host']}...")

    # Check whether the donor already has the CLONE plugin and install it if
    # missing - it's not possible to clone without this plugin.
    with SessionWrap(donor_root_co) as donor:
        clone_installed = False
        for row in iter(donor.run_sql("SHOW PLUGINS").fetch_one, None):
            if row[3]:
                logger.info(f"Donor has plugin {row[0]} / {row[3]}")
                if row[0] == "clone":
                    clone_installed = True

        if not clone_installed:
            logger.info(f"Installing clone plugin at {donor.uri}")
            # TODO: check here whether the plugin really got installed before continuing?
            donor.run_sql("install plugin clone soname 'mysql_clone.so'")

        # TODO copy other installed plugins(?)

    # clone
    try:
        donor_co = dict(mysqlsh.globals.shell.parse_uri(clone_spec.uri))
        # Here we get only the password from the cluster secret. The secret
        # might contain also rootUser and rootHost (mask from where the user
        # connects). Shouldn't we respect rootUser although clone_spec.uri
        # might already contain it?  e.g. spec: root@xyz.abc.dev
        donor_co["password"] = clone_spec.get_password(cluster.namespace)
        with SessionWrap(donor_co) as donor:
            logger.info(f"Starting server clone from {clone_spec.uri}")
            ret = mysqlutils.clone_server(donor_co, donor, session, logger)
            logger.info("Cloning finished")
            return ret
    except mysqlsh.Error:
        # TODO check why we are still getting access denied here, the
        # container should have all accounts ready by now.
        # Client errors, retriable errors and everything else are all
        # propagated to the caller (the original code re-raised in both
        # branches of an if/else; collapsed into a single raise).
        raise


def monitor_clone(session: 'ClassicSession', start_time: str, logger: Logger) -> None:
    """Poll the clone progress table once per second.

    WARNING: unfinished - this loop has no exit condition and never returns;
    the query result is discarded. Do not call it from production paths until
    a termination condition is implemented.
    """
    logger.info("Waiting for clone...")
    while True:
        r = session.run_sql("select * from performance_schema.clone_progress")
        time.sleep(1)


def finish_clone_seed_pod(session: 'ClassicSession', cluster: InnoDBCluster, logger: Logger) -> None:
    """Finalize a clone-based seed. Currently a no-op (not implemented)."""
    logger.info("Finalizing clone - Not implemented")
    return
    # copy sysvars that affect data, if any
    # TODO
    logger.info("Clone finished successfully")


def get_secret(secret_name: str, namespace: str, logger: Logger) -> dict:
    """Read a Kubernetes Secret and return its data with values base64-decoded.

    Args:
        secret_name: name of the Secret; must be non-empty.
        namespace: namespace the Secret lives in.
        logger: operator logger.

    Returns:
        dict mapping each secret key to its decoded string value.

    Raises:
        Exception: if no name was given or the Secret cannot be read/decoded.
    """
    logger.info("load_dump::get_secret")
    if not secret_name:
        raise Exception("No secret provided")

    try:
        secret = cast(api_client.V1Secret,
                      api_core.read_namespaced_secret(secret_name, namespace))
        return {k: utils.b64decode(v) for k, v in secret.data.items()}
    except Exception as e:
        # chain the original error instead of silently discarding it
        raise Exception(
            f"Secret {secret_name} in namespace {namespace} cannot be found") from e


class RestoreDump(ABC):
    """Base class encapsulating storage-specific setup for restoring a dump.

    Subclasses materialize credentials/config files for their backend in
    create_config() (invoked from __init__), contribute backend options for
    util.load_dump() in add_options(), expose the dump location via path,
    and remove any created files in clean_config() (invoked from __del__).
    """

    def __init__(self, init_spec: DumpInitDBSpec, namespace: str, logger: Logger) -> None:
        super().__init__()
        self.init_spec = init_spec
        self.storage = init_spec.storage
        self.namespace = namespace
        self.logger = logger
        self.create_config()

    def __del__(self):
        try:
            self.clean_config()
        except Exception:
            # We ignore cleanup errors, which is fine
            pass

    @abstractmethod
    def create_config(self):
        """Write any credential/config files the backend needs."""
        ...

    @abstractmethod
    def add_options(self, options: dict):
        """Add backend-specific options for util.load_dump() to *options*."""
        ...

    @property
    @abstractmethod
    def path(self):
        """Dump location (prefix/path) passed to util.load_dump()."""
        ...

    @abstractmethod
    def clean_config(self):
        """Remove the files written by create_config(). Must be idempotent."""
        ...


class RestoreOciObjectStoreDump(RestoreDump):
    """Restore a dump from an OCI Object Storage bucket."""

    def __init__(self, init_spec: DumpInitDBSpec, namespace: str, logger: Logger) -> None:
        # Paths must be set before super().__init__(), which calls create_config().
        self.oci_config_file = f"{os.getenv('MYSQLSH_USER_CONFIG_HOME')}/oci_config"
        self.oci_privatekey_file = f"{os.getenv('MYSQLSH_USER_CONFIG_HOME')}/privatekey.pem"
        super().__init__(init_spec, namespace, logger)

    def create_config(self):
        """Build an OCI SDK config file + private key PEM from the credentials secret."""
        import configparser

        privatekey = None
        config_profile = "DEFAULT"
        config = configparser.ConfigParser()

        oci_credentials = get_secret(
            self.storage.ociObjectStorage.ociCredentials, self.namespace, self.logger)
        if not isinstance(oci_credentials, dict):
            raise Exception(
                f"Failed to process credentials secret {self.storage.ociObjectStorage.ociCredentials}")

        # Every secret entry becomes a config key, except "privatekey" which
        # is written to its own PEM file referenced via key_file.
        for k, v in oci_credentials.items():
            if k != "privatekey":
                config[config_profile][k] = v
            else:
                privatekey = v

        if privatekey is None:
            # Without it the config is unusable and f.write(None) would raise
            # an opaque TypeError - fail with a clear message instead.
            raise Exception(
                f"Credentials secret {self.storage.ociObjectStorage.ociCredentials} has no 'privatekey' entry")

        config[config_profile]["key_file"] = self.oci_privatekey_file

        with open(self.oci_config_file, 'w') as f:
            config.write(f)

        with open(self.oci_privatekey_file, 'w') as f:
            f.write(privatekey)

    def add_options(self, options: dict):
        options["ociConfigFile"] = self.oci_config_file
        options["ociProfile"] = "DEFAULT"
        options["osBucketName"] = self.storage.ociObjectStorage.bucketName

    @property
    def path(self):
        return self.storage.ociObjectStorage.prefix

    def clean_config(self):
        try:
            os.remove(self.oci_config_file)
            os.remove(self.oci_privatekey_file)
        except OSError:
            # best-effort cleanup; files may never have been created
            pass


class RestoreS3Dump(RestoreDump):
    """Restore a dump from an S3 bucket."""

    def create_config(self):
        """Write each entry of the S3 config secret as a file under ~/.aws."""
        s3_credentials = get_secret(self.storage.s3.config, self.namespace, self.logger)
        if not isinstance(s3_credentials, dict):
            raise Exception(
                f"Failed to process S3 config secret {self.storage.s3.config}")

        configdir = f"{os.getenv('HOME')}/.aws"
        try:
            os.mkdir(configdir)
        except FileExistsError:
            # ok if directory exists
            pass

        # Secret keys are expected to be the AWS file names (e.g. "credentials",
        # "config"); each one is written under ~/.aws.
        for filename, contents in s3_credentials.items():
            with open(f"{configdir}/{filename}", "w") as file:
                file.write(contents)

    def add_options(self, options: dict):
        options["s3BucketName"] = self.storage.s3.bucketName
        options["s3Profile"] = self.storage.s3.profile
        if self.storage.s3.endpoint:
            options["s3EndpointOverride"] = self.storage.s3.endpoint

    @property
    def path(self):
        return self.storage.s3.prefix

    def clean_config(self):
        configdir = f"{os.getenv('HOME')}/.aws"
        try:
            os.remove(f"{configdir}/credentials")
            os.remove(f"{configdir}/config")
            os.rmdir(configdir)
        except OSError:
            # best-effort cleanup; files may never have been created
            pass


class RestoreAzureBlobStorageDump(RestoreDump):
    """Restore a dump from an Azure Blob Storage container."""

    def create_config(self):
        """Write the Azure CLI config from the config secret to ~/.azure/config."""
        configdir = f"{os.getenv('HOME')}/.azure"

        azure_config = get_secret(self.storage.azure.config, self.namespace, self.logger)
        if not isinstance(azure_config, dict):
            raise Exception(
                f"Failed to process Azure config secret {self.storage.azure.config}")

        try:
            os.mkdir(configdir)
        except FileExistsError:
            # ok if directory exists
            pass

        with open(f"{configdir}/config", "w") as file:
            file.write(azure_config["config"])

    def add_options(self, options: dict):
        options["azureContainerName"] = self.storage.azure.containerName

    @property
    def path(self):
        return self.storage.azure.prefix

    def clean_config(self):
        configdir = f"{os.getenv('HOME')}/.azure"
        try:
            os.remove(f"{configdir}/config")
            os.rmdir(configdir)
        except OSError:
            # best-effort cleanup; files may never have been created
            pass


class RestorePath(RestoreDump):
    """Restore a dump from a plain filesystem path; no external config needed."""

    def create_config(self):
        pass

    def add_options(self, options: dict):
        pass

    @property
    def path(self):
        return self.init_spec.path

    def clean_config(self):
        pass


def load_dump(session: 'ClassicSession', cluster: InnoDBCluster, pod: MySQLPod,
              init_spec: DumpInitDBSpec, logger: Logger) -> None:
    """Load a MySQL Shell dump into the instance behind *session*.

    Picks the RestoreDump implementation matching the configured storage
    backend (OCI, S3, Azure, or a plain path), lets it configure credentials
    and load options, then runs util.load_dump().

    Raises:
        mysqlsh.Error: if the dump load fails (logged before re-raising).
    """
    logger.info("::load_dump")
    options = init_spec.loadOptions.copy()
    options["progressFile"] = ""

    if init_spec.storage.ociObjectStorage:
        restore = RestoreOciObjectStoreDump(init_spec, cluster.namespace, logger)
    elif init_spec.storage.s3:
        restore = RestoreS3Dump(init_spec, cluster.namespace, logger)
    elif init_spec.storage.azure:
        restore = RestoreAzureBlobStorageDump(init_spec, cluster.namespace, logger)
    else:
        restore = RestorePath(init_spec, cluster.namespace, logger)

    restore.add_options(options)
    path = restore.path

    logger.info(f"Executing load_dump({path}, {options})")
    assert path
    try:
        mysqlsh.globals.util.load_dump(path, options)
        logger.info("Load_dump finished")
    except mysqlsh.Error as e:
        logger.error(f"Error loading dump: {e}")
        raise
    finally:
        # Dropping the last reference triggers RestoreDump.__del__, which
        # removes the credential files written by create_config().
        del restore


# This is a dupe of ClusterController::dba_cluster_name
# TODO: Needs to be refactored so both places use one function
# NOTE: the misspelled name is part of the public interface and kept as-is.
def cannonical_dba_cluster_name(name: str) -> str:
    """Return the name of the cluster as defined in the k8s resource as a
    InnoDB Cluster compatible name."""
    return name.replace("-", "_").replace(".", "_")


def join_innodb_cluster_set(session: 'ClassicSession', cluster: InnoDBCluster,
                            init_spec: ClusterSetInitDBSpec, pod: MySQLPod,
                            logger: Logger, create_admin_account):
    """Create this cluster as a replica cluster of an existing ClusterSet.

    Connects to the ClusterSet primary given by *init_spec*, creates the admin
    account there (via the *create_admin_account* callback), then calls
    create_replica_cluster() for this pod.

    Returns:
        The *session* argument, unchanged.
    """
    primary_root_co = dict(mysqlsh.globals.shell.parse_uri(init_spec.uri))
    # Here we get only the password from the cluster secret. The secret
    # might contain also rootUser and rootHost (mask from where the user connects)
    # shouldn't we respect rootUser and not ask for rootUser in clone_spec?
    # Or...this is a different kind of secret?
    primary_root_co["user"] = init_spec.root_user or "root"

    # Log before the password is merged in so it isn't leaked - but mind:
    # the user might have embedded a password in the URI itself... is it
    # their fault then?
    logger.info(
        f"CONNECTING WITH {primary_root_co} {isinstance(primary_root_co, dict)} {type(primary_root_co)}")
    primary_root_co["password"] = init_spec.get_password(cluster.namespace)

    with SessionWrap(primary_root_co) as primary_session:
        # TODO the create_admin_account function is passed as callback, quite
        # a hack; need to decide where that has to live. Probably the caller
        # in sidecar_main should establish the right session, prepare the
        # user and then call this function... or move it all to sidecar_main?
        # Also: other accounts are created per pod while binlog is disabled;
        # we however have to make sure mysqladmin accounts are replicated
        # through the complete ClusterSet, so that the current primary can be
        # reached from anywhere. At the time of writing a replica cluster can
        # reach all (as here we write to the primary), but the primary can't
        # reach replica clusters (as the primary's account isn't replicated over).
        create_admin_account(primary_session, cluster, logger)

    with DbaWrap(connect_dba(primary_root_co, logger)) as dba:
        cluster_set = dba.get_cluster().get_cluster_set()

        add_instance_options = {
            "recoveryProgress": True,
            "recoveryMethod": "clone",
            "timeout": 120
        }

        if not cluster.parsed_spec.tlsUseSelfSigned:
            logger.info("Using TLS GR authentication")
            rdns = cluster.get_tls_issuer_and_subject_rdns()
            add_instance_options["certSubject"] = rdns["subject"]
        else:
            logger.info("Using PASSWORD GR authentication")

        try:
            pod_fqdn = fqdn.pod_fqdn(pod, logger)
            logger.info(
                f"Creating replica cluster {cluster.name} attached to {pod_fqdn} with options {add_instance_options}")
            cluster_set.create_replica_cluster(
                pod_fqdn, cannonical_dba_cluster_name(cluster.name), add_instance_options)
        except Exception:
            logger.error("Exception when creating replica cluster")
            raise

        logger.info("Replica cluster created")

    return session