public PhysicalPlan rewrite()

in compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoPlanRewriter.java [94:145]


  public PhysicalPlan rewrite(final int messageId) {
    try {
      this.readyToRewriteLatch.await();
    } catch (final InterruptedException e) {
      LOG.error("Interrupted while waiting for the rewrite latch: {}", e);
      Thread.currentThread().interrupt();
    }
    if (currentIRDAG == null) {
      throw new IllegalStateException();
    }
    final Map<Object, Long> aggregatedData = messageIdToAggregatedData.remove(messageId); // remove for GC
    if (aggregatedData == null) {
      throw new IllegalStateException();
    }

    // Find IREdges using the messageId
    final Set<IREdge> examiningEdges = currentIRDAG
      .getVertices()
      .stream()
      .flatMap(v -> currentIRDAG.getIncomingEdgesOf(v).stream())
      .filter(e -> e.getPropertyValue(MessageIdEdgeProperty.class).isPresent()
        && e.getPropertyValue(MessageIdEdgeProperty.class).get().contains(messageId)
        && !(e.getDst() instanceof MessageAggregatorVertex))
      .collect(Collectors.toSet());
    if (examiningEdges.isEmpty()) {
      throw new IllegalArgumentException(String.valueOf(messageId));
    }

    // Optimize using the Message
    final Message message = new Message(messageId, examiningEdges, aggregatedData);
    final IRDAG newIRDAG = nemoOptimizer.optimizeAtRunTime(currentIRDAG, message);
    this.setCurrentIRDAG(newIRDAG);

    // Re-compile the IRDAG into a physical plan
    final PhysicalPlan newPhysicalPlan = nemoBackend.compile(newIRDAG);

    // Update the physical plan and return
    final List<Stage> currentStages = currentPhysicalPlan.getStageDAG().getTopologicalSort();
    final List<Stage> newStages = newPhysicalPlan.getStageDAG().getTopologicalSort();
    IntStream.range(0, currentStages.size()).forEachOrdered(i -> {
      final ExecutionPropertyMap<VertexExecutionProperty> newProperties = newStages.get(i).getExecutionProperties();
      currentStages.get(i).setExecutionProperties(newProperties);
      newProperties.get(ParallelismProperty.class).ifPresent(newParallelism -> {
        currentStages.get(i).getTaskIndices().clear();
        currentStages.get(i).getTaskIndices().addAll(IntStream.range(0, newParallelism).boxed()
          .collect(Collectors.toList()));
        IntStream.range(currentStages.get(i).getVertexIdToReadables().size(), newParallelism).forEach(newIdx ->
          currentStages.get(i).getVertexIdToReadables().add(new HashMap<>()));
      });
    });
    return currentPhysicalPlan;
  }