in backfill/2023-09-26-initialize-clients_first_seen_v2/bigquery_etl_cli_query.py [0:0]
def initialize(ctx, name, sql_dir, project_id, dry_run):
    """Create the destination table for the provided query.

    Two initialization styles are handled per matched query file:

    * The query text contains ``is_init()``: the table is created by invoking
      ``deploy`` and the query is then run with the ``is_init`` template set
      (in parallel when the query text also mentions ``parallel_run``).
    * Otherwise: every ``init.sql`` found below the query's directory is
      executed as a BigQuery query job.

    Args:
        ctx: Click context, used to invoke the ``deploy`` command.
        name: Path to an existing query file, or a name pattern that is
            resolved with ``paths_matching_name_pattern``.
        sql_dir: Forwarded to ``paths_matching_name_pattern``.
        project_id: Forwarded to ``paths_matching_name_pattern``.
        dry_run: When true, queries are submitted as dry runs and an existing
            materialized view is left in place. Note that ``deploy`` is still
            invoked on the ``is_init()`` path.

    Raises:
        PreconditionFailed: If the destination table of an ``is_init()`` query
            already exists.
        SystemExit: With status 1 when the user is not authenticated or no
            query file matches ``name``.
    """
    if not is_authenticated():
        click.echo("Authentication required for creating tables.", err=True)
        sys.exit(1)

    if Path(name).exists():
        # Allow name to be a path
        query_files = [Path(name)]
    else:
        query_files = paths_matching_name_pattern(name, sql_dir, project_id)

    if not query_files:
        click.echo(
            f"Couldn't find directory matching `{name}`. Failed to initialize query.",
            err=True,
        )
        sys.exit(1)

    # A single client serves every query file; no need to rebuild it per file.
    client = bigquery.Client()

    for query_file in query_files:
        sql_content = query_file.read_text()

        # Enable initialization from query.sql files
        # Create the table by deploying the schema and metadata, then run the init.
        # This does not currently verify the accuracy of the schema or that it
        # matches the query.
        if "is_init()" in sql_content:
            # The identifiers come from the path layout:
            # <project>/<dataset>/<table>/<query file>.
            project = query_file.parent.parent.parent.name
            dataset = query_file.parent.parent.name
            destination_table = query_file.parent.name
            full_table_id = f"{project}.{dataset}.{destination_table}"

            try:
                table = client.get_table(full_table_id)
            except NotFound:
                # A locally constructed Table has no creation time, so the
                # check below passes for tables that do not exist yet.
                table = bigquery.Table(full_table_id)

            if table.created:
                raise PreconditionFailed(
                    f"Table {full_table_id} already exists. The initialization process is terminated."
                )

            ctx.invoke(deploy, name=full_table_id, force=True)

            arguments = [
                "query",
                "--use_legacy_sql=false",
                "--replace",
                "--format=none",
            ]
            if dry_run:
                arguments += ["--dry_run"]

            if "parallel_run" in sql_content:
                _initialize_in_parallel(
                    project=project,
                    table=destination_table,
                    dataset=dataset,
                    query_file=query_file,
                    arguments=arguments,
                    parallelism=DEFAULT_PARALLELISM,
                    addl_templates={
                        "is_init": lambda: True,
                        "parallel_run": lambda: True,
                    },
                )
            else:
                # NOTE(review): unlike the parallel path, the query file is not
                # passed here - confirm _run_query can locate the query to run.
                _run_query(
                    project_id=project,
                    public_project_id=None,
                    destination_table=destination_table,
                    dataset_id=dataset,
                    query_arguments=arguments,
                    addl_templates={
                        "is_init": lambda: True,
                    },
                )
        else:
            for init_file in query_file.parent.rglob("init.sql"):
                init_sql = init_file.read_text()

                project = init_file.parent.parent.parent.name
                dataset = init_file.parent.parent.name
                destination_table = query_file.parent.name
                destination = f"{project}.{dataset}.{destination_table}"

                job_config = bigquery.QueryJobConfig(
                    dry_run=dry_run,
                    default_dataset=f"{project}.{dataset}",
                    destination=destination,
                )

                if "CREATE MATERIALIZED VIEW" in init_sql:
                    click.echo(f"Create materialized view for {init_file}")
                    # An existing materialized view has to be deleted before
                    # re-creation. The delete is skipped on dry runs so that a
                    # dry run never removes an existing view.
                    if not dry_run:
                        client.delete_table(destination, not_found_ok=True)
                else:
                    click.echo(f"Create destination table for {init_file}")

                job = client.query(init_sql, job_config=job_config)

                # A dry-run job is never executed, so there is no result to
                # wait for.
                if not dry_run:
                    job.result()