in client/migrationx/migrationx-transformer/src/main/java/com/aliyun/dataworks/migrationx/transformer/dataworks/converter/dolphinscheduler/v1/nodes/parameters/ConditionsParameterConverter.java [66:175]
public List<DwNode> convertParameter() throws IOException {
    /*
     * Converts the conditions task into CONTROLLER_JOIN nodes.
     *
     * Step 1: one join node is built per entry of the dependence's dependTaskList, named
     *         "<node name>_join_<index>". Each dependent item with a non-blank depTasks and a
     *         non-null status becomes one branch of that join node.
     * Step 2: two aggregate join nodes ("..._join_success" with result status "1" and
     *         "..._join_failure" with result status "0") are built on top of the outputs of
     *         the step-1 nodes.
     *
     * Only the two aggregate nodes are returned.
     * NOTE(review): the per-entry join nodes from step 1 are referenced through the aggregate
     * nodes' inputs/branches but are not part of the returned list - confirm this is intended.
     */

    // Conditional dependencies of this node.
    DependentParameters dependencies = taskDefinition.getDependence();
    log.info("dependencies: {}", GsonUtils.toJsonString(dependencies));

    // Running index used to give every per-entry join node a unique name.
    AtomicInteger outerRelationIndex = new AtomicInteger(0);
    List<DwNode> taskDepJoinNodes = ListUtils.emptyIfNull(
            Optional.ofNullable(dependencies).map(DependentParameters::getDependTaskList).orElse(null))
        .stream()
        .map(dependentTaskModel -> {
            DwNode joinNode = newDwNode(processMeta, taskDefinition);
            joinNode.setType(CodeProgramType.CONTROLLER_JOIN.name());
            joinNode.setName(Joiner.on("_").join(
                joinNode.getName(), "join", outerRelationIndex.getAndIncrement()));
            // The first output is re-pointed to the default output derived from the new name.
            ListUtils.emptyIfNull(joinNode.getOutputs()).stream().findFirst().ifPresent(out ->
                out.setData(getDefaultNodeOutput(processMeta, joinNode.getName())));

            List<ControllerJoinCode.Branch> branchList =
                ListUtils.emptyIfNull(dependentTaskModel.getDependItemList())
                    .stream()
                    // Items without a task reference or without a status cannot form a branch.
                    .filter(dependentItem -> StringUtils.isNotBlank(dependentItem.getDepTasks())
                        && dependentItem.getStatus() != null)
                    .map(dependentItem -> {
                        ControllerJoinCode.Branch branch = new ControllerJoinCode.Branch();
                        // Branch logic: OR -> 1, AND -> 0; any other relation leaves it unset.
                        switch (dependentTaskModel.getRelation()) {
                            case OR:
                                branch.setLogic(1);
                                break;
                            case AND:
                                branch.setLogic(0);
                                break;
                        }
                        branch.setNode(getDefaultNodeOutput(processMeta, dependentItem.getDepTasks()));
                        // Run status: FAILURE -> "0", SUCCESS -> "1"; other statuses leave it unset.
                        switch (dependentItem.getStatus()) {
                            case FAILURE:
                                branch.setRunStatus(Collections.singletonList("0"));
                                break;
                            case SUCCESS:
                                branch.setRunStatus(Collections.singletonList("1"));
                                break;
                        }
                        return branch;
                    })
                    .collect(Collectors.toList());

            ControllerJoinCode joinCode = new ControllerJoinCode();
            joinCode.setBranchList(branchList);
            // Result status of the per-entry join node: AND -> "0", OR -> "1".
            switch (dependentTaskModel.getRelation()) {
                case AND:
                    joinCode.setResultStatus("0");
                    break;
                case OR:
                    joinCode.setResultStatus("1");
                    break;
            }
            joinNode.setCode(joinCode.getContent());
            return joinNode;
        })
        .collect(Collectors.toList());

    // Build order (success first, then failure) matches the original statement order.
    DwNode joinSuccessNode = buildAggregateJoinNode(taskDepJoinNodes, dependencies, "success", "1");
    DwNode joinFailureNode = buildAggregateJoinNode(taskDepJoinNodes, dependencies, "failure", "0");
    return Arrays.asList(joinSuccessNode, joinFailureNode);
}

/**
 * Builds one aggregate CONTROLLER_JOIN node on top of the per-entry join nodes.
 *
 * <p>The node is named {@code <node name>_join_<nameSuffix>}. It gets one branch per distinct
 * output identifier of {@code taskDepJoinNodes}; every branch uses run status "1" and a logic
 * value derived from the relation of {@code dependencies} (OR -> 1, AND -> 0). The collected
 * outputs of {@code taskDepJoinNodes} become the inputs of the new node.
 *
 * <p>NOTE(review): the failure node uses run status "1" on its branches exactly like the
 * success node; only the result status differs - confirm this is the intended semantics.
 *
 * @param taskDepJoinNodes per-entry join nodes whose outputs feed the new node; a node whose
 *                         outputs are {@code null} contributes nothing
 * @param dependencies     dependence definition supplying the relation; only dereferenced
 *                         when at least one branch is built
 * @param nameSuffix       last segment of the node name, e.g. "success" or "failure"
 * @param resultStatus     result status written into the join code
 * @return the fully populated aggregate join node
 */
private DwNode buildAggregateJoinNode(List<DwNode> taskDepJoinNodes, DependentParameters dependencies,
    String nameSuffix, String resultStatus) {
    DwNode aggregateNode = newDwNode(processMeta, taskDefinition);
    aggregateNode.setType(CodeProgramType.CONTROLLER_JOIN.name());
    aggregateNode.setName(Joiner.on("_").join(aggregateNode.getName(), "join", nameSuffix));

    // Null-safe flattening, consistent with the emptyIfNull guard used wherever outputs are read.
    List<NodeIo> upstreamOutputs = taskDepJoinNodes.stream()
        .flatMap(node -> ListUtils.emptyIfNull(node.getOutputs()).stream())
        .collect(Collectors.toList());

    ControllerJoinCode aggregateCode = new ControllerJoinCode();
    aggregateCode.setBranchList(upstreamOutputs.stream()
        .map(NodeIo::getData)
        .distinct()
        .map(out -> {
            ControllerJoinCode.Branch branch = new ControllerJoinCode.Branch();
            switch (dependencies.getRelation()) {
                case OR:
                    branch.setLogic(1);
                    break;
                case AND:
                    branch.setLogic(0);
                    break;
            }
            branch.setNode(out);
            branch.setRunStatus(Collections.singletonList("1"));
            return branch;
        })
        .collect(Collectors.toList()));
    aggregateCode.setResultStatus(resultStatus);
    aggregateNode.setCode(aggregateCode.getContent());

    // The first output is re-pointed to the default output derived from the new name.
    ListUtils.emptyIfNull(aggregateNode.getOutputs()).stream().findFirst().ifPresent(out ->
        out.setData(getDefaultNodeOutput(processMeta, aggregateNode.getName())));
    aggregateNode.setInputs(upstreamOutputs);
    return aggregateNode;
}