mysqloperator/controller/shellutils.py (241 lines of code) (raw):
# Copyright (c) 2020, 2021, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
from logging import Logger
from .innodbcluster.cluster_api import MySQLPod
import typing
from typing import Any, Optional, Callable, TYPE_CHECKING, Union
import mysqlsh
import kopf
import time
if TYPE_CHECKING:
from mysqlsh.mysql import ClassicSession
from mysqlsh import Dba, Cluster
# Short aliases for the MySQL Shell protocol modules used in this file.
mysql = mysqlsh.mysql
mysqlx = mysqlsh.mysqlx

# Connection-time MySQL errors that should never be seen when connecting to a
# cluster member. Seeing one means there is a bug or that someone/something
# broke the cluster, so retrying is pointless.
FATAL_CONNECT_ERRORS = {
    # We only ever authenticate with an account we own, so an access denied
    # (or locked account) error means our account was tampered with or worse.
    mysql.ErrorCode.ER_ACCESS_DENIED_ERROR,
    mysql.ErrorCode.ER_ACCOUNT_HAS_BEEN_LOCKED,
}

# Counterpart of the set above for errors raised while executing SQL.
FATAL_SQL_ERRORS = {
    mysql.ErrorCode.ER_MUST_CHANGE_PASSWORD,
    mysql.ErrorCode.ER_NO_DB_ERROR,
    mysql.ErrorCode.ER_NO_SUCH_TABLE,
    mysql.ErrorCode.ER_UNKNOWN_SYSTEM_VARIABLE,
    mysql.ErrorCode.ER_SPECIFIC_ACCESS_DENIED_ERROR,
    mysql.ErrorCode.ER_TABLEACCESS_DENIED_ERROR,
    mysql.ErrorCode.ER_COLUMNACCESS_DENIED_ERROR,
}

# Every error code considered fatal, whether at connect time or during SQL.
FATAL_MYSQL_ERRORS = FATAL_CONNECT_ERRORS | FATAL_SQL_ERRORS
def check_fatal_connect(err, where, logger) -> bool:
    """Report whether a connection error is fatal, logging it if so.

    :param err: error object exposing a ``code`` attribute
    :param where: description of the connection target, used in the log line
    :param logger: logger that receives the error message
    :return: True when ``err.code`` is in FATAL_MYSQL_ERRORS, False otherwise
    """
    if err.code not in FATAL_MYSQL_ERRORS:
        return False

    logger.error(
        f"Unexpected error connecting to MySQL. This error is not expected and may indicate a bug or corrupted cluster deployment: error={err} target={where}")
    return True
def check_fatal(err, where, context, logger) -> bool:
    """Report whether a SQL execution error is fatal, logging it if so.

    :param err: error object exposing a ``code`` attribute
    :param where: description of the target the statement ran against
    :param context: optional extra detail; appended to the log line when truthy
    :param logger: logger that receives the error message
    :return: True when ``err.code`` is in FATAL_SQL_ERRORS, False otherwise
    """
    if err.code not in FATAL_SQL_ERRORS:
        return False

    logger.error(
        f"Unexpected MySQL error. This error is not expected and may indicate a bug or corrupted cluster deployment: error={err} target={where}{' context=%s' % context if context else ''}")
    return True
class Timeout(Exception):
    """Exception type signalling a timeout.

    Nothing in this part of the file raises it; it is defined for callers.
    """
class GiveUp(Exception):
    """Raised from inside a retried callable to stop retrying immediately.

    RetryLoop.call() catches it, logs, and then either re-raises
    ``real_exc`` or, when ``real_exc`` is falsy, returns None.
    """

    def __init__(self, real_exc=None):
        # The underlying exception (if any) that made the callable give up.
        self.real_exc = real_exc
T = typing.TypeVar("T")


class RetryLoop:
    """Call a function repeatedly until it succeeds or the retry budget ends.

    Only ``mysqlsh.Error`` exceptions are retried. kopf errors and any other
    exception type propagate to the caller unchanged.

    :param logger: logger used for retry and give-up messages
    :param timeout: upper bound, in seconds, on the accumulated sleep time
        between attempts
    :param max_tries: maximum number of attempts; None means no limit
    :param is_retriable: optional predicate taking the ``mysqlsh.Error``;
        when it returns a falsy value the error is re-raised without retrying
    :param backoff: maps the current delay to the next one (default: +1s)
    """

    def __init__(self, logger: Logger, timeout: int = 60,
                 max_tries: Optional[int] = None,
                 is_retriable: Optional[Callable] = None,
                 backoff: Callable[[int], int] = lambda i: i+1):
        self.logger = logger
        self.timeout = timeout
        self.max_tries = max_tries
        self.backoff = backoff
        self.is_retriable = is_retriable

    def call(self, f: Callable[..., T], *args, **kwargs) -> T:
        """Invoke ``f(*args, **kwargs)``, retrying on ``mysqlsh.Error``.

        :return: whatever ``f`` returns, or None when ``f`` raises GiveUp
            without a ``real_exc``
        :raises: the last ``mysqlsh.Error`` once the timeout or max_tries is
            exhausted or ``is_retriable`` rejects it; ``GiveUp.real_exc``
            when ``f`` gives up with a cause; kopf errors unchanged
        """
        # Not every callable has __qualname__ (e.g. functools.partial or
        # callable instances). Resolve a printable name up front so the error
        # handlers below can never fail with AttributeError and hide the
        # real error.
        f_name = getattr(f, "__qualname__", None) or repr(f)

        delay = 1
        tries = 0
        total_wait = 0
        while True:
            try:
                tries += 1
                # Forward keyword arguments too; they used to be dropped.
                return f(*args, **kwargs)
            except (kopf.PermanentError, kopf.TemporaryError):
                # Don't retry kopf errors
                raise
            except GiveUp as err:
                self.logger.error(
                    f"Error executing {f_name}, giving up: {err.real_exc}")
                if err.real_exc:
                    raise err.real_exc
                else:
                    return None
            except mysqlsh.Error as err:
                if self.is_retriable and not self.is_retriable(err):
                    raise

                can_retry = total_wait < self.timeout and (
                    self.max_tries is None or tries < self.max_tries)
                if can_retry:
                    self.logger.info(
                        f"Error executing {f_name}, retrying after {delay}s: {err}")
                    time.sleep(delay)
                    total_wait += delay
                    delay = self.backoff(delay)

                    # TODO - do we want it this way? need it for debug now
                    import traceback
                    print(traceback.format_exc())
                else:
                    self.logger.error(
                        f"Error executing {f_name}, giving up: {err}")
                    raise
class SessionWrap:
    """Context-manager wrapper around a MySQL Shell classic session.

    Accepts either an already open session or a dict of connection options,
    in which case the session is opened via ``mysql.get_session``. Unknown
    attributes are delegated to the wrapped session. Leaving the ``with``
    block closes the session.
    """

    def __init__(self, session: Union['ClassicSession', dict]) -> None:
        if isinstance(session, dict):
            try:
                self.session = mysql.get_session(session)
            except mysqlsh.Error as e:
                # Build the error text from a copy of the options without the
                # password, so credentials never end up in logs.
                safe_options = session.copy()
                safe_options.pop("password", None)
                url = mysqlsh.globals.shell.unparse_uri(safe_options)
                raise mysqlsh.Error(
                    e.code, f"Error connecting to {url}: {e.msg}") from e
        else:
            self.session = session

    def __enter__(self) -> 'ClassicSession':
        return self.session

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.session.close()

    def __getattr__(self, name) -> Any:
        # __getattr__ only runs when normal lookup fails. If "session" itself
        # is missing (e.g. __init__ raised, or the object was created through
        # __new__/copy), delegating would recurse until RecursionError.
        if name == "session":
            raise AttributeError(name)
        return getattr(self.session, name)
class DbaWrap:
    """Context-manager wrapper around a MySQL Shell ``Dba`` object.

    Unknown attributes are delegated to the wrapped object. Leaving the
    ``with`` block closes the session the ``Dba`` object holds.
    """

    def __init__(self, dba: 'Dba'):
        self.dba = dba

    def __enter__(self):
        return self.dba

    def __exit__(self, exc_type, exc_value, traceback):
        self.dba.session.close()

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails. If "dba" itself is
        # missing (object created through __new__/copy), delegating would
        # recurse until RecursionError.
        if name == "dba":
            raise AttributeError(name)
        return getattr(self.dba, name)
class ClusterWrap:
    """Context-manager wrapper around a MySQL Shell ``Cluster`` handle.

    Unknown attributes are delegated to the wrapped handle. Leaving the
    ``with`` block calls ``disconnect()`` on it.
    """

    def __init__(self, cluster: 'Cluster'):
        self.cluster = cluster

    def __enter__(self):
        return self.cluster

    def __exit__(self, exc_type, exc_value, traceback):
        self.cluster.disconnect()

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails. If "cluster" itself
        # is missing (object created through __new__/copy), delegating would
        # recurse until RecursionError.
        if name == "cluster":
            raise AttributeError(name)
        return getattr(self.cluster, name)
def connect_dba(target: dict, logger: Logger, **kwargs) -> 'Dba':
    """Open a ``Dba`` connection to ``target`` through a RetryLoop.

    :param target: connection options handed to ``mysqlsh.connect_dba``
    :param logger: logger used by the retry loop
    :param kwargs: extra options for the RetryLoop constructor
    :return: the object returned by ``mysqlsh.connect_dba``
    """
    retry_loop = RetryLoop(logger, **kwargs)
    return retry_loop.call(mysqlsh.connect_dba, target)
def connect_to_pod_dba(pod: MySQLPod, logger: Logger, **kwargs) -> 'Dba':
    """Open a ``Dba`` connection to the given pod, with retries.

    :param pod: pod whose ``endpoint_co`` connection options are used
    :param logger: logger used by the retry loop
    :param kwargs: retry options (e.g. ``timeout``, ``max_tries``) forwarded
        to connect_dba() and from there to RetryLoop
    :return: the object returned by connect_dba()
    """
    # Forward the retry options; they used to be accepted here but dropped.
    return connect_dba(pod.endpoint_co, logger, **kwargs)
def connect_to_pod(pod: MySQLPod, logger: Logger, **kwargs):
    """Open a classic session to the given pod and normalise its settings.

    The connection attempt, including the session setup statements, runs
    inside a RetryLoop.

    :param pod: pod whose ``endpoint_co`` connection options are used
    :param logger: logger used by the retry loop
    :param kwargs: extra options for the RetryLoop constructor
    :return: a SessionWrap around the opened session
    """
    def connect(target):
        session = mysqlsh.mysql.get_session(target)
        try:
            # avoid trouble with global autocommit=0
            session.run_sql("set autocommit=1")
            # make sure there's no global ansi_quotes or anything like that
            session.run_sql("set sql_mode=''")
        except Exception:
            # The retry loop will open a fresh session on the next attempt,
            # so close this one instead of leaking the connection.
            session.close()
            raise

        try:
            # avoid problems with GR consistency during queries, if GR is running
            session.run_sql("set group_replication_consistency='EVENTUAL'")
        except Exception as e:
            # Best effort only: a failure here is deliberately ignored.
            # Catching Exception rather than using a bare except lets
            # KeyboardInterrupt/SystemExit propagate.
            logger.debug(
                f"Could not set group_replication_consistency, ignoring: {e}")

        return SessionWrap(session)

    return RetryLoop(logger, **kwargs).call(connect, pod.endpoint_co)
def jump_to_primary(session, account):
    """Return a session connected to a PRIMARY member of the group.

    Group members are read from
    ``performance_schema.replication_group_members``. If the instance behind
    ``session`` is itself a PRIMARY, ``session`` is returned unchanged.
    Otherwise a new session is opened to the PRIMARY's host, reusing the URI
    of ``session`` with the given credentials.

    :param session: open session to any group member
    :param account: ``(user, password)`` pair used for the new connection
    :return: ``session``, a new session to a PRIMARY, or None when no PRIMARY
        is listed or none could be connected to
    """
    members = session.run_sql(
        "SELECT member_role, member_host, (member_host = cast(coalesce(@@report_host, @@hostname) as char ascii)) as me"
        " FROM performance_schema.replication_group_members"
        " ORDER BY member_host")

    while (row := members.fetch_one()):
        if row[0] != "PRIMARY":
            continue

        # Third column is the "me" flag: this instance is the PRIMARY.
        if row[2]:
            return session

        # connect to the PRIMARY using the same credentials
        co = mysqlsh.globals.shell.parse_uri(session.uri)
        co["user"], co["password"] = account
        co["host"] = row[1]
        try:
            return mysqlx.get_session(co)
        except mysqlsh.Error as e:
            # Keep scanning: in multi-primary mode another PRIMARY may be
            # reachable.
            print(
                f"Could not connect to {co['host']}:{co['port']}: {e}")

    return None
def jump_to_primary_cluster(session, account):
    """Return a session to the primary instance of the primary cluster.

    This is especially relevant in a ClusterSet. It may create an
    intermediate connection when the first instance it reaches on the
    primary cluster is a secondary; that intermediate session is closed
    before returning.

    :param session: open session to an instance whose InnoDB Cluster metadata
        schema can be queried
    :param account: ``(user, password)`` pair used for the new connections
    :return: a session to the primary, or None if no instance can be reached
        or if the primary can not be reached
    """
    # TODO - Do we really need this!?
    # TODO see if we can use cs.describe()
    # TODO consider being in a partition etc (https://corparch-core-srv.slack.com/archives/CK025KBV1/p1712577673711939)
    res = session.run_sql("""
SELECT instances.xendpoint
FROM mysql_innodb_cluster_metadata.v2_instances instances
JOIN mysql_innodb_cluster_metadata.v2_cs_members cs_primary ON instances.cluster_id = cs_primary.cluster_id
JOIN mysql_innodb_cluster_metadata.v2_cs_members cs_members ON cs_primary.clusterset_id = cs_members.clusterset_id AND cs_primary.member_role = 'PRIMARY'
JOIN mysql_innodb_cluster_metadata.v2_this_instance this ON cs_members.cluster_id=this.cluster_id
WHERE instances.instance_type = 'group-member'
""")

    r = res.fetch_one()
    while r:
        co = mysqlsh.globals.shell.parse_uri(r[0])
        # Debug output of the parsed endpoint. It is printed before the
        # credentials are added so the password is never written to stdout.
        print(co)
        co["user"], co["password"] = account

        try:
            print(f"Trying to connect to {r[0]} on primary cluster")
            tmp_session = mysqlx.get_session(co)
            try:
                primary_session = jump_to_primary(tmp_session, account)
            except Exception:
                # Don't leak the intermediate session when the lookup fails;
                # a mysqlsh.Error is then handled by the outer except below.
                tmp_session.close()
                raise

            if tmp_session != primary_session:
                tmp_session.close()

            return primary_session
        except mysqlsh.Error as e:
            print(f"Could not connect to {co['host']}:{co['port']}: {e}")

        r = res.fetch_one()

    return None
def get_valid_cluster_handle(cluster, logger):
    """
    Try to get a cluster handle from any pod that accepts a connection.

    Each pod returned by ``cluster.get_pods()`` is tried in order; the whole
    scan is wrapped in a RetryLoop.

    :param cluster: object providing ``get_pods()``
    :param logger: logger for warnings and for the retry loop
    :return: ``(pod, dba, cluster_handle)`` for the first pod that works, or
        ``(None, None, None)`` when there was no pod to try
    :raises: the last error seen, once the retry loop gives up
    """
    # NOTE: nothing in this function adds to this list, so no pod is skipped.
    ignore_pods = []

    def try_once():
        last_err = None
        for pod in cluster.get_pods():
            if pod.name in ignore_pods:
                continue

            try:
                dba = mysqlsh.connect_dba(pod.endpoint_co)
            except Exception as e:
                logger.warning(
                    f"Could not connect: target={pod.endpoint} error={e}")
                last_err = e
                continue

            try:
                return pod, dba, dba.get_cluster()
            except Exception as e:
                logger.warning(
                    f"get_cluster: target={pod.endpoint} error={e}")
                last_err = e
                # This connection is of no further use. Close it so that
                # repeated retries don't pile up open sessions.
                try:
                    dba.session.close()
                except Exception as close_err:
                    logger.debug(
                        f"Could not close session: target={pod.endpoint} error={close_err}")
                continue

        # If failed because of exception, then throw exception which will
        # cause a retry
        if last_err:
            raise last_err

        # If failed because no pods, then just return None
        return None, None, None

    return RetryLoop(logger).call(try_once)
def query_membership_info(session):
    """Query the group replication membership info of the connected instance.

    :param session: open session to the instance being inspected
    :return: tuple ``(member_id, role, status, view_id, version,
        member_count, reachable_member_count)``. When the instance has no row
        in the membership tables, status is "OFFLINE", the string fields are
        empty and both counts are None.
    """
    result = session.run_sql("""SELECT m.member_id, m.member_role, m.member_state, s.view_id, m.member_version,
(SELECT count(*) FROM performance_schema.replication_group_members) as member_count,
(SELECT count(*) FROM performance_schema.replication_group_members WHERE member_state <> 'UNREACHABLE') as reachable_member_count
FROM performance_schema.replication_group_members m
JOIN performance_schema.replication_group_member_stats s
ON m.member_id = s.member_id
WHERE m.member_id = @@server_uuid""")
    row = result.fetch_one()

    if not row:
        # No membership row for this instance: report it as offline.
        return "", "", "OFFLINE", "", "", None, None

    # NULL id/role/view_id values are normalised to empty strings.
    return (
        row[0] or "",
        row[1] or "",
        row[2],
        row[3] or "",
        row[4],
        row[5],
        row[6],
    )
def query_members(session) -> list[tuple]:
    """List all group replication members visible from ``session``.

    :param session: open session to a group member
    :return: one ``(member_id, role, status, view_id, endpoint, version)``
        tuple per row, where endpoint is "host:port". NULL id, role, view_id
        and endpoint values are replaced with empty strings.
    """
    result = session.run_sql("""SELECT m.member_id, m.member_role, m.member_state,
s.view_id, concat(m.member_host, ':', m.member_port), m.member_version
FROM performance_schema.replication_group_members m
JOIN performance_schema.replication_group_member_stats s
ON m.member_id = s.member_id""")

    members = []
    while (row := result.fetch_one()):
        members.append((
            row[0] or "",
            row[1] or "",
            row[2],
            row[3] or "",
            row[4] or "",
            row[5],
        ))
    return members
def parse_uri(uri):
    """Parse ``uri`` with the global MySQL Shell object's ``parse_uri``.

    :param uri: URI string to parse
    :return: whatever ``mysqlsh.globals.shell.parse_uri`` returns for it
    """
    shell = mysqlsh.globals.shell
    return shell.parse_uri(uri)