def replace()

in sql/moz-fx-data-shared-prod/telemetry/unnest_parquet_view.sql.py [0:0]


def replace(field, unnest_layer=0, *prefix):
    """Build the SQL expression that stands in for ``field`` in a REPLACE block.

    Args:
        field: Schema field exposing ``name``, ``field_type``, ``mode`` and
            ``fields`` (the attributes read here).
        unnest_layer: Depth counter used to derive the ``_<n>`` alias given to
            ``UNNEST(...)`` so that nested unnests do not collide.
        *prefix: Path components that precede ``field.name`` in the dotted
            reference to this field.

    Returns:
        Either the plain dotted path to the field (nothing to rewrite), or an
        expression ending in ``AS <field.name>`` that rewrites the field.
        Callers in the recursion rely on the presence of ``" AS "`` to tell
        the two apart.
    """
    path = prefix + (field.name,)

    # Only records can hide list/map wrappers; everything else is a plain path.
    if field.field_type != "RECORD":
        return ".".join(path)

    children = field.fields
    wrapper = children[0] if len(children) == 1 else None
    wrapper_name = wrapper.name if wrapper is not None else None
    # A list is a record whose only child is a repeated `list` record that in
    # turn holds exactly one `element` field.
    is_list = (
        wrapper_name == "list"
        and wrapper.mode == "REPEATED"
        and len(wrapper.fields) == 1
        and wrapper.fields[0].name == "element"
    )
    # A map is a record whose only child is a repeated `key_value` field.
    is_map = wrapper_name == "key_value" and wrapper.mode == "REPEATED"

    if not (is_list or is_map):
        # Ordinary struct: recurse into each child and keep only the children
        # that actually produced a rewrite (marked by an " AS " alias).
        candidates = (replace(child, unnest_layer, *path) for child in children)
        rewritten = [expr for expr in candidates if " AS " in expr]
        if not rewritten:
            return ".".join(path)
        star = ".".join(path + ("*",))
        joined = ", ".join(rewritten)
        return f"(SELECT AS STRUCT {star} REPLACE ({joined})) AS {field.name}"

    # Alias for the unnested rows; unique per nesting depth.
    alias = f"_{unnest_layer}"
    if is_list:
        source = ".".join(path + ("list",))
        inner = replace(wrapper.fields[0], unnest_layer + 1, alias)
        # Element needs no rewriting: the alias is unnecessary.
        if inner == f"{alias}.element":
            return f"ARRAY(SELECT * FROM UNNEST({source})) AS {field.name}"
    else:
        source = ".".join(path + ("key_value",))
        # Treat the key/value pair as a struct named after the alias so the
        # struct logic above can rewrite its members.
        pair = bigquery.SchemaField(
            field_type="RECORD",
            fields=wrapper.fields,
            name=alias,
        )
        inner = replace(pair, unnest_layer + 1)
        # Neither key nor value needs rewriting: expose the array directly.
        if inner == alias:
            return f"{source} AS {field.name}"

    struct_star = f"(SELECT AS STRUCT {alias}.* "
    if inner.startswith(struct_star):
        # Struct over the alias itself: drop the alias and the outer
        # parentheses/alias suffix, keeping only the REPLACE clause.
        tail = inner[len(struct_star):].rsplit(")", 1)[0]
        projection = f"SELECT AS STRUCT * {tail}"
    elif inner.startswith("(SELECT AS STRUCT"):
        # Any other nested struct: strip the wrapping parentheses and alias.
        projection = inner[1:].rsplit(")", 1)[0]
    elif inner.startswith("ARRAY"):
        # Nested array: wrap in a struct.
        projection = f"SELECT AS STRUCT {inner}"
    else:
        projection = f"SELECT {inner}"

    return f"ARRAY({projection} FROM UNNEST({source}) AS {alias}) AS {field.name}"