in common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java [197:246]
/**
 * Connects {@code edgeToInsert} to this builder as the replacement of {@code originalEdge}, and
 * re-registers the edge mapping kept by any {@link TaskSizeSplitterVertex} endpoint.
 *
 * <p>Both edges must share the same source and the same destination. When an endpoint is a
 * {@link TaskSizeSplitterVertex}, the internal edge it associates with {@code originalEdge} is cloned
 * (with the opposite endpoint replaced by the endpoint of {@code edgeToInsert}) and the clone is
 * registered for both {@code originalEdge} and {@code edgeToInsert}.
 *
 * <p>Note: this method only adds {@code edgeToInsert} to the incoming/outgoing edge maps; it does not
 * remove {@code originalEdge} from them. If {@code edgeToInsert} is not an {@link IREdge}, nothing is
 * connected and the builder is returned unchanged.
 *
 * @param originalEdge the edge that is being replaced.
 * @param edgeToInsert the edge to add to the DAG.
 * @return the builder itself.
 * @throws IllegalVertexOperationException if the two edges differ in source or destination, or if the
 *                                         builder does not contain both endpoints of {@code edgeToInsert}.
 */
public DAGBuilder<V, E> connectSplitterVertexWithReplacing(final E originalEdge, final E edgeToInsert) {
  final V src = edgeToInsert.getSrc();
  final V dst = edgeToInsert.getDst();
  if (vertices.contains(src) && vertices.contains(dst)) {
    // integrity check: TaskSizeSplitterVertex should only appear in IRDAG.
    // A non-IREdge is therefore ignored silently rather than connected.
    if (!(edgeToInsert instanceof IREdge)) {
      return this;
    }
    // The replacement must connect exactly the same pair of vertices as the original edge.
    if (!originalEdge.getSrc().equals(src)) {
      throw new IllegalVertexOperationException(originalEdge.getId()
        + " and " + edgeToInsert.getId() + " should have same source, but found\n edge : source\n "
        + originalEdge.getId() + " : " + originalEdge.getSrc().getId() + "\n "
        + edgeToInsert.getId() + " : " + edgeToInsert.getSrc().getId());
    }
    if (!originalEdge.getDst().equals(dst)) {
      throw new IllegalVertexOperationException(originalEdge.getId()
        + " and " + edgeToInsert.getId() + " should have same destination, but found\n edge : dest\n "
        + originalEdge.getId() + " : " + originalEdge.getDst().getId() + "\n "
        + edgeToInsert.getId() + " : " + edgeToInsert.getDst().getId());
    }
    if (src instanceof TaskSizeSplitterVertex) {
      // Source side: keep the internal source, point the cloned internal edge at the new destination.
      final TaskSizeSplitterVertex spSrc = (TaskSizeSplitterVertex) src;
      final IREdge internalEdge = spSrc.getEdgeWithInternalVertex((IREdge) originalEdge);
      final IREdge newInternalEdge = Util.cloneEdge(internalEdge, internalEdge.getSrc(), (IRVertex) dst);
      // Both the old and the new outer edge are mapped to the same cloned internal edge.
      spSrc.mapEdgeWithLoop((IREdge) originalEdge, newInternalEdge);
      spSrc.mapEdgeWithLoop((IREdge) edgeToInsert, newInternalEdge);
    }
    if (dst instanceof TaskSizeSplitterVertex) {
      // Destination side: keep the internal destination, start the cloned internal edge at the new source.
      final TaskSizeSplitterVertex spDst = (TaskSizeSplitterVertex) dst;
      final IREdge internalEdge = spDst.getEdgeWithInternalVertex((IREdge) originalEdge);
      final IREdge newInternalEdge = Util.cloneEdge(internalEdge, (IRVertex) src, internalEdge.getDst());
      spDst.mapEdgeWithLoop((IREdge) originalEdge, newInternalEdge);
      spDst.mapEdgeWithLoop((IREdge) edgeToInsert, newInternalEdge);
    }
    incomingEdges.get(dst).add(edgeToInsert);
    outgoingEdges.get(src).add(edgeToInsert);
  } else {
    // Store the current (erroneous) DAG before failing.
    // NOTE(review): presumably written as a JSON debug dump; if building or storing throws, that
    // exception replaces the descriptive one below - confirm this is acceptable.
    this.buildWithoutSourceSinkCheck().storeJSON("debug", "errored_ir", "Errored IR");
    throw new IllegalVertexOperationException("The DAG does not contain"
      + (vertices.contains(src) ? "" : " [source]") + (vertices.contains(dst) ? "" : " [destination]")
      + " of the edge: [" + (src == null ? null : src.getId())
      + "]->[" + (dst == null ? null : dst.getId()) + "] in "
      + vertices.stream().map(V::getId).collect(Collectors.toSet()));
  }
  return this;
}