in gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java [194:277]
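/**
 * Finds the outgoing {@link FlowEdge}s from {@code dataNode} that can accept the current dataset descriptor and
 * returns them as {@link FlowEdgeContext}s. Edges whose output format already matches that of the destination
 * dataset descriptor are placed at the front of the returned list, so data transformations happen as close to the
 * source as possible.
 */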
List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
DatasetDescriptor destDatasetDescriptor, int numberOfHops) {
List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
List<String> edgeIds = ConfigUtils.getStringList(this.flowConfig, ConfigurationKeys.WHITELISTED_EDGE_IDS);
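// If an edge whitelist is configured for this flow, restrict the search to the whitelisted edge ids.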
for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
if (!edgeIds.isEmpty() && !edgeIds.contains(flowEdge.getId())) {
continue;
}
try {
DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
//Base condition: Skip this FlowEdge if it is inactive or if its destination node is inactive.
if (!edgeDestination.isActive() || !flowEdge.isActive()) {
continue;
}
boolean foundExecutor = false;
//Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
Config mergedConfig = getMergedConfig(flowEdge);
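// Config used to resolve this edge's flow template and its dataset descriptors.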
List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
flowEdge.getFlowTemplate().getDatasetDescriptors(mergedConfig, false);
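// Each pair is an (input, output) dataset descriptor combination declared by the edge's flow template.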
for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
HashMap<String, ArrayList<String>> errors = flowEdge.getFlowTemplate().tryResolving(mergedConfig, inputDatasetDescriptor, outputDatasetDescriptor);
HashMap<String, HashMap<String, ArrayList<String>>> edgeErrors = new HashMap<>();
HashMap<String, HashMap<String, ArrayList<String>>> templateErrors = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
edgeErrors.put(flowEdge.getId(), errors);
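// If the flow template cannot be resolved against the merged config for this descriptor pair, attach the
// resolution errors to the FlowSpec and move on to the next pair.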
if (!errors.isEmpty()) {
try {
flowSpec.addCompilationError(flowEdge.getSrc(), flowEdge.getDest(), mapper.writeValueAsString(edgeErrors));
}
catch (JsonProcessingException e) {
log.error("Failed to serialize compilation errors for edge {}", flowEdge.getId(), e);
}
continue;
}
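// Check whether this edge's input descriptor accepts the dataset descriptor we currently have.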
ArrayList<String> datasetDescriptorErrors = inputDatasetDescriptor.contains(currentDatasetDescriptor);
if (datasetDescriptorErrors.isEmpty()) {
DatasetDescriptor edgeOutputDescriptor = makeOutputDescriptorSpecific(currentDatasetDescriptor, outputDatasetDescriptor);
FlowEdgeContext flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, edgeOutputDescriptor, mergedConfig,
specExecutor);
if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig()).isEmpty()) {
/*
Add to the front of the edge list if the platform-independent properties of the output descriptor are compatible
with those of the destination dataset descriptor.
In other words, we prioritize edges that perform data transformations as close to the source as possible.
*/
prioritizedEdgeList.add(0, flowEdgeContext);
} else {
prioritizedEdgeList.add(flowEdgeContext);
}
foundExecutor = true;
} else {
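// Record why this edge's descriptors do not match, so the rejection reason surfaces as a compilation error on the FlowSpec.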
HashMap<String, ArrayList<String>> templateError = new HashMap<>();
templateError.put("flowTemplateErrors", datasetDescriptorErrors);
templateErrors.put(flowEdge.getId(), templateError);
try {
flowSpec.addCompilationError(flowEdge.getSrc(), flowEdge.getDest(), mapper.writeValueAsString(templateErrors), numberOfHops);
}
catch (JsonProcessingException e) {
log.error("Failed to serialize compilation errors for edge {}", flowEdge.getId(), e);
}
}
}
// Found a SpecExecutor. Proceed to the next FlowEdge.
// TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
if (foundExecutor) {
break;
}
}
} catch (IOException | ReflectiveOperationException | SpecNotFoundException | JobTemplate.TemplateException e) {
//Skip this edge and continue with the remaining edges.
log.warn("Skipping edge {} with config {} due to exception", flowEdge.getId(), flowConfig, e);
}
}
return prioritizedEdgeList;
}