in src/pydolphinscheduler/core/yaml_workflow.py [0:0]
def parse_condition(self, task_params, name2task):
    """Parse Condition Task.

    Build a ``Condition`` task from a two-level boolean expression. Each entry
    of the outer ``groups`` list combines its own task checks with its ``op``;
    the resulting sub-expressions are then combined with the top-level ``op``.

    This is an example Yaml fragment of task_params, written with the keys
    this method actually reads (``op``, ``groups``, ``task``, ``flag``)::

        name: condition
        success_task: success_branch
        failed_task: fail_branch
        op: AND
        groups:
          - op: AND
            groups:
              - task: pre_task_1
                flag: true
              - task: pre_task_2
                flag: true
              - task: pre_task_3
                flag: false
          - op: AND
            groups:
              - task: pre_task_1
                flag: false
              - task: pre_task_2
                flag: true
              - task: pre_task_3
                flag: true

    :param task_params: mapping with the keys ``name``, ``success_task``,
        ``failed_task``, ``op`` and ``groups`` as shown above.
    :param name2task: mapping from task name to the task object; every task
        name referenced in ``task_params`` is looked up here.
    :return: the ``Condition`` task built from ``task_params``.
    :raises AssertionError: if an inner group entry lacks ``task`` or ``flag``.
    :raises Exception: if an ``op`` value is neither ``and`` nor ``or``
        (compared case-insensitively).
    :raises KeyError: if another required key is missing from ``task_params``
        or a referenced task name is not in ``name2task``.
    """
    from pydolphinscheduler.tasks.condition import (
        FAILURE,
        SUCCESS,
        And,
        Condition,
        Or,
    )

    operators = {"and": And, "or": Or}

    def get_op_cls(op):
        """Return the operator class named by *op*, ignoring case."""
        cls = operators.get(op.lower())
        if cls is None:
            raise Exception(f"OP must be in And or Or, but get: {op}")
        return cls

    second_cond_ops = []
    for first_group in task_params["groups"]:
        # Read the operator name up front so a missing "op" key is reported
        # before the entries of this group are inspected.
        second_op = first_group["op"]
        task_ops = []
        for condition_data in first_group["groups"]:
            # Validate explicitly instead of with ``assert``: assert statements
            # are removed under ``python -O``, which would let malformed input
            # fall through to a bare KeyError. AssertionError is kept as the
            # exception type so existing callers see the same error.
            for required_key in ("task", "flag"):
                if required_key not in condition_data:
                    raise AssertionError(
                        f"{required_key} must be in {condition_data}"
                    )

            task = name2task[condition_data["task"]]
            # for example: task = pre_task_1, flag = true -> SUCCESS(pre_task_1)
            status_cls = SUCCESS if condition_data["flag"] else FAILURE
            task_ops.append(status_cls(task))

        second_cond_ops.append(get_op_cls(second_op)(*task_ops))

    first_op = task_params["op"]
    cond_operator = get_op_cls(first_op)(*second_cond_ops)

    return Condition(
        name=task_params["name"],
        condition=cond_operator,
        success_task=name2task[task_params["success_task"]],
        failed_task=name2task[task_params["failed_task"]],
    )