public DAGBuilder connectSplitterVertexWithReplacing()

in common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java [197:246]


  /**
   * Registers {@code edgeToInsert} between its source and destination vertices as the replacement of
   * {@code originalEdge}, updating the edge mappings of any {@link TaskSizeSplitterVertex} endpoint.
   *
   * <p>Both edges must share the same source and the same destination. When an endpoint is a
   * {@link TaskSizeSplitterVertex}, the internal edge that the splitter associates with
   * {@code originalEdge} is cloned against the opposite endpoint, and both {@code originalEdge} and
   * {@code edgeToInsert} are mapped to that clone. This method only adds {@code edgeToInsert} to the
   * incoming/outgoing edge collections; it does not remove {@code originalEdge} from them.
   *
   * <p>If {@code edgeToInsert} is not an {@link IREdge}, the builder is returned unchanged.
   *
   * @param originalEdge the edge being replaced.
   * @param edgeToInsert the edge to register in this builder.
   * @return this builder.
   * @throws IllegalVertexOperationException if the two edges differ in source or destination, or if
   *                                         the builder does not contain the source or destination of
   *                                         {@code edgeToInsert}.
   */
  public DAGBuilder<V, E> connectSplitterVertexWithReplacing(final E originalEdge, final E edgeToInsert) {
    final V src = edgeToInsert.getSrc();
    final V dst = edgeToInsert.getDst();

    if (vertices.contains(src) && vertices.contains(dst)) {
      // integrity check: TaskSizeSplitterVertex should only appear in IRDAG.
      if (!(edgeToInsert instanceof IREdge)) {
        return this;
      }

      if (!originalEdge.getSrc().equals(src)) {
        throw new IllegalVertexOperationException(originalEdge.getId()
          + " and " + edgeToInsert.getId() + " should have same source, but found\n edge : source\n "
          + originalEdge.getId() + " : " + originalEdge.getSrc().getId() + "\n "
          + edgeToInsert.getId() + " : " + edgeToInsert.getSrc().getId());
      }

      if (!originalEdge.getDst().equals(dst)) {
        throw new IllegalVertexOperationException(originalEdge.getId()
          + " and " + edgeToInsert.getId() + " should have same destination, but found\n edge : dest\n "
          + originalEdge.getId() + " : " + originalEdge.getDst().getId() + "\n "
          + edgeToInsert.getId() + " : " + edgeToInsert.getDst().getId());
      }

      if (src instanceof TaskSizeSplitterVertex) {
        // Re-target the splitter's internal edge at the destination and map both edges to the clone.
        final TaskSizeSplitterVertex spSrc = (TaskSizeSplitterVertex) src;
        final IREdge internalEdge = spSrc.getEdgeWithInternalVertex((IREdge) originalEdge);
        final IREdge newInternalEdge = Util.cloneEdge(internalEdge, internalEdge.getSrc(), (IRVertex) dst);
        spSrc.mapEdgeWithLoop((IREdge) originalEdge, newInternalEdge);
        spSrc.mapEdgeWithLoop((IREdge) edgeToInsert, newInternalEdge);
      }
      if (dst instanceof TaskSizeSplitterVertex) {
        // Symmetric case: re-source the splitter's internal edge from the source vertex.
        final TaskSizeSplitterVertex spDst = (TaskSizeSplitterVertex) dst;
        final IREdge internalEdge = spDst.getEdgeWithInternalVertex((IREdge) originalEdge);
        final IREdge newInternalEdge = Util.cloneEdge(internalEdge, (IRVertex) src, internalEdge.getDst());
        spDst.mapEdgeWithLoop((IREdge) originalEdge, newInternalEdge);
        spDst.mapEdgeWithLoop((IREdge) edgeToInsert, newInternalEdge);
      }
      incomingEdges.get(dst).add(edgeToInsert);
      outgoingEdges.get(src).add(edgeToInsert);
    } else {
      // Store the current (invalid) DAG as JSON before failing, presumably to aid debugging.
      this.buildWithoutSourceSinkCheck().storeJSON("debug", "errored_ir", "Errored IR");
      throw new IllegalVertexOperationException("The DAG does not contain"
        + (vertices.contains(src) ? "" : " [source]") + (vertices.contains(dst) ? "" : " [destination]")
        + " of the edge: [" + (src == null ? null : src.getId())
        + "]->[" + (dst == null ? null : dst.getId()) + "] in "
        + vertices.stream().map(V::getId).collect(Collectors.toSet()));
    }
    return this;
  }