in client/migrationx/migrationx-transformer/src/main/java/com/aliyun/dataworks/migrationx/transformer/dataworks/converter/dolphinscheduler/v1/nodes/parameters/AbstractParameterConverter.java [169:263]
/**
 * Creates a {@link DwNode} for the given task, adds it to {@code dwWorkflow} and populates its
 * basic attributes, its single default output, its parameter and its inputs.
 *
 * <p>Inputs are derived from the process connections whose target is this task. If one of the
 * task's {@code preTasks} is a {@code CONDITIONS} task that lists this task in its success or
 * failed branch, the matching input is redirected to that task's
 * {@code <name>_join_success} / {@code <name>_join_failure} output.
 *
 * @param processMeta    metadata of the process definition that contains the task
 * @param taskDefinition the task to convert
 * @return the newly created node, already added to {@code dwWorkflow.getNodes()}
 */
protected DwNode newDwNode(ProcessMeta processMeta, TaskNode taskDefinition) {
    DwNode dwNode = new DwNode();
    dwNode.setWorkflowRef(dwWorkflow);
    dwWorkflow.getNodes().add(dwNode);
    dwNode.setName(com.aliyun.dataworks.migrationx.domain.dataworks.utils.StringUtils.toValidName(taskDefinition.getName()));
    dwNode.setDescription(taskDefinition.getDesc());
    // Raw node type is "<taskType>.<dbType>" when the parameter exposes a DbType "type" field,
    // otherwise just the task type name.
    dwNode.setRawNodeType(Optional.ofNullable(ReflectUtils.getFieldValue(parameter, "type"))
        .filter(type -> type instanceof DbType)
        .map(type -> Joiner.on(".").join(taskDefinition.getType(), type))
        .orElse(taskDefinition.getType().name()));
    dwNode.setDependentType(0);
    dwNode.setCycleType(0);
    dwNode.setNodeUseType(NodeUseType.SCHEDULED);
    if (taskDefinition.getMaxRetryTimes() > 0) {
        dwNode.setRerunMode(RerunMode.ALL_ALLOWED);
    } else {
        dwNode.setRerunMode(RerunMode.FAILURE_ALLOWED);
    }
    dwNode.setTaskRerunTime(taskDefinition.getMaxRetryTimes());
    // NOTE(review): the factor 1000 * 60 looks like a minutes -> milliseconds conversion; confirm units.
    dwNode.setTaskRerunInterval(taskDefinition.getRetryInterval() * 1000 * 60);

    // outputs
    DwNodeIo output = new DwNodeIo();
    output.setData(getDefaultNodeOutput(processMeta, taskDefinition.getName()));
    output.setParseType(1);
    output.setNodeRef(dwNode);
    dwNode.setOutputs(new ArrayList<>(Collections.singletonList(output)));
    dwNode.setParameter(getParameter());

    // inputs
    bindUpstreamInputs(dwNode, processMeta, taskDefinition);
    // conditional-result handling for this node
    rewireConditionInputs(dwNode, processMeta, taskDefinition);
    return dwNode;
}

/**
 * Sets the inputs of {@code dwNode}: one input per task in {@code processData} that is the source
 * of a connection ending at {@code taskDefinition}.
 */
private void bindUpstreamInputs(DwNode dwNode, ProcessMeta processMeta, TaskNode taskDefinition) {
    List<TaskNodeConnect> connects = processMeta.getProcessDefinitionConnects();
    // Resolved once; it does not depend on the individual connection.
    List<TaskNode> candidateTasks = Optional.ofNullable(processData)
        .map(ProcessData::getTasks)
        .orElse(null);
    dwNode.setInputs(ListUtils.emptyIfNull(connects).stream()
        .filter(connect -> StringUtils.equals(connect.getEndPointTargetId(), taskDefinition.getId()))
        .flatMap(connect -> ListUtils.emptyIfNull(candidateTasks).stream()
            .filter(task -> StringUtils.equals(task.getId(), connect.getEndPointSourceId())))
        .map(upTask -> {
            DwNodeIo input = new DwNodeIo();
            input.setParseType(1);
            input.setNodeRef(dwNode);
            input.setData(Joiner.on(".").join(
                converterContext.getProject().getName(),
                processMeta.getProjectName(),
                processMeta.getProcessDefinitionName(),
                upTask.getName()));
            return input;
        })
        .collect(Collectors.toList()));
}

/**
 * For every predecessor named in the task's {@code preTasks} that is a {@code CONDITIONS} task,
 * redirects the input pointing at that predecessor to its success or failure join output,
 * depending on which branch lists this task.
 *
 * @see ConditionsParameterConverter
 */
private void rewireConditionInputs(DwNode dwNode, ProcessMeta processMeta, TaskNode taskDefinition) {
    List<String> preTasks = GsonUtils.fromJsonString(taskDefinition.getPreTasks(),
        new TypeToken<List<String>>() {}.getType());
    String taskName = taskDefinition.getName();
    // A missing process definition JSON is treated like an empty task list instead of failing with an NPE.
    ListUtils.emptyIfNull(Optional.ofNullable(processMeta.getProcessDefinitionJson())
            .map(definition -> definition.getTasks())
            .orElse(null)).stream()
        .filter(taskNode -> ListUtils.emptyIfNull(preTasks).stream()
            .anyMatch(preTask -> StringUtils.equals(preTask, taskNode.getName())))
        .filter(preTaskNode -> TaskType.CONDITIONS.equals(preTaskNode.getType()))
        .forEach(preConditionTaskNode -> {
            // When depending on a Conditions node, the dependency is expressed through the
            // "<nodeName>_join_success" / "<nodeName>_join_failure" inputs.
            ConditionsParameters conditionResult = preConditionTaskNode.getConditionResult();
            log.info("condition result: {}", GsonUtils.toJsonString(conditionResult));
            if (conditionResult == null) {
                return;
            }
            String conditionTaskName = preConditionTaskNode.getName();
            // Success is applied first; once an input has been redirected it no longer matches
            // the plain output name, so the failure pass leaves it alone.
            redirectConditionBranchInput(dwNode, processMeta, conditionResult.getSuccessNode(),
                taskName, conditionTaskName, "success");
            redirectConditionBranchInput(dwNode, processMeta, conditionResult.getFailedNode(),
                taskName, conditionTaskName, "failure");
        });
}

/**
 * If {@code taskName} is listed in {@code branchTargets}, replaces the data of the first input of
 * {@code dwNode} that equals the condition task's default output with the output of
 * {@code <conditionTaskName>_join_<branch>}.
 */
private void redirectConditionBranchInput(DwNode dwNode, ProcessMeta processMeta, List<String> branchTargets,
    String taskName, String conditionTaskName, String branch) {
    boolean targeted = ListUtils.emptyIfNull(branchTargets).stream()
        .anyMatch(target -> StringUtils.equals(target, taskName));
    if (!targeted) {
        return;
    }
    String branchInput = getDefaultNodeOutput(processMeta,
        Joiner.on("_").join(conditionTaskName, "join", branch));
    // Computed once per branch rather than once per inspected input.
    String plainInput = getDefaultNodeOutput(processMeta, conditionTaskName);
    ListUtils.emptyIfNull(dwNode.getInputs()).stream()
        .filter(in -> StringUtils.equals(plainInput, in.getData()))
        .findFirst()
        .ifPresent(in -> in.setData(branchInput));
}