# backfill/2023-09-26-initialize-clients_first_seen_v2/bigquery_etl_cli_query.py
import sys
from pathlib import Path
from tempfile import NamedTemporaryFile
from traceback import print_exc

import click
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

# These module paths assume the standard bigquery-etl package layout.
from bigquery_etl.dryrun import DryRun
from bigquery_etl.schema import SCHEMA_FILE, Schema


def _deploy(query_file):
    """Deploy the local schema of a query to its destination table.

    Expects ``client`` (a ``bigquery.Client``) and the options
    ``respect_dryrun_skip``, ``destination_table``, ``force``,
    ``use_cloud_function``, and ``skip_existing`` to be available in the
    surrounding scope; in bigquery-etl they come from the enclosing
    ``bqetl query schema deploy`` command.
    """
    if respect_dryrun_skip and str(query_file) in DryRun.skipped_files():
        click.echo(f"{query_file} dry runs are skipped. Cannot validate schemas.")
        return

    query_file_path = Path(query_file)
    existing_schema_path = query_file_path.parent / SCHEMA_FILE

    if not existing_schema_path.is_file():
        click.echo(f"No schema file found for {query_file}")
        return
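
    # Everything below may hit the network: schema derivation uses the
    # dry-run service and deployment uses the BigQuery API.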
    try:
        # Table coordinates come from the on-disk layout:
        #   sql/<project>/<dataset>/<table>/query.sql
        table_name = query_file_path.parent.name
        dataset_name = query_file_path.parent.parent.name
        project_name = query_file_path.parent.parent.parent.name

        if destination_table:
            full_table_id = destination_table
        else:
            full_table_id = f"{project_name}.{dataset_name}.{table_name}"
        existing_schema = Schema.from_schema_file(existing_schema_path)

        # Unless forced, confirm that the schema derived from the query (via
        # dry run) still matches the committed schema file.
        if not force and str(query_file_path).endswith("query.sql"):
            query_schema = Schema.from_query_file(
                query_file_path,
                use_cloud_function=use_cloud_function,
                respect_skip=respect_dryrun_skip,
            )
            if not existing_schema.equal(query_schema):
                click.echo(
                    f"Query {query_file_path} does not match "
                    f"schema in {existing_schema_path}. "
                    f"To update the local schema file, "
                    f"run `./bqetl query schema update "
                    f"{dataset_name}.{table_name}`",
                    err=True,
                )
                sys.exit(1)
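
        # Round-trip the schema through a temporary JSON file so the client
        # can parse it into a list of bigquery.SchemaField objects.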
        with NamedTemporaryFile(suffix=".json") as tmp_schema_file:
            existing_schema.to_json_file(Path(tmp_schema_file.name))
            bigquery_schema = client.schema_from_json(tmp_schema_file.name)
        try:
            table = client.get_table(full_table_id)
        except NotFound:
            # Not deployed yet; a locally constructed Table has created=None.
            table = bigquery.Table(full_table_id)

        table.schema = bigquery_schema
        # _attach_metadata (defined elsewhere in this module) copies values
        # from metadata.yaml onto the table object.
        _attach_metadata(query_file_path, table)

        if not table.created:
            client.create_table(table)
            click.echo(f"Destination table {full_table_id} created.")
        elif not skip_existing:
            client.update_table(
                table,
                [
                    "schema",
                    "friendly_name",
                    "description",
                    "time_partitioning",
                    "clustering_fields",
                    "labels",
                ],
            )
            click.echo(f"Schema (and metadata) updated for {full_table_id}.")
    except Exception:
        # Report the failure as a traceback without propagating it.
        print_exc()

    return query_file
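

# ---------------------------------------------------------------------------
# Usage sketch (an illustration, not part of the original module). The option
# values and the query path are hypothetical stand-ins for what the
# `bqetl query schema deploy` CLI would supply; running it also requires the
# module's _attach_metadata helper, which is not shown in this excerpt.
if __name__ == "__main__":
    client = bigquery.Client()  # needs GCP credentials
    respect_dryrun_skip = True
    destination_table = None  # derive <project>.<dataset>.<table> from the path
    force = False
    use_cloud_function = True
    skip_existing = False

    _deploy(
        # Hypothetical query path matching this backfill's target table.
        "sql/moz-fx-data-shared-prod/telemetry_derived/"
        "clients_first_seen_v2/query.sql"
    )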