in common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java [249:281]
void addPartitioningCheckers() {
  // Registers a neighbor checker that cross-validates, for every incoming edge of a vertex,
  // the edge's PartitionerProperty against its PartitionSetProperty.
  final NeighborChecker partitionSetMatchesPartitioner = ((v, inEdges, outEdges) -> {
    for (final IREdge edge : inEdges) {
      final Optional<Pair<PartitionerProperty.Type, Integer>> partitionerOpt =
        edge.getPropertyValue(PartitionerProperty.class);
      final Optional<ArrayList<KeyRange>> keyRangesOpt = edge.getPropertyValue(PartitionSetProperty.class);

      // Only edges that carry both properties (shuffle edges) are subject to this check.
      if (!partitionerOpt.isPresent() || !keyRangesOpt.isPresent()) {
        continue;
      }

      // Expand every [beginInclusive, endExclusive) key range into the individual offsets it covers.
      final Set<Integer> coveredOffsets = keyRangesOpt.get()
        .stream()
        .flatMapToInt(range -> IntStream.range(
          (int) range.rangeBeginInclusive(), (int) range.rangeEndExclusive()))
        .boxed()
        .collect(Collectors.toSet());

      final Integer numOfPartitions = partitionerOpt.get().right();
      if (numOfPartitions != PartitionerProperty.NUM_EQUAL_TO_DST_PARALLELISM) {
        // The partitioner states its own partition count: the covered offsets must be exactly
        // the set that getZeroToNSet yields for that count.
        if (!getZeroToNSet(numOfPartitions).equals(coveredOffsets)) {
          return failure("PartitionSet must contain all partition offsets required for the partitioner",
            edge, PartitionerProperty.class, PartitionSetProperty.class);
        }
        continue;
      }

      // The partition count is tied to the parallelism of this (destination) vertex.
      // When the parallelism is not set, there is nothing to compare against and the edge passes.
      final Optional<Integer> dstParallelism = v.getPropertyValue(ParallelismProperty.class);
      if (dstParallelism.isPresent()
        && !getZeroToNSet(dstParallelism.get()).equals(coveredOffsets)) {
        return failure("PartitionSet must contain all partition offsets required for dst parallelism",
          v, ParallelismProperty.class, edge, PartitionSetProperty.class);
      }
    }
    return success();
  });
  neighborCheckerList.add(partitionSetMatchesPartitioner);
}