private SpecNode buildSpecNodeByPaiflowSpec()

in spec/src/main/java/com/aliyun/dataworks/common/spec/domain/dw/codemodel/PaiflowYamlCode.java [227:328]


    /**
     * Builds a DataWorks {@link SpecNode} from a single paiflow pipeline node.
     *
     * <p>The created node is registered in {@code nodeMap} under its name. Its script runtime receives a
     * {@code paiflowConf} map holding the resolved argument parameters, its script parameters and outputs are
     * derived from the matching static manifest in {@code pipelinePaiflowSpecs}, and its inputs are derived
     * from the node's runtime artifact arguments. When at least one dependency is collected while resolving
     * the inputs, a {@link SpecFlowDepend} for the node is appended to {@code flow}.
     *
     * @param node                 the paiflow node to convert
     * @param parameterMap         pipeline parameters keyed by name, used to resolve {@code fromParameterName} references
     * @param nodeMap              registry of already built nodes keyed by node name; the new node is added to it
     * @param flow                 dependency list that receives this node's dependencies, if any
     * @param pipelinePaiflowSpecs static manifests of the pipeline components; may be {@code null}
     * @return the built node, or {@code null} when {@code node} is {@code null} or lacks a spec or metadata
     */
    private SpecNode buildSpecNodeByPaiflowSpec(PaiflowSpec node, Map<String, PaiflowParameter> parameterMap, Map<String, SpecNode> nodeMap,
        List<SpecFlowDepend> flow, List<PaiflowSpec> pipelinePaiflowSpecs) {

        PaiflowSpec nodeSpec = null == node ? null : node.getSpec();
        if (null == nodeSpec) {
            log.error("invalid node spec, without spec:{}", node);
            return null;
        }

        // The metadata is dereferenced for the node name and for the manifest lookup below,
        // so a node without metadata is rejected the same way as a node without a spec.
        Map<String, Object> nodeMetadata = node.getMetadata();
        if (null == nodeMetadata) {
            log.error("invalid node spec, without metadata:{}", node);
            return null;
        }

        SpecNode nodeObj = new SpecNode();
        SpecScript script = new SpecScript();
        PaiflowArguments arguments = nodeSpec.getArguments();
        SpecScriptRuntime specScriptRuntime = new SpecScriptRuntime();

        nodeObj.setMetadata(nodeMetadata);
        nodeObj.setScript(script);

        // Set the node name. DataWorks names do not support '-', so it has to be converted to an underscore.
        nodeObj.setName(getNodeName(String.valueOf(nodeMetadata.get("name"))));
        nodeObj.setId(UuidUtils.genUuidWithoutHorizontalLine());
        nodeMap.put(nodeObj.getName(), nodeObj);

        // Parameters taken from paiflow are stored in the paiflowConf of the DataWorks spec.
        Map<String, Object> paiflowConf = Maps.newHashMap();

        specScriptRuntime.setPaiflowConf(paiflowConf);
        script.setRuntime(specScriptRuntime);

        // Artifacts taken from paiflow become context input parameters of the DataWorks spec,
        // and those context inputs are used to establish the dependencies between nodes.
        SpecFlowDepend specFlowDepend = new SpecFlowDepend();
        specFlowDepend.setNodeId(nodeObj);

        // Locate the static manifest of the component this node instantiates.
        Optional<PaiflowSpec> paiflowManifestSpec = ListUtils.emptyIfNull(pipelinePaiflowSpecs).stream()
            .filter(candidate -> isSameComponent(candidate, nodeMetadata))
            .findFirst()
            .map(PaiflowSpec::getSpec);

        // Arguments whose fromParameterName resolves to a pipeline parameter carry that parameter's value.
        List<PaiflowParameter> parametersWithValue = Optional.ofNullable(arguments).map(PaiflowArguments::getParameters)
            .orElse(Collections.emptyList()).stream()
            .map(x -> Optional.ofNullable(parameterMap.get(x.getFromParameterName())).map(parameterWithValue -> {
                PaiflowParameter paiflowParameter = new PaiflowParameter();
                paiflowParameter.setName(x.getName());
                paiflowParameter.setValue(parameterWithValue.getValue());
                return paiflowParameter;
            }).orElse(null))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());

        // Manifest input parameters that did not receive an explicit value keep the manifest value.
        List<PaiflowParameter> parametersWithDefaultValue = paiflowManifestSpec
            .map(PaiflowSpec::getInputs)
            .map(PaiflowArguments::getParameters)
            .orElse(Collections.emptyList()).stream()
            .filter(x -> parametersWithValue.stream().noneMatch(y -> StringUtils.equals(x.getName(), y.getName())))
            .map(x -> {
                PaiflowParameter paiflowParameter = new PaiflowParameter();
                paiflowParameter.setName(x.getName());
                paiflowParameter.setValue(x.getValue());
                // Parameters of type "Map" always start from an empty map instead of the manifest value.
                if ("Map".equalsIgnoreCase(x.getType())) {
                    paiflowParameter.setValue(Maps.newHashMap());
                }
                return paiflowParameter;
            }).collect(Collectors.toList());

        // NOTE(review): the manifest values are only used when no argument resolved to a value at all; as soon
        // as one did, the remaining manifest parameters are dropped. The filter above hints that merging both
        // lists may have been intended - confirm before changing, since it alters the generated spec.
        paiflowConf.put(PAIFLOW_CONF_KEY_ARGUMENT_PARAMETER, parametersWithValue.isEmpty() ? parametersWithDefaultValue : parametersWithValue);

        // Input artifacts taken from the static paiflow manifest become the script parameters of the DataWorks spec.
        List<SpecVariable> variableParameters = paiflowManifestSpec
            .map(PaiflowSpec::getInputs)
            .map(PaiflowArguments::getArtifacts)
            .orElse(Collections.emptyList()).stream()
            .map(x -> buildPaiOutputSpecVariable(x.getName(), null)).collect(Collectors.toList());
        // Lookup table by artifact name. The first declaration wins so that a manifest repeating an
        // artifact name does not abort the conversion with an IllegalStateException.
        Map<String, SpecVariable> variableParameterMap = variableParameters.stream()
            .collect(Collectors.toMap(SpecVariable::getName, x -> x, (first, second) -> first));

        // Node dependencies have to be de-duplicated by name, hence the map.
        LinkedHashMap<String, SpecDepend> nodeDepends = new LinkedHashMap<>();
        // Take the arguments from the paiflow runtime parameters and replace their "from" references
        // with context input parameters of the DataWorks spec.
        List<SpecVariable> inputVariableList = Optional.ofNullable(arguments).map(PaiflowArguments::getArtifacts)
            .orElse(Collections.emptyList()).stream()
            .filter(x -> variableParameterMap.containsKey(x.getName()))
            .map(x -> buildSpecVariableByPaiflowArtifact(x, variableParameterMap.get(x.getName()), nodeDepends))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
        nodeObj.setInputs(new ArrayList<>(inputVariableList));

        // Output artifacts taken from the static paiflow manifest become the context outputs of the DataWorks spec.
        List<SpecVariable> outputContextList = paiflowManifestSpec
            .map(PaiflowSpec::getOutputs)
            .map(PaiflowArguments::getArtifacts)
            .orElse(Collections.emptyList()).stream()
            .map(x -> buildPaiOutputSpecVariable(x.getName(), null)).collect(Collectors.toList());
        nodeObj.setOutputs(new ArrayList<>(outputContextList));

        script.setParameters(variableParameters);
        // Only nodes that actually depend on something are recorded in the flow.
        if (MapUtils.isNotEmpty(nodeDepends)) {
            specFlowDepend.setDepends(new ArrayList<>(nodeDepends.values()));
            flow.add(specFlowDepend);
        }
        return nodeObj;
    }

    /**
     * Tells whether a pipeline manifest has the same "identifier", "provider" and "version" metadata
     * entries as the given node metadata. A {@code null} manifest or a manifest without metadata never matches.
     *
     * @param candidate    the manifest to test; may be {@code null}
     * @param nodeMetadata metadata of the node being converted; must not be {@code null}
     * @return {@code true} when all three metadata entries are equal
     */
    private boolean isSameComponent(PaiflowSpec candidate, Map<String, Object> nodeMetadata) {
        if (null == candidate) {
            return false;
        }
        Map<String, Object> candidateMetadata = candidate.getMetadata();
        if (null == candidateMetadata) {
            return false;
        }
        return Objects.equals(candidateMetadata.get("identifier"), nodeMetadata.get("identifier"))
            && Objects.equals(candidateMetadata.get("provider"), nodeMetadata.get("provider"))
            && Objects.equals(candidateMetadata.get("version"), nodeMetadata.get("version"));
    }