in data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/wm/WorkflowOperator.java [122:196]
/**
 * Adds the task registered under {@code nextTaskId} to the workflow as a single-task job and
 * then continues with whatever follows it.
 *
 * <p>The task class must carry either a {@link BlockingTaskDef} or a {@link NonBlockingTaskDef}
 * annotation; the annotation's {@code name()} is used as the job's command.
 * <ul>
 *   <li>Blocking task: every non-null out port adds a parent-to-child dependency from this task
 *       to the port's next task, and the next task is built recursively.</li>
 *   <li>Non-blocking task: no out ports are followed here; control is handed to
 *       {@code continueNonBlockingRest} together with the task's current section.</li>
 * </ul>
 *
 * <p>Each task id is built at most once per call of this method. A task that is reachable from
 * several parents therefore gets all of its dependency edges but only one job registration, and
 * a task graph that loops back on itself no longer recurses until the stack overflows.
 *
 * @param workflowBuilder builder that receives the jobs and dependencies
 * @param workflowName    workflow name, forwarded to {@code continueNonBlockingRest}
 * @param nextTaskId      id (key in {@code taskMap}) of the task to add
 * @param taskMap         all tasks of the workflow keyed by task id
 * @throws Exception if {@code nextTaskId} is not present in {@code taskMap}, if the task class
 *                   has neither task-definition annotation, or if a called helper fails
 */
private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String workflowName,
                                      String nextTaskId, Map<String, AbstractTask> taskMap)
        throws Exception {
    // Fully qualified on purpose: the file's import block is outside this method and is not
    // known to import Set/HashSet.
    buildWorkflowRecursively(workflowBuilder, workflowName, nextTaskId, taskMap,
            new java.util.HashSet<String>());
}

/**
 * Recursive worker behind {@link #buildWorkflowRecursively(Workflow.Builder, String, String, Map)}.
 *
 * @param visitedTaskIds ids (keys of {@code taskMap}) that were already turned into jobs during
 *                       this traversal; updated in place
 */
private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String workflowName,
                                      String nextTaskId, Map<String, AbstractTask> taskMap,
                                      java.util.Set<String> visitedTaskIds)
        throws Exception {
    AbstractTask currentTask = taskMap.get(nextTaskId);
    if (currentTask == null) {
        logger.error("Couldn't find a task with id {} in the task map", nextTaskId);
        throw new Exception("Couldn't find a task with id " + nextTaskId + " in the task map");
    }

    // Set.add returns false when the id was already present: this task (and everything
    // downstream of it) has been handled, so do not register or descend again.
    // NOTE(review): for a non-blocking task reached via several parents this also means
    // continueNonBlockingRest is invoked once instead of once per parent - confirm that a
    // repeated invocation was never relied upon.
    if (!visitedTaskIds.add(nextTaskId)) {
        logger.debug("Task {} was already added to the workflow, skipping", nextTaskId);
        return;
    }

    Class<? extends AbstractTask> taskClass = currentTask.getClass();
    BlockingTaskDef blockingTaskDef = taskClass.getAnnotation(BlockingTaskDef.class);
    NonBlockingTaskDef nonBlockingTaskDef = taskClass.getAnnotation(NonBlockingTaskDef.class);

    if (blockingTaskDef != null) {
        addSingleTaskJob(workflowBuilder, currentTask, blockingTaskDef.name());

        String parentTaskId = currentTask.getTaskId();
        for (OutPort outPort : getOutPortsOfTask(currentTask)) {
            if (outPort == null) {
                continue;
            }
            String childTaskId = outPort.getNextTaskId();
            // The edge is recorded even when the child was visited before, so joins keep all
            // of their parents.
            workflowBuilder.addParentChildDependency(parentTaskId, childTaskId);
            logger.info("Parent to child dependency {} -> {}", parentTaskId, childTaskId);
            buildWorkflowRecursively(workflowBuilder, workflowName, childTaskId, taskMap, visitedTaskIds);
        }
    } else if (nonBlockingTaskDef != null) {
        // Cast before registering the job so a mis-annotated class fails before the builder
        // is modified, as in the original statement order.
        NonBlockingTask nbTask = (NonBlockingTask) currentTask;
        addSingleTaskJob(workflowBuilder, currentTask, nonBlockingTaskDef.name());
        continueNonBlockingRest(taskMap, workflowName, nextTaskId, nbTask.getCurrentSection());
    } else {
        logger.error("Couldn't find the task def annotation in class {}", taskClass.getName());
        throw new Exception("Couldn't find the task def annotation in class " + taskClass.getName());
    }
}

/**
 * Wraps {@code task} in a job holding exactly that one task and registers the job on the
 * workflow under the task's id.
 *
 * @param workflowBuilder builder that receives the job
 * @param task            task to wrap; its serialized data becomes the task config entries
 * @param command         command name taken from the task-definition annotation
 * @throws Exception if serializing the task data or building the job fails
 */
private void addSingleTaskJob(Workflow.Builder workflowBuilder, AbstractTask task, String command)
        throws Exception {
    TaskConfig.Builder taskBuilder = new TaskConfig.Builder()
            .setTaskId(task.getTaskId())
            .setCommand(command);
    Map<String, String> paramMap = TaskUtil.serializeTaskData(task);
    paramMap.forEach(taskBuilder::addConfig);

    List<TaskConfig> taskBuilds = new ArrayList<>();
    taskBuilds.add(taskBuilder.build());

    JobConfig.Builder job = new JobConfig.Builder()
            .addTaskConfigs(taskBuilds)
            .setFailureThreshold(0)
            .setExpiry(WORKFLOW_EXPIRY_TIME)
            .setTimeoutPerTask(TASK_EXPIRY_TIME)
            .setNumConcurrentTasksPerInstance(20)
            .setMaxAttemptsPerTask(task.getRetryCount());
    workflowBuilder.addJob(task.getTaskId(), job);
}