in functions/data-processing-engines/bq-saved-query-executor/main.py [0:0]
def execute_query_or_get_status(query_file, file_path, job_id=None):
    """Start a new BigQuery batch query, or report the state of an existing job.

    When ``job_id`` is falsy a new query job is submitted and the call returns
    right after submission, without waiting for the job to finish. When
    ``job_id`` is given, the existing job is looked up and its current state
    is returned.

    Args:
        query_file (str): Query text handed to ``client.query`` as ``query=``.
        file_path (str): Passed through ``transform_string`` and embedded in
            the generated job ID (``aef_<transformed path>_<uuid4>``). Only
            used when a new job is started.
        job_id (str, optional): ID of an existing BigQuery job to check.
            Defaults to None, which starts a new job.

    Returns:
        str: The ID of the newly submitted job when ``job_id`` was not given;
        otherwise the ``state`` attribute of the existing job, whether it has
        finished or is still in progress.

    Raises:
        BadRequest: If the existing job is done and carries an
            ``error_result``.
    """
    bq_client = bigquery.Client(project=BIGQUERY_PROJECT)

    # No job to poll: submit a fresh batch-priority query and hand back its ID.
    if not job_id:
        new_job_id = f"aef_{transform_string(file_path)}_{uuid.uuid4()}"
        batch_config = bigquery.QueryJobConfig(
            priority=bigquery.QueryPriority.BATCH,
        )
        started_job = bq_client.query(
            query=query_file,
            job_config=batch_config,
            job_id=new_job_id,
        )
        print(f"New query started. Job ID: {started_job.job_id}")
        return started_job.job_id

    # Polling path: look the job up, then report on it.
    existing_job = bq_client.get_job(job_id)
    print(f"Checking status of existing job: {job_id}")

    if not existing_job.done():
        print(f"Query still running in state:{str(existing_job.state)}")
        return existing_job.state

    # A finished job with an error payload is surfaced as an exception.
    if existing_job.error_result:
        raise BadRequest(existing_job.error_result)
    return existing_job.state