sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/query.py (251 lines of code) (raw):

#!/usr/bin/env python3 """Search for tables with client id columns.""" import datetime from collections import defaultdict from functools import cache from multiprocessing.pool import ThreadPool from pathlib import Path from typing import Any, Dict, Iterable, List, Set import click from google.cloud import bigquery from google.cloud import datacatalog_lineage_v1 as datacatalog_lineage from google.cloud.bigquery import TableReference from google.cloud.exceptions import NotFound from bigquery_etl.schema import Schema from bigquery_etl.shredder.config import ( CLIENT_ID, DELETE_TARGETS, GLEAN_CLIENT_ID, SHARED_PROD, DeleteSource, find_glean_targets, get_glean_channel_to_app_name_mapping, ) FIND_TABLES_QUERY_TEMPLATE = """ WITH no_client_id_tables AS ( SELECT table_catalog, table_schema, table_name, FROM `{project}.region-us.INFORMATION_SCHEMA.TABLE_OPTIONS` WHERE option_name = 'labels' AND option_value LIKE '%(\"include_client_id\", \"false\")%' ) SELECT table_catalog, table_schema, table_name, FROM `{project}.region-us.INFORMATION_SCHEMA.TABLES` LEFT JOIN no_client_id_tables USING (table_catalog, table_schema, table_name) WHERE -- find tables with columns ending with client_id ( ( ddl LIKE '%client_id STRING%' AND no_client_id_tables.table_name IS NULL ) OR ( -- glean tables may have an all null client_info.client_id column but may have a secondary client id ddl LIKE '%client_id STRING%client_id STRING%' AND no_client_id_tables.table_name IS NOT NULL ) ) AND table_type = 'BASE TABLE' -- exclude views AND ( table_schema LIKE '%_derived' or table_schema LIKE '%_stable' ) -- TODO: can't get lineage for most opmon tables, need to figure this out separately AND table_schema != "operational_monitoring_derived" AND table_schema != "backfills_staging_derived" AND table_name != 'deletion_request_v1' AND table_name != 'deletion_request_v4' """ def find_client_id_tables(project: str) -> List[str]: """Return a list of tables that have columns ending with 'client_id'.""" client = bigquery.Client(project=project) row_results = client.query_and_wait( query=FIND_TABLES_QUERY_TEMPLATE.format(project=project) ) return [f"{project}.{row.table_schema}.{row.table_name}" for row in row_results] def get_upstream_stable_tables(id_tables: List[str]) -> Dict[str, Set[str]]: """Build map of tables to upstream stable tables using GCP data catalog lineage. Note that the data catalog only uses the information from the last 30 days of query jobs. """ client = datacatalog_lineage.LineageClient() upstream_stable_tables = defaultdict(set) def traverse_upstream(base_table: str): """Recursively traverse lineage to find stable tables.""" table_ref = TableReference.from_string(base_table) if table_ref.dataset_id.endswith("_stable"): # stable tables are terminal nodes upstream_stable_tables[base_table] = {base_table} elif base_table not in upstream_stable_tables: upstream_links_result = client.search_links( request={ "parent": "projects/moz-fx-data-shared-prod/locations/us", "target": datacatalog_lineage.EntityReference( fully_qualified_name=f"bigquery:{base_table}" ), } ) # recursively add upstream tables for upstream_link in upstream_links_result: # possible sources: https://cloud.google.com/dataplex/docs/fully-qualified-names link_parts = upstream_link.source.fully_qualified_name.split(":") source = link_parts[0] parent_table = link_parts[-1] if not source.startswith("bigquery"): break upstream_stable_tables[base_table] = upstream_stable_tables[ base_table ].union(traverse_upstream(parent_table)) return upstream_stable_tables[base_table] upstream_stable_table_map = {} print("Upstream stable tables:") for table_name in id_tables: upstream_stable_table_map[table_name] = set(traverse_upstream(table_name)) print(f"{table_name} upstream: {upstream_stable_table_map[table_name]}") return upstream_stable_table_map @cache def table_exists(client: bigquery.Client, table_name: str) -> bool: """Return true if given project.dataset.table exists, caching results.""" try: client.get_table(table_name) return True except NotFound: return False def get_associated_deletions( project: str, upstream_stable_tables: Dict[str, Set[str]] ) -> Dict[str, Set[DeleteSource]]: """Get a list of associated deletion requests tables per table based on the stable tables.""" client = bigquery.Client(project=project) # deletion targets for stable tables defined in the shredder config known_stable_table_sources: Dict[str, Set[DeleteSource]] = { f"{target.project}.{target.dataset_id}.{target.table_id}": ( set(src) if isinstance(src, Iterable) else {src} ) for target, src in DELETE_TARGETS.items() if target.dataset_id.endswith("_stable") } table_to_deletions: Dict[str, Set[DeleteSource]] = {} datasets_with_additional_deletion_requests = {} for base_table in upstream_stable_tables: if base_table.endswith(".additional_deletion_requests_v1"): dataset_name = TableReference.from_string(base_table).dataset_id datasets_with_additional_deletion_requests[dataset_name] = ( f"{dataset_name}.additional_deletion_requests_v1" ) datasets_with_additional_deletion_requests[ dataset_name.replace("_derived", "_stable") ] = f"{dataset_name}.additional_deletion_requests_v1" glean_channel_names = get_glean_channel_to_app_name_mapping() for table_name, stable_tables in upstream_stable_tables.items(): deletion_tables: Set[DeleteSource] = set() for stable_table in stable_tables: if stable_table in known_stable_table_sources: table_to_deletions[stable_table] = known_stable_table_sources[ stable_table ] elif stable_table not in table_to_deletions: stable_table_ref = TableReference.from_string(stable_table) # glean table if ( stable_table_ref.dataset_id[: -len("_stable")] in glean_channel_names ): if table_exists( client, f"{stable_table_ref.project}.{stable_table_ref.dataset_id}.deletion_request_v1", ): table_to_deletions[stable_table] = { DeleteSource( table=f"{stable_table_ref.dataset_id}.deletion_request_v1", field=GLEAN_CLIENT_ID, project=SHARED_PROD, ) } else: print(f"No deletion requests for {stable_table_ref.dataset_id}") table_to_deletions[stable_table] = set() if ( stable_table_ref.dataset_id in datasets_with_additional_deletion_requests ): table_to_deletions[stable_table].add( DeleteSource( table=datasets_with_additional_deletion_requests[ stable_table_ref.dataset_id ], field=CLIENT_ID, project=SHARED_PROD, ) ) # unknown legacy telemetry or non-glean structured else: table_to_deletions[stable_table] = set() deletion_tables = deletion_tables.union(table_to_deletions[stable_table]) table_to_deletions[table_name] = deletion_tables return { table: deletions for table, deletions in table_to_deletions.items() if table in upstream_stable_tables } def delete_source_to_dict(source: DeleteSource): """Convert a DeleteSource to a dict, removing the condition field.""" d = source.__dict__.copy() d.pop("conditions") return d def get_missing_deletions( associated_deletions: Dict[str, Set[DeleteSource]] ) -> List[Dict[str, Any]]: """Get list of all tables with the currently configured deletion sources and the sources based on lineage.""" # get the generated glean deletion list with ThreadPool(processes=12) as pool: bigquery_client = bigquery.Client() glean_delete_targets = find_glean_targets(pool, client=bigquery_client) glean_channel_names = get_glean_channel_to_app_name_mapping() glean_app_name_to_channels = defaultdict(list) for channel, app_name in glean_channel_names.items(): glean_app_name_to_channels[app_name].append(channel) table_deletions = [] for target, sources in (*glean_delete_targets.items(), *DELETE_TARGETS.items()): target_table = f"{target.project}.{target.table}" # expand per-app deletion request views into per-channel tables unnested_sources = set() if isinstance(sources, Iterable): for source in sources: if ( source.dataset_id in glean_app_name_to_channels and source.table_id == "deletion_request" ): for channel in glean_app_name_to_channels[source.dataset_id]: unnested_sources.add( DeleteSource( table=f"{channel}_stable.deletion_request_v1", field=source.field, project=source.project, conditions=source.conditions, ) ) else: unnested_sources.add(source) else: unnested_sources.add(sources) # tables not in associated_deletions likely use another id column, e.g. user_id detected_deletions = associated_deletions.pop(target_table, set()) table_deletions.append( { "project_id": target.project, "dataset_id": target.dataset_id, "table_id": target.table_id, "current_sources": [ delete_source_to_dict(source) for source in unnested_sources ], "detected_sources": [ delete_source_to_dict(detected) for detected in detected_deletions ], "matching_sources": set(unnested_sources) == detected_deletions, } ) # delete target not in shredder config for table_name, srcs in associated_deletions.items(): project, dataset, table = table_name.split(".") table_deletions.append( { "project_id": project, "dataset_id": dataset, "table_id": table, "current_sources": [], "detected_sources": [ delete_source_to_dict(detected) for detected in srcs ], "matching_sources": len(srcs) == 0, } ) return table_deletions def write_to_bigquery( run_date: datetime.datetime, target_table: TableReference, deletions: List[Dict[str, Any]], ): client = bigquery.Client() result = client.load_table_from_json( json_rows=[ {"run_date": run_date.date().isoformat(), **deletion} for deletion in deletions ], destination=f"{str(target_table)}${run_date.date().strftime('%Y%m%d')}", job_config=bigquery.LoadJobConfig( write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, schema=Schema.from_schema_file( Path(__file__).parent / "schema.yaml" ).to_bigquery_schema(), time_partitioning=bigquery.TimePartitioning(field="run_date"), ), ).result() print(f"Wrote {result.output_rows} rows to {result.destination}") @click.command @click.option( "--run-date", type=click.DateTime(), help="The date to write in the output." ) @click.option( "--output-table", type=TableReference.from_string, metavar="PROJECT.DATASET.TABLE", help="Table to write results to in the form of PROJECT.DATASET.TABLE.", ) @click.option( "--project-id", default=SHARED_PROD, help="BigQuery project to search for client id tables.", ) def main(run_date, output_table, project_id): """Find tables in the given project that could be added to shredder.""" # TODO: handle other id columns client_id_tables = find_client_id_tables(project_id) print(f"Found {len(client_id_tables)} client id tables.") upstream_stable_tables = get_upstream_stable_tables(client_id_tables) associated_deletions = get_associated_deletions(project_id, upstream_stable_tables) table_deletions = get_missing_deletions(associated_deletions) write_to_bigquery(run_date, output_table, table_deletions) if __name__ == "__main__": main()