public List convertParameter()

in client/migrationx/migrationx-transformer/src/main/java/com/aliyun/dataworks/migrationx/transformer/dataworks/converter/dolphinscheduler/v1/nodes/parameters/ConditionsParameterConverter.java [66:175]


    /**
     * Converts the dependence definition of the current task into CONTROLLER_JOIN nodes.
     *
     * <p>For every entry of the dependence's depend-task list one CONTROLLER_JOIN node is built;
     * its branches come from the entry's depend items that have a non-blank task name and a
     * non-null status. On top of the outputs of those per-entry nodes two further CONTROLLER_JOIN
     * nodes are built, named with the suffixes {@code join_success} and {@code join_failure}.
     *
     * <p>NOTE(review): the per-entry join nodes are only referenced through the inputs/branches of
     * the two returned nodes; they are not part of the returned list. Confirm that this is intended.
     *
     * @return a fixed-size list holding the success join node followed by the failure join node
     * @throws IOException declared by the signature; presumably propagated from a callee — TODO confirm
     */
    public List<DwNode> convertParameter() throws IOException {
        // Conditional dependencies of this node.
        DependentParameters dependencies = taskDefinition.getDependence();
        log.info("dependencies: {}", GsonUtils.toJsonString(dependencies));

        // Counter used to give every per-entry join node a distinct name suffix.
        AtomicInteger outerRelationIndex = new AtomicInteger(0);
        List<DwNode> taskDepJoinNodes = ListUtils.emptyIfNull(Optional.ofNullable(dependencies)
                        .map(DependentParameters::getDependTaskList).orElse(null))
                .stream()
                .map(dependentTaskModel -> {
                    DwNode joinNode = newDwNode(processMeta, taskDefinition);
                    joinNode.setType(CodeProgramType.CONTROLLER_JOIN.name());
                    joinNode.setName(Joiner.on("_").join(
                            joinNode.getName(), "join", outerRelationIndex.getAndIncrement()));
                    // The node was renamed, so its first output is re-pointed at the new name.
                    ListUtils.emptyIfNull(joinNode.getOutputs()).stream().findFirst().ifPresent(out ->
                            out.setData(getDefaultNodeOutput(processMeta, joinNode.getName())));

                    List<ControllerJoinCode.Branch> branchList = ListUtils.emptyIfNull(dependentTaskModel.getDependItemList())
                            .stream()
                            // Items without a task name or a status cannot be turned into a branch.
                            .filter(dependentItem -> StringUtils.isNotBlank(dependentItem.getDepTasks())
                                    && dependentItem.getStatus() != null)
                            .map(dependentItem -> {
                                ControllerJoinCode.Branch branch = new ControllerJoinCode.Branch();
                                switch (dependentTaskModel.getRelation()) {
                                    case OR:
                                        branch.setLogic(1);
                                        break;
                                    case AND:
                                        branch.setLogic(0);
                                        break;
                                }
                                branch.setNode(getDefaultNodeOutput(processMeta, dependentItem.getDepTasks()));
                                switch (dependentItem.getStatus()) {
                                    case FAILURE:
                                        branch.setRunStatus(Collections.singletonList("0"));
                                        break;
                                    case SUCCESS:
                                        branch.setRunStatus(Collections.singletonList("1"));
                                        break;
                                }
                                return branch;
                            }).collect(Collectors.toList());

                    ControllerJoinCode joinCode = new ControllerJoinCode();
                    joinCode.setBranchList(branchList);
                    switch (dependentTaskModel.getRelation()) {
                        case AND:
                            joinCode.setResultStatus("0");
                            break;
                        case OR:
                            joinCode.setResultStatus("1");
                            break;
                    }
                    joinNode.setCode(joinCode.getContent());
                    return joinNode;
                }).collect(Collectors.toList());

        // The success node is built before the failure node, as in the original statement order.
        DwNode joinSuccessNode = buildResultJoinNode(dependencies, taskDepJoinNodes, "success", "1");
        DwNode joinFailureNode = buildResultJoinNode(dependencies, taskDepJoinNodes, "failure", "0");
        return Arrays.asList(joinSuccessNode, joinFailureNode);
    }

    /**
     * Builds one CONTROLLER_JOIN node that joins on the outputs of the given per-entry join nodes.
     *
     * <p>Every distinct output value of {@code upstreamJoinNodes} becomes one branch with run status
     * {@code "1"}; the branch logic follows the relation of {@code dependencies}. Nodes whose output
     * list is {@code null} contribute nothing instead of causing a NullPointerException.
     *
     * <p>NOTE(review): both the success and the failure node use run status {@code "1"} for their
     * branches and differ only in {@code resultStatus}; confirm that this is intended.
     *
     * @param dependencies      dependence definition supplying the relation; only dereferenced when
     *                          there is at least one upstream output
     * @param upstreamJoinNodes per-entry join nodes whose outputs feed the new node
     * @param suffix            last name segment, appended after {@code join}
     * @param resultStatus      value stored as the result status of the join code
     * @return the configured join node
     */
    private DwNode buildResultJoinNode(DependentParameters dependencies, List<DwNode> upstreamJoinNodes,
                                       String suffix, String resultStatus) {
        DwNode resultNode = newDwNode(processMeta, taskDefinition);
        resultNode.setType(CodeProgramType.CONTROLLER_JOIN.name());
        resultNode.setName(Joiner.on("_").join(resultNode.getName(), "join", suffix));

        ControllerJoinCode resultCode = new ControllerJoinCode();
        resultCode.setBranchList(upstreamJoinNodes.stream()
                .map(node -> ListUtils.emptyIfNull(node.getOutputs()))
                .flatMap(List::stream)
                .map(NodeIo::getData)
                .distinct()
                .map(out -> {
                    ControllerJoinCode.Branch branch = new ControllerJoinCode.Branch();
                    switch (dependencies.getRelation()) {
                        case OR:
                            branch.setLogic(1);
                            break;
                        case AND:
                            branch.setLogic(0);
                            break;
                    }
                    branch.setNode(out);
                    branch.setRunStatus(Collections.singletonList("1"));
                    return branch;
                }).collect(Collectors.toList()));
        resultCode.setResultStatus(resultStatus);
        resultNode.setCode(resultCode.getContent());

        // The node was renamed, so its first output is re-pointed at the new name.
        ListUtils.emptyIfNull(resultNode.getOutputs()).stream().findFirst().ifPresent(out ->
                out.setData(getDefaultNodeOutput(processMeta, resultNode.getName())));
        resultNode.setInputs(upstreamJoinNodes.stream()
                .map(node -> ListUtils.emptyIfNull(node.getOutputs()))
                .flatMap(List::stream)
                .collect(Collectors.toList()));
        return resultNode;
    }