cicd-deployers/metadata_deployer.py (111 lines of code) (raw):

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Deploys Cortex Data Mesh metadata (lakes, zones, tags, annotations).

Clones the cortex-data-foundation repository as a git submodule, ensures the
required Python packages are listed in its requirements file, then invokes
its deploy_data_mesh.py entry point with the configured metadata paths.
"""

import argparse
import collections.abc
import json
import logging
import os
import shutil
import subprocess
import sys


def run_deploy_data_mesh(config_file, tag_template_directories,
                         policy_directories, lake_directories,
                         annotation_directories, overwrite):
    """Runs the 'deploy_data_mesh.py' script with provided arguments.

    Args:
        config_file (str): Path to the configuration JSON file.
        tag_template_directories (str): Path to the tag template directories.
        policy_directories (str): Path to the policy taxonomies directories.
        lake_directories (str): Path to the lake directories.
        annotation_directories (str): Path to the annotation directories.
        overwrite (str): Whether to overwrite existing data. Any value other
            than the string "false" (case-insensitive) enables --overwrite.

    Raises:
        subprocess.CalledProcessError: If any git/pip/deploy subprocess
            exits with a non-zero status.
    """
    src_code_path = "metadata/metadata-deployer/cortex_src_code"
    # Start from a clean checkout: drop any stale working copy and remove
    # it from the git index before re-adding the submodule.
    if os.path.exists(src_code_path):
        shutil.rmtree(src_code_path)
        subprocess.run(["git", "rm", "-rf", "--cached", src_code_path],
                       check=True)
    # BUG FIX: this call previously lacked check=True (unlike every sibling
    # subprocess call), so a failed submodule add was silently ignored and
    # the deploy then ran against a missing source tree.
    subprocess.run(
        [
            "git", "submodule", "add", "-f",
            "https://github.com/GoogleCloudPlatform/cortex-data-foundation.git",
            src_code_path,
        ],
        check=True)

    command = [
        "python3",
        "metadata/metadata-deployer/cortex_src_code/src/common/data_mesh/"
        "deploy_data_mesh.py",
        "--config-file", config_file,
        "--tag-template-directories", tag_template_directories,
        "--policy-directories", policy_directories,
        "--lake-directories", lake_directories,
        "--annotation-directories", annotation_directories,
    ]
    # BUG FIX: comparison is now case-insensitive so "False"/"FALSE" from a
    # CI variable no longer accidentally enables --overwrite handling quirks;
    # the documented "false" value behaves exactly as before.
    if str(overwrite).lower() != "false":
        command.append("--overwrite")

    requirements_path = ("metadata/metadata-deployer/cortex_src_code/"
                         "requirements.in")
    required_packages = [
        "exceptiongroup",
        "google-api-core",
        "google-cloud-bigquery",
        "google-cloud-bigquery-datapolicies",
        "google-cloud-datacatalog",
        "google-cloud-dataplex",
    ]
    # BUG FIX: the original ran os.path.exists() on every *line* of the
    # requirements file — package specs are never paths, so the "already
    # installed" check was always false — and it leaked the open file
    # handle. Instead, append only the packages not already listed.
    with open(requirements_path) as req_file:
        existing = {line.strip() for line in req_file}
    missing = [pkg for pkg in required_packages if pkg not in existing]
    if missing:
        add_lines_to_file(requirements_path, missing)
    subprocess.run(["pip", "install", "-r", requirements_path], check=True)

    subprocess.run(command, check=True)


def write_json_file(project_id, location, output_filename="cortex_config.json"):
    """Creates a JSON config file with the specified project ID and location.

    Args:
        project_id (str): The project ID to include in the JSON.
        location (str): The location to include in the JSON.
        output_filename (str, optional): The filename of the output JSON
            file. Defaults to "cortex_config.json".

    Returns:
        str: The path of the JSON file that was written.
    """
    data = {
        "deployDataMesh": True,
        "projectIdSource": project_id,
        "projectIdTarget": project_id,
        "targetBucket": f"{project_id}-cortex-tmp-bucket",
        "location": location,
        "DataMesh": {
            "deployDescriptions": True,
            "deployLakes": True,
            "deployCatalog": True,
            "deployACLs": True,
        },
        "k9": {
            "datasets": {
                "processing": "K9_PROCESSING",
                "reporting": "K9_REPORTING",
            }
        },
        # NOTE(review): literal placeholders — presumably unused by the data
        # mesh deployment itself; confirm against deploy_data_mesh.py.
        "VertexAI": {
            "region": "REGION",
            "processingDataset": "VERTEXDATASET",
        },
    }
    with open(output_filename, "w") as outfile:
        json.dump(data, outfile, indent=4)
    return output_filename


def add_lines_to_file(filename, new_lines):
    """Appends new lines to the end of a file, handling potential errors.

    Also rewrites any 'requests>=2.32.*' spec in the file to
    'requests==2.32.*' ('.*' suffixes are only valid with ==/!= per PEP 440).

    Args:
        filename (str): The path to the flat file.
        new_lines (list): A list of strings representing the new lines to add.
    """
    try:
        with open(filename, "a") as file:
            file.write("\n")
            for line in new_lines:
                file.write(line + "\n")
        print("Lines added successfully!")

        with open(filename, "r") as file:
            file_content = file.read()
        new_content = file_content.replace("requests>=2.32.*",
                                           "requests==2.32.*")
        with open(filename, "w") as file:
            file.write(new_content)
    # BUG FIX: both messages previously printed the literal text '(unknown)'
    # instead of interpolating the offending filename.
    except FileNotFoundError:
        print(f"Error: File '{filename}' not found.")
    except IOError:
        print(f"Error: An I/O error occurred while working with '{filename}'.")


def main(args: collections.abc.Sequence[str]) -> int:
    """CLI entry point: parses flags and runs the data mesh deployment.

    Args:
        args: Command-line arguments (excluding the program name).

    Returns:
        int: Process exit code (0 on success).
    """
    parser = argparse.ArgumentParser(description="Cortex Data Mesh Deployer")
    parser.add_argument(
        "--project_id", type=str, required=True,
        help="Project where metadata (lakes, zones, tags, etc.) will be "
             "deployed.")
    parser.add_argument(
        "--location", type=str, required=True,
        help="Location where metadata (lakes, zones, tags, etc.) will be "
             "deployed.")
    parser.add_argument(
        "--overwrite", type=str, required=True,
        help="Whether to overwrite existing metadata")
    params = parser.parse_args(args)

    run_deploy_data_mesh(
        config_file=write_json_file(str(params.project_id),
                                    str(params.location)),
        tag_template_directories="../metadata/tag_templates",
        policy_directories="../metadata/policy_taxonomies",
        lake_directories="../metadata/lakes",
        annotation_directories="../metadata/annotations",
        overwrite=str(params.overwrite),
    )
    # BUG FIX: the function is annotated -> int but previously fell off the
    # end returning None.
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))