def validate_shredder_mitigation()

in bigquery_etl/metadata/validate_metadata.py [0:0]


def validate_shredder_mitigation(query_dir, metadata):
    """Check queries with shredder mitigation label comply with requirements.

    Queries that do not carry the label pass trivially. For labelled queries
    the checks run in this order and stop at the first failure:

    1. A schema file exists next to the query.
    2. BigQuery reports the destination table as existing.
    3. The last ``GROUP BY`` of ``query.sql`` is an explicit list of column
       names (no ``GROUP BY ALL``, no positional ``GROUP BY 1, 2, 3``).
    4. Every schema column has a description and none of them is listed
       under ``id_level_columns`` in the id-level columns file.

    Args:
        query_dir: Directory holding ``query.sql`` and the schema file.
        metadata: Metadata object whose ``labels`` are inspected.

    Returns:
        ``True`` when the label is absent or every check passes, ``False``
        otherwise. The reason for a failed check is echoed with ``click``.
    """
    if SHREDDER_MITIGATION_LABEL not in metadata.labels:
        return True

    schema_file = Path(query_dir) / SCHEMA_FILE
    if not schema_file.exists():
        click.echo(
            click.style(
                f"Table {query_dir} does not have schema.yaml required for shredder mitigation.",
                fg="yellow",
            )
        )
        return False
    schema = Schema.from_schema_file(schema_file).to_bigquery_schema()

    # This label requires that the query doesn't have id-level columns,
    # has a group by that is explicit & all schema columns have descriptions.
    query_file = Path(query_dir) / "query.sql"
    query_group_by = extract_last_group_by_from_query(sql_path=query_file)

    # Validate that this label is only applied to tables in version 1 if they're not empty
    # If the table is empty, it should be backfilled before applying the label.
    project, dataset, table = extract_from_query_path(Path(query_dir))
    client = bigquery.Client(project=project)
    error_message = (
        f"The shredder-mitigation label can only be applied to existing and "
        f"non-empty tables.\nEnsure that the table `{project}.{dataset}.{table}` is deployed"
        f" and run a managed backfill without mitigation before applying this label to"
        f" a new or empty table."
        f"\n\nSubsequent backfills then can use the [shredder mitigation process]"
        f"(https://mozilla.github.io/bigquery-etl/cookbooks/creating_a_derived_dataset/#initiating-the-backfill)."
    )

    # NOTE(review): this query only looks the table up in
    # INFORMATION_SCHEMA.TABLES, so it establishes that the table exists;
    # it does not count rows. Confirm whether a row-level check is wanted.
    query_table_is_not_empty = (
        f"SELECT EXISTS (SELECT 1 "
        f"FROM `{project}.{dataset}.INFORMATION_SCHEMA.TABLES` "
        f"WHERE table_name = '{table}' LIMIT 1) AS not_empty;"
    )

    # Best effort: when the lookup itself fails (e.g. no access), warn and
    # carry on with the remaining checks instead of failing validation.
    table_not_empty = True
    try:
        rows = client.query(query_table_is_not_empty).result()
        # The job result is an iterable of rows, not a boolean: the answer
        # is the `not_empty` column of the single row the query returns.
        first_row = next(iter(rows), None)
        table_not_empty = first_row is not None and bool(first_row["not_empty"])
    except Exception:
        click.echo(
            click.style(
                f"Table {project}.{dataset}.{table} not found or inaccessible"
                f" for validation. Please check that the name is correct and if the table"
                f" is in a private repository, ensure that it exists and has data before"
                f" running a backfill with shredder mitigation.",
                fg="yellow",
            )
        )

    if not table_not_empty:
        click.echo(click.style(error_message, fg="yellow"))
        return False

    # Validate that the query group by is explicit and as required.
    integers_in_group_by = False
    for element in query_group_by or []:
        try:
            int(element)
        except (TypeError, ValueError):
            # Not a positional reference; non-string elements are rejected
            # by the isinstance check below.
            continue
        integers_in_group_by = True
    if (
        not query_group_by
        or "ALL" in query_group_by
        or not all(isinstance(element, str) for element in query_group_by)
        or integers_in_group_by
    ):
        click.echo(
            "Shredder mitigation validation failed, GROUP BY must use an explicit list "
            "of columns. Avoid expressions like `GROUP BY ALL` or `GROUP BY 1, 2, 3`."
        )
        return False

    with open(ID_LEVEL_COLUMNS_FILE_PATH, "r") as columns_file:
        # An empty YAML file loads as None and a null key yields None; treat
        # both as "no id-level columns configured".
        columns_from_file = yaml.safe_load(columns_file) or {}
    id_level_columns = columns_from_file.get("id_level_columns") or []

    for field in schema:
        # Validate that the query columns have descriptions.
        if not field.description:
            click.echo(
                f"Shredder mitigation validation failed, {field.name} does not have "
                f"a description in the schema."
            )
            return False
        # Validate that id-level columns are not present in the query schema.
        if field.name in id_level_columns:
            click.echo(
                f"Shredder mitigation validation failed, {field.name} is an id-level"
                f" column that is not allowed for this type of backfill."
            )
            return False
    return True