# Copyright (c) 2020, 2025, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#

import random
import string
from logging import Logger, getLogger
import kopf
from typing import List, Dict, Optional, cast
from ..kubeutils import client as api_client
from .. import utils, config, consts
from .cluster_api import InnoDBCluster, AbstractServerSetSpec, InnoDBClusterSpec, ReadReplicaSpec, InnoDBClusterSpecProperties
from .. import fqdn
import yaml
from ..kubeutils import api_core, api_apps, api_customobj, k8s_cluster_domain, ApiException
from . import router_objects
import base64
import os
from ..backup import backup_objects

# TODO replace app field with component (mysqld,router) and tier (mysql)

# This service includes all instances, even those that are not ready

def prepare_cluster_service(spec: AbstractServerSetSpec, logger: Logger) -> dict:
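    """Build the manifest for the headless Service that covers all server
    instances of the given server set (cluster members or read replicas),
    including instances that are not ready yet."""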
    extra_label = ""
    if type(spec) is InnoDBClusterSpec:
        instance_type = "group-member"
        instances = spec.instances
    elif type(spec) is ReadReplicaSpec:
        instance_type = "read-replica"
        extra_label = f"mysql.oracle.com/read-replica: {spec.name}"
    else:
        raise NotImplementedError(f"Unknown subtype {type(spec)} for creating StatefulSet")

    tmpl = f"""
apiVersion: v1
kind: Service
metadata:
  name: {spec.headless_service_name}
  namespace: {spec.namespace}
  labels:
    tier: mysql
    mysql.oracle.com/cluster: {spec.cluster_name}
    mysql.oracle.com/instance-type: {instance_type}
    {extra_label}
spec:
  clusterIP: None
  publishNotReadyAddresses: true
  ports:
  - name: mysql
    port: {spec.mysql_port}
    targetPort: {spec.mysql_port}
  - name: mysqlx
    port: {spec.mysql_xport}
    targetPort: {spec.mysql_xport}
  - name: gr-xcom
    port: {spec.mysql_grport}
    targetPort: {spec.mysql_grport}
  selector:
    component: mysqld
    tier: mysql
    mysql.oracle.com/cluster: {spec.cluster_name}
    mysql.oracle.com/instance-type: {instance_type}
    {extra_label}
  type: ClusterIP
"""

    svc = yaml.safe_load(tmpl)

    if spec.instanceService.annotations:
        svc['metadata']['annotations'] = spec.instanceService.annotations

    if spec.instanceService.labels:
        svc['metadata']['labels'] = spec.instanceService.labels | svc['metadata'].get('labels', {})

    for subsystem in spec.get_add_to_svc_cbs:
        print(f"\t\tChecking subsystem {subsystem}")
        for add_to_svc_cb in spec.get_add_to_svc_cbs[subsystem]:
            print(f"\t\tAdding {subsystem} SVC bits")
            add_to_svc_cb(svc, logger)

    return svc


def prepare_secrets(spec: InnoDBClusterSpec) -> dict:
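    """Build the {spec.name}-privsecrets Secret holding a randomly suffixed
    cluster admin user name and a generated password, both base64 encoded."""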
    def encode(s):
        return base64.b64encode(bytes(s, "ascii")).decode("ascii")

    # TODO: should we share the suffix with router & backup and store it in the IC?
    # Might make it simpler to diagnose and remove
    characters = string.ascii_letters + string.digits
    suffix = ''.join(random.choice(characters) for _ in range(10))

    admin_user = encode(config.CLUSTER_ADMIN_USER_NAME + '-' + suffix)
    admin_pwd = encode(utils.generate_password())

    tmpl = f"""
apiVersion: v1
kind: Secret
metadata:
  name: {spec.name}-privsecrets
data:
  clusterAdminUsername: {admin_user}
  clusterAdminPassword: {admin_pwd}
"""
    return yaml.safe_load(tmpl)


def prepare_cluster_pod_disruption_budget(spec: InnoDBClusterSpec) -> dict:
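    """Build a PodDisruptionBudget that allows at most one mysqld pod of the
    cluster to be unavailable due to voluntary disruptions."""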
    tmpl = f"""
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: {spec.name}-pdb
spec:
  maxUnavailable: 1
  selector:
    matchLabels:
      component: mysqld
      tier: mysql
      mysql.oracle.com/cluster: {spec.name}
"""
    pdb = yaml.safe_load(tmpl.replace("\n\n", "\n"))

    return pdb

def get_restore_container(cluster: InnoDBCluster, spec: AbstractServerSetSpec, cluster_domain: str):
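    """Return a (container_yaml, volumes_yaml) pair for the MEB restore init
    container, or a pair of empty strings if no MEB restore is requested or
    the spec describes a read replica."""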
    if not isinstance(spec, InnoDBClusterSpec):
        # restore happens on a primary, read replica doesn't need a restore container
        return ("", "")

    if not spec.initDB or not spec.initDB.meb:
        # No MEB Restore - no extra container
        return ("", "")

    container = f"""
      - name: restore
        command:
        - mysqlsh
        - --pym
        - meb.restore_main
        - --pod-name
        - "$(POD_NAME)"
        - --pod-namespace
        - "$(POD_NAMESPACE)"
        - --cluster-name
        - "{spec.name}"
        - --datadir
        - /var/lib/mysql

        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
          value: {cluster_domain}
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /tmp
        - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
          value: never
        image: {spec.mysql_image}
        imagePullPolicy: {spec.mysql_image_pull_policy}
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
          privileged: false
          readOnlyRootFilesystem: true
          runAsGroup: 27
          runAsUser: 27
        volumeMounts:
        - mountPath: /usr/lib/mysqlsh/python-packages/meb
          name: mebcode
        - mountPath: /var/lib/mysql
          name: datadir
        - mountPath: /tmp
          # sharing with initconf to transfer restore information
          name: initconf-tmp
        - mountPath: /mysqlsh
          name: mebmyslhshhome
        - name: rundir
          mountPath: /var/run/mysqld
"""

    volumes = f"""
      - name: mebmyslhshhome
        emptyDir: {{}}
      - name: mebcode
        configMap:
            name: {cluster.name}-mebcode
"""

    return (container, volumes)


def get_meb_container(cluster: InnoDBCluster, spec: AbstractServerSetSpec,
                      cluster_domain: str):
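    """Return a (container_yaml, volumes_yaml) pair for the MEB backup helper
    container, or a pair of empty strings when no backup profile uses MEB or
    the spec is not an InnoDBClusterSpec."""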
    if not isinstance(spec, InnoDBClusterSpec):
        # Currently backup can't be done on RRs
        return ("", "")

    if all(not getattr(profile, 'meb', None) for profile in spec.backupProfiles):
        # No profile requests MEB
        return ("", "")

    if spec.tlsUseSelfSigned:
        ssl_cert = "/var/lib/mysql/server-cert.pem"
        ssl_key = "/var/lib/mysql/server-key.pem"
        mount = ""
    else:
        ssl_cert = "//etc/mysql-ssl/tls.crt"
        ssl_key = "/etc/mysql-ssl/tls.key"
        mount = """
        - mountPath: /etc/mysql-ssl"
          name: ssldata
       """

    container = f"""
      - name: meb
        command:
        - mysqlsh
        - --pym
        - meb.meb_main
        - --pod-name
        - "$(POD_NAME)"
        - --pod-namespace
        - "$(POD_NAMESPACE)"
        - --datadir
        - /var/lib/mysql
        - --ssl-cert
        - {ssl_cert}
        - --ssl-key
        - {ssl_key}

        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
          value: {cluster_domain}
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /tmp
        - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
          value: never
        image: {spec.mysql_image}
        imagePullPolicy: {spec.mysql_image_pull_policy}
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
          privileged: false
          readOnlyRootFilesystem: true
          runAsGroup: 27
          runAsUser: 27
        volumeMounts:
        - mountPath: /usr/lib/mysqlsh/python-packages/meb
          name: mebcode
        - mountPath: /var/lib/mysql
          name: datadir
        - mountPath: /tmp
          # sharing with initconf to transfer restore information
          name: initconf-tmp
        - mountPath: /mysqlsh
          name: mebmyslhshhome
        - name: rundir
          mountPath: /var/run/mysqld
        - name: mebtls
          mountPath: /tls
{mount}
"""

    volumes = f"""
      - name: mebmyslhshhome
        emptyDir: {{}}
      - name: mebcode
        configMap:
            name: {cluster.name}-mebcode
      - name: mebtls
        secret:
            secretName: {cluster.name}-meb-tls
"""

    return (container, volumes)


# TODO - check if we need to add a finalizer to the sts and svc (and if so, what's the condition to remove them)
# TODO - check if we need to make readinessProbe take into account innodb recovery times

# TODO - create ServiceAccount ({cluster.name}-sidecar-sa) for the mysql pods and bind it to the mysql-sidecar role

# ## About lifecycle probes:
#
# ### startupProbe
#
# used to let k8s know that the container is still starting up.
#
# * Server startup can take anywhere from a few seconds to several minutes.
# * If the server is initializing for the first time, it will take a few seconds.
# * If the server is restarting after a clean shutdown and there's not much data,
#   it will take even less time to start up.
# * But if it's restarting after a crash and there's a lot of data, the InnoDB
#   recovery can take a very long time to finish.
# Since we want success to be reported asap, we set the interval to a small value.
# We also set the successThreshold to > 1, so that we can report success once
# every now and then to reset the failure counter.
# NOTE: Currently, the startup probe will never fail the startup. We assume that
# mysqld will abort if the startup fails. Once there is a method to check whether
# the server is actually frozen during startup, the probe should be updated to stop
# resetting the failure counter and let it actually fail.
#
# ### readinessProbe
#
# used to let k8s know that the container can be marked as ready, which means
# it can accept external connections. We need mysqld to be always accessible,
# so the probe should always succeed as soon as startup succeeds.
# Any failures that happen after it's up don't matter for the probe, because
# we want GR and the operator to control the fate of the container, not the
# probe.
#
# ### livenessProbe
#
# this checks that the server is still healthy. If it fails above the threshold
# (e.g. because of a deadlock), the container is restarted.
#
def prepare_cluster_stateful_set(cluster: InnoDBCluster, spec: AbstractServerSetSpec, logger: Logger) -> dict:
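    """Build the StatefulSet manifest for the server instances (cluster members
    or read replicas), including the init containers (fixdatadir, initconf,
    initmysql, optional MEB restore), the mysql and sidecar containers and the
    optional MEB backup container, then apply podSpec, podAnnotations, podLabels
    and other overrides from the spec."""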
    init_mysql_argv = ["mysqld", "--user=mysql"]
#    if config.enable_mysqld_general_log:
#        init_mysql_argv.append("--general-log=1")

    mysql_argv = init_mysql_argv

    # we only need this in initconf, but we pass it to all operator images to
    # be on the safe side
    cluster_domain = k8s_cluster_domain(logger)

    fqdn_template = fqdn.idc_service_fqdn_template(spec)

    extra_label = ""
    if type(spec) is InnoDBClusterSpec:
        instance_type = "group-member"
    elif type(spec) is ReadReplicaSpec:
        instance_type = "read-replica"
        extra_label = f"mysql.oracle.com/read-replica: {spec.name}"
        # at initial startup there are no read replicas; we scale up once the group
        # is running, so spec.instances will be reduced by the caller!
    else:
        raise NotImplementedError(f"Unknown subtype {type(spec)} for creating StatefulSet")

    fixdatadir_container = ""
    if spec.dataDirPermissions.setRightsUsingInitContainer:
        fixdatadir_container = f"""
- name: fixdatadir
  image: {spec.operator_image}
  imagePullPolicy: {spec.sidecar_image_pull_policy}
  command: ["bash", "-c", "chown 27:27 /var/lib/mysql && chmod 0700 /var/lib/mysql"]
  securityContext:
    # make an exception for this one
    runAsNonRoot: false
    runAsUser: 0
    # These can't go to spec.template.spec.securityContext
    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
    # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
    allowPrivilegeEscalation: false
    privileged: false
    readOnlyRootFilesystem: true
    capabilities:
      add:
      - CHOWN
      - FOWNER
      drop:
      - ALL
  volumeMounts:
  - name: datadir
    mountPath: /var/lib/mysql
  env:
  - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
    value: {cluster_domain}
  - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
    value: never
"""
    logger.info(f"Fix data container {'EN' if fixdatadir_container else 'DIS'}ABLED")

    # If an MEB restore/backup is requested, add the extra containers and volumes (empty strings otherwise)
    (restore_container, restore_volumes) = get_restore_container(cluster, spec, cluster_domain)
    (meb_container, meb_volumes) = get_meb_container(cluster, spec, cluster_domain)

    # TODO re-add "--log-file=",
    tmpl = f"""
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: {spec.name}
  annotations:
      mysql.oracle.com/fqdn-template: '{fqdn_template}'
  labels:
    tier: mysql
    mysql.oracle.com/cluster: {spec.cluster_name}
    mysql.oracle.com/instance-type: {instance_type}
    {extra_label}
    app.kubernetes.io/name: mysql-innodbcluster
    app.kubernetes.io/instance: mysql-innodbcluster-{spec.name}
    app.kubernetes.io/component: database
    app.kubernetes.io/managed-by: mysql-operator
    app.kubernetes.io/created-by: mysql-operator
spec:
  serviceName: {spec.headless_service_name}
  replicas: {spec.instances}
  podManagementPolicy: Parallel
  selector:
    matchLabels:
      component: mysqld
      tier: mysql
      mysql.oracle.com/cluster: {spec.cluster_name}
      mysql.oracle.com/instance-type: {instance_type}
      {extra_label}
      app.kubernetes.io/name: mysql-innodbcluster-mysql-server
      app.kubernetes.io/instance: mysql-innodbcluster-{spec.name}-mysql-server
      app.kubernetes.io/component: database
      app.kubernetes.io/managed-by: mysql-operator
      app.kubernetes.io/created-by: mysql-operator
  template:
    metadata:
      annotations:
        mysql.oracle.com/fqdn-template: '{fqdn_template}'
      labels:
        component: mysqld
        tier: mysql
        mysql.oracle.com/cluster: {spec.cluster_name}
        mysql.oracle.com/instance-type: {instance_type}
        {extra_label}
        app.kubernetes.io/name: mysql-innodbcluster-mysql-server
        app.kubernetes.io/instance: mysql-innodbcluster-{spec.name}-mysql-server
        app.kubernetes.io/component: database
        app.kubernetes.io/managed-by: mysql-operator
        app.kubernetes.io/created-by: mysql-operator
    spec:
      readinessGates:
      - conditionType: "mysql.oracle.com/configured"
      - conditionType: "mysql.oracle.com/ready"
      serviceAccountName: {spec.serviceAccountName}
      securityContext:
        runAsUser: 27
        runAsGroup: 27
        fsGroup: 27
{utils.indent("fsGroupChangePolicy: " + spec.dataDirPermissions.fsGroupChangePolicy, 8) if spec.dataDirPermissions.fsGroupChangePolicy else ""}
        runAsNonRoot: true
      terminationGracePeriodSeconds: 120
      initContainers:
{utils.indent(fixdatadir_container, 6)}

      - name: initconf
        image: {spec.operator_image}
        imagePullPolicy: {spec.sidecar_image_pull_policy}
        # For datadir see the datadir volume mount
        command: ["mysqlsh", "--log-level=@INFO", "--pym", "mysqloperator", "init",
                  "--pod-name", "$(POD_NAME)",
                  "--pod-namespace", "$(POD_NAMESPACE)",
                  "--datadir", "/var/lib/mysql"
        ]
        securityContext:
          # These can't go to spec.template.spec.securityContext
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: true
          # The value is inherited from the PodSecurityContext but dumb sec checkers might not know that
          runAsNonRoot: true
          capabilities:
            drop:
            - ALL
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
          value: {cluster_domain}
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /tmp
        - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
          value: never
        volumeMounts:
        - name: initconfdir
          mountPath: /mnt/initconf
          readOnly: true
        - name: datadir
          mountPath: /var/lib/mysql
        - name: mycnfdata
          mountPath: /mnt/mycnfdata
        - name: initconf-tmp
          mountPath: /tmp
        - name: rootcreds
          readOnly: true
          # rootHost is not obligatory and thus might not exist in the secret
          # Nevertheless K8s won't complain and instead of mounting an empty file
          # will create a directory (/rootcreds/rootHost will be an empty directory)
          # For more information see below the comment regarding rootcreds.
          subPath: rootHost
          mountPath: /rootcreds/rootHost

{restore_container}

      - name: initmysql
        image: {spec.mysql_image}
        imagePullPolicy: {spec.mysql_image_pull_policy}
        args: {init_mysql_argv}
        securityContext:
          # These can't go to spec.template.spec.securityContext
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: true
          # The value is inherited from the PodSecurityContext but dumb sec checkers might not know that
          runAsNonRoot: true
          capabilities:
            drop:
            - ALL
        env:
        - name: MYSQL_INITIALIZE_ONLY
          value: "1"
        - name: MYSQL_RANDOM_ROOT_PASSWORD
          value: "1"
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /tmp
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/mysql
        - name: rundir
          mountPath: /var/run/mysqld
        - name: mycnfdata
          mountPath: /etc/my.cnf.d
          subPath: my.cnf.d
        - name: mycnfdata
          mountPath: /docker-entrypoint-initdb.d
          subPath: docker-entrypoint-initdb.d
        - name: mycnfdata
          mountPath: /etc/my.cnf
          subPath: my.cnf
        - name: initmysql-tmp
          mountPath: /tmp
        - name: varlibmysqlfiles # The entrypoint of the container `touch`-es 2 files there
          mountPath: /var/lib/mysql-files
      containers:
      - name: sidecar
        image: {spec.operator_image}
        imagePullPolicy: {spec.sidecar_image_pull_policy}
        command: ["mysqlsh", "--pym", "mysqloperator", "sidecar",
                  "--pod-name", "$(POD_NAME)",
                  "--pod-namespace", "$(POD_NAMESPACE)",
                  "--datadir", "/var/lib/mysql"
        ]
        securityContext:
          # These can't go to spec.template.spec.securityContext
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: true
          # The value is inherited from the PodSecurityContext but dumb sec checkers might not know that
          runAsNonRoot: true
          capabilities:
            drop:
            - ALL
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MYSQL_UNIX_PORT
          value: /var/run/mysqld/mysql.sock
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /mysqlsh
        - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
          value: {cluster_domain}
        - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
          value: never
        volumeMounts:
        - name: rundir
          mountPath: /var/run/mysqld
        - name: mycnfdata
          mountPath: /etc/my.cnf.d
          subPath: my.cnf.d
        - name: mycnfdata
          mountPath: /etc/my.cnf
          subPath: my.cnf
        - name: shellhome
          mountPath: /mysqlsh
        - name: sidecar-tmp
          mountPath: /tmp
{utils.indent(spec.extra_sidecar_volume_mounts, 8)}
      - name: mysql
        image: {spec.mysql_image}
        imagePullPolicy: {spec.mysql_image_pull_policy}
        args: {mysql_argv}
        securityContext:
          # These can't go to spec.template.spec.securityContext
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodTemplateSpec / https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSpec
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#PodSecurityContext - for pods (top level)
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#Container
          # See: https://pkg.go.dev/k8s.io/api@v0.26.1/core/v1#SecurityContext - for containers
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: true
          # The value is inherited from the PodSecurityContext but dumb sec checkers might not know that
          runAsNonRoot: true
          capabilities:
            drop:
            - ALL
        lifecycle:
          preStop:
            exec:
              # 60 is the default value for dba.gtidWaitTimeout
              # see https://dev.mysql.com/doc/mysql-shell/8.0/en/mysql-innodb-cluster-working-with-cluster.html
              command: ["sh", "-c", "sleep 60 && mysqladmin -ulocalroot shutdown"]
        startupProbe:
          exec:
            command: ["/livenessprobe.sh", "8"]
          initialDelaySeconds: 5
          periodSeconds: 3
          failureThreshold: 10000
          successThreshold: 1
          timeoutSeconds: 2
        readinessProbe:
          exec:
            command: ["/readinessprobe.sh"]
          periodSeconds: 5
          initialDelaySeconds: 10
          failureThreshold: 10000
        livenessProbe:
          exec:
            command: ["/livenessprobe.sh"]
          initialDelaySeconds: 15
          periodSeconds: 15
          failureThreshold: 10
          successThreshold: 1
          timeoutSeconds: 5
        env:
        - name: MYSQL_UNIX_PORT
          value: /var/run/mysqld/mysql.sock
        - name: MYSQLSH_CREDENTIAL_STORE_SAVE_PASSWORDS
          value: never
{utils.indent(spec.extra_env, 8)}
        ports:
        - containerPort: {spec.mysql_port}
          name: mysql
        - containerPort: {spec.mysql_xport}
          name: mysqlx
        - containerPort: {spec.mysql_grport}
          name: gr-xcom
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/mysql
        - name: rundir
          mountPath: /var/run/mysqld
        - name: mycnfdata
          mountPath: /etc/my.cnf.d
          subPath: my.cnf.d
        - name: mycnfdata
          mountPath: /etc/my.cnf
          subPath: my.cnf
        - name: initconfdir
          mountPath: /livenessprobe.sh
          subPath: livenessprobe.sh
        - name: initconfdir
          mountPath: /readinessprobe.sh
          subPath: readinessprobe.sh
        - name: varlibmysqlfiles # The entrypoint of the container `touch`-es 2 files there
          mountPath: /var/lib/mysql-files
        - name: mysql-tmp
          mountPath: /tmp
{utils.indent(spec.extra_volume_mounts, 8)}
{meb_container}
      volumes:
      - name: mycnfdata
        emptyDir: {{}}
      - name: rundir
        emptyDir: {{}}
      - name: varlibmysqlfiles
        emptyDir: {{}}
      - name: initconfdir
        configMap:
          name: {spec.name}-initconf
          defaultMode: 0755
      - name: shellhome
        emptyDir: {{}}
      - name: initconf-tmp
        emptyDir: {{}}
      - name: initmysql-tmp
        emptyDir: {{}}
      - name: mysql-tmp
        emptyDir: {{}}
      - name: sidecar-tmp
        emptyDir: {{}}
{meb_volumes}
{restore_volumes}
      # If we declare it but don't use it as backing for a volumeMount, K8s won't check
      # whether the volume exists. K8s seems to be lazy in that regard. We don't need the
      # information from this secret directly, as the sidecar of pod 0 will fetch it using
      # the K8s API. However, we don't want to be lazy about checking that the secret exists,
      # and mounting it makes it easier for the administrator to find out when it is missing.
      # If we mount it in an init or normal container, the pod will get stuck in "Ready:0/2 Init:0/3" with
      # Warning  FailedMount  XXs (....)  kubelet  "MountVolume.SetUp failed for volume "rootcreds" : secret ".........." not found" to be seen in describe.
      - name: rootcreds
        secret:
          secretName: {spec.secretName}
          defaultMode: 0400
{utils.indent(spec.extra_volumes, 6)}
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 2Gi
"""

    statefulset = yaml.safe_load(tmpl.replace("\n\n", "\n"))

    metadata = {}
    if spec.podAnnotations:
        print("\t\tAdding podAnnotations")
        metadata['annotations'] = spec.podAnnotations
    if spec.podLabels:
        print("\t\tAdding podLabels")
        metadata['labels'] = spec.podLabels

    if len(metadata):
        utils.merge_patch_object(statefulset["spec"]["template"], {"metadata" : metadata })

    if spec.keyring:
        print("\t\tAdding keyring STS bit")
        spec.keyring.add_to_sts_spec(statefulset)

    for subsystem in spec.add_to_sts_cbs:
        print(f"\t\tadd_to_sts_cb: Checking subsystem {subsystem}")
        for add_to_sts_cb in spec.add_to_sts_cbs[subsystem]:
            print(f"\t\tAdding {subsystem} STS bits")
            add_to_sts_cb(statefulset, None, logger)

    if os.getenv("MYSQL_OPERATOR_GLOBAL_PODSPEC_CM"):
        ps_cm_ns = os.getenv("MYSQL_OPERATOR_GLOBAL_PODSPEC_NS", "mysql-operator")
        ps_cm_name = os.getenv("MYSQL_OPERATOR_GLOBAL_PODSPEC_CM")
        ps_cm_item = os.getenv("MYSQL_OPERATOR_GLOBAL_PODSPEC_ITEM", "podspec.yaml")
        ps_cm = api_core.read_namespaced_config_map(ps_cm_name, ps_cm_ns)
        ps_override = yaml.safe_load(ps_cm.data[ps_cm_item])
        print("\t\tAdding global podSpec")
        utils.merge_patch_object(statefulset["spec"]["template"]["spec"],
                                 ps_override, f"{ps_cm_ns}.{ps_cm_name}.{ps_cm_item}")

    if spec.podSpec:
        print("\t\tAdding podSpec")
        utils.merge_patch_object(statefulset["spec"]["template"]["spec"],
                                 spec.podSpec, "spec.podSpec")

    if spec.datadirVolumeClaimTemplate:
        print("\t\tAdding datadirVolumeClaimTemplate")
        utils.merge_patch_object(statefulset["spec"]["volumeClaimTemplates"][0]["spec"],
                                 spec.datadirVolumeClaimTemplate, "spec.volumeClaimTemplates[0].spec")
    return statefulset

def update_stateful_set_size(cluster: InnoDBCluster, rr_spec: ReadReplicaSpec, logger: Logger) -> None:
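    """Scale the read replica StatefulSet to the number of instances in the
    ReadReplicaSpec, if the StatefulSet exists."""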
    sts = cluster.get_read_replica_stateful_set(rr_spec.name)
    if sts:
        patch = {"spec": {"replicas": rr_spec.instances}}
        api_apps.patch_namespaced_stateful_set(
            sts.metadata.name, sts.metadata.namespace, body=patch)


def prepare_service_account_sidecar(spec: AbstractServerSetSpec) -> dict:
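    """Build the ServiceAccount manifest used by the server pods (sidecar),
    including the imagePullSecrets section if configured."""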
    account = f"""
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {spec.serviceAccountName}
  namespace: {spec.namespace}
{spec.image_pull_secrets}
"""

    account = yaml.safe_load(account)

    return account


def prepare_service_account_patch_for_image_pull_secrets(spec: AbstractServerSetSpec) -> Optional[Dict]:
    if not spec.imagePullSecrets:
        return None
    return {
        "imagePullSecrets" : spec.imagePullSecrets
    }


def prepare_service_account_switchover(spec: AbstractServerSetSpec) -> dict:
    account = f"""
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {spec.switchoverServiceAccountName}
  namespace: {spec.namespace}
{spec.image_pull_secrets}
"""

    account = yaml.safe_load(account)

    return account


def prepare_role_binding_sidecar(spec: AbstractServerSetSpec) -> dict:
    rolebinding = f"""
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: {spec.roleBindingName}
  namespace: {spec.namespace}
subjects:
  - kind: ServiceAccount
    name: {spec.serviceAccountName}
roleRef:
  kind: ClusterRole
  name: mysql-sidecar
  apiGroup: rbac.authorization.k8s.io
"""
    rolebinding = yaml.safe_load(rolebinding)

    return rolebinding


def prepare_role_binding_switchover(spec: AbstractServerSetSpec) -> dict:
    rolebinding = f"""
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: {spec.switchoverRoleBindingName}
  namespace: {spec.namespace}
subjects:
  - kind: ServiceAccount
    name: {spec.switchoverServiceAccountName}
roleRef:
  kind: ClusterRole
  name: mysql-switchover
  apiGroup: rbac.authorization.k8s.io
"""
    rolebinding = yaml.safe_load(rolebinding)

    return rolebinding


def prepare_additional_configmaps(spec: AbstractServerSetSpec, logger: Logger) -> List[Dict]:
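    """Collect ConfigMaps requested by subsystems via their get_configmaps
    callbacks, plus the MEB code ConfigMap when initDB or a backup profile
    uses MySQL Enterprise Backup."""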
    configmaps = []
    prefix = ''
    for subsystem in spec.get_configmaps_cbs:
        for cb in spec.get_configmaps_cbs[subsystem]:
            if cms := cb(prefix, logger):
                for (cm_name, cm) in cms:
                    if cm:
                        configmaps.append(cm)

    # TODO: This should use the CB mechanism like other CMs above
    if isinstance(spec, InnoDBClusterSpec):
        clusterspec = cast(InnoDBClusterSpec, spec)
        if  (clusterspec.initDB and clusterspec.initDB.meb) \
                or any(getattr(profile, 'meb', None) for profile in clusterspec.backupProfiles):

            if spec.edition != config.Edition.enterprise:
                raise kopf.TemporaryError("A BackupProfile requires MySQL Enterprise Backup, but the cluster doesn't use Enterprise Edition")

            configmaps.append(backup_objects.prepare_meb_code_configmap(clusterspec))

    return configmaps


def prepare_component_config_configmaps(spec: AbstractServerSetSpec, logger: Logger) -> List[Dict]:
    configmaps = [
        spec.keyring.get_component_config_configmap_manifest()
    ]

    return configmaps


def prepare_component_config_secrets(spec: AbstractServerSetSpec, logger: Logger) -> List[Dict]:
    secrets = []
    cm = spec.keyring.get_component_config_secret_manifest()
    if cm:
        secrets.append(cm)

    return secrets

def prepare_initconf(cluster: InnoDBCluster, spec: AbstractServerSetSpec, logger: Logger) -> dict:
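    """Build the {spec.name}-initconf ConfigMap holding the initdb SQL, the
    liveness/readiness probe scripts, the router entrypoint template and the
    my.cnf configuration snippets, extended by subsystem callbacks."""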

    with open(os.path.dirname(os.path.abspath(__file__))+'/router-entrypoint-run.sh.tpl', 'r') as entryfile:
        router_entrypoint = "".join(entryfile.readlines())

    liveness_probe = """#!/bin/bash
# Copyright (c) 2020, 2021, Oracle and/or its affiliates.

# Insert 1 success after every this many failures
# (assumes successThreshold is > 1)
max_failures_during_progress=$1

# Ping the server to see if it's up
mysqladmin -umysqlhealthchecker ping
# If it's up, we succeed
if [ $? -eq 0 ]; then
  exit 0
fi

if [ -z $max_failures_during_progress ]; then
  exit 1
fi

# If the init/startup/InnoDB recovery is still ongoing, we haven't
# succeeded or failed yet, so keep failing and getting time
# extensions until it succeeds.
# We currently rely on the server to exit/abort if the init/startup fails,
# but ideally there would be a way to check whether the server is
# still making progress and not just stuck waiting on a frozen networked
# volume, for example.

if [ -f /tmp/fail-counter ]; then
  fail_count=$(($(cat /tmp/fail-counter) + 1))
else
  fail_count=1
fi

if [ $fail_count -gt $max_failures_during_progress ]; then
  # Report success to reset the failure counter upstream and get
  # a time extension
  rm -f /tmp/fail-counter
  exit 0
else
  # Update the failure counter and fail out
  echo $fail_count > /tmp/fail-counter
  exit 1
fi
"""

    readiness_probe = """#!/bin/bash
# Copyright (c) 2020, 2022, Oracle and/or its affiliates.

# Once the container is ready, it's always ready.
if [ -f /tmp/mysql-ready ]; then
  exit 0
fi

# Ping server to see if it is ready
if mysqladmin -umysqlhealthchecker ping; then
  touch /tmp/mysql-ready
  exit 0
else
  exit 1
fi
"""

    has_crl = cluster.tls_has_crl()

    if not spec.tlsUseSelfSigned:
        ca_file_name = cluster.get_ca_and_tls().get("CA", "ca.pem")
    else:
        ca_file_name = ""

    tmpl = f"""
apiVersion: v1
kind: ConfigMap
metadata:
  name: {spec.name}-initconf
data:
  initdb-localroot.sql: |
    set sql_log_bin=0;
    # Create socket authenticated localroot@localhost account
    CREATE USER localroot@localhost IDENTIFIED WITH auth_socket AS 'mysql';
    GRANT ALL ON *.* TO localroot@localhost WITH GRANT OPTION;
    GRANT PROXY ON ''@'' TO localroot@localhost WITH GRANT OPTION;
    # Drop the default account created by the docker image
    DROP USER IF EXISTS healthchecker@localhost;
    # Create account for liveness probe
    CREATE USER mysqlhealthchecker@localhost IDENTIFIED WITH auth_socket AS 'mysql';
    set sql_log_bin=1;


  readinessprobe.sh: |
{utils.indent(readiness_probe, 4)}


  livenessprobe.sh: |
{utils.indent(liveness_probe, 4)}

  router-entrypoint-run.sh.tpl: |
{utils.indent(router_entrypoint, 4)}


  my.cnf.in: |
    # Server identity related options (not shared across instances).
    # Do not edit.
    [mysqld]
    server_id=@@SERVER_ID@@
    report_host=@@HOSTNAME@@
    datadir=/var/lib/mysql
    loose_mysqlx_socket=/var/run/mysqld/mysqlx.sock
    socket=/var/run/mysqld/mysql.sock
    local-infile=1

    [mysql]
    socket=/var/run/mysqld/mysql.sock

    [mysqladmin]
    socket=/var/run/mysqld/mysql.sock

    !includedir /etc/my.cnf.d


  00-basic.cnf: |
    # Basic configuration.
    # Do not edit.
    [mysqld]
    plugin_load_add=auth_socket.so
    loose_auth_socket=FORCE_PLUS_PERMANENT
    skip_log_error
    log_error_verbosity=3

  01-group_replication.cnf: |
    # GR and replication related options
    # Do not edit.
    [mysqld]
    log_bin={spec.name}
    enforce_gtid_consistency=ON
    gtid_mode=ON
    skip_replica_start=1

  02-ssl.cnf: |
    # SSL configurations
    # Do not edit.
    [mysqld]
    {"# " if spec.tlsUseSelfSigned else ""}ssl-ca=/etc/mysql-ssl/{ca_file_name}
    {"# " if not has_crl else ""}ssl-crl=/etc/mysql-ssl/crl.pem
    {"# " if spec.tlsUseSelfSigned else ""}ssl-cert=/etc/mysql-ssl/tls.crt
    {"# " if spec.tlsUseSelfSigned else ""}ssl-key=/etc/mysql-ssl/tls.key

    loose_group_replication_recovery_use_ssl=1
    {"# " if spec.tlsUseSelfSigned else ""}loose_group_replication_recovery_ssl_verify_server_cert=1

    {"# " if spec.tlsUseSelfSigned else ""}loose_group_replication_recovery_ssl_ca=/etc/mysql-ssl/{ca_file_name}
    #{"# " if not has_crl else ""}loose_group_replication_recovery_ssl_crl=/etc/mysql-ssl/crl.pem
    {"# " if spec.tlsUseSelfSigned else ""}loose_group_replication_recovery_ssl_cert=/etc/mysql-ssl/tls.crt
    {"# " if spec.tlsUseSelfSigned else ""}loose_group_replication_recovery_ssl_key=/etc/mysql-ssl/tls.key

  99-extra.cnf: |
    # Additional user configurations taken from spec.mycnf in InnoDBCluster.
    # Do not edit directly.
{utils.indent(spec.mycnf, 4) if spec.mycnf else ""}
"""

    cm = yaml.safe_load(tmpl)

    prefix = 5
    for subsystem in spec.add_to_initconf_cbs:
        for add_to_initconf_cb in spec.add_to_initconf_cbs[subsystem]:
          add_to_initconf_cb(cm, f"{prefix:02d}-", logger)
          prefix = prefix + 1

    return cm


def prepare_service_failover_job(batchjob_name: str, force: str, options: dict, spec: InnoDBClusterSpec, logger: Logger) -> dict:
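    """Build a batch Job manifest that runs the operator's csfo command to
    perform a failover for the given cluster, optionally passing --force,
    --timeout and --invalidate-replica-clusters."""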
    cluster_domain = k8s_cluster_domain(logger)

    cluster_name = spec.cluster_name
    namespace = spec.namespace
    extras =", --force" if force else ""

    if options:
        extras += f", --timeout={options['timeout']}" if "timeout" in options else ""
        # TODO shell escape! Probably it's better to have the pod read this itself
        extras += f", --invalidate-replica-clusters={','.join(options['invalidateReplicaClusters'])}" if "invalidateReplicaClusters" in options else ""

    job = f"""
apiVersion: batch/v1
kind: Job
metadata:
  name: {batchjob_name}
  namespace: {namespace}
spec:
  template:
    spec:
      serviceAccountName: mysql-switchover-sa
      securityContext:
        runAsUser: 27
        runAsGroup: 27
        fsGroup: 27
      restartPolicy: Never
      containers:
      - name: failover
        image: {spec.operator_image}
        imagePullPolicy: {spec.mysql_image_pull_policy}
        command: ["mysqlsh",  "--pym", "mysqloperator", "csfo", --cluster-name, {cluster_name}, "--namespace", {namespace} {extras}]
        securityContext:
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: true
          capabilities:
            drop:
            - ALL
        env:
        - name: MYSQLSH_USER_CONFIG_HOME
          value: /tmp
        - name: MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN
          value: {cluster_domain}
        volumeMounts:
        - name: tmp
          mountPath: /tmp
      volumes:
      - name: tmp
        emptyDir: {{}}
"""

    return  yaml.safe_load(job)



def prepare_metrics_service_monitors(spec: AbstractServerSetSpec, logger: Logger) -> List[Dict]:
    monitors = []
    prefix = ''
    for subsystem in spec.get_svc_monitor_cbs:
        for cb in spec.get_svc_monitor_cbs[subsystem]:
            (monitor_name, monitor) = cb(logger)
            if monitor:
                monitors.append(monitor)

    return monitors


def update_stateful_set_spec(sts : api_client.V1StatefulSet, patch: dict) -> None:
    api_apps.patch_namespaced_stateful_set(
        sts.metadata.name, sts.metadata.namespace, body=patch)


def update_service(svc: api_client.V1Service, spec: InnoDBClusterSpec, logger: Logger) -> None:
    body = prepare_cluster_service(spec, logger)
    print(body)
    api_core.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, body=body)


def update_mysql_image(sts: api_client.V1StatefulSet, cluster: InnoDBCluster,
                       spec: AbstractServerSetSpec,
                       patcher,
                       logger: Logger) -> None:
    """Update MySQL Server image

    This will also update the sidecar container to the current operator version,
    so that a single rolling upgrade covers both and we don't require a restart
    for upgrading sidecar.
    """
    logger.info("update_mysql_image")
    # Operators <= 8.0.32-2.0.8 don't set this environment variable, we have to make sure it is there
    cluster_domain_env = [{
        "name": "MYSQL_OPERATOR_K8S_CLUSTER_DOMAIN",
        "value": k8s_cluster_domain(logger)
    }]

    patch = {"spec": {"template":
                      {"spec": {
                          "containers": [
                               {"name": "sidecar",
                                "image": spec.operator_image,
                                "env": cluster_domain_env
                               },
                               {"name": "mysql",
                                "image": spec.mysql_image,
                                "env": cluster_domain_env
                               },
                          ],
                          "initContainers": [
                              {"name": "fixdatadir",
                               "image": spec.operator_image,
                               "env": cluster_domain_env
                                },
                              {"name": "initconf",
                               "image": spec.operator_image,
                               "env": cluster_domain_env
                              },
                              {"name": "initmysql",
                               "image": spec.mysql_image,
                               "env": cluster_domain_env
                              },
                          ]}
                       }}}

    # TODO [compat8.3.0] remove this when compatibility pre 8.3.0 isn't needed anymore
    keyring_update = spec.keyring.upgrade_to_component(sts, spec, logger)

    if keyring_update:
        logger.info("Need to upgrade keyring from plugin to component")
        (cm, key_sts_patch) = keyring_update
        utils.merge_patch_object(patch["spec"]["template"], key_sts_patch)

        kopf.adopt(cm)
        patcher.create_configmap(spec.namespace, cm["metadata"]["name"], cm, on_apiexception_generic_handler)
        #api_core.create_namespaced_config_map(spec.namespace, cm)

        initconf_patch = [{"op": "remove", "path": "/data/03-keyring-oci.cnf"}]
        #try:
        #    api_core.patch_namespaced_config_map(f"{spec.cluster_name}-initconf",
        #                                            spec.namespace, initconf_patch)
        #except ApiException as exc:
        #    # This might happen during a retry or some other case where it was
        #    # removed already
        #    logger.info(f"Failed to remove keyring config from initconf, ignoring: {exc}")
        patcher.patch_configmap(spec.namespace, f"{spec.cluster_name}-initconf", initconf_patch, on_apiexception_404_handler)

    cm = prepare_initconf(cluster, spec, logger)
    patcher.patch_configmap(spec.namespace, cm['metadata']['name'], cm, on_apiexception_generic_handler)
    #api_core.patch_namespaced_config_map(
    #    cm['metadata']['name'], sts.metadata.namespace, body=cm)

    patcher.patch_sts(patch)
#    update_stateful_set_spec(sts, patch)


def update_operator_image(sts: api_client.V1StatefulSet, spec: InnoDBClusterSpec) -> None:
    patch = {"spec": {"template":
                      {"spec": {
                          "containers": [
                               {"name": "sidecar", "image": spec.operator_image}
                          ],
                          "initContainers": [
                              {"name": "fixdatadir", "image": spec.operator_image},
                              {"name": "initconf", "image": spec.operator_image}
                          ]}
                       }}}
    update_stateful_set_spec(sts, patch)


def update_pull_policy(sts: api_client.V1StatefulSet, spec: InnoDBClusterSpec, logger: Logger) -> dict:
    patch = {"spec": {"template":
                      {"spec": {
                          "initContainers": [
                              {"name": "initconf", "imagePullPolicy": spec.sidecar_image_pull_policy},
                              {"name": "initmysql", "imagePullPolicy": spec.mysql_image_pull_policy}
                          ],
                          "containers": [
                               {"name": "sidecar", "imagePullPolicy": spec.sidecar_image_pull_policy},
                               {"name": "mysql", "imagePullPolicy": spec.mysql_image_pull_policy}
                          ]}
                       }}}
    return patch

def update_template_property(sts: api_client.V1StatefulSet, property_name: str, property_value: str, logger: Logger) -> None:
    patch = {"spec": {"template": {"spec": { property_name: property_value }}}}
    update_stateful_set_spec(sts, patch)


def update_objects_for_subsystem(subsystem: InnoDBClusterSpecProperties,
                                 cluster: InnoDBCluster,
                                 patcher: 'InnoDBClusterObjectModifier',
                                 logger: Logger) -> None:
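    """Refresh the Kubernetes objects owned by a single subsystem (e.g. logs
    or metrics): ConfigMaps, StatefulSet bits (with a rolling restart),
    Service bits and ServiceMonitors."""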
    logger.info(f"update_objects_for_subsystem: {subsystem}")

    sts = cluster.get_stateful_set()
    svc = cluster.get_service()

    spec = cluster.parsed_spec

    if subsystem in spec.get_configmaps_cbs:
        print(f"\t\tWalking over get_configmaps_cbs len={len(spec.get_configmaps_cbs[subsystem])}")
        #TODO: This won't delete old CMs but only replace old ones, if they are still in use, with new content
        #      or create new ones. The solution is to use tuple returning like get_svc_monitor_cbs, where
        #      the cm name will be returned as first tuple element and second will be just None. This will
        #      signal that this CM should be removed, as not in use anymore.
        for get_configmap_cb in spec.get_configmaps_cbs[subsystem]:
            prefix = ''
            new_configmaps = get_configmap_cb(prefix, logger)
            if not new_configmaps:
                continue
            for (cm_name, new_cm) in new_configmaps:
                current_cm = cluster.get_configmap(cm_name)
                if current_cm:
                    if not new_cm:
                        print(f"\t\t\tDeleting CM {cluster.namespace}/{cm_name}")
                        #patcher.delete_configmap(cluster.namespace, cm_name, on_apiexception_404_handler)
                        cluster.delete_configmap(cm_name)
                        continue

                    data_differs = current_cm.data != new_cm["data"]
                    if data_differs:
                        print(f"\t\t\tReplacing CM {cluster.namespace}/{cm_name}")
                        current_cm.data = new_cm["data"]
                        #patcher.replace_configmap(cluster.namespace, cm_name, current_cm, on_apiexception_404_handler)
                        api_core.replace_namespaced_config_map(cm_name, cluster.namespace, body=current_cm)
                else:
                    print(f"\t\t\tNo such cm exists. Creating {cluster.namespace}/{new_cm}")
                    kopf.adopt(new_cm)
                    #patcher.create_configmap(cluster.namespace, new_cm['metadata']['name'], new_cm, on_apiexception_generic_handler)
                    api_core.create_namespaced_config_map(cluster.namespace, new_cm)

    if subsystem in spec.add_to_sts_cbs:
        print(f"\t\tCurrent container count: {len(sts.spec.template.spec.containers)}")
        print(f"\t\tWalking over add_to_sts_cbs len={len(spec.add_to_sts_cbs[subsystem])}")
        changed = False
        sts.spec = spec_to_dict(sts.spec)
        for add_to_sts_cb in spec.add_to_sts_cbs[subsystem]:
            changed = True
            print("\t\t\tPatching STS")
            add_to_sts_cb(sts, patcher, logger)
        if changed:
            new_container_names = [c["name"] for c in patcher.get_sts_path('/spec/template/spec/containers') if c["name"] not in ["mysql", "sidecar"]]
            print(f"\t\t\tNew containers: {new_container_names}")
            new_volumes_names = [c["name"] for c in patcher.get_sts_path('/spec/template/spec/volumes')]
            print(f"\t\t\tNew volumes: {new_volumes_names}")
            new_volume_mounts = [(c["name"], c["volumeMounts"]) for c in patcher.get_sts_path('/spec/template/spec/containers') if c["name"] not in ["mysql", "sidecar"]]
            print(f"\t\t\tNew volume mounts: {new_volume_mounts}")

            # There might be configmap changes which, when mounted, will change the server, so we roll over.
            # For a fine grained approach the get_configmap should return whether there are such changes that require
            # a restart. With a restart, for example, the Cluster1LFSGeneralLogEnableDisableEnable test will hang
            restart_patch = {"spec":{"template":{"metadata":{"annotations":{"kubectl.kubernetes.io/restartedAt":utils.isotime()}}}}}
            patcher.patch_sts(restart_patch)
            #patcher.submit_patches(restart_sts=True)

        print(f"\t\t\tSTS {'patched' if changed else 'unchanged. No rollover upgrade!'}")

    if subsystem in spec.get_add_to_svc_cbs:
        print(f"\t\tWalking over get_add_to_svc_cbs len={len(spec.get_add_to_svc_cbs[subsystem])}")
        changed = False
        for add_to_svc_cb in spec.get_add_to_svc_cbs[subsystem]:
            changed = True
            print("\t\t\tPatching SVC")
            add_to_svc_cb(svc, logger)
        if changed:
            api_core.replace_namespaced_service(svc.metadata.name, svc.metadata.namespace, svc)

        print(f"\t\t\tSVC {'patched' if changed else 'unchanged'}")

    if subsystem in spec.get_svc_monitor_cbs:
        for monitored_subsystem in spec.get_svc_monitor_cbs:
            for cb in spec.get_svc_monitor_cbs[monitored_subsystem]:
                (monitor_name, monitor) = cb(logger)
                # monitor could be empty, which means: delete the old monitor with monitor_name
                print(f"\t\t\tChecking for old ServiceMonitor {monitor_name}")
                if cluster.get_service_monitor(monitor_name):
                    print(f"\t\t\tRemoving old ServiceMonitor {monitor_name}")
                    try:
                        api_customobj.delete_namespaced_custom_object("monitoring.coreos.com", "v1", cluster.namespace,
                                                                      "servicemonitors", monitor_name)
                    except Exception as exc:
                        print(f"\t\t\tPrevious ServiceMonitor {monitor_name} was not removed. Reason: {exc}")
                if monitor:
                    kopf.adopt(monitor)
                    print(f"\t\t\tCreating ServiceMonitor {monitor} ...")
                    try:
                        api_customobj.create_namespaced_custom_object("monitoring.coreos.com", "v1", cluster.namespace,
                                                                      "servicemonitors", monitor)
                    except Exception as exc:
                        # This might be caused by the Prometheus Operator missing;
                        # we won't fail for that
                        print(f"\t\t\tServiceMonitor {monitor_name} NOT created!")
                        print(exc)
                        cluster.warn(action="CreateCluster", reason="CreateResourceFailed", message=f"{exc}")
                else:
                    print(f"\t\t\tNew ServiceMonitor {monitor_name} will not be created. Monitoring disabled.")


def update_objects_for_logs(cluster: InnoDBCluster, patcher: 'InnoDBClusterObjectModifier', logger: Logger) -> None:
    subsystem = InnoDBClusterSpecProperties.LOGS.value
    update_objects_for_subsystem(subsystem, cluster, patcher, logger)

def update_objects_for_metrics(cluster: InnoDBCluster, patcher: 'InnoDBClusterObjectModifier', logger: Logger) -> None:
    subsystem = InnoDBClusterSpecProperties.METRICS.value
    update_objects_for_subsystem(subsystem, cluster, patcher, logger)


def remove_read_replica(cluster: InnoDBCluster, rr: ReadReplicaSpec):
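    """Delete the ConfigMap, headless Service and StatefulSet belonging to a
    read replica, ignoring objects that are already gone."""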
    name = rr['name']
    try:
        api_core.delete_namespaced_config_map(f"{cluster.name}-{name}-initconf", cluster.namespace)
    except Exception as exc:
        print(f"ConfigMap for ReadReplica {name} was not removed. This is usually ok. Reason: {exc}")

    try:
        api_core.delete_namespaced_service(rr.headless_service_name, cluster.namespace)
    except Exception as exc:
        print(f"Service for ReadReplica {name} was not removed. This is usually ok. Reason: {exc}")

    try:
        api_apps.delete_namespaced_stateful_set(f"{cluster.name}-{name}", cluster.namespace)
    except Exception as exc:
        print(f"StatefulSet for ReadReplica  {name} was not removed. This is usually ok. Reason: {exc}")


def on_first_cluster_pod_created(cluster: InnoDBCluster, logger: Logger) -> None:
    # Add finalizer to the cluster object to prevent it from being deleted
    # until the last pod is properly deleted.
    cluster.add_cluster_finalizer()


def on_last_cluster_pod_removed(cluster: InnoDBCluster, logger: Logger) -> None:
    # Remove cluster finalizer because the last pod was deleted, this lets
    # the cluster object to be deleted too
    logger.info(
        f"Last pod for cluster {cluster.name} was deleted, removing cluster finalizer...")
    cluster.remove_cluster_finalizer()


from enum import Enum
from typing import Callable, cast
from .. import kubeutils

class PatchTarget(Enum):
    STS = "STS"
    DEPLOY = "DEPLOYMENT"
    CM = "CONFIGMAP"

class ApiCommandType(Enum):
    PATCH_STS = "PATCH_STS"
    PATCH_DEPLOY = "PATCH_DEPLOY"
    CREATE_CM = "CREATE_CM"
    DELETE_CM = "DELETE_CM"
    REPLACE_CM = "REPLACE_CM"
    PATCH_CM = "PATCH_CM"

OnApiExceptionHandler = Callable[[ApiException, Logger], None]

def on_apiexception_404_handler(exc: ApiException, logger: Logger):
    if exc.status == 404:
        logger.warning("Object not found! Exception: {exc}")
        return
    raise exc

def on_apiexception_generic_handler(exc: ApiException, logger: Logger):
    logger.warning("ApiException: {exc}")


class ApiCommand:
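    """A deferred Kubernetes API call (currently ConfigMap create, delete,
    replace or patch) that is executed later via run(), with an optional
    ApiException handler."""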
    def __init__(self,
                 type: ApiCommandType,
                 namespace: str,
                 name: str,
                 body: Optional[dict] = None,
                 on_api_exception: Optional[OnApiExceptionHandler] = None):
        self.type = type
        self.namespace = namespace
        self.name = name
        self.body = body
        self.on_api_exception = on_api_exception

    def run(self, logger: Logger) -> Optional[api_client.V1Status]:
        status = None
        try:
            if self.type == ApiCommandType.CREATE_CM:
                status = cast(api_client.V1Status,
                              api_core.create_namespaced_config_map(self.namespace, self.body))
            elif self.type == ApiCommandType.DELETE_CM:
                delete_body = api_client.V1DeleteOptions(grace_period_seconds=0)
                status = cast(api_client.V1Status,
                              api_core.delete_namespaced_config_map(self.name, self.namespace, body=delete_body))
                return status
            elif self.type == ApiCommandType.REPLACE_CM:
                status = cast(api_client.V1Status,
                              api_core.replace_namespaced_config_map(self.name, self.namespace, body=self.body))
            elif self.type == ApiCommandType.PATCH_CM:
                status = cast(api_client.V1Status,
                              api_core.patch_namespaced_config_map(self.name, self.namespace, self.body))
        except kubeutils.ApiException as exc:
            if self.on_api_exception is not None:
                self.on_api_exception(exc, logger)
            else:
                raise

        return status


def snail_to_camel(s: str) -> str:
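    """Convert a snake_case key as produced by the k8s Python client into the
    camelCase form used in manifests, e.g. "pre_stop" -> "preStop" and the
    special case "_exec" -> "exec"."""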
    if s.find("_") == -1:
        return s
    # Special case for '_exec':
    # for some reason, when preStop with `exec` is dict-ified we get `pre_stop` with `_exec`, e.g.
    #  'lifecycle': {
    #    'post_start': None,
    #    'pre_stop': {'_exec': {'command': ['sh', '-c', 'sleep 60 && mysqladmin -ulocalroot shutdown']}},
    # If we don't strip the leading underscore, '_exec' becomes `Exec` instead of `exec`
    if len(s) and s[0] == "_":
        s = s[1:]

    words = s.split("_")
    ret = words[0] + "".join(word.title() for word in words[1:])
    return ret

def item_snail_to_camel(item):
    if isinstance(item, dict):
        # The k8s API returns some fields as None, which leads to errors like
        # spec.containers[1].readinessProbe: Required value: must specify a handler type
        # spec.containers[1].startupProbe: Required value: must specify a handler type
        # So we strip the None values here. Might break somewhere where None is legit, but for now it works!
        return {snail_to_camel(key):item_snail_to_camel(value) for key, value in item.items() if value is not None}
    if isinstance(item, list):
        return [item_snail_to_camel(value) for value in item]
    return item

def spec_to_dict(spec) -> dict:
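    """Convert a k8s client model object (e.g. a V1PodSpec) into a plain dict
    with camelCase keys and None values stripped, suitable for merging with
    manifest-style patches."""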
    return item_snail_to_camel(spec.to_dict())

def strategic_merge(original, patch):
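    """Recursively merge patch into original (modifying original in place):
    dicts are merged key by key, lists are concatenated and any other value
    from patch replaces the original value, e.g.
    strategic_merge({"a": [1], "b": 1}, {"a": [2], "c": 3})
    returns {"a": [1, 2], "b": 1, "c": 3}."""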
    if isinstance(original, dict) and isinstance(patch, dict):
        return merge_dicts(original, patch)
    elif isinstance(original, list) and isinstance(patch, list):
        return original + patch
    return patch

def merge_dicts(original, patch):
    for key, value in patch.items():
        if key in original:
            original[key] = strategic_merge(original[key], value)
        else:
            original[key] = value
    return original
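
# Sketch of the merge semantics (hypothetical values): nested dicts are merged recursively,
# lists are concatenated, scalars are overwritten by the patch. Note that merge_dicts()
# mutates `original` in place and returns it.
#   merge_dicts({"a": {"x": 1}, "l": [1], "s": "old"},
#               {"a": {"y": 2}, "l": [2], "s": "new"})
#   -> {"a": {"x": 1, "y": 2}, "l": [1, 2], "s": "new"}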


class InnoDBClusterObjectModifier:
    def __init__(self, cluster: InnoDBCluster, logger: Logger):
        self.server_sts_patch = {}
        self.sts_changed = False
        self.sts_template_changed = False
        self.deploy_changed = False
        self.router_deploy_patch = {}
        self.cluster = cluster
        self.logger = logger
        self.commands: list[ApiCommand] = []
        self.sts = self.cluster.get_stateful_set()
        self.sts.spec = spec_to_dict(self.sts.spec)
        self.sts_spec_changed = False

    def _apply_server_sts_patch_to_sts_spec_if_needed(self):
        if len(self.server_sts_patch):
            # update with accumulated patches before overwriting
            self.logger.info(f"Applying accumulated patches {self.server_sts_patch['spec']} to sts.spec")
            utils.merge_patch_object(self.sts.spec, self.server_sts_patch["spec"], none_deletes=True)
            self.sts_spec_changed = True
            self.server_sts_patch = {}

    def _get_or_patch_sts_path(self, path: str, patch: Optional[dict] = None):
        self.logger.info(f"get_sts_path: patch_path={path}\n")
        # patches could be cached in self.server_sts_patch, so apply them, if any, before returning parts of self.sts.spec
        self._apply_server_sts_patch_to_sts_spec_if_needed()
        base = self.sts.spec
        # the first element is the empty string before the leading slash, the second is 'spec', so both are skipped
        path_elements = path.split("/")[2:]
        if len(path) > 1:
            for path_element in path_elements[0:-1]:
                #self.logger.info(f"{path_element} in base = {path_element in base}\n")
                assert path_element in base
                base = base[path_element]
            if patch is not None:
                base[path_elements[-1]] = patch
                self.logger.info(f"get_sts_path: after patching self.sts.spec={self.sts.spec}")
        return base[path_elements[-1]]

    def get_sts_path(self, path: str):
        return self._get_or_patch_sts_path(path, None)
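
    # For example (assuming the usual StatefulSet layout after spec_to_dict()):
    #   modifier.get_sts_path("/spec/template/spec/containers")
    # returns the camelCase-keyed container list of the server StatefulSet.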

    def patch_sts(self, patch: dict) -> None:
        self.sts_changed = True
        if "template" in patch:
            self.sts_template_changed = True
        self.logger.info(f"Accumulating patch={patch}\n")
        # Cache the patches without merging them into self.sts.spec.
        # If patch_sts_overwrite() is never called, the existing STS object is "patched"
        # rather than "replaced". If an overwrite does happen, the cached patches must be
        # applied to self.sts.spec before overwriting.
        utils.merge_patch_object(self.server_sts_patch, patch, none_deletes=True)

    def patch_sts_overwrite(self, patch: dict, patch_path: str) -> None:
        self.sts_changed = True
        if "template" in patch:
            self.sts_template_changed = True

        self.sts_spec_changed = True
        self._get_or_patch_sts_path(patch_path, patch)


    def patch_deploy(self, patch: dict) -> None:
        self.deploy_changed = True
        self.logger.info(f"patch={patch}")
        utils.merge_patch_object(self.router_deploy_patch, patch, none_deletes=True)

    def create_configmap(self, namespace: str, name: str, body: dict, on_api_exception: Optional[OnApiExceptionHandler]) -> None:
        self.commands.append(ApiCommand(ApiCommandType.CREATE_CM, namespace, name, body, on_api_exception))

    def delete_configmap(self, namespace: str, name: str, on_api_exception: Optional[OnApiExceptionHandler]) -> None:
        self.commands.append(ApiCommand(ApiCommandType.DELETE_CM, namespace, name, None, on_api_exception))

    def replace_configmap(self, namespace: str, name: str, body: dict, on_api_exception: Optional[OnApiExceptionHandler]) -> None:
        self.commands.append(ApiCommand(ApiCommandType.REPLACE_CM, namespace, name, body, on_api_exception))

    def patch_configmap(self, namespace: str, name: str, patch: dict, on_api_exception: Optional[OnApiExceptionHandler]) -> None:
        self.commands.append(ApiCommand(ApiCommandType.PATCH_CM, namespace, name, patch, on_api_exception))
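
    # Illustrative queueing (hypothetical namespace, names and body); the queued
    # commands are executed when submit_patches() is called:
    #   modifier.create_configmap("mysql-ns", "mycluster-extra-cm", cm_body, on_apiexception_generic_handler)
    #   modifier.delete_configmap("mysql-ns", "obsolete-cm", on_apiexception_404_handler)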

    def submit_patches(self) -> None:
        self.logger.info(f"InnoDBClusterObjectModifier::submit_patches sts_changed={self.sts_changed} sts_spec_changed={self.sts_spec_changed} len(router_deploy_patch)={len(self.router_deploy_patch)} len(commands)={len(self.commands)}")
        if self.sts_changed or len(self.router_deploy_patch) or len(self.commands):
            if len(self.commands):
                for command in self.commands:
                    command.run(self.logger)
            if self.sts_changed:
                if self.sts_spec_changed:
                    # Apply the accumulated server_sts_patch over self.sts.spec and empty self.server_sts_patch,
                    # because in the next step the STS will be `replace`d rather than `patch`ed.
                    # Only when self.sts.spec is untouched should server_sts_patch be sent as a patch;
                    # otherwise the changes made directly to self.sts.spec would be lost.
                    self._apply_server_sts_patch_to_sts_spec_if_needed()

                if len(self.server_sts_patch):
                    self.logger.info(f"Patching STS.spec with {self.server_sts_patch}")
                    if self.sts_template_changed:
                        restart_patch = {"spec":{"template":{"metadata":{"annotations":{"kubectl.kubernetes.io/restartedAt":utils.isotime()}}}}}
                        utils.merge_patch_object(self.server_sts_patch, restart_patch)
                    api_apps.patch_namespaced_stateful_set(self.sts.metadata.name, self.sts.metadata.namespace, body=self.server_sts_patch)
                    self.server_sts_patch = {}
                else:
                    self.logger.info(f"Replacing STS.spec with {self.sts.spec}")
                    # Add the restart annotation only if the template has changed. The change may be just a
                    # scale up/down (spec['replicas'] changed); in that case setting an annotation on the
                    # template would roll the whole STS, which is not wanted - when the replica count is
                    # increased only new pods should be created, and likewise only pods should be removed
                    # when it is decreased. Scaling up with the annotation set would cause a rolling update
                    # followed by the scale-up, disturbing the cluster (and failing tests).
                    if self.sts_template_changed:
                        if "annotations" not in self.sts.spec["template"]["metadata"] or self.sts.spec["template"]["metadata"]["annotations"] is None:
                            self.sts.spec["template"]["metadata"]["annotations"] = {}
                        self.sts.spec["template"]["metadata"]["annotations"]["kubectl.kubernetes.io/restartedAt"] = utils.isotime()
                    api_apps.replace_namespaced_stateful_set(self.sts.metadata.name, self.sts.metadata.namespace, body=self.sts)
            if len(self.router_deploy_patch) and (deploy := self.cluster.get_router_deployment()):
                self.logger.info(f"Patching Deployment with {self.router_deploy_patch}")
                router_objects.update_deployment_spec(deploy, self.router_deploy_patch)
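
    # Illustrative end-to-end flow (hypothetical patch values; a sketch, not executed here):
    #   modifier = InnoDBClusterObjectModifier(cluster, logger)
    #   modifier.patch_sts({"spec": {"replicas": 5}})  # accumulated, later sent as a patch
    #   modifier.patch_sts_overwrite(new_container_list, "/spec/template/spec/containers")  # forces a replace
    #   modifier.submit_patches()  # queued commands first, then the STS patch/replace, then the router Deployment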
