void addScheduleGroupCheckers()

in common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java [386:434]


  /**
   * Registers the checkers that validate schedule group assignments.
   *
   * Two checkers are added:
   * <ul>
   *   <li>a {@link GlobalDAGChecker} that fails when a DFS starting at a vertex with a schedule group
   *   visits a vertex whose schedule group is strictly smaller than the starting one;</li>
   *   <li>a {@link SingleEdgeChecker} that fails when a non-control edge with the PULL data flow
   *   connects two vertices that carry the same schedule group.</li>
   * </ul>
   * Vertices without a {@link ScheduleGroupProperty} are ignored by both checkers.
   */
  void addScheduleGroupCheckers() {
    final GlobalDAGChecker scheduleGroupTopoOrdering = (irdag -> {
      for (final IRVertex v : irdag.getVertices()) {
        // Written from inside the traversal lambda; stays null when no violation is found.
        // Each violation overwrites the previous one, so the last violator visited is reported.
        final MutableObject violatingReachableVertex = new MutableObject();
        v.getPropertyValue(ScheduleGroupProperty.class).ifPresent(startingScheduleGroup ->
          irdag.dfsDo(
            v,
            visited -> {
              // Look the property up once instead of once per condition.
              final Optional<Integer> visitedScheduleGroup = visited.getPropertyValue(ScheduleGroupProperty.class);
              if (visitedScheduleGroup.isPresent() && visitedScheduleGroup.get() < startingScheduleGroup) {
                violatingReachableVertex.setValue(visited);
              }
            },
            DAGInterface.TraversalOrder.PreOrder,
            new HashSet<>())); // a fresh, empty set for every starting vertex
        if (violatingReachableVertex.getValue() != null) {
          return failure(
            "A reachable vertex with a smaller schedule group ",
            v,
            ScheduleGroupProperty.class,
            violatingReachableVertex.getValue(),
            ScheduleGroupProperty.class);
        }
      }
      return success();
    });
    globalDAGCheckerList.add(scheduleGroupTopoOrdering);

    final SingleEdgeChecker splitByPull = (edge -> {
      // Control edges are exempt from this check.
      if (Util.isControlEdge(edge)) {
        return success();
      }

      if (Optional.of(DataFlowProperty.Value.PULL).equals(edge.getPropertyValue(DataFlowProperty.class))) {
        final Optional<Integer> srcSG = edge.getSrc().getPropertyValue(ScheduleGroupProperty.class);
        final Optional<Integer> dstSG = edge.getDst().getPropertyValue(ScheduleGroupProperty.class);
        // Only comparable when both endpoints have a schedule group assigned.
        if (srcSG.isPresent() && dstSG.isPresent() && srcSG.get().equals(dstSG.get())) {
          return failure("Schedule group must split by Pull",
            edge.getSrc(), ScheduleGroupProperty.class, edge.getDst(), ScheduleGroupProperty.class);
        }
      }
      return success();
    });
    singleEdgeCheckerList.add(splitByPull);
  }