in src/pydolphinscheduler/core/yaml_workflow.py [0:0]
def parse_task(self, task_data: dict, name2task: Dict[str, Task]):
    """Build a task object from one task definition.

    :param task_data: dict describing the task. It must contain the key
        ``"task_type"``; every key that is not listed in ``TASK_SPECIAL_KEYS``
        is treated as a task parameter. Example::

            {
                "task_type": "Shell",
                "params": {"name": "shell_task", "command": "echo hello"}
            }

    :param name2task: Dict[str, Task], mapping of task name to task. It is
        forwarded to the special parse functions listed below.
    :return: the task created for ``task_data``.
    :raises KeyError: if ``task_data`` has no ``"task_type"`` key.

    Some task types have a special parse function:

    - if task type is Switch, use ``parse_switch``;
    - if task type is Condition, use ``parse_condition``;
    - if task type is Dependent, use ``parse_dependent``;
    - otherwise all task params are passed as keyword arguments to the
      task class, like ``task_cls(**task_params)``.
    """
    task_type = task_data["task_type"]

    # Keep only the real task parameters; special keys are handled elsewhere.
    task_params = {k: v for k, v in task_data.items() if k not in TASK_SPECIAL_KEYS}

    task_cls = get_task_cls(task_type)

    # Let self.parse_params process the parameter values that come from the
    # yaml file before they reach the task constructor.
    task_params = self.parse_params(task_params)

    if task_cls == tasks.Switch:
        task = self.parse_switch(task_params, name2task)
    elif task_cls == tasks.Condition:
        task = self.parse_condition(task_params, name2task)
    elif task_cls == tasks.Dependent:
        task = self.parse_dependent(task_params, name2task)
    else:
        task = task_cls(**task_params)

    # The first argument of logger.info is a %-format string. Passing
    # task_type there with task as an argument makes the logging module fail
    # to format the record, so use an explicit format string instead.
    logger.info("%s %s", task_type, task)
    return task