def parse_dependent()

in src/pydolphinscheduler/core/yaml_workflow.py [0:0]


    def parse_dependent(self, task_params, name2task):
        """Build a ``Dependent`` task from its YAML-derived parameters.

        ``task_params`` is read as a two-level tree. The top level carries the
        task ``name``, an operator stored under the module-level ``KEY_OP``
        key, and a ``groups`` list. Every entry of that list carries its own
        operator plus a ``groups`` list of dependent item mappings. For
        example (``<KEY_OP>`` stands for whatever key ``KEY_OP`` holds)::

            name: dependent
            <KEY_OP>: and
            groups:
              - <KEY_OP>: or
                groups:
                  - project_name: pydolphin
                    workflow_name: task_dependent_external
                    dependent_task_name: task_1
                  - project_name: pydolphin
                    workflow_name: task_dependent_external
                    dependent_task_name: task_2
                    dependent_date: LAST_WEDNESDAY

        ``dependent_date`` is optional and falls back to
        ``DependentDate.TODAY``.

        :param task_params: mapping describing the dependent task, as above.
        :param name2task: not referenced by this parser.
        :return: the ``Dependent`` task wrapping the combined dependence.
        :raises Exception: when an operator is neither ``and`` nor ``or``
            (compared case-insensitively).
        """
        from pydolphinscheduler.tasks.dependent import (
            And,
            Dependent,
            DependentDate,
            DependentItem,
            Or,
        )

        # Lower-cased operator name -> class used to combine dependences.
        operators = {"and": And, "or": Or}

        def resolve_operator(op):
            """Return the ``And``/``Or`` class named by ``op`` (any letter case)."""
            operator_cls = operators.get(op.lower())
            if operator_cls is None:
                raise Exception(f"OP must be in And or Or, but get: {op}")
            return operator_cls

        def resolve_date(raw_date):
            """Map a ``DependentDate`` attribute name to its value.

            The upper-cased input is looked up as an attribute of
            ``DependentDate``; anything that is not such an attribute name is
            returned untouched, so attribute values pass through as given.
            """
            return getattr(DependentDate, raw_date.upper(), raw_date)

        def build_item(spec):
            """Create one ``DependentItem`` from a mapping.

            Expected keys: ``project_name``, ``workflow_name``,
            ``dependent_task_name`` and, optionally, ``dependent_date``.
            """
            return DependentItem(
                project_name=spec["project_name"],
                workflow_name=spec["workflow_name"],
                dependent_task_name=spec["dependent_task_name"],
                dependent_date=resolve_date(
                    spec.get("dependent_date", DependentDate.TODAY)
                ),
            )

        def build_group(group):
            """Combine all items of one inner group under that group's operator."""
            # Read the operator before building the items, but resolve it only
            # afterwards, so lookups and failures happen in the same order as
            # a plain read-build-combine sequence.
            op_name = group[KEY_OP]
            items = [build_item(spec) for spec in group["groups"]]
            return resolve_operator(op_name)(*items)

        inner_dependences = [build_group(group) for group in task_params["groups"]]

        # The outer operator is applied last, on top of the finished groups.
        outer_operator = resolve_operator(task_params[KEY_OP])
        return Dependent(
            name=task_params["name"],
            dependence=outer_operator(*inner_dependences),
        )