mysqloperator/controller/mysqlutils.py (95 lines of code) (raw):
# Copyright (c) 2020, 2023, Oracle and/or its affiliates.
#
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
import mysqlsh
from typing import List
def is_client_error(code):
return mysqlsh.mysql.ErrorCode.CR_MIN_ERROR <= code <= mysqlsh.mysql.ErrorCode.CR_MAX_ERROR
def clone_server(donor_co, donor_session, recip_session, logger):
"""
Clone recipient server from donor.
If clone already happened, return False, otherwise True.
Throws exception on any error.
"""
mysql = mysqlsh.mysql
donor = f"{donor_co.get('host', 'localhost')}:{donor_co.get('port', 3306)}"
recip_co = mysqlsh.globals.shell.parse_uri(recip_session.uri)
recip = f"{recip_co.get('host', 'localhost')}:{recip_co.get('port', 3306)}"
try:
logger.debug(f"Installing clone plugin at {recip}")
recip_session.run_sql("INSTALL PLUGIN clone SONAME 'mysql_clone.so'")
except mysqlsh.Error as e:
logger.debug(f"Error installing clone plugin at {recip}: {e}")
if e.code == mysql.ErrorCode.ER_UDF_EXISTS:
pass
else:
raise
# Check if clone was already executed from the logs
res = recip_session.run_sql("""SELECT
state, begin_time, end_time, source, error_no, error_message
FROM performance_schema.clone_status
ORDER BY id DESC LIMIT 1""")
row = res.fetch_one()
if row:
logger.info(
f"Previous clone execution detected at {recip}: source={row[3]} status={row[0]} started={row[1]} ended={row[2]} errno={row[4]} error={row[5]}")
if row[0] == "Completed" and row[3] == donor:
return False
# check if the donor has the GR plugin installed and if so, install it
res = donor_session.run_sql("show plugins")
source_plugins = set()
for row in res.fetch_all():
if row[1] == "ACTIVE" and row[3] != None:
source_plugins.add(row[0])
res = recip_session.run_sql("show plugins")
dest_plugins = set()
for row in res.fetch_all():
if row[1] == "ACTIVE" and row[3] != None:
dest_plugins.add(row[0])
missing_plugins = source_plugins - dest_plugins
if missing_plugins:
if "group_replication" in missing_plugins:
try:
logger.debug(f"Installing group_replication plugin at {recip}")
recip_session.run_sql(
"INSTALL PLUGIN group_replication SONAME 'group_replication.so'")
except mysqlsh.Error as e:
logger.debug(
f"Error installing group_replication plugin at {recip}: {e}")
if e.code == mysql.ErrorCode.ER_UDF_EXISTS:
pass
else:
raise
missing_plugins.remove("group_replication")
if missing_plugins:
logger.warning(
f"The following plugins are installed at the donor but not the recipient: {missing_plugins}")
# do other validations that the clone plugin doesn't
logger.info(f"Starting clone from {donor} to {recip}")
try:
recip_session.run_sql("SET GLOBAL clone_valid_donor_list=?", [donor])
recip_session.run_sql("CLONE INSTANCE FROM ?@?:? IDENTIFIED BY ?", [
donor_co["user"], donor_co["host"], donor_co.get("port", 3306), donor_co["password"]])
except mysqlsh.Error as e:
logger.debug(f"Error executing clone from {donor} at {recip}: {e}")
raise
# If everything went OK, the server should be restarting now.
return True
def setup_backup_account(session, user, password):
session.run_sql(f"DROP USER IF EXISTS {user}")
session.run_sql(f"CREATE USER {user} IDENTIFIED BY ?", [password])
session.run_sql(
f"GRANT select, show databases, show view, lock tables, reload ON *.* TO {user}")
session.run_sql(
f"GRANT backup_admin /*!80020 , show_routine */ ON *.* TO {user}")
def setup_metrics_user(session: 'mysqlsh.ClassicSession', user: str,
grants: List, max_connections: int) -> None:
host = "localhost"
grants = ", ".join(grants)
session.run_sql("DROP USER IF EXISTS ?@?", [user, host])
session.run_sql(
"CREATE USER IF NOT EXISTS ?@? IDENTIFIED WITH auth_socket AS 'daemon' WITH MAX_USER_CONNECTIONS ?", [user, host, max_connections])
session.run_sql("REVOKE ALL PRIVILEGES, GRANT OPTION FROM ?@?", [user, host])
session.run_sql(f"GRANT {grants} ON *.* TO ?@? WITH GRANT OPTION", [user, host])
def remove_metrics_user(session: 'mysqlsh.ClassicSession') -> None:
user = "mysqlmetrics"
host = "localhost"
session.run_sql("DROP USER IF EXISTS ?@?", [user, host])
def count_gtids(gtid_set: str) -> int:
"""Return number of transactions in the GTID set"""
def count_range(r):
begin, _, end = r.partition("-")
if not end:
return 1
else:
return int(end)-int(begin)+1
n = 0
for g in gtid_set.replace("\n", "").split(","):
for r in g.split(":")[1:]:
n += count_range(r)
return n