public DAGBuilder connectSplitterVertexWithoutReplacing()

in common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java [268:303]


  /**
   * Adds {@code edgeToInsert} to this builder without removing any edge that is already registered.
   *
   * <p>When an endpoint of {@code edgeToInsert} is a {@link TaskSizeSplitterVertex} and is also the
   * corresponding endpoint of {@code edgeToReference}, an internal counterpart of {@code edgeToInsert}
   * is created and registered on the splitter through {@code mapEdgeWithLoop}. The counterpart is cloned
   * from {@code edgeToInsert}; its endpoint inside the splitter is taken from the internal edge that the
   * splitter reports for {@code edgeToReference}.
   *
   * <p>If {@code edgeToInsert} is not an {@link IREdge}, the builder is returned unchanged and the edge
   * is NOT added.
   *
   * @param edgeToReference edge already known to the splitter vertex; used to look up the internal vertex
   *                        that the new internal edge should attach to.
   * @param edgeToInsert    edge to add to this builder.
   * @return this builder.
   * @throws IllegalVertexOperationException if the source or destination of {@code edgeToInsert} is not
   *                                         contained in this builder. The current DAG is dumped as JSON
   *                                         before the exception is thrown.
   */
  public DAGBuilder<V, E> connectSplitterVertexWithoutReplacing(final E edgeToReference, final E edgeToInsert) {
    final V src = edgeToInsert.getSrc();
    final V dst = edgeToInsert.getDst();

    if (vertices.contains(src) && vertices.contains(dst)) {
      // integrity check: TaskSizeSplitterVertex should only appear in IRDAG.
      if (!(edgeToInsert instanceof IREdge)) {
        return this;
      }
      final IREdge irEdgeToInsert = (IREdge) edgeToInsert;

      if (src instanceof TaskSizeSplitterVertex && edgeToReference.getSrc().equals(src)) {
        final TaskSizeSplitterVertex spSrc = (TaskSizeSplitterVertex) src;
        final IREdge internalEdge = spSrc.getEdgeWithInternalVertex((IREdge) edgeToReference);
        // The new internal edge leaves the same internal vertex as the reference edge's internal edge.
        final IREdge newInternalEdge = Util.cloneEdge(irEdgeToInsert, internalEdge.getSrc(), (IRVertex) dst);
        spSrc.mapEdgeWithLoop(irEdgeToInsert, newInternalEdge);
      }
      if (dst instanceof TaskSizeSplitterVertex && edgeToReference.getDst().equals(dst)) {
        final TaskSizeSplitterVertex spDst = (TaskSizeSplitterVertex) dst;
        final IREdge internalEdge = spDst.getEdgeWithInternalVertex((IREdge) edgeToReference);
        // Clone the edge being inserted (not the reference's internal edge), mirroring the source-side
        // branch; only the internal destination vertex is borrowed from the reference's internal edge.
        final IREdge newInternalEdge = Util.cloneEdge(irEdgeToInsert, (IRVertex) src, internalEdge.getDst());
        spDst.mapEdgeWithLoop(irEdgeToInsert, newInternalEdge);
      }
      incomingEdges.get(dst).add(edgeToInsert);
      outgoingEdges.get(src).add(edgeToInsert);
    } else {
      // Dump the current DAG for debugging before reporting the missing endpoint(s).
      this.buildWithoutSourceSinkCheck().storeJSON("debug", "errored_ir", "Errored IR");
      throw new IllegalVertexOperationException("The DAG does not contain"
        + (vertices.contains(src) ? "" : " [source]") + (vertices.contains(dst) ? "" : " [destination]")
        + " of the edge: [" + (src == null ? null : src.getId())
        + "]->[" + (dst == null ? null : dst.getId()) + "] in "
        + vertices.stream().map(V::getId).collect(Collectors.toSet()));
    }
    return this;
  }