in samza-core/src/main/java/org/apache/samza/execution/JobGraph.java [378:459]
/**
 * Returns the job nodes of this graph in topological order, breaking cycles where needed.
 *
 * <p>This is an in-degree based variant of Kahn's algorithm that does not modify the graph itself:
 * <ol>
 *   <li>start with the nodes whose in-degree is 0 and append them to the result;</li>
 *   <li>"remove" the out edges of every appended node by decrementing the in-degree of its target nodes;</li>
 *   <li>enqueue every target node whose in-degree has dropped to 0;</li>
 *   <li>repeat steps 1-3 until the queue is empty.</li>
 * </ol>
 * Only intermediate streams count towards the in-degree: in edges contained in {@code inputStreams}
 * are ignored.
 *
 * <p>If the queue drains while nodes are still unsorted, the traversal is restarted from, in order of
 * preference: the unsorted node with the smallest remaining in-degree among those reachable from the
 * sorted nodes; an unvisited target node of an input stream; or, if neither exists, the unvisited node
 * with the smallest remaining in-degree.
 *
 * @return the sorted job nodes
 */
List<JobNode> topologicalSort() {
  Collection<JobNode> pnodes = nodes.values();
  if (pnodes.size() == 1) {
    return new ArrayList<>(pnodes);
  }

  Queue<JobNode> q = new ArrayDeque<>();
  Map<String, Long> indegree = new HashMap<>();
  Set<JobNode> visited = new HashSet<>();
  for (JobNode node : pnodes) {
    // only count the degrees of intermediate streams
    long degree = node.getInEdges().values().stream().filter(e -> !inputStreams.contains(e)).count();
    indegree.put(node.getJobNameAndId(), degree);

    if (degree == 0L) {
      // start from the nodes that have no intermediate input streams, i.e. only consume from input streams
      q.add(node);
      visited.add(node);
    }
  }

  List<JobNode> sortedNodes = new ArrayList<>();
  Set<JobNode> reachable = new HashSet<>();
  while (sortedNodes.size() < pnodes.size()) {
    while (!q.isEmpty()) {
      JobNode current = q.poll();
      sortedNodes.add(current);
      current.getOutEdges().values().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
        String nid = n.getJobNameAndId();
        long degree = indegree.get(nid) - 1;
        indegree.put(nid, degree);
        // a node that was force-started to break a cycle is already visited and must not be queued again
        if (degree == 0L && !visited.contains(n)) {
          q.add(n);
          visited.add(n);
        }
        reachable.add(n);
      });
    }

    if (sortedNodes.size() < pnodes.size()) {
      // The remaining nodes have cycles; pick a node to restart from in order to break them.
      // Every visited node went through the queue, and the queue is empty here, so "visited" holds
      // exactly the sorted nodes. Removing by set avoids a linear List.contains scan per element.
      reachable.removeAll(visited);

      JobNode nextNode;
      if (!reachable.isEmpty()) {
        // prefer the node reachable from the previous traversal with the fewest unresolved in edges
        nextNode = minIndegreeNode(reachable, indegree);
      } else {
        // otherwise start from the input streams again to find a node that hasn't been visited
        nextNode = inputStreams.stream()
            .flatMap(input -> input.getTargetNodes().stream())
            .filter(candidate -> !visited.contains(candidate))
            .findAny()
            .orElse(null);
        if (nextNode == null) {
          // The remaining nodes are neither reachable from a sorted node nor targets of an input
          // stream (e.g. an isolated cycle). Fall back to any unvisited node, preferring the one with
          // the fewest unresolved in edges, so the sort always completes instead of failing.
          List<JobNode> unvisited = new ArrayList<>(pnodes);
          unvisited.removeAll(visited);
          nextNode = minIndegreeNode(unvisited, indegree);
        }
      }
      q.add(nextNode);
      visited.add(nextNode);
    }
  }

  return sortedNodes;
}

/**
 * Picks the candidate with the smallest recorded in-degree; the first one encountered wins ties.
 *
 * @param candidates nodes to choose from
 * @param indegree remaining in-degree of each node, keyed by {@code getJobNameAndId()}
 * @return the chosen node, or {@code null} if {@code candidates} is empty
 */
private static JobNode minIndegreeNode(Collection<JobNode> candidates, Map<String, Long> indegree) {
  long min = Long.MAX_VALUE;
  JobNode minNode = null;
  for (JobNode candidate : candidates) {
    long degree = indegree.get(candidate.getJobNameAndId());
    if (degree < min) {
      min = degree;
      minNode = candidate;
    }
  }
  return minNode;
}