in o2a/converter/task_group.py [0:0]
def add_state_handler_if_needed(self):
"""
Add additional tasks and relations to handle error and ok flow.
If the error path is specified, additional relations and task are added to handle
the error state.
If the error path and the ok path is specified, additional relations and task are added
to handle the ok path and the error path.
If the error path and the ok path is not-specified, no action is performed.
"""
if not self.error_downstream_name:
return
error_handler_task_id = self.name + "_error"
error_handler = Task(
task_id=error_handler_task_id, template_name="dummy.tpl", trigger_rule=TriggerRule.ONE_FAILED
)
self.error_handler_task = error_handler
new_relations = (
Relation(from_task_id=t.task_id, to_task_id=error_handler_task_id, is_error=True)
for t in self.tasks
)
self.relations.extend(new_relations)
if not self.downstream_names:
return
ok_handler_task_id = self.name + "_ok"
ok_handler = Task(
task_id=ok_handler_task_id, template_name="dummy.tpl", trigger_rule=TriggerRule.ONE_SUCCESS
)
self.ok_handler_task = ok_handler
self.relations.append(Relation(from_task_id=self.tasks[-1].task_id, to_task_id=ok_handler_task_id))