in workflows-generator/WorkflowsGenerator.py [0:0]
def process_step_async(self,level_id, cloud_funciton_level_1_id, step, FUNCTION_ID_NAME, FUNCTION_STATUS_NAME, jobs_definitions_bucket):
"""method to process async step"""
#step_name = step.get("JOB_ID") + "_" + step.get("JOB_NAME")
step_name = step.get("JOB_NAME")
step_body = self.cloud_function_async_template.replace("{JOB_ID}", step_name)
step_body = step_body.replace("{LEVEL_ID}", level_id)
step_body = step_body.replace("{CLOUD_FUNCTION_ID}", cloud_funciton_level_1_id)
step_body = step_body.replace("{CLOUD_FUNCTION_ID_TO_INVOKE}", assemble_cloud_function_id(FUNCTION_ID_NAME,self.exec_config))
step_body = step_body.replace("{CLOUD_FUNCTION_STATUS_TO_INVOKE}",assemble_cloud_function_id(FUNCTION_STATUS_NAME,self.exec_config))
step_body = step_body.replace("{JOB_IDENTIFIER}", step.get("JOB_ID"))
step_body = step_body.replace("{JOB_NAME}", step.get("JOB_NAME"))
step_body = step_body.replace("{WAIT_TIME_SECONDS}", step.get("WAIT_TIME_SECONDS"))
step_body = step_body.replace("{WAIT_TIME_SECONDS}", step.get("WAIT_TIME_SECONDS"))
step_body = step_body.replace("{ASYNC_JOB_ID_VARIABLE_NAME}", step.get("JOB_ID") +
"_async_job_id")
step_body = step_body.replace("{ASYNC_JOB_STATUS_VARIABLE_NAME}", step.get("JOB_ID") +
"_async_job_status")
if "READ_INPUT_FROM" in step.keys():
step_body = step_body.replace("{READ_INPUT_FROM}", step.get("READ_INPUT_FROM"))
else:
step_body = step_body.replace("{READ_INPUT_FROM}", "ENV")
if "TIMEOUT_SECONDS" in step.keys():
step_body = step_body.replace("{TIMEOUT_SECONDS_BLOCK}",
'"TimeoutSeconds": ' + step.get("TIMEOUT_SECONDS") + ',')
else:
step_body = step_body.replace("{TIMEOUT_SECONDS_BLOCK}", '')
if "ASYNC_TIMEOUT_LOOP_IN_MINUTES" in step.keys():
step_body = step_body.replace("{ASYNC_TIMEOUT_LOOP_BLOCK}",
',"PtimeoutMinutes": ' + step.get(
"ASYNC_TIMEOUT_LOOP_IN_MINUTES"))
else:
step_body = step_body.replace("{ASYNC_TIMEOUT_LOOP_BLOCK}", '')
if "CONTINUE_IF_FAIL" in step.keys():
step_body = step_body.replace("{CONTINUE_IF_FAIL_BLOCK}", ',"PcontinueIfFail": "True"')
else:
step_body = step_body.replace("{CONTINUE_IF_FAIL_BLOCK}", '')
if "STEP_PROPERTIES" in step.keys():
step_body = step_body.replace("{STEP_PROPERTIES_BLOCK}", "step_properties: > \n"
+ " "
+ step.get("STEP_PROPERTIES"))
else:
step_body = step_body.replace("{STEP_PROPERTIES_BLOCK}", "step_properties: > \n"
+ " "
+ str(f'{{"jobs_definitions_bucket":"{jobs_definitions_bucket}"}}'))
return step_body