def process_step_async()

in workflows-generator/ComposerDagGenerator.py [0:0]


    def process_step_async(self, level_id, thread_id, step):
        """Render the task template for one asynchronous workflow step.

        The template is chosen by looking for a known executor marker inside
        ``step["COMPOSER_STEP"]``; its placeholders are then substituted.

        Args:
            level_id: Value substituted for ``{LEVEL_ID}`` (converted with
                ``str()`` so non-string ids are accepted).
            thread_id: Value substituted for ``{THREAD_ID}`` (converted with
                ``str()`` so non-string ids are accepted).
            step: Mapping describing the step. Reads ``COMPOSER_STEP``,
                ``JOB_NAME`` and ``JOB_ID``.

        Returns:
            The rendered template text, or an empty string when
            ``COMPOSER_STEP`` is missing/empty or matches no known template.

        Raises:
            TypeError: If ``JOB_NAME`` or ``JOB_ID`` is missing from ``step``,
                because ``str.replace`` does not accept ``None``.
        """
        # A missing COMPOSER_STEP is treated like an unrecognised one (empty
        # body) instead of failing on ``in None``.
        composer_step = step.get("COMPOSER_STEP") or ""
        step_name = step.get("JOB_NAME")

        ##Add new templates here: (COMPOSER_STEP marker, template attribute)
        known_templates = (
            ("dataform-tag-executor",
             "dataform_tag_executor_template"),
            ("dataflow-flextemplate-job-executor",
             "dataflow_flextemplate_job_executor_template"),
            ("dataproc-serverless-job-executor",
             "dataproc_serverless_job_executor_template"),
        )

        step_body = ''
        # Every marker is checked (no break): if several match, the last
        # entry in the table wins. The template attribute is only read when
        # its marker matches.
        for marker, template_attr in known_templates:
            if marker in composer_step:
                step_body = getattr(self, template_attr).replace("{JOB_ID}", step_name)

        # Note the naming: {JOB_ID} above receives JOB_NAME, while
        # {JOB_IDENTIFIER} receives the step's JOB_ID.
        substitutions = (
            ("{LEVEL_ID}", str(level_id)),
            ("{THREAD_ID}", str(thread_id)),
            ("{JOB_IDENTIFIER}", step.get("JOB_ID")),
            ("{JOB_NAME}", step_name),
        )
        for placeholder, value in substitutions:
            step_body = step_body.replace(placeholder, value)

        return step_body