in bigquery_etl/metadata/validate_metadata.py [0:0]
def validate_shredder_mitigation(query_dir, metadata):
    """Check that queries with the shredder-mitigation label comply with requirements.

    A query carrying the shredder-mitigation label must:
    - have a ``schema.yaml`` next to the query,
    - target an existing, non-empty destination table,
    - use an explicit GROUP BY (no ``GROUP BY ALL`` / positional ``GROUP BY 1, 2``),
    - have a description for every column in the schema,
    - not select any id-level columns.

    Args:
        query_dir: Path to the query directory (contains query.sql / schema.yaml).
        metadata: Parsed metadata for the query; only ``metadata.labels`` is read.

    Returns:
        True when the label is absent or all requirements are met, False otherwise.
    """
    if SHREDDER_MITIGATION_LABEL not in metadata.labels:
        # Nothing to validate when the label is not applied.
        return True

    schema_file = Path(query_dir) / SCHEMA_FILE
    if not schema_file.exists():
        click.echo(
            click.style(
                f"Table {query_dir} does not have schema.yaml required for shredder mitigation.",
                fg="yellow",
            )
        )
        return False
    schema = Schema.from_schema_file(schema_file).to_bigquery_schema()

    # This label requires that the query doesn't have id-level columns,
    # has a group by that is explicit & all schema columns have descriptions.
    query_file = Path(query_dir) / "query.sql"
    query_group_by = extract_last_group_by_from_query(sql_path=query_file)

    # Validate that this label is only applied to tables in version 1 if they're not empty.
    # If the table is empty, it should be backfilled before applying the label.
    project, dataset, table = extract_from_query_path(Path(query_dir))
    client = bigquery.Client(project=project)
    error_message = (
        f"The shredder-mitigation label can only be applied to existing and "
        f"non-empty tables.\nEnsure that the table `{project}.{dataset}.{table}` is deployed"
        f" and run a managed backfill without mitigation before applying this label to"
        f" a new or empty table."
        f"\n\nSubsequent backfills then can use the [shredder mitigation process]"
        f"(https://mozilla.github.io/bigquery-etl/cookbooks/creating_a_derived_dataset/#initiating-the-backfill)."
    )
    # Probe the table itself: EXISTS (... LIMIT 1) is cheap, returns FALSE for an
    # empty table, and raises for a missing/inaccessible table (handled below).
    # NOTE: querying INFORMATION_SCHEMA.TABLES only proves existence, not
    # non-emptiness, which is what this label requires.
    query_table_is_not_empty = (
        f"SELECT EXISTS (SELECT 1 "
        f"FROM `{project}.{dataset}.{table}` LIMIT 1) AS not_empty;"
    )
    # Default to False so a failed probe cannot silently pass validation.
    table_not_empty = False
    try:
        # The RowIterator returned by result() is always truthy; the boolean
        # must be read from the single result row's `not_empty` column.
        rows = list(client.query(query_table_is_not_empty).result())
        table_not_empty = bool(rows and rows[0].not_empty)
    except Exception:
        click.echo(
            click.style(
                f"Table {project}.{dataset}.{table} not found or inaccessible"
                f" for validation. Please check that the name is correct and if the table"
                f" is in a private repository, ensure that it exists and has data before"
                f" running a backfill with shredder mitigation.",
                fg="yellow",
            )
        )
    if not table_not_empty:
        click.echo(click.style(error_message, fg="yellow"))
        return False

    # Validate that the query group by is explicit and as required.
    integers_in_group_by = False
    for element in query_group_by:
        try:
            int(element)
            integers_in_group_by = True
            break
        except (ValueError, TypeError):
            # Not a positional (numeric) reference; keep looking.
            continue
    if (
        not query_group_by
        or "ALL" in query_group_by
        or not all(isinstance(element, str) for element in query_group_by)
        or integers_in_group_by
    ):
        click.echo(
            "Shredder mitigation validation failed, GROUP BY must use an explicit list "
            "of columns. Avoid expressions like `GROUP BY ALL` or `GROUP BY 1, 2, 3`."
        )
        return False

    with open(ID_LEVEL_COLUMNS_FILE_PATH, "r") as columns_file:
        # Guard against an empty YAML file, where safe_load returns None.
        columns_from_file = yaml.safe_load(columns_file) or {}
    id_level_columns = columns_from_file.get("id_level_columns", [])

    for field in schema:
        # Validate that the query columns have descriptions.
        if not field.description:
            click.echo(
                f"Shredder mitigation validation failed, {field.name} does not have "
                f"a description in the schema."
            )
            return False
        # Validate that id-level columns are not present in the query schema.
        if field.name in id_level_columns:
            click.echo(
                f"Shredder mitigation validation failed, {field.name} is an id-level"
                f" column that is not allowed for this type of backfill."
            )
            return False
    return True