in common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java [171:247]
void addParallelismCheckers() {
final SingleVertexChecker parallelismWithOtherEPsInSingleVertex = (v -> {
final Optional<Integer> parallelism = v.getPropertyValue(ParallelismProperty.class);
if (!parallelism.isPresent()) {
return success(); // No need to check, if the parallelism is not set yet
}
final Optional<Integer> resourceSiteSize = v.getPropertyValue(ResourceSiteProperty.class)
.map(rs -> rs.values().stream().mapToInt(Integer::intValue).sum());
if (resourceSiteSize.isPresent() && !parallelism.equals(resourceSiteSize)) {
return failure("Parallelism must equal to sum of site nums",
v, ParallelismProperty.class, ResourceSiteProperty.class);
}
final Optional<HashSet<Integer>> antiAffinitySet = v.getPropertyValue(ResourceAntiAffinityProperty.class);
if (antiAffinitySet.isPresent()
&& !getZeroToNSet(parallelism.get()).containsAll(antiAffinitySet.get())) {
return failure("Offsets must be within parallelism",
v, ParallelismProperty.class, ResourceAntiAffinityProperty.class);
}
return success();
});
singleVertexCheckerList.add(parallelismWithOtherEPsInSingleVertex);
final SingleVertexChecker parallelismOfSourceVertex = (v -> {
final Optional<Integer> parallelism = v.getPropertyValue(ParallelismProperty.class);
try {
if (parallelism.isPresent() && v instanceof SourceVertex) {
final int numOfReadables = ((SourceVertex) v).getReadables(parallelism.get()).size();
if (parallelism.get() != numOfReadables) {
return failure(String.format("(Parallelism %d) != (Number of SourceVertex %s Readables %d)",
parallelism.get(), v.getId(), numOfReadables));
}
}
} catch (Exception e) {
return failure(e.getMessage());
}
return success();
});
singleVertexCheckerList.add(parallelismOfSourceVertex);
final NeighborChecker parallelismWithCommPattern = ((v, inEdges, outEdges) -> {
// Just look at incoming (edges, as this checker will be applied on every vertex
for (final IREdge inEdge : inEdges) {
if (CommunicationPatternProperty.Value.ONE_TO_ONE
.equals(inEdge.getPropertyValue(CommunicationPatternProperty.class).get())) {
if (v.getPropertyValue(ParallelismProperty.class).isPresent()
&& inEdge.getSrc().getPropertyValue(ParallelismProperty.class).isPresent()
&& !inEdge.getSrc().getPropertyValue(ParallelismProperty.class)
.equals(v.getPropertyValue(ParallelismProperty.class))) {
return failure("OneToOne edges must have the same parallelism",
inEdge.getSrc(), ParallelismProperty.class, v, ParallelismProperty.class);
}
}
}
return success();
});
neighborCheckerList.add(parallelismWithCommPattern);
final NeighborChecker parallelismWithPartitionSet = ((v, inEdges, outEdges) -> {
final Optional<Integer> parallelism = v.getPropertyValue(ParallelismProperty.class);
for (final IREdge inEdge : inEdges) {
final Optional<Integer> keyRangeListSize = inEdge.getPropertyValue(PartitionSetProperty.class)
.map(List::size);
if (parallelism.isPresent() && keyRangeListSize.isPresent() && !parallelism.equals(keyRangeListSize)) {
return failure("PartitionSet must contain all task offsets required for the dst parallelism",
v, ParallelismProperty.class, inEdge, PartitionSetProperty.class);
}
}
return success();
});
neighborCheckerList.add(parallelismWithPartitionSet);
}