in src/pydolphinscheduler/core/task.py [0:0]
def _set_deps(self, tasks: Task | Sequence[Task], upstream: bool = True) -> None:
"""Set parameter tasks dependent to current task.
it is a wrapper for :func:`set_upstream` and :func:`set_downstream`.
"""
if not isinstance(tasks, Sequence):
tasks = [tasks]
for task in tasks:
if upstream:
self._upstream_task_codes.add(task.code)
task._downstream_task_codes.add(self.code)
if self._workflow:
task_relation = TaskRelation(
pre_task_code=task.code,
post_task_code=self.code,
name=f"{task.name} {Delimiter.DIRECTION} {self.name}",
)
self.workflow._task_relations.add(task_relation)
else:
self._downstream_task_codes.add(task.code)
task._upstream_task_codes.add(self.code)
if self._workflow:
task_relation = TaskRelation(
pre_task_code=self.code,
post_task_code=task.code,
name=f"{self.name} {Delimiter.DIRECTION} {task.name}",
)
self.workflow._task_relations.add(task_relation)