def parse_condition()

in src/pydolphinscheduler/core/yaml_workflow.py [0:0]


    def parse_condition(self, task_params, name2task):
        """Parse Condition Task.

        This is an example Yaml fragment of task_params

        name: condition
        success_task: success_branch
        failed_task: fail_branch
        op: AND
        groups:
          - op: AND
            groups:
              - task: pre_task_1
                flag: true
              - task: pre_task_2
                flag: true
              - task: pre_task_3
                flag: false
          - op: AND
            groups:
              - task: pre_task_1
                flag: false
              - task: pre_task_2
                flag: true
              - task: pre_task_3
                flag: true

        The top-level ``op`` combines the groups, each group's own ``op``
        combines its members. A member with a truthy ``flag`` is wrapped in
        ``SUCCESS``, any other member in ``FAILURE``.

        :param task_params: mapping describing the condition task; the keys
            ``name``, ``success_task``, ``failed_task``, ``op`` and ``groups``
            are read.
        :param name2task: lookup from task name to task object, indexed with
            the task names that appear in ``task_params``.
        :return: the ``Condition`` task built from ``task_params``.
        :raises Exception: if an ``op`` value is neither ``and`` nor ``or``
            (compared case-insensitively).
        :raises AssertionError: if a group member has no ``task`` or no
            ``flag`` key.
        """
        from pydolphinscheduler.tasks.condition import (
            FAILURE,
            SUCCESS,
            And,
            Condition,
            Or,
        )

        op_classes = {"and": And, "or": Or}

        def get_op_cls(op):
            """Return the operator class named by ``op`` (case-insensitive)."""
            cls = op_classes.get(op.lower())
            if cls is None:
                raise Exception("OP must be in And or Or, but get: %s" % op)
            return cls

        second_cond_ops = []
        for first_group in task_params["groups"]:
            second_op = first_group["op"]
            task_ops = []
            for condition_data in first_group["groups"]:
                # Explicit raise instead of ``assert`` so the check survives
                # ``python -O``; AssertionError is kept so callers that
                # already handle it keep working.
                for required_key in ("task", "flag"):
                    if required_key not in condition_data:
                        raise AssertionError(
                            "%s must be in %s" % (required_key, condition_data)
                        )
                task = name2task[condition_data["task"]]

                # for example: task = pre_task_1, flag = true
                status_cls = SUCCESS if condition_data["flag"] else FAILURE
                task_ops.append(status_cls(task))

            second_cond_ops.append(get_op_cls(second_op)(*task_ops))

        cond_operator = get_op_cls(task_params["op"])(*second_cond_ops)

        return Condition(
            name=task_params["name"],
            condition=cond_operator,
            success_task=name2task[task_params["success_task"]],
            failed_task=name2task[task_params["failed_task"]],
        )