public List convertParameter()

in client/migrationx/migrationx-transformer/src/main/java/com/aliyun/dataworks/migrationx/transformer/dataworks/converter/dolphinscheduler/v3/nodes/parameters/ConditionsParameterConverter.java [67:141]


    public List<DwNode> convertParameter() throws IOException {
        log.info("params : {}", taskDefinition.getTaskParams());

        JsonObject param = GsonUtils.fromJsonString(taskDefinition.getTaskParams(), JsonObject.class);
        DependentParameters dependentParameters = null;
        if (param.get("dependence") != null) {
            dependentParameters = GsonUtils.fromJson(param.getAsJsonObject("dependence"), DependentParameters.class);
        }
        if (dependentParameters == null || dependentParameters.getDependTaskList() == null || dependentParameters.getDependTaskList().isEmpty()) {
            log.warn("no dependence param {}", taskDefinition.getTaskParams());
            return Collections.emptyList();
        }
        // 本节点的条件依赖
        List<DependentTaskModel> dependencies = dependentParameters.getDependTaskList();

        final AtomicInteger outerRelationIndex = new AtomicInteger(0);
        List<DwNode> taskDepJoinNodes = ListUtils.emptyIfNull(dependencies)
                .stream()
                .map(dependentTaskModel -> conditionNodeToJoinNode(dependentTaskModel, outerRelationIndex.getAndIncrement()))
                .collect(Collectors.toList());

        final DependentParameters dependency = dependentParameters;
        DwNode joinSuccessNode = newDwNode(taskDefinition);
        joinSuccessNode.setType(CodeProgramType.CONTROLLER_JOIN.name());
        joinSuccessNode.setName(Joiner.on("_").join(joinSuccessNode.getName(), "join", "success"));
        ControllerJoinCode joinSuccessCode = new ControllerJoinCode();
        joinSuccessCode.setBranchList(ListUtils.emptyIfNull(taskDepJoinNodes).stream().map(Node::getOutputs)
                .flatMap(List::stream).map(NodeIo::getData).distinct().map(out -> {
                    ControllerJoinCode.Branch branch = new ControllerJoinCode.Branch();
                    switch (dependency.getRelation()) {
                        case OR:
                            branch.setLogic(1);
                            break;
                        case AND:
                            branch.setLogic(0);
                            break;
                    }
                    branch.setNode(out);
                    branch.setRunStatus(Collections.singletonList("1"));
                    return branch;
                }).collect(Collectors.toList()));
        joinSuccessCode.setResultStatus("1");
        joinSuccessNode.setCode(joinSuccessCode.getContent());
        ListUtils.emptyIfNull(joinSuccessNode.getOutputs()).stream().findFirst().ifPresent(out ->
                out.setData(getDefaultNodeOutput(processMeta, joinSuccessNode.getName())));
        joinSuccessNode.setInputs(ListUtils.emptyIfNull(taskDepJoinNodes).stream()
                .map(Node::getOutputs).flatMap(List::stream).collect(Collectors.toList()));

        DwNode joinFailureNode = newDwNode(taskDefinition);
        joinFailureNode.setType(CodeProgramType.CONTROLLER_JOIN.name());
        joinFailureNode.setName(Joiner.on("_").join(joinFailureNode.getName(), "join", "failure"));
        ControllerJoinCode joinFailureCode = new ControllerJoinCode();
        joinFailureCode.setBranchList(ListUtils.emptyIfNull(taskDepJoinNodes).stream().map(Node::getOutputs)
                .flatMap(List::stream).map(NodeIo::getData).distinct().map(out -> {
                    ControllerJoinCode.Branch branch = new ControllerJoinCode.Branch();
                    switch (dependency.getRelation()) {
                        case OR:
                            branch.setLogic(1);
                            break;
                        case AND:
                            branch.setLogic(0);
                            break;
                    }
                    branch.setNode(out);
                    branch.setRunStatus(Collections.singletonList("1"));
                    return branch;
                }).collect(Collectors.toList()));
        joinFailureCode.setResultStatus("0");
        joinFailureNode.setCode(joinFailureCode.getContent());
        ListUtils.emptyIfNull(joinFailureNode.getOutputs()).stream().findFirst().ifPresent(out ->
                out.setData(getDefaultNodeOutput(processMeta, joinFailureNode.getName())));
        joinFailureNode.setInputs(ListUtils.emptyIfNull(taskDepJoinNodes).stream()
                .map(Node::getOutputs).flatMap(List::stream).collect(Collectors.toList()));
        return Arrays.asList(joinSuccessNode, joinFailureNode);
    }