in src/pydolphinscheduler/core/yaml_workflow.py [0:0]
def parse_switch(self, task_params, name2task):
    """Parse a Switch task from its YAML parameters.

    Each entry of ``condition`` is a mapping with a required ``task`` key and
    an optional ``condition`` key. An entry without ``condition`` becomes the
    ``Default`` branch; an entry with one becomes a ``Branch``.

    This is an example Yaml fragment of task_params

        name: switch
        condition:
          - task: switch_child_1
            condition: "${var} > 1"
          - task: switch_child_2

    :param task_params: mapping holding the task ``name`` and its list of
        ``condition`` entries.
    :param name2task: lookup from task name to an already-built task; it is
        queried with ``.get``, so (assuming a plain dict) an unknown name
        is passed on to the branch as ``None``.
    :return: the ``Switch`` task built from the listed branches.
    :raises AssertionError: if an entry is not a mapping or has no ``task`` key.
    :raises KeyError: if ``task_params`` has no ``condition`` or ``name`` key
        (assuming it is a plain dict).
    """
    from collections.abc import Mapping

    # Validate every entry before any task object is built, so a malformed
    # workflow file fails fast with a message naming the offending entry.
    branch_specs = []
    for condition_data in task_params["condition"]:
        # The Mapping check matters: on a bare string, `"task" in entry` is a
        # substring test, so "task_a" would pass and then fail on indexing.
        # AssertionError (and its message) is kept for compatibility with the
        # previous `assert`, but raised explicitly so it survives `python -O`.
        if not isinstance(condition_data, Mapping) or "task" not in condition_data:
            raise AssertionError(f"task must be in {condition_data}")
        branch_specs.append(
            (condition_data["task"], condition_data.get("condition"))
        )

    # Function-local import, as in the original implementation.
    from pydolphinscheduler.tasks.switch import (
        Branch,
        Default,
        Switch,
        SwitchCondition,
    )

    conditions = []
    for task_name, condition_string in branch_specs:
        task = name2task.get(task_name)
        if condition_string is None:
            # No expression, e.g. {"task": "switch_child_2"}: Default branch.
            conditions.append(Default(task=task))
        else:
            # With an expression, e.g.
            # {"task": "switch_child_1", "condition": "${var} > 1"}: Branch.
            conditions.append(Branch(condition_string, task=task))

    return Switch(
        name=task_params["name"], condition=SwitchCondition(*conditions)
    )