in client/migrationx/migrationx-domain/migrationx-domain-azkaban/src/main/java/com/aliyun/dataworks/migrationx/domain/dataworks/azkaban/service/AzkabanPackageParser.java [203:311]
private Job parseJobFile(FlowLoader flowLoader, File flowFile, Flow flow, Node node) {
Props props = getNodeProps(flowLoader, flowFile, flow, node.getId());
Map<String, Props> nodeProps = jobProperties.computeIfAbsent(flow.getId(), flowName -> new HashMap<>());
nodeProps.put(node.getId(), props);
Properties properties = props.toAllProperties();
String type = properties.getProperty("type");
JobType jobType = JobType.getByName(type);
Job job = JobType.newJobInstance(jobType);
job.setName(node.getId());
if (StringUtils.isNotBlank(props.getSource())) {
job.setJobFile(new File(projectDir, props.getSource()));
}
List<Field> fields = ReflectUtils.getPropertyFields(job);
fields.removeIf(
field -> field.getDeclaringClass().equals(Job.class) && "type".equalsIgnoreCase(field.getName()));
Enumeration<?> e = properties.propertyNames();
Set<String> keys = new HashSet<>();
while (e.hasMoreElements()) {
String key = (String)e.nextElement();
keys.add(key);
}
fields.forEach(field -> {
ConfigProperty anno = field.getDeclaredAnnotation(ConfigProperty.class);
if (anno == null) {
return;
}
try {
field.setAccessible(true);
if (StringUtils.isNotBlank(anno.pattern())) {
Pattern pattern = Pattern.compile(anno.pattern());
List<String> values = keys.stream()
.filter(key -> pattern.matcher(key).matches())
.map(properties::getProperty)
.collect(Collectors.toList());
field.set(job, values);
}
if (StringUtils.isNotBlank(anno.value())) {
String value = properties.getProperty(anno.value(), null);
Object objectValue = convertType(field, anno, value);
field.set(job, objectValue);
}
} catch (IllegalAccessException illegalAccessException) {
throw new RuntimeException(illegalAccessException);
}
});
job.processJobRelativeFiles();
if (CollectionUtils.isEmpty(job.getDependencies())) {
job.setDependencies(CollectionUtils.emptyIfNull(flow.getEdges()).stream()
.filter(edge -> StringUtils.equals(edge.getTargetId(), node.getId()))
.map(Edge::getSourceId).collect(Collectors.toList()));
LOGGER.info("flow: {}, job: {}, depends: {}", flow.getId(), job.getName(), job.getDependencies());
}
// 如果自己是一个flow节点
if (JobType.flow.equals(job.getType())) {
List<String> depends = ListUtils.emptyIfNull(job.getDependencies());
// flow虚节点设置为依赖flow内部所有节点的end节点
List<String> flowDepends = Optional.ofNullable(Optional.ofNullable(
MapUtils.emptyIfNull(flowLoader.getFlowMap())
.get(Joiner.on(Constants.PATH_DELIMITER).join(flow.getId(), job.getName())))
.orElse(flowLoader.getFlowMap().values().stream()
.filter(f -> StringUtils.equals(f.getId(), node.getEmbeddedFlowId()))
.findFirst().orElse(null)))
.map(f -> ListUtils.emptyIfNull(f.getEndNodes()).stream()
.map(n -> Joiner.on(FLOW_NODE_DELIMITER).join(FileNameUtils.normalizedFileName(f.getId()), n.getId()))
.collect(Collectors.toList()))
.orElse(new ArrayList<>());
// flow内部的所有start节点依赖flow虚节点的上游
// Optional.ofNullable(MapUtils.emptyIfNull(flowLoader.getFlowMap()).get(node.getEmbeddedFlowId()))
// .ifPresent(embeddedFlow -> {
// ListUtils.emptyIfNull(depends).forEach(dep -> {
// embeddedFlow.getStartNodes().stream().forEach(n -> {
// Edge edge = new Edge(Joiner.on(FLOW_NODE_DELIMITER).join(flow.getId(), dep), n.getId());
// embeddedFlow.addEdge(edge);
// //LOGGER.info("add edge: {} for flow: {}, inner node: {}",
// // Joiner.on(">>").join(edge.getSourceId(), edge.getTargetId()),
// // embeddedFlow.getId(), n.getId());
// });
// });
//});
Optional.ofNullable(MapUtils.emptyIfNull(flowLoader.getFlowMap()).get(node.getEmbeddedFlowId()))
.ifPresent(f -> f.setEmbeddedFlow(true));
job.setDependencies(new ArrayList<>(CollectionUtils.union(depends, flowDepends)));
LOGGER.info("flow: {}, job: {}, depends: {}", flow.getId(), job.getName(), job.getDependencies());
} else {
// flow内部的start节点, 依赖flow节点的上游
// if (CollectionUtils.isEmpty(job.getDependencies()) && BooleanUtils.isTrue(flow.isEmbeddedFlow())) {
// // 找到flow节点
// flowLoader.getFlowMap().values().stream()
// .filter(f -> f.getNodes().stream().anyMatch(n -> flow.getId().equals(n.getEmbeddedFlowId())))
// .forEach(f ->
// f.getNodes().stream().filter(n -> flow.getId().equals(n.getEmbeddedFlowId()))
// .findFirst()
// .ifPresent(flowNode -> job.setDependencies(
// Collections.singletonList(Joiner.on(FLOW_NODE_DELIMITER).join(f.getId(), flowNode
// .getId())))));
//}
}
return job;
}