msm_plugin/lib/management.py (946 lines of code) (raw):

# Copyright (c) 2025, Oracle and/or its affiliates. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License, version 2.0, # as published by the Free Software Foundation. # # This program is designed to work with certain software (including # but not limited to OpenSSL) that is licensed under separate terms, as # designated in a particular file or component or in included license # documentation. The authors of MySQL hereby grant you an additional # permission to link the program and your derivative works with the # separately licensed software that they have either included with # the program or referenced in the documentation. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See # the GNU General Public License, version 2.0, for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # Define plugin version from msm_plugin import lib import json import os from pathlib import Path import re import shutil from string import Template from datetime import date from textwrap import indent import mysqlsh # Regex to match MSM sections, excluding the license and the last section MSM_SECTIONS_REGEX = r'--\s+#+\s+--\s+MSM\s+Section\s+(\d+):\s*(.*?)$\s+.*?--\s+#+' # Regex to match the MSM LOOP UPDATABLE-VERSIONS MSM_LOOP_UPDATABLE_VERSIONS_REGEX = \ r'--\s+###\s+MSM-LOOP-START:UPDATABLE-VERSIONS(\(indent=(\d+)\))?.*?\n(.*?)' \ r'--\s+###\s+MSM-LOOP-END:UPDATABLE-VERSIONS.*?\n' # Reges to match the MSM section placeholder MSM_SECTION_PLACEHOLDER_REGEX = r'\$\{section_(\d+).*?\}\n' # Regex to match the leading comments and empty lines before SQL commands, # but ensure that comments before SQL commands are kept REMOVE_LEADING_COMMENTS_AND_EMPTY_LINES = r'((^--.*?\n)+(^\s*\n)*)' # Regex to match all empty lines, including whitespace, at the end of the file REMOVE_TRAILING_EMPTY_LINES = r'(\s*\Z)' MSM_SCHEMA_VERSION_VIEW_VALUES = r'CREATE.*?VIEW.*?`msm_schema_version`.*?' \ r'\(.*?`major`.*?`minor`.*?`patch`.*?\).*?AS.*?SELECT.*?' \ r'((\d+).*?,.*?(\d+).*?,.*?(\d+))' MSM_SECTION_SOURCE_REGEX = \ r'^([ \t]*)SOURCE\s+[\'"](.*?)[\'"]\s*\[(\d*:-?\d*)\]\s*;.*?$\n' MSM_SECTION_REMOVE_COMMENTS_AND_DELIMITERS = \ r'(^\s*--\s+.*?$)|(^DELIMITER\s+.*?$)|(\/\*.*?\*\/)' def remove_leading_comments_trailing_lines(section: str) -> str: """Removes leading comments and empty lines at the beginning and end from the given section Args: section (str): The section to remove the leading comments from Returns: The trimmed section text """ lines = section.splitlines() # Remove leading comments and empty lines while len(lines) > 0 and (lines[0].startswith("-- ") or lines[0].strip() == ""): lines = lines[1:] # Remove trailing empty lines while len(lines) > 0 and lines[len(lines) - 1].strip() == "": lines = lines[:len(lines) - 1] return "\n".join(lines) def get_sql_content(full_section_content: str) -> tuple[str, int, int]: """Returns the sql content of a MSM section Args: full_section_content: The full content of the section Returns: The sql content as well as the start and the end position """ # Search start position of sql content start_position = 0 match = re.search( REMOVE_LEADING_COMMENTS_AND_EMPTY_LINES, full_section_content, re.MULTILINE | re.DOTALL) if match is not None: start_position = match.end() # Search end position of sql content end_position = len(full_section_content) match = re.search( REMOVE_TRAILING_EMPTY_LINES, full_section_content, re.MULTILINE | re.DOTALL) if match is not None: end_position = match.start() return ( full_section_content[start_position:end_position], start_position, end_position) def set_sql_content(section: dict, sql_content: str) -> None: """Sets the sql content of a MSM section Args: section: The MSM section as dict Returns: None """ if not ("sql_content" in section and "sql_start" in section and "sql_end" in section): raise ValueError("The given section does not contain SQL content.") section["full_content"] = ( section["full_content"][:section["sql_start"]] + "\n" + sql_content.strip("\n")) section["sql_content"] = sql_content section["sql_end"] = len(section["full_content"]) def get_script_sections(script: str) -> dict[str, dict]: """Returns the MSM sections of a script Args: script (str): The script as string Returns: A list of script dicts. """ # Search for MSM sections matches = re.finditer( MSM_SECTIONS_REGEX, script, re.MULTILINE | re.DOTALL) # for match_id, match in enumerate(matches, start=1): # print("Match {match_id} was found at {start}-{end}: {match}".format( # match_id=match_id, start=match.start(), end=match.end(), match=match.group())) # for group_id in range(0, len(match.groups())): # group_id = group_id + 1 # print("Group {group_id} found at {start}-{end}: {group}".format(group_id=group_id, # start=match.start(group_id), end=match.end(group_id), group=match.group(group_id))) section_headers = [] for match_id, match in enumerate(matches, start=1): # Add license section if available if match_id == 1 and match.start() > 0: section_headers.append({ "section_id": "license", "start_position": 0, "header_length": 0, }) section_headers.append({ "section_id": match.group(1), "start_position": match.start(), "header_length": match.end() - match.start(), }) sections = {} for idx, header in enumerate(section_headers): # Lookup the end of the section by checking the start of the next section, # while the last section ends at the end of the script. if idx + 1 < len(section_headers): section_end = section_headers[idx + 1]["start_position"] - 1 else: section_end = len(script) sections[header["section_id"]] = { "full_content": script[header["start_position"]:section_end], "sql_content": script[header["start_position"] + header["header_length"]:section_end], "sql_start": header["header_length"], "sql_end": section_end, "start_position": header["start_position"], "end_position": section_end, } return sections def get_file_sections(file_path: str) -> str: """Returns the MSM sections of a given file Args: file_path (str): The path of the SQL file. section_id (str): The id of the section. Returns: The SQL content as string """ if not os.path.exists(file_path): raise ValueError( f"The file `{file_path}` could not be found.") with open(file_path, "r") as f: script = f.read() return get_script_sections(script) def get_file_section(file_path: str, section_id: str) -> str: """Returns the SQL content of a MSM section of a given file Args: file_path (str): The path of the SQL file. section_id (str): The id of the section. Returns: The SQL content as string """ sections = get_file_sections(file_path) if section_id in sections: return sections[section_id] else: raise ValueError( f"The MSM section `{section_id}` could not be found in the file `{file_path}`.") def set_section_sql_content( section_id: str, sql_content: str, file_path: str = None, sections: dict[str, dict] = None) -> None: """Sets the SQL content of a MSM section of a file Args: section_id: The id of the section. sql_content: The sql content to set file_path: The path of the SQL file. sections: The MSM sections. Returns: None """ if sections is None and file_path is not None: sections = get_file_sections(file_path) elif sections is None: raise ValueError("Either a file_path or sections must be set.") if not section_id in sections: raise ValueError(f"The given section `{section_id}` was not found.") section = sections[section_id] set_sql_content(section=section, sql_content=sql_content) if file_path is not None: write_sections_to_file( file_path=file_path, sections=sections, overwrite_existing=True) def write_sections_to_file(file_path: str, sections: dict[str, dict], overwrite_existing: bool = False): """Writes all MSM sections to a file Args: file_path: The path write the sections to. sections: The MSM sections to write out. overwrite_existing: Whether an existing file should be replaced. Defaults to False. Returns: None """ if os.path.exists(file_path) and not overwrite_existing: raise ValueError( f"The file {file_path} already exists. Please explicitly allow to replace existing files.") with open(file_path, "w") as f: f.write("\n".join( sections[section_id].get("full_content") for section_id in sections)) def convert_string_to_valid_filename(name: str) -> str: """Converts a string to a valid filename Replaces all spaces with underscores and remove characters that are not allowed. Args: name: The string to convert Returns: The converted filename """ # Replace all spaces with underscores r = re.compile(r"\s+", re.MULTILINE) file_name = r.sub("_", name) # Remove characters that are not allowed r = re.compile(r"[\\\\/:*?\"<>|]", re.MULTILINE) file_name = r.sub("", file_name) return file_name def check_mysql_identifier(identifier: str, must_be_usable_when_unquoted: bool = True): """Checks if the given identifier is valid Args: identifier: The identifier to be checked allow_special_chars: Wether Returns: The converted filename """ if not must_be_usable_when_unquoted: if not re.match(r"^[A-Za-z0-9_]+[A-Za-z0-9_$]*$", identifier): raise ValueError( "Only basic Latin letters, digits 0-9, dollar, underscore are allowed to be used as schema name. " "Use the allow_special_chars option to overwrite this behavior.") if re.match(r"^[0-9]+$", identifier): raise ValueError( "A schema name must not consist solely of digits. " "Use the allow_special_chars option to overwrite this behavior.") def get_license_text(schema_project_path: str = None, project_settings: dict = None) -> str: """Gets the correct license text for the given project The license text is fetch from the right license template and the variables are substituted accordingly. Args: schema_project_path: The MSM project path project_settings: Optional project settings Returns: The converted filename """ if project_settings is None and schema_project_path is not None: project_settings = get_project_settings(schema_project_path) elif project_settings is None and schema_project_path is None: raise ValueError( "No schema_project_path nor project_settings parameter given.") try: copyright_holder = project_settings.get("copyrightHolder") license = project_settings.get("license") custom_license = project_settings.get("customLicense") year_of_creation = project_settings.get("yearOfCreation") except Exception as e: raise ValueError( f"The project settings must include copyrightHolder, license, customLicense and yearOfCreation. {e}") license_template = None if license.upper() == "CUSTOM": if custom_license is not None and custom_license != "": license_template = Template(custom_license) else: raise ValueError("No custom license text specified.") else: template_folder = os.path.join( Path(__file__).parent.parent, "templates") license_file_path = os.path.join( template_folder, "license", f"{license}.txt") if not os.path.exists(license_file_path): raise ValueError( f"No license stored under the given license name `{license}`. Please use a custom license text.") with open(license_file_path) as f: license_template = Template("".join(f.readlines()[1:])) current_year = date.today().strftime("%Y") try: license_text = license_template.substitute({ "copyright_holder": copyright_holder, "year": year_of_creation if year_of_creation == current_year else f"{year_of_creation}, {current_year}" }) except: raise ValueError( "The license template is either missing the ${year} or ${copyright_holder} placeholders.") return license_text def copy_template_file_and_substitute( source_file_path: str, target_file_path: str, substitutions: dict): """Copies a template file and substitutes the given variables Args: source_file_path: The path to the source file target_file_path: The path to the target file substitutions: The dict holding the substitutions Returns: None """ # Create schema development script with open(source_file_path, "r") as f: # Remove copyright line and replace placeholders script = Template("".join(f.readlines()[1:])) script = script.substitute(substitutions) with open(target_file_path, "w") as f: f.write(script) def create_schema_project_folder( schema_name: str, target_path: str, copyright_holder: str, license: str = None, overwrite_existing: bool = False, allow_special_chars: bool = False, enforce_target_path: bool = False) -> str: """Creates a new schema project folder. Args: schema_name (str): The name of the schema. target_path (str): The path to the schema project folder. copyright_holder (str): The name of the copyright holder. license (str): The license to use for the project. overwrite_existing (bool): If the project folder already exists, overwrite it. allow_special_chars (bool): If set to True, allows all characters overwrite_existing (bool): If the project folder already exists, overwrite it. enforce_target_path (bool): If set to true, the target_path is created if it does not yet exist. Returns: The path of the project folder that was created. """ check_mysql_identifier( identifier=schema_name, must_be_usable_when_unquoted=not allow_special_chars) target_path = os.path.abspath(os.path.expanduser(target_path)) if not os.path.exists(target_path): if enforce_target_path: Path(target_path).mkdir(parents=True, exist_ok=True) else: raise ValueError( f'The project folder cannot be created inside the directory `{target_path}` as this path ' 'does not exist.') # When used in file names, critical characters need to be removed schema_file_name = convert_string_to_valid_filename(schema_name) # Create the project folder project_path = os.path.join(target_path, f"{schema_file_name}.msm.project") if os.path.exists(project_path): if overwrite_existing: shutil.rmtree(project_path) else: raise ValueError( f'The project folder "{project_path}" already exists.') os.makedirs(os.path.join(project_path), exist_ok=True) # Copy the template project file structure template_folder = os.path.join(Path(__file__).parent.parent, "templates") shutil.copytree( Path(os.path.join(template_folder, "msm.project")), project_path, dirs_exist_ok=True) # Get current year year_of_creation = date.today().strftime("%Y") project_settings = { "copyrightHolder": copyright_holder, "customLicense": "", "license": license, "schemaDependencies": [], "schemaName": schema_name, "schemaFileName": schema_file_name, "yearOfCreation": year_of_creation, } # Write the msm.project.json file with open(os.path.join(project_path, "msm.project.json"), "w") as f: f.write(json.dumps(project_settings, indent=4)) # Replace placeholders in all markdown files files = list(Path(project_path).rglob("*.md")) for file in files: with open(file, "r") as f: script = Template(f.read()) script = script.substitute({ "schema_name": schema_name, }) r = re.compile(r"Copyright.*$", re.MULTILINE) script = r.sub( f"Copyright (c) {year_of_creation}, {copyright_holder}.", script) with open(file, "w") as f: f.write(script) copy_template_file_and_substitute( source_file_path=os.path.join( template_folder, "scripts", "schema_next.sql"), target_file_path=os.path.join( project_path, "development", f"{schema_file_name}_next.sql"), substitutions={ "license": get_license_text(project_settings=project_settings), "schema_name": schema_name, "version_str": "0.0.1", "version_comma_str": "0, 0, 1", }) return project_path def get_schema_development_file_path(schema_project_path: str = None) -> str: schema_file_name = get_project_settings( schema_project_path).get("schemaFileName", None) if schema_file_name is None: raise ValueError( f"The settings files of the project `{schema_project_path}` does not contain " "a schemaFileName value.") schema_dev_file_path = os.path.join( schema_project_path, "development", f"{schema_file_name}_next.sql") if not os.path.exists(schema_dev_file_path): raise ValueError( f"The MSM project folder does not contain a schema development file `{schema_dev_file_path}`.") return schema_dev_file_path def get_schema_development_sections(schema_project_path: str = None) -> dict[str, dict]: schema_dev_file_path = get_schema_development_file_path( schema_project_path) with open(schema_dev_file_path, "r") as f: script = f.read() return get_script_sections(script) def write_schema_development_file(schema_project_path: str, sections: dict[str, dict]) -> None: schema_dev_file_path = get_schema_development_file_path( schema_project_path) write_sections_to_file( file_path=schema_dev_file_path, sections=sections, overwrite_existing=True) def schema_development_version( schema_project_path: str = None, sections: dict[str, dict] = None, new_version: str = None, write_to_file: bool = False) -> str: """Returns the development version of the current schema Args: schema_project_path: The path to the schema project folder. sections: The list of sections of the schema development file. new_version: If not None, the given version is set as new development version. write_to_file: Whether the development file should be updated. Defaults to False. Returns: A dict with information about the project """ if sections is None and schema_project_path is None: raise ValueError( "No sections nor schema_project_path parameters given.") if sections is None: sections = get_schema_development_sections( schema_project_path=schema_project_path) # Process MSM Section 910: Database Schema Version version_section = sections.get("910", None) if version_section is None: for section_id in sections: section = sections[section_id] print( f"{section_id=}\n{section['full_content']=}\n\n{section['sql_content']=}\n\n\n") raise ValueError( "The script section `MSM Section 910: Database Schema Version` could not be found.") section_content = version_section.get("full_content") version_values = re.search( MSM_SCHEMA_VERSION_VIEW_VALUES, section_content, re.MULTILINE | re.DOTALL) if version_values is None: raise ValueError( "The script section `MSM Section 910: Database Schema Version` does not include the " "CREATE VIEW statement to create the `msm_schema_version` VIEW.") # If no new_version is given, return the current one if new_version is None: return f"{version_values.group(2)}.{version_values.group(3)}.{version_values.group(4)}" new_version_list = lib.core.convert_version_str_to_list(new_version) # Update the version updated_content = ( section_content[:version_values.start(1)] + ", ".join(str(number) for number in new_version_list) + section_content[version_values.end(1):]) version_section["full_content"] = updated_content sections["910"] = version_section if write_to_file: write_schema_development_file( schema_project_path=schema_project_path, sections=sections) return new_version def get_schema_development_version(schema_project_path: str = None, sections: dict[str, dict] = None) -> str: """Returns the development version of the current schema Args: schema_project_path: The path to the schema project folder. sections: The list of sections of the schema development file. Returns: A dict with information about the project """ return schema_development_version( schema_project_path=schema_project_path, sections=sections) def set_development_version( new_version: str, schema_project_path: str = None, sections: dict = None, write_to_file: bool = False) -> dict: """Sets the development version inside the development/schema_next.sql file Args: version: The new version to create. schema_project_path: The path to the schema project folder. sections: The list of sections of the schema development file. write_to_file: Whether the development file should be updated. Defaults to False. Returns: None """ return schema_development_version( schema_project_path=schema_project_path, sections=sections, new_version=new_version, write_to_file=write_to_file) def get_project_settings(schema_project_path: str) -> dict: """Returns the project settings Args: schema_project_path: The path to the schema project folder. Returns: A dict with the project settings """ if schema_project_path is None: raise ValueError("No schema_project_path given.") project_settings_file = os.path.join( schema_project_path, "msm.project.json") if not os.path.exists(project_settings_file): raise ValueError( f"The path {schema_project_path} does not contain a MSM project folder.") with open(project_settings_file, "r") as f: try: project_settings = json.loads(f.read()) except: raise ValueError( f"The contents of the MSM project settings file `{project_settings_file}` is corrupted.") return project_settings def get_released_versions(schema_project_path: str) -> list[list[int, int, int]]: """Returns the all released version of the database schema Args: schema_project_path: The path to the schema project folder. Returns: The list of released database versions as list of list of 3 ints. """ if schema_project_path is None: raise ValueError("No schema_project_path given.") # Look at all files and get the highest version files = [] for file in next(os.walk(os.path.join(schema_project_path, "releases", "versions")))[2]: files.append(file) versions = [] for file in files: version_match = re.match(r".*?(\d+)\.(\d+)\.(\d+)\.sql", file) if version_match is not None: file_version = [int(version_match.group( 1)), int(version_match.group(2)), int(version_match.group(3))] versions.append(file_version) versions.sort() return versions def get_updatable_versions(schema_project_path: str) -> list[list[int, int, int]]: """Returns the all version that have update scripts Args: schema_project_path: The path to the schema project folder. Returns: The list of updatable database versions as list of list of 3 ints. """ if schema_project_path is None: raise ValueError("No schema_project_path given.") # Look at all files and get the highest version files = [] for file in next(os.walk(os.path.join(schema_project_path, "releases", "updates")))[2]: files.append(file) versions = [] versions_to = [] for file in files: version_match = re.match( r".*?(\d+)\.(\d+)\.(\d+)_to_(\d+)\.(\d+)\.(\d+)\.sql", file) if version_match is not None: file_version = [int(version_match.group( 1)), int(version_match.group(2)), int(version_match.group(3))] versions.append(file_version) to_version = [int(version_match.group( 4)), int(version_match.group(5)), int(version_match.group(6))] versions_to.append(to_version) versions.sort() versions_to.sort() # Check if there are any missing version updates if len(versions) > 0: versions_to.insert(0, versions[0]) del versions_to[-1] missing_upgrade_from = None missing_upgrade_to = None for i in range(len(versions)): if versions[i] != versions_to[i]: missing_upgrade_from = versions[i] missing_upgrade_to = versions_to[i] else: continue break if missing_upgrade_from is not None: raise Exception( "The list of upgrade scripts misses an upgrade step from " f"version {missing_upgrade_from} to {missing_upgrade_to}. " "Please create the required update script.") return versions def get_last_released_version(schema_project_path: str) -> list[int, int, int] | None: """Returns the last released version of the database schema Args: schema_project_path: The path to the schema project folder. Returns: The last released version or None if no version was released yet """ versions = get_released_versions(schema_project_path) if len(versions) > 0: return versions[-1] else: return None def get_deployment_script_versions(schema_project_path: str) -> list[list[int, int, int]]: """Returns the all deployed version of the database schema Args: schema_project_path: The path to the schema project folder. Returns: The list of deployed database versions as list of list of 3 ints. """ if schema_project_path is None: raise ValueError("No schema_project_path given.") # Look at all files and get the highest version files = [] for file in next(os.walk(os.path.join(schema_project_path, "releases", "deployment")))[2]: files.append(file) versions = [] for file in files: version_match = re.match(r".*?(\d+)\.(\d+)\.(\d+)\.sql", file) if version_match is not None: file_version = [int(version_match.group( 1)), int(version_match.group(2)), int(version_match.group(3))] versions.append(file_version) versions.sort() return versions def get_last_deployment_script_version(schema_project_path: str) -> list[int, int, int] | None: """Returns the last deployment version of the database schema Args: schema_project_path: The path to the schema project folder. Returns: The version of the last deployment or None if no version was released yet """ versions = get_deployment_script_versions(schema_project_path) if len(versions) > 0: return versions[-1] else: return None def get_project_info(schema_project_path: str) -> dict: """Returns information about the project Args: schema_project_path: The path to the schema project folder. Returns: A dict with information about the project """ project_info = get_project_settings(schema_project_path) project_info["currentDevelopmentVersion"] = get_schema_development_version( schema_project_path=schema_project_path) last_released_version = get_last_released_version( schema_project_path) project_info["lastReleasedVersion"] = ( '%d.%d.%d' % tuple(last_released_version) if last_released_version is not None else None) project_info["schemaDevelopmentFilePath"] = get_schema_development_file_path( schema_project_path) return project_info def sql_content_has_no_statement(sql_content: str, pattern: re.Pattern[str] = None) -> bool: """Removes all comments and delimiter statements Args: section_content: The section content Returns: True if the section content only contains comments or delimiter statements """ if pattern is None: pattern = re.compile( MSM_SECTION_REMOVE_COMMENTS_AND_DELIMITERS, re.MULTILINE | re.DOTALL) return re.sub(pattern, "", sql_content, 0).strip() == "" def remove_empty_sections(sections: dict, keep_even_if_empty=list[str]) -> None: """Removes all empty sections from the given section dict Args: sections: The section dict with all sections keep_even_if_empty: A list of section ids that should kept even if empty Returns: None """ pattern = re.compile( MSM_SECTION_REMOVE_COMMENTS_AND_DELIMITERS, re.MULTILINE | re.DOTALL) sections_to_remove = [] for section_id in sections: if section_id not in keep_even_if_empty: section = sections[section_id] full_content = section.get("full_content", "") if sql_content_has_no_statement(full_content, pattern): sections_to_remove.append(section_id) for section_id in sections_to_remove: sections.pop(section_id, None) def prepare_release( schema_project_path: str, version: str, next_version: str, allow_to_stay_on_same_version: bool = False, overwrite_existing: bool = False) -> list[str]: """Adds a new release to the schema project Args: schema_project_path: The path to the schema project folder. version: The new version to create. next_version: The next development version. allow_to_stay_on_same_version: Whether to allow to stay on the same version for further development work overwrite_existing: Whether existing files should be overwritten. Defaults to False. Returns: A list of filenames that have been created or changed for the release """ project_settings = get_project_settings(schema_project_path) schema_name = project_settings.get("schemaName") schema_file_name = project_settings.get("schemaFileName") last_released_version = get_last_released_version(schema_project_path) version_as_ints = lib.core.convert_version_str_to_list(version) files_for_release = [] # Ensure provided version are OK if last_released_version is not None: if last_released_version > version_as_ints: raise ValueError( f"The given version {version} is lower than the last released " f"version {'%d.%d.%d' % tuple(last_released_version)}.") if not allow_to_stay_on_same_version and lib.core.convert_version_str_to_list(next_version) <= version_as_ints: raise ValueError( "The next development version needs to be higher than the version for release " f"{version}. Please explicitly pass the according flag to stay on the same version.") elif allow_to_stay_on_same_version and lib.core.convert_version_str_to_list(next_version) < version_as_ints: raise ValueError( "The next development version needs to be at least at the same version as the version for release " f"{version}.") # Get the current schema development file sections schema_dev_file_path = os.path.join( schema_project_path, "development", f"{schema_file_name}_next.sql") if not os.path.exists(schema_dev_file_path): raise ValueError( f"The MSM project folder does not contain a schema development file `{schema_dev_file_path}`.") with open(schema_dev_file_path, "r") as f: schema_dev_script = f.read() # Make sure to resolve SOURCE statements with actual content schema_dev_script = substitute_source_statements_with_content( schema_dev_script, os.path.dirname(schema_dev_file_path)) schema_dev_expanded_sections = get_script_sections(schema_dev_script) # Prepare the schema version template file template_folder = os.path.join(Path(__file__).parent.parent, "templates") schema_version_template_file_path = os.path.join( template_folder, "scripts", "schema_a.b.c.sql") if not os.path.exists(schema_version_template_file_path): raise ValueError( f"The schema release version template file `{schema_version_template_file_path}` does not exist.") with open(schema_version_template_file_path, "r") as f: # Remove copyright line and replace placeholders schema_version_script = Template("".join(f.readlines()[1:])) schema_version_script = schema_version_script.substitute({ # get_license_text(project_settings=project_settings), "license": "License Placeholder", "schema_name": schema_name, "version_str": version, "version_comma_str": ", ".join(str(number) for number in version_as_ints), }) schema_version_sections = get_script_sections(schema_version_script) # Get the version section of the current schema development file schema_dev_version_section = schema_dev_expanded_sections.get("910", None) if schema_dev_version_section is None: raise ValueError( "The script section `MSM Section 910: Database Schema Version` could not be found in " f"`{schema_dev_file_path}`.") # Loop over all sections of the version file and replace the sections with the ones from the development file for section_id in schema_version_sections: section = schema_dev_expanded_sections.get(section_id, None) if section is not None: schema_version_sections[section_id] = section remove_empty_sections( sections=schema_version_sections, keep_even_if_empty=["license", "001"]) # Write the schema version file out schema_version_file_path = os.path.join( schema_project_path, "releases", "versions", f"{schema_file_name}_{version}.sql") write_sections_to_file( file_path=schema_version_file_path, sections=schema_version_sections, overwrite_existing=overwrite_existing) files_for_release.append(schema_version_file_path) # Check if update script is needed and if so, write it out if last_released_version is not None: last_released_version_str = '%d.%d.%d' % tuple(last_released_version) if last_released_version_str != version: schema_update_file_path = os.path.join( schema_project_path, "releases", "updates", f"{schema_file_name}_{last_released_version_str}_to_{version}.sql") copy_template_file_and_substitute( source_file_path=os.path.join( template_folder, "scripts", "schema_x.y.z_to_a.b.c.sql"), target_file_path=schema_update_file_path, substitutions={ "license": get_license_text(project_settings=project_settings), "schema_name": schema_name, "version_from": last_released_version_str, "version_to": version, "version_comma_str": ", ".join( str(number) for number in lib.core.convert_version_str_to_list(version)), }) files_for_release.append(schema_update_file_path) # Update the version of the development schema file store in section # MSM Section 910: Database Schema Version set_development_version(new_version=next_version, schema_project_path=schema_project_path, write_to_file=True) return files_for_release def generate_deployment_script( schema_project_path: str, version: str, overwrite_existing: bool = False) -> str: """Generate the deployment script for a release Args: schema_project_path: The path to the schema project folder. version: The version to generate the deployment script for. overwrite_existing: Whether existing files should be overwritten. Defaults to False. Returns: The file name path of the generated script """ project_settings = get_project_settings(schema_project_path) schema_name = project_settings.get("schemaName") schema_file_name = project_settings.get("schemaFileName") # Build deployment file path that will be used for output and check that it does not exist deployment_file_path = os.path.join( schema_project_path, "releases", "deployment", f"{schema_file_name}_deployment_{version}.sql") if os.path.exists(deployment_file_path) and not overwrite_existing: raise ValueError( f"The file {deployment_file_path} already exists. Please explicitly allow to replace existing files.") # Build the version file path that is used as a source and check that it does exist version_file_path = os.path.join( schema_project_path, "releases", "versions", f"{schema_file_name}_{version}.sql") if not os.path.exists(version_file_path): raise ValueError( f"The file {version_file_path} does not exist exists. Please make " f"sure to prepare the release {version} first.") # Get the version script file section target_version_sections = get_file_sections(version_file_path) version_as_ints = lib.core.convert_version_str_to_list(version) # Ensure there are no missing updates between 2 released versions updatable_versions = get_updatable_versions(schema_project_path) # Get the list of released version and build list of updatable versions released_versions = get_released_versions(schema_project_path) i = 0 updatable_versions = [] updatable_versions_sections = {} while i + 1 < len(released_versions) and released_versions[i] < version_as_ints: version_from = '%d.%d.%d' % tuple(released_versions[i]) version_to = '%d.%d.%d' % tuple(released_versions[i + 1]) updatable_versions.append(version_from) updatable_versions_sections[version_from] = { "version_from": released_versions[i], "version_to": released_versions[i + 1], "sections": get_file_sections(os.path.join( schema_project_path, "releases", "updates", f"{schema_file_name}_{version_from}_to_{version_to}.sql")), } i += 1 if len(released_versions) == 0: raise ValueError( "Please prepare a version for release before generating a deployment script.") # If there is exactly one released versions yet, use the version SQL script as deployment script if len(released_versions) == 1: shutil.copyfile(version_file_path, deployment_file_path) return deployment_file_path # Prepare the schema version template file template_folder = os.path.join(Path(__file__).parent.parent, "templates") schema_deployment_template_file_path = os.path.join( template_folder, "scripts", "schema_deployment_a.b.c.sql") if not os.path.exists(schema_deployment_template_file_path): raise ValueError( f"The schema release version template file `{schema_deployment_template_file_path}` does not exist.") with open(schema_deployment_template_file_path, "r") as f: # Remove copyright line and replace placeholders schema_deployment_script = Template("".join(f.readlines()[1:])) schema_deployment_script = schema_deployment_script.safe_substitute({ "license": get_license_text(project_settings=project_settings), "schema_name": schema_name, "version_target": version, "version_comma_str": ", ".join(str(number) for number in version_as_ints), "section_130_creation_of_helpers": target_version_sections.get("130", {}).get("sql_content", "").strip("\n"), "section_140_non_idempotent_schema_objects": indent( target_version_sections.get("140", {}).get("sql_content", "").strip("\n"), " "), "section_150_idempotent_schema_objects": target_version_sections.get("150", {}).get("sql_content", "").strip("\n"), "section_170_authorization": indent( target_version_sections.get("170", {}).get("sql_content", "").strip("\n"), " "), "section_190_removal_of_helpers": target_version_sections.get("190", {}).get("sql_content", "").strip("\n"), "updatable_versions": ", ".join(f'"{v}"' for v in updatable_versions), }) matches = re.finditer( MSM_LOOP_UPDATABLE_VERSIONS_REGEX, schema_deployment_script, re.MULTILINE | re.DOTALL) # for match_id, match in enumerate(matches, start=1): # print("Match {match_id} was found at {start}-{end}: {match}".format( # match_id=match_id, start=match.start(), end=match.end(), match=match.group())) # for group_id in range(0, len(match.groups())): # group_id = group_id + 1 # print("Group {group_id} found at {start}-{end}: {group}".format(group_id=group_id, # start=match.start(group_id), end=match.end(group_id), group=match.group(group_id))) for match in reversed(list(matches)): loop_content_template = match.group(3) loop_content = "" needs_indent = match.group(2) if needs_indent is None or needs_indent == "": needs_indent = 0 else: needs_indent = int(needs_indent) for version_from in updatable_versions_sections: loop_version_content = loop_content_template sections_to_replace = reversed(list( re.finditer(MSM_SECTION_PLACEHOLDER_REGEX, loop_content_template))) for s in sections_to_replace: section_id = s.group(1) sql_content = updatable_versions_sections[version_from]["sections"].get( section_id, {}).get("sql_content", "").strip("\n") sql_content_indent = indent( sql_content, " " * needs_indent) if needs_indent else sql_content sql_content_indent += "\n" # If the sql_content has no SQL statements, do not insert if sql_content_has_no_statement(sql_content_indent): sql_content_indent = "" loop_version_content = ( loop_content_template[:s.start()] + sql_content_indent + loop_content_template[s.end():]) loop_content += Template(loop_version_content).safe_substitute({ "version_from": '%d.%d.%d' % tuple(updatable_versions_sections[version_from]["version_from"]), "version_to": '%d.%d.%d' % tuple(updatable_versions_sections[version_from]["version_to"]), }) # Check if the loop_content is empty (TODO: check if it contains no statements) and if so # insert a placeholder if len(updatable_versions_sections) > 0 and loop_content == "" and needs_indent > 0: loop_content = " " * needs_indent + "DO NONE;\n" if sql_content_has_no_statement(loop_content): loop_content = "" schema_deployment_script = ( schema_deployment_script[:match.start()] + loop_content + schema_deployment_script[match.end():]) # Remove empty sections schema_deployment_script_sections = get_script_sections( schema_deployment_script) remove_empty_sections( sections=schema_deployment_script_sections, keep_even_if_empty=["license", "003"]) write_sections_to_file( file_path=deployment_file_path, sections=schema_deployment_script_sections, overwrite_existing=overwrite_existing) # with open(deployment_file_path, "w") as f: # f.write(schema_deployment_script) return deployment_file_path def get_available_licenses() -> list[str]: """Returns the list of available licenses Returns: The available licenses as a list of strings """ template_folder = os.path.join(Path(__file__).parent.parent, "templates") license_path = os.path.join(template_folder, "license") files = [] for file in next(os.walk(license_path))[2]: files.append(file[:-4]) files.sort() return files def substitute_source_statements_with_content(script: str, source_absolute_file_path: str): """Returns the script with substituted source statements Args: script: The script content as string source_absolute_file_path: The absolute path of the referenced script files Returns: The script with substituted source statements """ matches = re.finditer( MSM_SECTION_SOURCE_REGEX, script, re.MULTILINE | re.DOTALL) # for match_id, match in enumerate(matches, start=1): # print("Match {match_id} was found at {start}-{end}: {match}".format( # match_id=match_id, start=match.start(), end=match.end(), match=match.group())) # for group_id in range(0, len(match.groups())): # group_id = group_id + 1 # print("Group {group_id} found at {start}-{end}: {group}".format(group_id=group_id, # start=match.start(group_id), end=match.end(group_id), group=match.group(group_id))) for match in reversed(list(matches)): source_indention = len(match.group(1)) source_file_path = match.group(2) source_slicing = match.group(3) # If a relative file path is defined in the SOURCE statement, convert it to an absolute path if not source_file_path.startswith("/"): source_file_path = os.path.normpath(os.path.join( source_absolute_file_path, source_file_path)) if not os.path.exists(source_file_path): raise ValueError( f"The give SOURCE file path `{source_file_path}` could not be resolved.") with open(source_file_path, "r") as f: source_file_content = f.read() # If slicing was defined for the source content, only get the relevant substring if source_slicing is not None: if source_slicing.startswith(":"): source_file_content = source_file_content[:int( source_slicing[1:])] else: slices = source_slicing.split(":") source_file_content = source_file_content[int( slices[0]):int(slices[1]) if slices[1] else None] if source_indention > 0: source_file_content = indent(source_file_content, source_indention * " ") script = ( script[:match.start()] + source_file_content + script[match.end():]) return script def get_schema_name(schema_project_path: str): """Gets the schema name from the project settings Args: schema_project_path (str): The path to the schema project. Returns: The schema name """ project_settings = get_project_settings(schema_project_path) schema_name = project_settings.get("schemaName", None) if schema_name is None: raise ValueError( "The schema name could not be read from the project settings.") return schema_name def get_schema_exists( session: object, schema_project_path: str = None, schema_name: str = None) -> bool: """Checks whether the given schema exists Either the schema_project_path or the schema_name can be given. Args: session (object): The database session to use. schema_project_path (str): The path to the schema project. schema_name (str): The name of the schema Returns: True if the schema exists """ if schema_name is None: schema_name = get_schema_name(schema_project_path) return (lib.core.MsmDbExec( 'SELECT COUNT(*) as schema_count FROM information_schema.SCHEMATA ' 'WHERE SCHEMA_NAME = ?' ).exec(session, [schema_name]).first["schema_count"] == 1) def get_schema_is_managed(session: object, schema_project_path: str = None, schema_name: str = None) -> bool: """Checks whether the given schema is managed Either the schema_project_path or the schema_name can be given. Args: session (object): The database session to use. schema_project_path (str): The path to the schema project. schema_name (str): The name of the schema Returns: True if the schema is managed by MSM """ if schema_name is None: schema_name = get_schema_name(schema_project_path) return (lib.core.MsmDbExec( 'SELECT COUNT(*) as table_count FROM information_schema.TABLES ' 'WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND TABLE_TYPE = "VIEW"' ).exec(session, [schema_name, "msm_schema_version"]) .first["table_count"] == 1) def get_schema_version(session: object, schema_project_path: str = None, schema_name: str = None) -> str | None: """Returns the current version of the schema Either the schema_project_path or the schema_name can be given. Args: session (object): The database session to use. schema_project_path (str): The path to the schema project. schema_name (str): The name of the schema Returns: True if the schema is managed by msm """ if schema_name is None: schema_name = get_schema_name(schema_project_path) if get_schema_exists(session=session, schema_name=schema_name): schema_version = lib.core.MsmDbExec( "SELECT CONCAT(major, '.', minor, '.', patch) AS version " f"FROM {lib.core.quote_ident(schema_name)}.`msm_schema_version`" ).exec(session).first["version"] else: schema_version = None return schema_version def deploy_schema( session: object, schema_project_path: str, version: str = None, backup_directory: str = None) -> str: """Deploys the database schema Deploys the given version of the database schema. If no version is given, the latest available version will be deployed. If there is an existing schema version that will be upgraded, a dump of that schema is created in order to be able to roll back. A log will be written during the update. Args: session (object): The database session to use. schema_project_path (str): The path to the schema project. version (str): The version to deploy. backup_directory (str): The directory to be used for backups Returns: None """ project_settings = get_project_settings(schema_project_path) schema_name = project_settings.get("schemaName", None) if schema_name is None: err_msg = ( f"The project settings of `{schema_project_path}` could not be " "read.") lib.core.write_to_msm_schema_update_log("ERROR", err_msg) raise ValueError(err_msg) schema_file_name = project_settings.get("schemaFileName", None) released_versions = get_released_versions( schema_project_path=schema_project_path) deployment_script_versions = get_deployment_script_versions( schema_project_path=schema_project_path) # Check if there are actually any released versions of the schema if len(released_versions) == 0: err_msg = ( f"There are no versions of the schema `{schema_name}` that have " "been released yet.") lib.core.write_to_msm_schema_update_log("ERROR", err_msg) raise Exception(err_msg) # Check if there is a difference in released versions and deployment scripts if released_versions != deployment_script_versions: err_msg = ( "Deployment script(s) missing. Please generate deployment " "scripts for all released versions first.") lib.core.write_to_msm_schema_update_log("ERROR", err_msg) raise Exception(err_msg) # If a specific version is requested, ensure that there is a deployment # script for this version if version is None: version = '%d.%d.%d' % tuple(deployment_script_versions[-1]) elif version not in map( lambda v: '%d.%d.%d' % tuple(v), deployment_script_versions): err_msg = (f"Deployment or update of database schema `{schema_name}` using " f"version {version} requested but there is no deployment script " "available for this version.") lib.core.write_to_msm_schema_update_log("ERROR", err_msg) raise ValueError(err_msg) # Check if the schema already exists schema_exists = get_schema_exists(session=session, schema_name=schema_name) # Check if the schema is actually managed by MSM, and if so, get the version schema_managed = False schema_version = None if schema_exists: schema_managed = get_schema_is_managed( session=session, schema_name=schema_name) if schema_managed: schema_version = get_schema_version( session=session, schema_name=schema_name) if schema_exists and not schema_managed: err_msg = ( f"Deployment or update of database schema `{schema_name}` using " f"version {version} requested but the schema is not managed by " "MSM.") lib.core.write_to_msm_schema_update_log("ERROR", err_msg) raise Exception(err_msg) # If the requested version already matches the version, exit since there is # nothing to do if schema_version is not None and schema_version == version: info_msg = ( f"Deployment or update of database schema `{schema_name}` using " f"version {version} requested but the schema is already on the " "requested version. No changes performed." ) lib.core.write_to_msm_schema_update_log("INFO", info_msg) return info_msg # Check if the current version of the schema is in the list of versions # that can be upgraded by the deployment scripts if schema_version is not None: updatable_versions = get_updatable_versions(schema_project_path) updatable_versions_str = map( lambda v: '%d.%d.%d' % tuple(v), updatable_versions) if (len(updatable_versions) > 0 and lib.core.convert_version_str_to_list(schema_version) > updatable_versions[-1]): info_msg = ( f"The database schema `{schema_name}` is on a newer version " f"{schema_version} than shipped with this project (version " f"{'%d.%d.%d' % tuple(updatable_versions[-1])}). No changes " "performed." ) lib.core.write_to_msm_schema_update_log("INFO", info_msg) return info_msg if not schema_version in updatable_versions_str: err_msg = ( f"Update of database schema `{schema_name}` to version " f"{version} requested but the version {schema_version} cannot " "be updated.") lib.core.write_to_msm_schema_update_log("ERROR", err_msg) raise Exception(err_msg) # Log start of the if not schema_exists: lib.core.write_to_msm_schema_update_log( "INFO", f"Starting deployment of database schema `{schema_name}` using " f"version {version} ...") else: lib.core.write_to_msm_schema_update_log( "INFO", f"Starting update of database schema `{schema_name}` version " f"{schema_version} to version {version} ...") # Perform dump if the schema exists backup_available = False if schema_exists: lib.core.write_to_msm_schema_update_log( "INFO", f"Preparing dump of `{schema_name}` version " f"{schema_version} in order to be roll back in case of an error.") if backup_directory is None: backup_directory = os.path.join( lib.core.get_msm_plugin_data_path(), "backups", f"{schema_file_name}_backup_{schema_version}") # If that directory already exists, keep appending counter until # a new directory is found i = 2 candidate = backup_directory while os.path.exists(candidate): candidate = f"{backup_directory}_{i}" i += 1 backup_directory = candidate os.makedirs(backup_directory, exist_ok=True) # Set the mysqlsh session to the one that was given if "shell.Object" in str(type(session)): mysqlsh.globals.shell.set_session(session) else: mysqlsh.globals.shell.set_session(session.session) # Ensure that the dump can be read back in case of a failure by setting # local_infile to 1 # cSpell:ignore infile row = lib.core.MsmDbExec("SELECT @@local_infile as local_infile").exec(session).first original_local_infile = (row and int(row["local_infile"]) == 1) if not original_local_infile: try: lib.core.write_to_msm_schema_update_log( "INFO", "Enabling local_infile option in order to be able to load " "back the schema dump in case of an update error...") lib.core.MsmDbExec("SET GLOBAL local_infile=1").exec(session) except: err_msg = ( "Failed to enable the local_infile option. Please execute " "SET PERSIST GLOBAL local_infile=1; on the MySQL Server.") lib.core.write_to_msm_schema_update_log("ERROR", err_msg) raise Exception(err_msg) lib.core.write_to_msm_schema_update_log( "INFO", f"Creating dump of `{schema_name}` version {schema_version} ...") mysqlsh.globals.util.dump_schemas( [schema_name], f"file://{backup_directory}", { "skipUpgradeChecks": True, "showProgress": False, }) backup_available = True # Run deployment script try: lib.core.execute_msm_sql_script( session=session, sql_file_path=os.path.join( schema_project_path, "releases", "deployment", f"{schema_file_name}_deployment_{version}.sql")) if not schema_exists: info_msg = ( f"Deployment of `{schema_name}` version " f"{version} completed successfully.") else: if not original_local_infile: lib.core.MsmDbExec("SET GLOBAL local_infile=0").exec(session) lib.core.write_to_msm_schema_update_log( "INFO", "Restored local_infile option.") info_msg = ( f"Completed the update of `{schema_name}` version " f"{schema_version} to {version} successfully.") lib.core.write_to_msm_schema_update_log("INFO", info_msg) # Remove the backup directory as it is no longer needed if backup_available: shutil.rmtree(backup_directory) return info_msg except Exception as e: # Drop the schema after failed update try: lib.core.MsmDbExec( f"DROP SCHEMA IF EXISTS {lib.core.quote_ident(schema_name)}" ).exec(session) except: pass # Restore the backup if available if backup_available: try: mysqlsh.globals.util.load_dump( f"file://{backup_directory}", { "showMetadata": False, "showProgress": False, "ignoreVersion": True, }) if not original_local_infile: lib.core.MsmDbExec("SET GLOBAL local_infile=0").exec(session) lib.core.write_to_msm_schema_update_log( "INFO", "Restored local_infile option.") # Remove the backup directory as it is no longer needed shutil.rmtree(backup_directory) except Exception as e_dump_load: err_str = ( "An error occurred while updating the database schema " f"`{schema_name}` to version {version}. The schema could " f"not be restored back to version {schema_version}. {e} " f"{e_dump_load}") lib.core.write_to_msm_schema_update_log("ERROR", err_str) raise Exception(err_str) err_str = ( "An error occurred while updating the database schema " f"`{schema_name}` to version {version}. The schema has been " f"restored back to version {schema_version}. {e}" ) lib.core.write_to_msm_schema_update_log("ERROR", err_str) raise Exception(err_str) else: if not schema_exists: err_str = ( f"Deploying the database schema `{schema_name}` failed. {e}" ) lib.core.write_to_msm_schema_update_log("ERROR", err_str) raise Exception(err_str) else: err_str = ( "An error occurred while updating the database schema " f"`{schema_name}` to version {version}. {e}" ) lib.core.write_to_msm_schema_update_log("ERROR", err_str) raise Exception(err_str) def execute_msm_sql_script(session, sql_script: str = None, script_name: str = None, sql_file_path: str = None): """ Execute a SQL script on the MySQL Server and log the progress in the MSM log file. The SQL script can be provided as a string, or as a path to a file. Args: session (object): The MySQL session to use. sql_script (str): The SQL script to execute. script_name (str): The name of the SQL script. sql_file_path (str): The path to the SQL script file. Returns: None """ lib.core.execute_msm_sql_script( session, sql_script=sql_script, script_name=script_name, sql_file_path=sql_file_path)