def create_workflow()

in src/pydolphinscheduler/core/yaml_workflow.py [0:0]


    def create_workflow(self):
        """Create, submit and optionally run the workflow described by this object.

        Reads the workflow parameters stored under ``KEY_WORKFLOW`` and the task
        definitions stored under ``KEY_TASK``, builds every task inside a
        ``Workflow`` context, wires up the dependencies each task declares under
        ``KEY_DEPS``, submits the workflow and, when the workflow parameters set
        ``run`` to a truthy value, runs it right after the submit.

        :return: The value of the ``name`` entry of the parsed workflow parameters.
        :raises KeyError: If a task declares a dependency on a task name that is
            not defined under ``KEY_TASK``.
        """
        # Work on a shallow copy: popping "run" below must not alter the mapping
        # held by this object, otherwise a later call would no longer see the
        # flag and would silently skip running the workflow.
        workflow_params = dict(self[KEY_WORKFLOW])

        # "run" is not forwarded to Workflow; it is only consulted after submit
        is_run = workflow_params.pop("run", False)

        # use YamlWorkflow._parse_rules to parse special value of yaml file
        workflow_params = self.parse_params(workflow_params)

        workflow_name = workflow_params["name"]
        logger.info(f"Create workflow: {workflow_name}")
        with Workflow(**workflow_params) as workflow:
            # downstream task name -> list of upstream task names
            dependencies = {}

            # task name -> task object, for every task parsed so far
            name2task = {}

            # Create all tasks first; dependencies are resolved afterwards so a
            # task may depend on one that is defined later in the file.
            for task_data in self[KEY_TASK]:
                task = self.parse_task(task_data, name2task)

                deps = task_data.get(KEY_DEPS, [])
                if deps:
                    dependencies[task.name] = deps
                name2task[task.name] = task

            # build dependencies between task
            for downstream_task_name, deps in dependencies.items():
                downstream_task = name2task[downstream_task_name]
                for upstream_task_name in deps:
                    try:
                        upstream_task = name2task[upstream_task_name]
                    except KeyError:
                        # Same exception type as before, but say which task
                        # declared the dependency that cannot be resolved.
                        raise KeyError(
                            f"Task {downstream_task_name!r} depends on undefined "
                            f"task {upstream_task_name!r}"
                        ) from None
                    # ">>" is evaluated for its side effect of linking the tasks
                    upstream_task >> downstream_task

            workflow.submit()
            # if set is_run, run the workflow after submit
            if is_run:
                logger.info(f"run workflow: {workflow}")
                workflow.run()

        return workflow_name