in src/pydolphinscheduler/core/yaml_workflow.py [0:0]
def parse_dependent(self, task_params, name2task):
"""Parse Dependent Task.
This is an example Yaml fragment of task_params
name: dependent
denpendence:
OP: AND
groups:
-
OP: Or
groups:
- [pydolphin, task_dependent_external, task_1]
- [pydolphin, task_dependent_external, task_2]
-
OP: And
groups:
- [pydolphin, task_dependent_external, task_1, LAST_WEDNESDAY]
- [pydolphin, task_dependent_external, task_2, last24Hours]
"""
from pydolphinscheduler.tasks.dependent import (
And,
Dependent,
DependentDate,
DependentItem,
Or,
)
def workflow_dependent_date(dependent_date):
"""Parse dependent date (Compatible with key and value of DependentDate)."""
dependent_date_upper = dependent_date.upper()
if hasattr(DependentDate, dependent_date_upper):
dependent_date = getattr(DependentDate, dependent_date_upper)
return dependent_date
def get_op_cls(op):
cls = None
if op.lower() == "and":
cls = And
elif op.lower() == "or":
cls = Or
else:
raise Exception(f"OP must be in And or Or, but get: {op}")
return cls
def create_dependent_item(source_items):
"""Parse dependent item.
project_name: pydolphin
workflow_name: task_dependent_external
dependent_task_name: task_1
dependent_date: LAST_WEDNESDAY
"""
project_name = source_items["project_name"]
workflow_name = source_items["workflow_name"]
dependent_task_name = source_items["dependent_task_name"]
dependent_date = source_items.get("dependent_date", DependentDate.TODAY)
dependent_item = DependentItem(
project_name=project_name,
workflow_name=workflow_name,
dependent_task_name=dependent_task_name,
dependent_date=workflow_dependent_date(dependent_date),
)
return dependent_item
second_dependences = []
for first_group in task_params["groups"]:
second_op = first_group[KEY_OP]
dependence_items = []
for source_items in first_group["groups"]:
dependence_items.append(create_dependent_item(source_items))
second_dependences.append(get_op_cls(second_op)(*dependence_items))
first_op = task_params[KEY_OP]
dependence = get_op_cls(first_op)(*second_dependences)
task = Dependent(
name=task_params["name"],
dependence=dependence,
)
return task