in backfill/2024-03-23-clients_last_seen/telemetry_derived_clients_last_seen_v2_20240322/backfill_clients_last_seen_v2.py [0:0]
def _backfill_staging_table(client, job_config, project_id, dataset, destination_table, bigquery_schema, submission_date, sample_id):
    """Backfill one (submission_date, sample_id) slice of a staging table.

    Ensures the per-sample destination table exists — creating it with DAY
    partitioning on ``submission_date`` and clustering on
    ``normalized_channel``/``sample_id`` if missing, or updating its schema
    if present — then runs the backfill query via the ``bq`` CLI into the
    matching date partition.

    Args:
        client: BigQuery client used for table get/create/update calls.
        job_config: Unused here; kept for caller interface compatibility.
        project_id: GCP project containing the destination dataset.
        dataset: Destination dataset name.
        destination_table: Base table name; ``_{sample_id}`` is appended.
        bigquery_schema: Schema to apply to the destination table.
        submission_date: Partition date to backfill (passed through to
            ``get_backfill_partition`` and the query template).
        sample_id: Sample id selecting the per-sample table.
    """
    full_table_id = f"{project_id}.{dataset}.{destination_table}_{sample_id}"

    # Keep the try body minimal: only get_table is expected to raise NotFound.
    try:
        table = client.get_table(full_table_id)
    except NotFound:
        table = None

    if table is None:
        # Table is missing: create it with the expected schema, partitioning,
        # and clustering.
        new_table = bigquery.Table(full_table_id)
        new_table.schema = bigquery_schema
        new_table.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='submission_date'
        )
        new_table.clustering_fields = ['normalized_channel', 'sample_id']
        client.create_table(new_table)
        click.echo(f"Destination table {full_table_id} created.")
    else:
        # BUG FIX: previously the fetched table's own schema was sent back,
        # making the update a no-op. Apply the intended schema before updating.
        table.schema = bigquery_schema
        client.update_table(table, ["schema"])

    partition = get_backfill_partition(
        submission_date,
        "submission_date",
        0,
        PartitionType.DAY
    )
    if partition is None:
        # Nothing to backfill for this date.
        return

    dest_table = f"{destination_table}_{sample_id}${partition}"
    click.echo(f"Running a backfill for sample_id={sample_id} to {dest_table}")
    arguments = [
        'query',
        '--use_legacy_sql=false',
        '--replace',
        # Was hard-coded to 'moz-fx-data-shared-prod'; use the caller-supplied
        # project for consistency with full_table_id above.
        f'--project_id={project_id}',
        '--format=none',
        f'--dataset_id={dataset}',
        f'--destination_table={dest_table}',
    ]

    # Stream the rendered query to bq via stdin from a temp file that is
    # cleaned up automatically.
    with tempfile.NamedTemporaryFile(mode="w+") as query_stream:
        query_stream.write(
            PARTITION_QUERY.format(
                submission_date=submission_date,
                sample_id=sample_id,
                full_table_id=full_table_id,
            )
        )
        query_stream.seek(0)
        subprocess.check_call(["bq"] + arguments, stdin=query_stream)