in functions/data-processing-engines/bq-saved-query-executor/main.py [0:0]
def main(request):
    """
    HTTP entry point: run (or poll) the saved query described by the request.

    The JSON payload is read with ``request.get_json(silent=True)``. The
    following keys are consumed:

    * ``workflow_properties.dataform_location``
    * ``workflow_properties.dataform_project_id``
    * ``workflow_properties.repository_name``
    * ``workflow_name`` and ``job_name`` -- combined into the file path
      ``definitions/<workflow_name>/<job_name>.sqlx``
    * ``job_id`` (optional) -- forwarded to ``execute_query_or_get_status``
    * ``query_variables`` (optional) -- forwarded to ``read_file``

    Args:
        request: The incoming HTTP request object; must expose
            ``get_json(silent=True)``.

    Returns:
        str: Whatever ``execute_query_or_get_status`` returned (a value
            starting with ``aef_`` is reported as a job ID to track, anything
            else as a final status).
        dict: On any exception, ``{"error": <exception class name>,
            "message": <repr of the exception>}``. The exception is logged
            and not re-raised.
    """
    request_json = request.get_json(silent=True)
    print("event:" + str(request_json))

    try:
        # A missing/invalid JSON body leaves request_json as None; the
        # resulting TypeError is deliberately handled by the except below.
        workflow_properties = request_json['workflow_properties']
        dataform_location = workflow_properties['dataform_location']
        dataform_project_id = workflow_properties['dataform_project_id']
        repository_name = workflow_properties['repository_name']
        workflow_name = request_json['workflow_name']
        job_name = request_json['job_name']
        file_path = f"definitions/{workflow_name}/{job_name}.sqlx"

        job_id = request_json.get('job_id', None)
        query_variables = request_json.get('query_variables', None)

        query_file = read_file(
            dataform_project_id,
            dataform_location,
            repository_name,
            file_path,
            query_variables,
        )
        status_or_job_id = execute_query_or_get_status(query_file, file_path, job_id)

        # NOTE(review): the 'aef_' prefix is presumably how job IDs created by
        # execute_query_or_get_status are recognised -- confirm against it.
        if status_or_job_id.startswith('aef_'):
            print(f"Running Query, track it with Job ID: {status_or_job_id}")
        else:
            print(f"Query finished with status: {status_or_job_id}")

        return status_or_job_id
    except Exception as error:
        # Top-level boundary: report the failure to the caller as data, but
        # also log it so the failure is visible in the function's output.
        err_message = "Exception: " + repr(error)
        print(err_message)
        response = {
            "error": error.__class__.__name__,
            "message": repr(error)
        }
        return response