in common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java [268:303]
public DAGBuilder<V, E> connectSplitterVertexWithoutReplacing(final E edgeToReference, final E edgeToInsert) {
final V src = edgeToInsert.getSrc();
final V dst = edgeToInsert.getDst();
if (vertices.contains(src) && vertices.contains(dst)) {
// integrity check: TaskSizeSplitterVertex should only appear in IRDAG.
if (!(edgeToInsert instanceof IREdge)) {
return this;
}
if (src instanceof TaskSizeSplitterVertex && edgeToReference.getSrc().equals(src)) {
TaskSizeSplitterVertex spSrc = (TaskSizeSplitterVertex) src;
IREdge internalEdge = spSrc.getEdgeWithInternalVertex((IREdge) edgeToReference);
IREdge newInternalEdge = Util.cloneEdge((IREdge) edgeToInsert, internalEdge.getSrc(), (IRVertex) dst);
spSrc.mapEdgeWithLoop((IREdge) edgeToInsert, newInternalEdge);
}
if (dst instanceof TaskSizeSplitterVertex && edgeToReference.getDst().equals(dst)) {
TaskSizeSplitterVertex spDst = (TaskSizeSplitterVertex) dst;
IREdge internalEdge = spDst.getEdgeWithInternalVertex((IREdge) edgeToReference);
IREdge newInternalEdge = Util.cloneEdge(internalEdge,
(IRVertex) src,
internalEdge.getDst());
spDst.mapEdgeWithLoop((IREdge) edgeToInsert, newInternalEdge);
}
incomingEdges.get(dst).add(edgeToInsert);
outgoingEdges.get(src).add(edgeToInsert);
} else {
this.buildWithoutSourceSinkCheck().storeJSON("debug", "errored_ir", "Errored IR");
throw new IllegalVertexOperationException("The DAG does not contain"
+ (vertices.contains(src) ? "" : " [source]") + (vertices.contains(dst) ? "" : " [destination]")
+ " of the edge: [" + (src == null ? null : src.getId())
+ "]->[" + (dst == null ? null : dst.getId()) + "] in "
+ vertices.stream().map(V::getId).collect(Collectors.toSet()));
}
return this;
}