bigquery_etl/shredder/search.py:

#!/usr/bin/env python3

"""Search for tables and user ids that may be eligible for self-serve deletion."""

import re
import warnings
from argparse import ArgumentParser

from google.cloud import bigquery

from ..util import standard_args
from .config import SEARCH_IGNORE_FIELDS, SEARCH_IGNORE_TABLES, SHARED_PROD

parser = ArgumentParser(description=__doc__)
parser.add_argument(
    "-p",
    "--project",
    "--project_id",
    "--project-id",
    default=SHARED_PROD,
    help=f"ID of the project in which to find tables; defaults to {SHARED_PROD}",
)
standard_args.add_log_level(parser, default=None)
standard_args.add_table_filter(parser)

# Field names containing "id" at a word boundary or after "_" are candidates.
ID_PATTERN = re.compile(r"(\b|_)id")
# Ids known not to identify users; matching field paths are skipped.
IGNORE_PATTERN = re.compile(
    "|".join(
        [
            "activation_id",
            "addon_id",
            "application_id",
            "batch_id",
            "bucket_id",
            "bug_id",
            "build_id",
            "campaign_id",
            "changeset_id",
            "crash_id",
            "device_id",
            "distribution_id",
            "document_id",
            "error_id",
            "experiment_id",
            "extension_id",
            "encryption_key_id",
            "insert_id",
            "message_id",
            "model_id",
            "network_id",
            "page_id",
            "partner_id",
            "product_id",
            "run_id",
            "setter_id",
            "survey_id",
            "sample_id",
            "session_id",
            "subsys(tem)?_id",
            "thread_id",
            "tile_id",
            "vendor_id",
            "id_bucket",
            r"active_experiment\.id",
            r"theme\.id",
            r"tiles\[]\.id",
            r"spoc_fills\[]\.id",
            r"devices\[]\.id",
            r"application\.id",
            r"environment\.id",
        ]
    )
)
# Only datasets ending in _stable or _derived are searched.
DATASET_PATTERN = re.compile(".*_(stable|derived)")


def find_id_fields(fields, prefix=""):
    """Recursively locate potential ids in fields."""
    for field in fields:
        name = prefix + field.name
        if field.field_type == "RECORD":
            # Recurse with a longer prefix; repeated records are marked with
            # "[]" in the field path. Build the child prefix locally so that
            # sibling fields after this record keep the current prefix.
            yield from find_id_fields(
                field.fields,
                name + ("[]" if field.mode == "REPEATED" else "") + ".",
            )
        elif ID_PATTERN.search(name) and not IGNORE_PATTERN.search(name):
            yield name


def find_target_tables(project, table_filter):
    """Search for potential new ids and new tables with ids."""
    client = bigquery.Client()
    for dataset in client.list_datasets(project):
        if not DATASET_PATTERN.match(dataset.dataset_id):
            continue
        for table_ref in client.list_tables(dataset.reference):
            # Skip views and other non-table objects.
            if table_ref.table_type != "TABLE":
                continue
            table = f"{table_ref.dataset_id}.{table_ref.table_id}"
            if not table_filter(table) or table in SEARCH_IGNORE_TABLES:
                continue
            for field in find_id_fields(client.get_table(table_ref).schema):
                result = table, field
                if result not in SEARCH_IGNORE_FIELDS:
                    yield result


def main():
    """Print results of find_target_tables."""
    args = parser.parse_args()
    for table, field in find_target_tables(args.project, args.table_filter):
        print(f"table={table!r}, field={field!r}")


if __name__ == "__main__":
    # Suppress warnings emitted by google.auth._default during client setup.
    warnings.filterwarnings("ignore", module="google.auth._default")
    main()
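
# --- Example (not part of the original module) ------------------------------
# A minimal sketch of calling find_target_tables programmatically; the project
# id and the filter callable below are illustrative assumptions. table_filter
# receives "dataset.table" strings, matching how find_target_tables builds
# them, so the lambda here restricts the search to one dataset.
#
#   from bigquery_etl.shredder.search import find_target_tables
#
#   for table, field in find_target_tables(
#       project="moz-fx-data-shared-prod",  # assumed value of SHARED_PROD
#       table_filter=lambda table: table.startswith("telemetry_stable."),
#   ):
#       print(f"table={table!r}, field={field!r}")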