in bigquery_etl/cli/query.py [0:0]
def _initialize(query_file):
    """Run the one-time initialization for a single query or materialized view.

    If the SQL at ``query_file`` contains an ``is_init()`` marker, the
    destination table is deployed (schema + metadata) and the query is executed
    in init mode — sharded across ``@sample_id`` values when the query uses
    that parameter. Otherwise, any ``materialized_view.sql`` files found under
    the query's directory are deployed, but only if the view does not already
    exist.

    NOTE(review): this function is a closure — ``skip_existing``, ``force``,
    ``dry_run``, ``sql_dir``, ``parallelism``, ``billing_project`` and ``ctx``
    are free names taken from the enclosing CLI command's scope (outside this
    view); confirm their semantics against the enclosing command.

    Returns ``query_file`` after attempting initialization, or ``None`` when
    initialization is skipped (table exists with ``skip_existing``, or there
    is nothing to initialize).
    """
    # Derive the destination table coordinates from the query file's path
    # (sql/<project>/<dataset>/<table>/query.sql layout — presumed; verify
    # against extract_from_query_path).
    project, dataset, destination_table = extract_from_query_path(query_file)
    client = bigquery.Client(project=project)
    full_table_id = f"{project}.{dataset}.{destination_table}"
    table = None
    sql_content = query_file.read_text()
    # Materialized-view definitions living alongside or below the query file.
    materialized_views = list(
        map(
            Path,
            glob(f"{query_file.parent}/**/materialized_view.sql", recursive=True),
        )
    )
    # check if the provided file can be initialized and whether existing ones should be skipped
    if "is_init()" in sql_content:
        try:
            table = client.get_table(full_table_id)
            if skip_existing:
                # table exists; skip initialization
                return
            if not force and table.num_rows > 0:
                # Refuse to clobber a populated table unless --force was given.
                # ClickException propagates past the NotFound handler below.
                raise click.ClickException(
                    f"Table {full_table_id} already exists and contains data. The initialization process is terminated."
                    " Use --force to overwrite the existing destination table."
                )
        except NotFound:
            # continue with creating the table
            pass
    elif len(materialized_views) == 0:
        # No is_init() marker and no materialized views: nothing to do.
        return
    try:
        # Enable initialization from query.sql files
        # Create the table by deploying the schema and metadata, then run the init.
        # This does not currently verify the accuracy of the schema or that it
        # matches the query.
        if "is_init()" in sql_content:
            if not table:
                # Table doesn't exist yet: generate/update the schema first so
                # deploy has something to push.
                ctx.invoke(
                    update,
                    name=full_table_id,
                    sql_dir=sql_dir,
                    project_id=project,
                    update_downstream=False,
                    is_init=True,
                )
            ctx.invoke(
                deploy,
                name=full_table_id,
                sql_dir=sql_dir,
                project_id=project,
                force=True,
                respect_dryrun_skip=False,
            )
            # Arguments passed through to the `bq query` CLI; --append_table +
            # --noreplace so sharded runs accumulate into one table.
            arguments = [
                "query",
                "--use_legacy_sql=false",
                "--format=none",
                "--append_table",
                "--noreplace",
            ]
            if dry_run:
                arguments += ["--dry_run"]
            if "@sample_id" in sql_content:
                # Query is sharded by sample_id: run one job per shard (0-99)
                # in parallel rather than a single monolithic init.
                sample_ids = list(range(0, 100))
                _initialize_in_parallel(
                    project=project,
                    table=full_table_id,
                    dataset=dataset,
                    query_file=query_file,
                    arguments=arguments,
                    parallelism=parallelism,
                    sample_ids=sample_ids,
                    # Render the query with is_init() evaluating to True.
                    addl_templates={
                        "is_init": lambda: True,
                    },
                    billing_project=billing_project,
                )
            else:
                _run_query(
                    query_files=[query_file],
                    project_id=project,
                    public_project_id=None,
                    destination_table=full_table_id,
                    dataset_id=dataset,
                    query_arguments=arguments,
                    addl_templates={
                        "is_init": lambda: True,
                    },
                    billing_project=billing_project,
                )
        else:
            # No is_init(): deploy each materialized_view.sql found earlier.
            for file in materialized_views:
                with open(file) as init_file_stream:
                    init_sql = init_file_stream.read()
                    job_config = bigquery.QueryJobConfig(
                        dry_run=dry_run,
                        default_dataset=f"{project}.{dataset}",
                    )

                    # only deploy materialized view if it doesn't exist
                    # TODO: https://github.com/mozilla/bigquery-etl/issues/5804
                    try:
                        materialized_view_table = client.get_table(full_table_id)
                        # Best-effort check, don't fail if there's an error
                        try:
                            has_changes = materialized_view_has_changes(
                                materialized_view_table.mview_query, init_sql
                            )
                        except Exception as e:
                            change_str = f"failed to compare changes: {e}"
                        else:
                            change_str = (
                                "sql changed" if has_changes else "sql not changed"
                            )
                        click.echo(
                            f"Skipping materialized view {full_table_id}, already exists, {change_str}"
                        )
                    except NotFound:
                        # View doesn't exist yet: create it by running its DDL.
                        job = client.query(init_sql, job_config=job_config)
                        if not dry_run:
                            # Block until creation finishes (dry runs have no result).
                            job.result()
    except Exception:
        # NOTE(review): broad catch — initialization errors are printed but
        # swallowed, and query_file is still returned below; presumably so one
        # failing init doesn't abort a batch. Confirm this is intentional.
        print_exc()
    return query_file