in common/src/main/java/org/apache/nemo/common/ir/IRDAG.java [319:369]
private DAGBuilder<IRVertex, IREdge> rebuildExcludingSplitter(final DAG<IRVertex, IREdge> dag,
final Set<IRVertex> excluded) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
dag.getVertices().stream().filter(v -> !excluded.contains(v)).forEach(builder::addVertex);
dag.getEdges().stream()
.filter(e -> !(excluded.contains(e.getSrc()) || excluded.contains(e.getDst())))
.forEach(builder::connectVertices);
for (IRVertex vertex : excluded) {
if (!(vertex instanceof TaskSizeSplitterVertex)) {
break;
}
final TaskSizeSplitterVertex splitter = (TaskSizeSplitterVertex) vertex;
//first, restore original vertices
DAG<IRVertex, IREdge> internalDag = splitter.getDAG();
internalDag.getVertices().stream().filter(v -> !(v instanceof SignalVertex)).forEach(builder::addVertex);
internalDag.getEdges().stream()
.filter(e -> !(e.getSrc() instanceof SignalVertex || e.getDst() instanceof SignalVertex))
.forEach(builder::connectVertices);
//second, take care of edges connected to splitter vertex
for (IREdge edgeToSplitter : dag.getIncomingEdgesOf(splitter)) {
if (edgeToSplitter.getSrc() instanceof TaskSizeSplitterVertex) {
final TaskSizeSplitterVertex prevSp = (TaskSizeSplitterVertex) edgeToSplitter.getSrc();
final IREdge internalEdge = prevSp.getEdgeWithInternalVertex(edgeToSplitter);
final IREdge newEdgeToPrevSp = Util.cloneEdge(internalEdge, prevSp, internalEdge.getDst());
prevSp.mapEdgeWithLoop(newEdgeToPrevSp, internalEdge);
builder.connectVertices(newEdgeToPrevSp);
} else {
final IREdge internalEdge = splitter.getEdgeWithInternalVertex(edgeToSplitter);
builder.connectVertices(internalEdge);
}
}
for (IREdge edgeFromSplitter : dag.getOutgoingEdgesOf(splitter)) {
if (edgeFromSplitter.getDst() instanceof TaskSizeSplitterVertex) {
final TaskSizeSplitterVertex nextSp = (TaskSizeSplitterVertex) edgeFromSplitter.getDst();
final IREdge internalEdge = nextSp.getEdgeWithInternalVertex(edgeFromSplitter);
final IREdge newEdgeToNextSp = Util.cloneEdge(internalEdge, internalEdge.getSrc(), nextSp);
nextSp.mapEdgeWithLoop(newEdgeToNextSp, internalEdge);
builder.connectVertices(newEdgeToNextSp);
} else {
final IREdge internalEdge = splitter.getEdgeWithInternalVertex(edgeFromSplitter);
builder.connectVertices(internalEdge);
}
}
}
return builder;
}