in bigquery_etl/dryrun.py [0:0]
def validate_schema(self):
    """Check whether schema is valid."""
    # delay import to prevent circular imports in 'bigquery_etl.schema'
    from .schema import SCHEMA_FILE, Schema
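
    # Assumed module-level imports, used below: `basename` (os.path),
    # `Path` (pathlib), and `click`; they are not part of this excerpt.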
    if (
        self.skip()
        or basename(self.sqlfile) == "script.sql"
        or str(self.sqlfile).endswith(".py")
    ):  # noqa E501
        click.echo(f"\t...Ignoring schema validation for {self.sqlfile}")
        return True
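
    # Derive the query's output schema from the dry-run result.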
    query_file_path = Path(self.sqlfile)
    query_schema = Schema.from_json(self.get_schema())
    if self.errors():
        # ignore file when there are errors that self.get_schema() did not raise
        click.echo(f"\t...Ignoring schema validation for {self.sqlfile}")
        return True
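
    # A schema file is expected next to the query; without one there is
    # nothing to validate against.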
    existing_schema_path = query_file_path.parent / SCHEMA_FILE
    if not existing_schema_path.is_file():
        click.echo(f"No schema file defined for {query_file_path}", err=True)
        return True
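
    # The directory layout encodes the destination table:
    # .../<project>/<dataset>/<table>/<query file>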
    table_name = query_file_path.parent.name
    dataset_name = query_file_path.parent.parent.name
    project_name = query_file_path.parent.parent.parent.name

    partitioned_by = None
    if (
        self.metadata
        and self.metadata.bigquery
        and self.metadata.bigquery.time_partitioning
    ):
        partitioned_by = self.metadata.bigquery.time_partitioning.field
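
    # Fetch the schema of the table as currently deployed in BigQuery.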
    table_schema = Schema.for_table(
        project_name,
        dataset_name,
        table_name,
        client=self.client,
        id_token=self.id_token,
        partitioned_by=partitioned_by,
    )

    # This check relies on the new schema being deployed to prod
    if not query_schema.compatible(table_schema):
        click.echo(
            click.style(
                f"ERROR: Schema for query in {query_file_path} "
                f"is incompatible with the schema deployed for "
                f"{project_name}.{dataset_name}.{table_name}\n"
                f"Did you deploy the new schema to prod yet?",
                fg="red",
            ),
            err=True,
        )
        return False
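
    # Unlike the looser compatible() check above, equal() requires an exact
    # match between the committed schema file and the query's derived schema.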
    existing_schema = Schema.from_schema_file(existing_schema_path)
    if not existing_schema.equal(query_schema):
        click.echo(
            click.style(
                f"ERROR: Schema defined in {existing_schema_path} "
                f"does not match the schema of query {query_file_path}",
                fg="red",
            ),
            err=True,
        )
        return False
click.echo(f"Schemas for {query_file_path} are valid.")
return True
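
A minimal usage sketch, for context. The class name `DryRun` and its
constructor signature are assumptions here; only `validate_schema` itself
appears in the excerpt above:

# Hypothetical caller -- path and constructor arguments are illustrative.
from bigquery_etl.dryrun import DryRun

dry_run = DryRun("sql/my-project/my_dataset/my_table_v1/query.sql")
if not dry_run.validate_schema():
    raise SystemExit("schema validation failed")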