mysqloperator/controller/group_monitor.py (172 lines of code) (raw):

# Copyright (c) 2020, 2021, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#

"""Monitoring of InnoDB Cluster group-replication membership.

A MonitoredCluster keeps an X-protocol session open to a member of one
InnoDBCluster (preferring the PRIMARY) and reports group view changes to a
caller-supplied handler.
"""

from logging import Logger
from typing import Callable, Optional, TYPE_CHECKING, Tuple, List
from mysqloperator.controller.innodbcluster.cluster_api import InnoDBCluster
from mysqloperator.controller.shellutils import RetryLoop
from . import shellutils
from logging import Logger, getLogger
import threading
import time
import select

import mysqlsh

mysql = mysqlsh.mysql
mysqlx = mysqlsh.mysqlx

# Minimum number of seconds between reconnection attempts to a cluster
# (compared against time.time() deltas in ensure_connected()).
k_connect_retry_interval = 10


class MonitoredCluster:
    """Tracks group membership of a single InnoDBCluster.

    Holds at most one live X-protocol session to a cluster member and
    invokes `handler(cluster, members, view_changed)` whenever a group
    view change is observed (or right after (re)connecting).
    """

    def __init__(self, cluster: InnoDBCluster, account: Tuple[str, str],
                 handler: Callable[[InnoDBCluster, list[tuple], bool], None]):
        """
        cluster - the InnoDBCluster object to monitor.
        account - (user, password) used to reach the PRIMARY (passed to
                  shellutils.jump_to_primary()).
        handler - callback invoked with (cluster, member rows, view_changed).
        """
        self.cluster = cluster
        self.account = account
        # Active session to a cluster member, or None when disconnected.
        self.session = None
        # "host:port" of the member the session points at (for log messages).
        self.target = None
        # True when the session is connected to a non-PRIMARY member.
        self.target_not_primary = None
        # time.time() of the last connection attempt; 0 = never attempted.
        self.last_connect_attempt = 0
        # member_id of the PRIMARY seen in the last view, if any.
        self.last_primary_id = None
        # view_id passed to the last on_view_change() call.
        self.last_view_id = None
        self.handler = handler
        self.logger: Logger = getLogger(f"CM_{self.cluster.name}")

    @property
    def name(self) -> str:
        return self.cluster.name

    @property
    def namespace(self) -> str:
        return self.cluster.namespace

    def ensure_connected(self) -> Optional['mysqlx.Session']:
        """Return the current session, (re)connecting if needed.

        A new connection is only attempted when there is no session and at
        least k_connect_retry_interval seconds passed since the last attempt.
        Returns None if no member could be reached.
        """
        # TODO run a ping every X seconds
        if not self.session and (not self.last_connect_attempt or time.time() - self.last_connect_attempt > k_connect_retry_interval):
            self.logger.info(f"Trying to connect to a member of cluster {self.cluster.namespace}/{self.cluster.name}")
            self.last_connect_attempt = time.time()
            self.session = None
            self.connect_to_primary()
            # force a refresh after we connect so we don't miss anything
            # that happened while we were out
            if self.session:
                self.logger.info(f"Connection to member of {self.cluster.namespace}/{self.cluster.name} OK {self.session}")
                self.on_view_change(None)
            else:
                self.logger.error(f"Connection to member of {self.cluster.namespace}/{self.cluster.name} FAILED")
        return self.session

    def connect_to_primary(self) -> None:
        """Connect to the cluster, preferring the PRIMARY.

        On success, extends the server-side idle timeout, enables
        GRViewChanged notices and sets self.session / self.target /
        self.target_not_primary. If only a non-PRIMARY is reachable, that
        session is kept with target_not_primary=True. Retries from scratch
        when a client-side (CR_*) error indicates the chosen server went
        away mid-setup; other errors propagate. Leaves self.session = None
        when nothing is reachable.
        """
        while True:
            session, is_primary = self.find_primary()
            if not is_primary:
                if session:
                    self.logger.error(f"Could not connect to PRIMARY of cluster {self.cluster.namespace}/{self.cluster.name}")
                else:
                    self.logger.error(f"Could not connect to neither PRIMARY nor SECONDARY of cluster {self.cluster.namespace}/{self.cluster.name}")
            if session:
                try:
                    # extend number of seconds for the server to wait for a command to arrive to a full day
                    session.run_sql(f"set session mysqlx_wait_timeout = {24*60*60}")
                    # NOTE(review): _enable_notices()/_fetch_notice() are private
                    # MySQL Shell APIs - verify on shell upgrades.
                    session._enable_notices(["GRViewChanged"])
                    co = shellutils.parse_uri(session.uri)
                    self.target = f"{co['host']}:{co['port']}"
                    self.target_not_primary = not is_primary
                    self.session = session
                except mysqlsh.Error as e:
                    if mysql.ErrorCode.CR_MAX_ERROR >= e.code >= mysql.ErrorCode.CR_MIN_ERROR:
                        # Try again if the server we were connected to is gone
                        continue
                    else:
                        raise
            else:
                self.session = None
            break

    def find_primary(self) -> Tuple[Optional['mysqlx.Session'], bool]:
        """Locate and connect to the PRIMARY of the cluster.

        Returns (session, True) for a session to the PRIMARY, or
        (session, False) with a session to some reachable member when no
        PRIMARY was found, or (None, False) when no member is reachable.
        """
        not_primary = None
        pods = self.cluster.get_pods()
        # Try to find the PRIMARY the easy way (via the membership info
        # cached in the pod metadata)
        for pod in pods:
            member_info = pod.get_membership_info()
            if member_info and member_info.get("role") == "PRIMARY":
                session = self.try_connect(pod)
                if session:
                    s = shellutils.jump_to_primary(session, self.account)
                    if s:
                        # jump_to_primary may return a different session;
                        # close the intermediate one in that case
                        if s != session:
                            session.close()
                        return s, True
                    else:
                        not_primary = session
        # Try to connect to anyone and find the primary from there
        for pod in pods:
            session = self.try_connect(pod)
            if session:
                s = shellutils.jump_to_primary(session, self.account)
                if s:
                    if s != session:
                        session.close()
                    return s, True
                else:
                    not_primary = session
        return not_primary, False

    def try_connect(self, pod) -> Optional['mysqlx.Session']:
        """Open an X-protocol session to `pod`, returning None on failure.

        `pod` is expected to expose xendpoint_co (connect options) and
        xendpoint (for logging).
        """
        try:
            session = mysqlx.get_session(pod.xendpoint_co)
        except mysqlsh.Error as e:
            self.logger.error(f"ERROR CONNECTING TO {pod.xendpoint}: {e}")
            return None
        return session

    def handle_notice(self) -> None:
        """Drain pending view-change notices from the session.

        Calls on_view_change() for every queued notice. On a shell error
        the session is closed and cleared so that ensure_connected() will
        reconnect on the next poll cycle.
        """
        while 1:
            try:
                # TODO hack to force unexpected async notice to be read, xsession should read packets itself
                self.session.run_sql("select 1")
                notice = self.session._fetch_notice()
                if not notice:
                    break
                self.logger.info(f"GOT NOTICE {notice}")
                self.on_view_change(notice.get("view_id"))
                # on_view_change() may have dropped the session to force a
                # reconnect; stop draining in that case
                if not self.session:
                    break
            except mysqlsh.Error as e:
                self.logger.error(f"ERROR FETCHING NOTICE: dest={self.target} error={e}")
                self.session.close()
                self.session = None
                break

    def on_view_change(self, view_id: Optional[str]) -> None:
        """Handle a (possible) group view change.

        Queries the current membership, invokes the handler (third arg is
        True when `view_id` differs from the last one seen) and drops the
        session when the PRIMARY changed or we are connected to a
        non-PRIMARY, so the next poll reconnects to the new PRIMARY.
        """
        members = shellutils.query_members(self.session)
        self.handler(self.cluster, members, view_id != self.last_view_id)
        self.last_view_id = view_id
        primary = None
        force_reconnect = False
        # NOTE(review): the tuple unpack below rebinds `view_id`, shadowing
        # the parameter; harmless since last_view_id was saved above, but
        # confusing.
        for member_id, role, status, view_id, endpoint, version in members:
            if self.last_primary_id == member_id and role != "PRIMARY":
                force_reconnect = True
                break
            if role == "PRIMARY" and not primary:
                primary = member_id
        self.last_primary_id = primary

        # force reconnection if the PRIMARY changed or we're not connected to the PRIMARY
        if self.target_not_primary or force_reconnect:
            self.logger.info(f"PRIMARY changed for {self.cluster.namespace}/{self.cluster.name}")
            if self.session:
                self.session.close()
                self.session = None


# TODO change this to a per cluster kopf.daemon?
class GroupMonitor(threading.Thread):
    """Daemon thread that watches all monitored InnoDB Clusters.

    Each poll iteration (re)connects any disconnected clusters, then
    select()s on the X-protocol socket fds of all connected sessions and
    dispatches pending group-replication notices via
    MonitoredCluster.handle_notice().
    """

    def __init__(self):
        super().__init__(daemon=True, name="group-monitor")
        # Clusters currently being monitored.
        self.clusters: List["MonitoredCluster"] = []
        # Set by stop(); checked once per poll iteration in run().
        self.stopped = False
        self.logger: Logger = getLogger("GROUP_MONITOR")

    def monitor_cluster(self, cluster: "InnoDBCluster",
                        handler: Callable[["InnoDBCluster", list[tuple], bool], None],
                        logger: Logger) -> None:
        """Start monitoring `cluster`, invoking `handler` on view changes.

        No-op if the cluster (same name and namespace) is already
        monitored. `logger` is kept for interface compatibility but the
        monitor logs through its own logger.
        """
        for c in self.clusters:
            if c.name == cluster.name and c.namespace == cluster.namespace:
                return
        # We could get called here before the Secret is ready, so retry
        # until the admin account can be fetched
        account = RetryLoop(self.logger).call(cluster.get_admin_account)
        target = MonitoredCluster(cluster, account, handler)
        self.clusters.append(target)
        self.logger.info(f"ADDED A MONITOR FOR {cluster.namespace}/{cluster.name}")

    def remove_cluster(self, cluster: "InnoDBCluster") -> None:
        """Stop monitoring `cluster`, if it is currently monitored."""
        for c in self.clusters:
            if c.name == cluster.name and c.namespace == cluster.namespace:
                self.clusters.remove(c)
                self.logger.info(f"REMOVED THE MONITOR OF CLUSTER {cluster.namespace}/{cluster.name}")
                break

    def run(self) -> None:
        """Poll loop: reconnect sessions and dispatch pending notices.

        Runs until stop() is called.
        """
        while not self.stopped:
            session_fds_to_cluster = {}
            for cluster in self.clusters:
                cluster.ensure_connected()
                if cluster.session:
                    # NOTE(review): _get_socket_fd() is a private MySQL Shell
                    # API - verify on shell upgrades.
                    session_fds_to_cluster[cluster.session._get_socket_fd()] = cluster

            # wait for 1s at most so that newly added sessions don't wait much
            # (select() timeout is in seconds - a larger value would leave a
            # newly added cluster unconnected/unpolled for that long)
            # TODO replace poll_sessions() with something to get the session fd
            # - do the poll loop in python
            # - add a socket_pair() to allow interrupting the poll when a new
            #   cluster is added and increase the timeout
            ready, _, _ = select.select(session_fds_to_cluster.keys(), [], [], 1)
            for fd in ready:
                session_fds_to_cluster[fd].handle_notice()

    def stop(self) -> None:
        """Request the poll loop to exit (takes effect within one poll interval)."""
        self.stopped = True


g_group_monitor = GroupMonitor()