protected DwNode newDwNode()

in client/migrationx/migrationx-transformer/src/main/java/com/aliyun/dataworks/migrationx/transformer/dataworks/converter/dolphinscheduler/v1/nodes/parameters/AbstractParameterConverter.java [169:263]


    /**
     * Creates a {@link DwNode} for the given DolphinScheduler task, registers it in the current
     * {@code dwWorkflow}, and wires up its single default output and its upstream inputs.
     *
     * <p>Inputs are derived from the process definition connects whose target is this task. If one of
     * this task's pre-tasks is a {@code CONDITIONS} node that lists this task in its success or failed
     * branch, the input pointing at that Conditions node is re-pointed to the matching
     * {@code <conditionNodeName>_join_success} / {@code <conditionNodeName>_join_failure} output.
     *
     * @param processMeta    metadata of the process definition that owns the task
     * @param taskDefinition the task being converted
     * @return the newly created node, already added to {@code dwWorkflow.getNodes()}
     */
    protected DwNode newDwNode(ProcessMeta processMeta, TaskNode taskDefinition) {
        DwNode dwNode = new DwNode();
        dwNode.setWorkflowRef(dwWorkflow);
        dwWorkflow.getNodes().add(dwNode);
        dwNode.setName(com.aliyun.dataworks.migrationx.domain.dataworks.utils.StringUtils.toValidName(taskDefinition.getName()));
        dwNode.setDescription(taskDefinition.getDesc());
        // Raw node type is "<taskType>.<dbType>" when the parameter carries a DbType "type" field,
        // otherwise just the task type name.
        dwNode.setRawNodeType(Optional.ofNullable(ReflectUtils.getFieldValue(parameter, "type"))
            .filter(type -> type instanceof DbType)
            .map(type -> Joiner.on(".").join(taskDefinition.getType(), type))
            .orElse(taskDefinition.getType().name()));
        dwNode.setDependentType(0);
        dwNode.setCycleType(0);
        dwNode.setNodeUseType(NodeUseType.SCHEDULED);
        if (taskDefinition.getMaxRetryTimes() > 0) {
            dwNode.setRerunMode(RerunMode.ALL_ALLOWED);
        } else {
            dwNode.setRerunMode(RerunMode.FAILURE_ALLOWED);
        }
        dwNode.setTaskRerunTime(taskDefinition.getMaxRetryTimes());
        // NOTE(review): presumably converts a retry interval in minutes to milliseconds - confirm units.
        dwNode.setTaskRerunInterval(taskDefinition.getRetryInterval() * 1000 * 60);

        // outputs
        DwNodeIo output = new DwNodeIo();
        output.setData(getDefaultNodeOutput(processMeta, taskDefinition.getName()));
        output.setParseType(1);
        output.setNodeRef(dwNode);
        dwNode.setOutputs(new ArrayList<>(Collections.singletonList(output)));
        dwNode.setParameter(getParameter());

        // inputs: one input per upstream task that is connected to this task
        List<TaskNodeConnect> connects = processMeta.getProcessDefinitionConnects();
        dwNode.setInputs(ListUtils.emptyIfNull(connects).stream()
            .filter(connect -> StringUtils.equals(connect.getEndPointTargetId(), taskDefinition.getId()))
            .flatMap(connect -> ListUtils.emptyIfNull(
                    Optional.ofNullable(processData).map(ProcessData::getTasks).orElse(null))
                .stream()
                .filter(task -> StringUtils.equals(task.getId(), connect.getEndPointSourceId())))
            .map(upTask -> {
                DwNodeIo input = new DwNodeIo();
                input.setParseType(1);
                input.setNodeRef(dwNode);
                // NOTE(review): the condition handling below only matches if this string has the same
                // format as getDefaultNodeOutput(processMeta, upTask.getName()) - confirm.
                input.setData(Joiner.on(".").join(
                    converterContext.getProject().getName(),
                    processMeta.getProjectName(),
                    processMeta.getProcessDefinitionName(),
                    upTask.getName()));
                return input;
            })
            .collect(Collectors.toList()));

        // Conditional execution: handle pre-tasks of this node that are Conditions nodes.
        List<String> preTasks = GsonUtils.fromJsonString(taskDefinition.getPreTasks(),
            new TypeToken<List<String>>() {}.getType());
        String currentTaskName = taskDefinition.getName();
        // A missing process definition JSON is treated as "no tasks" instead of failing with an NPE.
        ListUtils.emptyIfNull(
                Optional.ofNullable(processMeta.getProcessDefinitionJson())
                    .map(definition -> definition.getTasks())
                    .orElse(null))
            .stream()
            .filter(candidate -> ListUtils.emptyIfNull(preTasks).stream()
                .anyMatch(preTask -> StringUtils.equals(preTask, candidate.getName())))
            .filter(candidate -> TaskType.CONDITIONS.equals(candidate.getType()))
            .forEach(conditionTask -> {
                // When this task depends on a Conditions node, its input on that node is replaced by
                // "<node name>_join_success" and/or "<node name>_join_failure".
                // See ConditionsParameterConverter.
                ConditionsParameters conditionResult = conditionTask.getConditionResult();
                log.info("condition result: {}", GsonUtils.toJsonString(conditionResult));
                if (conditionResult == null) {
                    return;
                }
                boolean runsOnSuccess = ListUtils.emptyIfNull(conditionResult.getSuccessNode()).stream()
                    .anyMatch(name -> StringUtils.equals(name, currentTaskName));
                if (runsOnSuccess) {
                    redirectConditionInput(dwNode, processMeta, conditionTask.getName(), "success");
                }
                boolean runsOnFailure = ListUtils.emptyIfNull(conditionResult.getFailedNode()).stream()
                    .anyMatch(name -> StringUtils.equals(name, currentTaskName));
                if (runsOnFailure) {
                    redirectConditionInput(dwNode, processMeta, conditionTask.getName(), "failure");
                }
            });
        return dwNode;
    }

    /**
     * Re-points the first input of {@code dwNode} whose data equals the default output of the given
     * Conditions node to that node's {@code <conditionNodeName>_join_<branch>} output. Does nothing
     * when no input matches.
     *
     * @param dwNode            node whose inputs are inspected and possibly modified
     * @param processMeta       metadata passed through to {@code getDefaultNodeOutput}
     * @param conditionNodeName name of the upstream Conditions task
     * @param branch            branch suffix, {@code "success"} or {@code "failure"}
     */
    private void redirectConditionInput(DwNode dwNode, ProcessMeta processMeta, String conditionNodeName,
        String branch) {
        String branchOutput = getDefaultNodeOutput(processMeta,
            Joiner.on("_").join(conditionNodeName, "join", branch));
        // Computed once here rather than once per inspected input.
        String conditionOutput = getDefaultNodeOutput(processMeta, conditionNodeName);
        ListUtils.emptyIfNull(dwNode.getInputs()).stream()
            .filter(in -> StringUtils.equals(conditionOutput, in.getData()))
            .findFirst()
            .ifPresent(in -> in.setData(branchOutput));
    }