in script/marketing/generate_app_store_queries.py [0:0]
def main(project, source_dataset, destination_dataset, create_table, backfill, dryrun):
"""Generate queries and optionally create the tables in BigQuery."""
client = bigquery.Client(project=project)
exported_tables = [
table.table_id
for table in client.list_tables(source_dataset)
if table.table_type == "TABLE"
]
tables_by_dimension = defaultdict(list)
opt_in_metrics = set()
# group table names by the dimension it is grouped by
for table_name in exported_tables:
if table_name.endswith("_total"):
dimension = None
else:
metric, dimension = table_name.split("_by_")
if dimension.startswith("opt_in"):
opt_in_metrics.add(metric)
dimension = dimension.replace("opt_in_", "")
tables_by_dimension[dimension].append(table_name)
for dimension, table_names in tables_by_dimension.items():
qualified_table_names = [
f"`{project}.{source_dataset}.{table_name}`" for table_name in table_names
]
if dimension is not None:
fields = f"date, app_name, {dimension}"
table_name = f"metrics_by_{dimension}"
metrics = [table_name.split("_by_")[0] for table_name in table_names]
else:
fields = "date, app_name"
table_name = "metrics_total"
metrics = [table_name.split("_total")[0] for table_name in table_names]
join_clauses = [
JOIN_TEMPLATE.format(table=table_name, fields=fields)
for table_name in qualified_table_names[1:]
]
# add _opt_in to opt-in metrics
fields_to_add_opt_in = [
metric for metric in metrics if metric in opt_in_metrics
]
excepted_fields = ",".join(fields_to_add_opt_in)
additional_fields = [
f"{name} AS {name}_opt_in"
for name in fields_to_add_opt_in
if name != "rate"
]
# rename rate column to opt_in_rate and
if "rate" in metrics:
additional_fields.append("rate AS opt_in_rate")
query_text = QUERY_TEMPLATE.format(
excepted_fields=excepted_fields,
additional_fields=", ".join(additional_fields),
first_table=qualified_table_names[0],
joined_tables="\n".join(join_clauses),
filter="date=@submission_date",
)
query_path = os.path.join(SQL_DIR, destination_dataset, table_name, "query.sql")
if not os.path.exists(os.path.dirname(query_path)):
os.makedirs(os.path.dirname(query_path))
with open(query_path, "w") as f:
print(f"Writing {query_path}")
f.write(reformat(query_text))
f.write("\n")
if create_table:
query_text = QUERY_TEMPLATE.format(
excepted_fields=excepted_fields,
additional_fields=", ".join(additional_fields),
first_table=qualified_table_names[0],
joined_tables="\n".join(join_clauses),
filter="TRUE" if backfill else "FALSE",
)
schema_update_options = (
[] if backfill else [bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION]
)
job_config = bigquery.QueryJobConfig(
use_legacy_sql=False,
dry_run=dryrun,
destination=f"{project}.{destination_dataset}.{table_name}",
schema_update_options=schema_update_options,
time_partitioning=bigquery.TimePartitioning(field="date"),
create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
write_disposition=(
bigquery.WriteDisposition.WRITE_TRUNCATE
if backfill
else bigquery.WriteDisposition.WRITE_APPEND
),
)
print(f"Creating table {table_name}")
query_job = client.query(query_text, job_config)
if not dryrun:
query_job.result()