def parse_switch()

in src/pydolphinscheduler/core/yaml_workflow.py [0:0]


    def parse_switch(self, task_params, name2task):
        """Parse Switch Task.

        This is an example Yaml fragment of task_params

        name: switch
        condition:
          - task: switch_child_1
            condition: "${var} > 1"
          - task: switch_child_2

        Each entry of ``condition`` is a mapping. The ``task`` key is required
        and names the downstream task; the ``condition`` key is optional. An
        entry without ``condition`` (or with a null one) becomes the Default
        branch, every other entry becomes a Branch with that condition string.

        :param task_params: mapping holding at least ``name`` and ``condition``.
        :param name2task: mapping used to look up task objects by task name.
        :return: a Switch task built from the parsed branches.
        :raises AssertionError: if a condition entry has no ``task`` key.
        :raises KeyError: if ``name`` or ``condition`` is missing in task_params.
        """
        # Validate all entries first so malformed input fails before any task
        # object is created. AssertionError is raised explicitly (instead of
        # using an ``assert`` statement) so the check survives ``python -O``
        # while keeping the exception type existing callers may rely on.
        parsed_entries = []
        for condition_data in task_params["condition"]:
            if "task" not in condition_data:
                # Wrap in a tuple so a tuple-valued entry cannot break the
                # %-formatting of the message.
                raise AssertionError("task must be in %s" % (condition_data,))
            parsed_entries.append(
                (condition_data["task"], condition_data.get("condition", None))
            )

        # Function-scope import, as in the original implementation.
        from pydolphinscheduler.tasks.switch import (
            Branch,
            Default,
            Switch,
            SwitchCondition,
        )

        # NOTE(review): name2task.get() yields None for an unknown task name,
        # so Branch/Default would receive task=None — confirm whether that
        # should be reported as an error here.
        conditions = []
        for task_name, condition_string in parsed_entries:
            task = name2task.get(task_name)
            if condition_string is None:
                # e.g. {"task": "switch_child_2"} -> Default branch
                conditions.append(Default(task=task))
            else:
                # e.g. {"task": "switch_child_1", "condition": "${var} > 1"}
                conditions.append(Branch(condition_string, task=task))

        switch = Switch(
            name=task_params["name"], condition=SwitchCondition(*conditions)
        )
        return switch