in functions/orchestration-helpers/intermediate/main.py [0:0]
def main(request):
    """
    Entry point, likely triggered by an HTTP request from Cloud Workflows.

    Acts as an intermediary between workflows and executor functions, taking
    care of non-functional requirements such as process metadata creation,
    error handling, notifications management, checkpoint management, etc.

    The JSON payload must contain a ``call_type`` key:

    * ``"get_id"``: calls the custom function, records the step status
      (``'started'`` or ``'failed_start'``) through ``log_step_bigquery`` and
      returns the evaluated result.
    * ``"get_status"``: requires an ``async_job_id`` key; calls the custom
      function with that job id and returns the evaluated status.

    Args:
        request: The incoming HTTP request object; must expose ``get_json()``.

    Returns:
        The evaluated result of the custom function call on success, or a
        ``(message, 500)`` tuple when any exception is raised while handling
        the request (missing/invalid ``call_type``, missing ``async_job_id``,
        or a failure in a downstream helper).
    """
    request_json = request.get_json()
    print("event: " + str(request_json))
    try:
        # Validate up front and actually *raise*: a bare ``Exception(...)``
        # expression only builds the object and lets execution fall through
        # to code that reads an unbound variable.
        if not request_json or 'call_type' not in request_json:
            raise Exception("No call type!")
        call_type = request_json['call_type']

        if call_type == "get_id":
            get_id_result = evaluate_error(call_custom_function(request_json, None))
            status = 'started' if is_valid_step_id(get_id_result) else 'failed_start'
            log_step_bigquery(request_json, status)
            return get_id_result

        if call_type == "get_status":
            # request_json is known to be non-empty here, so only the key
            # needs checking.
            if 'async_job_id' not in request_json:
                raise Exception("Job Id not received!")
            return evaluate_error(
                call_custom_function(request_json, request_json['async_job_id'])
            )

        raise Exception("Invalid call type!")
    except Exception as ex:
        # Top-level boundary: report, log, and turn the failure into an
        # HTTP 500 response instead of letting it propagate.
        exception_message = "Exception : " + repr(ex)
        # TODO register error in checkpoint table
        error_client.report_exception()
        logger.error(exception_message)
        print(RuntimeError(repr(ex)))
        return exception_message, 500