in msm_plugin/lib/management.py [0:0]
def deploy_schema(
session: object, schema_project_path: str, version: str = None,
backup_directory: str = None) -> str:
"""Deploys the database schema
Deploys the given version of the database schema. If no version is given,
the latest available version will be deployed.
If there is an existing schema version that will be upgraded, a dump of
that schema is created in order to be able to roll back.
A log will be written during the update.
Args:
session (object): The database session to use.
schema_project_path (str): The path to the schema project.
version (str): The version to deploy.
backup_directory (str): The directory to be used for backups
Returns:
None
"""
project_settings = get_project_settings(schema_project_path)
schema_name = project_settings.get("schemaName", None)
if schema_name is None:
err_msg = (
f"The project settings of `{schema_project_path}` could not be "
"read.")
lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
raise ValueError(err_msg)
schema_file_name = project_settings.get("schemaFileName", None)
released_versions = get_released_versions(
schema_project_path=schema_project_path)
deployment_script_versions = get_deployment_script_versions(
schema_project_path=schema_project_path)
# Check if there are actually any released versions of the schema
if len(released_versions) == 0:
err_msg = (
f"There are no versions of the schema `{schema_name}` that have "
"been released yet.")
lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
raise Exception(err_msg)
# Check if there is a difference in released versions and deployment scripts
if released_versions != deployment_script_versions:
err_msg = (
"Deployment script(s) missing. Please generate deployment "
"scripts for all released versions first.")
lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
raise Exception(err_msg)
# If a specific version is requested, ensure that there is a deployment
# script for this version
if version is None:
version = '%d.%d.%d' % tuple(deployment_script_versions[-1])
elif version not in map(
lambda v: '%d.%d.%d' % tuple(v), deployment_script_versions):
err_msg = (f"Deployment or update of database schema `{schema_name}` using "
f"version {version} requested but there is no deployment script "
"available for this version.")
lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
raise ValueError(err_msg)
# Check if the schema already exists
schema_exists = get_schema_exists(session=session, schema_name=schema_name)
# Check if the schema is actually managed by MSM, and if so, get the version
schema_managed = False
schema_version = None
if schema_exists:
schema_managed = get_schema_is_managed(
session=session, schema_name=schema_name)
if schema_managed:
schema_version = get_schema_version(
session=session, schema_name=schema_name)
if schema_exists and not schema_managed:
err_msg = (
f"Deployment or update of database schema `{schema_name}` using "
f"version {version} requested but the schema is not managed by "
"MSM.")
lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
raise Exception(err_msg)
# If the requested version already matches the version, exit since there is
# nothing to do
if schema_version is not None and schema_version == version:
info_msg = (
f"Deployment or update of database schema `{schema_name}` using "
f"version {version} requested but the schema is already on the "
"requested version. No changes performed."
)
lib.core.write_to_msm_schema_update_log("INFO", info_msg)
return info_msg
# Check if the current version of the schema is in the list of versions
# that can be upgraded by the deployment scripts
if schema_version is not None:
updatable_versions = get_updatable_versions(schema_project_path)
updatable_versions_str = map(
lambda v: '%d.%d.%d' % tuple(v), updatable_versions)
if (len(updatable_versions) > 0 and
lib.core.convert_version_str_to_list(schema_version) >
updatable_versions[-1]):
info_msg = (
f"The database schema `{schema_name}` is on a newer version "
f"{schema_version} than shipped with this project (version "
f"{'%d.%d.%d' % tuple(updatable_versions[-1])}). No changes "
"performed."
)
lib.core.write_to_msm_schema_update_log("INFO", info_msg)
return info_msg
if not schema_version in updatable_versions_str:
err_msg = (
f"Update of database schema `{schema_name}` to version "
f"{version} requested but the version {schema_version} cannot "
"be updated.")
lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
raise Exception(err_msg)
# Log start of the
if not schema_exists:
lib.core.write_to_msm_schema_update_log(
"INFO",
f"Starting deployment of database schema `{schema_name}` using "
f"version {version} ...")
else:
lib.core.write_to_msm_schema_update_log(
"INFO",
f"Starting update of database schema `{schema_name}` version "
f"{schema_version} to version {version} ...")
# Perform dump if the schema exists
backup_available = False
if schema_exists:
lib.core.write_to_msm_schema_update_log(
"INFO",
f"Preparing dump of `{schema_name}` version "
f"{schema_version} in order to be roll back in case of an error.")
if backup_directory is None:
backup_directory = os.path.join(
lib.core.get_msm_plugin_data_path(),
"backups", f"{schema_file_name}_backup_{schema_version}")
# If that directory already exists, keep appending counter until
# a new directory is found
i = 2
candidate = backup_directory
while os.path.exists(candidate):
candidate = f"{backup_directory}_{i}"
i += 1
backup_directory = candidate
os.makedirs(backup_directory, exist_ok=True)
# Set the mysqlsh session to the one that was given
if "shell.Object" in str(type(session)):
mysqlsh.globals.shell.set_session(session)
else:
mysqlsh.globals.shell.set_session(session.session)
# Ensure that the dump can be read back in case of a failure by setting
# local_infile to 1
# cSpell:ignore infile
row = lib.core.MsmDbExec("SELECT @@local_infile as local_infile").exec(session).first
original_local_infile = (row and int(row["local_infile"]) == 1)
if not original_local_infile:
try:
lib.core.write_to_msm_schema_update_log(
"INFO",
"Enabling local_infile option in order to be able to load "
"back the schema dump in case of an update error...")
lib.core.MsmDbExec("SET GLOBAL local_infile=1").exec(session)
except:
err_msg = (
"Failed to enable the local_infile option. Please execute "
"SET PERSIST GLOBAL local_infile=1; on the MySQL Server.")
lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
raise Exception(err_msg)
lib.core.write_to_msm_schema_update_log(
"INFO",
f"Creating dump of `{schema_name}` version {schema_version} ...")
mysqlsh.globals.util.dump_schemas(
[schema_name],
f"file://{backup_directory}",
{
"skipUpgradeChecks": True,
"showProgress": False,
})
backup_available = True
# Run deployment script
try:
lib.core.execute_msm_sql_script(
session=session,
sql_file_path=os.path.join(
schema_project_path, "releases", "deployment",
f"{schema_file_name}_deployment_{version}.sql"))
if not schema_exists:
info_msg = (
f"Deployment of `{schema_name}` version "
f"{version} completed successfully.")
else:
if not original_local_infile:
lib.core.MsmDbExec("SET GLOBAL local_infile=0").exec(session)
lib.core.write_to_msm_schema_update_log(
"INFO",
"Restored local_infile option.")
info_msg = (
f"Completed the update of `{schema_name}` version "
f"{schema_version} to {version} successfully.")
lib.core.write_to_msm_schema_update_log("INFO", info_msg)
# Remove the backup directory as it is no longer needed
if backup_available:
shutil.rmtree(backup_directory)
return info_msg
except Exception as e:
# Drop the schema after failed update
try:
lib.core.MsmDbExec(
f"DROP SCHEMA IF EXISTS {lib.core.quote_ident(schema_name)}"
).exec(session)
except:
pass
# Restore the backup if available
if backup_available:
try:
mysqlsh.globals.util.load_dump(
f"file://{backup_directory}",
{
"showMetadata": False,
"showProgress": False,
"ignoreVersion": True,
})
if not original_local_infile:
lib.core.MsmDbExec("SET GLOBAL local_infile=0").exec(session)
lib.core.write_to_msm_schema_update_log(
"INFO",
"Restored local_infile option.")
# Remove the backup directory as it is no longer needed
shutil.rmtree(backup_directory)
except Exception as e_dump_load:
err_str = (
"An error occurred while updating the database schema "
f"`{schema_name}` to version {version}. The schema could "
f"not be restored back to version {schema_version}. {e} "
f"{e_dump_load}")
lib.core.write_to_msm_schema_update_log("ERROR", err_str)
raise Exception(err_str)
err_str = (
"An error occurred while updating the database schema "
f"`{schema_name}` to version {version}. The schema has been "
f"restored back to version {schema_version}. {e}"
)
lib.core.write_to_msm_schema_update_log("ERROR", err_str)
raise Exception(err_str)
else:
if not schema_exists:
err_str = (
f"Deploying the database schema `{schema_name}` failed. {e}"
)
lib.core.write_to_msm_schema_update_log("ERROR", err_str)
raise Exception(err_str)
else:
err_str = (
"An error occurred while updating the database schema "
f"`{schema_name}` to version {version}. {e}"
)
lib.core.write_to_msm_schema_update_log("ERROR", err_str)
raise Exception(err_str)