mysqloperator/sidecar_main.py (593 lines of code) (raw):

# Copyright (c) 2020, 2025, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#

# Bootstrap sequence:
# 1 - initconf (initContainer)
#   1.1 - initializes MySQL configuration files
#
# 2 - initmysql (initContainer) - via docker container entrypoint script
#   2.1 - initializes MySQL datadir
#   2.2 - create default root account
#   2.3 - create localroot
#
# 3 - mysql - via container entrypoint:
#   3.1 - start mysqld
#
# 4 - sidecar:
#   4.1 - configureInstance()
#   4.2 - initialize db (clone, loadDump etc)
#   4.3 - restart (optional)
#   4.4 - mark as ready
#
# TODO move some stuff to initdatadir?

# Initializes a MySQL Pod to be used in an InnoDBCluster managed by Kubernetes
#
# This performs the following tasks and runs instead of the regular
# initialization done by the Docker image entry point:
# - configureInstance() and create InnoDB Cluster admin account
# - if this is the seed pod:
#   - populate database from what's specified in initDB
#   - create remote root user
#
# This process will start and restart mysqld as needed, but mysqld will be
# stopped when it exits. It is meant to be called from an initContainer; if
# this script fails, the Pod creation fails.
#
# This script is called whenever the Pod is created, including when the Pod
# is deleted and recreated soon after by the StatefulSet. When a Pod is
# recreated, the volumes may still contain data from the previous run of the
# same Pod of the same StatefulSet (?).
#

import base64
from logging import Logger
from typing import Optional, TYPE_CHECKING, Tuple, cast
import mysqlsh
import sys
import os
import logging
import time
import asyncio
import argparse
import kopf
import json
from threading import Lock

from .controller import utils, mysqlutils, k8sobject, fqdn, config
from .controller.api_utils import Edition
from .controller.backup import meb
from .controller.innodbcluster import initdb
from .controller.innodbcluster.cluster_api import CloneInitDBSpec, DumpInitDBSpec, MebInitDBSpec, InnoDBCluster, MySQLPod, ClusterSetInitDBSpec
from .controller.kubeutils import api_core, client as api_client
from .controller.innodbcluster import router_objects
from .controller.plugins import install_enterprise_plugins, install_enterprise_encryption, install_keyring_udf

if TYPE_CHECKING:
    from mysqlsh.mysql import ClassicSession
    from mysqlsh import Dba

mysql = mysqlsh.mysql

k8sobject.g_component = "sidecar"
k8sobject.g_host = os.getenv("HOSTNAME")


g_cluster_name = None
g_pod_index = 0
g_pod_name = None
g_pod_namespace = None

g_tls_change_underway = False
g_ca_change_underway = False
g_ca_tls_change_underway_lock = Lock()

g_ready_gate = False
g_ready_gate_lock = Lock()

# The time it takes for mysqld to restart after a clone can be very long,
# because it has to apply redo logs. OTOH we monitor the error log to see
# if there's any activity happening, so the timeout is for activity to happen,
# not the total time it takes for the server to start.
CLONE_RESTART_TIMEOUT = 60*10

# Path to a file created to indicate server bootstrap was done
BOOTSTRAP_DONE_FILE = "/var/run/mysql/bootstrap-done"


def create_local_accounts(session: 'ClassicSession', logger: Logger):
    """
    Creates:
    - a localroot@localhost account with auth_socket authentication.
""" sql = [ "SET sql_log_bin=0;", "CREATE USER IF NOT EXISTS localroot@localhost IDENTIFIED WITH auth_socket AS 'mysql';", "GRANT ALL ON *.* TO 'localroot'@localhost WITH GRANT OPTION;", "GRANT PROXY ON ''@'' TO 'localroot'@localhost WITH GRANT OPTION;", "SET sql_log_bin=1;" ] logger.info("Creating local accounts") for s in sql: try: ret = session.run_sql(s) if ret is not None and ret.warnings != []: logger.warning(f"Warnings during execution of {s}: {ret.warnings}") except Exception as e: logger.error(f"Error executing {s}: {e}") raise Exception("Error creating local accounts") def configure_for_innodb_cluster(dba:'Dba', logger: Logger): """ Configure instance for InnoDB Cluster. """ options = {} logger.info("Configuring instance for InnoDB Cluster") dba.configure_instance(None, options) logger.info("Instance configured") def wipe_old_innodb_cluster(session: 'ClassicSession', logger: Logger): # drop innodb cluster accounts try: rows = session.run_sql( "select attributes->>'$.recoveryAccountUser', attributes->>'$.recoveryAccountHost' from mysql_innodb_cluster_metadata.v2_instances").fetch_all() for user, host in rows: if user and host: logger.info(f"Dropping user {user}@{host}") session.run_sql("drop user if exists ?@?", [user, host]) except mysqlsh.Error as e: if e.code in (mysql.ErrorCode.ER_BAD_DB_ERROR, mysql.ErrorCode.ER_NO_SUCH_TABLE): pass else: logger.error( f"Could not query for old InnoDB Cluster accounts: {e}") raise # drop metadata schema if there's one logger.info("Dropping cloned mysql_innodb_cluster_metadata schema") session.run_sql("drop schema if exists mysql_innodb_cluster_metadata") def populate_with_clone(datadir: str, session: 'ClassicSession', cluster: InnoDBCluster, clone_spec: CloneInitDBSpec, pod: MySQLPod, logger: Logger): """ Initialize the DB using clone. Server may be restarted multiple times but will be back up on return. After the clone: - If the donor had the metadata schema, it will be dropped. - Local accounts will be created if missing. 
    - root password will be reset if needed
    """
    logger.info(f"Initializing mysql via clone...")

    start_time = session.run_sql("select now(6)").fetch_one()[0]
    logger.info(f"Starting at {start_time}")

    # TODO monitor clone from a thread and dump progress
    # initdb.monitor_clone(session, start_time, logger)

    initdb.start_clone_seed_pod(
        session, cluster, pod, clone_spec, logger)

    logger.info("Waiting for mysqld to be restarted/shutdown by clone")

    logger.info("Restarting mysqld back up, this may take a while")

    # root credentials for the new instance are supposed to match the donor's
    try:
        user, host, password = get_root_account_info(cluster)
    except Exception as e:
        pod.error(action="InitDB", reason="InvalidArgument", message=f"{e}")
        raise

    logger.info(f"Connecting as {user}@{host}")
    session = connect(user, password, logger)

    initdb.finish_clone_seed_pod(session, cluster, logger)

    # create local accounts again since the donor may not have them
    create_local_accounts(session, logger)

    create_admin_account(session, cluster, logger)

    # reset password of the IC admin account
    # this won't be done by create and may be needed if the donor contains such an account
    admin_user, admin_pass = cluster.get_admin_account()
    logger.info(f"Resetting password for {admin_user}@%")
    session.run_sql("SET PASSWORD FOR ?@'%'=?", [admin_user, admin_pass])

    wipe_old_innodb_cluster(session, logger)

    # recreate metrics user if needed
    create_metrics_account(session, cluster, logger)

    create_admin_account(session, cluster, logger)

    return session


def populate_with_dump(datadir: str, session: 'ClassicSession', cluster: InnoDBCluster, init_spec: DumpInitDBSpec, pod: MySQLPod, logger: Logger):
    logger.info(f"Initializing mysql from a dump...")

    initdb.load_dump(session, cluster, pod, init_spec, logger)

    # create local accounts again since the donor may not have them
    create_local_accounts(session, logger)

    wipe_old_innodb_cluster(session, logger)

    return session
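
# The PITR procedure implemented by meb_do_pitr() below boils down to replaying the
# binlogs that restore_main staged as relay logs for a throw-away 'pitr' replication
# channel. As a rough sketch (not the literal statements; the relay log name comes from
# the cluster name and the UNTIL term/value from the MebInitDBSpec), the SQL it drives is:
#
#   CHANGE REPLICATION SOURCE TO RELAY_LOG_FILE='<cluster>-0-relay-bin-pitr.000001',
#       RELAY_LOG_POS=1, SOURCE_HOST='pitr' FOR CHANNEL 'pitr';
#   START REPLICA SQL_THREAD UNTIL <pitr_end_term> = <pitr_end_value> FOR CHANNEL 'pitr';
#   -- poll SHOW REPLICA STATUS until the relay log is fully applied, then:
#   STOP REPLICA SQL_THREAD FOR CHANNEL 'pitr';
#   RESET REPLICA ALL;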


def meb_do_pitr(datadir: str, session: 'ClassicSession', cluster: InnoDBCluster, init_spec: MebInitDBSpec, pod: MySQLPod, logger: Logger):
    """Do point-in-time-recovery (PITR)

    For doing PITR we have to do these things:

    In restore_main, done during init:
    1) Extract the backup image so we get access to the binlogs
    2) move them in as relay logs

    To do here:
    3) set GTIDs to skip, if any
    4) apply relay logs (binlogs from backup) up to target gtid
    5) hold further processing till done
    6) cleanup extracted backup file
    """
    import time

    def is_relay_log_fully_applied(session, logger):
        res = session.run_sql("SHOW REPLICA STATUS")
        replica_status = res.fetch_one_object()

        if not replica_status:
            raise Exception("This server is not configured as a MySQL replica.")

        io_running = replica_status["Replica_IO_Running"]
        sql_running = replica_status["Replica_SQL_Running"]
        sql_running_state = replica_status["Replica_SQL_Running_State"]

        # states by which we end may vary on server version and exact reason for
        # application of binlogs to end, and we are tolerant here. In most cases
        # with MySQL 9.0 and later we should have an empty string ("")
        end_states = [
            "",
            "Slave has read all relay log; waiting for more updates",
            "Replica has read all relay log; waiting for more updates",
            "Replica has stopped"
        ]

        logger.info((f"Applying PITR: IO Running: {io_running}, "
                     f"SQL Running: {sql_running}, SQL State: {sql_running_state}"))

        return io_running == "No" and sql_running == "No" and sql_running_state in end_states

    logger.info("Applying Binary Logs")
    session.run_sql(f"CHANGE REPLICATION SOURCE TO RELAY_LOG_FILE='{cluster.name}-0-relay-bin-pitr.000001', RELAY_LOG_POS=1, SOURCE_HOST='pitr' FOR CHANNEL 'pitr'")
    if (init_spec.pitr_end_term and init_spec.pitr_end_value):
        logger.info(f"START REPLICA SQL_THREAD UNTIL {init_spec.pitr_end_term} = ? FOR CHANNEL 'pitr'", [init_spec.pitr_end_value])
        session.run_sql(f"START REPLICA SQL_THREAD UNTIL {init_spec.pitr_end_term} = ? FOR CHANNEL 'pitr'", [init_spec.pitr_end_value])
    else:
        session.run_sql("START REPLICA SQL_THREAD FOR CHANNEL 'pitr'")

    def growing_sleep():
        """When only a short log is to be applied (especially in testcases) we likely
        don't want to wait long; however, if there is a lot to do we don't want to
        query too quickly (and in consequence spam the log).

        The growth factor here is absolutely hand-wavy and not based on any science.
        """
        s = 0
        while s < 5:
            s += 0.2
            yield time.sleep(s)
        while s < 15:
            s += 1
            yield time.sleep(s)
        while True:
            yield time.sleep(15)

    for _ in growing_sleep():
        if is_relay_log_fully_applied(session, logger):
            break

    session.run_sql("STOP REPLICA SQL_THREAD FOR CHANNEL 'pitr'").fetch_all()
    session.run_sql("RESET REPLICA ALL").fetch_all()


def populate_with_meb(datadir: str, session: 'ClassicSession', cluster: InnoDBCluster, init_spec: MebInitDBSpec, pod: MySQLPod, logger: Logger):
    if init_spec.pitr_backup_file and len(init_spec.pitr_backup_file):
        meb_do_pitr(datadir, session, cluster, init_spec, pod, logger)

    wipe_old_innodb_cluster(session, logger)

    return session


def populate_by_joining_cluster_set(_, session: 'ClassicSession', cluster: InnoDBCluster, init_spec: ClusterSetInitDBSpec, pod: MySQLPod, logger: Logger):
    # recreate our accounts
    #old_read_only = session.run_sql("SELECT @@super_read_only").fetch_one()[0]
    # we are in a replica cluster, thus should be in super_read_only mode, but let's play safe
    #try:
    #    # this is somewhat duplicated with initialize() - probably wise to factor out
    #    session.run_sql("SET sql_log_bin=0")
    #    create_local_accounts(session, logger)
    #finally:
    #    session.run_sql("SET GLOBAL super_read_only=?", [old_read_only])
    #    session.run_sql("SET sql_log_bin=1")

    logger.info(f"Joining InnoDB ClusterSet at {init_spec.uri}")
    session = initdb.join_innodb_cluster_set(session, cluster, init_spec, pod, logger, create_admin_account)

    try:
        user, host, password = get_root_account_info(cluster)
    except Exception as e:
        pod.error(action="InitDB", reason="InvalidArgument", message=f"{e}")
        raise

    logger.info(f"Connecting as {user}@{host}")
    session = connect(user, password, logger)

    return session


def populate_db(datadir: str, session: 'ClassicSession', cluster: InnoDBCluster, pod, logger: Logger) -> 'ClassicSession':
    """
    Populate DB from the source specified in the cluster spec.
    Also creates the main root account specified by the user.

    mysqld may get restarted by clone.
""" if cluster.parsed_spec.initDB: if cluster.parsed_spec.initDB.clone: logger.info("Populate with clone") return populate_with_clone(datadir, session, cluster, cluster.parsed_spec.initDB.clone, pod, logger) elif cluster.parsed_spec.initDB.dump: logger.info("Populate with dump") return populate_with_dump(datadir, session, cluster, cluster.parsed_spec.initDB.dump, pod, logger) elif cluster.parsed_spec.initDB.meb: logger.info("PITR and cleanup after MEB") return populate_with_meb(datadir, session, cluster, cluster.parsed_spec.initDB.meb, pod, logger) elif cluster.parsed_spec.initDB.cluster_set: logging.info("Joining InnoDB ClusterSet") return populate_by_joining_cluster_set(datadir, session, cluster, cluster.parsed_spec.initDB.cluster_set, pod, logger) else: logger.warning("spec.initDB ignored because no supported initialization parameters found") logger.info("InitDB not specified") return session def get_root_account_info(cluster: InnoDBCluster) -> Tuple[str, str, str]: secrets = cluster.get_user_secrets() if secrets: user = secrets.data.get("rootUser") host = secrets.data.get("rootHost") password = secrets.data.get("rootPassword", None) if not password: raise Exception( f"rootPassword missing in secret {cluster.parsed_spec.secretName}") if user: user = utils.b64decode(user) else: user = "root" if host: host = utils.b64decode(host) else: host = "%" password = utils.b64decode(password) return user, host, password raise Exception( f"Could not get secret '{cluster.parsed_spec.secretName}' with for root account information for {cluster.namespace}/{cluster.name}") def create_root_account(session: 'ClassicSession', pod: MySQLPod, cluster: InnoDBCluster, logger: Logger) -> None: """ Create general purpose root account (owned by user) as specified by user. """ logger.info("Creating root account") try: user, host, password = get_root_account_info(cluster) except Exception as e: pod.error(action="InitDB", reason="InvalidArgument", message=f"{e}") raise # DROP root@localhost which has random password in case create_root_account() is called from initialize # or drop the root@localhost from a initDB and set new password, as the one in the dump/clone might differ # The `root` user might not have username root thus we need to drop root@localhost created by the MySQL --initialize-only # It was created with random password we are not aware of but it doesn't matter. We create an user as specified in the secret ret = session.run_sql("DROP USER IF EXISTS root@localhost") logger.info(f"DROP USER root@localhost - Warnings {ret.warnings if ret is not None else []}") logger.info(f"Creating own root account {user}@{host}") ret = session.run_sql("CREATE USER IF NOT EXISTS ?@? IDENTIFIED BY ?", [user, host, password]) logger.info(f"CREATE USER - Warnings {ret.warnings if ret is not None else []}") ret = session.run_sql("GRANT ALL ON *.* TO ?@? WITH GRANT OPTION", [user, host]) logger.info(f"GRANT ALL - Warnings {ret.warnings if ret is not None else []}") ret = session.run_sql("GRANT PROXY ON ''@'' TO ?@? WITH GRANT OPTION", [user, host]) logger.info(f"GRANT PROXY - Warnings {ret.warnings if ret is not None else []}") def create_admin_account(session, cluster, logger: Logger): """ Create a super-user to be used by the operator. 
""" host = "%" user, password = cluster.get_admin_account() logger.info(f"Creating account {user}@{host}") # binlog has to be disabled for this, because we need to create the account # independently in all instances (so that we can run configure on them), # which would cause diverging GTID sets session.run_sql("DROP USER IF EXISTS ?@?", [user, host]) session.run_sql("CREATE USER ?@? IDENTIFIED BY ?", [user, host, password]) session.run_sql("GRANT ALL ON *.* TO ?@? WITH GRANT OPTION", [user, host]) session.run_sql("GRANT PROXY ON ''@'' TO ?@? WITH GRANT OPTION", [user, host]) logger.info("Admin account created") def create_metrics_account(session: 'ClassicSession', cluster: InnoDBCluster, logger: Logger): """ Create a user for metrics, if needed """ if not cluster.parsed_spec.metrics or not cluster.parsed_spec.metrics.enable: return host = "localhost" user = cluster.parsed_spec.metrics.dbuser_name max_connections = cluster.parsed_spec.metrics.dbuser_max_connections grants = cluster.parsed_spec.metrics.dbuser_grants logger.info(f"Creating account {user}@{host} using {session.get_uri()}") # binlog has to be disabled for this, because we need to create the account # independently in all instances (so that metrics are available even on later config failure), # which would cause diverging GTID sets mysqlutils.setup_metrics_user(session, user, grants, max_connections) logger.info("Metrics account created") def create_backup_account(session: 'ClassicSession', cluster: InnoDBCluster, logger: Logger): """ Create a user for backup """ user, password = cluster.get_backup_account() logger.debug(f"Creating backup account {user} using {session.get_uri()}") mysqlutils.setup_backup_account(session, user, password) logger.info("Backup account created") def connect(user: str, password: str, logger: Logger, timeout: Optional[int] = 60) -> 'ClassicSession': shell = mysqlsh.globals.shell i = 0 while timeout is None or i < timeout: try: shell.connect( {"user": user, "password": password, "scheme": "mysql"}) logger.info(f"Connect attempt #{i} successful") break except mysqlsh.Error as e: if mysqlutils.is_client_error(e.code): logger.warning(f"Connect attempt #{i} failed: {e}") time.sleep(2) else: logger.critical(f"Unexpected MySQL error during connection: {e}") raise i += 1 else: raise Exception("Could not connect to MySQL server after initialization") assert mysqlsh.globals.session return mysqlsh.globals.session def initialize(session: 'ClassicSession', datadir: str, pod: MySQLPod, cluster: InnoDBCluster, logger: Logger) -> None: session.run_sql("SET sql_log_bin=0") create_root_account(session, pod, cluster, logger) create_admin_account(session, cluster, logger) session.run_sql("SET sql_log_bin=1") user, password = cluster.get_admin_account() session = connect(user, password, logger) configure_for_innodb_cluster(mysqlsh.globals.dba, logger) if pod.index == 0 and cluster.get_create_time() is None: # if this is the 1st pod of the cluster, then initialize it and create default accounts session = populate_db(datadir, session, cluster, pod, logger) # TODO: This will fail after restarting server due to cloning - wonder why # I only noticed this with clusterSet not with plain clone # # mysqlsh.DBError: MySQL Error (2013): ClassicSession.run_sql: Lost connection to MySQL server during query session.run_sql("SET sql_log_bin=0") old_read_only = session.run_sql("SELECT @@super_read_only").fetch_one()[0] session.run_sql("SET GLOBAL super_read_only=0") try: # These need to be created on every pod and not replicated. 


def connect(user: str, password: str, logger: Logger, timeout: Optional[int] = 60) -> 'ClassicSession':
    shell = mysqlsh.globals.shell

    i = 0
    while timeout is None or i < timeout:
        try:
            shell.connect(
                {"user": user, "password": password, "scheme": "mysql"})
            logger.info(f"Connect attempt #{i} successful")
            break
        except mysqlsh.Error as e:
            if mysqlutils.is_client_error(e.code):
                logger.warning(f"Connect attempt #{i} failed: {e}")
                time.sleep(2)
            else:
                logger.critical(f"Unexpected MySQL error during connection: {e}")
                raise
        i += 1
    else:
        raise Exception("Could not connect to MySQL server after initialization")

    assert mysqlsh.globals.session
    return mysqlsh.globals.session


def initialize(session: 'ClassicSession', datadir: str, pod: MySQLPod, cluster: InnoDBCluster, logger: Logger) -> None:
    session.run_sql("SET sql_log_bin=0")
    create_root_account(session, pod, cluster, logger)
    create_admin_account(session, cluster, logger)
    session.run_sql("SET sql_log_bin=1")

    user, password = cluster.get_admin_account()
    session = connect(user, password, logger)

    configure_for_innodb_cluster(mysqlsh.globals.dba, logger)

    if pod.index == 0 and cluster.get_create_time() is None:
        # if this is the 1st pod of the cluster, then initialize it and create default accounts
        session = populate_db(datadir, session, cluster, pod, logger)

    # TODO: This will fail after restarting the server due to cloning - wonder why.
    #       I only noticed this with clusterSet, not with plain clone.
    #
    #       mysqlsh.DBError: MySQL Error (2013): ClassicSession.run_sql: Lost connection to MySQL server during query
    session.run_sql("SET sql_log_bin=0")
    old_read_only = session.run_sql("SELECT @@super_read_only").fetch_one()[0]
    session.run_sql("SET GLOBAL super_read_only=0")
    try:
        # These need to be created on every pod and not replicated. Thus under sql_log_bin=0.
        # Not created earlier, as if there is initDB, for example from a dump or clone, the
        # creation would be overwritten and we would need to create them again in the populate method.
        create_metrics_account(session, cluster, logger)
        create_backup_account(session, cluster, logger)

        # Some commands like INSTALL [PLUGIN|COMPONENT] are not being
        # replicated, so we run them on any restart; those have to be idempotent

        # With enterprise edition activate enterprise plugins
        if cluster.parsed_spec.edition == Edition.enterprise:
            install_enterprise_plugins(cluster.parsed_spec.version, session, logger)

        # If a Keyring setup is requested install keyring UDFs
        if "keyring" in cluster.spec:
            print(f"KEYRING: {cluster.spec['keyring']}")
            install_keyring_udf(cluster.parsed_spec.version, session, logger)
    finally:
        session.run_sql("SET GLOBAL super_read_only=?", [old_read_only])
        session.run_sql("SET sql_log_bin=1")


def metadata_schema_version(session: 'ClassicSession', logger: Logger) -> Optional[str]:
    try:
        r = session.run_sql(
            "select * from mysql_innodb_cluster_metadata.schema_version").fetch_one()
        return r[0]
    except Exception as e:
        logger.info(f"Metadata check failed: {e}")
        return None


def bootstrap(pod: MySQLPod, datadir: str, logger: Logger) -> int:
    """
    Prepare MySQL instance for InnoDB cluster.

    This function must be idempotent, because the sidecar container can get
    restarted in many different scenarios. It's also possible that the whole
    pod gets deleted and recreated while its underlying PV is reused. In that
    case, the Pod will look brand new (so we can't rely on any data stored in
    the Pod object), but the instance will be already prepared and not be in
    the expected initial state with initial defaults.

    Returns 1 if bootstrapped, 0 if already configured and -1 on error.
    """
    name = pod.name
    namespace = pod.namespace

    # Check if the Pod is already configured according to itself
    gate = pod.get_member_readiness_gate("configured")
    if gate:
        logger.info(f"MySQL server was already initialized configured={gate}")
        return 0

    cluster = pod.get_cluster()

    # In most cases we have a fresh datadir at this point, where only a user
    # localroot exists. A restore using MySQL Shell would only be done later.
    # However, if a restore was done using MEB, the datadir will be populated
    # with whatever data there was in the backup during init, before this runs.
    # In that case we have to get in using credentials passed by the user.
    # We also must be careful about changes we do, as a PITR may follow and
    # changes may break application of binlogs.
    if pod.index == 0 and cluster.get_create_time() is None and cluster.parsed_spec.initDB and cluster.parsed_spec.initDB.meb:
        logger.info("Detected MEB restore, giving ourselves access")
        mebsession = None
        user, _, password = get_root_account_info(cluster)
        logger.info("Using account %s", user)
        try:
            mebsession = connect(user, password, logger, timeout=None)
            create_local_accounts(mebsession, logger)
        finally:
            if mebsession:
                mebsession.close()

    # Connect using localroot and check if the metadata schema already exists.
    # note: we may have to wait for mysqld to start up, since the sidecar and
    # mysql containers are started at the same time.
session = connect("localroot", "", logger, timeout=None) # Restore from MEB is done in init and the Backup may contain old metadata # mabe we could only check cluster create time in all cases, but that would # need a lot more testing in restart situations mdver = metadata_schema_version(session, logger) initdb_spec = cluster.parsed_spec.initDB if mdver and not (initdb_spec and initdb_spec.meb and cluster.get_create_time() is None): logger.info(f"InnoDB Cluster metadata (version={mdver}) found, skipping configuration...") pod.update_member_readiness_gate("configured", True) return 0 # Check if the datadir already existed logger.info( f"Configuring mysql pod {namespace}/{name}, configured={gate} datadir={datadir}") try: initialize(session, datadir, pod, cluster, logger) pod.update_member_readiness_gate("configured", True) logger.info("Configuration finished") except Exception as e: import traceback traceback.print_exc() logger.critical(f"Unhandled exception while bootstrapping MySQL: {e}") # TODO post event to the Pod and the Cluster object if this is the seed return -1 session.close() return 1 def ensure_correct_tls_sysvars(pod: MySQLPod, session: 'ClassicSession', enabled: bool, caller: str, logger: Logger) -> None: has_crl = os.path.exists("/etc/mysql-ssl/crl.pem") logger.info(f"Ensuring custom TLS certificates are {'enabled' if enabled else 'disabled'} {'(with crl)' if has_crl else ''} caller={caller}") def ensure_sysvar(var, value): logger.info(f"\tChecking if {var} is [{value}]") res = session.run_sql("SHOW VARIABLES LIKE ?", [var]) row = res.fetch_one() if row: curval = row[1] if curval != value: logger.info(f"\t{var} is [{curval}] persisting to [{value}]") session.run_sql(f"SET PERSIST {var} = ?", [value]) else: raise kopf.PermanentError(f"Variable {var} not found!") # first ensure configured paths are correct if enabled: ensure_sysvar("ssl_ca", "/etc/mysql-ssl/ca.pem") ensure_sysvar("ssl_crl", "/etc/mysql-ssl/crl.pem" if has_crl else "") ensure_sysvar("ssl_cert", "/etc/mysql-ssl/tls.crt") ensure_sysvar("ssl_key", "/etc/mysql-ssl/tls.key") if pod.instance_type == "group-member": ensure_sysvar("group_replication_recovery_ssl_verify_server_cert", "ON") ensure_sysvar("group_replication_ssl_mode", "VERIFY_IDENTITY") ensure_sysvar("group_replication_recovery_ssl_ca", "/etc/mysql-ssl/ca.pem") ensure_sysvar("group_replication_recovery_ssl_cert", "/etc/mysql-ssl/tls.crt") ensure_sysvar("group_replication_recovery_ssl_key", "/etc/mysql-ssl/tls.key") else: ensure_sysvar("ssl_ca", "ca.pem") ensure_sysvar("ssl_crl", "") ensure_sysvar("ssl_cert", "server-cert.pem") ensure_sysvar("ssl_key", "server-key.pem") if pod.instance_type == "group-member": ensure_sysvar("group_replication_recovery_ssl_verify_server_cert", "OFF") ensure_sysvar("group_replication_ssl_mode", "REQUIRED") ensure_sysvar("group_replication_recovery_ssl_ca", "") ensure_sysvar("group_replication_recovery_ssl_cert", "") ensure_sysvar("group_replication_recovery_ssl_key", "") def reconfigure_tls(pod: MySQLPod, enabled: bool, caller: str, logger: Logger) -> None: session = connect("localroot", "", logger, timeout=None) ensure_correct_tls_sysvars(pod, session, enabled, caller, logger) try: logger.info("Reloading TLS") session.run_sql("ALTER INSTANCE RELOAD TLS") except Exception as e: logger.error(f"MySQL error reloading TLS certificates: {e}") finally: session.close() def check_secret_mounted(secrets: dict, paths: list, logger: Logger) -> bool: logger.info("check_secret_mounted") for path in paths: secret_name = path.split("/")[-1] 


def check_secret_mounted(secrets: dict, paths: list, logger: Logger) -> bool:
    logger.info("check_secret_mounted")
    for path in paths:
        secret_name = path.split("/")[-1]
        secret_value = secrets[secret_name]
        if not secret_value is None:
            if os.path.exists(path):
                dataf = open(path).read()
                if dataf != secret_value:
                    logger.info(f"check_secret_mounted: No match for {secret_name}")
                    return False
                logger.info(f"check_secret_mounted: {secret_name} matches")
            else:
                logger.info(f"check_secret_mounted: Path to secret {secret_name} doesn't exist")
                return False
        else:
            logger.info(f"check_secret_mounted: Not checking {path}, expected None value")
    logger.info(f"check_secret_mounted: Success")
    return True


def on_ca_secret_create_or_change(value: dict, useSelfSigned: bool, router_deployment: Optional[api_client.V1Deployment], logger: Logger) -> None:
    global g_pod_name
    global g_pod_namespace
    logger.info("on_ca_secret_create_or_change")

    ca_pem = utils.b64decode(value['data']['ca.pem']) if 'ca.pem' in value['data'] else None
    crl_pem = utils.b64decode(value['data']['crl.pem']) if 'crl.pem' in value['data'] else None
    secrets = {'ca.pem': ca_pem, 'crl.pem': crl_pem}

    max_time = 7 * 60
    delay = 5
    for _ in range(max_time//delay):
        if check_secret_mounted(secrets, ["/etc/mysql-ssl/ca.pem", "/etc/mysql-ssl/crl.pem"], logger):
            logger.info(f"TLS CA file change detected, reloading TLS configurations")
            pod = MySQLPod.read(g_pod_name, g_pod_namespace)
            reconfigure_tls(pod, False if useSelfSigned else True, "on_ca_secret_create_or_change", logger)
            if router_deployment:
                # give time to all other sidecars to reload the TLS and then restart the router deployment from -0
                time.sleep(delay)
                router_objects.restart_deployment_for_tls(router_deployment, router_tls_crt=None, router_tls_key=None, ca_pem=ca_pem, crl_pem=crl_pem, logger=logger)
            break
        else:
            logger.debug("Waiting for mounted TLS files to refresh...")
            time.sleep(delay)
            # TemporaryError was supposed to get this handler called again, but isn't
            # raise kopf.TemporaryError("TLS CA secret changed, but file didn't refresh yet")
    else:
        raise kopf.PermanentError("Timeout waiting for TLS files to get refreshed")


def on_tls_secret_create_or_change(value: dict, useSelfSigned: bool, router_deployment: Optional[api_client.V1Deployment], logger: Logger) -> None:
    global g_pod_name
    global g_pod_namespace
    logger.info("on_tls_secret_create_or_change")

    pod = MySQLPod.read(g_pod_name, g_pod_namespace)

    tls_crt = utils.b64decode(value['data']['tls.crt']) if 'tls.crt' in value['data'] else None
    tls_key = utils.b64decode(value['data']['tls.key']) if 'tls.key' in value['data'] else None
    secrets = {'tls.crt': tls_crt, 'tls.key': tls_key}

    max_time = 7 * 60
    delay = 5
    for _ in range(max_time//delay):
        if check_secret_mounted(secrets, ["/etc/mysql-ssl/tls.crt", "/etc/mysql-ssl/tls.key"], logger):
            logger.info(f"TLS certificate file change detected, reloading TLS configurations")
            reconfigure_tls(pod, False if useSelfSigned else True, "on_tls_secret_create_or_change", logger)
            break
        else:
            logger.info("Waiting for mounted TLS files to refresh...")
            time.sleep(delay)
            # raise kopf.TemporaryError("TLS certificate secret changed, but file didn't refresh yet")
    else:
        raise kopf.PermanentError("Timeout waiting for TLS files to get refreshed")


def on_router_tls_secret_create_or_change(value: dict, useSelfSigned: bool, router_deployment: Optional[api_client.V1Deployment], logger: Logger) -> None:
    logger.info("on_router_tls_secret_create_or_change")
    if router_deployment:
        router_tls_crt = utils.b64decode(value['data']['tls.crt']) if 'tls.crt' in value['data'] else None
        router_tls_key = utils.b64decode(value['data']['tls.key']) if 'tls.key' in value['data'] else None
        router_objects.restart_deployment_for_tls(router_deployment, router_tls_crt=router_tls_crt, router_tls_key=router_tls_key, ca_pem=None, crl_pem=None, logger=logger)


def secret_belongs_to_the_cluster_checker(namespace: str, name, **_) -> bool:
    # This should always be true, but some precaution won't hurt.
    # It is true when the sidecar runs as a standalone operator. If for some reason it doesn't,
    # then it will listen to all namespaces and this won't hold true all the time.
    if namespace == g_pod_namespace:
        ic = InnoDBCluster.read(namespace, g_cluster_name)
        return name in (ic.parsed_spec.tlsCASecretName, ic.parsed_spec.tlsSecretName, ic.parsed_spec.router.tlsSecretName)
    return False


@kopf.on.create("", "v1", "secrets", when=secret_belongs_to_the_cluster_checker)  # type: ignore
@kopf.on.update("", "v1", "secrets", when=secret_belongs_to_the_cluster_checker)  # type: ignore
def on_secret_create_or_update(name: str, namespace: str, spec, new, logger: Logger, **kwargs):
    # g_cluster_name
    # g_pod_index
    global g_tls_change_underway
    global g_ca_change_underway
    global g_ca_tls_change_underway_lock
    global g_ready_gate
    global g_ready_gate_lock

    logger.info(f"on_secret_create_or_update: name={name} pod_index={g_pod_index}")

    try:
        g_ready_gate_lock.acquire()
        if not g_ready_gate:
            logger.info("Cached value of gate[ready] is false, re-checking")
            ready = MySQLPod.read(g_pod_name, g_pod_namespace).get_member_readiness_gate("ready")
            if not ready:
                raise kopf.TemporaryError(f"Pod not ready - not yet part of the IC. Will retry", delay=15)
            g_ready_gate = True
            logger.info("Readiness gate 'ready' is true. Handling event.")
    finally:
        g_ready_gate_lock.release()

    ic = InnoDBCluster.read(namespace, g_cluster_name)

    tls_changed = False
    ca_changed = False
    handler = None
    router_deployment = None

    # In case the same secret is used for CA and TLS, and router TLS, then the order
    # here is very important. on_ca_secret_create_or_change() does what
    # on_tls_secret_create_or_change() does and restarts the deployment on top.
    # So, either this order of checks or three separate if-statements.
    if ic.parsed_spec.tlsCASecretName == name:
        logger.info(f"on_secret_create_or_update: tlsCASecretName")
        g_ca_tls_change_underway_lock.acquire()
        try:
            if g_tls_change_underway:
                raise kopf.TemporaryError(f"TLS change underway. Wait to finish. {name}", delay=12)
            g_ca_change_underway = True
            ca_changed = True
        finally:
            g_ca_tls_change_underway_lock.release()
        handler = on_ca_secret_create_or_change
        router_deployment = ic.get_router_deployment() if g_pod_index == 0 else None
    elif ic.parsed_spec.tlsSecretName == name:
        logger.info(f"on_secret_create_or_update: tlsSecretName")
        g_ca_tls_change_underway_lock.acquire()
        try:
            if g_ca_change_underway:
                raise kopf.TemporaryError(f"CA change underway. Wait to finish. {name}", delay=14)
            g_tls_change_underway = True
            tls_changed = True
        finally:
            g_ca_tls_change_underway_lock.release()
        handler = on_tls_secret_create_or_change
    elif ic.parsed_spec.router.tlsSecretName == name:
        logger.info(f"on_secret_create_or_update: router.tlsSecretName")
        try:
            g_ca_tls_change_underway_lock.acquire()
            if g_ca_change_underway:
                raise kopf.TemporaryError(f"CA change underway. Wait to finish. {name}", delay=16)
            else:
                handler = on_router_tls_secret_create_or_change
                router_deployment = ic.get_router_deployment() if g_pod_index == 0 else None
        finally:
            g_ca_tls_change_underway_lock.release()

    if handler:
        try:
            handler(new, ic.parsed_spec.tlsUseSelfSigned, router_deployment, logger)
        finally:
            if ca_changed:
                g_ca_tls_change_underway_lock.acquire()
                g_ca_change_underway = False
                g_ca_tls_change_underway_lock.release()
            if tls_changed:
                g_ca_tls_change_underway_lock.acquire()
                g_tls_change_underway = False
                g_ca_tls_change_underway_lock.release()


@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, logger: Logger, *args, **_):
    logger.info("sidecar: configure()")
    settings.peering.standalone = True
    settings.posting.enabled = False
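
# Illustrative call of main() below - the argument values are examples only; in a real
# Pod the operator fills them in from the Pod spec when it starts the sidecar:
#
#   main(["--pod-name=mycluster-0", "--pod-namespace=default",
#         "--datadir=/var/lib/mysql"])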
{name}", delay=16) else: handler = on_router_tls_secret_create_or_change router_deployment = ic.get_router_deployment() if g_pod_index == 0 else None finally: g_ca_tls_change_underway_lock.release() if handler: try: handler(new, ic.parsed_spec.tlsUseSelfSigned, router_deployment , logger) finally: if ca_changed: g_ca_tls_change_underway_lock.acquire() g_ca_change_underway = False g_ca_tls_change_underway_lock.release() if tls_changed: g_ca_tls_change_underway_lock.acquire() g_tls_change_underway = False g_ca_tls_change_underway_lock.release() @kopf.on.startup() def configure(settings: kopf.OperatorSettings, logger: Logger, *args, **_): logger.info("sidecar: configure()") settings.peering.standalone = True settings.posting.enabled = False def main(argv): global g_cluster_name global g_pod_index global g_pod_name global g_pod_namespace # const - when there is an argument without value # default - when there is no argument at all # nargs = "?" - zero or one arguments # nargs = "+" - one or more arguments, returns a list() # nargs = 8 - 8 arguments will be consumed # nargs = 1 - 1 argument will be consumed, returns a list with one element parser = argparse.ArgumentParser(description = "MySQL InnoDB Cluster Instance Sidecar Container") parser.add_argument('--debug', type = int, nargs="?", const = 1, default = 0, help = "Debug") parser.add_argument('--logging-level', type = int, nargs="?", default = logging.INFO, help = "Logging Level") parser.add_argument('--pod-name', type = str, default = "", help = "Pod Name") parser.add_argument('--pod-namespace', type = str, default = "", help = "Pod Namespace") parser.add_argument('--datadir', type = str, nargs=1, help = "Path do data directory") args = parser.parse_args(argv) datadir = args.datadir kopf.configure(verbose=True if args.debug != 0 else False) mysqlsh.globals.shell.options.useWizards = False logging.basicConfig(level=args.logging_level, format='%(asctime)s - [%(levelname)s] [%(name)s] %(message)s', datefmt="%Y-%m-%dT%H:%M:%S") logger = logging.getLogger("sidecar") utils.log_banner(__file__, logger) g_pod_namespace = args.pod_namespace g_pod_name = args.pod_name name = args.pod_name namespace = args.pod_namespace pod = MySQLPod.read(name, namespace) logger.info(f"My pod is {name} in {namespace}") logger.info("Bootstrapping") r = bootstrap(pod, datadir, logger) if r < 0: logger.info(f"Bootstrap error {r}") return abs(r) cluster = pod.get_cluster() cluster.log_tls_info(logger) g_cluster_name = cluster.name g_pod_index = pod.index if r == 0: # refresh TLS settings if we're restarting in case something changed reconfigure_tls(pod, False if cluster.parsed_spec.tlsUseSelfSigned else True, "main", logger) logger.info("Starting Operator request handler...") try: loop = asyncio.get_event_loop() loop.run_until_complete(kopf.operator(namespace=namespace)) except Exception as e: import traceback traceback.print_exc() logger.critical(f"Unhandled exception while bootstrapping MySQL: {e}") # TODO post event to the Pod and the Cluster object if this is the seed return 1