in workflows-generator/ComposerDagGenerator.py [0:0]
def process_step_async(self, level_id, thread_id, step):
"""method to process async step"""
step_name = step.get("JOB_NAME")
step_body = ''
##Add new templates here
if "dataform-tag-executor" in step.get("COMPOSER_STEP"):
step_body = self.dataform_tag_executor_template.replace("{JOB_ID}", step_name)
if "dataflow-flextemplate-job-executor" in step.get("COMPOSER_STEP"):
step_body = self.dataflow_flextemplate_job_executor_template.replace("{JOB_ID}", step_name)
if "dataproc-serverless-job-executor" in step.get("COMPOSER_STEP"):
step_body = self.dataproc_serverless_job_executor_template.replace("{JOB_ID}", step_name)
step_body = step_body.replace("{LEVEL_ID}", level_id)
step_body = step_body.replace("{THREAD_ID}", thread_id)
step_body = step_body.replace("{JOB_IDENTIFIER}", step.get("JOB_ID"))
step_body = step_body.replace("{JOB_NAME}", step.get("JOB_NAME"))
return step_body