# unittest/scripts/setup_py/dump_utils.inc
import json
import os
import re
import urllib.parse

# NOTE: session, testutil, the EXPECT_*() assertions and the __host/__version_num
# globals are provided by the shell test harness which loads this file.
# computes a SHA-1 based checksum and the row count of a table (or of selected
# partitions/rows); rows are read in PRIMARY key order when one exists,
# otherwise sorted by all columns, so the result is deterministic
def md5_table(session, schema, table, where = "", partitions = []):
    columns = []
    has_pri = 0 != session.run_sql("SELECT COUNT(*) FROM information_schema.statistics WHERE index_name = 'PRIMARY' AND table_schema = ? AND table_name = ?", [schema, table]).fetch_one()[0]
    for c in session.run_sql("desc !.!", [schema, table]).fetch_all():
        columns.append(c[0])
    query = f"SELECT @crc := sha1(concat(@crc, sha1(concat_ws('#', {', '.join(['convert(! using binary)' for c in columns])})))),@cnt := @cnt + 1 as discard from !.!"
    params = columns + [schema, table]
    if partitions:
        query += f" PARTITION ({', '.join(['!' for p in partitions])})"
        params += partitions
    if has_pri:
        query += " use index(PRIMARY)"
    if where:
        query += f" WHERE {where}"
    if not has_pri:
        query += f" ORDER BY {', '.join(['!' for c in columns])}"
        params += columns
    session.run_sql("SET @crc = ''")
    session.run_sql("SET @cnt = 0")
    session.run_sql(query, params)
    row = session.run_sql("select @crc, @cnt").fetch_one()
    return {"sha1": row[0], "count": row[1]}
def snapshot_account(session, auser, ahost):
    user = {}
    user["create"] = session.run_sql("SHOW CREATE USER ?@?", [auser, ahost]).fetch_one()[0]
    user["grants"] = []
    for row in session.run_sql("SHOW GRANTS FOR ?@?", [auser, ahost]).fetch_all():
        user["grants"].append(row[0])
    return user
def snapshot_accounts(session):
    accounts = {}
    for row in session.run_sql("SELECT user, host FROM mysql.user").fetch_all():
        user = snapshot_account(session, row[0], row[1])
        name = row[0] + "@" + row[1]
        accounts[name] = user
    return accounts
def snapshot_table_data(session, schema, table):
    # CHECKSUM TABLE returns different values for floating point values when they're loaded with LOAD DATA - BUG#31071891
    # CHECKSUM TABLE doesn't work well with JSON columns (5.7)
    # cksum = session.run_sql("CHECKSUM TABLE !.!", [schema, table]).fetch_one()[1]
    # count = session.run_sql("SELECT count(*) FROM !.!", [schema, table]).fetch_one()[0]
    # return {"checksum": cksum, "rowcount": count}
    return md5_table(session, schema, table)
def snapshot_tables_and_triggers(session, schema):
    tables = {}
    for row in session.run_sql("SELECT TABLE_NAME FROM information_schema.tables WHERE table_schema = ? and table_type = 'BASE TABLE'", [schema]).fetch_all():
        name = row[0]
        tables[name] = snapshot_table_data(session, schema, name)
        tables[name]["ddl"] = session.run_sql("SHOW CREATE TABLE !.!", [schema, name]).fetch_one()[1]
        # on Windows "SHOW CREATE" keeps adding slashes at the end of a DATA|INDEX directory, mitigate that
        tables[name]["ddl"] = re.compile(r"/+").sub("/", tables[name]["ddl"])
        triggers = {}
        for row in session.run_sql("SELECT trigger_name FROM information_schema.triggers WHERE trigger_schema = ? and event_object_table = ?", [schema, name]).fetch_all():
            trigger = row[0]
            triggers[trigger] = session.run_sql("SHOW CREATE TRIGGER !.!", [schema, trigger]).fetch_one()[2]
        tables[name]["triggers"] = triggers
    return tables
def snapshot_views(session, schema):
    tables = {}
    for row in session.run_sql("SELECT TABLE_NAME FROM information_schema.tables WHERE table_schema = ? and table_type = 'VIEW'", [schema]).fetch_all():
        name = row[0]
        tables[name] = {}
        tables[name]["ddl"] = session.run_sql("SHOW CREATE VIEW !.!", [schema, name]).fetch_one()[1]
    return tables
def snapshot_routines(session, schema, type):
    routines = {}
    for row in session.run_sql(f"SELECT ROUTINE_NAME FROM information_schema.routines WHERE routine_schema = ? and routine_type = '{type}'", [schema]).fetch_all():
        name = row[0]
        routines[name] = {}
        routines[name]["ddl"] = session.run_sql(f"SHOW CREATE {type} !.!", [schema, name]).fetch_one()[2]
    return routines
def snapshot_procedures(session, schema):
    return snapshot_routines(session, schema, "PROCEDURE")
def snapshot_functions(session, schema):
    return snapshot_routines(session, schema, "FUNCTION")
def snapshot_events(session, schema):
    events = {}
    for row in session.run_sql("SHOW EVENTS IN !", [schema]).fetch_all():
        name = row.Name
        events[name] = {}
        events[name]["ddl"] = session.run_sql("SHOW CREATE EVENT !.!", [schema, name]).fetch_one()[3]
    return events
def snapshot_schema(session, schema):
    obj = {}
    obj["tables"] = snapshot_tables_and_triggers(session, schema)
    obj["views"] = snapshot_views(session, schema)
    obj["procedures"] = snapshot_procedures(session, schema)
    obj["functions"] = snapshot_functions(session, schema)
    obj["events"] = snapshot_events(session, schema)
    return obj
def snapshot_schemas(session):
    schemas = {}
    for row in session.run_sql("SELECT SCHEMA_NAME FROM information_schema.schemata WHERE schema_name not in ('sys', 'information_schema', 'mysql', 'performance_schema')").fetch_all():
        name = row[0]
        schemas[name] = snapshot_schema(session, name)
        schemas[name]["ddl"] = session.run_sql("SHOW CREATE SCHEMA !", [name]).fetch_one()[1]
    return schemas
def snapshot_tablespaces(session):
    # not supported atm
    return {}
def snapshot_instance(session):
    snapshot = {}
    snapshot["accounts"] = snapshot_accounts(session)
    snapshot["tablespaces"] = snapshot_tablespaces(session)
    snapshot["schemas"] = snapshot_schemas(session)
    # normalize the JSON encoding
    return json.loads(json.dumps(snapshot))
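# Illustrative sketch (an assumption about usage, not part of the helpers): the
# round-trip pattern in the dump tests is snapshot - dump - wipe - load -
# snapshot, followed by a comparison of the two normalized snapshots.
def example_snapshot_roundtrip(session, dump_and_load):
    before = snapshot_instance(session)
    dump_and_load()  # hypothetical callable that dumps, wipes and reloads the server
    after = snapshot_instance(session)
    EXPECT_EQ(before, after, "instance state after the round trip")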
def wipeout_server(session):
    session.run_sql("set foreign_key_checks=0")
    schemas = session.run_sql("show schemas").fetch_all()
    for schema, in schemas:
        if schema in ("mysql", "sys", "performance_schema", "information_schema"):
            continue
        session.run_sql("drop schema `"+schema+"`")
    for user, host in session.run_sql("select user,host from mysql.user").fetch_all():
        if user not in ["root", "mysql.infoschema", "mysql.sys", "mysql.session"]:
            session.run_sql("drop user ?@?", [user, host])
    session.run_sql("reset master")
    session.run_sql("set foreign_key_checks=1")
def truncate_all_tables(session):
    session.run_sql("set foreign_key_checks=0")
    schemas = session.run_sql("show schemas").fetch_all()
    for schema, in schemas:
        if schema in ("mysql", "sys", "performance_schema", "information_schema"):
            continue
        tables = session.run_sql("show full tables in `%s`" % schema).fetch_all()
        for table, t in tables:
            if t == "BASE TABLE":
                session.run_sql("truncate table `%s`.`%s`" % (schema, table))
    session.run_sql("set foreign_key_checks=1")
def compare_query_results(session1, session2, query, args=[], ignore_columns=[]):
    r1 = [[str(f) for i, f in enumerate(r) if i not in ignore_columns] for r in session1.run_sql(query, args).fetch_all()]
    r1.sort()
    r2 = [[str(f) for i, f in enumerate(r) if i not in ignore_columns] for r in session2.run_sql(query, args).fetch_all()]
    r2.sort()
    EXPECT_EQ(len(r1), len(r2), query + ", args: " + str(args))
    for i in range(len(r1)):
        EXPECT_EQ(str(r1[i]), str(r2[i]) if i < len(r2) else "<missing>")
    return r1
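# Illustrative sketch (not used by the helpers below): comparing a single
# catalog query across two servers; rows are stringified and sorted first, so
# result order does not matter. The query and ignored column index are
# placeholders for the example.
def example_compare_plugins(session1, session2):
    # ignore column 1 (plugin_status), which may legitimately differ
    compare_query_results(session1, session2,
                          "select plugin_name, plugin_status from information_schema.plugins",
                          ignore_columns=[1])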
def compare_schema(session1, session2, schema, check_rows=True):
    tables = compare_query_results(session1, session2, "select table_name from information_schema.tables where table_schema=? and table_type='BASE TABLE' order by table_name", [schema])
    for table, in tables:
        compare_query_results(session1, session2, "show create table `"+schema+"`.`"+table+"`")
        if check_rows:
            columns = [c[0] for c in session1.run_sql("SELECT COLUMN_NAME FROM information_schema.columns WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION", [schema, table]).fetch_all()]
            compare_query_results(session1, session2, "select {0} from !.!".format(("MD5(!)," * len(columns))[:-1]), columns + [schema, table])
    views = compare_query_results(session1, session2, "select table_name from information_schema.tables where table_schema=? and table_type='VIEW' order by table_name", [schema])
    for view, in views:
        compare_query_results(session1, session2, "show create view `"+schema+"`.`"+view+"`")
    # ignore Originator in the show events output
    compare_query_results(session1, session2, "show events in `"+schema+"`", ignore_columns=[11])
    compare_query_results(session1, session2, "select TRIGGER_NAME,EVENT_MANIPULATION,EVENT_OBJECT_SCHEMA,EVENT_OBJECT_TABLE,ACTION_ORDER,ACTION_CONDITION,ACTION_STATEMENT,ACTION_ORIENTATION,ACTION_TIMING,ACTION_REFERENCE_OLD_TABLE,ACTION_REFERENCE_NEW_TABLE,ACTION_REFERENCE_OLD_ROW,ACTION_REFERENCE_NEW_ROW,SQL_MODE,DEFINER,CHARACTER_SET_CLIENT,COLLATION_CONNECTION,DATABASE_COLLATION from information_schema.triggers where trigger_schema=? order by trigger_name", [schema])
    compare_query_results(session1, session2, "select SPECIFIC_NAME,ROUTINE_SCHEMA,ROUTINE_NAME,ROUTINE_TYPE,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH,CHARACTER_OCTET_LENGTH,NUMERIC_PRECISION,NUMERIC_SCALE,DATETIME_PRECISION,CHARACTER_SET_NAME,COLLATION_NAME,DTD_IDENTIFIER,ROUTINE_BODY,ROUTINE_DEFINITION,EXTERNAL_NAME,EXTERNAL_LANGUAGE,PARAMETER_STYLE,IS_DETERMINISTIC,SQL_DATA_ACCESS,SQL_PATH,SECURITY_TYPE,SQL_MODE,ROUTINE_COMMENT,DEFINER,CHARACTER_SET_CLIENT,COLLATION_CONNECTION,DATABASE_COLLATION from information_schema.routines where routine_schema=? order by specific_name", [schema])
def compare_schemas(session1, session2, check_rows=True):
    schemas = compare_query_results(session1, session2, "show schemas")
    for schema, in schemas:
        if schema in ("mysql", "sys", "performance_schema", "information_schema"):
            continue
        compare_schema(session1, session2, schema, check_rows)
def compare_user_grants(session1, session2, user):
    grants1 = session1.run_sql(f"show grants for {user}").fetch_all()
    grants2 = session2.run_sql(f"show grants for {user}").fetch_all()
    EXPECT_EQ(str(grants1), str(grants2), f"grants for {user}")
def compare_users(session1, session2):
    users = set()
    for s in [session1, session2]:
        for user, in s.run_sql("select concat(quote(user), '@', quote(host)) from mysql.user").fetch_all():
            users.add(user)
    for user in users:
        compare_user_grants(session1, session2, user)
def compare_servers(session1, session2, *, check_rows=True, check_users=True):
    compare_schemas(session1, session2, check_rows)
    if check_users:
        compare_users(session1, session2)
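# Illustrative sketch: a typical post-load verification, assuming `src_session`
# and `dst_session` are open sessions to the dumped and the loaded server.
def example_verify_load(src_session, dst_session):
    # compare DDL and users, but skip the (slower) row-by-row data comparison
    compare_servers(src_session, dst_session, check_rows=False, check_users=True)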
def CHECK_OUTPUT_SANITY(outdir, min_chunk_size, min_chunks, allow_min_chunk_size_errors=0):
    chunks_per_table = {}
    files = []
    print()
    print(outdir)
    for f in os.listdir(outdir):
        if not f.endswith(".tsv"):
            continue
        fsize = os.stat(os.path.join(outdir, f)).st_size
        print("%-30s\t%-10s" % (f, fsize))
        files.append((f, fsize))
        if f.count("@") >= 2:
            table = "@".join(f.split("@")[:2])
            if table not in chunks_per_table:
                chunks_per_table[table] = 1
            else:
                chunks_per_table[table] += 1
    min_chunk_size_errors = 0
    for f, fsize in files:
        # chunk files (names containing "@@") are only checked when there's a single
        # file, because the last chunk of a table may legitimately be smaller than
        # min_chunk_size and it cannot be easily identified here
        if "@@" not in f or len(files) == 1:
            if fsize < min_chunk_size:
                min_chunk_size_errors += 1
                if min_chunk_size_errors > allow_min_chunk_size_errors:
                    EXPECT_LE(min_chunk_size, fsize, "Size of "+f+" too small")
    for t, c in chunks_per_table.items():
        EXPECT_LE(min_chunks, c, "Too few chunks for "+t)
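# Illustrative call (the concrete numbers are assumptions for the example):
# verify every .tsv file in a dump is at least 128 KiB and that each chunked
# table produced at least 2 chunks, tolerating one undersized file.
def example_check_dump_sanity(outdir):
    CHECK_OUTPUT_SANITY(outdir, min_chunk_size=128 * 1024, min_chunks=2, allow_min_chunk_size_errors=1)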
def get_test_user_name(name):
    return f"test_{name}"
def get_test_user_account(name):
    return f"'{get_test_user_name(name)}'@'{__host}'"
def get_user_account_for_output(name):
    return name if __version_num < 80000 else name.replace("'", "`")
def test_user_uri(port):
    return f"mysql://{get_test_user_name(test_user)}:{test_user_pwd}@{__host}:{port}"
class Compatibility_issue:
    def __init__(self, error, fixed, warning=None):
        self.__error_msg = "ERROR: " + error if error else ""
        self.__warning_msg = "WARNING: " + warning if warning else ""
        self.__fixed_msg = "NOTE: " + fixed if fixed else ""
    def error(self):
        if self.__error_msg:
            return self.__error_msg
        else:
            raise Exception("No error message")
    def error_no_prefix(self):
        # strip the leading "ERROR: " (7 characters)
        return self.error()[7:]
    def warning(self):
        if self.__warning_msg:
            return self.__warning_msg
        else:
            raise Exception("No warning message")
    def warning_no_prefix(self):
        # strip the leading "WARNING: " (9 characters)
        return self.warning()[9:]
    def fixed(self):
        if self.__fixed_msg:
            return self.__fixed_msg
        else:
            raise Exception("No fixed message")
    def fixed_no_prefix(self):
        # strip the leading "NOTE: " (6 characters)
        return self.fixed()[6:]
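# Illustrative sketch of how the factory helpers below are consumed by tests;
# EXPECT_STDOUT_CONTAINS is assumed to be provided by the test harness, and the
# schema/table names are placeholders.
def example_expect_compatibility_issue():
    issue = force_innodb_unsupported_storage("sakila", "film")
    EXPECT_STDOUT_CONTAINS(issue.error())  # dump run without the compatibility option
    EXPECT_STDOUT_CONTAINS(issue.fixed())  # dump run with 'force_innodb' enabled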
def strip_restricted_grants(user, privileges):
    all_privileges = ", ".join(sorted(privileges))
    plural = "s" if len(privileges) > 1 else ""
    error = f"User {user} is granted restricted privilege{plural}: {all_privileges} (fix this with 'strip_restricted_grants' compatibility option)"
    fixed = f"User {user} had restricted privilege{plural} ({all_privileges}) removed"
    return Compatibility_issue(error, fixed)
def strip_invalid_grants(user, grant, object_type):
    error = f"User {user} has grant statement on a non-existent {object_type} ({grant}) (fix this with 'strip_invalid_grants' compatibility option)"
    fixed = f"User {user} had grant statement on a non-existent {object_type} removed ({grant})"
    return Compatibility_issue(error, fixed)
def comment_data_index_directory(schema, table):
    error = ""
    fixed = f"Table `{schema}`.`{table}` had {{DATA|INDEX}} DIRECTORY table option commented out"
    return Compatibility_issue(error, fixed)
def comment_encryption(schema, table):
    error = ""
    fixed = f"Table `{schema}`.`{table}` had ENCRYPTION table option commented out"
    return Compatibility_issue(error, fixed)
def force_innodb_unsupported_storage(schema, table, engine = "MyISAM"):
    error = f"Table `{schema}`.`{table}` uses unsupported storage engine {engine} (fix this with 'force_innodb' compatibility option)"
    fixed = f"Table `{schema}`.`{table}` had unsupported engine {engine} changed to InnoDB"
    return Compatibility_issue(error, fixed)
def force_innodb_row_format_fixed(schema, table):
    error = f"Table `{schema}`.`{table}` uses unsupported ROW_FORMAT=FIXED option (fix this with 'force_innodb' compatibility option)"
    fixed = f"Table `{schema}`.`{table}` had unsupported ROW_FORMAT=FIXED option removed"
    return Compatibility_issue(error, fixed)
def strip_tablespaces(schema, table):
    error = f"Table `{schema}`.`{table}` uses unsupported tablespace option (fix this with 'strip_tablespaces' compatibility option)"
    fixed = f"Table `{schema}`.`{table}` had unsupported tablespace option removed"
    return Compatibility_issue(error, fixed)
def strip_definers_definer_clause(schema, name, label = "View"):
    error = f"{label} `{schema}`.`{name}` - definition uses DEFINER clause set to user `root`@`localhost` which can only be executed by this user or a user with SET_USER_ID or SUPER privileges (fix this with 'strip_definers' compatibility option)"
    fixed = f"{label} `{schema}`.`{name}` had definer clause removed"
    return Compatibility_issue(error, fixed)
def strip_definers_security_clause(schema, name, label = "View"):
    error = f"{label} `{schema}`.`{name}` - definition does not use SQL SECURITY INVOKER characteristic, which is required (fix this with 'strip_definers' compatibility option)"
    fixed = f"{label} `{schema}`.`{name}` had SQL SECURITY characteristic set to INVOKER"
    return Compatibility_issue(error, fixed)
def skip_invalid_accounts_plugin(user, plugin):
    error = f"User {user} is using an unsupported authentication plugin '{plugin}' (fix this with 'skip_invalid_accounts' compatibility option)"
    fixed = f"User {user} is using an unsupported authentication plugin '{plugin}', this account has been removed from the dump"
    return Compatibility_issue(error, fixed)
def skip_invalid_accounts_no_password(user):
    error = f"User {user} does not have a password set (fix this with 'skip_invalid_accounts' compatibility option)"
    fixed = f"User {user} does not have a password set, this account has been removed from the dump"
    return Compatibility_issue(error, fixed)
def create_invisible_pks(schema, table):
    error = f"Table `{schema}`.`{table}` does not have a Primary Key, which is required for High Availability in MySQL HeatWave Service"
    fixed = f"Table `{schema}`.`{table}` does not have a Primary Key, this will be fixed when the dump is loaded"
    return Compatibility_issue(error, fixed)
def create_invisible_pks_name_conflict(schema, table):
    error = f"Table `{schema}`.`{table}` does not have a Primary Key, this cannot be fixed automatically because the table has a column named `my_row_id` (this issue needs to be fixed manually)"
    fixed = ""
    return Compatibility_issue(error, fixed)
def create_invisible_pks_auto_increment_conflict(schema, table):
    error = f"Table `{schema}`.`{table}` does not have a Primary Key, this cannot be fixed automatically because the table has a column with 'AUTO_INCREMENT' attribute (this issue needs to be fixed manually)"
    fixed = ""
    return Compatibility_issue(error, fixed)
def ignore_missing_pks(schema, table):
    error = f"Table `{schema}`.`{table}` does not have a Primary Key, which is required for High Availability in MySQL HeatWave Service"
    fixed = f"Table `{schema}`.`{table}` does not have a Primary Key, this is ignored"
    return Compatibility_issue(error, fixed)
def too_many_columns(schema, table, columns):
    error = f"Table `{schema}`.`{table}` has {columns} columns, while the limit for the InnoDB engine is 1017 columns (this issue needs to be fixed manually)"
    fixed = ""
    return Compatibility_issue(error, fixed)
def ignore_wildcard_grants(user, grant):
    error = f"User {user} has a wildcard grant statement at the database level ({grant})"
    fixed = error + ", this is ignored"
    return Compatibility_issue(error, fixed)
def grant_on_excluded_object(user, grant):
    warning = f"User {user} has a grant statement on an object which is not included in the dump ({grant})"
    return Compatibility_issue(None, None, warning)
def urlencode_object_name(s):
    ret = ""
    for c in s:
        if ord(c) <= 127:
            # safe="" makes sure that reserved characters like "/" are encoded
            # too, leaving only the RFC3986 Unreserved Characters untouched
            ret += urllib.parse.quote(c, safe="")
        else:
            ret += c
    return ret
def truncate_basename(basename):
    if len(basename) > 225:
        # the tests don't use names which would collide after being truncated,
        # so "0" is always appended as the ordinal number
        return basename[:225] + "0"
    else:
        return basename
# WL13807-FR8 - The base name of any schema-related file created during the dump must be in the form of `schema`, where:
# * schema - percent-encoded name of the schema.
# Only code points up to and including `U+007F` which are not Unreserved Characters (as specified in RFC3986) must be encoded, all remaining code points must not be encoded. If the length of base name exceeds `225` characters, it must be truncated to `225` characters and an ordinal number must be appended.
# WL13807-TSFR8_1
# WL13807-TSFR8_2
def encode_schema_basename(schema):
    return truncate_basename(urlencode_object_name(schema))
# WL13807-FR7.1 - The base name of any file created during the dump must be in the format `schema@table`, where:
# * `schema` - percent-encoded name of the schema which contains the table to be exported,
# * `table` - percent-encoded name of the table to be exported.
# Only code points up to and including `U+007F` which are not Unreserved Characters (as specified in RFC3986) must be encoded, all remaining code points must not be encoded. If the length of base name exceeds `225` characters, it must be truncated to `225` characters and an ordinal number must be appended.
def encode_table_basename(schema, table):
    return truncate_basename(urlencode_object_name(schema) + "@" + urlencode_object_name(table))
def encode_partition_basename(schema, table, partition):
    return truncate_basename(urlencode_object_name(schema) + "@" + urlencode_object_name(table) + "@" + urlencode_object_name(partition))
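# Illustrative examples of the encoding rules above:
#   encode_schema_basename("my schema")          -> "my%20schema"
#   encode_table_basename("my schema", "tab@le") -> "my%20schema@tab%40le"
#   encode_table_basename("sakila", "łódź")      -> "sakila@łódź"  (non-ASCII is kept as-is)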
def count_files_with_basename(directory, basename):
    cnt = 0
    for f in os.listdir(directory):
        if f.startswith(basename):
            cnt += 1
    return cnt
def has_file_with_basename(directory, basename):
    return count_files_with_basename(directory, basename) > 0
def count_files_with_extension(directory, ext):
    cnt = 0
    for f in os.listdir(directory):
        if f.endswith(ext):
            cnt += 1
    return cnt
def validate_load_progress(file_name):
    with open(file_name) as fp:
        for line in fp:
            content = json.loads(line)
            EXPECT_TRUE("op" in content)
            EXPECT_TRUE("done" in content)
            EXPECT_TRUE("schema" in content or content["op"] in ["GTID-UPDATE", "SERVER-UUID"])
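# Illustrative progress-file lines this accepts (the exact format is an
# assumption inferred from the checks above, not a specification):
#   {"op": "TABLE-DATA", "done": true, "schema": "sakila", "table": "actor"}
#   {"op": "GTID-UPDATE", "done": true}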
def remove_file(name):
    try:
        os.remove(name)
    except OSError:
        pass
def read_json(path):
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
def write_json(path, data):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f)
def wipe_dir(path):
    try:
        testutil.rmdir(path, True)
    except Exception:
        pass
def EXPECT_CAPABILITIES(path, capabilities):
    metadata = read_json(path)
    EXPECT_TRUE("capabilities" in metadata, "Metadata file should have 'capabilities' array")
    for expected in capabilities:
        actual = next(filter(lambda capability: expected["id"] == capability["id"], metadata["capabilities"]), None)
        EXPECT_TRUE(actual is not None, f"Metadata should have '{expected['id']}' capability")
        if actual:
            EXPECT_EQ(expected["id"], actual["id"], "IDs of the capability should match")
            EXPECT_EQ(expected["description"], actual["description"], "Descriptions of the capability should match")
            EXPECT_EQ(expected["versionRequired"], actual["versionRequired"], "Version required of the capability should match")
def EXPECT_NO_CAPABILITIES(path, capabilities):
    metadata = read_json(path)
    EXPECT_TRUE("capabilities" in metadata, "Metadata file should have 'capabilities' array")
    for expected in capabilities:
        actual = next(filter(lambda capability: expected["id"] == capability["id"], metadata["capabilities"]), None)
        EXPECT_FALSE(actual is not None, f"Metadata should NOT have '{expected['id']}' capability")
def quote_identifier(schema, object = None):
    def quote(identifier):
        return f"`{identifier.replace('`', '``')}`"
    ret = quote(schema)
    if object is not None:
        ret += "." + quote(object)
    return ret
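# Illustrative examples (backticks inside identifiers are doubled):
#   quote_identifier("sakila")             -> `sakila`
#   quote_identifier("my`schema", "actor") -> `my``schema`.`actor`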
def count_rows(schema, table):
    return session.run_sql("SELECT COUNT(*) FROM !.!", [schema, table]).fetch_one()[0]
# constants
test_user = "sample_user"
test_user_account = get_test_user_account(test_user)
test_user_pwd = "p4$$"
partition_awareness_capability = {
    "id": "partition_awareness",
    "description": "Partition awareness - dumper treats each partition as a separate table, improving both dump and load times.",
    "versionRequired": "8.0.27",
}
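# Illustrative sketch (an assumption, not prescribed usage): asserting that a
# dump's top-level @.json metadata declares the partition awareness capability.
def example_expect_partition_awareness(outdir):
    EXPECT_CAPABILITIES(os.path.join(outdir, "@.json"), [partition_awareness_capability])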