List topologicalSort()

in samza-core/src/main/java/org/apache/samza/execution/JobGraph.java [378:459]


  /**
   * Returns the job nodes of this graph in a topological order.
   *
   * <p>This is an in-degree based implementation of Kahn's algorithm; it does not modify the graph
   * itself during computation. The algorithm works as follows:
   * <ol>
   *   <li>Start with the nodes that have no incoming intermediate stream (in-degree 0) and append
   *       them to the result list.</li>
   *   <li>"Remove" the edges from each appended node to its connected nodes by decrementing the
   *       in-degree of those connected nodes.</li>
   *   <li>Enqueue any node whose in-degree has dropped to 0.</li>
   *   <li>Repeat steps 1-3 until no node with in-degree 0 is left.</li>
   * </ol>
   *
   * <p>If some nodes are still unsorted at that point, the traversal is restarted from one more
   * node: preferably the not-yet-sorted node with the smallest remaining in-degree among those
   * already reached from sorted nodes, otherwise any unvisited node that consumes from an input
   * stream. This repeats until every node has been emitted.
   *
   * @return a new list containing every node of the graph exactly once, in sorted order
   * @throws java.util.NoSuchElementException if unsorted nodes remain but none of them is reachable
   *         from already sorted nodes or from an input stream
   */
  List<JobNode> topologicalSort() {
    Collection<JobNode> pnodes = nodes.values();
    if (pnodes.size() == 1) {
      // A single node is trivially sorted.
      return new ArrayList<>(pnodes);
    }

    Queue<JobNode> q = new ArrayDeque<>();
    Map<String, Long> indegree = new HashMap<>();
    Set<JobNode> visited = new HashSet<>();
    pnodes.forEach(node -> {
      String nid = node.getJobNameAndId();
      // Only count the degrees of intermediate streams.
      long degree = node.getInEdges().values().stream().filter(e -> !inputStreams.contains(e)).count();
      indegree.put(nid, degree);

      if (degree == 0L) {
        // Start from the nodes that have no intermediate input streams, i.e. nodes that only
        // consume from input streams.
        q.add(node);
        visited.add(node);
      }
    });

    List<JobNode> sortedNodes = new ArrayList<>(pnodes.size());
    Set<JobNode> reachable = new HashSet<>();
    while (sortedNodes.size() < pnodes.size()) {
      // Drain the queue: emit each node and release its outgoing edges.
      while (!q.isEmpty()) {
        JobNode node = q.poll();
        sortedNodes.add(node);
        // NOTE(review): out edges are not filtered against inputStreams here, unlike the in-degree
        // count above. Presumably an out edge is never also an input stream; confirm.
        node.getOutEdges().values().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
          String nid = n.getJobNameAndId();
          long degree = indegree.get(nid) - 1;
          indegree.put(nid, degree);
          // Set.add returns false for an already visited node, so a node is enqueued at most once.
          if (degree == 0L && visited.add(n)) {
            q.add(n);
          }
          reachable.add(n);
        });
      }

      if (sortedNodes.size() < pnodes.size()) {
        // The remaining nodes have cycles; pick one more node to restart the traversal from.
        // Prefer the nodes that were reachable from the previous traversal. The queue is empty
        // here, so 'visited' holds exactly the nodes in 'sortedNodes'; removing by the hash set
        // avoids the linear List.contains scans that removeAll(sortedNodes) could fall back to.
        reachable.removeAll(visited);
        JobNode nextNode;
        if (!reachable.isEmpty()) {
          // Find the reachable node with the minimal remaining in-degree.
          long min = Long.MAX_VALUE;
          JobNode minNode = null;
          for (JobNode node : reachable) {
            long degree = indegree.get(node.getJobNameAndId());
            if (degree < min) {
              min = degree;
              minNode = node;
            }
          }
          nextNode = minNode;
        } else {
          // All the remaining nodes should be reachable from input streams, so start from the
          // input streams again to find the next node that hasn't been visited.
          nextNode = inputStreams.stream()
              .flatMap(input -> input.getTargetNodes().stream())
              .filter(node -> !visited.contains(node))
              .findAny()
              .orElseThrow(() -> new java.util.NoSuchElementException(
                  "Cannot complete topological sort: sorted " + sortedNodes.size() + " of " + pnodes.size()
                      + " job nodes, and no remaining node is reachable from the input streams"));
        }
        q.add(nextNode);
        visited.add(nextNode);
      }
    }

    return sortedNodes;
  }