in functions/orchestration-helpers/intermediate/main.py [0:0]
def call_custom_function(request_json, async_job_id):
    """
    Call the executor function whose URL is given in the request and
    translate its reply into a step status.

    The executor is invoked with an authenticated HTTP POST carrying a JSON
    payload built from ``request_json``. When ``async_job_id`` is empty the
    call launches the execution; otherwise the id is forwarded as ``job_id``
    so the executor can report the status of that execution.

    Args:
        request_json: json input object with parameters. Required keys:
            ``workflow_name``, ``job_name``, ``function_url_to_call`` and
            ``query_variables`` (with ``start_date`` and ``end_date``).
            Optional keys: ``workflow_properties`` and ``step_properties``,
            which are combined through ``join_properties``.
        async_job_id: if filled, the function asks for the execution status
            of that job. If empty or None, the execution is launched for the
            first time.

    Returns:
        str: one of
            - the raw executor response, when this is a first launch and the
              response passes ``is_valid_step_id``;
            - ``"success"`` when the response matches ``JobStatus.SUCCESS``;
            - ``"running"`` when the response matches ``JobStatus.RUNNING``;
            - a message starting with
              ``"Exception calling target function"`` for any other response.

    Raises:
        KeyError: if a required key is missing from ``request_json``.
        Exception: if the HTTP call fails with ``urllib.error.HTTPError``;
            the original error is attached as ``__cause__``.
    """
    workflow_name = request_json['workflow_name']
    job_name = request_json['job_name']
    workflow_properties = join_properties(
        request_json.get('workflow_properties'),
        request_json.get('step_properties'),
    )
    query_variables = request_json['query_variables']

    params = {
        "workflow_properties": workflow_properties,
        "workflow_name": workflow_name,
        "job_name": job_name,
        # Dates are forwarded wrapped in single quotes.
        "query_variables": {
            "start_date": "'" + query_variables['start_date'] + "'",
            "end_date": "'" + query_variables['end_date'] + "'",
        },
    }

    # Use one truthiness test for both "send job_id" and "expect a job id
    # back", so an empty-string id is treated as a first launch throughout.
    is_first_launch = not async_job_id
    if not is_first_launch:
        params['job_id'] = async_job_id

    target_function_url = request_json['function_url_to_call']
    target_function_name = target_function_url.split('/')[-1]

    try:
        # Supplying `data` makes urllib issue a POST.
        req = urllib.request.Request(
            target_function_url,
            data=json.dumps(params).encode("utf-8"),
        )
        # The ID token is minted with the target URL as its audience.
        auth_req = google.auth.transport.requests.Request()
        id_token = google.oauth2.id_token.fetch_id_token(auth_req, target_function_url)
        req.add_header("Authorization", f"Bearer {id_token}")
        req.add_header("Content-Type", "application/json")

        # Context manager guarantees the HTTP response is closed.
        with urllib.request.urlopen(req) as response:
            raw_response = response.read()
        print('response: ' + str(raw_response))

        # Handle the response
        decoded_response = raw_response.decode("utf-8")
        # NOTE(review): the `in` checks below are membership tests if
        # JobStatus.<X>.value is a collection, but substring tests if it is a
        # plain string (an empty response would then match SUCCESS) — confirm
        # against the JobStatus definition.
        if is_first_launch and is_valid_step_id(decoded_response):
            final_response = decoded_response
        elif decoded_response in JobStatus.SUCCESS.value:
            final_response = "success"
            log_step_bigquery(request_json, final_response)
        elif decoded_response in JobStatus.RUNNING.value:
            final_response = "running"
        else:  # FAILURE
            final_response = f"Exception calling target function {target_function_name}:{decoded_response}"
            log_step_bigquery(request_json, "failed")
        print("final response: " + final_response)
        return final_response
    except urllib.error.HTTPError as e:
        print('Exception: ' + repr(e))
        raise Exception(
            "Unexpected error in custom function: " + target_function_name + ":" + repr(e)) from e