private void buildWorkflowRecursively()

in data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/wm/WorkflowOperator.java [122:196]


    /**
     * Registers the task identified by {@code nextTaskId} as a single-task job on the given
     * workflow builder and then continues according to the task definition annotation found on
     * the task's class.
     *
     * <ul>
     *   <li>{@link BlockingTaskDef}: the job is added, then for every non-null out port of the
     *       task a parent-to-child dependency is registered and this method recurses into the
     *       task referenced by that out port.</li>
     *   <li>{@link NonBlockingTaskDef}: the job is added and the rest of the processing is
     *       delegated to {@code continueNonBlockingRest}; this method does not follow the out
     *       ports itself in that case.</li>
     * </ul>
     *
     * <p>NOTE(review): there is no visited-set here, so a task that is reachable through more
     * than one parent is processed once per path, and a cycle in the out ports would recurse
     * without bound. Confirm that callers only pass acyclic, tree-shaped task graphs.
     *
     * @param workflowBuilder builder that receives the jobs and parent/child dependencies
     * @param workflowName    name of the workflow; only forwarded to recursive calls and to
     *                        {@code continueNonBlockingRest}
     * @param nextTaskId      id of the task to process in this step; must be a key of {@code taskMap}
     * @param taskMap         lookup table from task id to task instance
     * @throws Exception if {@code taskMap} has no task for {@code nextTaskId}, if the task class
     *                   carries neither {@link BlockingTaskDef} nor {@link NonBlockingTaskDef},
     *                   or if any of the invoked helpers fails
     */
    private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String workflowName,
                                          String nextTaskId, Map<String, AbstractTask> taskMap)
            throws Exception {
        AbstractTask currentTask = taskMap.get(nextTaskId);

        if (currentTask == null) {
            logger.error("Couldn't find a task with id {} in the task map", nextTaskId);
            throw new Exception("Couldn't find a task with id " + nextTaskId +" in the task map");
        }

        Class<?> taskClass = currentTask.getClass();
        BlockingTaskDef blockingTaskDef = taskClass.getAnnotation(BlockingTaskDef.class);
        NonBlockingTaskDef nonBlockingTaskDef = taskClass.getAnnotation(NonBlockingTaskDef.class);

        // The blocking annotation is checked first, so it wins if a class carries both.
        if (blockingTaskDef != null) {
            addSingleTaskJob(workflowBuilder, currentTask, blockingTaskDef.name());

            for (OutPort outPort : getOutPortsOfTask(currentTask)) {
                if (outPort == null) {
                    // Null entries in the out port list are skipped.
                    continue;
                }
                String childTaskId = outPort.getNextTaskId();
                workflowBuilder.addParentChildDependency(currentTask.getTaskId(), childTaskId);
                logger.info("Parent to child dependency {} -> {}", currentTask.getTaskId(), childTaskId);
                buildWorkflowRecursively(workflowBuilder, workflowName, childTaskId, taskMap);
            }
        } else if (nonBlockingTaskDef != null) {
            // Cast before registering the job so that a wrongly annotated class fails
            // before anything is added to the workflow builder.
            NonBlockingTask nbTask = (NonBlockingTask) currentTask;

            addSingleTaskJob(workflowBuilder, currentTask, nonBlockingTaskDef.name());

            continueNonBlockingRest(taskMap, workflowName, nextTaskId, nbTask.getCurrentSection());
        } else {
            logger.error("Couldn't find the task def annotation in class {}", taskClass.getName());
            throw new Exception("Couldn't find the task def annotation in class " + taskClass.getName());
        }
    }

    /**
     * Wraps {@code task} into a job containing exactly one task config and adds that job to
     * {@code workflowBuilder} under the task's id. Shared by the blocking and non-blocking paths
     * of {@link #buildWorkflowRecursively} so both register jobs with identical settings.
     *
     * @param workflowBuilder builder the job is added to
     * @param task            task whose id, serialized data and retry count configure the job
     * @param taskName        name taken from the task definition annotation; used as the task command
     * @throws Exception if serializing the task data fails
     */
    private void addSingleTaskJob(Workflow.Builder workflowBuilder, AbstractTask task, String taskName)
            throws Exception {
        TaskConfig.Builder taskBuilder = new TaskConfig.Builder()
                .setTaskId(task.getTaskId())
                .setCommand(taskName);

        // Every serialized entry of the task becomes a config entry of the task config.
        Map<String, String> paramMap = TaskUtil.serializeTaskData(task);
        paramMap.forEach(taskBuilder::addConfig);

        List<TaskConfig> taskBuilds = new ArrayList<>();
        taskBuilds.add(taskBuilder.build());

        JobConfig.Builder job = new JobConfig.Builder()
                .addTaskConfigs(taskBuilds)
                .setFailureThreshold(0)
                .setExpiry(WORKFLOW_EXPIRY_TIME)
                .setTimeoutPerTask(TASK_EXPIRY_TIME)
                .setNumConcurrentTasksPerInstance(20)
                .setMaxAttemptsPerTask(task.getRetryCount());

        workflowBuilder.addJob(task.getTaskId(), job);
    }