# Copyright (c) 2020, 2024, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#

from kopf._cogs.structs.bodies import Body
from .. import consts, errors, shellutils, utils, config, mysqlutils
from .. import diagnose
from ..backup import backup_objects
from ..shellutils import DbaWrap
from . import cluster_objects, router_objects
from .cluster_api import MySQLPod, InnoDBCluster, client
import typing
from typing import Optional, TYPE_CHECKING, Dict, cast, Callable, Union
from logging import Logger
if TYPE_CHECKING:
    from mysqlsh.mysql import ClassicSession
    from mysqlsh import Dba, Cluster
import os
import copy
import mysqlsh
import kopf
import datetime
import time

# Group Replication options applied when the cluster is created and when a
# group member is added to it.
#
# exitStateAction=ABORT_SERVER: abort the server if the member is kicked out of
# the group, which would trigger an event from the container restart, which we
# can catch and act upon. This also makes autoRejoinTries irrelevant.
common_gr_options = dict(exitStateAction="ABORT_SERVER")

def select_pod_with_most_gtids(gtids: Dict[int, str]) -> int:
    """Return the pod index whose GTID set has the highest GTID count.

    gtids maps a pod index to that pod's GTID set; the sets are ranked by
    mysqlutils.count_gtids(). The sort is stable, so among equal counts the
    index that comes last in the mapping's iteration order wins.

    Raises IndexError when gtids is empty.
    """
    ranked = sorted(gtids, key=lambda idx: mysqlutils.count_gtids(gtids[idx]))
    return ranked[-1]


class ClusterMutex:
    """
    Context manager around the "cluster-mutex" entry kept for a cluster in
    utils.g_ephemeral_pod_state.

    Entering the block calls testset() on that entry; if it reports a current
    owner, kopf.TemporaryError (delay=10) is raised and the block is not run.
    Leaving the block sets the entry back to None.
    """

    def __init__(self, cluster: InnoDBCluster, pod: Optional[MySQLPod] = None, context: str = "n/a"):
        self.cluster = cluster
        self.pod = pod
        self.context = context

    def __enter__(self, *args):
        # The lock is registered under the pod's name when a pod was given,
        # otherwise under the cluster's name.
        holder = self.pod.name if self.pod else self.cluster.name
        previous = utils.g_ephemeral_pod_state.testset(
            self.cluster, "cluster-mutex", holder, context=self.context)
        owner, owner_context, owner_lock_creation_time = previous
        if not owner:
            return
        raise kopf.TemporaryError(
            f"{self.cluster.name} busy. lock_owner={owner} owner_context={owner_context} lock_created_at={owner_lock_creation_time.isoformat()}", delay=10)

    def __exit__(self, *args):
        # Runs only when __enter__ succeeded, i.e. when we hold the lock.
        utils.g_ephemeral_pod_state.set(
            self.cluster, "cluster-mutex", None, context=self.context)


class ClusterController:
    """
    This is the controller for an InnoDBCluster object.
    It's the main controller for a cluster and drives the lifecycle of the
    cluster including creation, scaling and restoring from outages.
    """

    def __init__(self, cluster: InnoDBCluster):
        self.cluster = cluster
        self.dba: Optional[Dba] = None
        self.dba_cluster: Optional[Cluster] = None

    @property
    def dba_cluster_name(self) -> str:
        """Return the name of the cluster as defined in the k8s resource
        as a InnoDB Cluster compatible name."""
        return self.cluster.name.replace("-", "_").replace(".", "_")

    def publish_status(self, diag: diagnose.ClusterStatus, logger: Logger) -> None:
        """
        Store the outcome of a cluster diagnosis in the cluster status.

        Emits a "StatusChange" info event when a previous status exists and
        its "status" differs from the diagnosed one, then writes status name,
        number of online members, ClusterSet type and the probe time through
        self.cluster.set_cluster_status().
        """
        previous = self.cluster.get_cluster_status()
        if previous and previous["status"] != diag.status.name:
            self.cluster.info(action="ClusterStatus", reason="StatusChange",
                              message=f"Cluster status changed to {diag.status.name}. {len(diag.online_members)} member(s) ONLINE")

        cluster_type = diag.type
        if diag.status in (diagnose.ClusterDiagStatus.PENDING,
                           diagnose.ClusterDiagStatus.INITIALIZING):
            # While the cluster is not up yet, the type is taken from the spec
            # instead of the diagnosis.
            init_db = self.cluster.parsed_spec.initDB
            if init_db and init_db.cluster_set:
                # TODO: Should we declare it a replica or wait it to be diagnosed as replica, if initDB.cluster_set succeeds?
                cluster_type = diagnose.ClusterInClusterSetType.REPLICA_CANDIDATE
            else:
                cluster_type = diagnose.ClusterInClusterSetType.PRIMARY

        logger.info("Publishing cluster status")
        self.cluster.set_cluster_status({
            "status": diag.status.name,
            "onlineInstances": len(diag.online_members),
            "type": cluster_type.value,
            "lastProbeTime": utils.isotime()
        })

    def probe_status(self, logger: Logger) -> diagnose.ClusterStatus:
        """Diagnose the cluster, publish the result unless the cluster is
        being deleted, and return the diagnosis."""
        result = diagnose.diagnose_cluster(self.cluster, logger)
        skip_publish = self.cluster.deleting
        if not skip_publish:
            self.publish_status(result, logger)
        logger.info(f"cluster probe: status={result.status} online={result.online_members}")
        return result

    def probe_status_if_needed(self, changed_pod: MySQLPod, logger: Logger) -> diagnose.ClusterDiagStatus:
        """
        Return the cluster status, probing again only when the stored one
        may be outdated.

        A new probe runs when the last probe is not newer than the last
        membership transition of changed_pod (both timestamps must be set),
        or when the stored status is one of the "uncertain"/unknown states.
        Otherwise the stored "status" value is returned as is.
        """
        last_probe = self.cluster.get_cluster_status("lastProbeTime")
        last_transition = changed_pod.get_membership_info("lastTransitionTime")
        last_status = self.cluster.get_cluster_status("status")

        states = diagnose.ClusterDiagStatus
        unreachable_states = (states.UNKNOWN,
                              states.ONLINE_UNCERTAIN,
                              states.OFFLINE_UNCERTAIN,
                              states.NO_QUORUM_UNCERTAIN,
                              states.SPLIT_BRAIN_UNCERTAIN)
        logger.info(f"cluster_probe_time={last_probe}  member_transition_time={last_transition}  last_status={last_status}")

        # "and" binds tighter than "or": the timestamp comparison is one
        # alternative, the unreachable-state check the other.
        probe_is_stale = (last_probe and last_transition
                          and last_probe <= last_transition)
        # NOTE(review): publish_status() stores diag.status.name in the status;
        # whether that value compares equal to ClusterDiagStatus members
        # depends on the enum definition - confirm.
        if probe_is_stale or last_status in unreachable_states:
            return self.probe_status(logger).status

        logger.info("Returning last status")
        return last_status

    def probe_member_status(self, pod: MySQLPod, session: 'ClassicSession', joined: bool, logger: Logger) -> tuple:
        """
        Query the group membership of the instance behind session and store
        it on the pod.

        The pod's membership status is updated and its "ready" readiness gate
        is set to True only when the member status is "ONLINE".

        Returns the tuple produced by shellutils.query_membership_info():
        (member_id, role, status, view_id, version, member_count,
        reachable_member_count). join_instance() relies on this return value.
        """
        # TODO use diagnose?
        minfo = shellutils.query_membership_info(session)
        member_id, role, status, view_id, version, mcount, rmcount = minfo
        logger.debug(
            f"instance probe: role={role} status={status} view_id={view_id} version={version} members={mcount} reachable_members={rmcount}")
        pod.update_membership_status(
            member_id, role, status, view_id, version, joined=joined)
        # TODO
        pod.update_member_readiness_gate("ready", status == "ONLINE")

        return minfo

    def connect_to_primary(self, primary_pod: MySQLPod, logger: Logger) -> 'Cluster':
        """
        Set self.dba / self.dba_cluster to handles on the primary.

        With a primary_pod given, connects to that pod directly; otherwise
        searches the cluster pods for the one reporting the PRIMARY role.
        Returns the cluster handle.
        """
        if not primary_pod:
            # - check if we should consider pod marker for whether the instance joined
            self.connect_to_cluster(logger, need_primary=True)
        else:
            self.dba = shellutils.connect_dba(
                primary_pod.endpoint_co, logger, max_tries=2)
            self.dba_cluster = self.dba.get_cluster()

        assert self.dba_cluster
        return self.dba_cluster

    def connect_to_cluster(self, logger: Logger, need_primary:bool = False) -> MySQLPod:
        """
        Connect to the first usable cluster pod and fetch a cluster handle.

        Walks over the pods of the cluster (skipping pods being deleted),
        opens a Dba session on each and, with need_primary set, only accepts
        the pod whose member_role is PRIMARY. On success self.dba and
        self.dba_cluster are set and the pod is returned.

        Raises kopf.TemporaryError (delay=15) when every pod was connectable
        but not ONLINE, re-raises the last connection error if there was one,
        and raises kopf.TemporaryError otherwise.
        """
        most_recent_error = None
        not_online = []
        candidates = self.cluster.get_pods()

        for candidate in candidates:
            if candidate.name in not_online or candidate.deleting:
                continue

            try:
                self.dba = mysqlsh.connect_dba(candidate.endpoint_co)

                if need_primary:
                    role_row = self.dba.session.run_sql(
                        "SELECT member_role"
                        " FROM performance_schema.replication_group_members"
                        " WHERE member_host = @@report_host").fetch_one()
                    if role_row[0] != "PRIMARY":
                        logger.info(f"Primary requested, but {candidate.name} is no primary")
                        self.dba.session.close()
                        continue

            except Exception as err:
                logger.debug(f"connect_dba: target={candidate.name} error={err}")
                # Remember the failure and move on to the next pod
                most_recent_error = err
                continue

            try:
                self.dba_cluster = self.dba.get_cluster()
                logger.info(f"Connected to {candidate}")
                return candidate
            except mysqlsh.Error as err:
                logger.info(
                    f"get_cluster() from {candidate.name} failed: {err}")

                if err.code == errors.SHERR_DBA_BADARG_INSTANCE_NOT_ONLINE:
                    # A member that is not ONLINE can never hand out a
                    # cluster handle
                    not_online.append(candidate.name)

            except Exception as err:
                logger.info(
                    f"get_cluster() from {candidate.name} failed: {err}")

        # All pods connectable but OFFLINE means a complete outage; a reboot is needed
        if len(not_online) == len(candidates):
            raise kopf.TemporaryError(
                "Could not connect to any cluster member", delay=15)

        if most_recent_error:
            raise most_recent_error

        raise kopf.TemporaryError(
            "Could not connect to any cluster member", delay=15)

    def log_mysql_info(self, pod: MySQLPod, session: 'ClassicSession', logger: Logger) -> None:
        row = session.run_sql(
            "select @@server_id, @@server_uuid, @@report_host").fetch_one()
        server_id, server_uuid, report_host = row
        try:
            row = session.run_sql(
                "select @@globals.gtid_executed, @@globals.gtid_purged").fetch_one()
            gtid_executed, gtid_purged = row
        except:
            gtid_executed, gtid_purged = None, None

        logger.info(
            f"server_id={server_id} server_uuid={server_uuid}  report_host={report_host}  gtid_executed={gtid_executed}  gtid_purged={gtid_purged}")

    def create_cluster(self, seed_pod: MySQLPod, logger: Logger) -> None:
        """
        Create the InnoDB Cluster on seed_pod, or pick up an existing one.

        Records the initial data source and the create options through
        self.cluster.update_cluster_info(), creates the cluster unless
        dba.get_cluster() already returns one, makes sure a ClusterSet exists
        when the spec does not ask to join another ClusterSet, applies the
        router routing options and refreshes the membership status of the
        seed pod. With a single-instance spec the post-create actions run
        right away.

        Raises kopf.TemporaryError when Group Replication is already running
        on the seed and cannot be stopped; any mysqlsh.Error raised by
        create_cluster() is re-raised after the member finalizer was removed.
        """
        logger.info("Creating cluster at %s" % seed_pod.name)

        assume_gtid_set_complete = False
        initial_data_source = "blank"
        init_db = self.cluster.parsed_spec.initDB
        if init_db:
            # TODO store version
            # TODO store last known quorum
            if init_db.clone:
                initial_data_source = f"clone={init_db.clone.uri}"
            elif init_db.dump and seed_pod.index == 0: # A : Should we check for index?
                storage = init_db.dump.storage
                if storage.ociObjectStorage:
                    initial_data_source = f"dump={storage.ociObjectStorage.bucketName}"
                elif storage.s3:
                    initial_data_source = f"dump={storage.s3.bucketName}"
                elif storage.azure:
                    # No trailing comma here: it used to turn the value into a tuple
                    initial_data_source = f"dump={storage.azure.containerName}"
                elif storage.persistentVolumeClaim:
                    initial_data_source = f"dump={storage.persistentVolumeClaim}"
                else:
                    assert 0, "Unknown Dump storage mechanism"
            elif init_db.meb:
                initial_data_source = "meb"
            elif init_db.cluster_set:
                initial_data_source = f"clusterSet={init_db.cluster_set.uri}"
            else:
                logger.error(
                    f"Unknown initDB source: initDB={init_db} cluster_set={init_db.cluster_set}")
                assert 0, "Unknown initDB source"
        else:
            # We're creating the cluster from scratch, so GTID set is sure to be complete
            assume_gtid_set_complete = True

        # The operator manages GR, so turn off start_on_boot to avoid conflicts
        create_options = {
            "gtidSetIsComplete": assume_gtid_set_complete,
            "manualStartOnBoot": True,
            "memberSslMode": "REQUIRED" if self.cluster.parsed_spec.tlsUseSelfSigned else "VERIFY_IDENTITY",
        }
        if not self.cluster.parsed_spec.tlsUseSelfSigned:
            logger.info("Using TLS GR authentication")
            rdns = seed_pod.get_cluster().get_tls_issuer_and_subject_rdns()
            create_options["memberAuthType"] = "CERT_SUBJECT"
            create_options["certIssuer"] = rdns["issuer"]
            create_options["certSubject"] = rdns["subject"]
        else:
            logger.info("Using PASSWORD GR authentication")

        create_options.update(common_gr_options)

        cluster_info = {
            "initialDataSource" : initial_data_source,
            "createOptions" : create_options,
        }
        self.cluster.update_cluster_info(cluster_info)

        def should_retry(err):
            # Stop retrying the connection once the seed pod is going away
            return not seed_pod.deleting

        with DbaWrap(shellutils.connect_dba(seed_pod.endpoint_co, logger, is_retriable=should_retry)) as dba:
            try:
                self.dba_cluster = dba.get_cluster()
                # maybe from a previous incomplete create attempt
                logger.info("Cluster already exists")
            except Exception:
                self.dba_cluster = None

            seed_pod.add_member_finalizer()

            if not self.dba_cluster:
                self.log_mysql_info(seed_pod, dba.session, logger)

                logger.info(f"CREATE CLUSTER: seed={seed_pod.name}, options={create_options}")

                try:
                    self.dba_cluster = dba.create_cluster(
                        self.dba_cluster_name, create_options)

                    logger.info("create_cluster OK")
                except mysqlsh.Error as e:
                    # If creating the cluster failed, remove the membership finalizer
                    seed_pod.remove_member_finalizer()

                    # can happen when retrying
                    if e.code == errors.SHERR_DBA_BADARG_INSTANCE_ALREADY_IN_GR:
                        logger.info(
                            f"GR already running at {seed_pod.endpoint}, stopping before retrying...")

                        try:
                            dba.session.run_sql("STOP GROUP_REPLICATION")
                        except mysqlsh.Error as stop_err:
                            logger.info(f"Could not stop GR plugin: {stop_err}")
                            # throw a temporary error for a full retry later
                            raise kopf.TemporaryError(
                                "GR already running while creating cluster but could not stop it", delay=3)
                    # Re-raise the create_cluster() error so the handler is retried
                    raise
            # Only on first cluster! Not if this is supposed to join some other
            if not init_db or not init_db.cluster_set:
                try:
                    # Keep the handle: the routing options below are set
                    # through self.dba_cluster_set
                    self.dba_cluster_set = self.dba_cluster.get_cluster_set()
                    # maybe from a previous incomplete create attempt
                    logger.info("ClusterSet already exists")
                except Exception:
                    # TODO: what to do if this fails?
                    self.dba_cluster_set = self.dba_cluster.create_cluster_set(self.dba_cluster_name)

            routing_options = self.cluster.parsed_spec.router.routingOptions
            for routing_option in routing_options:
                try:
                    routing_value = routing_options[routing_option]
                    logger.info(f"Setting Router Option [{routing_option}] to [{routing_value}]")
                    self.dba_cluster_set.set_routing_option(routing_option, routing_value)
                except mysqlsh.Error as e:
                    # We don't fail when setting an option fails
                    logger.warning(f"Failed setting routing option {routing_option} to {routing_value}: {e}")

            self.probe_member_status(seed_pod, dba.session, True, logger)

            logger.debug("Cluster created %s" % self.dba_cluster.status())

            # if there's just 1 pod, then the cluster is ready... otherwise, we
            # need to wait until all pods have joined
            if self.cluster.parsed_spec.instances == 1:
                self.post_create_actions(dba.session, self.dba_cluster, logger)
                self.probe_member_status(seed_pod, dba.session, True, logger)

    def post_create_actions(self, session: 'ClassicSession', dba_cluster: 'Cluster', logger: Logger) -> None:
        logger.info("cluster_controller::post_create_actions")
        # create router account
        user, password = self.cluster.get_router_account()

        update = True
        try:
            session.run_sql("show grants for ?@'%'", [user])
        except mysqlsh.Error as e:
            if e.code == mysqlsh.mysql.ErrorCode.ER_NONEXISTING_GRANT:
                update = False
            else:
                raise
        logger.debug(
            f"{'Updating' if update else 'Creating'} router account {user}")
        dba_cluster.setup_router_account(
            user, {"password": password, "update": update})

        # update read replicas
        for rr in self.cluster.parsed_spec.readReplicas:
            logger.debug(f"Setting {rr.name} replicas to {rr.instances}")
            cluster_objects.update_stateful_set_size(self.cluster, rr, logger)

        # update the router deployment
        n = self.cluster.parsed_spec.router.instances
        if n:
            logger.debug(f"Setting router replicas to {n}")
            router_objects.update_size(self.cluster, n, False, logger)


    def reboot_cluster(self, seed_pod_index: int, logger: Logger) -> None:
        """
        Reboot the cluster from a complete outage, seeding from the pod at
        position seed_pod_index of self.cluster.get_pods().

        After the reboot every other pod is rejoined (failures are logged and
        otherwise ignored) and, if the ClusterSet reports this cluster as
        INVALIDATED, the cluster is rejoined to the ClusterSet (again best
        effort). Finally the membership status of the seed pod is refreshed.
        """
        pods = self.cluster.get_pods()
        seed_pod = pods[seed_pod_index]

        logger.info(f"Rebooting cluster {self.cluster.name} from pod {seed_pod}...")

        self.dba = shellutils.connect_dba(seed_pod.endpoint_co, logger)

        self.log_mysql_info(seed_pod, self.dba.session, logger)

        seed_pod.add_member_finalizer()

        self.dba_cluster = self.dba.reboot_cluster_from_complete_outage()

        logger.info("reboot_cluster_from_complete_outage OK.")

        # rejoin other pods
        for pod in pods:
            if pod.index == seed_pod_index:
                continue
            with shellutils.connect_to_pod(pod, logger, timeout=5) as session:
                try:
                    self.rejoin_instance(pod, session, logger)
                except Exception:
                    # TODO - verify this will be retried from elsewhere
                    logger.exception(f"INSTANCE  REJOIN FAILED for {pod.name}")

        # TODO: May not be in a ClusterSet (old Cluster?)
        cs = self.dba_cluster.get_cluster_set()
        cs_status = cs.status()
        cs_clusters = cs_status.get("clusters", {})
        # create_cluster() registers the cluster under dba_cluster_name, which
        # differs from the k8s name when that contains "-" or "."; fall back
        # to the k8s name if the InnoDB Cluster name is not listed.
        cs_cluster_name = self.dba_cluster_name
        if cs_cluster_name not in cs_clusters:
            cs_cluster_name = self.cluster.name
        # TODO: is there really a valid case where
        #       cs_status["clusters"][name] won't exist?
        if cs_clusters.get(cs_cluster_name, {}).get("globalStatus") == "INVALIDATED":
            try:
                cs.rejoin_cluster(cs_cluster_name)
            except Exception:
                logger.exception("CLUSTERSET REJOIN FAILED")

        status = self.dba_cluster.status()
        logger.info(f"Cluster reboot successful. status={status}")

        self.probe_member_status(seed_pod, self.dba.session, True, logger)


    def force_quorum(self, seed_pod, logger: Logger) -> None:
        """Restore quorum using the partition that seed_pod belongs to and
        log the resulting cluster status."""
        logger.info(
            f"Forcing quorum of cluster {self.cluster.name} using {seed_pod.name}...")

        # connect_to_primary() also stores the handle in self.dba_cluster
        self.connect_to_primary(seed_pod, logger)
        cluster_handle = self.dba_cluster

        cluster_handle.force_quorum_using_partition_of(seed_pod.endpoint_co)

        logger.info(f"Force quorum successful. status={self.dba_cluster.status()}")

        # TODO Rejoin OFFLINE members

    def destroy_cluster(self, last_pod, logger: Logger) -> None:
        """
        Stop Group Replication on the last remaining member and release its
        membership finalizer.

        Stopping GR is best effort: a failure is logged as a warning and the
        finalizer is removed regardless.
        """
        logger.info(f"Stopping GR for last cluster member {last_pod.name}")

        try:
            with shellutils.connect_to_pod(last_pod, logger, timeout=5) as session:
                # Just stop GR
                session.run_sql("STOP group_replication")
        except Exception as e:
            logger.warning(
                f"Error stopping GR at last cluster member, ignoring... {e}")
        else:
            logger.info("Stop GR OK")

        # Remove the pod membership finalizer even if we couldn't do final cleanup
        # (it's just stop GR, which should be harmless most of the time)
        last_pod.remove_member_finalizer()

    def reconcile_pod(self, primary_pod: MySQLPod, pod: MySQLPod, logger: Logger) -> None:
        """
        Bring pod in line with the cluster, based on its candidate diagnosis.

        JOINABLE pods are added to the cluster, REJOINABLE pods are rejoined;
        for every other state only the membership status of the pod is
        refreshed. Nothing is done when the pod or the cluster is being
        deleted.
        """
        with DbaWrap(shellutils.connect_dba(pod.endpoint_co, logger)) as pod_dba:
            cluster = self.connect_to_primary(primary_pod, logger)

            candidate = diagnose.diagnose_cluster_candidate(
                self.dba.session, cluster, pod, pod_dba, logger)

            logger.info(
                f"Reconciling {pod}: state={candidate.status}  deleting={pod.deleting} cluster_deleting={self.cluster.deleting}")
            if pod.deleting or self.cluster.deleting:
                return

            # TODO check case where a member pod was deleted and then rejoins with the same address but different uuid

            state = candidate.status
            states = diagnose.CandidateDiagStatus

            if state == states.JOINABLE:
                self.cluster.info(action="ReconcilePod", reason="Join",
                                  message=f"Joining {pod.name} to cluster")
                self.join_instance(pod, pod_dba, logger)
                return

            if state == states.REJOINABLE:
                self.cluster.info(action="ReconcilePod", reason="Rejoin",
                                  message=f"Rejoining {pod.name} to cluster")
                self.rejoin_instance(pod, pod_dba.session, logger)
                return

            if state == states.MEMBER:
                logger.info(f"{pod.endpoint} already a member")
            elif state == states.UNREACHABLE:
                # TODO check if we should throw a tmp error or do nothing
                logger.error(f"{pod.endpoint} is unreachable")
            else:
                # TODO check if we can repair broken instances
                # It would be possible to auto-repair an instance with errant
                # transactions by cloning over it, but that would mean these
                # errants are lost.
                logger.error(f"{pod.endpoint} is in state {state}")

            # All remaining states only get their membership info refreshed
            self.probe_member_status(pod, pod_dba.session, False, logger)

    def join_instance(self, pod: MySQLPod, pod_dba_session: 'Dba', logger: Logger) -> None:
        """
        Add pod to the cluster, as a read replica or as a group member
        depending on pod.instance_type.

        The first attempt uses incremental recovery (clone when the cluster
        was initialised from a MEB backup); if it fails, one more attempt is
        made with clone recovery and a second failure is re-raised. For group
        members the membership status is probed afterwards and, once all
        spec'ed instances are members and no router is deployed yet, the
        post-create actions are run.
        """
        logger.info(f"Adding {pod.endpoint} to cluster")

        peer_pod = self.connect_to_cluster(logger)

        self.log_mysql_info(pod, pod_dba_session.session, logger)

        # TODO - always use clone when dataset is big
        # With Shell Bug #33900165 fixed we should use "auto" by default
        # and remove the retry logic below
        #
        # With a restore from a MEB backup server might not find the right
        # binlogs for incremental restore and provision an empty replica
        # clone does the right thing
        restored_from_meb = self.cluster.parsed_spec.initDB and self.cluster.parsed_spec.initDB.meb
        add_options = {
            "recoveryMethod": "clone" if restored_from_meb else "incremental",
        }

        # TODO : # add_replica_instance doesn't support cert base auth, thus certSubject works only for group-member-s - WL15056
        # If a cluster was created with cert based auth between the group members no replica can join the cluster
        cert_auth_types = ["CERT_SUBJECT", "CERT_SUBJECT_PASSWORD"]
        for option in self.dba_cluster.options()["defaultReplicaSet"]["globalOptions"]:
            if option["option"] != "memberAuthType" or option["value"] not in cert_auth_types:
                continue
            rdns = pod.get_cluster().get_tls_issuer_and_subject_rdns()
            # add_instance() needs only certSubject and but not memberAuthType and certIssuer
            add_options["certSubject"] = rdns["subject"]

        if pod.instance_type == "group-member":
            add_options.update(common_gr_options)

        logger.info(
            f"ADD INSTANCE: target={pod.endpoint}  instance_type={pod.instance_type} cluster_peer={peer_pod.endpoint}  options={add_options}...")

        pod.add_member_finalizer()

        report_host = pod_dba_session.session.run_sql('SELECT @@report_host').fetch_one()[0]
        print(f"DBA SESSION GOES TO {report_host}!")

        def add_to_cluster() -> None:
            # One attempt with whatever add_options currently holds
            if pod.instance_type == "read-replica":
                self.dba_cluster.add_replica_instance(pod.endpoint, add_options)
            else:
                self.dba_cluster.add_instance(pod.endpoint_co, add_options)

        try:
            add_to_cluster()

            logger.debug("add_instance OK")
        except (mysqlsh.Error, RuntimeError) as e:
            logger.warning(f"add_instance failed: error={e}")

            # Incremental may fail if transactions are missing from binlog
            # retry using clone
            add_options["recoveryMethod"] = "clone"
            logger.warning(f"trying add_instance with clone")
            try:
                add_to_cluster()
            except (mysqlsh.Error, RuntimeError) as e:
                logger.warning(f"add_instance failed second time: error={e}")
                raise

        if pod.instance_type == "read-replica":
            # This is not perfect, as we don't track this further, but async
            # replication gives us limited information only
            pod.update_member_readiness_gate("ready", True)
            return

        with DbaWrap(shellutils.connect_dba(pod.endpoint_co, logger)) as dba_session:
            # TODO: pod_dba_session may be invalid on caller side if the
            #       pod was provisioned via clone, which may lead to future
            #       bugs, also always using a new connection here is "inefficient"
            #       to a small degree.
            #       In case clone is used and we need a reconnect we have
            #       to communicate that to the caller, else we see bugs in
            #       future
            minfo = self.probe_member_status(pod, dba_session.session,
                                             True, logger)
            _, _, _, _, _, member_count, _ = minfo
        logger.info(f"JOINED {pod.name}: {minfo}")

        # if the cluster size is complete, ensure routers are deployed
        if not router_objects.get_size(self.cluster) and member_count == self.cluster.parsed_spec.instances:
            self.post_create_actions(self.dba.session, self.dba_cluster, logger)

    def rejoin_instance(self, pod: MySQLPod, pod_session, logger: Logger) -> None:
        """
        Rejoin pod to the cluster and refresh its membership status.

        Connects to the cluster first if no cluster handle is held yet. A
        mysqlsh.Error raised by the rejoin is logged and re-raised.
        """
        logger.info(f"Rejoining {pod.endpoint} to cluster")

        if not self.dba_cluster:
            self.connect_to_cluster(logger)

        self.log_mysql_info(pod, pod_session, logger)

        options = {}
        logger.info(
            f"rejoin_instance: target={pod.endpoint} options={options}...")

        try:
            self.dba_cluster.rejoin_instance(pod.endpoint, options)
        except mysqlsh.Error as e:
            logger.warning(f"rejoin_instance failed: error={e}")
            raise
        else:
            logger.debug("rejoin_instance OK")

        self.probe_member_status(pod, pod_session, False, logger)

    def remove_instance(self, pod: MySQLPod, pod_body: Body, logger: Logger, force: bool = False) -> None:
        """
        Remove pod from the cluster and always release its membership
        finalizer afterwards.

        Any Exception raised by the removal itself is logged at info level
        and swallowed, so it never blocks the deletion of the pod.
        """
        try:
            self.__remove_instance_aux(pod, logger, force)
        except Exception as e:
            logger.info(f"Exception {e} caught")
        finally:
            # Remove the membership finalizer to allow the pod to be removed
            pod.remove_member_finalizer(pod_body)
            pod_name = pod_body['metadata']['name']
            logger.info(f"Removed finalizer for pod {pod_name}")

    def __remove_instance_aux(self, pod: MySQLPod, logger: Logger, force: bool = False) -> None:
        """
        Remove pod from the InnoDB Cluster metadata through a peer instance.

        A regular remove_instance() is tried first unless force is set; if
        that did not remove the instance, a forced removal follows. Nothing
        is removed when get_pods() returns a single pod and pod is a group
        member, or when no peer could be connected to.

        Raises kopf.TemporaryError (delay=5) when the regular removal fails
        with ER_OPTION_PREVENTS_STATEMENT, and re-raises connection or forced
        removal errors unless the cluster is being deleted.
        """
        print(f"Removing {pod.endpoint} from cluster FORCE={force}")

        # TODO improve this check
        # NOTE(review): the name suggests "all pods but this one", but this is
        # the unfiltered get_pods() result - confirm whether pod is included.
        other_pods = self.cluster.get_pods()
        if len(other_pods) == 1 and pod.instance_type == 'group-member':
            # Only informs; the condition below is what actually skips the removal
            print("There is only one pod left in the cluster. Won't remove it, as this will dissolve the cluster. It will be removed only if the cluster is being deleted.")

        if len(other_pods) > 1 or (len(other_pods) > 0 and pod.instance_type == 'read-replica'):
            try:
                print("connect_to_cluster")
                peer_pod = self.connect_to_cluster(logger)
                print(f"peer_pod={peer_pod}")
            except mysqlsh.Error as e:
                # Without a peer a clean removal is impossible; that is only
                # tolerated while the whole cluster is being deleted
                peer_pod = None
                if self.cluster.deleting:
                    logger.warning(
                        f"Could not connect to cluster, but ignoring because we're deleting: error={e}")
                else:
                    logger.error(f"Could not connect to cluster: error={e}")
                    raise

            if peer_pod:
                removed = False
                remove_options = {}

                # First attempt: regular removal, skipped when force was requested
                if not force:
                    logger.info(
                        f"remove_instance: {pod.name}  peer={peer_pod.name}  options={remove_options}")
                    try:
                        self.dba_cluster.remove_instance(pod.endpoint, remove_options)
                        removed = True
                        logger.debug("remove_instance OK")
                    except mysqlsh.Error as e:
                        logger.warning(f"remove_instance failed: error={e}")
                        if e.code == mysqlsh.mysql.ErrorCode.ER_OPTION_PREVENTS_STATEMENT:
                            # super_read_only can still be true on a PRIMARY for a
                            # short time
                            raise kopf.TemporaryError(
                                f"{peer_pod.name} is a PRIMARY but super_read_only is ON", delay=5)
                        elif e.code == errors.SHERR_DBA_MEMBER_METADATA_MISSING:
                            # already removed and we're probably just retrying
                            removed = True
                        # any other error code falls through to the forced removal
                print(f"removed={removed}")
                # Second attempt: forced removal
                if not removed:
                    remove_options["force"] = True
                    logger.info(
                        f"remove_instance: {pod.name}  peer={peer_pod.name}  options={remove_options}")
                    try:
                        self.dba_cluster.remove_instance(pod.endpoint, remove_options)

                        logger.info("FORCED remove_instance OK")
                    except mysqlsh.Error as e:
                        logger.warning(f"remove_instance failed: error={e}")
                        if e.code == errors.SHERR_DBA_MEMBER_METADATA_MISSING:
                            # metadata for the member is already gone
                            pass
                        else:
                            deleting = not self.cluster or self.cluster.deleting
                            if deleting:
                                logger.info(
                                    f"force remove_instance failed. Ignoring because cluster is deleted: error={e}  peer={peer_pod.name}")
                            else:
                                logger.error(
                                    f"force remove_instance failed. error={e} deleting_cluster={deleting}  peer={peer_pod.name}")
                                raise
                    except RuntimeError as e:
                        logger.info(f"force remove_instance failed. RuntimeError {e}")
                        # A disconnected cluster object is tolerated; every
                        # other RuntimeError is passed on to the caller
                        if str(e).find("The cluster object is disconnected") == -1:
                            logger.info(f"Can't do anything to remove {pod.name} cleanly")
                            raise
            else:
                logger.error(
                    f"Cluster is not available, skipping clean removal of {pod.name}")


    def repair_cluster(self, pod: MySQLPod, diagnostic: diagnose.ClusterStatus, logger: Logger) -> None:
        """
        Try to restore the cluster to an ONLINE state, based on the status
        carried by the given diagnostic.

        What happens per status:
        - ONLINE, ONLINE_PARTIAL, ONLINE_UNCERTAIN, FINALIZING: nothing
        - OFFLINE: the cluster is rebooted through the pod with the most
          GTIDs, but only if a gtid_executed value is known for every pod of
          the cluster; otherwise kopf.TemporaryError is raised
        - NO_QUORUM: quorum is forced through the first quorum candidate
        - OFFLINE_UNCERTAIN, NO_QUORUM_UNCERTAIN, UNKNOWN: kopf.TemporaryError
        - SPLIT_BRAIN, SPLIT_BRAIN_UNCERTAIN, INVALID: an error is reported on
          the cluster object and kopf.PermanentError is raised
        - anything else: kopf.PermanentError

        The pod argument is currently not used by this method.
        """
        # TODO check statuses where router has to be put down

        states = diagnose.ClusterDiagStatus
        status = diagnostic.status

        # States that need no action from here. Rejoins for ONLINE_PARTIAL are
        # handled on pod events.
        # TODO maybe delete unreachable pods if enabled? (ONLINE_UNCERTAIN)
        if status in (states.ONLINE, states.ONLINE_PARTIAL,
                      states.ONLINE_UNCERTAIN, states.FINALIZING):
            return

        if status == states.OFFLINE:
            # Reboot cluster if all pods are reachable, i.e. a GTID set is
            # known for as many pods as the cluster has
            known_gtids = [g for g in diagnostic.gtid_executed.values() if g is not None]
            if len(known_gtids) != len(self.cluster.get_pods()):
                logger.debug("Cannot reboot cluster because not all pods are reachable")
                raise kopf.TemporaryError(
                    "Cluster cannot be restored because there are unreachable pods", delay=5)

            seed_pod = select_pod_with_most_gtids(diagnostic.gtid_executed)

            self.cluster.info(action="RestoreCluster", reason="Rebooting",
                              message=f"Restoring OFFLINE cluster through pod {seed_pod}")

            shellutils.RetryLoop(logger).call(self.reboot_cluster, seed_pod, logger)
            return

        if status in (states.OFFLINE_UNCERTAIN, states.NO_QUORUM_UNCERTAIN):
            # TODO delete unconnectable pods after timeout, if enabled
            raise kopf.TemporaryError(
                f"Unreachable members found while in state {status}, waiting...")

        if status == states.NO_QUORUM:
            # Restore cluster
            self.cluster.info(action="RestoreCluster", reason="RestoreQuorum",
                              message="Restoring quorum of cluster")

            shellutils.RetryLoop(logger).call(
                self.force_quorum, diagnostic.quorum_candidates[0], logger)
            return

        if status == states.SPLIT_BRAIN:
            self.cluster.error(action="UnrecoverableState", reason="SplitBrain",
                               message="Cluster is in a SPLIT-BRAIN state and cannot be restored automatically.")

            # TODO check if recoverable case
            # Fatal error, user intervention required
            raise kopf.PermanentError(
                f"Unable to recover from current cluster state. User action required. state={status}")

        if status == states.SPLIT_BRAIN_UNCERTAIN:
            # TODO check if recoverable case and if NOT, then throw a permanent error
            # TODO delete unconnectable pods after timeout, if enabled
            self.cluster.error(action="UnrecoverableState", reason="SplitBrain",
                               message="Cluster is in state SPLIT-BRAIN with unreachable instances and cannot be restored automatically.")

            # A temporary error used to follow this raise, but it could never
            # be reached; the permanent error is what has always been raised.
            raise kopf.PermanentError(
                f"Unable to recover from current cluster state. User action required. state={status}")

        if status == states.UNKNOWN:
            # Nothing to do, but we can try again later and hope something comes back
            raise kopf.TemporaryError(
                f"No members of the cluster could be reached. state={status}")

        if status == states.INVALID:
            self.cluster.error(action="UnrecoverableState", reason="Invalid",
                               message="Cluster state is invalid and cannot be restored automatically.")

            raise kopf.PermanentError(
                f"Unable to recover from current cluster state. User action required. state={status}")

        raise kopf.PermanentError(
            f"Invalid cluster state {status}")


    def on_router_tls_changed(self) -> None:
        """
        Placeholder for reacting to a change of the router TLS data.

        Nothing is done here. As noted by the original author, router pods
        need to be recreated in order for new certificates to get reloaded.
        """
        return None

    def on_pod_created(self, pod: MySQLPod, logger: Logger) -> None:
        """
        Handle a newly created pod according to the probed cluster state.

        - INITIALIZING: pod with index 0 creates the cluster and the creation
          time is recorded on the cluster object; any other pod raises
          kopf.TemporaryError so that it waits for the cluster
        - ONLINE, ONLINE_PARTIAL, ONLINE_UNCERTAIN: the pod is reconciled
          against the primary reported by the probe
        - any other state: a repair is attempted and kopf.TemporaryError is
          raised so the handler starts over in another iteration

        Raises kopf.PermanentError if the cluster already has a creation time
        while still being reported as INITIALIZING.
        """
        states = diagnose.ClusterDiagStatus

        print("on_pod_created: probing cluster")
        diag = self.probe_status(logger)

        print(f"on_pod_created: pod={pod.name} primary={diag.primary} cluster_state={diag.status}")

        if diag.status == states.INITIALIZING:
            if pod.index != 0:
                # Other pods must wait for the cluster to be ready
                raise kopf.TemporaryError("Cluster is not yet ready", delay=15)

            # If cluster is not yet created, then we create it at pod-0
            if self.cluster.get_create_time():
                raise kopf.PermanentError(
                    "Internal inconsistency: cluster marked as initialized, but create requested again")

            print("Time to create the cluster")
            shellutils.RetryLoop(logger).call(self.create_cluster, pod, logger)

            # Mark the cluster object as already created
            self.cluster.set_create_time(datetime.datetime.now())
            return

        joinable_states = (states.ONLINE, states.ONLINE_PARTIAL, states.ONLINE_UNCERTAIN)
        if diag.status in joinable_states:
            print("Reconciling pod")
            # Cluster exists and is healthy, join the pod to it
            shellutils.RetryLoop(logger).call(
                self.reconcile_pod, diag.primary, pod, logger)
            return

        print("Attempting to repair the cluster")
        self.repair_cluster(pod, diag, logger)

        # Retry from scratch in another iteration
        raise kopf.TemporaryError(f"Cluster repair from state {diag.status} attempted", delay=5)

    def on_pod_restarted(self, pod: MySQLPod, logger: Logger) -> None:
        """
        Handle a restarted pod: probe the cluster, attempt a repair unless the
        cluster is ONLINE or ONLINE_PARTIAL, then reconcile the pod.
        """
        states = diagnose.ClusterDiagStatus

        diag = self.probe_status(logger)
        logger.debug(
            f"on_pod_restarted: pod={pod.name}  primary={diag.primary}  cluster_state={diag.status}")

        healthy_states = (states.ONLINE, states.ONLINE_PARTIAL)
        if diag.status not in healthy_states:
            self.repair_cluster(pod, diag, logger)

        # NOTE(review): diag was taken before any repair above - confirm that
        # diag.primary is still the right value to reconcile against
        retry = shellutils.RetryLoop(logger)
        retry.call(self.reconcile_pod, diag.primary, pod, logger)

    def on_pod_deleted(self, pod: MySQLPod, pod_body: Body, logger: Logger) -> None:
        """
        Handle the deletion of a pod.

        - If the cluster is being deleted and this is the pod with index 0,
          the cluster is destroyed and the member finalizer is removed from
          the pod.
        - Otherwise, if the pod is being deleted and the cluster is ONLINE,
          ONLINE_PARTIAL, ONLINE_UNCERTAIN or FINALIZING, the instance is
          removed from the cluster and the cluster is probed again.
        - In any other case a repair is attempted and kopf.TemporaryError is
          raised so the handler starts over in another iteration.
        """
        states = diagnose.ClusterDiagStatus

        diag = self.probe_status(logger)

        print(f"on_pod_deleted: pod={pod.name}  primary={diag.primary}  cluster_state={diag.status} cluster.deleting={self.cluster.deleting}")

        # cluster is being deleted, if this is pod-0 shut it down
        if self.cluster.deleting and pod.index == 0:
            self.destroy_cluster(pod, logger)
            pod.remove_member_finalizer(pod_body)
            return

        removable_states = (states.ONLINE, states.ONLINE_PARTIAL,
                            states.ONLINE_UNCERTAIN, states.FINALIZING)
        if not (pod.deleting and diag.status in removable_states):
            print("ATTEMPTING CLUSTER REPAIR")
            self.repair_cluster(pod, diag, logger)
            # Retry from scratch in another iteration
            print("RETRYING ON POD DELETE")
            raise kopf.TemporaryError(f"Cluster repair from state {diag.status} attempted", delay=3)

        print(f"REMOVING INSTANCE {pod.name}")
        shellutils.RetryLoop(logger).call(
            self.remove_instance, pod, pod_body, logger)

        # TODO maybe not needed? need to make sure that shrinking cluster will be reported as ONLINE
        self.probe_status(logger)

    def on_group_view_change(self, members: list[tuple], view_id_changed) -> None:
        """
        Push the given group membership data to the pods of the cluster.

        Each entry of members is a tuple of
        (member_id, role, status, view_id, endpoint, version). For every pod,
        the first entry that matches the pod is used, either through the
        "memberId" of the pod's stored membership info or through the pod's
        endpoint. The pod's membership status is updated from that entry and
        its "ready" readiness gate is set to whether the status is "ONLINE".
        Pods without a matching entry are left untouched.

        This is for monitoring only and should not trigger any changes other
        than in informational k8s fields.

        The view_id_changed argument is not used.
        """
        for pod in self.cluster.get_pods():
            membership = pod.get_membership_info()
            known_member_id = membership.get("memberId") if membership else None

            for member_id, role, status, view_id, endpoint, version in members:
                # The stored member id wins; the endpoint is only compared
                # when the id is unknown or different
                same_member = (known_member_id and member_id == known_member_id) \
                    or endpoint == pod.endpoint
                if not same_member:
                    continue

                pod.update_membership_status(
                    member_id, role, status, view_id, version)
                pod.update_member_readiness_gate("ready", status == "ONLINE")
                # Only the first matching entry counts
                break

    def on_server_image_change(self, version: str) -> None:
        """
        Delegate to on_upgrade() with the given version and return its result.
        """
        outcome = self.on_upgrade(version=version)
        return outcome

    def on_server_version_change(self, version: str) -> None:
        """
        Delegate to on_upgrade() with the given version and return its result.
        """
        outcome = self.on_upgrade(version=version)
        return outcome

    def on_upgrade(self, version: str) -> None:
        """
        Check the given version with utils.version_in_range().

        Raises kopf.PermanentError, carrying the error returned by that
        check, when the version is not reported as valid.
        """
        # TODO change status as needed - especially on version error, but make sure we recover
        version_valid, version_error = utils.version_in_range(version)
        if version_valid:
            return
        raise kopf.PermanentError(version_error)

    def on_router_upgrade(self, logger: Logger) -> None:
        """
        Update the router account of the cluster through
        router_objects.update_router_account().

        A callback raising kopf.TemporaryError (delay of 5) is handed to that
        function; presumably it is invoked when the account could not be
        updated - confirm against update_router_account().
        """
        def retry_later() -> None:
            # Evaluated lazily, at the time the callback is actually invoked
            cluster_ref = f"{self.cluster.namespace}/{self.cluster.name}"
            raise kopf.TemporaryError(f"Cluster {cluster_ref} unreachable", delay=5)

        router_objects.update_router_account(self.cluster, retry_later, logger)

    def on_change_metrics_user(self, logger: Logger) -> None:
        """
        Bring the metrics database user in line with the cluster spec.

        After connecting to the primary, the metrics user is set up with the
        name, grants and connection limit from the spec when metrics are
        enabled, and removed otherwise.
        """
        metrics = self.cluster.parsed_spec.metrics
        self.connect_to_primary(None, logger)

        if metrics and metrics.enable:
            db_user = metrics.dbuser_name
            db_grants = metrics.dbuser_grants
            connection_limit = metrics.dbuser_max_connections

            mysqlutils.setup_metrics_user(
                self.dba.session, db_user, db_grants, connection_limit)
            return

        # This will use default name. needs to adapt when supporting custom
        # names
        mysqlutils.remove_metrics_user(self.dba.session)

    def on_router_pod_delete(self, name: Union[str, list], logger: Logger) -> None:
        """
        Remove the metadata of one or more routers from the cluster.

        name is either a single router name or a collection of router names.
        For each router, the metadata entry removed is the router name
        followed by '::'.
        """
        self.connect_to_cluster(logger)

        # isinstance() rather than an exact type() match: a str subclass is
        # treated as a single name and any other iterable of names (tuple,
        # set, ...) is accepted, instead of leaving router_list unbound
        router_list = [name] if isinstance(name, str) else list(name)

        for router_name in router_list:
            logger.info(f"Removing metadata for router {router_name} from {self.cluster.name}")
            ret = self.dba_cluster.remove_router_metadata(router_name + '::')
            logger.info(f"remove_router_metadata returned {ret}")

    def on_router_routing_option_chahnge(self, old: dict, new: dict, logger: Logger) -> None:
        """
        Apply a change of the router routing options.

        Options present in old but missing from new are unset (set to None),
        then every option in new is set to its new value. The options are set
        on the ClusterSet when one can be obtained from the cluster, and on
        the cluster itself otherwise.

        A mysqlsh.Error while setting or unsetting a single option is logged
        as a warning and does not abort the remaining options.
        """
        self.connect_to_primary(None, logger)

        target = self.dba_cluster
        try:
            target = self.dba_cluster.get_cluster_set()
            logger.info("ClusterSet enabled cluster")
        except Exception:
            # Deliberately best-effort: fall back to the plain cluster. Only
            # regular exceptions are swallowed, so KeyboardInterrupt and
            # SystemExit still propagate.
            logger.info("Old cluster without clusterset")

        # Unset removed entries
        for key in old:
            if key in new:
                continue
            try:
                target.set_routing_option(key, None)
            except mysqlsh.Error as e:
                # We don't fail when setting an option fails
                logger.warning(f"Failed unsetting routing option {key}: {e}")

        # Set new values, this resets existing values
        for key, value in new.items():
            try:
                target.set_routing_option(key, value)
            except mysqlsh.Error as e:
                # We don't fail when setting an option fails
                logger.warning(f"Failed setting routing option {key} to {value}: {e}")

