bigquery_etl/cli/backfill.py:

"""bigquery-etl CLI backfill command.""" import json import logging import subprocess import sys import tempfile from collections import defaultdict from datetime import date, datetime, timedelta from pathlib import Path import rich_click as click import yaml from dateutil.relativedelta import relativedelta from google.cloud import bigquery from google.cloud.exceptions import Conflict, NotFound from ..backfill.date_range import BackfillDateRange, get_backfill_partition from ..backfill.parse import ( BACKFILL_FILE, DEFAULT_BILLING_PROJECT, DEFAULT_REASON, DEFAULT_WATCHER, Backfill, BackfillStatus, ) from ..backfill.shredder_mitigation import ( SHREDDER_MITIGATION_CHECKS_NAME, SHREDDER_MITIGATION_QUERY_NAME, generate_query_with_shredder_mitigation, ) from ..backfill.utils import ( MAX_BACKFILL_ENTRY_AGE_DAYS, get_backfill_backup_table_name, get_backfill_file_from_qualified_table_name, get_backfill_staging_qualified_table_name, get_entries_from_qualified_table_name, get_qualified_table_name_to_entries_map_by_project, get_scheduled_backfills, qualified_table_name_matching, validate_table_metadata, ) from ..backfill.validate import ( validate_depends_on_past_end_date, validate_duplicate_entry_with_initiate_status, validate_file, ) from ..cli.query import backfill as query_backfill from ..cli.utils import ( billing_project_option, is_authenticated, project_id_option, sql_dir_option, ) from ..config import ConfigLoader from ..deploy import FailedDeployException, SkippedDeployException, deploy_table from ..format_sql.formatter import reformat from ..metadata.parse_metadata import METADATA_FILE, Metadata, PartitionType from ..metadata.validate_metadata import SHREDDER_MITIGATION_LABEL from ..schema import SCHEMA_FILE, Schema logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) ignore_missing_metadata_option = click.option( "--ignore-missing-metadata", is_flag=True, default=False, help="Ignore backfills for tables missing metadata.yaml. " "This can be used to run on checked-in queries without running sql generation.", ) @click.group(help="Commands for managing backfills.") @click.pass_context def backfill(ctx): """Create the CLI group for the backfill command.""" # create temporary directory generated content is written to # the directory will be deleted automatically after the command exits ctx.ensure_object(dict) ctx.obj["TMP_DIR"] = ctx.with_resource(tempfile.TemporaryDirectory()) @backfill.command( help="""Create a new backfill entry in the backfill.yaml file. Create a backfill.yaml file if it does not already exist. Examples: \b ./bqetl backfill create moz-fx-data-shared-prod.telemetry_derived.deviations_v1 \\ --start_date=2021-03-01 \\ --end_date=2021-03-31 \\ --exclude=2021-03-03 \\ """, ) @click.argument("qualified_table_name") @sql_dir_option @click.option( "--start_date", "--start-date", "-s", help="First date to be backfilled. Date format: yyyy-mm-dd", type=click.DateTime(formats=["%Y-%m-%d"]), required=True, ) @click.option( "--end_date", "--end-date", "-e", help="Last date to be backfilled. Date format: yyyy-mm-dd", type=click.DateTime(formats=["%Y-%m-%d"]), default=datetime.today(), ) @click.option( "--exclude", "-x", multiple=True, help="Dates excluded from backfill. 
Date format: yyyy-mm-dd", type=click.DateTime(formats=["%Y-%m-%d"]), default=None, ) @click.option( "--watcher", "-w", help="Watcher of the backfill (email address)", default=DEFAULT_WATCHER, ) @click.option( "--custom_query_path", "--custom-query-path", help="Path of the custom query to run the backfill. Optional.", ) @click.option( "--shredder_mitigation/--no_shredder_mitigation", help="Wether to run a backfill using an auto-generated query that mitigates shredder effect.", ) @click.option( "--override-retention-range-limit", "--override_retention_range_limit", required=False, type=bool, is_flag=True, help="True to allow running a backfill outside the retention policy limit.", default=False, ) @click.option( "--override-depends-on-past-end-date", "--override_depends_on_past_end_date", is_flag=True, help="If set, allow backfill for depends_on_past tables to have an end date before the entry date. " "In some cases, this can cause inconsistencies in the data.", ) # If not specified, the billing project will be set to the default billing project when the backfill is initiated. @billing_project_option() @click.pass_context def create( ctx, qualified_table_name, sql_dir, start_date, end_date, exclude, watcher, custom_query_path, shredder_mitigation, override_retention_range_limit, override_depends_on_past_end_date, billing_project, ): """CLI command for creating a new backfill entry in backfill.yaml file. A backfill.yaml file will be created if it does not already exist. """ if errors := validate_table_metadata( sql_dir, qualified_table_name, ignore_missing_metadata=False ): click.echo("\n".join(errors)) sys.exit(1) existing_backfills = get_entries_from_qualified_table_name( sql_dir, qualified_table_name ) new_entry = Backfill( entry_date=date.today(), start_date=start_date.date(), end_date=end_date.date(), excluded_dates=[e.date() for e in list(exclude)], reason=DEFAULT_REASON, watchers=[watcher], status=BackfillStatus.INITIATE, custom_query_path=custom_query_path, shredder_mitigation=shredder_mitigation, override_retention_limit=override_retention_range_limit, override_depends_on_past_end_date=override_depends_on_past_end_date, billing_project=billing_project, ) backfill_file = get_backfill_file_from_qualified_table_name( sql_dir, qualified_table_name ) validate_duplicate_entry_with_initiate_status(new_entry, existing_backfills) validate_depends_on_past_end_date(new_entry, backfill_file) existing_backfills.insert(0, new_entry) backfill_file.write_text( "\n".join( backfill.to_yaml() for backfill in sorted(existing_backfills, reverse=True) ) ) click.echo(f"Created backfill entry in {backfill_file}.") @backfill.command( help="""Validate backfill.yaml file format and content. Examples: ./bqetl backfill validate moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6 \b # validate all backfill.yaml files if table is not specified Use the `--project_id` option to change the project to be validated; default is `moz-fx-data-shared-prod`. 


@backfill.command(
    help="""Get backfill(s) information from all or specific table(s).

    Examples:

    # Get info for specific table.
    ./bqetl backfill info moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6

    \b
    # Get info for all tables.
    ./bqetl backfill info

    \b
    # Get info from all tables with specific status.
    ./bqetl backfill info --status=Initiate
    """,
)
@click.argument("qualified_table_name", required=False)
@sql_dir_option
@project_id_option(
    ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod")
)
@click.option(
    "--status",
    type=click.Choice([s.value for s in BackfillStatus]),
    help="Filter backfills with this status.",
)
@click.pass_context
def info(ctx, qualified_table_name, sql_dir, project_id, status):
    """Return backfill(s) information from all or specific table(s)."""
    if qualified_table_name:
        backfills_dict = {
            qualified_table_name: get_entries_from_qualified_table_name(
                sql_dir, qualified_table_name, status
            )
        }
    else:
        backfills_dict = get_qualified_table_name_to_entries_map_by_project(
            sql_dir, project_id, status
        )

    total_backfills_count = 0

    for qualified_table_name, entries in backfills_dict.items():
        entries_count = len(entries)
        total_backfills_count += entries_count

        project, dataset, table = qualified_table_name_matching(qualified_table_name)
        status_str = f" with {status} status" if status is not None else ""
        click.echo(
            f"""{project}.{dataset}.{table} has {entries_count} backfill(s){status_str}:"""
        )
        for entry in entries:
            click.echo(str(entry))

    click.echo(f"\nThere are a total of {total_backfills_count} backfill(s).")


@backfill.command(
    help="""Get information on backfill(s) that require processing.

    Examples:

    # Get info for specific table.
    ./bqetl backfill scheduled moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6

    \b
    # Get info for all tables.
    ./bqetl backfill scheduled
    """,
)
@click.argument("qualified_table_name", required=False)
@sql_dir_option
@project_id_option(
    ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod")
)
@click.option(
    "--status",
    type=click.Choice([s.value for s in BackfillStatus]),
    default=BackfillStatus.INITIATE.value,
    help="Whether to get backfills to process or to complete.",
)
@click.option("--json_path", type=click.Path())
@click.option(
    "--ignore-old-entries",
    is_flag=True,
    default=False,
    help=f"If set, entries older than {MAX_BACKFILL_ENTRY_AGE_DAYS} days will be ignored due "
    "to BigQuery retention settings.",
)
@ignore_missing_metadata_option
@click.pass_context
def scheduled(
    ctx,
    qualified_table_name,
    sql_dir,
    project_id,
    status,
    json_path,
    ignore_old_entries,
    ignore_missing_metadata,
):
    """Return list of backfill(s) that require processing."""
    backfills = get_scheduled_backfills(
        sql_dir,
        project_id,
        qualified_table_name,
        status=status,
        ignore_old_entries=ignore_old_entries,
        ignore_missing_metadata=ignore_missing_metadata,
    )

    for qualified_table_name, entry in backfills.items():
        click.echo(f"Backfill scheduled for {qualified_table_name}:\n{entry}")

    click.echo(f"{len(backfills)} backfill(s) require processing.")

    if json_path is not None:
        formatted_backfills = [
            {
                "qualified_table_name": qualified_table_name,
                "entry_date": entry.entry_date.strftime("%Y-%m-%d"),
                "watchers": entry.watchers,
            }
            for qualified_table_name, entry in backfills.items()
        ]
        Path(json_path).write_text(json.dumps(formatted_backfills))
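
# Illustrative sketch (field names taken from the dict comprehension above; the table
# and email values are hypothetical): with --json_path, `scheduled` writes a JSON list
# such as
#
#   [
#     {
#       "qualified_table_name": "moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6",
#       "entry_date": "2021-05-03",
#       "watchers": ["someone@mozilla.com"]
#     }
#   ]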


@backfill.command(
    help="""Process entry in backfill.yaml with Initiate status that has not yet been processed.

    Examples:

    \b
    # Initiate backfill entry for specific table
    ./bqetl backfill initiate moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6

    Use the `--project_id` option to change the project;
    default project_id is `moz-fx-data-shared-prod`.
    """
)
@click.argument("qualified_table_name")
@click.option(
    "--parallelism",
    default=16,
    type=int,
    help="Maximum number of queries to execute concurrently",
)
@sql_dir_option
@project_id_option(
    ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod")
)
@click.pass_context
def initiate(
    ctx,
    qualified_table_name,
    parallelism,
    sql_dir,
    project_id,
):
    """Process backfill entry with initiate status in backfill.yaml file(s)."""
    click.echo("Backfill processing (initiate) started...")

    backfills_to_process_dict = get_scheduled_backfills(
        sql_dir, project_id, qualified_table_name, status=BackfillStatus.INITIATE.value
    )

    if not backfills_to_process_dict:
        click.echo(f"No backfill processed for {qualified_table_name}")
        return

    entry_to_initiate = backfills_to_process_dict[qualified_table_name]

    backfill_staging_qualified_table_name = get_backfill_staging_qualified_table_name(
        qualified_table_name, entry_to_initiate.entry_date
    )

    project, dataset, table = qualified_table_name_matching(qualified_table_name)
    query_path = Path(sql_dir) / project / dataset / table / "query.sql"

    # create schema before deploying staging table if it does not exist
    schema_path = query_path.parent / SCHEMA_FILE
    if not schema_path.exists():
        # if schema doesn't exist, a schema file is created to allow backfill staging table deployment
        Schema.from_query_file(
            query_file=query_path,
            respect_skip=False,
            sql_dir=sql_dir,
        ).to_yaml_file(schema_path)
        click.echo(f"Schema file created for {qualified_table_name}: {schema_path}")

    try:
        deploy_table(
            artifact_file=query_path,
            destination_table=backfill_staging_qualified_table_name,
            respect_dryrun_skip=False,
        )
    except (SkippedDeployException, FailedDeployException) as e:
        raise RuntimeError(
            f"Backfill initiate failed to deploy {query_path} to {backfill_staging_qualified_table_name}."
        ) from e

    billing_project = DEFAULT_BILLING_PROJECT

    # override with billing project from backfill entry
    if entry_to_initiate.billing_project is not None:
        billing_project = entry_to_initiate.billing_project
    elif not billing_project.startswith("moz-fx-data-backfill-"):
        raise ValueError(
            f"Invalid billing project: {billing_project}. Please use one of the projects assigned to backfills."
        )

    click.echo(
        f"\nInitiating backfill for {qualified_table_name} with entry date {entry_to_initiate.entry_date} via dry run:"
    )
    _initiate_backfill(
        ctx,
        qualified_table_name,
        backfill_staging_qualified_table_name,
        entry_to_initiate,
        parallelism,
        dry_run=True,
        billing_project=billing_project,
    )

    click.echo(
        f"\nInitiating backfill for {qualified_table_name} with entry date {entry_to_initiate.entry_date}:"
    )
    _initiate_backfill(
        ctx,
        qualified_table_name,
        backfill_staging_qualified_table_name,
        entry_to_initiate,
        parallelism,
        billing_project=billing_project,
    )

    click.echo(
        f"Processed backfill for {qualified_table_name} with entry date {entry_to_initiate.entry_date}"
    )
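
# Illustrative summary of the initiate flow above (table name hypothetical):
#
#   ./bqetl backfill initiate moz-fx-data-shared-prod.telemetry_derived.deviations_v1
#
# deploys the staging destination returned by get_backfill_staging_qualified_table_name(),
# runs the backfill once as a dry run, then runs it for real against that staging table;
# nothing is written to the production table until `complete` is run.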
""" ) @click.argument("qualified_table_name") @click.option( "--parallelism", default=16, type=int, help="Maximum number of queries to execute concurrently", ) @sql_dir_option @project_id_option( ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod") ) @click.pass_context def initiate( ctx, qualified_table_name, parallelism, sql_dir, project_id, ): """Process backfill entry with initiate status in backfill.yaml file(s).""" click.echo("Backfill processing (initiate) started....") backfills_to_process_dict = get_scheduled_backfills( sql_dir, project_id, qualified_table_name, status=BackfillStatus.INITIATE.value ) if not backfills_to_process_dict: click.echo(f"No backfill processed for {qualified_table_name}") return entry_to_initiate = backfills_to_process_dict[qualified_table_name] backfill_staging_qualified_table_name = get_backfill_staging_qualified_table_name( qualified_table_name, entry_to_initiate.entry_date ) project, dataset, table = qualified_table_name_matching(qualified_table_name) query_path = Path(sql_dir) / project / dataset / table / "query.sql" # create schema before deploying staging table if it does not exist schema_path = query_path.parent / SCHEMA_FILE if not schema_path.exists(): # if schema doesn't exist, a schema file is created to allow backfill staging table deployment Schema.from_query_file( query_file=query_path, respect_skip=False, sql_dir=sql_dir, ).to_yaml_file(schema_path) click.echo(f"Schema file created for {qualified_table_name}: {schema_path}") try: deploy_table( artifact_file=query_path, destination_table=backfill_staging_qualified_table_name, respect_dryrun_skip=False, ) except (SkippedDeployException, FailedDeployException) as e: raise RuntimeError( f"Backfill initiate failed to deploy {query_path} to {backfill_staging_qualified_table_name}." ) from e billing_project = DEFAULT_BILLING_PROJECT # override with billing project from backfill entry if entry_to_initiate.billing_project is not None: billing_project = entry_to_initiate.billing_project elif not billing_project.startswith("moz-fx-data-backfill-"): raise ValueError( f"Invalid billing project: {billing_project}. Please use one of the projects assigned to backfills." ) sys.exit(1) click.echo( f"\nInitiating backfill for {qualified_table_name} with entry date {entry_to_initiate.entry_date} via dry run:" ) _initiate_backfill( ctx, qualified_table_name, backfill_staging_qualified_table_name, entry_to_initiate, parallelism, dry_run=True, billing_project=billing_project, ) click.echo( f"\nInitiating backfill for {qualified_table_name} with entry date {entry_to_initiate.entry_date}:" ) _initiate_backfill( ctx, qualified_table_name, backfill_staging_qualified_table_name, entry_to_initiate, parallelism, billing_project=billing_project, ) click.echo( f"Processed backfill for {qualified_table_name} with entry date {entry_to_initiate.entry_date}" ) def _initiate_backfill( ctx, qualified_table_name: str, backfill_staging_qualified_table_name: str, entry: Backfill, parallelism: int = 16, dry_run: bool = False, billing_project=DEFAULT_BILLING_PROJECT, ): if not is_authenticated(): click.echo( "Authentication to GCP required. Run `gcloud auth login --update-adc` " "and check that the project is set correctly." ) sys.exit(1) project, dataset, table = qualified_table_name_matching(qualified_table_name) logging_str = f"""Initiating backfill for {qualified_table_name} (destination: {backfill_staging_qualified_table_name}). 


def _initialize_previous_partition(
    client: bigquery.Client,
    table_name: str,
    staging_table_name: str,
    metadata: Metadata,
    backfill_entry: Backfill,
):
    """Initialize the initial partition for tables with depends_on_past=true.

    For tables with a null date partition parameter, the entire table is copied.
    """
    if (
        metadata.scheduling is None
        or not metadata.scheduling.get("depends_on_past")
        or metadata.bigquery is None
        or metadata.bigquery.time_partitioning is None
    ):
        return

    match metadata.bigquery.time_partitioning.type:
        case PartitionType.DAY:
            previous_partition_date = backfill_entry.start_date - timedelta(days=1)
        case PartitionType.MONTH:
            previous_partition_date = backfill_entry.start_date - relativedelta(
                months=1
            )
        case _:
            raise ValueError(
                "Unsupported partitioning type for backfills: "
                f"{metadata.bigquery.time_partitioning.type}"
            )

    partition_param, offset = "submission_date", 0
    if metadata.scheduling:
        partition_param = metadata.scheduling.get(
            "date_partition_parameter", partition_param
        )
        offset = metadata.scheduling.get("date_partition_offset", offset)

    previous_partition_id = get_backfill_partition(
        previous_partition_date,
        partition_param,
        offset,
        metadata.bigquery.time_partitioning.type,
    )

    if previous_partition_id is None:
        raise ValueError(
            f"Unable to get initial partition id for depends_on_past table: {table_name}"
        )

    _copy_table(
        source_table=f"{table_name}${previous_partition_id}",
        destination_table=f"{staging_table_name}${previous_partition_id}",
        client=client,
    )
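
# Illustrative sketch (assumes the usual BigQuery partition decorators; the exact id is
# produced by get_backfill_partition): for a DAY-partitioned table with a backfill
# start_date of 2021-03-01, the partition copied into staging is
#
#   <table_name>$20210228  ->  <staging_table_name>$20210228
#
# and for MONTH partitioning it would be the $202102 partition.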


@backfill.command(
    help="""Complete entry in backfill.yaml with Complete status that has not yet been processed.

    Examples:

    \b
    # Complete backfill entry for specific table
    ./bqetl backfill complete moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6

    Use the `--project_id` option to change the project;
    default project_id is `moz-fx-data-shared-prod`.
    """
)
@click.argument("qualified_table_name")
@sql_dir_option
@project_id_option("moz-fx-data-shared-prod")
@click.pass_context
def complete(ctx, qualified_table_name, sql_dir, project_id):
    """Process backfill entry with complete status in backfill.yaml file(s)."""
    if not is_authenticated():
        click.echo(
            "Authentication to GCP required. Run `gcloud auth login --update-adc` "
            "and check that the project is set correctly."
        )
        sys.exit(1)
    client = bigquery.Client(project=project_id)

    click.echo("Backfill processing (complete) started...")

    backfills_to_process_dict = get_scheduled_backfills(
        sql_dir, project_id, qualified_table_name, status=BackfillStatus.COMPLETE.value
    )

    if not backfills_to_process_dict:
        click.echo(f"No backfill processed for {qualified_table_name}")
        return

    entry_to_complete = backfills_to_process_dict[qualified_table_name]

    click.echo(
        f"Completing backfill for {qualified_table_name} with entry date {entry_to_complete.entry_date}:"
    )

    backfill_staging_qualified_table_name = get_backfill_staging_qualified_table_name(
        qualified_table_name, entry_to_complete.entry_date
    )

    # clone production table
    cloned_table_full_name = get_backfill_backup_table_name(
        qualified_table_name, entry_to_complete.entry_date
    )
    _copy_table(qualified_table_name, cloned_table_full_name, client, clone=True)

    project, dataset, table = qualified_table_name_matching(qualified_table_name)
    table_metadata = Metadata.from_file(
        Path(sql_dir) / project / dataset / table / METADATA_FILE
    )

    _copy_backfill_staging_to_prod(
        backfill_staging_qualified_table_name,
        qualified_table_name,
        client,
        entry_to_complete,
        table_metadata,
    )

    # delete backfill staging table
    client.delete_table(backfill_staging_qualified_table_name)
    click.echo(
        f"Backfill staging table deleted: {backfill_staging_qualified_table_name}"
    )

    click.echo(
        f"Processed backfill for {qualified_table_name} with entry date {entry_to_complete.entry_date}"
    )


def _copy_backfill_staging_to_prod(
    backfill_staging_table: str,
    qualified_table_name: str,
    client: bigquery.Client,
    entry: Backfill,
    table_metadata: Metadata,
):
    """Copy backfill staging table to prod based on table metadata and backfill config.

    If table is un-partitioned: copy the entire staging table to production.
    If table is partitioned: determine and copy each partition from staging to production.
    """
    partitioning_type = None
    if table_metadata.bigquery and table_metadata.bigquery.time_partitioning:
        partitioning_type = table_metadata.bigquery.time_partitioning.type

    if partitioning_type is None:
        _copy_table(backfill_staging_table, qualified_table_name, client)
    else:
        backfill_date_range = BackfillDateRange(
            entry.start_date,
            entry.end_date,
            excludes=entry.excluded_dates,
            range_type=partitioning_type,
        )

        # If date_partition_parameter isn't set it's assumed to be submission_date:
        # https://github.com/mozilla/telemetry-airflow/blob/dbc2782fa23a34ae8268e7788f9621089ac71def/utils/gcp.py#L194C48-L194C48
        partition_param, offset = "submission_date", 0
        if table_metadata.scheduling:
            partition_param = table_metadata.scheduling.get(
                "date_partition_parameter", partition_param
            )
            offset = table_metadata.scheduling.get("date_partition_offset", offset)

        for backfill_date in backfill_date_range:
            if (
                partition := get_backfill_partition(
                    backfill_date,
                    partition_param,
                    offset,
                    partitioning_type,
                )
            ) is None:
                raise ValueError(
                    f"Null partition found completing backfill {entry} for {qualified_table_name}."
                )

            production_table = f"{qualified_table_name}${partition}"
            backfill_table = f"{backfill_staging_table}${partition}"
            _copy_table(backfill_table, production_table, client)
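
# Illustrative sketch (hypothetical names): completing a day-partitioned backfill for
# 2021-03-01..2021-03-02 copies partition by partition, e.g.
#
#   <backfill_staging_table>$20210301 -> <qualified_table_name>$20210301
#   <backfill_staging_table>$20210302 -> <qualified_table_name>$20210302
#
# while an un-partitioned table is copied to production in a single _copy_table call.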
""" job_type_str = "copied" if clone: copy_config = bigquery.CopyJobConfig( write_disposition=bigquery.WriteDisposition.WRITE_EMPTY, operation_type=bigquery.job.copy_.OperationType.CLONE, destination_expiration_time=(datetime.now() + timedelta(days=30)).strftime( "%Y-%m-%dT%H:%M:%SZ" ), ) job_type_str = "cloned" else: copy_config = bigquery.CopyJobConfig( write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, operation_type=bigquery.job.copy_.OperationType.COPY, ) try: client.copy_table( source_table, destination_table, job_config=copy_config, ).result() except NotFound: click.echo(f"Source table not found: {source_table}") sys.exit(1) except Conflict: click.echo(f"Backup table already exists: {destination_table}") sys.exit(1) click.echo( f"Table {source_table} successfully {job_type_str} to {destination_table}" )