in spec/src/main/java/com/aliyun/dataworks/common/spec/domain/dw/codemodel/PaiflowYamlCode.java [227:328]
/**
 * Converts a single PAI Flow pipeline node into a DataWorks {@link SpecNode}.
 *
 * <p>Side effects: the created node is put into {@code nodeMap} under its converted name, and, when
 * {@code buildSpecVariableByPaiflowArtifact} records any upstream dependencies for the node's input artifacts,
 * a {@link SpecFlowDepend} for the node is appended to {@code flow}.
 *
 * @param node                 the pipeline node; must carry both a {@code spec} and {@code metadata}
 * @param parameterMap         parameters looked up by each argument's {@code fromParameterName}
 * @param nodeMap              receives the created node, keyed by its name
 * @param flow                 receives the node's dependency entry when it has upstream dependencies
 * @param pipelinePaiflowSpecs static manifests; the entry whose identifier/provider/version metadata matches
 *                             this node supplies default parameters and input/output artifact declarations
 * @return the created node, or {@code null} if {@code node} has no spec or no metadata
 */
private SpecNode buildSpecNodeByPaiflowSpec(PaiflowSpec node, Map<String, PaiflowParameter> parameterMap, Map<String, SpecNode> nodeMap,
    List<SpecFlowDepend> flow, List<PaiflowSpec> pipelinePaiflowSpecs) {
    PaiflowSpec nodeSpec = node.getSpec();
    if (null == nodeSpec) {
        log.error("invalid node spec, without spec:{}", node);
        return null;
    }
    Map<String, Object> nodeMetadata = node.getMetadata();
    if (null == nodeMetadata) {
        // Without metadata there is neither a node name nor a manifest identity to match on; this used to
        // surface as a NullPointerException further down, so treat it like a missing spec instead.
        log.error("invalid node spec, without metadata:{}", node);
        return null;
    }
    SpecNode nodeObj = new SpecNode();
    SpecScript script = new SpecScript();
    PaiflowArguments arguments = nodeSpec.getArguments();
    SpecScriptRuntime specScriptRuntime = new SpecScriptRuntime();
    nodeObj.setMetadata(nodeMetadata);
    nodeObj.setScript(script);
    // DataWorks node names do not support '-', so the name is converted (to '_') via getNodeName.
    // NOTE(review): a missing "name" entry becomes the literal string "null" through String.valueOf.
    nodeObj.setName(getNodeName(String.valueOf(nodeMetadata.get("name"))));
    nodeObj.setId(UuidUtils.genUuidWithoutHorizontalLine());
    nodeMap.put(nodeObj.getName(), nodeObj);

    // Parameters taken from the paiflow node are stored in the DataWorks runtime's paiflowConf
    Map<String, Object> paiflowConf = Maps.newHashMap();
    specScriptRuntime.setPaiflowConf(paiflowConf);
    script.setRuntime(specScriptRuntime);

    // Dependency entry for this node; artifacts resolved below become context inputs and may register
    // upstream dependencies. It is only added to flow if any dependency is found.
    SpecFlowDepend specFlowDepend = new SpecFlowDepend();
    specFlowDepend.setNodeId(nodeObj);

    // Static manifest of the component this node instantiates, matched on identifier/provider/version.
    // Entries that are null or lack metadata cannot match and are skipped rather than throwing.
    Optional<PaiflowSpec> paiflowManifestSpec = ListUtils.emptyIfNull(pipelinePaiflowSpecs).stream()
        .filter(x -> x != null && x.getMetadata() != null)
        .filter(x -> Objects.equals(x.getMetadata().get("identifier"), nodeMetadata.get("identifier"))
            && Objects.equals(x.getMetadata().get("provider"), nodeMetadata.get("provider"))
            && Objects.equals(x.getMetadata().get("version"), nodeMetadata.get("version")))
        .findFirst()
        .map(PaiflowSpec::getSpec);

    // Arguments bound to a parameter in parameterMap take that parameter's value under the argument's name.
    // NOTE(review): arguments whose fromParameterName does not resolve in parameterMap are dropped here
    // (presumably including arguments that carry a literal value) — confirm this is intended.
    List<PaiflowParameter> parametersWithValue = Optional.ofNullable(arguments).map(PaiflowArguments::getParameters)
        .orElse(Collections.emptyList()).stream()
        .map(x -> Optional.ofNullable(parameterMap.get(x.getFromParameterName())).map(parameterWithValue -> {
            PaiflowParameter paiflowParameter = new PaiflowParameter();
            paiflowParameter.setName(x.getName());
            paiflowParameter.setValue(parameterWithValue.getValue());
            return paiflowParameter;
        }).orElse(null))
        .filter(Objects::nonNull)
        .collect(Collectors.toList());

    // Manifest input parameters not already bound above, with their declared value; "Map"-typed ones get an
    // empty map instead.
    List<PaiflowParameter> parametersWithDefaultValue = paiflowManifestSpec
        .map(PaiflowSpec::getInputs)
        .map(PaiflowArguments::getParameters)
        .orElse(Collections.emptyList()).stream()
        .filter(x -> parametersWithValue.stream().noneMatch(y -> StringUtils.equals(x.getName(), y.getName())))
        .map(x -> {
            PaiflowParameter paiflowParameter = new PaiflowParameter();
            paiflowParameter.setName(x.getName());
            paiflowParameter.setValue(x.getValue());
            if ("Map".equalsIgnoreCase(x.getType())) {
                paiflowParameter.setValue(Maps.newHashMap());
            }
            return paiflowParameter;
        }).collect(Collectors.toList());

    // NOTE(review): the defaults above are filtered against parametersWithValue, yet they are only used when
    // parametersWithValue is empty (where that filter removes nothing). So whenever at least one argument is
    // bound, manifest defaults for the remaining parameters are dropped. Confirm whether the two lists should
    // be merged instead; the existing behavior is kept here.
    paiflowConf.put(PAIFLOW_CONF_KEY_ARGUMENT_PARAMETER,
        parametersWithValue.isEmpty() ? parametersWithDefaultValue : parametersWithValue);

    // Input artifacts declared by the static manifest become the DataWorks script parameters
    List<SpecVariable> variableParameters = paiflowManifestSpec
        .map(PaiflowSpec::getInputs)
        .map(PaiflowArguments::getArtifacts)
        .orElse(Collections.emptyList()).stream()
        .map(x -> buildPaiOutputSpecVariable(x.getName(), null)).collect(Collectors.toList());
    // Keep the first declaration if an artifact name repeats; without a merge function Collectors.toMap
    // throws IllegalStateException and aborts the whole conversion.
    Map<String, SpecVariable> variableParameterMap = variableParameters.stream()
        .collect(Collectors.toMap(SpecVariable::getName, x -> x, (first, duplicate) -> first));

    // Node dependencies must be de-duplicated by name, hence a map (insertion-ordered)
    LinkedHashMap<String, SpecDepend> nodeDepends = new LinkedHashMap<>();
    // Runtime argument artifacts that the manifest declares are turned into DataWorks context inputs; the
    // helper may record upstream dependencies into nodeDepends.
    List<SpecVariable> inputVariableList = Optional.ofNullable(arguments).map(PaiflowArguments::getArtifacts)
        .orElse(Collections.emptyList()).stream()
        .filter(x -> variableParameterMap.containsKey(x.getName()))
        .map(x -> buildSpecVariableByPaiflowArtifact(x, variableParameterMap.get(x.getName()), nodeDepends))
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
    nodeObj.setInputs(new ArrayList<>(inputVariableList));

    // Output artifacts declared by the static manifest become the DataWorks context outputs
    List<SpecVariable> outputContextList = paiflowManifestSpec
        .map(PaiflowSpec::getOutputs)
        .map(PaiflowArguments::getArtifacts)
        .orElse(Collections.emptyList()).stream()
        .map(x -> buildPaiOutputSpecVariable(x.getName(), null)).collect(Collectors.toList());
    nodeObj.setOutputs(new ArrayList<>(outputContextList));
    script.setParameters(variableParameters);

    if (MapUtils.isNotEmpty(nodeDepends)) {
        specFlowDepend.setDepends(new ArrayList<>(nodeDepends.values()));
        flow.add(specFlowDepend);
    }
    return nodeObj;
}