# mysqloperator/controller/innodbcluster/cluster_controller.py
# Copyright (c) 2020, 2024, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
from kopf._cogs.structs.bodies import Body
from .. import consts, errors, shellutils, utils, config, mysqlutils
from .. import diagnose
from ..backup import backup_objects
from ..shellutils import DbaWrap
from . import cluster_objects, router_objects
from .cluster_api import MySQLPod, InnoDBCluster, client
import typing
from typing import Optional, TYPE_CHECKING, Dict, cast, Callable, Union
from logging import Logger
if TYPE_CHECKING:
from mysqlsh.mysql import ClassicSession
from mysqlsh import Dba, Cluster
import os
import copy
import mysqlsh
import kopf
import datetime
import time
common_gr_options = {
# Abort the server if member is kicked out of the group, which would trigger
# an event from the container restart, which we can catch and act upon.
# This also makes autoRejoinTries irrelevant.
"exitStateAction": "ABORT_SERVER"
}
def select_pod_with_most_gtids(gtids: Dict[int, str]) -> int:
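    """Return the index of the pod whose executed GTID set contains the
    most transactions, used to pick the seed when rebooting the cluster."""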
pod_indexes = list(gtids.keys())
pod_indexes.sort(key = lambda a: mysqlutils.count_gtids(gtids[a]))
return pod_indexes[-1]
class ClusterMutex:
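    """
    Context manager that serializes operations on a cluster by storing a
    lock marker in the operator's ephemeral state. Entering raises
    kopf.TemporaryError if another owner already holds the lock; exiting
    releases it. Typical usage: with ClusterMutex(cluster, pod): ...
    """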
def __init__(self, cluster: InnoDBCluster, pod: Optional[MySQLPod] = None, context: str = "n/a"):
self.cluster = cluster
self.pod = pod
self.context = context
def __enter__(self, *args):
owner_lock_creation_time: datetime.datetime
(owner, owner_context, owner_lock_creation_time) = utils.g_ephemeral_pod_state.testset(
self.cluster, "cluster-mutex", self.pod.name if self.pod else self.cluster.name, context=self.context)
if owner:
raise kopf.TemporaryError(
f"{self.cluster.name} busy. lock_owner={owner} owner_context={owner_context} lock_created_at={owner_lock_creation_time.isoformat()}", delay=10)
def __exit__(self, *args):
utils.g_ephemeral_pod_state.set(self.cluster, "cluster-mutex", None, context=self.context)
class ClusterController:
"""
    This is the controller for an InnoDBCluster object.
It's the main controller for a cluster and drives the lifecycle of the
cluster including creation, scaling and restoring from outages.
"""
def __init__(self, cluster: InnoDBCluster):
self.cluster = cluster
self.dba: Optional[Dba] = None
self.dba_cluster: Optional[Cluster] = None
@property
def dba_cluster_name(self) -> str:
"""Return the name of the cluster as defined in the k8s resource
        as an InnoDB Cluster-compatible name."""
return self.cluster.name.replace("-", "_").replace(".", "_")
def publish_status(self, diag: diagnose.ClusterStatus, logger: Logger) -> None:
cluster_status = self.cluster.get_cluster_status()
if cluster_status and cluster_status["status"] != diag.status.name:
self.cluster.info(action="ClusterStatus", reason="StatusChange",
message=f"Cluster status changed to {diag.status.name}. {len(diag.online_members)} member(s) ONLINE")
type = diag.type
if (diag.status == diagnose.ClusterDiagStatus.PENDING or diag.status == diagnose.ClusterDiagStatus.INITIALIZING):
if (not self.cluster.parsed_spec.initDB):
type = diagnose.ClusterInClusterSetType.PRIMARY
elif not self.cluster.parsed_spec.initDB.cluster_set:
type = diagnose.ClusterInClusterSetType.PRIMARY
else:
                # TODO: Should we declare it a replica or wait for it to be diagnosed as a replica, if initDB.cluster_set succeeds?
type = diagnose.ClusterInClusterSetType.REPLICA_CANDIDATE
logger.info("Publishing cluster status")
cluster_status = {
"status": diag.status.name,
"onlineInstances": len(diag.online_members),
"type": type.value,
"lastProbeTime": utils.isotime()
}
self.cluster.set_cluster_status(cluster_status)
def probe_status(self, logger: Logger) -> diagnose.ClusterStatus:
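        """
        Run a full cluster diagnosis, publish the resulting status to the
        InnoDBCluster resource (unless the cluster is being deleted) and
        return it.
        """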
diag = diagnose.diagnose_cluster(self.cluster, logger)
if not self.cluster.deleting:
self.publish_status(diag, logger)
logger.info(f"cluster probe: status={diag.status} online={diag.online_members}")
return diag
def probe_status_if_needed(self, changed_pod: MySQLPod, logger: Logger) -> diagnose.ClusterDiagStatus:
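        """
        Return the cached cluster status unless the pod changed membership
        after the last probe or the last known status is one of the
        "uncertain" states, in which case re-probe the cluster.
        """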
cluster_probe_time = self.cluster.get_cluster_status("lastProbeTime")
member_transition_time = changed_pod.get_membership_info("lastTransitionTime")
last_status = self.cluster.get_cluster_status("status")
unreachable_states = (diagnose.ClusterDiagStatus.UNKNOWN,
diagnose.ClusterDiagStatus.ONLINE_UNCERTAIN,
diagnose.ClusterDiagStatus.OFFLINE_UNCERTAIN,
diagnose.ClusterDiagStatus.NO_QUORUM_UNCERTAIN,
diagnose.ClusterDiagStatus.SPLIT_BRAIN_UNCERTAIN)
logger.info(f"cluster_probe_time={cluster_probe_time} member_transition_time={member_transition_time} last_status={last_status}")
        if (cluster_probe_time and member_transition_time and cluster_probe_time <= member_transition_time) or last_status in unreachable_states:
return self.probe_status(logger).status
logger.info("Returning last status")
return last_status
    def probe_member_status(self, pod: MySQLPod, session: 'ClassicSession', joined: bool, logger: Logger) -> tuple:
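        """
        Query the membership info of a single member, mirror it into the
        pod's membership status and readiness gate, and return the raw
        membership tuple.
        """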
# TODO use diagnose?
minfo = shellutils.query_membership_info(session)
member_id, role, status, view_id, version, mcount, rmcount = minfo
logger.debug(
f"instance probe: role={role} status={status} view_id={view_id} version={version} members={mcount} reachable_members={rmcount}")
pod.update_membership_status(
member_id, role, status, view_id, version, joined=joined)
# TODO
if status == "ONLINE":
pod.update_member_readiness_gate("ready", True)
else:
pod.update_member_readiness_gate("ready", False)
return minfo
def connect_to_primary(self, primary_pod: MySQLPod, logger: Logger) -> 'Cluster':
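        """
        Connect self.dba/self.dba_cluster through the given primary pod, or
        search for the current PRIMARY if primary_pod is None, and return
        the Cluster handle.
        """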
if primary_pod:
self.dba = shellutils.connect_dba(
primary_pod.endpoint_co, logger, max_tries=2)
self.dba_cluster = self.dba.get_cluster()
else:
# - check if we should consider pod marker for whether the instance joined
self.connect_to_cluster(logger, need_primary=True)
assert self.dba_cluster
return self.dba_cluster
def connect_to_cluster(self, logger: Logger, need_primary:bool = False) -> MySQLPod:
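        """
        Connect self.dba/self.dba_cluster through the first usable pod
        (optionally requiring it to be the PRIMARY) and return that pod.
        Raises kopf.TemporaryError when no member can provide a cluster
        handle.
        """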
# Get list of pods and try to connect to one of them
def try_connect() -> MySQLPod:
last_exc = None
offline_pods = []
all_pods = self.cluster.get_pods()
for pod in all_pods:
if pod.name in offline_pods or pod.deleting:
continue
try:
self.dba = mysqlsh.connect_dba(pod.endpoint_co)
if need_primary:
res = self.dba.session.run_sql(
"SELECT member_role"
" FROM performance_schema.replication_group_members"
" WHERE member_host = @@report_host")
r = res.fetch_one()
if r[0] != "PRIMARY":
logger.info(f"Primary requested, but {pod.name} is no primary")
self.dba.session.close()
continue
except Exception as e:
logger.debug(f"connect_dba: target={pod.name} error={e}")
# Try another pod if we can't connect to it
last_exc = e
continue
try:
self.dba_cluster = self.dba.get_cluster()
logger.info(f"Connected to {pod}")
return pod
except mysqlsh.Error as e:
logger.info(
f"get_cluster() from {pod.name} failed: {e}")
if e.code == errors.SHERR_DBA_BADARG_INSTANCE_NOT_ONLINE:
# This member is not ONLINE, so there's no chance of
# getting a cluster handle from it
offline_pods.append(pod.name)
except Exception as e:
logger.info(
f"get_cluster() from {pod.name} failed: {e}")
# If all pods are connectable but OFFLINE, then we have complete outage and need a reboot
if len(offline_pods) == len(all_pods):
raise kopf.TemporaryError(
"Could not connect to any cluster member", delay=15)
if last_exc:
raise last_exc
raise kopf.TemporaryError(
"Could not connect to any cluster member", delay=15)
return try_connect()
def log_mysql_info(self, pod: MySQLPod, session: 'ClassicSession', logger: Logger) -> None:
row = session.run_sql(
"select @@server_id, @@server_uuid, @@report_host").fetch_one()
server_id, server_uuid, report_host = row
try:
row = session.run_sql(
"select @@globals.gtid_executed, @@globals.gtid_purged").fetch_one()
gtid_executed, gtid_purged = row
except:
gtid_executed, gtid_purged = None, None
logger.info(
f"server_id={server_id} server_uuid={server_uuid} report_host={report_host} gtid_executed={gtid_executed} gtid_purged={gtid_purged}")
def create_cluster(self, seed_pod: MySQLPod, logger: Logger) -> None:
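        """
        Create the InnoDB Cluster on seed_pod, recording the create options
        and initial data source in the cluster resource. Unless the cluster
        is meant to join an existing ClusterSet (initDB.cluster_set), also
        create a ClusterSet and apply the configured routing options. For a
        single-instance cluster the post-create actions run immediately.
        """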
logger.info("Creating cluster at %s" % seed_pod.name)
assume_gtid_set_complete = False
initial_data_source = "blank"
if self.cluster.parsed_spec.initDB:
# TODO store version
# TODO store last known quorum
if self.cluster.parsed_spec.initDB.clone:
initial_data_source = f"clone={self.cluster.parsed_spec.initDB.clone.uri}"
elif self.cluster.parsed_spec.initDB.dump and seed_pod.index == 0: # A : Should we check for index?
if self.cluster.parsed_spec.initDB.dump.storage.ociObjectStorage:
initial_data_source = f"dump={self.cluster.parsed_spec.initDB.dump.storage.ociObjectStorage.bucketName}"
elif self.cluster.parsed_spec.initDB.dump.storage.s3:
initial_data_source = f"dump={self.cluster.parsed_spec.initDB.dump.storage.s3.bucketName}"
elif self.cluster.parsed_spec.initDB.dump.storage.azure:
initial_data_source = f"dump={self.cluster.parsed_spec.initDB.dump.storage.azure.containerName}",
elif self.cluster.parsed_spec.initDB.dump.storage.persistentVolumeClaim:
initial_data_source = f"dump={self.cluster.parsed_spec.initDB.dump.storage.persistentVolumeClaim}"
else:
assert 0, "Unknown Dump storage mechanism"
elif self.cluster.parsed_spec.initDB.meb:
initial_data_source = "meb"
elif self.cluster.parsed_spec.initDB.cluster_set:
initial_data_source = f"clusterSet={self.cluster.parsed_spec.initDB.cluster_set.uri}"
else:
print(f"{self.cluster.parsed_spec.initDB=} -> {self.cluster.parsed_spec.initDB.cluster_set=}")
assert 0, "Unknown initDB source"
else:
# We're creating the cluster from scratch, so GTID set is sure to be complete
assume_gtid_set_complete = True
# The operator manages GR, so turn off start_on_boot to avoid conflicts
create_options = {
"gtidSetIsComplete": assume_gtid_set_complete,
"manualStartOnBoot": True,
"memberSslMode": "REQUIRED" if self.cluster.parsed_spec.tlsUseSelfSigned else "VERIFY_IDENTITY",
}
if not self.cluster.parsed_spec.tlsUseSelfSigned:
logger.info("Using TLS GR authentication")
rdns = seed_pod.get_cluster().get_tls_issuer_and_subject_rdns()
create_options["memberAuthType"] = "CERT_SUBJECT"
create_options["certIssuer"] = rdns["issuer"]
create_options["certSubject"] = rdns["subject"]
else:
logger.info("Using PASSWORD GR authentication")
create_options.update(common_gr_options)
cluster_info = {
"initialDataSource" : initial_data_source,
"createOptions" : create_options,
}
self.cluster.update_cluster_info(cluster_info)
def should_retry(err):
if seed_pod.deleting:
return False
return True
with DbaWrap(shellutils.connect_dba(seed_pod.endpoint_co, logger, is_retriable=should_retry)) as dba:
try:
self.dba_cluster = dba.get_cluster()
# maybe from a previous incomplete create attempt
logger.info("Cluster already exists")
except:
self.dba_cluster = None
seed_pod.add_member_finalizer()
if not self.dba_cluster:
self.log_mysql_info(seed_pod, dba.session, logger)
logger.info(f"CREATE CLUSTER: seed={seed_pod.name}, options={create_options}")
try:
self.dba_cluster = dba.create_cluster(
self.dba_cluster_name, create_options)
logger.info("create_cluster OK")
except mysqlsh.Error as e:
# If creating the cluster failed, remove the membership finalizer
seed_pod.remove_member_finalizer()
# can happen when retrying
if e.code == errors.SHERR_DBA_BADARG_INSTANCE_ALREADY_IN_GR:
logger.info(
f"GR already running at {seed_pod.endpoint}, stopping before retrying...")
try:
dba.session.run_sql("STOP GROUP_REPLICATION")
except mysqlsh.Error as e:
logger.info(f"Could not stop GR plugin: {e}")
# throw a temporary error for a full retry later
raise kopf.TemporaryError(
"GR already running while creating cluster but could not stop it", delay=3)
raise
# Only on first cluster! Not if this is supposed to join some other
if not self.cluster.parsed_spec.initDB or not self.cluster.parsed_spec.initDB.cluster_set:
try:
_ = self.dba_cluster.get_cluster_set()
# maybe from a previous incomplete create attempt
logger.info("ClusterSet already exists")
except:
# TODO: what to do if this fails?
self.dba_cluster_set = self.dba_cluster.create_cluster_set(self.dba_cluster_name)
routing_options = self.cluster.parsed_spec.router.routingOptions
for routing_option in routing_options:
try:
routing_value = routing_options[routing_option]
logger.info(f"Setting Router Option [{routing_option}] to [{routing_value}]")
self.dba_cluster_set.set_routing_option(routing_option, routing_value)
except mysqlsh.Error as e:
# We don't fail when setting an option fails
logger.warning(f"Failed setting routing option {routing_option} to {routing_value}: {e}")
self.probe_member_status(seed_pod, dba.session, True, logger)
logger.debug("Cluster created %s" % self.dba_cluster.status())
# if there's just 1 pod, then the cluster is ready... otherwise, we
# need to wait until all pods have joined
if self.cluster.parsed_spec.instances == 1:
self.post_create_actions(dba.session, self.dba_cluster, logger)
self.probe_member_status(seed_pod, dba.session, True, logger)
def post_create_actions(self, session: 'ClassicSession', dba_cluster: 'Cluster', logger: Logger) -> None:
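        """
        Create or update the router account, scale the read-replica
        StatefulSets and the router Deployment once the cluster is usable.
        """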
logger.info("cluster_controller::post_create_actions")
# create router account
user, password = self.cluster.get_router_account()
update = True
try:
session.run_sql("show grants for ?@'%'", [user])
except mysqlsh.Error as e:
if e.code == mysqlsh.mysql.ErrorCode.ER_NONEXISTING_GRANT:
update = False
else:
raise
logger.debug(
f"{'Updating' if update else 'Creating'} router account {user}")
dba_cluster.setup_router_account(
user, {"password": password, "update": update})
# update read replicas
for rr in self.cluster.parsed_spec.readReplicas:
logger.debug(f"Setting {rr.name} replicas to {rr.instances}")
cluster_objects.update_stateful_set_size(self.cluster, rr, logger)
# update the router deployment
n = self.cluster.parsed_spec.router.instances
if n:
logger.debug(f"Setting router replicas to {n}")
router_objects.update_size(self.cluster, n, False, logger)
    def reboot_cluster(self, seed_pod_index: int, logger: Logger) -> None:
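        """
        Reboot the cluster from complete outage using the pod at
        seed_pod_index as seed, then try to rejoin the remaining pods and,
        if this cluster is INVALIDATED in its ClusterSet, rejoin the
        ClusterSet as well.
        """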
pods = self.cluster.get_pods()
seed_pod = pods[seed_pod_index]
logger.info(f"Rebooting cluster {self.cluster.name} from pod {seed_pod}...")
self.dba = shellutils.connect_dba(seed_pod.endpoint_co, logger)
self.log_mysql_info(seed_pod, self.dba.session, logger)
seed_pod.add_member_finalizer()
self.dba_cluster = self.dba.reboot_cluster_from_complete_outage()
logger.info(f"reboot_cluster_from_complete_outage OK.")
# rejoin other pods
for pod in pods:
if pod.index != seed_pod_index:
with shellutils.connect_to_pod(pod, logger, timeout=5) as session:
try:
self.rejoin_instance(pod, session, logger)
except:
# TODO - verify this will be retried from elsewhere
print("==================================================")
print(f"INSTANCE REJOIN FAILED for {pod.name}")
print("=================================================")
import traceback
traceback.print_exc()
# TODO: May not be in a ClusterSet (old Cluster?)
cs = self.dba_cluster.get_cluster_set()
cs_status = cs.status()
# TODO: is there really a valid case where
# cs_status["clusters"][name] won't exist?
if cs_status.get("clusters", {}).get(self.cluster.name, {}).get("globalStatus") == "INVALIDATED":
try:
cs.rejoin_cluster(self.cluster.name)
except:
print("========================")
print("CLUSTERSET REJOIN FAILED")
print("========================")
import traceback
traceback.print_exc()
status = self.dba_cluster.status()
logger.info(f"Cluster reboot successful. status={status}")
self.probe_member_status(seed_pod, self.dba.session, True, logger)
def force_quorum(self, seed_pod, logger: Logger) -> None:
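        """Restore quorum of the cluster by forcing it from seed_pod's
        partition."""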
logger.info(
f"Forcing quorum of cluster {self.cluster.name} using {seed_pod.name}...")
self.connect_to_primary(seed_pod, logger)
self.dba_cluster.force_quorum_using_partition_of(seed_pod.endpoint_co)
status = self.dba_cluster.status()
logger.info(f"Force quorum successful. status={status}")
# TODO Rejoin OFFLINE members
def destroy_cluster(self, last_pod, logger: Logger) -> None:
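        """
        Stop Group Replication on the last remaining member and remove its
        membership finalizer; errors are ignored since the cluster is going
        away anyway.
        """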
logger.info(f"Stopping GR for last cluster member {last_pod.name}")
try:
with shellutils.connect_to_pod(last_pod, logger, timeout=5) as session:
# Just stop GR
session.run_sql("STOP group_replication")
except Exception as e:
logger.warning(
f"Error stopping GR at last cluster member, ignoring... {e}")
# Remove the pod membership finalizer even if we couldn't do final cleanup
# (it's just stop GR, which should be harmless most of the time)
last_pod.remove_member_finalizer()
return
logger.info("Stop GR OK")
last_pod.remove_member_finalizer()
def reconcile_pod(self, primary_pod: MySQLPod, pod: MySQLPod, logger: Logger) -> None:
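        """
        Diagnose pod as a cluster candidate (via the primary) and join,
        rejoin or just re-probe it depending on the diagnosed status.
        """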
with DbaWrap(shellutils.connect_dba(pod.endpoint_co, logger)) as pod_dba_session:
cluster = self.connect_to_primary(primary_pod, logger)
status = diagnose.diagnose_cluster_candidate(
self.dba.session, cluster, pod, pod_dba_session, logger)
logger.info(
f"Reconciling {pod}: state={status.status} deleting={pod.deleting} cluster_deleting={self.cluster.deleting}")
if pod.deleting or self.cluster.deleting:
return
# TODO check case where a member pod was deleted and then rejoins with the same address but different uuid
if status.status == diagnose.CandidateDiagStatus.JOINABLE:
self.cluster.info(action="ReconcilePod", reason="Join",
message=f"Joining {pod.name} to cluster")
self.join_instance(pod, pod_dba_session, logger)
elif status.status == diagnose.CandidateDiagStatus.REJOINABLE:
self.cluster.info(action="ReconcilePod", reason="Rejoin",
message=f"Rejoining {pod.name} to cluster")
self.rejoin_instance(pod, pod_dba_session.session, logger)
elif status.status == diagnose.CandidateDiagStatus.MEMBER:
logger.info(f"{pod.endpoint} already a member")
self.probe_member_status(pod, pod_dba_session.session, False, logger)
elif status.status == diagnose.CandidateDiagStatus.UNREACHABLE:
# TODO check if we should throw a tmp error or do nothing
logger.error(f"{pod.endpoint} is unreachable")
self.probe_member_status(pod, pod_dba_session.session, False, logger)
else:
# TODO check if we can repair broken instances
# It would be possible to auto-repair an instance with errant
# transactions by cloning over it, but that would mean these
# errants are lost.
logger.error(f"{pod.endpoint} is in state {status.status}")
self.probe_member_status(pod, pod_dba_session.session, False, logger)
def join_instance(self, pod: MySQLPod, pod_dba_session: 'Dba', logger: Logger) -> None:
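        """
        Add pod to the cluster (add_replica_instance() for read replicas),
        preferring incremental recovery and falling back to clone on
        failure. Once the cluster reaches its configured size, router
        deployment is triggered via post_create_actions().
        """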
logger.info(f"Adding {pod.endpoint} to cluster")
peer_pod = self.connect_to_cluster(logger)
self.log_mysql_info(pod, pod_dba_session.session, logger)
# TODO - always use clone when dataset is big
# With Shell Bug #33900165 fixed we should use "auto" by default
# and remove the retry logic below
recovery_method = "incremental"
if self.cluster.parsed_spec.initDB and self.cluster.parsed_spec.initDB.meb:
            # After a restore from a MEB backup the server might not find the right
            # binlogs for incremental recovery and would provision an empty replica;
            # clone does the right thing
recovery_method = "clone"
add_options = {
"recoveryMethod": recovery_method,
}
        # TODO: add_replica_instance() doesn't support cert-based auth, thus certSubject works only for group members - WL15056
        # If a cluster was created with cert-based auth between the group members, no replica can join the cluster
for option in self.dba_cluster.options()["defaultReplicaSet"]["globalOptions"]:
if option["option"] == "memberAuthType" and option["value"] in ["CERT_SUBJECT", "CERT_SUBJECT_PASSWORD"]:
rdns = pod.get_cluster().get_tls_issuer_and_subject_rdns()
                # add_instance() needs only certSubject, but not memberAuthType and certIssuer
add_options["certSubject"] = rdns["subject"]
if pod.instance_type == "group-member":
add_options.update(common_gr_options)
logger.info(
f"ADD INSTANCE: target={pod.endpoint} instance_type={pod.instance_type} cluster_peer={peer_pod.endpoint} options={add_options}...")
pod.add_member_finalizer()
report_host = pod_dba_session.session.run_sql('SELECT @@report_host').fetch_one()[0]
print(f"DBA SESSION GOES TO {report_host}!")
try:
if pod.instance_type == "read-replica":
self.dba_cluster.add_replica_instance(pod.endpoint, add_options)
else:
self.dba_cluster.add_instance(pod.endpoint_co, add_options)
logger.debug("add_instance OK")
except (mysqlsh.Error, RuntimeError) as e:
logger.warning(f"add_instance failed: error={e}")
            # Incremental recovery may fail if transactions are missing from the binlog;
            # retry using clone
add_options["recoveryMethod"] = "clone"
logger.warning(f"trying add_instance with clone")
try:
if pod.instance_type == "read-replica":
self.dba_cluster.add_replica_instance(pod.endpoint, add_options)
else:
self.dba_cluster.add_instance(pod.endpoint_co, add_options)
except (mysqlsh.Error, RuntimeError) as e:
logger.warning(f"add_instance failed second time: error={e}")
raise
if pod.instance_type == "read-replica":
            # This is not perfect, as we don't track this any further, but async
            # replication gives us only limited information
pod.update_member_readiness_gate("ready", True)
else:
with DbaWrap(shellutils.connect_dba(pod.endpoint_co, logger)) as dba_session:
                # TODO: pod_dba_session may be invalid on the caller's side if the
                # pod was provisioned via clone, which may lead to future
                # bugs; also, always using a new connection here is slightly
                # inefficient.
                # In case clone is used and we need a reconnect, we have
                # to communicate that to the caller, otherwise we will see
                # bugs in the future.
minfo = self.probe_member_status(pod, dba_session.session,
True, logger)
member_id, role, status, view_id, version, member_count, reachable_member_count = minfo
logger.info(f"JOINED {pod.name}: {minfo}")
# if the cluster size is complete, ensure routers are deployed
if not router_objects.get_size(self.cluster) and member_count == self.cluster.parsed_spec.instances:
self.post_create_actions(self.dba.session, self.dba_cluster, logger)
def rejoin_instance(self, pod: MySQLPod, pod_session, logger: Logger) -> None:
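        """Rejoin pod to the cluster and re-probe its membership status."""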
logger.info(f"Rejoining {pod.endpoint} to cluster")
if not self.dba_cluster:
self.connect_to_cluster(logger)
self.log_mysql_info(pod, pod_session, logger)
rejoin_options = {}
logger.info(
f"rejoin_instance: target={pod.endpoint} options={rejoin_options}...")
try:
self.dba_cluster.rejoin_instance(pod.endpoint, rejoin_options)
logger.debug("rejoin_instance OK")
except mysqlsh.Error as e:
logger.warning(f"rejoin_instance failed: error={e}")
raise
self.probe_member_status(pod, pod_session, False, logger)
def remove_instance(self, pod: MySQLPod, pod_body: Body, logger: Logger, force: bool = False) -> None:
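        """
        Remove pod from the cluster metadata and always drop its membership
        finalizer, even if the removal itself fails.
        """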
try:
self.__remove_instance_aux(pod, logger, force)
except Exception as e:
logger.info(f"Exception {e} caught")
pass
finally:
# Remove the membership finalizer to allow the pod to be removed
pod.remove_member_finalizer(pod_body)
logger.info(f"Removed finalizer for pod {pod_body['metadata']['name']}")
def __remove_instance_aux(self, pod: MySQLPod, logger: Logger, force: bool = False) -> None:
print(f"Removing {pod.endpoint} from cluster FORCE={force}")
# TODO improve this check
other_pods = self.cluster.get_pods()
if len(other_pods) == 1 and pod.instance_type == 'group-member':
print("There is only one pod left in the cluster. Won't remove it, as this will dissolve the cluster. It will be removed only if the cluster is being deleted.")
if len(other_pods) > 1 or (len(other_pods) > 0 and pod.instance_type == 'read-replica'):
try:
print("connect_to_cluster")
peer_pod = self.connect_to_cluster(logger)
print(f"peer_pod={peer_pod}")
except mysqlsh.Error as e:
peer_pod = None
if self.cluster.deleting:
logger.warning(
f"Could not connect to cluster, but ignoring because we're deleting: error={e}")
else:
logger.error(f"Could not connect to cluster: error={e}")
raise
if peer_pod:
removed = False
remove_options = {}
if not force:
logger.info(
f"remove_instance: {pod.name} peer={peer_pod.name} options={remove_options}")
try:
self.dba_cluster.remove_instance(pod.endpoint, remove_options)
removed = True
logger.debug("remove_instance OK")
except mysqlsh.Error as e:
logger.warning(f"remove_instance failed: error={e}")
if e.code == mysqlsh.mysql.ErrorCode.ER_OPTION_PREVENTS_STATEMENT:
# super_read_only can still be true on a PRIMARY for a
# short time
raise kopf.TemporaryError(
f"{peer_pod.name} is a PRIMARY but super_read_only is ON", delay=5)
elif e.code == errors.SHERR_DBA_MEMBER_METADATA_MISSING:
# already removed and we're probably just retrying
removed = True
print(f"removed={removed}")
if not removed:
remove_options["force"] = True
logger.info(
f"remove_instance: {pod.name} peer={peer_pod.name} options={remove_options}")
try:
self.dba_cluster.remove_instance(pod.endpoint, remove_options)
logger.info("FORCED remove_instance OK")
except mysqlsh.Error as e:
logger.warning(f"remove_instance failed: error={e}")
if e.code == errors.SHERR_DBA_MEMBER_METADATA_MISSING:
pass
else:
deleting = not self.cluster or self.cluster.deleting
if deleting:
logger.info(
f"force remove_instance failed. Ignoring because cluster is deleted: error={e} peer={peer_pod.name}")
else:
logger.error(
f"force remove_instance failed. error={e} deleting_cluster={deleting} peer={peer_pod.name}")
raise
except RuntimeError as e:
logger.info(f"force remove_instance failed. RuntimeError {e}")
if str(e).find("The cluster object is disconnected") == -1:
logger.info(f"Can't do anything to remove {pod.name} cleanly")
raise
else:
logger.error(
f"Cluster is not available, skipping clean removal of {pod.name}")
def repair_cluster(self, pod: MySQLPod, diagnostic: diagnose.ClusterStatus, logger: Logger) -> None:
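        """
        Try to bring the cluster back to an ONLINE state based on the
        diagnosed status: reboot from complete outage, force quorum, or
        raise a temporary/permanent error when automatic recovery is not
        possible.
        """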
# TODO check statuses where router has to be put down
# Restore cluster to an ONLINE state
if diagnostic.status == diagnose.ClusterDiagStatus.ONLINE:
# Nothing to do
return
elif diagnostic.status == diagnose.ClusterDiagStatus.ONLINE_PARTIAL:
# Nothing to do, rejoins handled on pod events
return
elif diagnostic.status == diagnose.ClusterDiagStatus.ONLINE_UNCERTAIN:
# Nothing to do
# TODO maybe delete unreachable pods if enabled?
return
elif diagnostic.status == diagnose.ClusterDiagStatus.OFFLINE:
# Reboot cluster if all pods are reachable
if len([g for g in diagnostic.gtid_executed.values() if g is not None]) == len(self.cluster.get_pods()):
seed_pod = select_pod_with_most_gtids(diagnostic.gtid_executed)
self.cluster.info(action="RestoreCluster", reason="Rebooting",
message=f"Restoring OFFLINE cluster through pod {seed_pod}")
shellutils.RetryLoop(logger).call(self.reboot_cluster, seed_pod, logger)
else:
logger.debug(f"Cannot reboot cluster because not all pods are reachable")
raise kopf.TemporaryError(
f"Cluster cannot be restored because there are unreachable pods", delay=5)
elif diagnostic.status == diagnose.ClusterDiagStatus.OFFLINE_UNCERTAIN:
# TODO delete unconnectable pods after timeout, if enabled
raise kopf.TemporaryError(
f"Unreachable members found while in state {diagnostic.status}, waiting...")
elif diagnostic.status == diagnose.ClusterDiagStatus.NO_QUORUM:
# Restore cluster
self.cluster.info(action="RestoreCluster", reason="RestoreQuorum",
message="Restoring quorum of cluster")
shellutils.RetryLoop(logger).call(
self.force_quorum, diagnostic.quorum_candidates[0], logger)
elif diagnostic.status == diagnose.ClusterDiagStatus.NO_QUORUM_UNCERTAIN:
# Restore cluster
# TODO delete unconnectable pods after timeout, if enabled
raise kopf.TemporaryError(
f"Unreachable members found while in state {diagnostic.status}, waiting...")
elif diagnostic.status == diagnose.ClusterDiagStatus.SPLIT_BRAIN:
self.cluster.error(action="UnrecoverableState", reason="SplitBrain",
message="Cluster is in a SPLIT-BRAIN state and cannot be restored automatically.")
# TODO check if recoverable case
# Fatal error, user intervention required
raise kopf.PermanentError(
f"Unable to recover from current cluster state. User action required. state={diagnostic.status}")
elif diagnostic.status == diagnose.ClusterDiagStatus.SPLIT_BRAIN_UNCERTAIN:
# TODO check if recoverable case and if NOT, then throw a permanent error
self.cluster.error(action="UnrecoverableState", reason="SplitBrain",
message="Cluster is in state SPLIT-BRAIN with unreachable instances and cannot be restored automatically.")
raise kopf.PermanentError(
f"Unable to recover from current cluster state. User action required. state={diagnostic.status}")
# TODO delete unconnectable pods after timeout, if enabled
raise kopf.TemporaryError(
f"Unreachable members found while in state {diagnostic.status}, waiting...")
elif diagnostic.status == diagnose.ClusterDiagStatus.UNKNOWN:
# Nothing to do, but we can try again later and hope something comes back
raise kopf.TemporaryError(
f"No members of the cluster could be reached. state={diagnostic.status}")
elif diagnostic.status == diagnose.ClusterDiagStatus.INVALID:
self.cluster.error(action="UnrecoverableState", reason="Invalid",
message="Cluster state is invalid and cannot be restored automatically.")
raise kopf.PermanentError(
f"Unable to recover from current cluster state. User action required. state={diagnostic.status}")
elif diagnostic.status == diagnose.ClusterDiagStatus.FINALIZING:
# Nothing to do
return
else:
raise kopf.PermanentError(
f"Invalid cluster state {diagnostic.status}")
def on_router_tls_changed(self) -> None:
"""
Router pods need to be recreated in order for new certificates to get
reloaded.
"""
pass
def on_pod_created(self, pod: MySQLPod, logger: Logger) -> None:
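        """
        Handle a newly created pod: create the cluster at pod-0 while the
        cluster is INITIALIZING, join the pod while the cluster is ONLINE,
        or attempt a repair otherwise.
        """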
print("on_pod_created: probing cluster")
diag = self.probe_status(logger)
print(f"on_pod_created: pod={pod.name} primary={diag.primary} cluster_state={diag.status}")
if diag.status == diagnose.ClusterDiagStatus.INITIALIZING:
# If cluster is not yet created, then we create it at pod-0
if pod.index == 0:
if self.cluster.get_create_time():
raise kopf.PermanentError(
f"Internal inconsistency: cluster marked as initialized, but create requested again")
print("Time to create the cluster")
shellutils.RetryLoop(logger).call(self.create_cluster, pod, logger)
# Mark the cluster object as already created
self.cluster.set_create_time(datetime.datetime.now())
else:
# Other pods must wait for the cluster to be ready
raise kopf.TemporaryError("Cluster is not yet ready", delay=15)
elif diag.status in (diagnose.ClusterDiagStatus.ONLINE, diagnose.ClusterDiagStatus.ONLINE_PARTIAL, diagnose.ClusterDiagStatus.ONLINE_UNCERTAIN):
print("Reconciling pod")
# Cluster exists and is healthy, join the pod to it
shellutils.RetryLoop(logger).call(
self.reconcile_pod, diag.primary, pod, logger)
else:
print("Attempting to repair the cluster")
self.repair_cluster(pod, diag, logger)
# Retry from scratch in another iteration
raise kopf.TemporaryError(f"Cluster repair from state {diag.status} attempted", delay=5)
def on_pod_restarted(self, pod: MySQLPod, logger: Logger) -> None:
diag = self.probe_status(logger)
logger.debug(
f"on_pod_restarted: pod={pod.name} primary={diag.primary} cluster_state={diag.status}")
if diag.status not in (diagnose.ClusterDiagStatus.ONLINE, diagnose.ClusterDiagStatus.ONLINE_PARTIAL):
self.repair_cluster(pod, diag, logger)
shellutils.RetryLoop(logger).call(
self.reconcile_pod, diag.primary, pod, logger)
def on_pod_deleted(self, pod: MySQLPod, pod_body: Body, logger: Logger) -> None:
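        """
        Handle pod deletion: destroy the cluster when pod-0 of a cluster
        being deleted goes away, remove the instance from a healthy
        cluster, or attempt a repair otherwise.
        """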
diag = self.probe_status(logger)
print(f"on_pod_deleted: pod={pod.name} primary={diag.primary} cluster_state={diag.status} cluster.deleting={self.cluster.deleting}")
if self.cluster.deleting:
# cluster is being deleted, if this is pod-0 shut it down
if pod.index == 0:
self.destroy_cluster(pod, logger)
pod.remove_member_finalizer(pod_body)
return
if pod.deleting and diag.status in (diagnose.ClusterDiagStatus.ONLINE, diagnose.ClusterDiagStatus.ONLINE_PARTIAL, diagnose.ClusterDiagStatus.ONLINE_UNCERTAIN, diagnose.ClusterDiagStatus.FINALIZING):
print(f"REMOVING INSTANCE {pod.name}")
shellutils.RetryLoop(logger).call(
self.remove_instance, pod, pod_body, logger)
else:
print("ATTEMPTING CLUSTER REPAIR")
self.repair_cluster(pod, diag, logger)
# Retry from scratch in another iteration
print("RETRYING ON POD DELETE")
raise kopf.TemporaryError(f"Cluster repair from state {diag.status} attempted", delay=3)
        # TODO maybe not needed? need to make sure that a shrinking cluster will be reported as ONLINE
self.probe_status(logger)
def on_group_view_change(self, members: list[tuple], view_id_changed) -> None:
"""
Query membership info about the cluster and update labels and
annotations in each pod.
This is for monitoring only and should not trigger any changes other
than in informational k8s fields.
"""
for pod in self.cluster.get_pods():
info = pod.get_membership_info()
if info:
pod_member_id = info.get("memberId")
else:
pod_member_id = None
for member_id, role, status, view_id, endpoint, version in members:
if pod_member_id and member_id == pod_member_id:
pass
elif endpoint == pod.endpoint:
pass
else:
continue
pod.update_membership_status(
member_id, role, status, view_id, version)
if status == "ONLINE":
pod.update_member_readiness_gate("ready", True)
else:
pod.update_member_readiness_gate("ready", False)
break
def on_server_image_change(self, version: str) -> None:
return self.on_upgrade(version = version)
def on_server_version_change(self, version: str) -> None:
return self.on_upgrade(version = version)
def on_upgrade(self, version: str) -> None:
# TODO change status as needed - especially on version error, but make sure we recover
[version_valid, version_error] = utils.version_in_range(version)
if not version_valid:
raise kopf.PermanentError(version_error)
def on_router_upgrade(self, logger: Logger) -> None:
def on_nonupdated() -> None:
raise kopf.TemporaryError(f"Cluster {self.cluster.namespace}/{self.cluster.name} unreachable", delay=5)
router_objects.update_router_account(self.cluster, on_nonupdated, logger)
def on_change_metrics_user(self, logger: Logger) -> None:
metrics = self.cluster.parsed_spec.metrics
self.connect_to_primary(None, logger)
if not metrics or not metrics.enable:
            # This will use the default name; needs to adapt when supporting
            # custom names
mysqlutils.remove_metrics_user(self.dba.session)
return
user = metrics.dbuser_name
grants = metrics.dbuser_grants
max_connections = metrics.dbuser_max_connections
mysqlutils.setup_metrics_user(self.dba.session, user, grants,
max_connections)
def on_router_pod_delete(self, name: Union[str, list], logger: Logger) -> None:
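        """Remove the router metadata for the given router pod name(s) from
        the cluster."""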
self.connect_to_cluster(logger)
        if isinstance(name, str):
            router_list = [name]
        else:
            router_list = name
for router_name in router_list:
logger.info(f"Removing metadata for router {router_name} from {self.cluster.name}")
ret = self.dba_cluster.remove_router_metadata(router_name + '::')
logger.info(f"remove_router_metadata returned {ret}")
def on_router_routing_option_chahnge(self, old: dict, new: dict, logger: Logger) -> None:
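        """
        Apply changed routing options: unset options removed from the spec
        and (re)set the remaining ones on the ClusterSet, or on the Cluster
        for older deployments without a ClusterSet.
        """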
self.connect_to_primary(None, logger)
dba = self.dba_cluster
try:
dba = self.dba_cluster.get_cluster_set()
logger.info("ClusterSet enabled cluster")
except:
logger.info("Old cluster without clusterset")
pass
# Unset removed entries
for key in old:
if key not in new:
try:
dba.set_routing_option(key, None)
except mysqlsh.Error as e:
# We don't fail when setting an option fails
logger.warning(f"Failed unsetting routing option {key}: {e}")
# Set new values, this resets existing values
for key in new:
try:
dba.set_routing_option(key, new[key])
except mysqlsh.Error as e:
# We don't fail when setting an option fails
logger.warning(f"Failed setting routing option {key} to {new[key]}: {e}")