in functions/data-processing-engines/dataform-tag-executor/main.py [0:0]
def main(request):
"""
Main function, likely triggered by an HTTP request. Extracts parameters, reads a repository from
Dataform, executes the file's contents as a BigQuery query, and reports the result status or job ID.
Args:
request: The incoming HTTP request object.
Returns:
str: The status of the query execution or the job ID (if asynchronous).
"""
request_json = request.get_json(silent=True)
print("event:" + str(request_json))
try:
job_name = request_json.get('job_name', None)
workflow_name = request_json.get('workflow_name', None)
jobs_definitions_bucket = request_json.get("workflow_properties", {}).get("jobs_definitions_bucket")
repository_name = None
tags = None
branch = None
dataform_location = None
dataform_project_id = None
if jobs_definitions_bucket:
extracted_params = extract_params(
bucket_name=jobs_definitions_bucket,
job_name=job_name,
function_name=function_name
)
repository_name = extracted_params.get("repository_name")
tags = extracted_params.get("tags")
branch = extracted_params.get("branch")
dataform_location = extracted_params.get("dataform_location")
dataform_project_id = extracted_params.get("dataform_project_id")
job_id = request_json.get('job_id', None)
query_variables = request_json.get('query_variables', None)
status_or_job_id = run_repo_or_get_status(job_id, gcp_project=dataform_project_id, location=dataform_location,
repo_name=repository_name, tags=tags, branch=branch,
query_variables=query_variables)
if status_or_job_id.startswith('aef_'):
print(f"Running Query, track it with Job ID: {status_or_job_id}")
else:
print(f"Query finished with status: {status_or_job_id}")
return status_or_job_id
except Exception as error:
err_message = "Exception: " + repr(error)
response = {
"error": error.__class__.__name__,
"message": repr(err_message)
}
return response