public IRDAG apply()

in compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java [63:107]


  public IRDAG apply(final IRDAG dag) {
    // Visits every vertex through dag.topologicalDo and decides its ParallelismProperty:
    //  - a vertex with incoming edges takes the larger of its one-to-one and (scaled-down) shuffle candidates;
    //  - a source vertex without incoming edges and without a parallelism takes the number of split readables;
    //  - any other vertex without incoming edges must already carry a parallelism, otherwise this pass fails.
    // The same dag instance that was passed in is returned.
    dag.topologicalDo(vertex -> {
      try {
        final List<IREdge> incomingEdges = dag.getIncomingEdgesOf(vertex);

        if (!incomingEdges.isEmpty()) {
          // Only ONE_TO_ONE and SHUFFLE edges contribute a candidate. Edges with other patterns (e.g. broadcast)
          // are left out on purpose: consumers of broadcasted side inputs keep their own parallelism.
          final int oneToOneCandidate = maxIncomingParallelism(
            incomingEdges, CommunicationPatternProperty.Value.ONE_TO_ONE, 1);
          final int shuffleCandidate = maxIncomingParallelism(
            incomingEdges, CommunicationPatternProperty.Value.SHUFFLE, shuffleDecreaseFactor);
          final Integer chosenParallelism = Math.max(oneToOneCandidate, shuffleCandidate);
          vertex.setProperty(ParallelismProperty.of(chosenParallelism));
          // Keep the vertices connected through one-to-one edges in sync with the chosen value.
          recursivelySynchronizeO2OParallelism(dag, vertex, chosenParallelism);
          return;
        }

        if (vertex instanceof SourceVertex) {
          // The source reader is split using the desired source parallelism as the request; the number of
          // readables actually produced (which may be more or fewer than requested) becomes the parallelism.
          // A parallelism that is already present on the vertex is left untouched.
          if (!vertex.getPropertyValue(ParallelismProperty.class).isPresent()) {
            final SourceVertex source = (SourceVertex) vertex;
            vertex.setProperty(ParallelismProperty.of(source.getReadables(desiredSourceParallelism).size()));
          }
          return;
        }

        // A non-source vertex without incoming edges is only acceptable if its parallelism is already set.
        if (!vertex.getPropertyValue(ParallelismProperty.class).isPresent()) {
          throw new RuntimeException("There is a non-source vertex that doesn't have any inEdges "
            + "(excluding SideInput edges)");
        }
      } catch (Exception e) {
        // Everything raised while handling a vertex (including the exception above) is rethrown wrapped.
        throw new RuntimeException(e);
      }
    });
    return dag;
  }

  /**
   * Finds the largest source-vertex parallelism, integer-divided by {@code divisor}, among the edges
   * that carry the given communication pattern.
   *
   * @param edges   the incoming edges to inspect.
   * @param pattern the communication pattern an edge must have to be taken into account.
   * @param divisor the divisor applied to each source parallelism before taking the maximum.
   * @return the largest scaled parallelism, or 1 if no edge carries the given pattern.
   */
  private int maxIncomingParallelism(final List<IREdge> edges,
                                     final CommunicationPatternProperty.Value pattern,
                                     final int divisor) {
    return edges.stream()
      .filter(edge -> pattern.equals(edge.getPropertyValue(CommunicationPatternProperty.class).get()))
      .mapToInt(edge -> edge.getSrc().getPropertyValue(ParallelismProperty.class).get())
      .map(sourceParallelism -> sourceParallelism / divisor)
      .max()
      .orElse(1);
  }