mysqloperator/controller/innodbcluster/cluster_objects.py (603 lines of code) (raw):

# Copyright (c) 2020, 2025, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#

import random
import string
from logging import Logger, getLogger
import kopf
from typing import List, Dict, Optional, cast
from ..kubeutils import client as api_client
from .. import utils, config, consts
from .cluster_api import InnoDBCluster, AbstractServerSetSpec, InnoDBClusterSpec, ReadReplicaSpec, InnoDBClusterSpecProperties
from .. import fqdn
import yaml
from ..kubeutils import api_core, api_apps, api_customobj, k8s_cluster_domain, ApiException
from . import router_objects
import base64
import os
from ..backup import backup_objects

# TODO replace app field with component (mysqld,router) and tier (mysql)


# This service includes all instances, even those that are not ready
def prepare_cluster_service(spec: AbstractServerSetSpec, logger: Logger) -> dict:
    extra_label = ""
    if type(spec) is InnoDBClusterSpec:
        instance_type = "group-member"
        instances = spec.instances
    elif type(spec) is ReadReplicaSpec:
        instance_type = "read-replica"
        extra_label = f"mysql.oracle.com/read-replica: {spec.name}"
    else:
        raise NotImplementedError(f"Unknown subtype {type(spec)} for creating StatefulSet")

    tmpl = f"""
apiVersion: v1
kind: Service
metadata:
  name: {spec.headless_service_name}
  namespace: {spec.namespace}
  labels:
    tier: mysql
    mysql.oracle.com/cluster: {spec.cluster_name}
    mysql.oracle.com/instance-type: {instance_type}
    {extra_label}
spec:
  clusterIP: None
  publishNotReadyAddresses: true
  ports:
  - name: mysql
    port: {spec.mysql_port}
    targetPort: {spec.mysql_port}
  - name: mysqlx
    port: {spec.mysql_xport}
    targetPort: {spec.mysql_xport}
  - name: gr-xcom
    port: {spec.mysql_grport}
    targetPort: {spec.mysql_grport}
  selector:
    component: mysqld
    tier: mysql
    mysql.oracle.com/cluster: {spec.cluster_name}
    mysql.oracle.com/instance-type: {instance_type}
    {extra_label}
  type: ClusterIP
"""

    svc = yaml.safe_load(tmpl)

    if spec.instanceService.annotations:
        if not 'annotations' in svc['metadata']:
            svc['metadata']['annotations'] = {}
        svc['metadata']['annotations'] = spec.instanceService.annotations

    if spec.instanceService.labels:
        if not 'labels' in svc['metadata']:
            svc['metadata']['labels'] = {}
        svc['metadata']['labels'] = spec.instanceService.labels | svc['metadata']['labels']

    for subsystem in spec.get_add_to_svc_cbs:
        print(f"\t\tChecking subsystem {subsystem}")
        for add_to_svc_cb in spec.get_add_to_svc_cbs[subsystem]:
            print(f"\t\tAdding {subsystem} SVC bits")
            add_to_svc_cb(svc, logger)

    return svc
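
# Illustrative sketch (not part of this module): the callbacks stored in
# spec.get_add_to_svc_cbs above are expected to mutate the Service dict in
# place. A hypothetical subsystem callback could look like this (the port
# name and number are made up for the example):
#
#   def add_metrics_port_to_svc(svc: dict, logger: Logger) -> None:
#       # append an extra named port to the headless Service
#       svc["spec"]["ports"].append({"name": "metrics", "port": 9104, "targetPort": 9104})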


def prepare_secrets(spec: InnoDBClusterSpec) -> dict:
    def encode(s): return base64.b64encode(bytes(s, "ascii")).decode("ascii")

    # TODO: should we share the suffix with router & backup and store it in the IC?
    #       might make it simpler to diagnose and remove
    characters = string.ascii_letters + string.digits
    suffix = ''.join(random.choice(characters) for _ in range(10))

    admin_user = encode(config.CLUSTER_ADMIN_USER_NAME + '-' + suffix)
    admin_pwd = encode(utils.generate_password())

    tmpl = f"""
apiVersion: v1
kind: Secret
metadata:
  name: {spec.name}-privsecrets
data:
  clusterAdminUsername: {admin_user}
  clusterAdminPassword: {admin_pwd}
"""
    return yaml.safe_load(tmpl)


def prepare_cluster_pod_disruption_budget(spec: InnoDBClusterSpec) -> dict:
    tmpl = f"""
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: {spec.name}-pdb
spec:
  maxUnavailable: 1
  selector:
    matchLabels:
      component: mysqld
      tier: mysql
      mysql.oracle.com/cluster: {spec.name}
"""
    pdb = yaml.safe_load(tmpl.replace("\n\n", "\n"))
    return pdb


def get_restore_container(cluster: InnoDBCluster, spec: AbstractServerSetSpec, cluster_domain: str):
    if not isinstance(spec, InnoDBClusterSpec):
        # restore happens on a primary, a read replica doesn't need a restore container
        return ("", "")

    if not spec.initDB or not spec.initDB.meb:
        # No MEB restore - no extra container
        return ("", "")

    container = f"""
      - name: restore
        command:
        - mysqlsh
        - --pym
        - meb.restore_main
        - --pod-name
        - "$(POD_NAME)"
        - --pod-namespace
        - "$(POD_NAMESPACE)"
        - --cluster-name
        - "{spec.name}"
        - --datadir
        - /var/lib/mysql
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
          value: {cluster_domain}
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /tmp
        - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
          value: never
        image: {spec.mysql_image}
        imagePullPolicy: {spec.mysql_image_pull_policy}
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
          privileged: false
          readOnlyRootFilesystem: true
          runAsGroup: 27
          runAsUser: 27
        volumeMounts:
        - mountPath: /usr/lib/mysqlsh/python-packages/meb
          name: mebcode
        - mountPath: /var/lib/mysql
          name: datadir
        - mountPath: /tmp   # sharing with initconf to transfer restore information
          name: initconf-tmp
        - mountPath: /mysqlsh
          name: mebmyslhshhome
        - name: rundir
          mountPath: /var/run/mysqld
"""

    volumes = f"""
      - name: mebmyslhshhome
        emptyDir: {{}}
      - name: mebcode
        configMap:
          name: {cluster.name}-mebcode
"""

    return (container, volumes)
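
# A note on the convention used by get_restore_container() above and
# get_meb_container() below: both return a pair of YAML fragments
# (container, volumes) that are spliced verbatim into the StatefulSet template
# in prepare_cluster_stateful_set(), so ("", "") means "add nothing".
# Illustrative check (not part of this module; "cluster.local" is an example
# cluster domain):
#
#   container, volumes = get_restore_container(cluster, spec, "cluster.local")
#   assert (container == "") == (volumes == "")   # both empty or both populated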


def get_meb_container(cluster: InnoDBCluster, spec: InnoDBClusterSpec, cluster_domain: str):
    if not isinstance(spec, InnoDBClusterSpec):
        # Currently backup can't be done on RRs
        return ("", "")

    if all(not getattr(profile, 'meb', None) for profile in spec.backupProfiles):
        # No profile requests MEB
        return ("", "")

    if spec.tlsUseSelfSigned:
        ssl_cert = "/var/lib/mysql/server-cert.pem"
        ssl_key = "/var/lib/mysql/server-key.pem"
        mount = ""
    else:
        ssl_cert = "/etc/mysql-ssl/tls.crt"
        ssl_key = "/etc/mysql-ssl/tls.key"
        mount = """
        - mountPath: /etc/mysql-ssl
          name: ssldata
"""

    container = f"""
      - name: meb
        command:
        - mysqlsh
        - --pym
        - meb.meb_main
        - --pod-name
        - "$(POD_NAME)"
        - --pod-namespace
        - "$(POD_NAMESPACE)"
        - --datadir
        - /var/lib/mysql
        - --ssl-cert
        - {ssl_cert}
        - --ssl-key
        - {ssl_key}
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
          value: {cluster_domain}
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /tmp
        - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
          value: never
        image: {spec.mysql_image}
        imagePullPolicy: {spec.mysql_image_pull_policy}
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
          privileged: false
          readOnlyRootFilesystem: true
          runAsGroup: 27
          runAsUser: 27
        volumeMounts:
        - mountPath: /usr/lib/mysqlsh/python-packages/meb
          name: mebcode
        - mountPath: /var/lib/mysql
          name: datadir
        - mountPath: /tmp   # sharing with initconf to transfer restore information
          name: initconf-tmp
        - mountPath: /mysqlsh
          name: mebmyslhshhome
        - name: rundir
          mountPath: /var/run/mysqld
        - name: mebtls
          mountPath: /tls
{mount}
"""

    volumes = f"""
      - name: mebmyslhshhome
        emptyDir: {{}}
      - name: mebcode
        configMap:
          name: {cluster.name}-mebcode
      - name: mebtls
        secret:
          secretName: {cluster.name}-meb-tls
"""

    return (container, volumes)


# TODO - check if we need to add a finalizer to the sts and svc (and if so, what's the condition to remove them)
# TODO - check if we need to make readinessProbe take into account innodb recovery times
# TODO - create ServiceAccount ({cluster.name}-sidecar-sa) for the mysql pods and bind it to the mysql-sidecar role

#
# ## About lifecycle probes:
#
# ### startupProbe
#
# used to let k8s know that the container is still starting up.
#
# * Server startup can take anywhere from a few seconds to several minutes.
# * If the server is initializing for the first time, it will take a few seconds.
# * If the server is restarting after a clean shut down and there's not much data,
#   it will take even less to start up.
# * But if it's restarting after a crash and there's a lot of data, the InnoDB
#   recovery can take a very long time to finish.
# Since we want success to be reported asap, we set the interval to a small value.
# We also set the successThreshold to > 1, so that we can report success once
# every now and then to reset the failure counter.
# NOTE: Currently, the startup probe will never fail the startup. We assume that
# mysqld will abort if the startup fails. Once there is a method to check whether the
# server is actually frozen during startup, the probe should be updated to stop
# resetting the failure counter and let it actually fail.
#
# ### readinessProbe
#
# used to let k8s know that the container can be marked as ready, which means
# it can accept external connections. We need mysqld to be always accessible,
# so the probe should always succeed as soon as startup succeeds.
# Any failures that happen after it's up don't matter for the probe, because
# we want GR and the operator to control the fate of the container, not the
# probe.
#
# ### livenessProbe
#
# this checks that the server is still healthy. If it fails above the threshold
# (e.g. because of a deadlock), the container is restarted.
#
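
# Rough timing sketch for the probes configured in the template below (values
# taken from prepare_cluster_stateful_set): the startup probe runs
# ["/livenessprobe.sh", "8"] every periodSeconds=3, and the script reports a
# synthetic success after every 8 consecutive failures (roughly every 24-27s),
# which resets the kubelet failure counter - so, as noted above, the probe
# effectively never fails the startup while InnoDB recovery is still running.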


def prepare_cluster_stateful_set(cluster: InnoDBCluster, spec: AbstractServerSetSpec, logger: Logger) -> dict:
    init_mysql_argv = ["mysqld", "--user=mysql"]
    # if config.enable_mysqld_general_log:
    #     init_mysql_argv.append("--general-log=1")

    mysql_argv = init_mysql_argv

    # we only need this in initconf, we pass it to all operator images to be
    # on the safe side
    cluster_domain = k8s_cluster_domain(logger)

    fqdn_template = fqdn.idc_service_fqdn_template(spec)

    extra_label = ""
    if type(spec) is InnoDBClusterSpec:
        instance_type = "group-member"
    elif type(spec) is ReadReplicaSpec:
        instance_type = "read-replica"
        extra_label = f"mysql.oracle.com/read-replica: {spec.name}"
        # initial startup no replica, we scale up once the group is running
        # spec.instances therefore will be reduced by the caller!
    else:
        raise NotImplementedError(f"Unknown subtype {type(spec)} for creating StatefulSet")

    fixdatadir_container = ""
    if spec.dataDirPermissions.setRightsUsingInitContainer:
        fixdatadir_container = f"""
- name: fixdatadir
  image: {spec.operator_image}
  imagePullPolicy: {spec.sidecar_image_pull_policy}
  command: ["bash", "-c", "chown 27:27 /var/lib/mysql && chmod 0700 /var/lib/mysql"]
  securityContext:
    # make an exception for this one
    runAsNonRoot: false
    runAsUser: 0
    # These can't go to spec.template.spec.securityContext
    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
    allowPrivilegeEscalation: false
    privileged: false
    readOnlyRootFilesystem: true
    capabilities:
      add:
      - CHOWN
      - FOWNER
      drop:
      - ALL
  volumeMounts:
  - name: datadir
    mountPath: /var/lib/mysql
  env:
  - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
    value: {cluster_domain}
  - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
    value: never
"""

    logger.info(f"Fix data container {'EN' if fixdatadir_container else 'DIS'}ABLED")

    # if meb restore ...
    (restore_container, restore_volumes) = get_restore_container(cluster, spec, cluster_domain)
    (meb_container, meb_volumes) = get_meb_container(cluster, spec, cluster_domain)

    # TODO re-add "--log-file=",
    tmpl = f"""
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: {spec.name}
  annotations:
    mysql.oracle.com/fqdn-template: '{fqdn_template}'
  labels:
    tier: mysql
    mysql.oracle.com/cluster: {spec.cluster_name}
    mysql.oracle.com/instance-type: {instance_type}
    {extra_label}
    app.kubernetes.io/name: mysql-innodbcluster
    app.kubernetes.io/instance: mysql-innodbcluster-{spec.name}
    app.kubernetes.io/component: database
    app.kubernetes.io/managed-by: mysql-operator
    app.kubernetes.io/created-by: mysql-operator
spec:
  serviceName: {spec.headless_service_name}
  replicas: {spec.instances}
  podManagementPolicy: Parallel
  selector:
    matchLabels:
      component: mysqld
      tier: mysql
      mysql.oracle.com/cluster: {spec.cluster_name}
      mysql.oracle.com/instance-type: {instance_type}
      {extra_label}
      app.kubernetes.io/name: mysql-innodbcluster-mysql-server
      app.kubernetes.io/instance: mysql-innodbcluster-{spec.name}-mysql-server
      app.kubernetes.io/component: database
      app.kubernetes.io/managed-by: mysql-operator
      app.kubernetes.io/created-by: mysql-operator
  template:
    metadata:
      annotations:
        mysql.oracle.com/fqdn-template: '{fqdn_template}'
      labels:
        component: mysqld
        tier: mysql
        mysql.oracle.com/cluster: {spec.cluster_name}
        mysql.oracle.com/instance-type: {instance_type}
        {extra_label}
        app.kubernetes.io/name: mysql-innodbcluster-mysql-server
        app.kubernetes.io/instance: mysql-innodbcluster-{spec.name}-mysql-server
        app.kubernetes.io/component: database
        app.kubernetes.io/managed-by: mysql-operator
        app.kubernetes.io/created-by: mysql-operator
    spec:
      readinessGates:
      - conditionType: "mysql.oracle.com/configured"
      - conditionType: "mysql.oracle.com/ready"
      serviceAccountName: {spec.serviceAccountName}
      securityContext:
        runAsUser: 27
        runAsGroup: 27
        fsGroup: 27
{utils.indent("fsGroupChangePolicy: " + spec.dataDirPermissions.fsGroupChangePolicy, 8) if spec.dataDirPermissions.fsGroupChangePolicy else ""}
        runAsNonRoot: true
      terminationGracePeriodSeconds: 120
      initContainers:
{utils.indent(fixdatadir_container, 6)}
      - name: initconf
        image: {spec.operator_image}
        imagePullPolicy: {spec.sidecar_image_pull_policy}
        # For datadir see the datadir volume mount
        command: ["mysqlsh", "--log-level=@INFO", "--pym", "mysqloperator", "init", "--pod-name", "$(POD_NAME)", "--pod-namespace", "$(POD_NAMESPACE)", "--datadir", "/var/lib/mysql" ]
        securityContext:
          # These can't go to spec.template.spec.securityContext
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: true
          # The value is inherited from the PodSecurityContext but dumb sec checkers might not know that
          runAsNonRoot: true
          capabilities:
            drop:
            - ALL
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
          value: {cluster_domain}
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /tmp
        - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
          value: never
        volumeMounts:
        - name: initconfdir
          mountPath: /mnt/initconf
          readOnly: true
        - name: datadir
          mountPath: /var/lib/mysql
        - name: mycnfdata
          mountPath: /mnt/mycnfdata
        - name: initconf-tmp
          mountPath: /tmp
        - name: rootcreds
          readOnly: true
          # rootHost is not obligatory and thus might not exist in the secret.
          # Nevertheless K8s won't complain and instead of mounting an empty file
          # will create a directory (/rootcreds/rootHost will be an empty directory).
          # For more information see below the comment regarding rootcreds.
          subPath: rootHost
          mountPath: /rootcreds/rootHost
{restore_container}
      - name: initmysql
        image: {spec.mysql_image}
        imagePullPolicy: {spec.mysql_image_pull_policy}
        args: {init_mysql_argv}
        securityContext:
          # These can't go to spec.template.spec.securityContext
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: true
          # The value is inherited from the PodSecurityContext but dumb sec checkers might not know that
          runAsNonRoot: true
          capabilities:
            drop:
            - ALL
        env:
        - name: MYSQL_INITIALIZE_ONLY
          value: "1"
        - name: MYSQL_RANDOM_ROOT_PASSWORD
          value: "1"
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /tmp
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/mysql
        - name: rundir
          mountPath: /var/run/mysqld
        - name: mycnfdata
          mountPath: /etc/my.cnf.d
          subPath: my.cnf.d
        - name: mycnfdata
          mountPath: /docker-entrypoint-initdb.d
          subPath: docker-entrypoint-initdb.d
        - name: mycnfdata
          mountPath: /etc/my.cnf
          subPath: my.cnf
        - name: initmysql-tmp
          mountPath: /tmp
        - name: varlibmysqlfiles
          # The entrypoint of the container `touch`-es 2 files there
          mountPath: /var/lib/mysql-files
      containers:
      - name: sidecar
        image: {spec.operator_image}
        imagePullPolicy: {spec.sidecar_image_pull_policy}
        command: ["mysqlsh", "--pym", "mysqloperator", "sidecar", "--pod-name", "$(POD_NAME)", "--pod-namespace", "$(POD_NAMESPACE)", "--datadir", "/var/lib/mysql" ]
        securityContext:
          # These can't go to spec.template.spec.securityContext
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: true
          # The value is inherited from the PodSecurityContext but dumb sec checkers might not know that
          runAsNonRoot: true
          capabilities:
            drop:
            - ALL
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MYSQL_UNIX_PORT
          value: /var/run/mysqld/mysql.sock
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /mysqlsh
        - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
          value: {cluster_domain}
        - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
          value: never
        volumeMounts:
        - name: rundir
          mountPath: /var/run/mysqld
        - name: mycnfdata
          mountPath: /etc/my.cnf.d
          subPath: my.cnf.d
        - name: mycnfdata
          mountPath: /etc/my.cnf
          subPath: my.cnf
        - name: shellhome
          mountPath: /mysqlsh
        - name: sidecar-tmp
          mountPath: /tmp
{utils.indent(spec.extra_sidecar_volume_mounts, 8)}
      - name: mysql
        image: {spec.mysql_image}
        imagePullPolicy: {spec.mysql_image_pull_policy}
        args: {mysql_argv}
        securityContext:
          # These can't go to spec.template.spec.securityContext
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: true
          # The value is inherited from the PodSecurityContext but dumb sec checkers might not know that
          runAsNonRoot: true
          capabilities:
            drop:
            - ALL
        lifecycle:
          preStop:
            exec:
              # 60 is the default value for dba.gtidWaitTimeout
              # see https://dev.mysql.com/doc/mysql-shell/8.0/en/mysql-innodb-cluster-working-with-cluster.html
              command: ["sh", "-c", "sleep 60 && mysqladmin -ulocalroot shutdown"]
        startupProbe:
          exec:
            command: ["/livenessprobe.sh", "8"]
          initialDelaySeconds: 5
          periodSeconds: 3
          failureThreshold: 10000
          successThreshold: 1
          timeout: 2
        readinessProbe:
          exec:
            command: ["/readinessprobe.sh"]
          periodSeconds: 5
          initialDelaySeconds: 10
          failureThreshold: 10000
        livenessProbe:
          exec:
            command: ["/livenessprobe.sh"]
          initialDelaySeconds: 15
          periodSeconds: 15
          failureThreshold: 10
          successThreshold: 1
          timeout: 5
        env:
        - name: MYSQL_UNIX_PORT
          value: /var/run/mysqld/mysql.sock
        - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
          value: never
{utils.indent(spec.extra_env, 8)}
        ports:
        - containerPort: {spec.mysql_port}
          name: mysql
        - containerPort: {spec.mysql_xport}
          name: mysqlx
        - containerPort: {spec.mysql_grport}
          name: gr-xcom
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/mysql
        - name: rundir
          mountPath: /var/run/mysqld
        - name: mycnfdata
          mountPath: /etc/my.cnf.d
          subPath: my.cnf.d
        - name: mycnfdata
          mountPath: /etc/my.cnf
          subPath: my.cnf
        - name: initconfdir
          mountPath: /livenessprobe.sh
          subPath: livenessprobe.sh
        - name: initconfdir
          mountPath: /readinessprobe.sh
          subPath: readinessprobe.sh
        - name: varlibmysqlfiles
          # The entrypoint of the container `touch`-es 2 files there
          mountPath: /var/lib/mysql-files
        - name: mysql-tmp
          mountPath: /tmp
{utils.indent(spec.extra_volume_mounts, 8)}
{meb_container}
      volumes:
      - name: mycnfdata
        emptyDir: {{}}
      - name: rundir
        emptyDir: {{}}
      - name: varlibmysqlfiles
        emptyDir: {{}}
      - name: initconfdir
        configMap:
          name: {spec.name}-initconf
          defaultMode: 0755
      - name: shellhome
        emptyDir: {{}}
      - name: initconf-tmp
        emptyDir: {{}}
      - name: initmysql-tmp
        emptyDir: {{}}
      - name: mysql-tmp
        emptyDir: {{}}
      - name: sidecar-tmp
        emptyDir: {{}}
{meb_volumes}
{restore_volumes}
      # If we declare it and do not use it anywhere as backing for a volumeMount, K8s won't check
      # if the volume exists. K8s seems to be lazy in that regard. We don't need the information
      # from this secret directly, as the sidecar of pod 0 will fetch the information using the K8s API.
      # However, we don't want to be lazy in checking whether the secret exists, and we want to make it
      # easier for the administrator to find out if the secret is missing. If we mount it in an init or
      # normal container, the pod will get stuck in "Ready:0/2 Init:0/3" with
      # Warning FailedMount XXs (....) kubelet "MountVolume.SetUp failed for volume "rootcreds" : secret ".........." not found" error to be seen in describe.
      - name: rootcreds
        secret:
          secretName: {spec.secretName}
          defaultMode: 0400
{utils.indent(spec.extra_volumes, 6)}
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 2Gi
"""

    statefulset = yaml.safe_load(tmpl.replace("\n\n", "\n"))

    metadata = {}
    if spec.podAnnotations:
        print("\t\tAdding podAnnotations")
        metadata['annotations'] = spec.podAnnotations
    if spec.podLabels:
        print("\t\tAdding podLabels")
        metadata['labels'] = spec.podLabels
    if len(metadata):
        utils.merge_patch_object(statefulset["spec"]["template"], {"metadata" : metadata })

    if spec.keyring:
        print("\t\tAdding keyring STS bit")
        spec.keyring.add_to_sts_spec(statefulset)

    for subsystem in spec.add_to_sts_cbs:
        print(f"\t\tadd_to_sts_cb: Checking subsystem {subsystem}")
        for add_to_sts_cb in spec.add_to_sts_cbs[subsystem]:
            print(f"\t\tAdding {subsystem} STS bits")
            add_to_sts_cb(statefulset, None, logger)

    if os.getenv("MYSQL_OPERATOR_GLOBAL_PODSPEC_CM"):
        ps_cm_ns = os.getenv("MYSQL_OPERATOR_GLOBAL_PODSPEC_NS", "mysql-operator")
        ps_cm_name = os.getenv("MYSQL_OPERATOR_GLOBAL_PODSPEC_CM")
        ps_cm_item = os.getenv("MYSQL_OPERATOR_GLOBAL_PODSPEC_ITEM", "podspec.yaml")
        ps_cm = api_core.get_namespaced_config_map(ps_cm_name, ps_cm_name)
        ps_override = yaml.safe_load(ps_cm[ps_cm_item])
        print("\t\tAdding global podSpec")
        utils.merge_patch_object(statefulset["spec"]["template"]["spec"],
                                 ps_override, f"{ps_cm_ns}.{ps_cm_name}.{ps_cm_item}")

    if spec.podSpec:
        print("\t\tAdding podSpec")
        utils.merge_patch_object(statefulset["spec"]["template"]["spec"],
                                 spec.podSpec, "spec.podSpec")

    if spec.datadirVolumeClaimTemplate:
        print("\t\tAdding datadirVolumeClaimTemplate")
        utils.merge_patch_object(statefulset["spec"]["volumeClaimTemplates"][0]["spec"],
                                 spec.datadirVolumeClaimTemplate, "spec.volumeClaimTemplates[0].spec")

    return statefulset


def update_stateful_set_size(cluster: InnoDBCluster, rr_spec: ReadReplicaSpec, logger: Logger) -> None:
    sts = cluster.get_read_replica_stateful_set(rr_spec.name)
    if sts:
        patch = {"spec": {"replicas": rr_spec.instances}}
        api_apps.patch_namespaced_stateful_set(
            sts.metadata.name, sts.metadata.namespace, body=patch)
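
# Illustrative only: the MYSQL_OPERATOR_GLOBAL_PODSPEC_* handling above reads a
# cluster-wide podSpec override from a ConfigMap. With the defaults from the code,
# an operator deployment could point it at a ConfigMap like this (the ConfigMap
# name and the tolerations are made-up example values):
#
#   MYSQL_OPERATOR_GLOBAL_PODSPEC_NS=mysql-operator
#   MYSQL_OPERATOR_GLOBAL_PODSPEC_CM=global-podspec
#   MYSQL_OPERATOR_GLOBAL_PODSPEC_ITEM=podspec.yaml
#
# where the item holds a partial pod spec that gets merge-patched into
# statefulset["spec"]["template"]["spec"], e.g.:
#
#   podspec.yaml: |
#     tolerations:
#     - key: "dedicated"
#       operator: "Equal"
#       value: "mysql"
#       effect: "NoSchedule"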


def prepare_service_account_sidecar(spec: AbstractServerSetSpec) -> dict:
    account = f"""
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {spec.serviceAccountName}
  namespace: {spec.namespace}
{spec.image_pull_secrets}
"""
    account = yaml.safe_load(account)
    return account


def prepare_service_account_patch_for_image_pull_secrets(spec: AbstractServerSetSpec) -> Optional[Dict]:
    if not spec.imagePullSecrets:
        return None
    return { "imagePullSecrets" : spec.imagePullSecrets }


def prepare_service_account_switchover(spec: AbstractServerSetSpec) -> dict:
    account = f"""
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {spec.switchoverServiceAccountName}
  namespace: {spec.namespace}
{spec.image_pull_secrets}
"""
    account = yaml.safe_load(account)
    return account


def prepare_role_binding_sidecar(spec: AbstractServerSetSpec) -> dict:
    rolebinding = f"""
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: {spec.roleBindingName}
  namespace: {spec.namespace}
subjects:
- kind: ServiceAccount
  name: {spec.serviceAccountName}
roleRef:
  kind: ClusterRole
  name: mysql-sidecar
  apiGroup: rbac.authorization.k8s.io
"""
    rolebinding = yaml.safe_load(rolebinding)
    return rolebinding


def prepare_role_binding_switchover(spec: AbstractServerSetSpec) -> dict:
    rolebinding = f"""
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: {spec.switchoverRoleBindingName}
  namespace: {spec.namespace}
subjects:
- kind: ServiceAccount
  name: {spec.switchoverServiceAccountName}
roleRef:
  kind: ClusterRole
  name: mysql-switchover
  apiGroup: rbac.authorization.k8s.io
"""
    rolebinding = yaml.safe_load(rolebinding)
    return rolebinding


def prepare_additional_configmaps(spec: AbstractServerSetSpec, logger: Logger) -> List[Dict]:
    configmaps = []
    prefix = ''
    for subsystem in spec.get_configmaps_cbs:
        for cb in spec.get_configmaps_cbs[subsystem]:
            if cms := cb(prefix, logger):
                for (cm_name, cm) in cms:
                    if cm:
                        configmaps.append(cm)

    # TODO: This should use the CB mechanism like other CMs above
    if isinstance(spec, InnoDBClusterSpec):
        clusterspec = cast(InnoDBClusterSpec, spec)
        if (clusterspec.initDB and clusterspec.initDB.meb) \
           or any(getattr(profile, 'meb', None) for profile in clusterspec.backupProfiles):
            if spec.edition != config.Edition.enterprise:
                raise kopf.TemporaryError("A BackupProfile requires MySQL Enterprise Backup, but the cluster doesn't use Enterprise Edition")
            configmaps.append(backup_objects.prepare_meb_code_configmap(clusterspec))

    return configmaps


def prepare_component_config_configmaps(spec: AbstractServerSetSpec, logger: Logger) -> List[Dict]:
    configmaps = [ spec.keyring.get_component_config_configmap_manifest() ]
    return configmaps


def prepare_component_config_secrets(spec: AbstractServerSetSpec, logger: Logger) -> List[Dict]:
    secrets = []
    cm = spec.keyring.get_component_config_secret_manifest()
    if cm:
        secrets.append(cm)
    return secrets
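
# Illustrative sketch (not part of this module): callbacks registered in
# spec.get_configmaps_cbs are called as cb(prefix, logger) and return an
# iterable of (name, manifest) tuples, where manifest may be None to signal
# "nothing to create". A hypothetical subsystem callback might look like this
# (the names and data keys are made up for the example):
#
#   def get_logrotate_configmaps(prefix: str, logger: Logger):
#       cm = {"apiVersion": "v1", "kind": "ConfigMap",
#             "metadata": {"name": f"{prefix}mycluster-logrotate"},
#             "data": {"logrotate.conf": "..."}}
#       return [(cm["metadata"]["name"], cm)]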


def prepare_initconf(cluster: InnoDBCluster, spec: AbstractServerSetSpec, logger: Logger) -> dict:
    with open(os.path.dirname(os.path.abspath(__file__))+'/router-entrypoint-run.sh.tpl', 'r') as entryfile:
        router_entrypoint = "".join(entryfile.readlines())

    liveness_probe = """#!/bin/bash
# Copyright (c) 2020, 2021, Oracle and/or its affiliates.

# Insert 1 success every this amount of failures
# (assumes successThreshold is > 1)
max_failures_during_progress=$1

# Ping the server to see if it's up
mysqladmin -umysqlhealthchecker ping
# If it's up, we succeed
if [ $? -eq 0 ]; then
  exit 0
fi

if [ -z $max_failures_during_progress ]; then
  exit 1
fi

# If the init/startup/InnoDB recovery is still ongoing, we're
# not succeeded nor failed yet, so keep failing and getting time
# extensions until it succeeds.
# We currently rely on the server to exit/abort if the init/startup fails,
# but ideally there would be a way to check whether the server is
# still making progress and not just stuck waiting on a frozen networked
# volume, for example.
if [ -f /tmp/fail-counter ]; then
  fail_count=$(($(cat /tmp/fail-counter) + 1))
else
  fail_count=1
fi

if [ $fail_count -gt $max_failures_during_progress ]; then
  # Report success to reset the failure counter upstream and get
  # a time extension
  rm -f /tmp/fail-counter
  exit 0
else
  # Update the failure counter and fail out
  echo $fail_count > /tmp/fail-counter
  exit 1
fi
"""

    readiness_probe = """#!/bin/bash
# Copyright (c) 2020, 2022, Oracle and/or its affiliates.

# Once the container is ready, it's always ready.
if [ -f /tmp/mysql-ready ]; then
  exit 0
fi

# Ping server to see if it is ready
if mysqladmin -umysqlhealthchecker ping; then
  touch /tmp/mysql-ready
  exit 0
else
  exit 1
fi
"""

    has_crl = cluster.tls_has_crl()

    if not spec.tlsUseSelfSigned:
        ca_file_name = cluster.get_ca_and_tls().get("CA", "ca.pem")
    else:
        ca_file_name = ""

    tmpl = f"""
apiVersion: v1
kind: ConfigMap
metadata:
  name: {spec.name}-initconf
data:
  initdb-localroot.sql: |
    set sql_log_bin=0;
    # Create socket authenticated localroot@localhost account
    CREATE USER localroot@localhost IDENTIFIED WITH auth_socket AS 'mysql';
    GRANT ALL ON *.* TO localroot@localhost WITH GRANT OPTION;
    GRANT PROXY ON ''@'' TO localroot@localhost WITH GRANT OPTION;
    # Drop the default account created by the docker image
    DROP USER IF EXISTS healthchecker@localhost;
    # Create account for liveness probe
    CREATE USER mysqlhealthchecker@localhost IDENTIFIED WITH auth_socket AS 'mysql';
    set sql_log_bin=1;

  readinessprobe.sh: |
{utils.indent(readiness_probe, 4)}

  livenessprobe.sh: |
{utils.indent(liveness_probe, 4)}

  router-entrypoint-run.sh.tpl: |
{utils.indent(router_entrypoint, 4)}

  my.cnf.in: |
    # Server identity related options (not shared across instances).
    # Do not edit.
    [mysqld]
    server_id=@@SERVER_ID@@
    report_host=@@HOSTNAME@@
    datadir=/var/lib/mysql
    loose_mysqlx_socket=/var/run/mysqld/mysqlx.sock
    socket=/var/run/mysqld/mysql.sock
    local-infile=1

    [mysql]
    socket=/var/run/mysqld/mysql.sock

    [mysqladmin]
    socket=/var/run/mysqld/mysql.sock

    !includedir /etc/my.cnf.d

  00-basic.cnf: |
    # Basic configuration.
    # Do not edit.
    [mysqld]
    plugin_load_add=auth_socket.so
    loose_auth_socket=FORCE_PLUS_PERMANENT
    skip_log_error
    log_error_verbosity=3

  01-group_replication.cnf: |
    # GR and replication related options
    # Do not edit.
    [mysqld]
    log_bin={spec.name}
    enforce_gtid_consistency=ON
    gtid_mode=ON
    skip_replica_start=1

  02-ssl.cnf: |
    # SSL configurations
    # Do not edit.
    [mysqld]
    {"# " if spec.tlsUseSelfSigned else ""}ssl-ca=/etc/mysql-ssl/{ca_file_name}
    {"# " if not has_crl else ""}ssl-crl=/etc/mysql-ssl/crl.pem
    {"# " if spec.tlsUseSelfSigned else ""}ssl-cert=/etc/mysql-ssl/tls.crt
    {"# " if spec.tlsUseSelfSigned else ""}ssl-key=/etc/mysql-ssl/tls.key
    loose_group_replication_recovery_use_ssl=1
    {"# " if spec.tlsUseSelfSigned else ""}loose_group_replication_recovery_ssl_verify_server_cert=1
    {"# " if spec.tlsUseSelfSigned else ""}loose_group_replication_recovery_ssl_ca=/etc/mysql-ssl/{ca_file_name}
    #{"# " if not has_crl else ""}loose_group_replication_recovery_ssl_crl=/etc/mysql-ssl/crl.pem
    {"# " if spec.tlsUseSelfSigned else ""}loose_group_replication_recovery_ssl_cert=/etc/mysql-ssl/tls.crt
    {"# " if spec.tlsUseSelfSigned else ""}loose_group_replication_recovery_ssl_key=/etc/mysql-ssl/tls.key

  99-extra.cnf: |
    # Additional user configurations taken from spec.mycnf in InnoDBCluster.
    # Do not edit directly.
{utils.indent(spec.mycnf, 4) if spec.mycnf else ""}
"""

    cm = yaml.safe_load(tmpl)

    prefix = 5
    for subsystem in spec.add_to_initconf_cbs:
        for add_to_initconf_cb in spec.add_to_initconf_cbs[subsystem]:
            add_to_initconf_cb(cm, f"{prefix:02d}-", logger)
            prefix = prefix + 1

    return cm


def prepare_service_failover_job(batchjob_name: str, force: str, options: dict, spec: InnoDBClusterSpec, logger: Logger) -> dict:
    cluster_domain = k8s_cluster_domain(logger)
    cluster_name = spec.cluster_name
    namespace = spec.namespace

    extras = ", --force" if force else ""
    if options:
        extras += f", --timeout={options['timeout']}" if "timeout" in options else ""
        # TODO shell escape! Probably it's better to have the pod read this itself
        extras += f", --invalidate-replica-clusters={','.join(options['invalidateReplicaClusters'])}" if "invalidateReplicaClusters" in options else ""

    job = f"""
apiVersion: batch/v1
kind: Job
metadata:
  name: {batchjob_name}
  namespace: {namespace}
spec:
  template:
    spec:
      serviceAccountName: mysql-switchover-sa
      securityContext:
        runAsUser: 27
        runAsGroup: 27
        fsGroup: 27
      restartPolicy: Never
      containers:
      - name: failover
        image: {spec.operator_image}
        imagePullPolicy: {spec.mysql_image_pull_policy}
        command: ["mysqlsh", "--pym", "mysqloperator", "csfo", --cluster-name, {cluster_name}, "--namespace", {namespace} {extras}]
        securityContext:
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: true
          capabilities:
            drop:
            - ALL
        env:
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /tmp
        - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
          value: {cluster_domain}
        volumeMounts:
        - name: tmp
          mountPath: /tmp
      volumes:
      - name: tmp
        emptyDir: {{}}
"""
    return yaml.safe_load(job)


def prepare_metrics_service_monitors(spec: AbstractServerSetSpec, logger: Logger) -> List[Dict]:
    monitors = []
    prefix = ''
    for subsystem in spec.get_svc_monitor_cbs:
        for cb in spec.get_svc_monitor_cbs[subsystem]:
            (monitor_name, monitor) = cb(logger)
            if monitor:
                monitors.append(monitor)
    return monitors


def update_stateful_set_spec(sts : api_client.V1StatefulSet, patch: dict) -> None:
    api_apps.patch_namespaced_stateful_set(
        sts.metadata.name, sts.metadata.namespace, body=patch)


def update_service(svc: api_client.V1Deployment, spec: InnoDBClusterSpec, logger: Logger) -> None:
    body = prepare_cluster_service(spec, logger)
    print(body)
    api_core.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, body=body)
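
# Example of the merge-patch shape accepted by update_stateful_set_spec()
# (illustrative values only):
#
#   update_stateful_set_spec(sts, {"spec": {"replicas": 3}})
#
# which maps directly onto api_apps.patch_namespaced_stateful_set(...), the same
# call update_stateful_set_size() uses for read replicas.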


def update_mysql_image(sts: api_client.V1StatefulSet, cluster: InnoDBCluster, spec: AbstractServerSetSpec, patcher, logger: Logger) -> None:
    """Update MySQL Server image

    This will also update the sidecar container to the current operator version,
    so that a single rolling upgrade covers both and we don't require a separate
    restart for upgrading the sidecar.
    """
    logger.info("update_mysql_image")

    # Operators <= 8.0.32-2.0.8 don't set this environment variable, we have to make sure it is there
    cluster_domain_env = [{
        "name": "MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN",
        "value": k8s_cluster_domain(logger)
    }]

    patch = {"spec": {"template": {"spec": {
                 "containers": [
                     {"name": "sidecar", "image": spec.operator_image, "env": cluster_domain_env },
                     {"name": "mysql", "image": spec.mysql_image, "env": cluster_domain_env },
                 ],
                 "initContainers": [
                     {"name": "fixdatadir", "image": spec.operator_image, "env": cluster_domain_env },
                     {"name": "initconf", "image": spec.operator_image, "env": cluster_domain_env },
                     {"name": "initmysql", "image": spec.mysql_image, "env": cluster_domain_env },
                 ]}
             }}}

    # TODO [compat8.3.0] remove this when compatibility pre 8.3.0 isn't needed anymore
    keyring_update = spec.keyring.upgrade_to_component(sts, spec, logger)
    if keyring_update:
        logger.info("Need to upgrade keyring from plugin to component")
        (cm, key_sts_patch) = keyring_update
        utils.merge_patch_object(patch["spec"]["template"], key_sts_patch)

        kopf.adopt(cm)
        patcher.create_configmap(spec.namespace, cm["metadata"]["name"], cm, on_apiexception_generic_handler)
        #api_core.create_namespaced_config_map(spec.namespace, cm)

        initconf_patch = [{"op": "remove", "path": "/data/03-keyring-oci.cnf"}]
        #try:
        #    api_core.patch_namespaced_config_map(f"{spec.cluster_name}-initconf",
        #                                         spec.namespace, initconf_patch)
        #except ApiException as exc:
        #    # This might happen during a retry or some other case where it was
        #    # removed already
        #    logger.info(f"Failed to remove keyring config from initconf, ignoring: {exc}")
        patcher.patch_configmap(spec.namespace, f"{spec.cluster_name}-initconf", initconf_patch, on_apiexception_404_handler)

        cm = prepare_initconf(cluster, spec, logger)
        patcher.patch_configmap(spec.namespace, cm['metadata']['name'], cm, on_apiexception_generic_handler)
        #api_core.patch_namespaced_config_map(
        #    cm['metadata']['name'], sts.metadata.namespace, body=cm)

    patcher.patch_sts(patch)
    # update_stateful_set_spec(sts, patch)


def update_operator_image(sts: api_client.V1StatefulSet, spec: InnoDBClusterSpec) -> None:
    patch = {"spec": {"template": {"spec": {
                 "containers": [
                     {"name": "sidecar", "image": spec.operator_image}
                 ],
                 "initContainers": [
                     {"name": "fixdatadir", "image": spec.operator_image},
                     {"name": "initconf", "image": spec.operator_image}
                 ]}
             }}}

    update_stateful_set_spec(sts, patch)


def update_pull_policy(sts: api_client.V1StatefulSet, spec: InnoDBClusterSpec, logger: Logger) -> dict:
    patch = {"spec": {"template": {"spec": {
                 "initContainers": [
                     {"name": "initconf", "imagePullPolicy": spec.sidecar_image_pull_policy},
                     {"name": "initmysql", "imagePullPolicy": spec.mysql_image_pull_policy}
                 ],
                 "containers": [
                     {"name": "sidecar", "imagePullPolicy": spec.sidecar_image_pull_policy},
                     {"name": "mysql", "imagePullPolicy": spec.mysql_image_pull_policy}
                 ]}
             }}}
    return patch


def update_template_property(sts: api_client.V1StatefulSet, property_name: str, property_value: str, logger: Logger) -> None:
    patch = {"spec": {"template": {"spec": { property_name: property_value }}}}
    update_stateful_set_spec(sts, patch)
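
# Illustrative usage of update_template_property() (hypothetical values): patch a
# single pod-template field of the server StatefulSet without touching the rest,
# e.g.
#
#   update_template_property(sts, "serviceAccountName", "my-sa", logger)
#
# which expands to {"spec": {"template": {"spec": {"serviceAccountName": "my-sa"}}}}.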


def update_objects_for_subsystem(subsystem: InnoDBClusterSpecProperties,
                                 cluster: InnoDBCluster,
                                 patcher: 'InnoDBClusterObjectModifier',
                                 logger: Logger) -> None:
    logger.info(f"update_objects_for_subsystem: {subsystem}")
    sts = cluster.get_stateful_set()
    svc = cluster.get_service()
    spec = cluster.parsed_spec

    if subsystem in spec.get_configmaps_cbs:
        print(f"\t\tWalking over get_configmaps_cbs len={len(spec.get_configmaps_cbs[subsystem])}")
        #TODO: This won't delete old CMs but only replace old ones, if they are still in use, with new content,
        #      or create new ones. The solution is to use tuple returning like get_svc_monitor_cbs, where
        #      the cm name will be returned as the first tuple element and the second will be just None. This will
        #      signal that this CM should be removed, as it is not in use anymore.
        for get_configmap_cb in spec.get_configmaps_cbs[subsystem]:
            prefix = ''
            new_configmaps = get_configmap_cb(prefix, logger)
            if not new_configmaps:
                continue
            for (cm_name, new_cm) in new_configmaps:
                current_cm = cluster.get_configmap(cm_name)
                if current_cm:
                    if not new_cm:
                        print(f"\t\t\tDeleting CM {cluster.namespace}/{cm_name}")
                        #patcher.delete_configmap(cluster.namespace, cm_name, on_apiexception_404_handler)
                        cluster.delete_configmap(cm_name)
                        continue
                    data_differs = current_cm.data != new_cm["data"]
                    if data_differs:
                        print(f"\t\t\tReplacing CM {cluster.namespace}/{cm_name}")
                        current_cm.data = new_cm["data"]
                        #patcher.replace_configmap(cluster.namespace, cm_name, current_cm, on_apiexception_404_handler)
                        api_core.replace_namespaced_config_map(cm_name, cluster.namespace, body=current_cm)
                else:
                    print(f"\t\t\tNo such cm exists. Creating {cluster.namespace}/{new_cm}")
                    kopf.adopt(new_cm)
                    #patcher.create_configmap(cluster.namespace, new_cm['metadata']['name'], new_cm, on_apiexception_generic_handler)
                    api_core.create_namespaced_config_map(cluster.namespace, new_cm)

    if subsystem in spec.add_to_sts_cbs:
        print(f"\t\tCurrent container count: {len(sts.spec.template.spec.containers)}")
        print(f"\t\tWalking over add_to_sts_cbs len={len(spec.add_to_sts_cbs[subsystem])}")
        changed = False
        sts.spec = spec_to_dict(sts.spec)
        for add_to_sts_cb in spec.add_to_sts_cbs[subsystem]:
            changed = True
            print("\t\t\tPatching STS")
            add_to_sts_cb(sts, patcher, logger)

        if changed:
            new_container_names = [c["name"] for c in patcher.get_sts_path('/spec/template/spec/containers') if c["name"] not in ["mysql", "sidecar"]]
            print(f"\t\t\tNew containers: {new_container_names}")
            new_volumes_names = [c["name"] for c in patcher.get_sts_path('/spec/template/spec/volumes')]
            print(f"\t\t\tNew volumes: {new_volumes_names}")
            new_volume_mounts = [(c["name"], c["volumeMounts"]) for c in patcher.get_sts_path('/spec/template/spec/containers') if c["name"] not in ["mysql", "sidecar"]]
            print(f"\t\t\tNew volume mounts: {new_volume_mounts}")

            # There might be configmap changes, which when mounted will change the server, so we roll over.
            # For a fine grained approach get_configmap should return whether there are changes that require
            # a restart. Without a restart, for example, the Cluster1LFSGeneralLogEnableDisableEnable test will hang
            restart_patch = {"spec":{"template":{"metadata":{"annotations":{"kubectl.kubernetes.io/restartedAt":utils.isotime()}}}}}
            patcher.patch_sts(restart_patch)
            #patcher.submit_patches(restart_sts=True)
        print(f"\t\t\tSTS {'patched' if changed else 'unchanged. No rollover upgrade!'}")

    if subsystem in spec.get_add_to_svc_cbs:
        print(f"\t\tWalking over get_add_to_svc_cbs len={len(spec.get_add_to_svc_cbs[subsystem])}")
        changed = False
        for add_to_svc_cb in spec.get_add_to_svc_cbs[subsystem]:
            changed = True
            print("\t\t\tPatching SVC")
            add_to_svc_cb(svc, logger)

        if changed:
            api_core.replace_namespaced_service(svc.metadata.name, svc.metadata.namespace, svc)
        print(f"\t\t\tSVC {'patched' if changed else 'unchanged'}")

    if subsystem in spec.get_svc_monitor_cbs:
        for subsystem in spec.get_svc_monitor_cbs:
            for cb in spec.get_svc_monitor_cbs[subsystem]:
                (monitor_name, monitor) = cb(logger)
                # monitor could be empty, this means - delete old monitor with monitor_name
                print(f"\t\t\tChecking for old ServiceMonitor {monitor_name}")
                if cluster.get_service_monitor(monitor_name):
                    print(f"\t\t\tRemoving old ServiceMonitor {monitor_name}")
                    try:
                        api_customobj.delete_namespaced_custom_object("monitoring.coreos.com", "v1", cluster.namespace, "servicemonitors", monitor_name)
                    except Exception as exc:
                        print(f"\t\t\tPrevious ServiceMonitor {monitor_name} was not removed. Reason: {exc}")

                if monitor:
                    kopf.adopt(monitor)
                    print(f"\t\t\tCreating ServiceMonitor {monitor} ...")
                    try:
                        api_customobj.create_namespaced_custom_object("monitoring.coreos.com", "v1", cluster.namespace, "servicemonitors", monitor)
                    except Exception as exc:
                        # This might be caused by the Prometheus Operator missing
                        # we won't fail for that
                        print(f"\t\t\tServiceMonitor {monitor_name} NOT created!")
                        print(exc)
                        cluster.warn(action="CreateCluster", reason="CreateResourceFailed",
                                     message=f"{exc}")
                else:
                    print(f"\t\t\tNew ServiceMonitor {monitor_name} will not be created. Monitoring disabled.")


def update_objects_for_logs(cluster: InnoDBCluster, patcher: 'InnoDBClusterObjectModifier', logger: Logger) -> None:
    subsystem = InnoDBClusterSpecProperties.LOGS.value
    update_objects_for_subsystem(subsystem, cluster, patcher, logger)


def update_objects_for_metrics(cluster: InnoDBCluster, patcher: 'InnoDBClusterObjectModifier', logger: Logger) -> None:
    subsystem = InnoDBClusterSpecProperties.METRICS.value
    update_objects_for_subsystem(subsystem, cluster, patcher, logger)


def remove_read_replica(cluster: InnoDBCluster, rr: ReadReplicaSpec):
    name = rr['name']
    try:
        api_core.delete_namespaced_config_map(f"{cluster.name}-{name}-initconf", cluster.namespace)
    except Exception as exc:
        print(f"ConfigMap for ReadReplica {name} was not removed. This is usually ok. Reason: {exc}")

    try:
        api_core.delete_namespaced_service(rr.headless_service_name, cluster.namespace)
    except Exception as exc:
        print(f"Service for ReadReplica {name} was not removed. This is usually ok. Reason: {exc}")

    try:
        api_apps.delete_namespaced_stateful_set(f"{cluster.name}-{name}", cluster.namespace)
    except Exception as exc:
        print(f"StatefulSet for ReadReplica {name} was not removed. This is usually ok. Reason: {exc}")


def on_first_cluster_pod_created(cluster: InnoDBCluster, logger: Logger) -> None:
    # Add a finalizer to the cluster object to prevent it from being deleted
    # until the last pod is properly deleted.
    cluster.add_cluster_finalizer()


def on_last_cluster_pod_removed(cluster: InnoDBCluster, logger: Logger) -> None:
    # Remove the cluster finalizer because the last pod was deleted; this lets
    # the cluster object be deleted too
    logger.info(
        f"Last pod for cluster {cluster.name} was deleted, removing cluster finalizer...")
    cluster.remove_cluster_finalizer()
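
# For reference, the ServiceMonitor objects handled in update_objects_for_subsystem()
# above are plain dicts for the monitoring.coreos.com/v1 custom resource. A minimal,
# illustrative manifest (names and port are examples only) could look like:
#
#   {"apiVersion": "monitoring.coreos.com/v1", "kind": "ServiceMonitor",
#    "metadata": {"name": "mycluster-monitor"},
#    "spec": {"selector": {"matchLabels": {"mysql.oracle.com/cluster": "mycluster"}},
#             "endpoints": [{"port": "metrics"}]}}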


from enum import Enum
from typing import Callable, cast
from .. import kubeutils


class PatchTarget(Enum):
    STS = "STS"
    DEPLOY = "DEPLOYMENT"
    CM = "CONFIGMAP"


class ApiCommandType(Enum):
    PATCH_STS = "PATCH_STS"
    PATCH_DEPLOY = "PATCH_DEPLOY"
    CREATE_CM = "CREATE_CM"
    DELETE_CM = "DELETE_CM"
    REPLACE_CM = "REPLACE_CM"
    PATCH_CM = "PATCH_CM"


OnApiExceptionHandler = Callable[[ApiException, Logger], None]


def on_apiexception_404_handler(exc: ApiException, logger: Logger):
    if exc.status == 404:
        logger.warning(f"Object not found! Exception: {exc}")
        return
    raise exc


def on_apiexception_generic_handler(exc: ApiException, logger: Logger):
    logger.warning(f"ApiException: {exc}")


class ApiCommand:
    def __init__(self,
                 type: ApiCommandType,
                 namespace: str,
                 name: str,
                 body: Optional[dict] = None,
                 on_api_exception: Optional[OnApiExceptionHandler] = None):
        self.type = type
        self.namespace = namespace
        self.name = name
        self.body = body
        self.on_api_exception = on_api_exception

    def run(self, logger: Logger) -> Optional[api_client.V1Status]:
        # default when the command type is unhandled or an exception was swallowed by the handler
        status = None
        try:
            if self.type == ApiCommandType.CREATE_CM:
                status = cast(api_client.V1Status, api_core.create_namespaced_config_map(self.namespace, self.body))
            elif self.type == ApiCommandType.DELETE_CM:
                delete_body = api_client.V1DeleteOptions(grace_period_seconds=0)
                status = cast(api_client.V1Status, api_core.delete_namespaced_config_map(self.name, self.namespace, body=delete_body))
                return status
            elif self.type == ApiCommandType.REPLACE_CM:
                status = cast(api_client.V1Status, api_core.replace_namespaced_config_map(self.name, self.namespace, body=self.body))
            elif self.type == ApiCommandType.PATCH_CM:
                status = cast(api_client.V1Status, api_core.patch_namespaced_config_map(self.name, self.namespace, self.body))
        except kubeutils.ApiException as exc:
            if self.on_api_exception is not None:
                self.on_api_exception(exc, logger)
            else:
                raise

        return status


def snail_to_camel(s: str) -> str:
    if s.find("_") == -1:
        return s

    # Special case for '_exec'
    # For some reason for preStop with `exec` when dict-ified we get `pre_stop` with `_exec`
    # 'lifecycle': {
    #     'post_start': None,
    #     'pre_stop': {'_exec': {'command': ['sh', '-c', 'sleep 60 && mysqladmin -ulocalroot shutdown']},
    # If we don't handle that it becomes `Exec`
    if len(s) and s[0] == "_":
        s = s[1:]

    words = s.split("_")
    ret = words[0] + "".join(word.title() for word in words[1:])
    return ret
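
# Examples of the conversion performed by snail_to_camel() (derived from the
# special cases documented above):
#
#   snail_to_camel("pre_stop")        -> "preStop"
#   snail_to_camel("_exec")           -> "exec"
#   snail_to_camel("readiness_probe") -> "readinessProbe"
#   snail_to_camel("image")           -> "image"   (no underscore, returned unchanged)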


def item_snail_to_camel(item):
    if isinstance(item, dict):
        # k8s API will return some fields as None, like
        # spec.containers[1].readinessProbe : Required value: must specify a handler type
        # spec.containers[1].startupProbe: Required value: must specify a handler type
        # So we strip here the None values. Might hit somewhere where None is legit but for now it works!
        return {snail_to_camel(key): item_snail_to_camel(value) for key, value in item.items() if value is not None}
    if isinstance(item, list):
        return [item_snail_to_camel(value) for value in item]
    return item


def spec_to_dict(spec) -> dict:
    return item_snail_to_camel(spec.to_dict())


def strategic_merge(original, patch):
    if isinstance(original, dict) and isinstance(patch, dict):
        return merge_dicts(original, patch)
    elif isinstance(original, list) and isinstance(patch, list):
        return original + patch
    return patch


def merge_dicts(original, patch):
    for key, value in patch.items():
        if key in original:
            original[key] = strategic_merge(original[key], value)
        else:
            original[key] = value
    return original
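
# Behaviour of strategic_merge()/merge_dicts() above, shown on small inputs
# (illustrative only):
#
#   strategic_merge({"a": 1, "b": {"c": 2}}, {"b": {"d": 3}})  -> {"a": 1, "b": {"c": 2, "d": 3}}
#   strategic_merge([1, 2], [3])                               -> [1, 2, 3]
#   strategic_merge({"a": 1}, "x")                             -> "x"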


class InnoDBClusterObjectModifier:
    def __init__(self, cluster: InnoDBCluster, logger: Logger):
        self.server_sts_patch = {}
        self.sts_changed = False
        self.sts_template_changed = False
        self.deploy_changed = False
        self.router_deploy_patch = {}
        self.cluster = cluster
        self.logger = logger
        self.commands: list[ApiCommand] = []

        self.sts = self.cluster.get_stateful_set()
        self.sts.spec = spec_to_dict(self.sts.spec)
        self.sts_spec_changed = False

    def _apply_server_sts_patch_to_sts_spec_if_needed(self):
        if len(self.server_sts_patch):
            # update with accumulated patches before overwriting
            self.logger.info(f"Applying accumulated patches {self.server_sts_patch['spec']} to sts.spec")
            utils.merge_patch_object(self.sts.spec, self.server_sts_patch["spec"], none_deletes=True)
            self.sts_spec_changed = True
            self.server_sts_patch = {}

    def _get_or_patch_sts_path(self, path: str, patch: Optional[dict] = None):
        self.logger.info(f"get_sts_path: patch_path={path}\n")
        # patches could be cached in self.server_sts_patch, so apply them, if any, before returning parts of self.sts.spec
        self._apply_server_sts_patch_to_sts_spec_if_needed()

        base = self.sts.spec
        # the first element is the leading slash, the second is 'spec', so we skip both
        path_elements = path.split("/")[2:]
        if len(path) > 1:
            for path_element in path_elements[0:-1]:
                #self.logger.info(f"{path_element} in base = {path_element in base}\n")
                assert path_element in base
                base = base[path_element]

        if patch is not None:
            base[path_elements[-1]] = patch
            self.logger.info(f"get_sts_path: after patching self.sts.spec={self.sts.spec}")

        return base[path_elements[-1]]

    def get_sts_path(self, path: str):
        return self._get_or_patch_sts_path(path, None)

    def patch_sts(self, patch: dict) -> None:
        self.sts_changed = True
        if "template" in patch:
            self.sts_template_changed = True
        self.logger.info(f"Accumulating patch={patch}\n")
        # cache the patches without merging into self.sts.spec
        # in case there is no call to patch_sts_overwrite then we won't "replace"
        # the existing sts object but "patch" it
        # if an sts_overwrite happens, we have to apply the patches to self.sts.spec before overwriting
        utils.merge_patch_object(self.server_sts_patch, patch, none_deletes=True)

    def patch_sts_overwrite(self, patch: dict, patch_path: str) -> None:
        self.sts_changed = True
        if "template" in patch:
            self.sts_template_changed = True
        self.sts_spec_changed = True
        self._get_or_patch_sts_path(patch_path, patch)
        return

        # NOTE: the code below is unreachable (kept from the previous in-place implementation)
        if len(self.server_sts_patch):
            # update with accumulated patches before overwriting
            self.logger.info(f"Applying accumulated patches before applying overwriting patch patch={self.server_sts_patch['spec']}")
            #self.logger.info(f"STS.spec before apply={self.sts.spec}")
            utils.merge_patch_object(self.sts.spec, self.server_sts_patch["spec"], none_deletes=True)
            #self.logger.info(f"STS.spec after apply={self.sts.spec}")
            self.server_sts_patch = {}

        #self.logger.info(f"patch_sts_overwrite: patch_path={patch_path} patch={patch}\n")
        base = self.sts.spec
        # the first element is the leading slash, the second is 'spec', so we skip both
        patch_path_elements = patch_path.split("/")[2:]
        if len(patch_path) > 1:
            for patch_path_element in patch_path_elements[0:-1]:
                #self.logger.info(f"{patch_path_element} in base = {patch_path_element in base}")
                assert patch_path_element in base
                base = base[patch_path_element]

        # base[]
        #self.logger.info(f"\nExchanging {base[patch_path_elements[-1]]} \nwith\n{patch}")
        base[patch_path_elements[-1]] = patch
        self.logger.info(f"\n\nself.sts.spec={self.sts.spec}\n\n")

    def patch_deploy(self, patch: dict) -> None:
        self.deploy_changed = True
        self.logger.info(f"patch={patch}")
        utils.merge_patch_object(self.router_deploy_patch, patch, none_deletes=True)

    def create_configmap(self, namespace: str, name: str, body: dict, on_api_exception: Optional[OnApiExceptionHandler]) -> None:
        self.commands.append(ApiCommand(ApiCommandType.CREATE_CM, namespace, name, body, on_api_exception))

    def delete_configmap(self, namespace: str, name: str, on_api_exception: Optional[OnApiExceptionHandler]) -> None:
        self.commands.append(ApiCommand(ApiCommandType.DELETE_CM, namespace, name, None, on_api_exception))

    def replace_configmap(self, namespace: str, name: str, body: dict, on_api_exception: Optional[OnApiExceptionHandler]) -> None:
        self.commands.append(ApiCommand(ApiCommandType.REPLACE_CM, namespace, name, body, on_api_exception))

    def patch_configmap(self, namespace: str, name: str, patch: dict, on_api_exception: Optional[OnApiExceptionHandler]) -> None:
        self.commands.append(ApiCommand(ApiCommandType.PATCH_CM, namespace, name, patch, on_api_exception))
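
    # Illustrative call sequence for the ConfigMap command queue above (not part
    # of the module): commands are only recorded here and executed later by
    # submit_patches() below, e.g.
    #
    #   patcher.patch_configmap("ns1", "mycluster-initconf",
    #                           [{"op": "remove", "path": "/data/03-keyring-oci.cnf"}],
    #                           on_apiexception_404_handler)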

    def submit_patches(self) -> None:
        self.logger.info(f"InnoDBClusterObjectModifier::submit_patches sts_changed={self.sts_changed} sts_spec_changed={self.sts_spec_changed} len(router_deploy_patch)={len(self.router_deploy_patch)} len(commands)={len(self.commands)}")
        if (self.sts_changed or len(self.router_deploy_patch) or len(self.commands)):
            if len(self.commands):
                for command in self.commands:
                    command.run(self.logger)

            if self.sts_changed:
                if self.sts_spec_changed:
                    # this should apply server_sts_patch over self.sts.spec and empty self.server_sts_patch
                    # in the next step we will `replace` the STS and not `patch` it
                    # Only if self.sts.spec has not been touched should server_sts_patch be applied directly, as otherwise
                    # changes to self.sts.spec would be skipped/forgotten
                    self._apply_server_sts_patch_to_sts_spec_if_needed()

                if len(self.server_sts_patch):
                    self.logger.info(f"Patching STS.spec with {self.server_sts_patch}")
                    if self.sts_template_changed:
                        restart_patch = {"spec":{"template":{"metadata":{"annotations":{"kubectl.kubernetes.io/restartedAt":utils.isotime()}}}}}
                        utils.merge_patch_object(self.server_sts_patch, restart_patch)
                    api_apps.patch_namespaced_stateful_set(self.sts.metadata.name, self.sts.metadata.namespace, body=self.server_sts_patch)
                    self.server_sts_patch = {}
                else:
                    self.logger.info(f"Replacing STS.spec with {self.sts.spec}")
                    # Set the restart annotation only if the template has been changed. It could be that only a
                    # scale up/down (spec['replicas'] changed) has happened. In that case, setting an annotation
                    # in the template would roll over the whole STS, which is not what is wanted: if the replica
                    # count is increased only new pods are needed, and respectively when it is decreased.
                    # If replicas goes up and we also set the annotation, what actually happens is a rollover
                    # update followed by the scale up - we disturb the cluster (and tests will fail).
                    if self.sts_template_changed:
                        if not "annotations" in self.sts.spec["template"]["metadata"] or self.sts.spec["template"]["metadata"]["annotations"] is None:
                            self.sts.spec["template"]["metadata"]["annotations"] = {}
                        self.sts.spec["template"]["metadata"]["annotations"]["kubectl.kubernetes.io/restartedAt"] = utils.isotime()
                    api_apps.replace_namespaced_stateful_set(self.sts.metadata.name, self.sts.metadata.namespace, body=self.sts)

            if len(self.router_deploy_patch) and (deploy := self.cluster.get_router_deployment()):
                self.logger.info(f"Patching Deployment with {self.router_deploy_patch}")
                router_objects.update_deployment_spec(deploy, self.router_deploy_patch)
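

# Typical lifecycle of InnoDBClusterObjectModifier (illustrative, simplified):
#
#   patcher = InnoDBClusterObjectModifier(cluster, logger)
#   update_objects_for_logs(cluster, patcher, logger)     # accumulates STS/CM changes
#   patcher.submit_patches()                              # applies them in one go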