in src/pydolphinscheduler/core/yaml_workflow.py [0:0]
def create_workflow(self):
    """Create workflow main function.

    Reads the workflow parameters stored under ``KEY_WORKFLOW`` and the task
    definitions stored under ``KEY_TASK``, creates every task inside a
    ``Workflow`` context, connects the dependencies declared under
    ``KEY_DEPS``, submits the workflow and, when the ``"run"`` parameter is
    truthy, runs it.

    :return: The workflow name taken from the parsed workflow parameters.
    :raises KeyError: If a task declares a dependency on a task name that is
        not defined in the file.
    """
    # get workflow parameters with key "workflow"
    workflow_params = self[KEY_WORKFLOW]

    # pop "run" so it is not forwarded to Workflow(...); it is used at the end
    is_run = workflow_params.pop("run", False)

    # use YamlWorkflow._parse_rules to parse special value of yaml file
    workflow_params = self.parse_params(workflow_params)

    workflow_name = workflow_params["name"]
    logger.info(f"Create workflow: {workflow_name}")
    with Workflow(**workflow_params) as workflow:
        # downstream task name -> list of upstream task names
        dependencies = {}

        # task name -> task object, also handed to parse_task for each task
        name2task = {}

        # get task datas with key "tasks"
        for task_data in self[KEY_TASK]:
            task = self.parse_task(task_data, name2task)

            deps = task_data.get(KEY_DEPS, [])
            if deps:
                dependencies[task.name] = deps
            name2task[task.name] = task

        # Dependencies are wired only after every task exists, so a task may
        # depend on one that is declared later in the file.
        for downstream_task_name, deps in dependencies.items():
            downstream_task = name2task[downstream_task_name]
            for upstream_task_name in deps:
                try:
                    upstream_task = name2task[upstream_task_name]
                except KeyError:
                    # Same exception type as before, but say which task and
                    # workflow carry the bad reference instead of only the
                    # missing key.
                    raise KeyError(
                        f"Task {downstream_task_name!r} depends on unknown task "
                        f"{upstream_task_name!r} in workflow {workflow_name!r}"
                    ) from None
                upstream_task >> downstream_task

        workflow.submit()
        # if set is_run, run the workflow after submit
        if is_run:
            logger.info(f"run workflow: {workflow}")
            workflow.run()

    return workflow_name