in client/migrationx/migrationx-transformer/src/main/java/com/aliyun/dataworks/migrationx/transformer/dataworks/converter/dolphinscheduler/v2/nodes/parameters/ConditionsParameterConverter.java [67:142]
/**
 * Converts the conditions task held in {@code taskDefinition} into two DataWorks
 * {@code CONTROLLER_JOIN} nodes: one named {@code <task>_join_success} (result status "1")
 * and one named {@code <task>_join_failure} (result status "0").
 *
 * <p>The task params are expected to carry a {@code "dependence"} JSON object that deserializes
 * into {@link DependentParameters}. Each entry of its depend-task list is first turned into an
 * upstream node by {@code conditionNodeToJoinNode}; the outputs of those nodes become both the
 * inputs and the branch list of the two join nodes returned here.
 *
 * @return the success join node followed by the failure join node, or an empty list when the
 *         task params are missing, have no usable {@code "dependence"} object, or list no
 *         dependent tasks
 * @throws IOException declared by the converter contract
 */
public List<DwNode> convertParameter() throws IOException {
    log.info("params : {}", taskDefinition.getTaskParams());
    JsonObject param = GsonUtils.fromJsonString(taskDefinition.getTaskParams(), JsonObject.class);
    DependentParameters dependentParameters = null;
    // Gson represents an explicit `"dependence": null` as a JsonNull instance (not Java null), and
    // getAsJsonObject would throw ClassCastException on it, so require an actual JSON object.
    if (param != null && param.has("dependence") && param.get("dependence").isJsonObject()) {
        dependentParameters = GsonUtils.fromJson(param.getAsJsonObject("dependence"), DependentParameters.class);
    }
    if (dependentParameters == null || dependentParameters.getDependTaskList() == null
        || dependentParameters.getDependTaskList().isEmpty()) {
        log.warn("no dependence param {}", taskDefinition.getTaskParams());
        return Collections.emptyList();
    }

    // Conditional dependencies of this node; each one gets its positional index.
    List<DependentTaskModel> dependencies = dependentParameters.getDependTaskList();
    final AtomicInteger outerRelationIndex = new AtomicInteger(0);
    List<DwNode> taskDepJoinNodes = dependencies.stream()
        .map(dependentTaskModel -> conditionNodeToJoinNode(dependentTaskModel, outerRelationIndex.getAndIncrement()))
        .collect(Collectors.toList());

    DwNode joinSuccessNode = buildJoinNode(taskDepJoinNodes, dependentParameters, "success", "1");
    DwNode joinFailureNode = buildJoinNode(taskDepJoinNodes, dependentParameters, "failure", "0");
    return Arrays.asList(joinSuccessNode, joinFailureNode);
}

/**
 * Builds one {@code CONTROLLER_JOIN} node over the outputs of the given upstream nodes.
 *
 * @param taskDepJoinNodes upstream nodes whose outputs feed the join node
 * @param dependency       dependence parameters supplying the AND/OR relation between branches
 * @param suffix           last segment of the node name ({@code <task>_join_<suffix>})
 * @param resultStatus     result status written into the join code
 * @return the configured join node
 */
private DwNode buildJoinNode(List<DwNode> taskDepJoinNodes, DependentParameters dependency,
    String suffix, String resultStatus) {
    DwNode joinNode = newDwNode(taskDefinition);
    joinNode.setType(CodeProgramType.CONTROLLER_JOIN.name());
    joinNode.setName(Joiner.on("_").join(joinNode.getName(), "join", suffix));

    // The relation is the same for every branch, so resolve it once instead of per branch.
    final Integer logic = resolveBranchLogic(dependency);

    ControllerJoinCode joinCode = new ControllerJoinCode();
    joinCode.setBranchList(collectUpstreamOutputs(taskDepJoinNodes).stream()
        .map(NodeIo::getData)
        .distinct()
        .map(out -> {
            ControllerJoinCode.Branch branch = new ControllerJoinCode.Branch();
            if (logic != null) {
                branch.setLogic(logic);
            }
            branch.setNode(out);
            // NOTE(review): both the success and the failure node match run status "1" and differ
            // only in result status; this mirrors the previous behavior - confirm it is intended.
            branch.setRunStatus(Collections.singletonList("1"));
            return branch;
        })
        .collect(Collectors.toList()));
    joinCode.setResultStatus(resultStatus);
    joinNode.setCode(joinCode.getContent());

    // Re-point the node's first output at the default output derived from its new name.
    ListUtils.emptyIfNull(joinNode.getOutputs()).stream().findFirst().ifPresent(out ->
        out.setData(getDefaultNodeOutput(processMeta, joinNode.getName())));
    joinNode.setInputs(collectUpstreamOutputs(taskDepJoinNodes));
    return joinNode;
}

/**
 * Flattens the outputs of the given nodes into a fresh list, treating a node without outputs
 * as contributing nothing.
 */
private List<NodeIo> collectUpstreamOutputs(List<DwNode> nodes) {
    return ListUtils.emptyIfNull(nodes).stream()
        .map(node -> ListUtils.emptyIfNull(node.getOutputs()))
        .flatMap(List::stream)
        .collect(Collectors.toList());
}

/**
 * Maps the dependence relation to the branch logic value: {@code OR -> 1}, {@code AND -> 0}.
 *
 * @return the logic value, or {@code null} when the relation is absent or not one of the two
 *         handled values, in which case the branch logic is left unset
 */
private Integer resolveBranchLogic(DependentParameters dependency) {
    if (dependency.getRelation() == null) {
        log.warn("no relation in dependence param {}", taskDefinition.getTaskParams());
        return null;
    }
    switch (dependency.getRelation()) {
        case OR:
            return 1;
        case AND:
            return 0;
        default:
            return null;
    }
}