# script/glam/run_scalar_agg_clustered_query.py
import sys
import time

from google.cloud import bigquery

# VERSION_QUERY_TEMPLATE (selects the distinct app_version values for a given
# submission date) and SQL_BASE_DIR (root of the checked-in SQL tree) are
# module-level constants defined elsewhere in this file.

def main(submission_date, dst_table, project, billing_project, tmp_project, dataset):
    """Run query per app_version."""
    bq_client = bigquery.Client(project=billing_project)
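
    # Collect the distinct app_version values present for this submission_date;
    # the aggregation query below is run once per version.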
    app_versions = [
        row["app_version"]
        for row in bq_client.query(
            VERSION_QUERY_TEMPLATE.format(
                date=submission_date, project=project, dataset=dataset
            )
        ).result()
    ]
    print(f"Found versions: {app_versions}")
    if not app_versions:
        print("Source table empty", file=sys.stderr)
        sys.exit(1)
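
    # Load the aggregation query for the destination table from the SQL tree;
    # it is expected to reference the @submission_date and @app_version
    # parameters bound below.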
    sql_path = SQL_BASE_DIR / dst_table / "query.sql"
    query_text = sql_path.read_text()
    # Write to an intermediate table to avoid partial writes to the destination
    if tmp_project is None:
        tmp_project = project
    intermediate_table = f"{tmp_project}.analysis.glam_temp_clustered_query_{dst_table}"
    print(f"Writing results to {intermediate_table}")
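
    # Fan out one query per app version: the first iteration truncates the
    # intermediate table, and subsequent iterations append to it.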
    for i, app_version in enumerate(app_versions):
        print(f"Querying for app_version {app_version}")
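        # Bind the per-version query parameters and target the intermediate
        # table, clustered by (metric, channel).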
        query_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter(
                    "submission_date", "DATE", str(submission_date)
                ),
                bigquery.ScalarQueryParameter("app_version", "INT64", app_version),
            ],
            clustering_fields=["metric", "channel"],
            destination=intermediate_table,
            default_dataset=f"{project}.{dataset}",
            write_disposition=(
                bigquery.WriteDisposition.WRITE_TRUNCATE
                if i == 0
                else bigquery.WriteDisposition.WRITE_APPEND
            ),
        )
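        # query() returns as soon as the job is submitted; poll for completion
        # below rather than blocking on result() right away.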
        query_job = bq_client.query(query_text, job_config=query_config)
        # Print periodically so the Airflow GKE operator doesn't think the
        # task is dead (roughly every 200 seconds).
        elapsed = 0
        while not query_job.done():
            time.sleep(10)
            elapsed += 10
            if elapsed % 200 == 10:
                print("Waiting on query...")
        print(f"Total elapsed: approximately {elapsed} seconds")
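        # result() also surfaces any error the job hit while we were polling.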
        results = query_job.result()
        print(f"Query job {query_job.job_id} finished")
        print(f"{results.total_rows} rows written to {intermediate_table}")
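
    # All versions have been written; publish by replacing the destination
    # table's contents with the intermediate table in one copy job.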
    copy_config = bigquery.CopyJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    print(f"Copying {intermediate_table} to {project}.{dataset}.{dst_table}")
    bq_client.copy_table(
        intermediate_table,
        f"{project}.{dataset}.{dst_table}",
        job_config=copy_config,
    ).result()
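
    # The copy succeeded, so the intermediate table is no longer needed.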
    print(f"Deleting {intermediate_table}")
    bq_client.delete_table(intermediate_table)
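

# A minimal sketch of how this entrypoint could be invoked from the command
# line. The flag names, date parsing, and defaults below are assumptions for
# illustration; the real module's CLI wiring may differ.
if __name__ == "__main__":
    from argparse import ArgumentParser
    from datetime import date

    parser = ArgumentParser(description="Run the clustered query per app version.")
    parser.add_argument("--submission-date", type=date.fromisoformat, required=True)
    parser.add_argument("--dst-table", required=True)
    parser.add_argument("--project", required=True)
    parser.add_argument("--billing-project", required=True)
    parser.add_argument("--tmp-project", default=None)
    parser.add_argument("--dataset", required=True)
    args = parser.parse_args()
    main(
        args.submission_date,
        args.dst_table,
        args.project,
        args.billing_project,
        args.tmp_project,
        args.dataset,
    )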