bigquery_etl/cli/backfill.py

"""bigquery-etl CLI backfill command."""
import json
import logging
import subprocess
import sys
import tempfile
from collections import defaultdict
from datetime import date, datetime, timedelta
from pathlib import Path

import rich_click as click
import yaml
from dateutil.relativedelta import relativedelta
from google.cloud import bigquery
from google.cloud.exceptions import Conflict, NotFound

from ..backfill.date_range import BackfillDateRange, get_backfill_partition
from ..backfill.parse import (
BACKFILL_FILE,
DEFAULT_BILLING_PROJECT,
DEFAULT_REASON,
DEFAULT_WATCHER,
Backfill,
BackfillStatus,
)
from ..backfill.shredder_mitigation import (
SHREDDER_MITIGATION_CHECKS_NAME,
SHREDDER_MITIGATION_QUERY_NAME,
generate_query_with_shredder_mitigation,
)
from ..backfill.utils import (
MAX_BACKFILL_ENTRY_AGE_DAYS,
get_backfill_backup_table_name,
get_backfill_file_from_qualified_table_name,
get_backfill_staging_qualified_table_name,
get_entries_from_qualified_table_name,
get_qualified_table_name_to_entries_map_by_project,
get_scheduled_backfills,
qualified_table_name_matching,
validate_table_metadata,
)
from ..backfill.validate import (
validate_depends_on_past_end_date,
validate_duplicate_entry_with_initiate_status,
validate_file,
)
from ..cli.query import backfill as query_backfill
from ..cli.utils import (
billing_project_option,
is_authenticated,
project_id_option,
sql_dir_option,
)
from ..config import ConfigLoader
from ..deploy import FailedDeployException, SkippedDeployException, deploy_table
from ..format_sql.formatter import reformat
from ..metadata.parse_metadata import METADATA_FILE, Metadata, PartitionType
from ..metadata.validate_metadata import SHREDDER_MITIGATION_LABEL
from ..schema import SCHEMA_FILE, Schema

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

ignore_missing_metadata_option = click.option(
"--ignore-missing-metadata",
is_flag=True,
default=False,
help="Ignore backfills for tables missing metadata.yaml. "
"This can be used to run on checked-in queries without running sql generation.",
)


@click.group(help="Commands for managing backfills.")
@click.pass_context
def backfill(ctx):
"""Create the CLI group for the backfill command."""
# create temporary directory generated content is written to
# the directory will be deleted automatically after the command exits
ctx.ensure_object(dict)
ctx.obj["TMP_DIR"] = ctx.with_resource(tempfile.TemporaryDirectory())


@backfill.command(
help="""Create a new backfill entry in the backfill.yaml file. Create
a backfill.yaml file if it does not already exist.
Examples:
\b
./bqetl backfill create moz-fx-data-shared-prod.telemetry_derived.deviations_v1 \\
--start_date=2021-03-01 \\
--end_date=2021-03-31 \\
--exclude=2021-03-03 \\
""",
)
@click.argument("qualified_table_name")
@sql_dir_option
@click.option(
"--start_date",
"--start-date",
"-s",
help="First date to be backfilled. Date format: yyyy-mm-dd",
type=click.DateTime(formats=["%Y-%m-%d"]),
required=True,
)
@click.option(
"--end_date",
"--end-date",
"-e",
help="Last date to be backfilled. Date format: yyyy-mm-dd",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=datetime.today(),
)
@click.option(
"--exclude",
"-x",
multiple=True,
help="Dates excluded from backfill. Date format: yyyy-mm-dd",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
)
@click.option(
"--watcher",
"-w",
help="Watcher of the backfill (email address)",
default=DEFAULT_WATCHER,
)
@click.option(
"--custom_query_path",
"--custom-query-path",
help="Path of the custom query to run the backfill. Optional.",
)
@click.option(
"--shredder_mitigation/--no_shredder_mitigation",
help="Wether to run a backfill using an auto-generated query that mitigates shredder effect.",
)
@click.option(
"--override-retention-range-limit",
"--override_retention_range_limit",
required=False,
type=bool,
is_flag=True,
help="True to allow running a backfill outside the retention policy limit.",
default=False,
)
@click.option(
"--override-depends-on-past-end-date",
"--override_depends_on_past_end_date",
is_flag=True,
help="If set, allow backfill for depends_on_past tables to have an end date before the entry date. "
"In some cases, this can cause inconsistencies in the data.",
)
# If not specified, the billing project will be set to the default billing project when the backfill is initiated.
@billing_project_option()
@click.pass_context
def create(
ctx,
qualified_table_name,
sql_dir,
start_date,
end_date,
exclude,
watcher,
custom_query_path,
shredder_mitigation,
override_retention_range_limit,
override_depends_on_past_end_date,
billing_project,
):
"""CLI command for creating a new backfill entry in backfill.yaml file.
A backfill.yaml file will be created if it does not already exist.
"""
if errors := validate_table_metadata(
sql_dir, qualified_table_name, ignore_missing_metadata=False
):
click.echo("\n".join(errors))
sys.exit(1)
existing_backfills = get_entries_from_qualified_table_name(
sql_dir, qualified_table_name
)
new_entry = Backfill(
entry_date=date.today(),
start_date=start_date.date(),
end_date=end_date.date(),
excluded_dates=[e.date() for e in list(exclude)],
reason=DEFAULT_REASON,
watchers=[watcher],
status=BackfillStatus.INITIATE,
custom_query_path=custom_query_path,
shredder_mitigation=shredder_mitigation,
override_retention_limit=override_retention_range_limit,
override_depends_on_past_end_date=override_depends_on_past_end_date,
billing_project=billing_project,
)
backfill_file = get_backfill_file_from_qualified_table_name(
sql_dir, qualified_table_name
)
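    # Reject the new entry if it duplicates an existing entry with Initiate status
    # or violates depends_on_past end-date constraints.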
validate_duplicate_entry_with_initiate_status(new_entry, existing_backfills)
validate_depends_on_past_end_date(new_entry, backfill_file)
existing_backfills.insert(0, new_entry)
backfill_file.write_text(
"\n".join(
backfill.to_yaml() for backfill in sorted(existing_backfills, reverse=True)
)
)
click.echo(f"Created backfill entry in {backfill_file}.")


@backfill.command(
help="""Validate backfill.yaml file format and content.
Examples:
./bqetl backfill validate moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6
\b
# validate all backfill.yaml files if table is not specified
Use the `--project_id` option to change the project to be validated;
default is `moz-fx-data-shared-prod`.
Examples:
./bqetl backfill validate
"""
)
@click.argument("qualified_table_name", required=False)
@sql_dir_option
@project_id_option()
@ignore_missing_metadata_option
@click.pass_context
def validate(
ctx,
qualified_table_name,
sql_dir,
project_id,
ignore_missing_metadata,
):
"""Validate backfill.yaml files."""
if qualified_table_name:
backfills_dict = {
qualified_table_name: get_entries_from_qualified_table_name(
sql_dir, qualified_table_name
)
}
else:
backfills_dict = get_qualified_table_name_to_entries_map_by_project(
sql_dir, project_id
)
errors = defaultdict(list)
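    # Metadata errors abort immediately; backfill.yaml parse errors are collected per table.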
for table_name in backfills_dict:
if metadata_errors := validate_table_metadata(
sql_dir, table_name, ignore_missing_metadata
):
click.echo("\n".join(metadata_errors))
sys.exit(1)
try:
backfill_file = get_backfill_file_from_qualified_table_name(
sql_dir, table_name
)
validate_file(backfill_file)
except (yaml.YAMLError, ValueError) as e:
errors[table_name].append(
f"Backfill.yaml file for {table_name} contains the following error:\n {e}"
)
except FileNotFoundError:
if ignore_missing_metadata:
click.echo(f"Skipping {table_name} due to --ignore-missing-metadata")
else:
raise
if table_name in errors:
click.echo(f"{BACKFILL_FILE} validation failed for {table_name}")
else:
click.echo(f"{BACKFILL_FILE} has been validated for {table_name}.")
if len(errors) > 0:
click.echo("Failed to validate the following backfill entries:")
for table_name, error_list in errors.items():
if len(error_list) == 0:
continue
click.echo(f"{table_name}:")
click.echo("\n".join(error_list))
sys.exit(1)
elif backfills_dict:
click.echo(
f"All {BACKFILL_FILE} files have been validated for project {project_id}."
)


@backfill.command(
help="""Get backfill(s) information from all or specific table(s).
Examples:
# Get info for specific table.
./bqetl backfill info moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6
\b
# Get info for all tables.
./bqetl backfill info
\b
# Get info from all tables with specific status.
./bqetl backfill info --status=Initiate
""",
)
@click.argument("qualified_table_name", required=False)
@sql_dir_option
@project_id_option(
ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod")
)
@click.option(
"--status",
type=click.Choice([s.value for s in BackfillStatus]),
help="Filter backfills with this status.",
)
@click.pass_context
def info(ctx, qualified_table_name, sql_dir, project_id, status):
"""Return backfill(s) information from all or specific table(s)."""
if qualified_table_name:
backfills_dict = {
qualified_table_name: get_entries_from_qualified_table_name(
sql_dir, qualified_table_name, status
)
}
else:
backfills_dict = get_qualified_table_name_to_entries_map_by_project(
sql_dir, project_id, status
)
total_backfills_count = 0
for qualified_table_name, entries in backfills_dict.items():
entries_count = len(entries)
total_backfills_count += entries_count
project, dataset, table = qualified_table_name_matching(qualified_table_name)
status_str = f" with {status} status" if status is not None else ""
click.echo(
f"""{project}.{dataset}.{table} has {entries_count} backfill(s){status_str}:"""
)
for entry in entries:
click.echo(str(entry))
click.echo(f"\nThere are a total of {total_backfills_count} backfill(s).")


@backfill.command(
help="""Get information on backfill(s) that require processing.
Examples:
# Get info for specific table.
./bqetl backfill scheduled moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6
\b
# Get info for all tables.
./bqetl backfill scheduled
""",
)
@click.argument("qualified_table_name", required=False)
@sql_dir_option
@project_id_option(
ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod")
)
@click.option(
"--status",
type=click.Choice([s.value for s in BackfillStatus]),
default=BackfillStatus.INITIATE.value,
help="Whether to get backfills to process or to complete.",
)
@click.option("--json_path", type=click.Path())
@click.option(
"--ignore-old-entries",
is_flag=True,
default=False,
help=f"If set, entries older than {MAX_BACKFILL_ENTRY_AGE_DAYS} days will be ignored due "
"to BigQuery retention settings.",
)
@ignore_missing_metadata_option
@click.pass_context
def scheduled(
ctx,
qualified_table_name,
sql_dir,
project_id,
status,
json_path,
ignore_old_entries,
ignore_missing_metadata,
):
"""Return list of backfill(s) that require processing."""
backfills = get_scheduled_backfills(
sql_dir,
project_id,
qualified_table_name,
status=status,
ignore_old_entries=ignore_old_entries,
ignore_missing_metadata=ignore_missing_metadata,
)
for qualified_table_name, entry in backfills.items():
click.echo(f"Backfill scheduled for {qualified_table_name}:\n{entry}")
click.echo(f"{len(backfills)} backfill(s) require processing.")
if json_path is not None:
formatted_backfills = [
{
"qualified_table_name": qualified_table_name,
"entry_date": entry.entry_date.strftime("%Y-%m-%d"),
"watchers": entry.watchers,
}
for qualified_table_name, entry in backfills.items()
]
Path(json_path).write_text(json.dumps(formatted_backfills))


@backfill.command(
help="""Process entry in backfill.yaml with Initiate status that has not yet been processed.
Examples:
\b
# Initiate backfill entry for specific table
./bqetl backfill initiate moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6
Use the `--project_id` option to change the project;
default project_id is `moz-fx-data-shared-prod`.
"""
)
@click.argument("qualified_table_name")
@click.option(
"--parallelism",
default=16,
type=int,
help="Maximum number of queries to execute concurrently",
)
@sql_dir_option
@project_id_option(
ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod")
)
@click.pass_context
def initiate(
ctx,
qualified_table_name,
parallelism,
sql_dir,
project_id,
):
"""Process backfill entry with initiate status in backfill.yaml file(s)."""
click.echo("Backfill processing (initiate) started....")
backfills_to_process_dict = get_scheduled_backfills(
sql_dir, project_id, qualified_table_name, status=BackfillStatus.INITIATE.value
)
if not backfills_to_process_dict:
click.echo(f"No backfill processed for {qualified_table_name}")
return
entry_to_initiate = backfills_to_process_dict[qualified_table_name]
backfill_staging_qualified_table_name = get_backfill_staging_qualified_table_name(
qualified_table_name, entry_to_initiate.entry_date
)
project, dataset, table = qualified_table_name_matching(qualified_table_name)
query_path = Path(sql_dir) / project / dataset / table / "query.sql"
# create schema before deploying staging table if it does not exist
schema_path = query_path.parent / SCHEMA_FILE
if not schema_path.exists():
# if schema doesn't exist, a schema file is created to allow backfill staging table deployment
Schema.from_query_file(
query_file=query_path,
respect_skip=False,
sql_dir=sql_dir,
).to_yaml_file(schema_path)
click.echo(f"Schema file created for {qualified_table_name}: {schema_path}")
try:
deploy_table(
artifact_file=query_path,
destination_table=backfill_staging_qualified_table_name,
respect_dryrun_skip=False,
)
except (SkippedDeployException, FailedDeployException) as e:
raise RuntimeError(
f"Backfill initiate failed to deploy {query_path} to {backfill_staging_qualified_table_name}."
) from e
    billing_project = DEFAULT_BILLING_PROJECT

    # override with billing project from backfill entry
    if entry_to_initiate.billing_project is not None:
        billing_project = entry_to_initiate.billing_project

    if not billing_project.startswith("moz-fx-data-backfill-"):
        raise ValueError(
            f"Invalid billing project: {billing_project}. Please use one of the projects assigned to backfills."
        )
click.echo(
f"\nInitiating backfill for {qualified_table_name} with entry date {entry_to_initiate.entry_date} via dry run:"
)
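    # Dry run first to surface query errors before running the actual backfill.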
_initiate_backfill(
ctx,
qualified_table_name,
backfill_staging_qualified_table_name,
entry_to_initiate,
parallelism,
dry_run=True,
billing_project=billing_project,
)
click.echo(
f"\nInitiating backfill for {qualified_table_name} with entry date {entry_to_initiate.entry_date}:"
)
_initiate_backfill(
ctx,
qualified_table_name,
backfill_staging_qualified_table_name,
entry_to_initiate,
parallelism,
billing_project=billing_project,
)
click.echo(
f"Processed backfill for {qualified_table_name} with entry date {entry_to_initiate.entry_date}"
)


def _initiate_backfill(
ctx,
qualified_table_name: str,
backfill_staging_qualified_table_name: str,
entry: Backfill,
parallelism: int = 16,
dry_run: bool = False,
billing_project=DEFAULT_BILLING_PROJECT,
):
if not is_authenticated():
click.echo(
"Authentication to GCP required. Run `gcloud auth login --update-adc` "
"and check that the project is set correctly."
)
sys.exit(1)
project, dataset, table = qualified_table_name_matching(qualified_table_name)
logging_str = f"""Initiating backfill for {qualified_table_name} (destination: {backfill_staging_qualified_table_name}).
Query will be executed in {billing_project}."""
if dry_run:
logging_str += " This is a dry run."
log.info(logging_str)
custom_query_path = None
checks = None
custom_checks_name = None
metadata = Metadata.from_file(
Path("sql") / project / dataset / table / METADATA_FILE
)
# Stop if the metadata contains shredder mitigation label and the backfill doesn't.
if (
SHREDDER_MITIGATION_LABEL in metadata.labels
and entry.shredder_mitigation is False
):
click.echo(
click.style(
f"This backfill cannot continue.\nManaged backfills for tables with metadata label"
f" {SHREDDER_MITIGATION_LABEL} require using --shredder_mitigation.",
fg="yellow",
)
)
sys.exit(1)
client = bigquery.Client(project=project)
if entry.shredder_mitigation is True:
click.echo(
click.style(
f"Generating query with shredder mitigation for {dataset}.{table}...",
fg="blue",
)
)
query_path, _ = generate_query_with_shredder_mitigation(
client=client,
project_id=project,
dataset=dataset,
destination_table=table,
staging_table_name=backfill_staging_qualified_table_name,
backfill_date=entry.start_date.isoformat(),
)
custom_query_path = Path(query_path) / f"{SHREDDER_MITIGATION_QUERY_NAME}.sql"
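        # Have the backfill run the generated shredder-mitigation checks.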
checks = True
custom_checks_name = f"{SHREDDER_MITIGATION_CHECKS_NAME}.sql"
click.echo(
click.style(
f"Starting backfill with custom query: '{custom_query_path}'.",
fg="blue",
)
)
elif entry.custom_query_path:
custom_query_path = Path(entry.custom_query_path)
# rewrite query to query the staging table instead of the prod table if table depends on past
if metadata.scheduling.get("depends_on_past"):
query_path = (
custom_query_path or Path("sql") / project / dataset / table / "query.sql"
)
# format to ensure fully qualified references
query_text = reformat(query_path.read_text())
updated_query_text = query_text.replace(
f"`{project}.{dataset}.{table}`",
f"`{backfill_staging_qualified_table_name}`",
)
replaced_ref_query = query_path.parent / "replaced_ref.sql"
replaced_ref_query.write_text(updated_query_text)
custom_query_path = replaced_ref_query
override_retention_limit = entry.override_retention_limit
# copy previous partition if depends_on_past
_initialize_previous_partition(
client,
qualified_table_name,
backfill_staging_qualified_table_name,
metadata,
entry,
)
# Backfill table
    # in the long run we should remove the query backfill command and require a backfill entry for all backfills
try:
ctx.invoke(
query_backfill,
name=f"{dataset}.{table}",
project_id=project,
start_date=datetime.fromisoformat(entry.start_date.isoformat()),
end_date=datetime.fromisoformat(entry.end_date.isoformat()),
exclude=[e.strftime("%Y-%m-%d") for e in entry.excluded_dates],
destination_table=backfill_staging_qualified_table_name,
parallelism=parallelism,
dry_run=dry_run,
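            # Only pass custom query/check arguments that are set so the
            # query backfill command's defaults apply otherwise.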
**(
{
k: param
for k, param in [
("custom_query_path", custom_query_path),
("checks", checks),
("checks_file_name", custom_checks_name),
]
if param is not None
}
),
billing_project=billing_project,
override_retention_range_limit=override_retention_limit,
)
except subprocess.CalledProcessError as e:
raise ValueError(
f"Backfill initiate resulted in error for {qualified_table_name}"
) from e


def _initialize_previous_partition(
client: bigquery.Client,
table_name: str,
staging_table_name: str,
metadata: Metadata,
backfill_entry: Backfill,
):
"""Initialize initial partition for tables with depends_on_past=true.
For tables with a null date partition parameter, the entire table is copied
"""
if (
metadata.scheduling is None
or not metadata.scheduling.get("depends_on_past")
or metadata.bigquery is None
or metadata.bigquery.time_partitioning is None
):
return
match metadata.bigquery.time_partitioning.type:
case PartitionType.DAY:
previous_partition_date = backfill_entry.start_date - timedelta(days=1)
case PartitionType.MONTH:
previous_partition_date = backfill_entry.start_date - relativedelta(
months=1
)
case _:
raise ValueError(
"Unsupported partitioning type for backfills: "
f"{metadata.bigquery.time_partitioning.type}"
)
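    # If date_partition_parameter isn't set it's assumed to be submission_date.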
partition_param, offset = "submission_date", 0
if metadata.scheduling:
partition_param = metadata.scheduling.get(
"date_partition_parameter", partition_param
)
offset = metadata.scheduling.get("date_partition_offset", offset)
previous_partition_id = get_backfill_partition(
previous_partition_date,
partition_param,
offset,
metadata.bigquery.time_partitioning.type,
)
if previous_partition_id is None:
raise ValueError(
f"Unable to get initial partition id for depends_on_past table: {table_name}"
)
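    # Copy the partition preceding the backfill range from production into the staging table.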
_copy_table(
source_table=f"{table_name}${previous_partition_id}",
destination_table=f"{staging_table_name}${previous_partition_id}",
client=client,
)


@backfill.command(
    help="""Complete entry in backfill.yaml with Complete status that has not yet been processed.
Examples:
\b
# Complete backfill entry for specific table
./bqetl backfill complete moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6
Use the `--project_id` option to change the project;
default project_id is `moz-fx-data-shared-prod`.
"""
)
@click.argument("qualified_table_name")
@sql_dir_option
@project_id_option("moz-fx-data-shared-prod")
@click.pass_context
def complete(ctx, qualified_table_name, sql_dir, project_id):
"""Process backfill entry with complete status in backfill.yaml file(s)."""
if not is_authenticated():
click.echo(
"Authentication to GCP required. Run `gcloud auth login --update-adc` "
"and check that the project is set correctly."
)
sys.exit(1)
client = bigquery.Client(project=project_id)
click.echo("Backfill processing (complete) started....")
backfills_to_process_dict = get_scheduled_backfills(
sql_dir, project_id, qualified_table_name, status=BackfillStatus.COMPLETE.value
)
if not backfills_to_process_dict:
click.echo(f"No backfill processed for {qualified_table_name}")
return
entry_to_complete = backfills_to_process_dict[qualified_table_name]
click.echo(
f"Completing backfill for {qualified_table_name} with entry date {entry_to_complete.entry_date}:"
)
backfill_staging_qualified_table_name = get_backfill_staging_qualified_table_name(
qualified_table_name, entry_to_complete.entry_date
)
    # clone the production table as a backup before overwriting it with staging data
cloned_table_full_name = get_backfill_backup_table_name(
qualified_table_name, entry_to_complete.entry_date
)
_copy_table(qualified_table_name, cloned_table_full_name, client, clone=True)
project, dataset, table = qualified_table_name_matching(qualified_table_name)
table_metadata = Metadata.from_file(
Path(sql_dir) / project / dataset / table / METADATA_FILE
)
_copy_backfill_staging_to_prod(
backfill_staging_qualified_table_name,
qualified_table_name,
client,
entry_to_complete,
table_metadata,
)
# delete backfill staging table
client.delete_table(backfill_staging_qualified_table_name)
click.echo(
f"Backfill staging table deleted: {backfill_staging_qualified_table_name}"
)
click.echo(
f"Processed backfill for {qualified_table_name} with entry date {entry_to_complete.entry_date}"
)


def _copy_backfill_staging_to_prod(
backfill_staging_table: str,
qualified_table_name: str,
client: bigquery.Client,
entry: Backfill,
table_metadata: Metadata,
):
"""Copy backfill staging table to prod based on table metadata and backfill config.
If table is
un-partitioned: copy the entire staging table to production.
partitioned: determine and copy each partition from staging to production.
"""
partitioning_type = None
if table_metadata.bigquery and table_metadata.bigquery.time_partitioning:
partitioning_type = table_metadata.bigquery.time_partitioning.type
if partitioning_type is None:
_copy_table(backfill_staging_table, qualified_table_name, client)
else:
backfill_date_range = BackfillDateRange(
entry.start_date,
entry.end_date,
excludes=entry.excluded_dates,
range_type=partitioning_type,
)
# If date_partition_parameter isn't set it's assumed to be submission_date:
# https://github.com/mozilla/telemetry-airflow/blob/dbc2782fa23a34ae8268e7788f9621089ac71def/utils/gcp.py#L194C48-L194C48
partition_param, offset = "submission_date", 0
if table_metadata.scheduling:
partition_param = table_metadata.scheduling.get(
"date_partition_parameter", partition_param
)
offset = table_metadata.scheduling.get("date_partition_offset", offset)
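        # Copy each backfilled partition individually from staging to production.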
for backfill_date in backfill_date_range:
if (
partition := get_backfill_partition(
backfill_date,
partition_param,
offset,
partitioning_type,
)
) is None:
raise ValueError(
f"Null partition found completing backfill {entry} for {qualified_table_name}."
)
production_table = f"{qualified_table_name}${partition}"
backfill_table = f"{backfill_staging_table}${partition}"
_copy_table(backfill_table, production_table, client)


def _copy_table(
source_table: str, destination_table: str, client, clone: bool = False
) -> None:
"""
Copy and overwrite table from source to destination table.
If clone is True, clone (previous) production data for backup before swapping stage data into production.
"""
job_type_str = "copied"
if clone:
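        # Backup clones must not overwrite an existing table and expire after 30 days.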
copy_config = bigquery.CopyJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_EMPTY,
operation_type=bigquery.job.copy_.OperationType.CLONE,
destination_expiration_time=(datetime.now() + timedelta(days=30)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
),
)
job_type_str = "cloned"
else:
copy_config = bigquery.CopyJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
operation_type=bigquery.job.copy_.OperationType.COPY,
)
try:
client.copy_table(
source_table,
destination_table,
job_config=copy_config,
).result()
except NotFound:
click.echo(f"Source table not found: {source_table}")
sys.exit(1)
except Conflict:
click.echo(f"Backup table already exists: {destination_table}")
sys.exit(1)
click.echo(
f"Table {source_table} successfully {job_type_str} to {destination_table}"
)