in common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java [283:312]
void addShuffleEdgeCheckers() {
final NeighborChecker shuffleChecker = ((v, inEdges, outEdges) -> {
for (final IREdge inEdge : inEdges) {
if (CommunicationPatternProperty.Value.SHUFFLE
.equals(inEdge.getPropertyValue(CommunicationPatternProperty.class).get())) {
// Shuffle edges must have the following properties
if (!inEdge.getPropertyValue(KeyExtractorProperty.class).isPresent()
|| !inEdge.getPropertyValue(KeyEncoderProperty.class).isPresent()
|| !inEdge.getPropertyValue(KeyDecoderProperty.class).isPresent()) {
return failure("Shuffle edge does not have a Key-related property: " + inEdge.getId());
}
} else {
// Non-shuffle edges must not have the following properties
final Optional<Pair<PartitionerProperty.Type, Integer>> partitioner =
inEdge.getPropertyValue(PartitionerProperty.class);
if (partitioner.isPresent() && partitioner.get().left().equals(PartitionerProperty.Type.HASH)) {
return failure("Only shuffle can have the hash partitioner",
inEdge, CommunicationPatternProperty.class, PartitionerProperty.class);
}
if (inEdge.getPropertyValue(PartitionSetProperty.class).isPresent()) {
return failure("Only shuffle can select partition sets",
inEdge, CommunicationPatternProperty.class, PartitionSetProperty.class);
}
}
}
return success();
});
neighborCheckerList.add(shuffleChecker);
}