in runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanAppender.java [61:130]
public static PhysicalPlan appendPlan(final PhysicalPlan originalPlan,
                                      final PhysicalPlan planToAppend) {
  // Overview:
  //   1. Seed a DAG builder with the stage DAG of the original plan.
  //   2. Index the cached stage edges of the original plan by their cache ID.
  //   3. Copy every stage and stage edge of the plan to append into the builder, recording
  //      which (cached source vertex, stage) pair asks for which cache ID.
  //   4. For each recorded request, add a new stage edge from the stage that produced the
  //      cached data to the stage that requests it.
  // The result is a new PhysicalPlan carrying the plan ID of the original plan.
  // A PlanAppenderException is thrown when a cached edge has no cache ID, when a requested
  // cache ID has no matching cached edge in the original plan, or when the matching cached
  // edge has no DuplicateEdgeGroupProperty.
  final DAG<Stage, StageEdge> originalDag = originalPlan.getStageDAG();
  final DAGBuilder<Stage, StageEdge> mergedDagBuilder = new DAGBuilder<>(originalDag);

  // Step 2: only an edge whose destination IR vertex carries
  // IgnoreSchedulingTempDataReceiverProperty is taken as the representative cached edge.
  // Such an edge is indexed only if it also carries a CacheIDProperty.
  final Map<UUID, StageEdge> cachedEdgeById = new HashMap<>();
  for (final Stage originalStage : originalDag.getVertices()) {
    for (final StageEdge incomingEdge : originalDag.getIncomingEdgesOf(originalStage)) {
      final boolean towardTempDataReceiver = incomingEdge.getDstIRVertex()
        .getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class)
        .isPresent();
      if (!towardTempDataReceiver) {
        continue;
      }
      incomingEdge.getPropertyValue(CacheIDProperty.class)
        .ifPresent(cacheId -> cachedEdgeById.put(cacheId, incomingEdge));
    }
  }

  // Step 3: the map is keyed by cache ID, so a later request for the same ID replaces an
  // earlier one.
  final Map<UUID, Pair<IRVertex, Stage>> cacheRequests = new HashMap<>();
  final DAG<Stage, StageEdge> appendedDag = planToAppend.getStageDAG();
  appendedDag.topologicalDo(appendedStage -> {
    mergedDagBuilder.addVertex(appendedStage);

    // Inter-stage edges: copy each one, and note those leaving a cached source vertex.
    for (final StageEdge incomingEdge : appendedDag.getIncomingEdgesOf(appendedStage)) {
      mergedDagBuilder.connectVertices(incomingEdge);
      if (!(incomingEdge.getSrcIRVertex() instanceof CachedSourceVertex)) {
        continue;
      }
      final UUID requestedId = incomingEdge.getPropertyValue(CacheIDProperty.class)
        .orElseThrow(() -> new PlanAppenderException("No cache id in the cached edge " + incomingEdge.getId()));
      cacheRequests.put(requestedId, Pair.of(incomingEdge.getSrcIRVertex(), incomingEdge.getSrc()));
    }

    // Intra-stage edges: a cached source vertex may also feed vertices of its own stage.
    final DAG<IRVertex, RuntimeEdge<IRVertex>> irDagOfStage = appendedStage.getIRDAG();
    for (final IRVertex irVertex : irDagOfStage.getVertices()) {
      if (!(irVertex instanceof CachedSourceVertex)) {
        continue;
      }
      for (final RuntimeEdge<IRVertex> outgoingEdge : irDagOfStage.getOutgoingEdgesOf(irVertex)) {
        final UUID requestedId = outgoingEdge.getPropertyValue(CacheIDProperty.class)
          .orElseThrow(
            () -> new PlanAppenderException("No cache id in the cached edge " + outgoingEdge.getId()));
        cacheRequests.put(requestedId, Pair.of(outgoingEdge.getSrc(), appendedStage));
      }
    }
  });

  // Step 4: wire every request to the stage that produced the cached data.
  for (final Map.Entry<UUID, Pair<IRVertex, Stage>> request : cacheRequests.entrySet()) {
    final StageEdge cachedEdge = cachedEdgeById.get(request.getKey());
    if (cachedEdge == null) {
      throw new PlanAppenderException("Cached edge is not found in the original plan.");
    }
    final Pair<IRVertex, Stage> requester = request.getValue();

    // The linking edge takes its execution properties and its source side from the cached edge,
    // and its destination side from the requesting vertex and stage.
    // NOTE(review): whether getExecutionProperties() hands out a shared or a copied property map
    // is not visible here -- confirm before relying on either.
    final StageEdge linkingEdge = new StageEdge(
      IdManager.newEdgeId(),
      cachedEdge.getExecutionProperties(),
      cachedEdge.getSrcIRVertex(),
      requester.left(),
      cachedEdge.getSrc(),
      requester.right());
    mergedDagBuilder.connectVertices(linkingEdge);

    // Grow the duplicate edge group by one to account for the linking edge. The value object
    // obtained from the cached edge is updated in place and then also put on the linking edge.
    final DuplicateEdgeGroupPropertyValue groupValue =
      cachedEdge.getPropertyValue(DuplicateEdgeGroupProperty.class)
        .orElseThrow(() -> new PlanAppenderException("Cached edge does not have duplicated edge group property."));
    groupValue.setGroupSize(groupValue.getGroupSize() + 1);
    linkingEdge.getExecutionProperties().put(DuplicateEdgeGroupProperty.of(groupValue));
  }

  return new PhysicalPlan(originalPlan.getPlanId(), mergedDagBuilder.build());
}