void addPartitioningCheckers()

in common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java [249:281]


  void addPartitioningCheckers() {
    final NeighborChecker partitionerAndPartitionSet = ((v, inEdges, outEdges) -> {
      for (final IREdge inEdge : inEdges) {
        final Optional<Pair<PartitionerProperty.Type, Integer>> partitioner =
          inEdge.getPropertyValue(PartitionerProperty.class);
        final Optional<ArrayList<KeyRange>> partitionSet = inEdge.getPropertyValue(PartitionSetProperty.class);
        // Shuffle edge
        if (partitioner.isPresent() && partitionSet.isPresent()) {
          final Set<Integer> flattenedPartitionOffsets = partitionSet.get()
            .stream()
            .flatMap(keyRange -> IntStream.range(
              (int) keyRange.rangeBeginInclusive(), (int) keyRange.rangeEndExclusive()).boxed())
            .collect(Collectors.toSet());
          if (partitioner.get().right() == PartitionerProperty.NUM_EQUAL_TO_DST_PARALLELISM) {
            final Optional<Integer> parallelism = v.getPropertyValue(ParallelismProperty.class);
            if (parallelism.isPresent()
              && !getZeroToNSet(parallelism.get()).equals(flattenedPartitionOffsets)) {
              return failure("PartitionSet must contain all partition offsets required for dst parallelism",
                v, ParallelismProperty.class, inEdge, PartitionSetProperty.class);
            }
          } else {
            if (!getZeroToNSet(partitioner.get().right()).equals(flattenedPartitionOffsets)) {
              return failure("PartitionSet must contain all partition offsets required for the partitioner",
                inEdge, PartitionerProperty.class, PartitionSetProperty.class);
            }
          }
        }
      }

      return success();
    });
    neighborCheckerList.add(partitionerAndPartitionSet);
  }