bigquery_etl/cli/stage.py (482 lines of code) (raw):

"""bigquery-etl CLI stage commands.""" import re import shutil import tempfile from datetime import datetime from glob import glob from pathlib import Path import rich_click as click from google.cloud import bigquery from google.cloud.bigquery.enums import EntityTypes from google.cloud.exceptions import NotFound from .. import ConfigLoader from ..cli.query import deploy as deploy_query_schema from ..cli.query import update as update_query_schema from ..cli.routine import publish as publish_routine from ..cli.utils import paths_matching_name_pattern, sql_dir_option from ..cli.view import publish as publish_view from ..dryrun import DryRun, get_id_token from ..routine.parse_routine import ( ROUTINE_FILES, UDF_FILE, RawRoutine, accumulate_dependencies, read_routine_dir, ) from ..schema import SCHEMA_FILE, Schema from ..util.common import render from ..view import View VIEW_FILE = "view.sql" QUERY_FILE = "query.sql" QUERY_SCRIPT = "query.py" MATERIALIZED_VIEW = "materialized_view.sql" ROOT = Path(__file__).parent.parent.parent TEST_DIR = ROOT / "tests" / "sql" @click.group(help="Commands for managing stage deploys") def stage(): """Create the CLI group for the stage command.""" pass @stage.command( help=""" Deploy artifacts to the configured stage project. The order of deployment is: UDFs, views, tables. Examples: ./bqetl stage deploy sql/moz-fx-data-shared-prod/telemetry_derived/ """ ) @click.argument( "paths", nargs=-1, ) @click.option( "--project-id", "--project_id", help="GCP project to deploy artifacts to", default="bigquery-etl-integration-test", ) @sql_dir_option @click.option( "--dataset-suffix", "--dataset_suffix", help="Suffix appended to the deployed dataset", ) @click.option( "--update-references", "--update_references", default=True, is_flag=True, help="Update references to deployed artifacts with project and dataset artifacts have been deployed to.", ) @click.option( "--copy-sql-to-tmp-dir", "--copy_sql_to_tmp_dir", help="Copy existing SQL from the sql_dir to a temporary directory and apply updates there.", default=False, is_flag=True, ) @click.option( "--remove-updated-artifacts", "--remove_updated_artifacts", default=False, is_flag=True, help="Remove artifacts that have been updated and deployed to stage from prod folder. This ensures that" + " tests don't run on outdated or undeployed artifacts (required for CI)", ) @click.pass_context def deploy( ctx, paths, project_id, sql_dir, dataset_suffix, update_references, copy_sql_to_tmp_dir, remove_updated_artifacts, ): """Deploy provided artifacts to destination project.""" if copy_sql_to_tmp_dir: # copy SQL to a temporary directory tmp_dir = Path(tempfile.mkdtemp()) tmp_dir.mkdir(parents=True, exist_ok=True) new_sql_dir = tmp_dir / Path(sql_dir).name shutil.copytree(sql_dir, new_sql_dir, dirs_exist_ok=True) # rename paths to tmp_dir paths = [path.replace(sql_dir, f"{new_sql_dir}/", 1) for path in paths] sql_dir = new_sql_dir artifact_files = set() # get SQL files for artifacts that are to be deployed for path in paths: artifact_files.update( [ p for p in paths_matching_name_pattern( path, sql_dir, None, files=["*.sql", "*.py"] ) if p.suffix in [".sql", ".py"] and p.name != "checks.sql" and len(p.suffixes) == 1 ] ) # any dependencies need to be determined an deployed as well since the stage # environment doesn't have access to the prod environment artifact_files.update(_udf_dependencies(artifact_files)) artifact_files.update(_view_dependencies(artifact_files, sql_dir)) # update references of all deployed artifacts # references needs to be set to the stage project and the new dataset identifier if update_references: _update_references(artifact_files, project_id, dataset_suffix, sql_dir) updated_artifact_files = set() (Path(sql_dir) / project_id).mkdir(parents=True, exist_ok=True) # copy updated files locally to a folder representing the stage env project for artifact_file in artifact_files: artifact_project = artifact_file.parent.parent.parent.name artifact_dataset = artifact_file.parent.parent.name artifact_name = artifact_file.parent.name artifact_test_path = ( TEST_DIR / artifact_project / artifact_dataset / artifact_name ) if ( artifact_dataset == "INFORMATION_SCHEMA" or "INFORMATION_SCHEMA" in artifact_name ): continue new_artifact_dataset = ( f"{artifact_dataset}_{artifact_project.replace('-', '_')}" ) if dataset_suffix: new_artifact_dataset = f"{new_artifact_dataset}_{dataset_suffix}" if artifact_file.name == MATERIALIZED_VIEW: # replace CREATE MATERIALIED VIEW statement sql_content = render( artifact_file.name, template_folder=str(artifact_file.parent), format=False, ) sql_content = re.sub( "CREATE MATERIALIZED VIEW.*?AS", "", sql_content, flags=re.DOTALL, ) artifact_file.write_text(sql_content) # map materialized views to normal queries query_path = Path(artifact_file.parent, QUERY_FILE) artifact_file.rename(query_path) artifact_file = query_path new_artifact_path = ( Path(sql_dir) / project_id / new_artifact_dataset / artifact_name ) new_artifact_path.mkdir(parents=True, exist_ok=True) shutil.copytree(artifact_file.parent, new_artifact_path, dirs_exist_ok=True) updated_artifact_files.add(new_artifact_path / artifact_file.name) # copy tests to the right structure if artifact_test_path.exists(): new_artifact_test_path = ( TEST_DIR / project_id / new_artifact_dataset / artifact_name ) shutil.copytree( artifact_test_path, new_artifact_test_path, dirs_exist_ok=True ) if remove_updated_artifacts: shutil.rmtree(artifact_test_path) # rename test files for test_file_path in map(Path, glob(f"{TEST_DIR}/**/*", recursive=True)): test_file_suffix = test_file_path.suffix for artifact_file in artifact_files: artifact_project = artifact_file.parent.parent.parent.name artifact_dataset = artifact_file.parent.parent.name artifact_name = artifact_file.parent.name if test_file_path.name in ( f"{artifact_project}.{artifact_dataset}.{artifact_name}{test_file_suffix}", f"{artifact_project}.{artifact_dataset}.{artifact_name}.schema{test_file_suffix}", ) or ( test_file_path.name in ( f"{artifact_dataset}.{artifact_name}{test_file_suffix}", f"{artifact_dataset}.{artifact_name}.schema{test_file_suffix}", ) and artifact_project in test_file_path.parent.parts ): new_artifact_dataset = ( f"{artifact_dataset}_{artifact_project.replace('-', '_')}" ) if dataset_suffix: new_artifact_dataset = f"{new_artifact_dataset}_{dataset_suffix}" new_test_file_name = ( f"{project_id}.{new_artifact_dataset}.{artifact_name}" ) if test_file_path.name.endswith(f".schema{test_file_suffix}"): new_test_file_name += ".schema" new_test_file_name += test_file_suffix new_test_file_path = test_file_path.parent / new_test_file_name if not new_test_file_path.exists(): test_file_path.rename(new_test_file_path) break # remove artifacts from the "prod" folders if remove_updated_artifacts: for artifact_file in artifact_files: if artifact_file.parent.exists(): shutil.rmtree(artifact_file.parent) # deploy to stage _deploy_artifacts(ctx, updated_artifact_files, project_id, dataset_suffix, sql_dir) def _udf_dependencies(artifact_files): """Determine UDF dependencies.""" udf_dependencies = set() raw_routines = read_routine_dir() udf_files = [file for file in artifact_files if file.name == UDF_FILE] for udf_file in udf_files: # all referenced UDFs need to be deployed in the same stage project due to access restrictions raw_routine = RawRoutine.from_file(udf_file) udfs_to_publish = accumulate_dependencies([], raw_routines, raw_routine.name) for dependency in udfs_to_publish: if dependency in raw_routines: file_path = Path(raw_routines[dependency].filepath) udf_dependencies.add(file_path) return udf_dependencies def _view_dependencies(artifact_files, sql_dir): """Determine view dependencies.""" view_dependencies = set() view_dependency_files = [file for file in artifact_files if file.name == VIEW_FILE] id_token = get_id_token() for dep_file in view_dependency_files: # all references views and tables need to be deployed in the same stage project if dep_file not in artifact_files: view_dependencies.add(dep_file) if dep_file.name == VIEW_FILE: view = View.from_file(dep_file, id_token=id_token) for dependency in view.table_references: dependency_components = dependency.split(".") if dependency_components[1:2] == ["INFORMATION_SCHEMA"]: dependency_components.insert(0, view.project) if dependency_components[2:3] == ["INFORMATION_SCHEMA"]: # INFORMATION_SCHEMA has more components that will be treated as the table name # no deploys for INFORMATION_SCHEMA will happen later on dependency_components = dependency_components[:2] + [ ".".join(dependency_components[2:]) ] if len(dependency_components) != 3: raise ValueError( f"Invalid table reference {dependency} in view {view.name}. " "Tables should be fully qualified, expected format: project.dataset.table." ) project, dataset, name = dependency_components file_path = Path(view.path).parent.parent.parent / dataset / name file_exists_for_dependency = False for file in [VIEW_FILE, QUERY_FILE, QUERY_SCRIPT, MATERIALIZED_VIEW]: if (file_path / file).is_file(): if (file_path / file) not in artifact_files: view_dependency_files.append(file_path / file) file_exists_for_dependency = True break path = Path(sql_dir) / project / dataset / name if not path.exists(): path.mkdir(parents=True, exist_ok=True) # don't create schema for wildcard and metadata tables if "*" not in name and name != "INFORMATION_SCHEMA": partitioned_by = "submission_timestamp" schema = Schema.for_table( project=project, dataset=dataset, table=name, id_token=id_token, partitioned_by=partitioned_by, ) schema.to_yaml_file(path / SCHEMA_FILE) if not file_exists_for_dependency: (path / QUERY_SCRIPT).write_text( "# Table stub generated by stage deploy" ) view_dependencies.add(path / QUERY_SCRIPT) # extract UDF references from view definition raw_routines = read_routine_dir() udf_dependencies = set() for udf_dependency in view.udf_references: routine = raw_routines[udf_dependency] udf_dependencies.add(Path(routine.filepath)) # determine UDF dependencies recursively view_dependencies.update(_udf_dependencies(udf_dependencies)) view_dependencies.update(udf_dependencies) return view_dependencies def _update_references(artifact_files, project_id, dataset_suffix, sql_dir): replace_references = [] replace_partial_references = [] for artifact_file in artifact_files: name = artifact_file.parent.name name_pattern = name.replace("*", r"\*") # match literal * original_dataset = artifact_file.parent.parent.name original_project = artifact_file.parent.parent.parent.name deployed_dataset = original_dataset if original_dataset not in ( "INFORMATION_SCHEMA", "region-eu", "region-us", ): deployed_dataset += f"_{original_project.replace('-', '_')}" if dataset_suffix: deployed_dataset += f"_{dataset_suffix}" deployed_project = project_id # Replace references, preserving fully quoted references. replace_partial_references += [ # partially qualified references (like "telemetry.main") ( re.compile(rf"(?<![\._])`{original_dataset}\.{name_pattern}`"), f"`{deployed_project}.{deployed_dataset}.{name}`", original_project, ), ( re.compile( rf"(?<![\._])`?{original_dataset}`?\.`?{name_pattern}(?![a-zA-Z0-9_])`?" ), f"`{deployed_project}`.`{deployed_dataset}`.`{name}`", original_project, ), ] replace_references += [ # fully qualified references (like "moz-fx-data-shared-prod.telemetry.main") ( re.compile( rf"`{original_project}\.{original_dataset}\.{name_pattern}`" ), f"`{deployed_project}.{deployed_dataset}.{name}`", original_project, ), ( re.compile( rf"(?<![a-zA-Z0-9_])`?{original_project}`?\.`?{original_dataset}`?\.`?{name_pattern}(?![a-zA-Z0-9_])`?" ), f"`{deployed_project}`.`{deployed_dataset}`.`{name}`", original_project, ), ] for path in map(Path, glob(f"{sql_dir}/**/*.sql", recursive=True)): # apply substitutions if path.is_file(): if "is_init()" in path.read_text(): init_sql = render( path.name, template_folder=path.parent, format=False, **{"is_init": lambda: True}, ) query_sql = render( path.name, template_folder=path.parent, format=False, **{"is_init": lambda: False}, ) sql = f""" {{% if is_init() %}} {init_sql} {{% else %}} {query_sql} {{% endif %}} """ else: sql = render(path.name, template_folder=path.parent, format=False) for ref in replace_references: sql = re.sub(ref[0], ref[1], sql) for ref in replace_partial_references: file_project = path.parent.parent.parent.name if file_project == ref[2]: sql = re.sub(ref[0], ref[1], sql) path.write_text(sql) def _deploy_artifacts(ctx, artifact_files, project_id, dataset_suffix, sql_dir): """Deploy routines, tables and views.""" # give read permissions to dry run accounts dataset_access_entries = [ bigquery.AccessEntry( role="READER", entity_type=EntityTypes.USER_BY_EMAIL, entity_id=dry_run_account, ) for dry_run_account in ConfigLoader.get( "dry_run", "function_accounts", fallback=[] ) ] # deploy routines routine_files = [file for file in artifact_files if file.name in ROUTINE_FILES] for routine_file in routine_files: dataset = routine_file.parent.parent.name create_dataset_if_not_exists( project_id=project_id, dataset=dataset, suffix=dataset_suffix, access_entries=dataset_access_entries, ) ctx.invoke(publish_routine, name=None, project_id=project_id, dry_run=False) # deploy table schemas query_files = list( { file for file in artifact_files if file.name in [QUERY_FILE, QUERY_SCRIPT] # don't attempt to deploy wildcard or metadata tables and "*" not in file.parent.name and file.parent.name != "INFORMATION_SCHEMA" } ) if len(query_files) > 0: # checking and creating datasets needs to happen sequentially for query_file in query_files: dataset = query_file.parent.parent.name create_dataset_if_not_exists( project_id=project_id, dataset=dataset, suffix=dataset_suffix, access_entries=dataset_access_entries, ) ctx.invoke( update_query_schema, name=query_files, sql_dir=sql_dir, project_id=project_id, respect_dryrun_skip=True, is_init=True, ) ctx.invoke( deploy_query_schema, name=query_files, sql_dir=sql_dir, project_id=project_id, force=True, respect_dryrun_skip=False, skip_external_data=True, ) # deploy views view_files = [ file for file in artifact_files if file.name == VIEW_FILE and str(file) not in DryRun.skipped_files() ] for view_file in view_files: dataset = view_file.parent.parent.name create_dataset_if_not_exists( project_id=project_id, dataset=dataset, suffix=dataset_suffix, access_entries=dataset_access_entries, ) ctx.invoke( publish_view, name=None, sql_dir=sql_dir, project_id=project_id, dry_run=False, skip_authorized=False, force=True, respect_dryrun_skip=True, ) def create_dataset_if_not_exists(project_id, dataset, suffix=None, access_entries=None): """Create a temporary dataset if not already exists.""" client = bigquery.Client(project_id) dataset = bigquery.Dataset(f"{project_id}.{dataset}") dataset.location = "US" dataset = client.create_dataset(dataset, exists_ok=True) dataset.default_table_expiration_ms = 60 * 60 * 1000 * 12 # ms # mark dataset as expired 12 hours from now; can be removed by CI expiration = int( ((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds() + 60 * 60 * 12) * 1000 ) dataset.labels = {"expires_on": expiration} if suffix: dataset.labels["suffix"] = suffix if access_entries: dataset.access_entries = dataset.access_entries + access_entries return client.update_dataset( dataset, ["default_table_expiration_ms", "labels", "access_entries"] ) @stage.command( help=""" Remove deployed artifacts from stage environment Examples: ./bqetl stage clean """ ) @click.option( "--project-id", "--project_id", help="GCP project to deploy artifacts to", default="bigquery-etl-integration-test", ) @click.option( "--dataset-suffix", "--dataset_suffix", help="Suffix appended to the deployed dataset", ) @click.option( "--delete-expired", "--delete_expired", help="Remove all datasets that have expired including those that do not match the suffix", default=False, is_flag=True, ) def clean(project_id, dataset_suffix, delete_expired): """Reset the stage environment.""" client = bigquery.Client(project_id) dataset_filter = ( None if delete_expired is True else f"labels.suffix:{dataset_suffix}" ) datasets = client.list_datasets(filter=dataset_filter) current_timestamp = int( (datetime.utcnow() - datetime(1970, 1, 1)).total_seconds() * 1000 ) for dataset in datasets: if dataset.labels: # remove datasets that either match the suffix or are expired for label, value in dataset.labels.items(): if ( dataset_suffix and label == "suffix" and value == dataset_suffix ) or ( delete_expired and label == "expires_on" and int(value) < current_timestamp ): click.echo(f"Deleting dataset {dataset.full_dataset_id}") try: client.delete_dataset(dataset, delete_contents=True) except NotFound as e: # account for other concurrent CI runs click.echo(f"Failed to delete: {e.message}")