# mysqloperator/controller/innodbcluster/operator_cluster.py
# Copyright (c) 2020, 2024, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
from typing import Any, List, Optional, Callable
from kopf._cogs.structs.bodies import Body
from kubernetes.client.rest import ApiException
from mysqloperator.controller.api_utils import ApiSpecError
from .. import consts, kubeutils, config, utils, errors, diagnose
from .. import shellutils
from ..group_monitor import g_group_monitor
from ..utils import g_ephemeral_pod_state
from ..kubeutils import api_core, api_apps, api_policy, api_rbac, api_customobj, api_batch, k8s_version
from ..backup import backup_objects
from ..config import DEFAULT_OPERATOR_VERSION_TAG
from .cluster_controller import ClusterController, ClusterMutex
from . import cluster_objects, router_objects, cluster_api
from .cluster_api import InnoDBCluster, InnoDBClusterSpec, MySQLPod, get_all_clusters
import kopf
import mysqlsh
import json
from logging import Logger
import time
import traceback
# TODO check whether we should store versions in status to make upgrade easier
def on_group_view_change(cluster: InnoDBCluster, members: list[tuple], view_id_changed: bool) -> None:
"""
Triggered from the GroupMonitor whenever the membership view changes.
This handler should react to changes that wouldn't be noticed by regular
pod and cluster events.
It also updates cluster status in the pods and cluster objects.
"""
c = ClusterController(cluster)
c.on_group_view_change(members, view_id_changed)
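# Registered as the view-change callback via g_group_monitor.monitor_cluster(), see
# monitor_existing_clusters() below and on_pod_create().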
def monitor_existing_clusters(clusters: List[InnoDBCluster], logger: Logger) -> None:
for cluster in clusters:
if cluster.get_create_time():
g_group_monitor.monitor_cluster(
cluster, on_group_view_change, logger)
def ensure_backup_schedules_use_current_image(clusters: List[InnoDBCluster], logger: Logger) -> None:
for cluster in clusters:
try:
backup_objects.ensure_schedules_use_current_image(cluster.parsed_spec, logger)
except Exception as exc:
# In case of any error we report but continue
logger.warn(f"Error while ensuring {cluster.namespace}/{cluster.name} uses current operator version for scheduled backups: {exc}")
def ensure_router_accounts_are_uptodate(clusters: List[InnoDBCluster], logger: Logger) -> None:
for cluster in clusters:
router_objects.update_router_account(cluster,
lambda: logger.warning(f"Cluster {cluster.namespace}/{cluster.name} unreachable"),
logger)
def ignore_404(f) -> Any:
try:
return f()
except ApiException as e:
if e.status == 404:
return None
raise
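# Wraps a single lookup so that a 404 (resource not found) reads as None, e.g.
#   sts = ignore_404(cluster.get_stateful_set)
#   configs = ignore_404(lambda: cluster.get_initconf(icspec))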
def do_create_read_replica(cluster: InnoDBCluster, rr: cluster_objects.ReadReplicaSpec,
set_replicas_to_zero: bool,
indention: str, logger: Logger) -> None:
namespace = cluster.namespace
print(f"{indention}Components ConfigMaps and Secrets")
for cm in cluster_objects.prepare_component_config_configmaps(rr, logger):
if not cluster.get_configmap(cm['metadata']['name']):
print(f"{indention}\tCreating CM {cm['metadata']['name']} ...")
kopf.adopt(cm)
api_core.create_namespaced_config_map(namespace, cm)
for secret in cluster_objects.prepare_component_config_secrets(rr, logger):
if not cluster.get_secret(secret['metadata']['name']):
print(f"{indention}\tCreating Secret {secret['metadata']['name']} ...")
kopf.adopt(secret)
api_core.create_namespaced_secret(namespace, secret)
print(f"{indention}Initconf")
if not ignore_404(lambda: cluster.get_initconf(rr)):
print(f"{indention}\tPreparing... {rr.name}")
configs = cluster_objects.prepare_initconf(cluster, rr, logger)
print(f"{indention}\tCreating...")
kopf.adopt(configs)
api_core.create_namespaced_config_map(namespace, configs)
print(f"{indention}RR ServiceAccount")
existing_sa = ignore_404(lambda: cluster.get_service_account_sidecar(rr))
print(f"{indention}\tExisting SA: {existing_sa}")
print(f"{indention}\tImagePullSecrets: {rr.imagePullSecrets}")
if not existing_sa:
print(f"{indention}\tPreparing...")
sa = cluster_objects.prepare_service_account_sidecar(rr)
print(f"{indention}\tCreating...{sa}")
kopf.adopt(sa)
api_core.create_namespaced_service_account(namespace=namespace, body=sa)
elif rr.imagePullSecrets:
patch = cluster_objects.prepare_service_account_patch_for_image_pull_secrets(rr)
print(f"{indention}\tPatching existing SA with {patch}")
api_core.patch_namespaced_service_account(name=existing_sa.metadata.name, namespace=namespace, body=patch)
print(f"{indention}RR RoleBinding Sidecar")
if not ignore_404(lambda: cluster.get_role_binding_sidecar(rr)):
print(f"{indention}\tPreparing...")
rb = cluster_objects.prepare_role_binding_sidecar(rr)
print(f"{indention}\tCreating RoleBinding {rb['metadata']['name']} {rb}...")
kopf.adopt(rb)
api_rbac.create_namespaced_role_binding(namespace=namespace, body=rb)
print(f"{indention}RR Service")
if not ignore_404(lambda: cluster.get_read_replica_service(rr)):
print(f"{indention}\tPreparing... {rr.name} Service")
service = cluster_objects.prepare_cluster_service(rr, logger)
print(f"{indention}\tCreating...{service}")
kopf.adopt(service)
api_core.create_namespaced_service(namespace=namespace, body=service)
print(f"{indention}RR STS")
if not ignore_404(lambda: cluster.get_read_replica_stateful_set(rr.name)):
print(f"{indention}\tPreparing {rr.name} StatefulSet")
statefulset = cluster_objects.prepare_cluster_stateful_set(cluster, rr, logger)
if set_replicas_to_zero:
            # This is the initial startup, where scaling the read replica is delayed
            # until the cluster is ready
statefulset['spec']['replicas'] = 0
print(f"{indention}\tCreating...{statefulset}")
kopf.adopt(statefulset)
api_apps.create_namespaced_stateful_set(namespace=namespace, body=statefulset)
def do_reconcile_read_replica(cluster: InnoDBCluster,
rr: cluster_objects.ReadReplicaSpec,
logger: Logger) -> None:
statefulset = cluster_objects.prepare_cluster_stateful_set(cluster, rr, logger)
kopf.adopt(statefulset)
api_apps.patch_namespaced_stateful_set(namespace=cluster.namespace,
name=rr.name,
body=statefulset)
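# Note: do_create_read_replica() creates all supporting objects (ConfigMaps, Secrets,
# ServiceAccount, RoleBinding, Service, StatefulSet), while do_reconcile_read_replica()
# only re-renders and patches the existing StatefulSet.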
@kopf.on.create(consts.GROUP, consts.VERSION,
consts.INNODBCLUSTER_PLURAL) # type: ignore
def on_innodbcluster_create(name: str, namespace: Optional[str], body: Body,
logger: Logger, **kwargs) -> None:
logger.info(
f"Initializing InnoDB Cluster name={name} namespace={namespace} on K8s {k8s_version()}")
cluster = InnoDBCluster(body)
    # TODO: If we set the status here it will be emptied for unknown reasons later
    # and hide other statuses set later (e.g. when using an invalid spec.version)
#
#cluster.set_status({
# "cluster": {
# "status": diagnose.ClusterDiagStatus.INITIALIZING.value,
# "onlineInstances": 0,
# "lastProbeTime": utils.isotime()
# }})
try:
cluster.parse_spec()
cluster.parsed_spec.validate(logger)
except ApiSpecError as e:
cluster.set_status({
"cluster": {
"status": diagnose.ClusterDiagStatus.INVALID.value,
"onlineInstances": 0,
"lastProbeTime": utils.isotime()
}})
cluster.error(action="CreateCluster",
reason="InvalidArgument", message=str(e))
raise kopf.TemporaryError(f"Error in InnoDBCluster spec: {e}")
icspec = cluster.parsed_spec
#print(f"Default operator IC edition: {config.MYSQL_OPERATOR_DEFAULT_IC_EDITION} Edition")
cluster.log_cluster_info(logger)
cluster.update_cluster_fqdn()
if not cluster.ready:
try:
print("0. Components ConfigMaps and Secrets")
for cm in cluster_objects.prepare_component_config_configmaps(icspec, logger):
if not cluster.get_configmap(cm['metadata']['name']):
print(f"\tCreating CM {cm['metadata']['name']} ...")
kopf.adopt(cm)
api_core.create_namespaced_config_map(namespace, cm)
for secret in cluster_objects.prepare_component_config_secrets(icspec, logger):
if not cluster.get_secret(secret['metadata']['name']):
print(f"\tCreating Secret {secret['metadata']['name']} ...")
kopf.adopt(secret)
api_core.create_namespaced_secret(namespace, secret)
print("0.5. Additional ConfigMaps")
for cm in cluster_objects.prepare_additional_configmaps(icspec, logger):
if not cluster.get_configmap(cm['metadata']['name']):
print(f"\tCreating CM {cm['metadata']['name']} ...")
kopf.adopt(cm)
api_core.create_namespaced_config_map(namespace, cm)
print("1. Initial Configuration ConfigMap and Container Probes")
if not ignore_404(lambda: cluster.get_initconf(icspec)):
print("\tPreparing...")
configs = cluster_objects.prepare_initconf(cluster, icspec, logger)
print("\tCreating...")
kopf.adopt(configs)
api_core.create_namespaced_config_map(namespace, configs)
print("2. Cluster Accounts")
if not ignore_404(cluster.get_private_secrets):
print("\tPreparing...")
secret = cluster_objects.prepare_secrets(icspec)
print("\tCreating...")
kopf.adopt(secret)
api_core.create_namespaced_secret(namespace=namespace, body=secret)
print("3. Router Accounts")
if not ignore_404(cluster.get_router_account):
print("\tPreparing...")
secret = router_objects.prepare_router_secrets(icspec)
print("\tCreating...")
kopf.adopt(secret)
api_core.create_namespaced_secret(namespace=namespace, body=secret)
print("4. Cluster Service")
if not ignore_404(cluster.get_service):
print("\tPreparing...")
service = cluster_objects.prepare_cluster_service(icspec, logger)
print(f"\tCreating Service {service['metadata']['name']}...{service}")
kopf.adopt(service)
api_core.create_namespaced_service(namespace=namespace, body=service)
print("5. Sidecar ServiceAccount")
existing_sa = ignore_404(lambda: cluster.get_service_account_sidecar(icspec))
print(f"\tExisting SA: {existing_sa}")
print(f"\tImagePullSecrets: {icspec.imagePullSecrets}")
if not existing_sa:
print("\tPreparing...")
sa = cluster_objects.prepare_service_account_sidecar(icspec)
print(f"\tCreating...{sa}")
kopf.adopt(sa)
api_core.create_namespaced_service_account(namespace=namespace, body=sa)
elif icspec.imagePullSecrets:
patch = cluster_objects.prepare_service_account_patch_for_image_pull_secrets(icspec)
print(f"\tPatching existing SA with {patch}")
api_core.patch_namespaced_service_account(name=existing_sa.metadata.name, namespace=namespace, body=patch)
print("6. Switchover ServiceAccount")
if not ignore_404(lambda: cluster.get_service_account_switchover(icspec)):
print("\tPreparing...")
sa = cluster_objects.prepare_service_account_switchover(icspec)
print(f"\tCreating...{sa}")
kopf.adopt(sa)
api_core.create_namespaced_service_account(namespace=namespace, body=sa)
print("7. Sidecar RoleBinding ")
if not ignore_404(lambda: cluster.get_role_binding_sidecar(icspec)):
print("\tPreparing...")
rb = cluster_objects.prepare_role_binding_sidecar(icspec)
print(f"\tCreating RoleBinding {rb['metadata']['name']} ...")
kopf.adopt(rb)
api_rbac.create_namespaced_role_binding(namespace=namespace, body=rb)
print("8. Switchover RoleBinding")
if not ignore_404(lambda: cluster.get_role_binding_switchover(icspec)):
print("\tPreparing...")
rb = cluster_objects.prepare_role_binding_switchover(icspec)
print(f"\tCreating RoleBinding {rb['metadata']['name']} ...")
kopf.adopt(rb)
api_rbac.create_namespaced_role_binding(namespace=namespace, body=rb)
print("9. Cluster StatefulSet")
if not ignore_404(cluster.get_stateful_set):
print("\tPreparing...")
statefulset = cluster_objects.prepare_cluster_stateful_set(cluster, icspec, logger)
print(f"\tCreating...{statefulset}")
kopf.adopt(statefulset)
api_apps.create_namespaced_stateful_set(namespace=namespace, body=statefulset)
print("10. Cluster PodDisruptionBudget")
if not ignore_404(cluster.get_disruption_budget):
print("\tPreparing...")
disruption_budget = cluster_objects.prepare_cluster_pod_disruption_budget(icspec)
print("\tCreating...")
kopf.adopt(disruption_budget)
api_policy.create_namespaced_pod_disruption_budget(namespace=namespace, body=disruption_budget)
print("11. Read Replica StatefulSets")
if len(icspec.readReplicas) > 0:
print(f"\t{len(icspec.readReplicas)} Read Replica STS ...")
for rr in icspec.readReplicas:
do_create_read_replica(cluster, rr, True, "\t\t", logger)
else:
print("\tNo Read Replica")
print("12. Router Service")
if not ignore_404(cluster.get_router_service):
print("\tPreparing...")
router_service = router_objects.prepare_router_service(icspec)
print("\tCreating...")
kopf.adopt(router_service)
api_core.create_namespaced_service(namespace=namespace, body=router_service)
print("13. Router Deployment")
if not ignore_404(cluster.get_router_deployment):
if icspec.router.instances > 0:
print("\tPreparing...")
                    # This will create the deployment, but with 0 instances. Once the cluster is created (the first
                    # instance joins it) the replica count will be raised to icspec.router.instances
router_deployment = router_objects.prepare_router_deployment(cluster, logger, init_only=True)
print(f"\tCreating...{router_deployment}")
kopf.adopt(router_deployment)
api_apps.create_namespaced_deployment(namespace=namespace, body=router_deployment)
else:
                    # If the user later sets a non-zero number of routers, the routine that
                    # handles that change will create the deployment
print("\tRouter count is 0. No Deployment is created.")
print("14. Backup Secrets")
if not ignore_404(cluster.get_backup_account):
print("\tPreparing...")
secrets = backup_objects.prepare_backup_secrets(icspec)
print("\tCreating...")
for secret in secrets:
print("\t\t", secret["metadata"]["name"])
kopf.adopt(secret)
api_core.create_namespaced_secret(namespace=namespace, body=secret)
print("15. Service Monitors")
monitors = cluster_objects.prepare_metrics_service_monitors(cluster.parsed_spec, logger)
if len(monitors) == 0:
print("\tNone requested")
for monitor in monitors:
if not ignore_404(lambda: cluster.get_service_monitor(monitor['metadata']['name'])):
print(f"\tCreating ServiceMonitor {monitor} ...")
kopf.adopt(monitor)
try:
api_customobj.create_namespaced_custom_object(
"monitoring.coreos.com", "v1", cluster.namespace,
"servicemonitors", monitor)
except Exception as exc:
                        # This might be caused by the Prometheus Operator missing;
                        # we won't fail because of that
print(f"\tServiceMonitor {monitor['metadata']['name']} NOT created!")
print(exc)
cluster.warn(action="CreateCluster", reason="CreateResourceFailed", message=f"{exc}")
except Exception as exc:
cluster.warn(action="CreateCluster", reason="CreateResourceFailed",
message=f"{exc}")
raise
print(f"13. Setting operator version for the IC to {DEFAULT_OPERATOR_VERSION_TAG}")
cluster.set_operator_version(DEFAULT_OPERATOR_VERSION_TAG)
cluster.info(action="CreateCluster", reason="ResourcesCreated",
message="Dependency resources created, switching status to PENDING")
cluster.set_status({
"cluster": {
"status": diagnose.ClusterDiagStatus.PENDING.value,
"onlineInstances": 0,
"type": diagnose.ClusterInClusterSetType.PRIMARY.value if (not icspec.initDB or not icspec.initDB.cluster_set) else diagnose.ClusterInClusterSetType.REPLICA_CANDIDATE.value,
"lastProbeTime": utils.isotime()
}})
@kopf.on.delete(consts.GROUP, consts.VERSION,
consts.INNODBCLUSTER_PLURAL) # type: ignore
def on_innodbcluster_delete(name: str, namespace: str, body: Body,
logger: Logger, **kwargs):
cluster = InnoDBCluster(body)
logger.info(f"Deleting cluster {name}")
g_group_monitor.remove_cluster(cluster)
    # Notify the routers about the deletion. It will be too late in on_router_pod_delete().
    # Do this before scaling the STS to 0, which kills the servers, after which the metadata can't be updated anymore.
    # Only then scale down the cluster. Scaling the Router Deployment to 0 would achieve the same, but it is async.
    # That async process is a problem, as we need it to be finished before scaling down the STS to 0 / removing the
    # cluster from the clusterset.
    # So, practically, we duplicate on_router_pod_delete() here.
routers = cluster.get_routers()
if routers:
logger.info(f"Time to notify router(s) {routers} for IC deletion")
controller = ClusterController(cluster)
try:
controller.on_router_pod_delete(routers, logger)
except Exception as exc:
# Ignore errors, there isn't much we could do
# and there is no point in retrying forever
logger.warning(f"on_innodbcluster_delete: Failed to remove metadata for {routers}: {exc}")
#print(traceback.format_exc())
logger.warning("on_innodbcluster_delete: Exception ignored, there might be stale metadata left")
# Scale down the cluster to 0
sts = cluster.get_stateful_set()
if sts:
pods = cluster.get_pods()
        # First we need to check whether there is only one pod left and whether it is being deleted.
        # If it is being deleted, on_pod_delete() won't be called when we scale down the STS to 0.
        # In that case the code that triggers the cluster finalizer removal won't run either and the
        # cluster finalizer will stay hanging.
        # If we check after scaling down to 0 and there is only one pod, it will be moved to the
        # Terminating state and we won't know whether it was Terminating beforehand. If it wasn't, then
        # on_pod_delete() will be called and we will try to remove the finalizer again.
        # If len(pods) == maxUnavailable, all pods should be inspected to check whether they are terminating.
if len(pods) == 1 and pods[0].deleting:
# if there is only one pod and it is deleting then on_pod_delete() won't be called
# in this case the IC finalizer won't be removed and the IC will hang
logger.info("on_innodbcluster_delete: The cluster's only one pod is already deleting. Removing cluster finalizer here")
cluster.remove_cluster_finalizer()
if len(pods):
# TODO: this should be moved to controller or elsewhere
            # TODO: try more pods, if one fails and more are available
            # TODO: if this is PRIMARY we got to do something ... maybe force a failover?
            # TODO: this shouldn't block decommission (catch and log/ignore errors)
# TODO: remove admin/backup/metrics/router/... accounts as far as they are replicated to primary
with shellutils.DbaWrap(shellutils.connect_to_pod_dba(pods[0], logger)) as dba:
try:
cluster_status = dba.get_cluster().status({"extended": 1})
if "clusterRole" in cluster_status:
logger.info("9.3.0+ cluster, ClusterSet enabled")
my_name = dba.get_cluster().name
cs = dba.get_cluster_set()
cs_status = cs.status(extended=1)
logger.info(f"CSet={json.dumps(cs_status, indent=4)}")
if cs_status["clusters"][my_name]["clusterRole"] == "PRIMARY" and len(cs_status["clusters"]) > 1:
#raise kopf.TemporaryError(f"Cluster {my_name} is PRIMARY. Can not remove, trigger a failover first!")
# Check if all REPLICAS are still there, if not there / stale, remove them
invalidated = 0
ok = {}
for cluster_name, cluster_data in cs_status["clusters"].items():
if cluster_data["clusterRole"] == "REPLICA":
if cluster_data["globalStatus"] == "INVALIDATED" and cluster_data["status"] == "UNREACHABLE":
invalidated = invalidated + 1
else:
                                        # we could also raise here directly on the first occurrence, but let's collect some data for the exception message
ok[cluster_name] = cluster_data
                            # Excluding the primary itself
if (len(cs_status["clusters"]) - 1) != invalidated:
raise kopf.TemporaryError(f"Cluster {my_name} is PRIMARY. Can not remove, trigger a failover first! The following replicas seem to be ok {json.dumps(ok, indent=4)}")
# else this is the only cluster in the clusterset and we are fine
for cluster_name in cs_status["clusters"].keys():
logger.info(f"Removing INVALIDATED and UNREACHABLE cluster {cluster_name} from the cluster")
cs.remove_cluster(cluster_name, {"force": True})
cs.remove_cluster(my_name)
else:
logger.info("pre 9.3.0 cluster, not ClusterSet enabled")
except mysqlsh.Error as exc:
                    # For whatever reason we fail: this shouldn't stop us from
                    # decommissioning our pods. Even if not unregistered.
# TODO: maybe the only reason might be if this were the
# primary cluster, while other clusters exist ...
# but a) that is a user error and b) there shouldn't
# be an exception .. but let's keep an eye on it
logger.error(f"Error while trying to check ClusterSet status for unregistering: {exc}")
logger.info(f"Updating InnoDB Cluster StatefulSet.instances to 0")
cluster_objects.update_stateful_set_spec(sts, {"spec": {"replicas": 0}})
# Scale down routers to 0
logger.info(f"Updating Router Deployment.replicas to 0")
router_objects.update_size(cluster, 0, False, logger)
# TODO add a busy state and prevent changes while on it
def on_innodbcluster_field_instances(old, new, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
cluster.parsed_spec.validate(logger)
patcher.patch_sts({
"spec": {
"replicas": new
}
})
# No need for kopf.on.field. This is called by on_spec()
def on_innodbcluster_field_version(old, new, body: Body,
cluster: InnoDBCluster,
patcher: cluster_objects.InnoDBClusterObjectModifier,
logger: Logger, **kwargs):
logger.info("on_innodbcluster_field_version")
sts = cluster.get_stateful_set()
if sts:
logger.info(f"Propagating spec.version={new} for {cluster.namespace}/{cluster.name} (was {old})")
try:
cluster_ctl = ClusterController(cluster)
cluster_ctl.on_router_upgrade(logger)
cluster_ctl.on_server_version_change(new)
except:
# revert version in the spec
raise
        # This must not come earlier: on_server_version_change() also checks the version and raises
        # a PermanentError, while validate() raises an ApiSpecError, which Kopf turns into a TemporaryError.
        # spec.version requires this special handling.
cluster.parsed_spec.validate(logger)
cluster_objects.update_mysql_image(sts, cluster, cluster.parsed_spec, patcher, logger)
router_deploy = cluster.get_router_deployment()
if router_deploy:
router_objects.update_router_image(router_deploy, cluster.parsed_spec, patcher, logger)
# No need for kopf.on.field. This is called by on_spec()
def on_innodbcluster_field_image_repository(old, new, body: Body,
cluster: InnoDBCluster,
patcher: cluster_objects.InnoDBClusterObjectModifier,
logger: Logger, **kwargs):
sts = cluster.get_stateful_set()
if sts:
logger.info(f"Propagating spec.imageRepository={new} for {cluster.namespace}/{cluster.name} (was {old})")
try:
cluster_ctl = ClusterController(cluster)
cluster_ctl.on_router_upgrade(logger)
except:
# revert version in the spec
raise
cluster.parsed_spec.validate(logger)
cluster_objects.update_mysql_image(sts, cluster, cluster.parsed_spec, patcher, logger)
cluster_objects.update_operator_image(sts, cluster.parsed_spec)
router_deploy = cluster.get_router_deployment()
if router_deploy:
router_objects.update_router_image(router_deploy, cluster.parsed_spec, patcher, logger)
def on_innodbcluster_field_image_pull_policy(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
cluster.parsed_spec.validate(logger)
sts = cluster.get_stateful_set()
patcher.patch_sts(cluster_objects.update_pull_policy(sts, cluster.parsed_spec, logger))
router_deploy = cluster.get_router_deployment()
if router_deploy:
router_objects.update_pull_policy(router_deploy, cluster.parsed_spec, patcher, logger)
# No need for kopf.on.field. This is called by on_spec()
def on_innodbcluster_field_image(old, new, body: Body,
cluster: InnoDBCluster,
patcher: cluster_objects.InnoDBClusterObjectModifier,
logger: Logger, **kwargs):
# TODO - identify what cluster statuses should allow this change
sts = cluster.get_stateful_set()
if sts:
logger.info( f"Propagating spec.image={new} for {cluster.namespace}/{cluster.name} (was {old})")
try:
cluster_ctl = ClusterController(cluster)
cluster_ctl.on_server_image_change(new)
except:
# revert version in the spec
raise
cluster.parsed_spec.validate(logger)
cluster_objects.update_mysql_image(sts, cluster, cluster.parsed_spec, patcher, logger)
def on_innodbcluster_field_router_instances(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
cluster.parsed_spec.validate(logger)
patcher.patch_deploy(router_objects.update_size(cluster, new, True, logger))
def on_innodbcluster_field_router_version(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
cluster.parsed_spec.validate(logger)
try:
cluster_ctl = ClusterController(cluster)
cluster_ctl.on_router_upgrade(logger)
except:
# revert version in the spec
raise
router_deploy = cluster.get_router_deployment()
if router_deploy:
router_objects.update_router_image(router_deploy, cluster.parsed_spec, patcher, logger)
def on_innodbcluster_field_router_bootstrap_options(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
cluster.parsed_spec.validate(logger)
router_deploy = cluster.get_router_deployment()
if router_deploy:
router_objects.update_bootstrap_options(router_deploy, cluster, patcher, logger)
def on_innodbcluster_field_router_container_options(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
cluster.parsed_spec.validate(logger)
router_deploy = cluster.get_router_deployment()
if router_deploy:
router_objects.update_options(router_deploy, cluster.parsed_spec, patcher, logger)
# on_innodbcluster_field_router_options is safe to not go through on_spec() as this method touches neither the STS nor the Deployment
@kopf.on.field(consts.GROUP, consts.VERSION, consts.INNODBCLUSTER_PLURAL,
field="spec.router.routingOptions") # type: ignore
def on_innodbcluster_field_router_options(old: dict, new: dict, body: Body,
logger: Logger, **kwargs):
if old == new:
return
cluster = InnoDBCluster(body)
# ignore spec changes if the cluster is still being initialized
if not cluster.get_create_time():
logger.debug(
"Ignoring spec.router.routingOptions change for unready cluster")
return
cluster.parsed_spec.validate(logger)
with ClusterMutex(cluster):
if old is None:
old = {}
if new is None:
new = {}
c = ClusterController(cluster)
c.on_router_routing_option_chahnge(old, new, logger)
# on_innodbcluster_field_backup_schedules is safe to not go through on_spec() as this method touches neither the STS nor the Deployment
@kopf.on.field(consts.GROUP, consts.VERSION, consts.INNODBCLUSTER_PLURAL,
field="spec.backupSchedules") # type: ignore
def on_innodbcluster_field_backup_schedules(old: str, new: str, body: Body,
logger: Logger, **kwargs):
if old == new:
return
logger.info("on_innodbcluster_field_backup_schedules")
cluster = InnoDBCluster(body)
    # Ignore spec changes if the cluster is still being initialized.
    # This handler will be called even while the cluster is being initialized, because the
    # `old` value will be None and the `new` value will be the schedules the cluster has.
    # This makes it possible to create them here and not in on_innodbcluster_create().
    # There, in on_innodbcluster_create(), only the objects which are critical for the creation
    # of the server should be created.
    # After the cluster is ready we will add the schedules. This also allows the schedules to be
    # created (especially when `enabled`) after the cluster has been created, avoiding issues with
    # cron jobs not being called, or cron jobs being created as suspended and then enabled again once
    # the cluster is running - which would end up being a 2-step process.
    # The cluster is created after the first instance is up and running. Thus, we
    # don't need to take action in post_create_actions() in the cluster controller
    # but can asynchronously wait for Kopf to call this handler again.
if not cluster.get_create_time():
raise kopf.TemporaryError("The cluster is not ready. Will create the schedules once the first instance is up and running", delay=10)
cluster.parsed_spec.validate(logger)
with ClusterMutex(cluster):
backup_objects.update_schedules(cluster.parsed_spec, old, new, logger)
def on_sts_field_update(cluster: InnoDBCluster, field: str, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
cluster.parsed_spec.validate(logger)
patcher.patch_sts(cluster_objects.prepare_cluster_stateful_set(cluster, cluster.parsed_spec, logger))
def on_innodbcluster_field_tls_use_self_signed(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
logger.info("on_innodbcluster_field_tls_use_self_signed")
return on_sts_field_update(cluster, "spec.tlsUseSelfSigned", patcher, logger)
def on_innodbcluster_field_tls_secret_name(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
logger.info("on_innodbcluster_field_tls_secret_name")
return on_sts_field_update(cluster, "spec.tlsSecretName", patcher, logger)
def on_innodbcluster_field_router_tls_secret_name(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
logger.info("on_innodbcluster_field_router_tls_secret_name")
return on_sts_field_update(cluster, "spec.router.tlsSecretName", patcher, logger)
def on_innodbcluster_field_tls_ca_secret_name(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
logger.info("on_innodbcluster_field_tls_ca_secret_name")
return on_sts_field_update(cluster, "spec.tlsCASecretName", patcher, logger)
@kopf.on.field(consts.GROUP, consts.VERSION, consts.INNODBCLUSTER_PLURAL,
field="spec.readReplicas") # type: ignore
def on_innodbcluster_read_replicas_changed(old: dict, new: dict, body: Body,
logger: Logger, **kwargs):
logger.info("on_innodbcluster_read_replicas_changed")
if old == new:
return
cluster = InnoDBCluster(body)
if not cluster.get_create_time():
raise kopf.TemporaryError("The cluster is not ready. Will retry", delay=30)
cluster.parsed_spec.validate(logger)
if old is None:
old = []
if new is None:
new = []
with ClusterMutex(cluster):
# Remove read replica sets which were removed
for rr in old:
if rr['name'] not in map(lambda nrr: nrr['name'], new):
cluster_objects.remove_read_replica(cluster, rr)
# Add or update read replica sets
for rr in new:
old_rr = next(filter(lambda orr: orr['name'] == rr['name'], old), None)
rrspec = cluster.parsed_spec.get_read_replica(rr['name'])
if rrspec is None:
                # This should never happen, except maybe in a very short race
                # when the user adds it and immediately removes it, or in a retry
                # loop. But in all those cases it's removed after being added,
                # thus not creating it is fine
                logger.warning(f"Could not find Spec for ReadReplica {rr['name']} in InnoDBCluster")
continue
if old_rr == rr:
# no change
pass
elif old_rr:
# Old Read Replica -> Update
do_reconcile_read_replica(cluster, rrspec, logger)
else:
# New Read Replica -> Create it
do_create_read_replica(cluster, rrspec, False, "", logger)
@kopf.on.create("", "v1", "pods",
labels={"component": "mysqld"}) # type: ignore
def on_pod_create(body: Body, logger: Logger, **kwargs):
"""
Handle MySQL server Pod creation, which can happen when:
- cluster is being first created
- cluster is being scaled up (more members added)
"""
# TODO ensure that the pod is owned by us
pod = MySQLPod.from_json(body)
# check general assumption
assert not pod.deleting
print(f"on_pod_create: pod={pod.name} ContainersReady={pod.check_condition('ContainersReady')} Ready={pod.check_condition('Ready')} gate[configured]={pod.get_member_readiness_gate('configured')}")
configured = pod.get_member_readiness_gate("configured")
if not configured:
# TODO add extra diagnostics about why the pod is not ready yet, for
# example, unbound volume claims, initconf not finished etc
raise kopf.TemporaryError(f"Sidecar of {pod.name} is not yet configured", delay=30)
# If we are here all containers have started. This means, that if we are initializing
# the database from a donor (cloning) the sidecar has already started a seed instance
# and cloned from the donor into it (see initdb.py::start_clone_seed_pod())
cluster = pod.get_cluster()
assert cluster
logger.info(f"on_pod_create: cluster create time {cluster.get_create_time()}")
with ClusterMutex(cluster, pod):
first_pod = pod.index == 0 and not cluster.get_create_time()
if first_pod:
print("on_pod_create: first pod created")
cluster_objects.on_first_cluster_pod_created(cluster, logger)
g_group_monitor.monitor_cluster(
cluster, on_group_view_change, logger)
cluster_ctl = ClusterController(cluster)
cluster_ctl.on_pod_created(pod, logger)
# Remember how many restarts happened as of now
g_ephemeral_pod_state.set(pod, "mysql-restarts", pod.get_container_restarts("mysql"), context="on_pod_create")
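# The restart count stored above is compared in on_pod_event() below to detect a
# restarted mysqld container ("mysql-restarted").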
@kopf.on.event("", "v1", "pods",
labels={"component": "mysqld"}) # type: ignore
def on_pod_event(event, body: Body, logger: Logger, **kwargs):
"""
Handle low-level MySQL server pod events. The events we're interested in are:
- when a container restarts in a Pod (e.g. because of mysqld crash)
"""
# TODO ensure that the pod is owned by us
while True:
try:
pod = MySQLPod.from_json(body)
member_info = pod.get_membership_info()
ready = pod.check_containers_ready()
logger.debug(f"pod event: pod={pod.name} containers_ready={ready} deleting={pod.deleting} phase={pod.phase} member_info={member_info}")
if pod.phase != "Running" or pod.deleting or not member_info:
logger.info(f"ignored pod event")
return
mysql_restarts = pod.get_container_restarts("mysql")
event = ""
if g_ephemeral_pod_state.get(pod, "mysql-restarts") != mysql_restarts:
event = "mysql-restarted"
containers = [
f"{c.name}={'ready' if c.ready else 'not-ready'}" for c in pod.status.container_statuses]
conditions = [
f"{c.type}={c.status}" for c in pod.status.conditions]
logger.debug(f"POD EVENT {event}: pod={pod.name} containers_ready={ready} deleting={pod.deleting} phase={pod.phase} member_info={member_info} restarts={mysql_restarts} containers={containers} conditions={conditions}")
cluster = pod.get_cluster()
if not cluster:
logger.info(f"Ignoring event for pod {pod.name} belonging to a deleted cluster")
return
with ClusterMutex(cluster, pod):
cluster_ctl = ClusterController(cluster)
# Check if a container in the pod restarted
if ready and event == "mysql-restarted":
logger.info("Pod got restarted")
cluster_ctl.on_pod_restarted(pod, logger)
logger.info("Updating restart count")
g_ephemeral_pod_state.set(pod, "mysql-restarts", mysql_restarts, context="on_pod_event")
# Check if we should refresh the cluster status
status = cluster_ctl.probe_status_if_needed(pod, logger)
if status == diagnose.ClusterDiagStatus.UNKNOWN:
raise kopf.TemporaryError(
f"Cluster has unreachable members. status={status}", delay=15)
break
except kopf.TemporaryError as e:
# TODO review this
# Manually handle retries, the event handler isn't getting called again
# by kopf (maybe a bug or maybe we're using it wrong)
logger.info(f"{e}: retrying after {e.delay} seconds")
if e.delay:
time.sleep(e.delay)
continue
@kopf.on.delete("", "v1", "pods",
labels={"component": "mysqld"}) # type: ignore
def on_pod_delete(body: Body, logger: Logger, **kwargs):
"""
Handle MySQL server Pod deletion, which can happen when:
- cluster is being scaled down (members being removed)
- cluster is being deleted
- user deletes a pod by hand
"""
logger.info("on_pod_delete")
# TODO ensure that the pod is owned by us
pod = MySQLPod.from_json(body)
# check general assumption
assert pod.deleting
# removeInstance the pod
cluster = pod.get_cluster()
if cluster:
with ClusterMutex(cluster, pod):
cluster_ctl = ClusterController(cluster)
cluster_ctl.on_pod_deleted(pod, body, logger)
if pod.index == 0 and cluster.deleting:
print("Last cluster pod removed being removed!")
cluster_objects.on_last_cluster_pod_removed(cluster, logger)
else:
pod.remove_member_finalizer(body)
logger.error(f"Owner cluster for {pod.name} does not exist anymore")
# An example of a `when` hook for finding secrets belonging to an IC
#
#def secret_belongs_to_a_cluster_checker(meta, namespace:str, name, logger: Logger, **_) -> bool:
# clusters = get_all_clusters(namespace)
# for cluster in clusters:
# if name in (cluster.parsed_spec.tlsCASecretName,
# cluster.parsed_spec.tlsSecretName,
# cluster.parsed_spec.router.tlsSecretName):
# return True
# return False
#
# Use like the following
#@kopf.on.create("", "v1", "secrets", when=secret_belongs_to_a_cluster_checker) # type: ignore
#@kopf.on.update("", "v1", "secrets", when=secret_belongs_to_a_cluster_checker) # type: ignore
#
DefaultValueLambda = Callable[[], Any]
OnFieldHandler = Callable[[dict, dict, Any, InnoDBCluster, cluster_objects.InnoDBClusterObjectModifier, Logger], None]
OnFieldHandlerList = list[tuple[str, DefaultValueLambda, OnFieldHandler]]
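# compare_two_dicts_return_keys() returns the keys whose values differ between the two
# dicts (symmetric difference of their items). For illustration:
#   compare_two_dicts_return_keys({"a": "1"}, {"a": "2", "b": "3"}) -> {"a", "b"}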
def compare_two_dicts_return_keys(d1: dict, d2: dict) -> set[str]:
    return {k for k, _ in set(d1.items()) ^ set(d2.items())}
def on_pod_metadata_field_compare_and_generate_diff(old: dict, new: dict) -> Optional[dict]:
diff = compare_two_dicts_return_keys(old, new)
metadata = None
if len(diff):
metadata = {}
for label in diff:
# (new or changed value) | (deleted value)
metadata[label] = new[label] if label in new else None
return metadata
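# For illustration: with old={"a": "1", "b": "2"} and new={"a": "1"} the changed key is "b"
# and the returned patch fragment is {"b": None} (None marks a removed label/annotation),
# while an added or changed key maps to its new value.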
def on_pod_metadata_field(old: dict, new: dict, body: Body, what: str, cluster: InnoDBCluster, patcher: callable, logger: Logger) -> None:
values = on_pod_metadata_field_compare_and_generate_diff(old, new)
if values is not None:
patcher({"spec": {"template": {"metadata": {what : values}}}})
def on_server_pod_labels(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
on_pod_metadata_field(old, new, body, "labels", cluster, lambda patch: patcher.patch_sts(patch), logger)
def on_server_pod_annotations(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
on_pod_metadata_field(old, new, body, "annotations", cluster, lambda patch: patcher.patch_sts(patch), logger)
def on_router_pod_labels(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
on_pod_metadata_field(old, new, body, "labels", cluster, lambda patch: patcher.patch_deploy(patch), logger)
def on_router_pod_annotations(old: dict, new: dict, body: Body, cluster: InnoDBCluster, patcher: cluster_objects.InnoDBClusterObjectModifier, logger: Logger) -> None:
on_pod_metadata_field(old, new, body, "annotations", cluster, lambda patch: patcher.patch_deploy(patch), logger)
def on_innodbcluster_field_logs(old: str, new: str, body: Body,
cluster: InnoDBCluster,
patcher: cluster_objects.InnoDBClusterObjectModifier,
logger: Logger):
cluster.parsed_spec.validate(logger)
cluster_objects.update_objects_for_logs(cluster, patcher, logger)
def on_innodbcluster_field_metrics(old: str, new: str, body: Body,
cluster: InnoDBCluster,
patcher: cluster_objects.InnoDBClusterObjectModifier,
logger: Logger):
cluster.parsed_spec.validate(logger)
    # We have to edit the user account first, else the server might go away
    # while we are trying to change the user.
    # If we want to allow custom usernames we'd have to delete the old one here
cluster_ctl = ClusterController(cluster)
cluster_ctl.on_change_metrics_user(logger)
cluster_objects.update_objects_for_metrics(cluster, patcher, logger)
def call_kopf_style_on_handler_if_needed(old_dict: dict, new_dict: dict, key: str, body: Body,
cluster: InnoDBCluster,
patcher: cluster_objects.InnoDBClusterObjectModifier,
handler: callable,
logger: Logger) -> None:
old_value = old_dict[key] if key in old_dict else None
new_value = new_dict[key] if key in new_dict else None
if old_value != new_value:
handler(old_value, new_value, body, cluster, patcher, logger)
def change_between_old_and_new(old: dict, new: dict, key: str, default_value: DefaultValueLambda) -> tuple[Any, Any]:
if key in old:
if key in new:
if old[key] != new[key]:
# changed
return (old[key], new[key])
else:
return (old[key], default_value())
# deleted
pass
elif key in new:
# added
return (default_value(), new[key])
return (None, None)
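# For illustration: change_between_old_and_new({"instances": 3}, {"instances": 5}, "instances", lambda: None)
# returns (3, 5); a key present in only one of the dicts is paired with the default value,
# and (None, None) means "unchanged" and is skipped by handle_fields() below.
#
# Each entry in the handler tables below is (spec key, default-value factory, handler).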
spec_tld_handlers : OnFieldHandlerList = [\
("version", lambda: None, on_innodbcluster_field_version),
("image", lambda: None, on_innodbcluster_field_image),
("imageRepository",lambda: None, on_innodbcluster_field_image_repository),
("podLabels", lambda: {}, on_server_pod_labels),
("podAnnotations", lambda: {}, on_server_pod_annotations),
("instances", lambda: None, on_innodbcluster_field_instances),
("imagePullPolicy",lambda: None, on_innodbcluster_field_image_pull_policy),
("tlsUseSelfSigned",lambda: None,on_innodbcluster_field_tls_use_self_signed),
("tlsSecretName", lambda: None, on_innodbcluster_field_tls_secret_name),
("tlsCASecretName",lambda: None, on_innodbcluster_field_tls_ca_secret_name),
("logs", lambda: {}, on_innodbcluster_field_logs),
("metrics", lambda: {}, on_innodbcluster_field_metrics)
]
spec_router_handlers : OnFieldHandlerList = [\
("podLabels", lambda: {}, on_router_pod_labels),
("podAnnotations", lambda: {}, on_router_pod_annotations),
("instances", lambda: None, on_innodbcluster_field_router_instances),
("version", lambda: None, on_innodbcluster_field_router_version),
("options", lambda: {}, on_innodbcluster_field_router_container_options),
("bootstrapOptions",lambda: {}, on_innodbcluster_field_router_bootstrap_options),
("tlsSecretName", lambda: None, on_innodbcluster_field_router_tls_secret_name)
]
def handle_fields(old, new, body: Body,
cluster: InnoDBCluster,
patcher: cluster_objects.InnoDBClusterObjectModifier,
handlers: OnFieldHandlerList,
prefix: str, logger: Logger) -> None:
if old != new:
for cr_name, default_value, handler in handlers:
o, n = change_between_old_and_new(old, new, cr_name, default_value)
# (None, None) means no change
if (o, n) != (None, None):
logger.info(f"\tValue differs for {prefix}{cr_name}")
handler(o, n, body, cluster, patcher, logger)
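# For illustration, a change such as spec.instances 3 -> 5 flows as:
#   on_spec() -> handle_fields() -> on_innodbcluster_field_instances() -> patcher.patch_sts(...)
# with all collected patches submitted once, under the ClusterMutex, at the end of on_spec().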
@kopf.on.field(consts.GROUP, consts.VERSION, consts.INNODBCLUSTER_PLURAL,
field="spec") # type: ignore
def on_spec(body: Body, diff, old, new, logger: Logger, **kwargs):
logger.info("on_spec")
logger.info(f"old={old}")
logger.info(f"new={new}")
if not old:
# on IC object created, nothing to do here
logger.debug(f"on_spec: Old is empty")
return
cluster = InnoDBCluster(body)
if not cluster.ready:
# ignore spec changes if the cluster is still being initialized
logger.debug(f"on_spec: Ignoring on_spec change for unready cluster")
return
if not (sts:= cluster.get_stateful_set()):
logger.warning("STS doesn't exist yet. If this is a change during cluster start it might race and be lost")
return
patcher = cluster_objects.InnoDBClusterObjectModifier(cluster, logger)
# TODOA: Enable and test this
#cluster.parsed_spec.validate(logger)
handle_fields(old, new, body, cluster, patcher, spec_tld_handlers, "spec.", logger)
old_router, new_router = change_between_old_and_new(old, new, "router", lambda: {})
handle_fields(old_router, new_router, body, cluster, patcher, spec_router_handlers, "spec.router.", logger)
logger.info("Fields handled. Time to submit the patches to K8s API!")
# It's time to patch
with ClusterMutex(cluster):
patcher.submit_patches()
@kopf.on.field(consts.GROUP, consts.VERSION, consts.INNODBCLUSTER_PLURAL,
field="spec.instanceService") # type: ignore
def on_innodbcluster_field_service_type(old: str, new: str, body: Body,
logger: Logger, **kwargs):
if old == new:
return
cluster = InnoDBCluster(body)
with ClusterMutex(cluster):
svc = cluster.get_service()
cluster_objects.update_service(svc, cluster.parsed_spec, logger)
@kopf.on.field(consts.GROUP, consts.VERSION, consts.INNODBCLUSTER_PLURAL,
field="spec.service") # type: ignore
def on_innodbcluster_field_router_service_type(old: str, new: str, body: Body,
                                               logger: Logger, **kwargs):
if old == new:
return
cluster = InnoDBCluster(body)
with ClusterMutex(cluster):
svc = cluster.get_router_service()
router_objects.update_service(svc, cluster.parsed_spec, logger)
@kopf.on.create(consts.GROUP, consts.VERSION,
"mysqlclustersetfailovers") # type: ignore
def on_failover_create(name: str, namespace: Optional[str], body: Body,
logger: Logger, **kwargs) -> None:
# TODO: move this to a proper structure
logger.info(f'Fetching {body["spec"]["clusterName"]}')
cluster = cluster_api.InnoDBCluster.read(namespace, body["spec"]["clusterName"])
job = cluster_objects.prepare_service_failover_job(name, body["spec"]["force"], body["spec"].get("options"), cluster.parsed_spec, logger)
kopf.adopt(job)
api_batch.create_namespaced_job(namespace, body=job)
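# A minimal failover request consumed by the handler above could look roughly like the
# following sketch (kind/apiVersion and the metadata name are assumptions derived from the
# group/plural used here; spec.clusterName and spec.force are the fields read above,
# spec.options is optional):
#
#   apiVersion: mysql.oracle.com/v2
#   kind: MySQLClusterSetFailover
#   metadata:
#     name: failover-to-replica
#   spec:
#     clusterName: mycluster
#     force: false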
@kopf.on.delete("", "v1", "pods",
labels={"component": "mysqlrouter"}) # type: ignore
def on_router_pod_delete(body: Body, logger: Logger, namespace: str, **kwargs):
logger.info("on_router_pod_delete")
router_name = body["metadata"]["name"]
try:
cluster_name = body["metadata"]["labels"]["mysql.oracle.com/cluster"]
cluster = cluster_api.InnoDBCluster.read(namespace, cluster_name)
controller = ClusterController(cluster)
controller.on_router_pod_delete(router_name, logger)
except Exception as exc:
# Ignore errors, there isn't much we could do
# and there is no point in retrying forever
logger.warning(f"on_router_pod_delete: Failed to remove metadata for {router_name}: {exc}")
#print(traceback.format_exc())
logger.warning("on_router_pod_delete: Exception ignored, there might be stale metadata left")