in compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoPlanRewriter.java [94:145]
public PhysicalPlan rewrite(final int messageId) {
try {
this.readyToRewriteLatch.await();
} catch (final InterruptedException e) {
LOG.error("Interrupted while waiting for the rewrite latch: {}", e);
Thread.currentThread().interrupt();
}
if (currentIRDAG == null) {
throw new IllegalStateException();
}
final Map<Object, Long> aggregatedData = messageIdToAggregatedData.remove(messageId); // remove for GC
if (aggregatedData == null) {
throw new IllegalStateException();
}
// Find IREdges using the messageId
final Set<IREdge> examiningEdges = currentIRDAG
.getVertices()
.stream()
.flatMap(v -> currentIRDAG.getIncomingEdgesOf(v).stream())
.filter(e -> e.getPropertyValue(MessageIdEdgeProperty.class).isPresent()
&& e.getPropertyValue(MessageIdEdgeProperty.class).get().contains(messageId)
&& !(e.getDst() instanceof MessageAggregatorVertex))
.collect(Collectors.toSet());
if (examiningEdges.isEmpty()) {
throw new IllegalArgumentException(String.valueOf(messageId));
}
// Optimize using the Message
final Message message = new Message(messageId, examiningEdges, aggregatedData);
final IRDAG newIRDAG = nemoOptimizer.optimizeAtRunTime(currentIRDAG, message);
this.setCurrentIRDAG(newIRDAG);
// Re-compile the IRDAG into a physical plan
final PhysicalPlan newPhysicalPlan = nemoBackend.compile(newIRDAG);
// Update the physical plan and return
final List<Stage> currentStages = currentPhysicalPlan.getStageDAG().getTopologicalSort();
final List<Stage> newStages = newPhysicalPlan.getStageDAG().getTopologicalSort();
IntStream.range(0, currentStages.size()).forEachOrdered(i -> {
final ExecutionPropertyMap<VertexExecutionProperty> newProperties = newStages.get(i).getExecutionProperties();
currentStages.get(i).setExecutionProperties(newProperties);
newProperties.get(ParallelismProperty.class).ifPresent(newParallelism -> {
currentStages.get(i).getTaskIndices().clear();
currentStages.get(i).getTaskIndices().addAll(IntStream.range(0, newParallelism).boxed()
.collect(Collectors.toList()));
IntStream.range(currentStages.get(i).getVertexIdToReadables().size(), newParallelism).forEach(newIdx ->
currentStages.get(i).getVertexIdToReadables().add(new HashMap<>()));
});
});
return currentPhysicalPlan;
}