bigquery_etl/metadata/validate_metadata.py

"""Validate metadata files.""" import logging import os from argparse import ArgumentParser from pathlib import Path import click import yaml from google.cloud import bigquery from bigquery_etl.config import ConfigLoader from bigquery_etl.schema import SCHEMA_FILE, Schema from ..util import extract_from_query_path, standard_args from ..util.common import extract_last_group_by_from_query, project_dirs from .parse_metadata import DatasetMetadata, Metadata parser = ArgumentParser(description=__doc__) parser.add_argument("--target", help="File or directory containing metadata files") standard_args.add_log_level(parser) CODEOWNERS_FILE = "CODEOWNERS" CHANGE_CONTROL_LABEL = "change_controlled" SHREDDER_MITIGATION_LABEL = "shredder_mitigation" ID_LEVEL_COLUMNS_FILE_PATH = Path(__file__).parent / "id_level_columns.yaml" def validate_public_data(metadata, path): """Check if the metadata for public data queries is valid.""" is_valid = True if metadata.is_public_bigquery() or metadata.is_public_json(): if not metadata.review_bugs(): logging.error(f"Missing review bug for public data: {path}") is_valid = False return is_valid def validate_change_control( file_path, metadata, codeowners_file, project_id=ConfigLoader.get( "default", "project", fallback="moz-fx-data-shared-prod" ), sql_dir=ConfigLoader.get("default", "sql_dir", fallback="sql"), ): """Verify that a query is correctly setup for change control.""" path_to_add = file_path.partition(f"{project_id}/")[2] path_in_codeowners = os.path.join(sql_dir, project_id, path_to_add) has_change_control = CHANGE_CONTROL_LABEL in metadata.labels if has_change_control: # This label requires to have at least one owner in the metadata file. # And for any of the owners, at least one entry in the CODEOWNERS file. # The owners can be emails or GitHub identities e.g. mozilla/team_name. if len(metadata.owners) == 0: click.echo( click.style( f"The metadata for {file_path} has the label" f" change_controlled but it's missing owners." ) ) return False with open(codeowners_file, "r") as owners_file: file_content = owners_file.readlines() content = [line for line in file_content if not line.startswith("#")] owners_list = [] for owner in metadata.owners: if "@" not in owner: owner = f"@{owner}" owners_list.append(owner) sample_row_all_owners = f"/{path_in_codeowners} {(' '.join(owners_list))}" if not [line for line in content if path_in_codeowners in line]: click.echo( click.style( f"ERROR: This query has label `change_controlled` which " f"requires the query path and at least one of the owners in " f"CODEOWNERS. Sample row expected: {sample_row_all_owners}" ) ) return False for line in content: if path_in_codeowners in line and not any( owner in line for owner in owners_list ): click.echo( click.style( f"ERROR: This query has label `change_controlled` which " f"requires at least one of the owners to be registered " f"in CODEOWNERS. 

def validate_shredder_mitigation(query_dir, metadata):
    """Check that queries with the shredder mitigation label comply with requirements."""
    has_shredder_mitigation = SHREDDER_MITIGATION_LABEL in metadata.labels
    table_not_empty = True

    if has_shredder_mitigation:
        schema_file = Path(query_dir) / SCHEMA_FILE
        if not schema_file.exists():
            click.echo(
                click.style(
                    f"Table {query_dir} does not have the schema.yaml required for shredder mitigation.",
                    fg="yellow",
                )
            )
            return False
        schema = Schema.from_schema_file(schema_file).to_bigquery_schema()

        # This label requires that the query doesn't have id-level columns,
        # has an explicit GROUP BY, and that all schema columns have descriptions.
        query_file = Path(query_dir) / "query.sql"
        query_group_by = extract_last_group_by_from_query(sql_path=query_file)

        # Validate that this label is only applied to version 1 tables if they're
        # not empty. If the table is empty, it should be backfilled before
        # applying the label.
        project, dataset, table = extract_from_query_path(Path(query_dir))
        client = bigquery.Client(project=project)
        error_message = (
            f"The shredder-mitigation label can only be applied to existing and "
            f"non-empty tables.\nEnsure that the table `{project}.{dataset}.{table}` is deployed"
            f" and run a managed backfill without mitigation before applying this label to"
            f" a new or empty table."
            f"\n\nSubsequent backfills then can use the [shredder mitigation process]"
            f"(https://mozilla.github.io/bigquery-etl/cookbooks/creating_a_derived_dataset/#initiating-the-backfill)."
        )

        # Check that the table exists and it's not empty.
        query_table_is_not_empty = (
            f"SELECT EXISTS (SELECT 1 "
            f"FROM `{project}.{dataset}.INFORMATION_SCHEMA.TABLES` "
            f"WHERE table_name = '{table}' LIMIT 1) AS not_empty;"
        )
        try:
            # result() returns a RowIterator, which is always truthy; read the
            # boolean from the query's single result row instead.
            result = client.query(query_table_is_not_empty).result()
            table_not_empty = next(iter(result)).not_empty
        except Exception:
            click.echo(
                click.style(
                    f"Table {project}.{dataset}.{table} not found or inaccessible"
                    f" for validation. Please check that the name is correct and, if the table"
                    f" is in a private repository, ensure that it exists and has data before"
                    f" running a backfill with shredder mitigation.",
                    fg="yellow",
                )
            )
        if table_not_empty is None or table_not_empty is False:
            click.echo(click.style(error_message, fg="yellow"))
            return False

        # Validate that the query GROUP BY is explicit and as required.
        integers_in_group_by = False
        for e in query_group_by:
            try:
                int(e)
                integers_in_group_by = True
            except ValueError:
                continue
        if (
            "ALL" in query_group_by
            or not all(isinstance(e, str) for e in query_group_by)
            or not query_group_by
            or integers_in_group_by
        ):
            click.echo(
                "Shredder mitigation validation failed, GROUP BY must use an explicit list "
                "of columns. Avoid expressions like `GROUP BY ALL` or `GROUP BY 1, 2, 3`."
            )
            return False

        with open(ID_LEVEL_COLUMNS_FILE_PATH, "r") as columns_file:
            columns_from_file = yaml.safe_load(columns_file)
        id_level_columns = columns_from_file.get("id_level_columns", [])

        for field in schema:
            # Validate that the query columns have descriptions.
            if not field.description:
                click.echo(
                    f"Shredder mitigation validation failed, {field.name} does not have "
                    f"a description in the schema."
                )
                return False
            # Validate that id-level columns are not present in the query schema.
            if field.name in id_level_columns:
                click.echo(
                    f"Shredder mitigation validation failed, {field.name} is an id-level"
                    f" column that is not allowed for this type of backfill."
                )
                return False
    return True
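
# For example (illustrative SQL; column names are hypothetical), this check
# passes for an explicit column list such as
#   GROUP BY submission_date, channel
# and fails for implicit forms such as
#   GROUP BY ALL
#   GROUP BY 1, 2
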

def validate_col_desc_enforced(query_dir, metadata):
    """Check that schemas with require_column_descriptions = True comply with requirements."""
    if not metadata.require_column_descriptions:
        return True

    schema_file = Path(query_dir) / SCHEMA_FILE
    if not schema_file.exists():
        click.echo(
            click.style(
                f"Table {query_dir} does not have the schema.yaml required for column descriptions.",
                fg="yellow",
            )
        )
        return False
    schema = Schema.from_schema_file(schema_file).to_bigquery_schema()

    def validate_fields(fields, parent_path=""):
        """Recursively validate field descriptions, including nested fields."""
        for field in fields:
            field_path = f"{parent_path}.{field.name}" if parent_path else field.name
            if not field.description:
                click.echo(
                    f"Column description validation failed for {query_dir}, "
                    f"{field_path} does not have a description in the schema."
                )
                return False
            # If the field has nested fields (e.g., RECORD type), recurse.
            if hasattr(field, "fields") and field.fields:
                if not validate_fields(field.fields, field_path):
                    return False
        return True

    return validate_fields(schema)


def validate_deprecation(metadata, path):
    """Check that deprecated is True when a deletion date exists."""
    if metadata.deletion_date and not metadata.deprecated:
        click.echo(
            f"Deletion date should only be added when the table is deprecated in {path}"
        )
        return False
    return True


def validate_exclusion_list_expiration_days(metadata, path):
    """Check if any of the retention exclusion tables have expiration_days set."""
    is_valid = True
    retention_exclusion_list = set(
        ConfigLoader.get("retention_exclusion_list", fallback=[])
    )
    normalized_path = str(Path(path).parent)

    if normalized_path in retention_exclusion_list:
        # Access and check the expiration_days metadata.
        expiration_days = (
            metadata.bigquery.time_partitioning.expiration_days
            if metadata
            and getattr(metadata, "bigquery", None)
            and getattr(metadata.bigquery, "time_partitioning", None)
            else None
        )
        if expiration_days is not None:
            click.echo(
                click.style(
                    f"ERROR: Table at {path} is in the retention exclusion list but has expiration_days set to {expiration_days}.",
                    fg="red",
                )
            )
            is_valid = False
    return is_valid


def validate_workgroup_access(metadata, path):
    """Check for table-level access specifications that are redundant with dataset access."""
    is_valid = True
    dataset_metadata_path = Path(path).parent.parent / "dataset_metadata.yaml"
    if not dataset_metadata_path.exists():
        return is_valid

    dataset_metadata = DatasetMetadata.from_file(dataset_metadata_path)
    default_table_workgroup_access_dict = {
        workgroup_access.get("role"): workgroup_access.get("members", [])
        for workgroup_access in dataset_metadata.default_table_workgroup_access
        if dataset_metadata.default_table_workgroup_access
    }

    if metadata.workgroup_access:
        for table_workgroup_access in metadata.workgroup_access:
            if table_workgroup_access.role in default_table_workgroup_access_dict:
                for table_workgroup_member in table_workgroup_access.members:
                    if (
                        table_workgroup_member
                        in default_table_workgroup_access_dict[
                            table_workgroup_access.role
                        ]
                    ):
                        is_valid = False
                        click.echo(
                            click.style(
                                f"ERROR: Redundant table-level access specification in {path}. "
                                + f"Table-level access for {table_workgroup_access.role}: {table_workgroup_member} defined in dataset_metadata.yaml.",
                                fg="red",
                            )
                        )
    return is_valid
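
# An illustrative (hypothetical role/member values) redundancy this check
# flags: if dataset_metadata.yaml already grants
#
#   default_table_workgroup_access:
#   - role: roles/bigquery.dataViewer
#     members:
#     - workgroup:mozilla-confidential
#
# then a table's metadata.yaml should not repeat the same role/member pair
# under workgroup_access.
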

def validate_default_table_workgroup_access(path):
    """Check that default_table_workgroup_access does not exist in metadata.

    default_table_workgroup_access will be generated from workgroup_access and
    should not be overridden.
    """
    is_valid = True
    dataset_metadata_path = Path(path).parent.parent / "dataset_metadata.yaml"
    if not dataset_metadata_path.exists():
        return is_valid

    with open(dataset_metadata_path, "r") as yaml_stream:
        metadata = yaml.safe_load(yaml_stream)

    if "default_table_workgroup_access" in metadata:
        is_valid = False
        click.echo(
            click.style(
                f"ERROR: default_table_workgroup_access should not be explicitly specified in {dataset_metadata_path}. "
                + "The default_table_workgroup_access configuration will be automatically generated.",
                fg="red",
            )
        )
    return is_valid


def validate_retention_policy_based_on_table_type(metadata, path):
    """Check that aggregate tables do not have expiration_days set."""
    is_valid = True
    table_type = (
        metadata.labels.get("table_type") if isinstance(metadata.labels, dict) else None
    )
    expiration_days = (
        metadata.bigquery.time_partitioning.expiration_days
        if metadata
        and getattr(metadata, "bigquery", None)
        and getattr(metadata.bigquery, "time_partitioning", None)
        else None
    )

    if expiration_days is not None and table_type == "aggregate":
        click.echo(
            click.style(
                f"ERROR: Table at {path} is an aggregate table but has expiration_days set to {expiration_days}.",
                fg="red",
            )
        )
        is_valid = False

    # The check below should be uncommented when the retention project is completed.
    # retention_exclusion_list = set(
    #     ConfigLoader.get("retention_exclusion_list", fallback=[])
    # )
    # normalized_path = str(Path(path).parent)
    # if (
    #     expiration_days is None
    #     and table_type == "client_level"
    #     and normalized_path not in retention_exclusion_list
    # ):
    #     click.echo(
    #         click.style(
    #             f"ERROR: Table at {path} is a client-level table and needs expiration_days to be set.",
    #             fg="red",
    #         )
    #     )
    #     is_valid = False
    return is_valid


class MetadataValidationError(Exception):
    """Metadata validation failed."""


def validate(target):
    """Validate metadata files."""
    failed = False
    skip_validation = ConfigLoader.get("metadata", "validation", "skip", fallback=[])

    if os.path.isdir(target):
        for root, dirs, files in os.walk(target, followlinks=True):
            for file in files:
                if Metadata.is_metadata_file(file):
                    path = os.path.join(root, file)
                    if path in skip_validation:
                        continue
                    metadata = Metadata.from_file(path)
                    if not validate_public_data(metadata, path):
                        failed = True
                    if not validate_change_control(
                        file_path=root,
                        metadata=metadata,
                        codeowners_file=CODEOWNERS_FILE,
                    ):
                        failed = True
                    # Shredder mitigation checks are still WIP.
                    # if not validate_shredder_mitigation(
                    #     query_dir=root,
                    #     metadata=metadata,
                    # ):
                    #     failed = True
                    if not validate_deprecation(metadata, path):
                        failed = True
                    if not validate_exclusion_list_expiration_days(metadata, path):
                        failed = True
                    if not validate_retention_policy_based_on_table_type(
                        metadata, path
                    ):
                        failed = True
                    if not validate_col_desc_enforced(root, metadata):
                        failed = True
                    # TODO: more validation,
                    # e.g. https://github.com/mozilla/bigquery-etl/issues/924
    else:
        raise ValueError(f"Invalid target: {target}, target must be a directory.")

    if failed:
        # TODO: add failed checks to message
        raise MetadataValidationError(f"Metadata validation failed for {target}")
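
# For illustration (layout assumed; check the project's config file for the
# exact structure), paths can be exempted from validation via the
# metadata.validation.skip setting read above, e.g. in bqetl_project.yaml:
#
#   metadata:
#     validation:
#       skip:
#       - sql/moz-fx-data-shared-prod/some_dataset/some_table_v1/metadata.yaml
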

def validate_datasets(target):
    """Validate dataset metadata files."""
    failed = False

    if os.path.isdir(target):
        for root, dirs, files in os.walk(target, followlinks=True):
            for file in files:
                if DatasetMetadata.is_dataset_metadata_file(file):
                    path = os.path.join(root, file)
                    _ = DatasetMetadata.from_file(path)
    else:
        raise ValueError(f"Invalid target: {target}, target must be a directory.")

    if failed:
        raise MetadataValidationError(
            f"Dataset metadata validation failed for {target}"
        )


def main():
    """Validate all metadata.yaml files in the provided target directory."""
    args = parser.parse_args()

    # set log level
    try:
        logging.basicConfig(level=args.log_level, format="%(levelname)s %(message)s")
    except ValueError as e:
        parser.error(f"argument --log-level: {e}")

    if args.target:
        validate(args.target)
    else:
        for project in project_dirs():
            validate(project)


if __name__ == "__main__":
    main()
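
# Usage sketch (invocation assumed from the module path and argparse setup
# above; the target directory is illustrative):
#
#   python -m bigquery_etl.metadata.validate_metadata \
#       --target sql/moz-fx-data-shared-prod
#
# Without --target, every project directory returned by project_dirs() is
# validated.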