in compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java [63:107]
public IRDAG apply(final IRDAG dag) {
  // Annotates every vertex of the DAG with a ParallelismProperty and returns the same (mutated) DAG.
  // Vertices are visited in topological order, so the parallelism of every upstream vertex is already
  // set by the time a downstream vertex reads it.
  // Throws a RuntimeException if a non-source vertex has neither incoming edges nor a parallelism,
  // or if computing the source readables fails.
  dag.topologicalDo(vertex -> {
    try {
      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
      if (inEdges.isEmpty() && vertex instanceof SourceVertex) {
        // For source vertices, we try to split the source reader by the desired source parallelism.
        // After that, we set the parallelism as the number of split readers.
        // (It can be more/less than the desired value.)
        final SourceVertex sourceVertex = (SourceVertex) vertex;
        final Optional<Integer> originalParallelism = vertex.getPropertyValue(ParallelismProperty.class);
        // Only assign a parallelism when none has been set yet; an existing value is left untouched.
        if (!originalParallelism.isPresent()) {
          vertex.setProperty(ParallelismProperty.of(
            sourceVertex.getReadables(desiredSourceParallelism).size()));
        }
      } else if (!inEdges.isEmpty()) {
        // No reason to propagate via Broadcast edges, as the data streams that will use the broadcasted data
        // as a sideInput will have their own number of parallelism.
        // Largest parallelism among the sources of one-to-one edges (1 if there is no such edge).
        final int o2oParallelism = inEdges.stream()
          .filter(edge -> CommunicationPatternProperty.Value.ONE_TO_ONE
            .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get()))
          .mapToInt(edge -> edge.getSrc().getPropertyValue(ParallelismProperty.class).get())
          .max().orElse(1);
        // Largest source parallelism among shuffle edges, scaled down by the shuffle decrease factor
        // (1 if there is no such edge). The integer division truncates, so small sources may yield 0 here;
        // the final value is decided by the max taken below.
        final int shuffleParallelism = inEdges.stream()
          .filter(edge -> CommunicationPatternProperty.Value.SHUFFLE
            .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get()))
          .mapToInt(edge -> edge.getSrc().getPropertyValue(ParallelismProperty.class).get())
          .map(i -> i / shuffleDecreaseFactor)
          .max().orElse(1);
        // We set the greater value as the parallelism.
        final Integer parallelism = Math.max(o2oParallelism, shuffleParallelism);
        vertex.setProperty(ParallelismProperty.of(parallelism));
        // synchronize one-to-one edges parallelism
        recursivelySynchronizeO2OParallelism(dag, vertex, parallelism);
      } else if (!vertex.getPropertyValue(ParallelismProperty.class).isPresent()) {
        throw new RuntimeException("There is a non-source vertex that doesn't have any inEdges "
          + "(excluding SideInput edges)");
      } // No problem otherwise.
    } catch (RuntimeException e) {
      // Already unchecked (including the diagnostic thrown above): rethrow as-is so that its message
      // is not buried as the cause of another RuntimeException.
      throw e;
    } catch (Exception e) {
      // Checked exceptions cannot escape the lambda, so wrap them while preserving the cause.
      throw new RuntimeException(e);
    }
  });
  return dag;
}