# workflows-generator/WorkflowsGenerator.py
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from commons import *  # provides read_template, assemble_cloud_function_id, assemble_workflows_id, level_exists, level_exists_and_is_parallel, find_step_by_id


class WorkflowsGenerator:
    def __init__(self, workflow_config, exec_config, generate_for_pipeline, config_file):
        self.workflow_config = workflow_config
        self.exec_config = exec_config
        self.generate_for_pipeline = generate_for_pipeline
        self.config_file = config_file
        # NOTE: self.environment is used by process_step_sync but was never set
        # anywhere; "pEnvironment" is an assumed exec_config key, guessed from the
        # naming of pFunctionIntermediateName / pJobsDefinitionsBucket.
        self.environment = exec_config.get("pEnvironment", "")
        self.workflow_template = ''
        self.level_template = ''
        self.thread_template = ''
        self.cloud_function_async_template = ''
        self.workflows_sync_template = ''
        self.cloud_function_sync_template = ''
        self.workflows_folder = "workflows-templates"
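
    # Illustrative sketch (not from the source) of the workflow_config shape this
    # generator consumes, inferred from the accessors used in the methods below;
    # all values are hypothetical:
    #
    #   [{"LEVEL_ID": "1",
    #     "THREADS": [{"THREAD_ID": "1",
    #                  "STEPS": [{"JOB_ID": "100", "JOB_NAME": "load_sales",
    #                             "TYPE": "async", "FUNCTION_ID_NAME": "exec-job",
    #                             "FUNCTION_STATUS_NAME": "job-status",
    #                             "WAIT_TIME_SECONDS": "30"}]}]}]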

    def load_templates(self):
        """method for loading the Cloud Workflows templates"""
        self.workflow_template = read_template("workflow", self.generate_for_pipeline, self.workflows_folder, "json")
        self.level_template = read_template("level", self.generate_for_pipeline, self.workflows_folder, "json")
        self.thread_template = read_template("thread", self.generate_for_pipeline, self.workflows_folder, "json")
        self.cloud_function_async_template = read_template("async_call", self.generate_for_pipeline, self.workflows_folder, "json")
        # NOTE: cloud_function_sync_template and workflows_sync_template are used by
        # process_step_sync / process_step_workflows but were never loaded here; the
        # template names below are assumptions that follow the "async_call" pattern.
        self.cloud_function_sync_template = read_template("sync_call", self.generate_for_pipeline, self.workflows_folder, "json")
        self.workflows_sync_template = read_template("workflows_sync", self.generate_for_pipeline, self.workflows_folder, "json")
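
    # For reference, the placeholders each template is expected to expose, inferred
    # from the .replace() calls in the methods below:
    #   workflow: <<LEVELS>>
    #   level:    {LEVEL_ID}, <<THREADS>>
    #   thread:   {LEVEL_ID}, {THREAD_ID}, {STARTING_JOB_ID}, <<THREAD_STEPS>>
    #   steps:    {JOB_ID}, {JOB_NAME}, {JOB_IDENTIFIER}, {NEXT_JOB_ID}, ...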

    def get_unindented_template(self, template):
        """method to remove one nesting level from a template"""
        new_template_lines = []
        for line in template.splitlines():
            # Drop the first 12 characters of every line (presumably three 4-space
            # indents) so a single-thread level can inline the thread body directly.
            new_template_lines.append(line[12:])
        return '\n'.join(new_template_lines)

    def generate_workflows_body(self):
        """method to generate the Cloud Workflows body"""
        levels = self.process_levels(self.workflow_config)
        workflow_body = self.workflow_template.replace("<<LEVELS>>", "".join(levels))
        return workflow_body

    def process_levels(self, config):
        """method to process levels"""
        levels = []
        for level in config:
            threads = self.process_threads(level.get("THREADS"), level.get("LEVEL_ID"))
            if len(threads) == 1:
                # A single-thread level skips the parallel wrapper template entirely.
                level_body = " <<THREADS>>"
            else:
                level_body = self.level_template.replace("{LEVEL_ID}", level.get("LEVEL_ID"))
            level_body = level_body.replace("<<THREADS>>", "".join(threads))
            if len(threads) == 1:
                # The thread template is written for the nested (parallel) case, so
                # unindent it when it is inlined without a wrapper.
                level_body = self.get_unindented_template(level_body)
            levels.append(level_body)
        return levels

    def process_threads(self, threads, level_id):
        """method to process threads"""
        thread_bodies = []
        single_thread = len(threads) == 1
        for thread in threads:
            thread_body = self.thread_template.replace("{LEVEL_ID}", level_id)
            thread_body = thread_body.replace("{THREAD_ID}", thread.get("THREAD_ID"))
            # The thread's entry point is its first step, identified by JOB_NAME.
            first_step_in_thread = thread.get("STEPS")[0].get("JOB_NAME")
            thread_body = thread_body.replace("{STARTING_JOB_ID}", first_step_in_thread)
            steps = self.process_steps(thread.get("STEPS"), level_id, thread.get("THREAD_ID"), single_thread)
            thread_body = thread_body.replace("<<THREAD_STEPS>>", "".join(steps))
            thread_bodies.append(thread_body)
        return thread_bodies

    def process_steps(self, steps, level_id, thread_id, single_thread):
        """method to process steps"""
        step_bodies = []
        cloud_function_intermediate_name = self.exec_config.get("pFunctionIntermediateName")
        jobs_definitions_bucket = self.exec_config.get("pJobsDefinitionsBucket")
        for index, step in enumerate(steps):
            step_body = ''
            if step.get("TYPE") == 'sync':
                step_body = self.process_step_sync(
                    assemble_cloud_function_id(cloud_function_intermediate_name, self.exec_config),
                    step,
                    step.get("FUNCTION_NAME"))
            elif step.get("TYPE") == 'async':
                step_body = self.process_step_async(
                    level_id,
                    assemble_cloud_function_id(cloud_function_intermediate_name, self.exec_config),
                    step,
                    step.get("FUNCTION_ID_NAME"),
                    step.get("FUNCTION_STATUS_NAME"),
                    jobs_definitions_bucket)
            # TODO: workflows functionality
            elif step.get("TYPE") == 'workflows':
                workflows_name = step.get("workflows_name")
                workflows_id = assemble_workflows_id(workflows_name)
                step_body = self.process_step_workflows(workflows_id, step)
            step_body = step_body.replace("{LEVEL_ID}", level_id)
            step_body = step_body.replace("{THREAD_ID}", thread_id)
            step_body = self.process_next_step(steps, step, index, level_id, thread_id, step_body, single_thread)
            step_bodies.append(step_body)
        return step_bodies

    def process_step_sync(self, cloud_function_level_1_id, step, cloud_function_name):
        """method to process sync step"""
        step_name = step.get("JOB_NAME")
        step_body = self.cloud_function_sync_template.replace("{JOB_ID}", step_name)
        # The misspelled placeholder below is kept as-is so it still matches the
        # token as it is spelled inside the sync template.
        step_body = step_body.replace("{CLOUD_FUNCITON_ID}", cloud_function_level_1_id)
        step_body = step_body.replace("{CLOUD_FUNCTION_TO_INVOKE}", cloud_function_name)
        step_body = step_body.replace("{ENVIRONMENT}", self.environment)
        step_body = step_body.replace("{JOB_IDENTIFIER}", step.get("JOB_ID"))
        step_body = step_body.replace("{JOB_NAME}", step.get("JOB_NAME"))
        if "TIMEOUT_SECONDS" in step:
            step_body = step_body.replace("{TIMEOUT_SECONDS_BLOCK}",
                                          '"TimeoutSeconds": ' + str(step.get("TIMEOUT_SECONDS")) + ',')
        else:
            step_body = step_body.replace("{TIMEOUT_SECONDS_BLOCK}", '')
        # TODO: continue-if-fail logic
        if "CONTINUE_IF_FAIL" in step:
            step_body = step_body.replace("{CONTINUE_IF_FAIL_BLOCK}", ',"PcontinueIfFail": "True"')
        else:
            step_body = step_body.replace("{CONTINUE_IF_FAIL_BLOCK}", '')
        return step_body

    def process_step_async(self, level_id, cloud_function_level_1_id, step, function_id_name,
                           function_status_name, jobs_definitions_bucket):
        """method to process async step"""
        step_name = step.get("JOB_NAME")
        step_body = self.cloud_function_async_template.replace("{JOB_ID}", step_name)
        step_body = step_body.replace("{LEVEL_ID}", level_id)
        step_body = step_body.replace("{CLOUD_FUNCTION_ID}", cloud_function_level_1_id)
        step_body = step_body.replace("{CLOUD_FUNCTION_ID_TO_INVOKE}",
                                      assemble_cloud_function_id(function_id_name, self.exec_config))
        step_body = step_body.replace("{CLOUD_FUNCTION_STATUS_TO_INVOKE}",
                                      assemble_cloud_function_id(function_status_name, self.exec_config))
        step_body = step_body.replace("{JOB_IDENTIFIER}", step.get("JOB_ID"))
        step_body = step_body.replace("{JOB_NAME}", step.get("JOB_NAME"))
        step_body = step_body.replace("{WAIT_TIME_SECONDS}", step.get("WAIT_TIME_SECONDS"))
        # Per-step variable names for the async job id and its polled status.
        step_body = step_body.replace("{ASYNC_JOB_ID_VARIABLE_NAME}", step.get("JOB_ID") + "_async_job_id")
        step_body = step_body.replace("{ASYNC_JOB_STATUS_VARIABLE_NAME}", step.get("JOB_ID") + "_async_job_status")
        step_body = step_body.replace("{READ_INPUT_FROM}", step.get("READ_INPUT_FROM", "ENV"))
        if "TIMEOUT_SECONDS" in step:
            step_body = step_body.replace("{TIMEOUT_SECONDS_BLOCK}",
                                          '"TimeoutSeconds": ' + str(step.get("TIMEOUT_SECONDS")) + ',')
        else:
            step_body = step_body.replace("{TIMEOUT_SECONDS_BLOCK}", '')
        if "ASYNC_TIMEOUT_LOOP_IN_MINUTES" in step:
            step_body = step_body.replace("{ASYNC_TIMEOUT_LOOP_BLOCK}",
                                          ',"PtimeoutMinutes": ' + str(step.get("ASYNC_TIMEOUT_LOOP_IN_MINUTES")))
        else:
            step_body = step_body.replace("{ASYNC_TIMEOUT_LOOP_BLOCK}", '')
        if "CONTINUE_IF_FAIL" in step:
            step_body = step_body.replace("{CONTINUE_IF_FAIL_BLOCK}", ',"PcontinueIfFail": "True"')
        else:
            step_body = step_body.replace("{CONTINUE_IF_FAIL_BLOCK}", '')
        # Fall back to pointing the step at the jobs definitions bucket when no
        # explicit STEP_PROPERTIES are configured.
        if "STEP_PROPERTIES" in step:
            step_body = step_body.replace("{STEP_PROPERTIES_BLOCK}",
                                          "step_properties: > \n" + " " + step.get("STEP_PROPERTIES"))
        else:
            step_body = step_body.replace("{STEP_PROPERTIES_BLOCK}",
                                          "step_properties: > \n" + " "
                                          + f'{{"jobs_definitions_bucket":"{jobs_definitions_bucket}"}}')
        return step_body
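
    # With no STEP_PROPERTIES given, the default block rendered above expands to
    # (bucket name hypothetical):
    #   step_properties: >
    #    {"jobs_definitions_bucket":"my-jobs-bucket"}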

    def process_step_workflows(self, workflows_id, step):
        """method to process step of workflows type"""
        step_name = step.get("JOB_NAME")
        step_body = self.workflows_sync_template.replace("{JOB_ID}", step_name)
        step_body = step_body.replace("{workflows_id}", workflows_id)
        return step_body

    def process_next_step(self, steps, step, index, level_id, thread_id, step_body, single_thread):
        """method to process next step"""
        if step.get("TYPE") in ('sync', 'async', 'workflows'):
            if "NEXT" not in step:
                try:
                    # Default chaining: fall through to the following step in the thread.
                    next_step = steps[index + 1]
                    next_step_name = next_step.get("JOB_NAME")
                    step_body = step_body.replace("{NEXT_JOB_ID}", next_step_name)
                except (KeyError, IndexError):
                    # Last step of its thread: jump to the next level if there is one.
                    if level_exists(int(level_id) + 1, self.workflow_config) and single_thread:
                        if level_exists_and_is_parallel(int(level_id) + 1, self.workflow_config):
                            step_body = step_body.replace("{NEXT_JOB_ID}",
                                                          "Level_" + str(int(level_id) + 1))
                        else:
                            step_body = step_body.replace("{NEXT_JOB_ID}",
                                                          "Level_" + str(int(level_id) + 1) + "_Thread_" + thread_id)
                    else:
                        # "end" terminates a single-thread workflow; a parallel branch
                        # uses "continue" instead.
                        if single_thread:
                            step_body = step_body.replace("{NEXT_JOB_ID}", "end")
                        else:
                            step_body = step_body.replace("{NEXT_JOB_ID}", "continue")
            else:
                # An explicit NEXT points at another step, looked up by its id.
                next_step = find_step_by_id(step.get("NEXT"))
                next_step_name = next_step.get("JOB_NAME")
                step_body = step_body.replace("{NEXT_JOB_ID}", next_step_name)
        return step_body
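

# A minimal usage sketch, not part of the original module; the exec_config keys and
# config values are assumptions based on the accessors above, and load_templates()
# needs the workflows-templates folder to exist on disk.
if __name__ == "__main__":
    exec_config = {
        "pFunctionIntermediateName": "intermediate-fn",  # hypothetical name
        "pJobsDefinitionsBucket": "my-jobs-bucket",      # hypothetical bucket
        "pEnvironment": "dev",                           # assumed key, see __init__
    }
    workflow_config = [
        {"LEVEL_ID": "1",
         "THREADS": [
             {"THREAD_ID": "1",
              "STEPS": [
                  {"JOB_ID": "100", "JOB_NAME": "load_sales", "TYPE": "sync",
                   "FUNCTION_NAME": "load-sales-fn"}]}]}]
    generator = WorkflowsGenerator(workflow_config, exec_config,
                                   generate_for_pipeline=True, config_file="config.json")
    generator.load_templates()
    print(generator.generate_workflows_body())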