mysqloperator/controller/backup/backup_objects.py (181 lines of code) (raw):

# Copyright (c) 2020, 2023, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at
# https://oss.oracle.com/licenses/upl/
#

from typing import List
from logging import Logger
import os
import yaml
import kopf
from copy import deepcopy
from .backup_api import BackupProfile, BackupSchedule, MySQLBackupSpec
from .. import utils, config, consts
from ..innodbcluster.cluster_api import InnoDBClusterSpec
from ..kubeutils import api_cron_job, k8s_cluster_domain
from . import meb_cert


def prepare_meb_code_configmap(spec: InnoDBClusterSpec) -> dict:
    """Build a ConfigMap manifest carrying the MySQL Enterprise Backup helper
    scripts shipped next to this module (meb/*.py).

    The manifest is not namespaced here; the caller namespaces it on creation.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    directory_path = os.path.join(script_dir, "meb/")

    data = {}
    for filename in os.listdir(directory_path):
        filepath = os.path.join(directory_path, filename)
        # Only regular *.py files go into the ConfigMap
        if not filename.endswith(".py"):
            continue
        if os.path.isfile(filepath):
            with open(filepath, 'r') as f:
                data[filename] = f.read()

    # Fixed: the result was previously assigned to a local also named `spec`,
    # shadowing the parameter. Renamed; behavior unchanged.
    configmap = {
        "metadata": {
            "name": spec.name + "-mebcode"
        },
        "data": data
    }
    return configmap


def _prepare_backup_auth_secret(spec: InnoDBClusterSpec) -> dict:
    """
    Secrets for authenticating backup tool with MySQL.
    """
    backup_user = utils.b64encode(config.BACKUP_USER_NAME)
    backup_pwd = utils.b64encode(utils.generate_password())

    # We use a separate secrets object for the backup, so that we don't need to
    # give access for the main secret to backup instances.

    # No need to namespace it. A namespaced secret will be created by the caller
    tmpl = f"""
apiVersion: v1
kind: Secret
metadata:
  name: {spec.name}-backup
  labels:
    tier: mysql
    mysql.oracle.com/cluster: {spec.name}
    app.kubernetes.io/name: mysql-innodbcluster
    app.kubernetes.io/instance: idc-{spec.name}
    app.kubernetes.io/managed-by: mysql-operator
    app.kubernetes.io/created-by: mysql-operator
data:
  backupUsername: {backup_user}
  backupPassword: {backup_pwd}
"""
    return yaml.safe_load(tmpl)


def prepare_backup_secrets(spec: InnoDBClusterSpec) -> list[dict]:
    """Return all Secret manifests needed for backups: the MySQL auth secret
    plus, when any profile uses MEB, the MEB TLS secret."""
    secrets = [_prepare_backup_auth_secret(spec)]
    if any(getattr(profile, 'meb', None) for profile in spec.backupProfiles):
        secrets.append(meb_cert.prepare_meb_tls_secret(spec))
    return secrets


def prepare_backup_job(jobname: str, spec: MySQLBackupSpec) -> dict:
    """Build the batch/v1 Job manifest that runs one backup via the operator
    image (`mysqlsh --pym mysqloperator backup --command execute-backup`).

    Profile podAnnotations/podLabels are merge-patched onto the pod template,
    and spec.add_to_pod_spec() applies the backup-spec specific pod settings.
    """
    cluster_domain = k8s_cluster_domain(None)

    # No need to namespace it. A namespaced job will be created by the caller
    tmpl = f"""
apiVersion: batch/v1
kind: Job
metadata:
  name: {jobname}
  labels:
    tier: mysql
    mysql.oracle.com/cluster: {spec.clusterName}
    app.kubernetes.io/name: mysql-innodbcluster-backup-task
    app.kubernetes.io/instance: idc-{spec.clusterName}
    app.kubernetes.io/managed-by: mysql-operator
    app.kubernetes.io/created-by: mysql-operator
spec:
  template:
    metadata:
      labels:
        tier: mysql
        mysql.oracle.com/cluster: {spec.clusterName}
        app.kubernetes.io/name: mysql-innodbcluster-backup-task
        app.kubernetes.io/instance: idc-{spec.clusterName}
        app.kubernetes.io/managed-by: mysql-operator
        app.kubernetes.io/created-by: mysql-operator
    spec:
      serviceAccountName: {spec.serviceAccountName}
      securityContext:
        runAsUser: 27
        runAsGroup: 27
        fsGroup: 27
      containers:
      - name: operator-backup-job
        image: {spec.operator_image}
        imagePullPolicy: {spec.operator_image_pull_policy}
        command: ["mysqlsh", "--pym", "mysqloperator", "backup",
                  "--command", "execute-backup",
                  "--namespace", "{spec.namespace}",
                  "--backup-object-name", "{spec.name}",
                  "--job-name", "{jobname}",
                  "--backup-dir", "/mnt/storage"
        ]
        securityContext:
          # These can't go to spec.template.spec.securityContext
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: true
          capabilities:
            drop:
            - ALL
        env:
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /mysqlsh
        - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
          value: {cluster_domain}
        volumeMounts:
        - name: shellhome
          mountPath: /mysqlsh
      volumes:
      - name: shellhome
        emptyDir: {{}}
      restartPolicy: Never
      terminationGracePeriodSeconds: 60
"""
    job = yaml.safe_load(tmpl)

    metadata = {}
    if spec.backupProfile.podAnnotations:
        metadata['annotations'] = spec.backupProfile.podAnnotations
    if spec.backupProfile.podLabels:
        metadata['labels'] = spec.backupProfile.podLabels
    if len(metadata):
        utils.merge_patch_object(job["spec"]["template"], {"metadata": metadata})

    spec.add_to_pod_spec(job["spec"]["template"], "operator-backup-job")
    return job


def prepare_mysql_backup_object_by_profile_name(name: str, cluster_name: str,
                                                backup_profile_name: str) -> dict:
    """Build a MySQLBackup manifest referencing a cluster profile by name."""
    # No need to namespace it. A namespaced job will be created by the caller
    tmpl = f"""
apiVersion: {consts.GROUP}/{consts.VERSION}
kind: {consts.MYSQLBACKUP_KIND}
metadata:
  name: {name}
  labels:
    tier: mysql
    mysql.oracle.com/cluster: {cluster_name}
    app.kubernetes.io/name: mysql-innodbcluster-backup-task
    app.kubernetes.io/instance: idc-{cluster_name}
    app.kubernetes.io/managed-by: mysql-operator
    app.kubernetes.io/created-by: mysql-operator
spec:
  clusterName: {cluster_name}
  backupProfileName: {backup_profile_name}
  addTimestampToBackupDirectory: false
"""
    return yaml.safe_load(tmpl.replace("\n\n", "\n"))


def prepare_mysql_backup_object_by_profile_object(name: str, cluster_name: str,
                                                  backup_profile: dict) -> dict:
    """Build a MySQLBackup manifest with `backup_profile` embedded inline.

    The full profile dict is merge-patched over spec.backupProfile at the end,
    so all profile fields (not only podLabels/podAnnotations) are carried over.
    """
    pod_labels = backup_profile.get("podLabels")
    if pod_labels:
        labels = f"""
podLabels: {pod_labels}
"""
    pod_annotations = backup_profile.get("podAnnotations")
    if pod_annotations:
        annotations = f"""
podAnnotations: {pod_annotations}
"""

    # No need to namespace it. A namespaced job will be created by the caller
    tmpl = f"""
apiVersion: {consts.GROUP}/{consts.VERSION}
kind: {consts.MYSQLBACKUP_KIND}
metadata:
  name: {name}
  labels:
    tier: mysql
    mysql.oracle.com/cluster: {cluster_name}
    app.kubernetes.io/name: mysql-innodbcluster-backup-task
    app.kubernetes.io/instance: idc-{cluster_name}
    app.kubernetes.io/managed-by: mysql-operator
    app.kubernetes.io/created-by: mysql-operator
spec:
  clusterName: {cluster_name}
  backupProfile:
    name: {name}
{utils.indent(labels, 4) if pod_labels else ""}
{utils.indent(annotations, 4) if pod_annotations else ""}
  addTimestampToBackupDirectory: false
"""
    backup_object = yaml.safe_load(tmpl.replace("\n\n", "\n"))
    utils.merge_patch_object(backup_object['spec'],
                             {'backupProfile': backup_profile},
                             "spec.backupProfile")
    return backup_object


def backup_job_name(cluster_name, schedule_name: str) -> str:
    """Name for a one-shot backup job: cluster + schedule + timestamp suffix."""
    return f"{cluster_name}-{schedule_name}{utils.timestamp(dash=False, four_digit_year=False)}"


def schedule_cron_job_name(cluster_name, schedule_name: str) -> str:
    # cb = create backup
    return f"{cluster_name}-{schedule_name}-cb"


def schedule_cron_job_job(namespace, cluster_name, schedule_name: str):
    """Read the CronJob object backing the given backup schedule."""
    name = schedule_cron_job_name(cluster_name, schedule_name)
    return api_cron_job.read_namespaced_cron_job(name, namespace)


def patch_cron_template_for_backup_schedule(base: dict, cluster_name: str,
                                            schedule_profile: BackupSchedule) -> dict:
    """Specialize the generic CronJob template (see get_cron_job_template) for
    one schedule. `base` is not mutated; a patched deep copy is returned."""
    new_object = deepcopy(base)
    new_object["metadata"]["name"] = schedule_cron_job_name(cluster_name, schedule_profile.name)
    new_object["spec"]["suspend"] = not schedule_profile.enabled
    new_object["spec"]["schedule"] = schedule_profile.schedule
    new_object["spec"]["jobTemplate"]["spec"]["template"]["spec"]["containers"][0]["command"].extend(
        ["--schedule-name", schedule_profile.name])
    if schedule_profile.timeZone:
        new_object["spec"]["timeZone"] = schedule_profile.timeZone

    metadata = {}
    if schedule_profile.backupProfile.podAnnotations:
        metadata['annotations'] = schedule_profile.backupProfile.podAnnotations
    if schedule_profile.backupProfile.podLabels:
        metadata['labels'] = schedule_profile.backupProfile.podLabels
    if len(metadata):
        utils.merge_patch_object(new_object["spec"]["jobTemplate"]["spec"]["template"],
                                 {"metadata": metadata})

    return new_object


def get_cron_job_template(spec: InnoDBClusterSpec) -> dict:
    """Generic (unnamed, unscheduled) CronJob manifest used as the base for
    every backup schedule of the cluster; the cron container creates a
    MySQLBackup object rather than running the backup itself."""
    tmpl = f"""
apiVersion: batch/v1
kind: CronJob
metadata:
  labels:
    tier: mysql
    mysql.oracle.com/cluster: {spec.name}
    app.kubernetes.io/name: mysql-innodbcluster
    app.kubernetes.io/instance: idc-{spec.name}
    app.kubernetes.io/managed-by: mysql-operator
    app.kubernetes.io/created-by: mysql-operator
spec:
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      backoffLimit: 0
      template:
        spec:
          securityContext:
            runAsUser: 27
            runAsGroup: 27
            fsGroup: 27
            runAsNonRoot: true
          containers:
          - name: operator-backup-job-cron
            image: {spec.operator_image}
            imagePullPolicy: {spec.operator_image_pull_policy}
            command: ["mysqlsh", "--pym", "mysqloperator", "backup",
                      "--command", "create-backup-object",
                      "--namespace", "{spec.namespace}",
                      "--cluster-name", "{spec.name}"
            ]
            securityContext:
              # These can't go to spec.template.spec.securityContext
              # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
              # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
              # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
              # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
              allowPrivilegeEscalation: false
              privileged: false
              readOnlyRootFilesystem: true
              # The value is inherited from the PodSecurityContext but dumb sec checkers might not know that
              runAsNonRoot: true
              capabilities:
                drop:
                - ALL
            env:
            - name: MYSQLSH_USER_CONFIG_HOME
              value: /mysqlsh
            volumeMounts:
            - name: shellhome
              mountPath: /mysqlsh
          volumes:
          - name: shellhome
            emptyDir: {{}}
          restartPolicy: Never
          terminationGracePeriodSeconds: 60
          serviceAccountName: {spec.serviceAccountName}
"""
    base = yaml.safe_load(tmpl.replace("\n\n", "\n"))
    return base


def compare_schedules(spec: InnoDBClusterSpec, old: dict, new: dict, logger: Logger) -> dict:
    """Diff two backup-schedule lists.

    Returns {'removed', 'added', 'modified', 'unmodified'}; 'modified' maps a
    schedule name to {'old': BackupSchedule, 'new': BackupSchedule}.
    """
    logger.info(f"backup_objects.compare_schedules {spec.namespace}/{spec.name}")
    old_schedules = {}
    if old is not None:
        for old_object in old:
            schedule = BackupSchedule(spec)
            # Don't try to load the profile as it might not exist anymore:
            # If the profile name is changed together with the schedule that
            # references it, then we can't load with the old name from the k8s
            # API, as it won't exist. All that exists is in `new`.
            schedule.parse(old_object, "", load_profile=False)
            old_schedules[schedule.name] = schedule

    if old == new:
        return {'removed': {}, 'added': {}, 'modified': {}, 'unmodified': old_schedules}

    new_schedules = {}
    if new is not None:
        for new_object in new:
            schedule = BackupSchedule(spec)
            schedule.parse(new_object, "")
            new_schedules[schedule.name] = schedule

    removed = {}
    added = {}
    modified = {}
    unmodified = {}

    # Check for modified, non-modified and removed objects
    for old_schedule_name, old_schedule_obj in old_schedules.items():
        if old_schedule_name in new_schedules:
            new_schedule_obj = new_schedules[old_schedule_name]
            if old_schedule_obj == new_schedule_obj:
                unmodified[old_schedule_name] = old_schedule_obj
            else:
                modified[old_schedule_name] = {'old': old_schedule_obj, 'new': new_schedule_obj}
        else:
            removed[old_schedule_name] = old_schedule_obj

    # Now it's time to check if something was added
    for new_schedule_name, new_schedule_obj in new_schedules.items():
        if new_schedule_name not in old_schedules:
            added[new_schedule_name] = new_schedule_obj

    return {'removed': removed, 'added': added, 'modified': modified, 'unmodified': unmodified}


def update_schedules(spec: InnoDBClusterSpec, old: dict, new: dict, logger: Logger) -> int:
    """Reconcile the cluster's CronJobs with its backup schedules.

    Deletes CronJobs for removed schedules, creates (and kopf-adopts) CronJobs
    for added ones and replaces CronJobs for modified ones.

    Returns the number of schedule changes applied (0 when nothing changed).
    Fixed: the function is declared `-> int` but previously fell off the end
    (returning None) whenever changes were applied.
    """
    logger.info("backup_objects.update_schedules")
    namespace = spec.namespace
    cluster_name = spec.name
    diff = compare_schedules(spec, old, new, logger)
    logger.info(f"backup_objects.update_schedules: diff={diff}")

    changes = len(diff['removed']) + len(diff['added']) + len(diff['modified'])
    if changes == 0:
        logger.info("No backup schedules changes")
        return 0

    if len(diff['removed']):
        logger.info(f"backup_objects.update_schedules: will delete {len(diff['removed'])} backup schedule objects")
        for rm_schedule_name in diff['removed']:
            cj_name = schedule_cron_job_name(cluster_name, rm_schedule_name)
            logger.info(f"backup_objects.update_schedules: deleting schedule {cj_name} in {namespace} ")
            api_cron_job.delete_namespaced_cron_job(cj_name, namespace)

    if len(diff['added']):
        logger.info(f"backup_objects.update_schedules: will add {len(diff['added'])} backup schedule objects")
        cj_template = get_cron_job_template(spec)
        for add_schedule_name, add_schedule_obj in diff['added'].items():
            cj_name = schedule_cron_job_name(cluster_name, add_schedule_name)
            logger.info(f"backup_objects.update_schedules: adding schedule {cj_name} in {namespace}")
            cronjob = patch_cron_template_for_backup_schedule(cj_template, spec.name, add_schedule_obj)
            # Adopt so the CronJob is garbage-collected with the cluster object
            kopf.adopt(cronjob)
            api_cron_job.create_namespaced_cron_job(namespace=namespace, body=cronjob)

    if len(diff['modified']):
        logger.info(f"backup_objects.update_schedules: will modify {len(diff['modified'])} backup schedule objects")
        cj_template = get_cron_job_template(spec)
        for mod_schedule_name, mod_schedule_objects in diff['modified'].items():
            cj_name = schedule_cron_job_name(cluster_name, mod_schedule_name)
            logger.info(f"backup_objects.update_schedules: modifying schedule {cj_name} in {namespace}")
            cronjob = patch_cron_template_for_backup_schedule(cj_template, spec.name, mod_schedule_objects["new"])
            logger.info(f"backup_objects.update_schedules: {cronjob}")
            api_cron_job.replace_namespaced_cron_job(name=cj_name, namespace=namespace, body=cronjob)

    return changes


def ensure_schedules_use_current_image(spec: InnoDBClusterSpec, logger: Logger) -> None:
    """After an operator upgrade, make every schedule CronJob run the current
    operator image; a CronJob is replaced only when its image differs."""
    for schedule in spec.backupSchedules:
        logger.info(f"Checking operator version for backup schedule {spec.namespace}/{spec.name}/{schedule.name}")
        cj = schedule_cron_job_job(spec.namespace, spec.name, schedule.name)
        old_image = cj.spec.job_template.spec.template.spec.containers[0].image
        if old_image != spec.operator_image:
            cj.spec.job_template.spec.template.spec.containers[0].image = spec.operator_image
            cj_name = schedule_cron_job_name(spec.name, schedule.name)
            api_cron_job.replace_namespaced_cron_job(name=cj_name, namespace=spec.namespace, body=cj)