def deploy_schema()

in msm_plugin/lib/management.py [0:0]


def deploy_schema(
        session: object, schema_project_path: str, version: str = None,
        backup_directory: str = None) -> str:
    """Deploys the database schema

    Deploys the given version of the database schema. If no version is given,
    the latest available version will be deployed.

    If there is an existing schema version that will be upgraded, a dump of
    that schema is created in order to be able to roll back. If the
    deployment script fails, the schema is dropped and, when a dump was
    taken, loaded back from that dump.

    A log will be written during the update.

    Args:
        session (object): The database session to use.
        schema_project_path (str): The path to the schema project.
        version (str): The version to deploy.
        backup_directory (str): The directory to be used for backups. If not
            given, a new directory below the MSM plugin data path is used.
            The directory is removed again once it is no longer needed.

    Returns:
        A message describing the outcome: either that the deployment/update
        completed successfully, or that no changes were performed because the
        schema is already on the requested version or on a newer version than
        the project provides.

    Raises:
        ValueError: If the project settings contain no schema name or if
            there is no deployment script for the requested version.
        Exception: If no version has been released yet, if the released
            versions and the deployment scripts differ, if the existing
            schema is not managed by MSM, if the current schema version
            cannot be updated, if the local_infile option cannot be enabled
            or if running the deployment script fails. Errors raised while
            creating the dump or while cleaning up after a successful
            deployment are propagated unchanged.
    """

    def version_to_str(version_parts) -> str:
        # Versions are handled as three-part integer sequences
        return '%d.%d.%d' % tuple(version_parts)

    def restore_local_infile() -> None:
        # Undo the SET GLOBAL local_infile=1 performed before the dump
        lib.core.MsmDbExec("SET GLOBAL local_infile=0").exec(session)
        lib.core.write_to_msm_schema_update_log(
            "INFO",
            "Restored local_infile option.")

    project_settings = get_project_settings(schema_project_path)
    schema_name = project_settings.get("schemaName", None)
    if schema_name is None:
        err_msg = (
            f"The project settings of `{schema_project_path}` could not be "
            "read.")
        lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
        raise ValueError(err_msg)
    schema_file_name = project_settings.get("schemaFileName", None)

    released_versions = get_released_versions(
        schema_project_path=schema_project_path)
    deployment_script_versions = get_deployment_script_versions(
        schema_project_path=schema_project_path)

    # Check if there are actually any released versions of the schema
    if not released_versions:
        err_msg = (
            f"There are no versions of the schema `{schema_name}` that have "
            "been released yet.")
        lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
        raise Exception(err_msg)

    # Check if there is a difference in released versions and deployment scripts
    if released_versions != deployment_script_versions:
        err_msg = (
            "Deployment script(s) missing. Please generate deployment "
            "scripts for all released versions first.")
        lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
        raise Exception(err_msg)

    # If a specific version is requested, ensure that there is a deployment
    # script for this version. Otherwise use the last listed version.
    deployable_versions = [
        version_to_str(v) for v in deployment_script_versions]
    if version is None:
        version = deployable_versions[-1]
    elif version not in deployable_versions:
        err_msg = (f"Deployment or update of database schema `{schema_name}` using "
                   f"version {version} requested but there is no deployment script "
                   "available for this version.")
        lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
        raise ValueError(err_msg)

    # Check if the schema already exists
    schema_exists = get_schema_exists(session=session, schema_name=schema_name)

    # Check if the schema is actually managed by MSM, and if so, get the version
    schema_managed = False
    schema_version = None
    if schema_exists:
        schema_managed = get_schema_is_managed(
            session=session, schema_name=schema_name)

        if schema_managed:
            schema_version = get_schema_version(
                session=session, schema_name=schema_name)

    if schema_exists and not schema_managed:
        err_msg = (
            f"Deployment or update of database schema `{schema_name}` using "
            f"version {version} requested but the schema is not managed by "
            "MSM.")
        lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
        raise Exception(err_msg)

    # If the requested version already matches the version, exit since there is
    # nothing to do
    if schema_version is not None and schema_version == version:
        info_msg = (
            f"Deployment or update of database schema `{schema_name}` using "
            f"version {version} requested but the schema is already on the "
            "requested version. No changes performed."
        )
        lib.core.write_to_msm_schema_update_log("INFO", info_msg)
        return info_msg

    # Check if the current version of the schema is in the list of versions
    # that can be upgraded by the deployment scripts
    if schema_version is not None:
        updatable_versions = get_updatable_versions(schema_project_path)
        # Materialized as a list so it can safely be inspected more than once
        updatable_versions_str = [
            version_to_str(v) for v in updatable_versions]

        if (len(updatable_versions) > 0 and
                lib.core.convert_version_str_to_list(schema_version) >
                updatable_versions[-1]):
            info_msg = (
                f"The database schema `{schema_name}` is on a newer version "
                f"{schema_version} than shipped with this project (version "
                f"{updatable_versions_str[-1]}). No changes "
                "performed."
            )
            lib.core.write_to_msm_schema_update_log("INFO", info_msg)
            return info_msg

        if schema_version not in updatable_versions_str:
            err_msg = (
                f"Update of database schema `{schema_name}` to version "
                f"{version} requested but the version {schema_version} cannot "
                "be updated.")
            lib.core.write_to_msm_schema_update_log("ERROR", err_msg)
            raise Exception(err_msg)

    # Log start of the deployment or update
    if not schema_exists:
        lib.core.write_to_msm_schema_update_log(
            "INFO",
            f"Starting deployment of database schema `{schema_name}` using "
            f"version {version} ...")
    else:
        lib.core.write_to_msm_schema_update_log(
            "INFO",
            f"Starting update of database schema `{schema_name}` version "
            f"{schema_version} to version {version} ...")

    # Perform dump if the schema exists
    backup_available = False
    # Tracks whether this function switched local_infile on and therefore has
    # to switch it off again
    local_infile_changed = False
    if schema_exists:
        lib.core.write_to_msm_schema_update_log(
            "INFO",
            f"Preparing dump of `{schema_name}` version "
            f"{schema_version} in order to be roll back in case of an error.")

        if backup_directory is None:
            backup_directory = os.path.join(
                lib.core.get_msm_plugin_data_path(),
                "backups", f"{schema_file_name}_backup_{schema_version}")
            # If that directory already exists, keep appending counter until
            # a new directory is found
            i = 2
            candidate = backup_directory
            while os.path.exists(candidate):
                candidate = f"{backup_directory}_{i}"
                i += 1
            backup_directory = candidate

        os.makedirs(backup_directory, exist_ok=True)

        # Set the mysqlsh session to the one that was given
        if "shell.Object" in str(type(session)):
            mysqlsh.globals.shell.set_session(session)
        else:
            mysqlsh.globals.shell.set_session(session.session)

        # Ensure that the dump can be read back in case of a failure by setting
        # local_infile to 1
        # cSpell:ignore infile
        row = lib.core.MsmDbExec(
            "SELECT @@local_infile as local_infile").exec(session).first
        original_local_infile = (row and int(row["local_infile"]) == 1)
        if not original_local_infile:
            try:
                lib.core.write_to_msm_schema_update_log(
                    "INFO",
                    "Enabling local_infile option in order to be able to load "
                    "back the schema dump in case of an update error...")
                lib.core.MsmDbExec("SET GLOBAL local_infile=1").exec(session)
            except Exception as e:
                err_msg = (
                    "Failed to enable the local_infile option. Please execute "
                    "SET PERSIST GLOBAL local_infile=1; on the MySQL Server.")
                lib.core.write_to_msm_schema_update_log("ERROR", err_msg)

                raise Exception(err_msg) from e
            local_infile_changed = True

        lib.core.write_to_msm_schema_update_log(
            "INFO",
            f"Creating dump of `{schema_name}` version {schema_version} ...")
        try:
            mysqlsh.globals.util.dump_schemas(
                [schema_name],
                f"file://{backup_directory}",
                {
                    "skipUpgradeChecks": True,
                    "showProgress": False,
                })
        except Exception:
            # Without a dump there is nothing to load back, so the option
            # change is undone (best effort) before reporting the dump error
            if local_infile_changed:
                try:
                    restore_local_infile()
                except Exception as e_restore:
                    lib.core.write_to_msm_schema_update_log(
                        "ERROR",
                        f"Failed to restore the local_infile option. "
                        f"{e_restore}")
            raise

        backup_available = True

    # Run deployment script. Only the script execution is guarded by the
    # rollback handler; every branch of the handler raises.
    try:
        lib.core.execute_msm_sql_script(
            session=session,
            sql_file_path=os.path.join(
                schema_project_path, "releases", "deployment",
                f"{schema_file_name}_deployment_{version}.sql"))
    except Exception as e:
        # Drop the schema after failed update. This is best effort: a failure
        # here must not prevent the restore attempt below.
        try:
            lib.core.MsmDbExec(
                f"DROP SCHEMA IF EXISTS {lib.core.quote_ident(schema_name)}"
            ).exec(session)
        except Exception:
            pass

        # Restore the backup if available
        if backup_available:
            try:
                mysqlsh.globals.util.load_dump(
                    f"file://{backup_directory}",
                    {
                        "showMetadata": False,
                        "showProgress": False,
                        "ignoreVersion": True,
                    })

                if local_infile_changed:
                    restore_local_infile()

                # Remove the backup directory as it is no longer needed
                shutil.rmtree(backup_directory)
            except Exception as e_dump_load:
                err_str = (
                    "An error occurred while updating the database schema "
                    f"`{schema_name}` to version {version}. The schema could "
                    f"not be restored back to version {schema_version}. {e} "
                    f"{e_dump_load}")
                lib.core.write_to_msm_schema_update_log("ERROR", err_str)

                raise Exception(err_str) from e_dump_load

            err_str = (
                "An error occurred while updating the database schema "
                f"`{schema_name}` to version {version}. The schema has been "
                f"restored back to version {schema_version}. {e}"
            )
            lib.core.write_to_msm_schema_update_log("ERROR", err_str)

            raise Exception(err_str) from e
        else:
            if not schema_exists:
                err_str = (
                    f"Deploying the database schema `{schema_name}` failed. {e}"
                )
                lib.core.write_to_msm_schema_update_log("ERROR", err_str)

                raise Exception(err_str) from e
            else:
                err_str = (
                    "An error occurred while updating the database schema "
                    f"`{schema_name}` to version {version}. {e}"
                )
                lib.core.write_to_msm_schema_update_log("ERROR", err_str)

                raise Exception(err_str) from e

    # The deployment script succeeded. The cleanup below is intentionally not
    # covered by the rollback handler, so that a cleanup failure can never
    # cause the successfully deployed schema to be dropped.
    if not schema_exists:
        info_msg = (
            f"Deployment of `{schema_name}` version "
            f"{version} completed successfully.")
    else:
        if local_infile_changed:
            restore_local_infile()

        info_msg = (
            f"Completed the update of `{schema_name}` version "
            f"{schema_version} to {version} successfully.")

    lib.core.write_to_msm_schema_update_log("INFO", info_msg)

    # Remove the backup directory as it is no longer needed
    if backup_available:
        shutil.rmtree(backup_directory)

    return info_msg