backfill/2024-11-06-metrics-pings-metric-labels-over-max-length/insert_to_prod.py (78 lines of code) (raw):
"""
Create an INSERT SQL statement to merge the deduplicated data with the production stable tables.
This script is needed as the order of columns doesn't match between production schema and backfill schemas.
It is necessary to explicitly select columns.
"""
from typing import Optional, Iterable, List, Dict
import yaml
def generate_compatible_select_expression(
    source_schema: Dict,
    target_schema: Dict,
) -> str:
    """Generate a select expression reading ``source_schema`` columns in the
    column order of ``target_schema``.

    Fields that are missing from the source schema — or present under the
    same name but with a different type — are emitted as typed NULLs so the
    resulting rows always line up with the target schema.

    Args:
        source_schema: BigQuery schema dict (``{"fields": [...]}``) of the
            table being read.
        target_schema: BigQuery schema dict of the table being inserted into;
            its field order defines the output column order.

    Returns:
        A comma-separated SQL select list (without the ``SELECT`` keyword).
    """

    def _type_info(node: Dict) -> str:
        """Return the standard-SQL type expression for one schema field."""
        dtype = node["type"]
        if dtype == "RECORD":
            # Records become STRUCT<`name` type, ...>, built recursively.
            dtype = (
                "STRUCT<"
                + ", ".join(
                    f"`{field['name']}` {_type_info(field)}"
                    for field in node["fields"]
                )
                + ">"
            )
        elif dtype == "FLOAT":
            # Schema files use the legacy type name; standard SQL uses FLOAT64.
            dtype = "FLOAT64"
        if node.get("mode") == "REPEATED":
            return f"ARRAY<{dtype}>"
        return dtype

    def recurse_fields(
        _source_schema_nodes: List[Dict],
        _target_schema_nodes: List[Dict],
        path: Optional[List[str]] = None,
    ) -> str:
        """Build the select list for one nesting level of the schemas."""
        if path is None:
            path = []
        select_expr = []
        source_schema_nodes = {n["name"]: n for n in _source_schema_nodes}
        target_schema_nodes = {n["name"]: n for n in _target_schema_nodes}
        # Iterate over the target fields so output order matches the target.
        for node_name, node in target_schema_nodes.items():
            dtype = node["type"]
            node_path = path + [node_name]
            node_path_str = ".".join(node_path)
            source_node = source_schema_nodes.get(node_name)
            if source_node is None or source_node["type"] != dtype:
                # Field is missing from the source schema, or exists with an
                # incompatible type (e.g. INTEGER vs STRING): emit a typed
                # NULL so the column still lines up with the target schema.
                # (Previously the type-mismatch case was unreachable and a
                # non-RECORD source field shadowing a RECORD target field
                # crashed with KeyError on 'fields'.)
                select_expr.append(
                    f"CAST(NULL AS {_type_info(node)}) AS `{node_name}`"
                )
            elif dtype == "RECORD":
                # Nested fields: recursively generate the inner select list.
                if node.get("mode", None) == "REPEATED":
                    # Repeated record: unnest and rebuild each element.
                    # The recursion restarts the path at `node_name` because
                    # UNNEST aliases each element to that name.
                    select_expr.append(
                        f"""
                        ARRAY(
                            SELECT
                                STRUCT(
                                    {recurse_fields(
                                        source_node['fields'],
                                        node['fields'],
                                        [node_name],
                                    )}
                                )
                            FROM UNNEST({node_path_str}) AS `{node_name}`
                        ) AS `{node_name}`
                        """
                    )
                else:
                    # Plain struct: rebuild it field by field.
                    select_expr.append(
                        f"""
                        STRUCT(
                            {recurse_fields(
                                source_node['fields'],
                                node['fields'],
                                node_path,
                            )}
                        ) AS `{node_name}`
                        """
                    )
            else:
                # Scalar present in both schemas with a matching type:
                # select it as-is.
                select_expr.append(node_path_str)
        return ", ".join(select_expr)

    return recurse_fields(
        source_schema["fields"],
        target_schema["fields"],
    )
def main():
    """Read the stable (target) and backfill (source) schemas and write the
    backfill INSERT statement to ``insert.sql``.

    The stable schema defines the required column order; the backfill schema
    describes the table being read.
    """
    with open("stable_metrics.yaml") as stream:
        stable_schema = yaml.safe_load(stream)
    with open("backfill_metrics.yaml") as stream:
        backfill_schema = yaml.safe_load(stream)

    select_expression = generate_compatible_select_expression(
        backfill_schema, stable_schema
    )

    # BUG FIX: the statement previously lacked the SELECT keyword
    # (INSERT INTO ... <columns> FROM ...), which is invalid SQL.
    insert_statement = f"""
        INSERT INTO
            `moz-fx-data-shared-prod.firefox_desktop_stable.metrics_v1`
        SELECT
            {select_expression}
        FROM
            `moz-fx-data-backfill-1.firefox_desktop_stable.metrics_v1`
        WHERE
            DATE(submission_timestamp) > "2024-10-01"
        """
    with open("insert.sql", "w") as f:
        f.write(insert_statement)


if __name__ == "__main__":
    main()