in common/src/main/java/org/apache/nemo/common/ir/IRDAG.java [730:800]
public void insert(final TaskSizeSplitterVertex toInsert) {
final Set<IRVertex> originalVertices = toInsert.getOriginalVertices();
final Set<IREdge> incomingEdgesOfOriginalVertices = originalVertices
.stream()
.flatMap(ov -> modifiedDAG.getIncomingEdgesOf(ov).stream())
.collect(Collectors.toSet());
final Set<IREdge> outgoingEdgesOfOriginalVertices = originalVertices
.stream()
.flatMap(ov -> modifiedDAG.getOutgoingEdgesOf(ov).stream())
.collect(Collectors.toSet());
final Set<IREdge> fromOutsideToOriginal = toInsert.getEdgesFromOutsideToOriginal(modifiedDAG);
final Set<IREdge> fromOriginalToOutside = toInsert.getEdgesFromOriginalToOutside(modifiedDAG);
// make edges connected to splitter vertex
final Set<IREdge> fromOutsideToSplitter = toInsert.getEdgesFromOutsideToSplitter(modifiedDAG);
final Set<IREdge> fromSplitterToOutside = toInsert.getEdgesFromSplitterToOutside(modifiedDAG);
//map splitter vertex connection to corresponding internal vertex connection
for (IREdge splitterEdge : fromSplitterToOutside) {
for (IREdge internalEdge : fromOriginalToOutside) {
if (splitterEdge.getDst() instanceof TaskSizeSplitterVertex) {
TaskSizeSplitterVertex nextSplitter = (TaskSizeSplitterVertex) splitterEdge.getDst();
if (nextSplitter.getOriginalVertices().contains(internalEdge.getDst())) {
toInsert.mapEdgeWithLoop(splitterEdge, internalEdge);
}
} else {
if (splitterEdge.getDst().equals(internalEdge.getDst())) {
toInsert.mapEdgeWithLoop(splitterEdge, internalEdge);
}
}
}
}
for (IREdge splitterEdge : fromOutsideToSplitter) {
for (IREdge internalEdge : fromOutsideToOriginal) {
if (splitterEdge.getSrc().equals(internalEdge.getSrc())) {
toInsert.mapEdgeWithLoop(splitterEdge, internalEdge);
}
}
}
fromOutsideToOriginal.forEach(toInsert::addDagIncomingEdge);
fromOutsideToOriginal.forEach(toInsert::addNonIterativeIncomingEdge);
fromOriginalToOutside.forEach(toInsert::addDagOutgoingEdge);
// All preparation done. Insert splitter vertex.
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
//insert vertex and edges irrelevant to splitter vertex
modifiedDAG.topologicalDo(v -> {
if (!originalVertices.contains(v)) {
builder.addVertex(v);
for (IREdge edge : modifiedDAG.getIncomingEdgesOf(v)) {
if (!incomingEdgesOfOriginalVertices.contains(edge) && !outgoingEdgesOfOriginalVertices.contains(edge)) {
builder.connectVertices(edge);
}
}
}
});
//insert splitter vertices
builder.addVertex(toInsert);
//connect splitter to outside world
fromOutsideToSplitter.forEach(builder::connectVertices);
fromSplitterToOutside.forEach(builder::connectVertices);
modifiedDAG = builder.build();
}