def add_all_notifications()

in o2a/transformers/add_node_notificaton_transformer.py [0:0]


    def add_all_notifications(self, workflow: Workflow):
        task_groups_snapshot = workflow.task_groups.copy().values()
        for task_group in task_groups_snapshot:
            upstreams: List[TaskGroup] = workflow.find_upstream_task_group(task_group)
            if not upstreams:
                # TG with no upstream
                if isinstance(task_group, ActionTaskGroup):
                    self._add_status(task_group, workflow)
            elif len(upstreams) == 1 and upstreams[0].name in NOTIFICATION_TASK_GROUP_NAMES:
                # TG with only the start workflow upstream
                if isinstance(task_group, ActionTaskGroup):
                    self._add_status(task_group, workflow, upstreams[0])
            else:
                # There is at least 1 upstream TG
                pass

        for task_group in task_groups_snapshot:
            # Now we go downstream only
            # For correct execution (of e.g. decision) we need to make a snapshot (copy) of downstream names
            downstream_names_snapshot = task_group.downstream_names.copy()
            for downstream_name in downstream_names_snapshot:
                downstream: TaskGroup = workflow.task_groups[downstream_name]
                if isinstance(task_group, ActionTaskGroup) and isinstance(downstream, ActionTaskGroup):
                    # action -> action = T: S:
                    self._add_transition_and_status(downstream_name, task_group, workflow)
                if isinstance(task_group, ActionTaskGroup) and isinstance(downstream, ControlTaskGroup):
                    # action -> control = T:
                    self._add_transition(downstream_name, task_group, workflow)
                if isinstance(task_group, ControlTaskGroup) and isinstance(downstream, ActionTaskGroup):
                    # control -> action = T: S:
                    self._add_transition_and_status(downstream_name, task_group, workflow)
                if isinstance(task_group, ControlTaskGroup) and isinstance(downstream, ControlTaskGroup):
                    # control -> control = T:
                    self._add_transition(downstream_name, task_group, workflow)