in o2a/transformers/add_node_notificaton_transformer.py [0:0]
def add_all_notifications(self, workflow: Workflow):
    """
    Walks all task groups of the workflow and calls the notification helpers for them.

    The work is done in two passes over a snapshot of ``workflow.task_groups`` taken up front:

    1. Entry task groups - an action task group which has no upstream at all, or whose single
       upstream is named in ``NOTIFICATION_TASK_GROUP_NAMES``, is handed to ``_add_status``
       (together with that upstream in the latter case).
    2. Edges - every (task group, downstream) pair is matched against the action/control
       combinations; ``_add_transition_and_status`` is called when the downstream is an action
       and ``_add_transition`` when it is a control node.

    NOTE(review): the helpers presumably modify the workflow in place, which is why both the
    task groups and the downstream names are iterated through copies - confirm against helpers.

    :param workflow: the workflow whose task groups are processed
    """
    groups_before_changes = tuple(workflow.task_groups.copy().values())

    # Pass 1: task groups located at the very beginning of the workflow.
    for group in groups_before_changes:
        parents: List[TaskGroup] = workflow.find_upstream_task_group(group)
        if not parents:
            # TG with no upstream
            status_args = (group, workflow)
        elif len(parents) == 1 and parents[0].name in NOTIFICATION_TASK_GROUP_NAMES:
            # TG with only the start workflow upstream
            status_args = (group, workflow, parents[0])
        else:
            # There is at least 1 upstream TG - nothing to do in this pass
            continue
        if isinstance(group, ActionTaskGroup):
            self._add_status(*status_args)

    # Pass 2: now we go downstream only. The rows are checked in this exact order and each
    # matching row fires (they are independent checks, not an if/elif chain).
    edge_handlers = (
        # action -> action = T: S:
        (ActionTaskGroup, ActionTaskGroup, self._add_transition_and_status),
        # action -> control = T:
        (ActionTaskGroup, ControlTaskGroup, self._add_transition),
        # control -> action = T: S:
        (ControlTaskGroup, ActionTaskGroup, self._add_transition_and_status),
        # control -> control = T:
        (ControlTaskGroup, ControlTaskGroup, self._add_transition),
    )
    for source in groups_before_changes:
        # For correct execution (of e.g. decision) the downstream names are copied before the
        # handlers get a chance to touch them.
        for target_name in source.downstream_names.copy():
            target: TaskGroup = workflow.task_groups[target_name]
            for source_type, target_type, handler in edge_handlers:
                if isinstance(source, source_type) and isinstance(target, target_type):
                    handler(target_name, source, workflow)