public void insert()

in common/src/main/java/org/apache/nemo/common/ir/IRDAG.java [730:800]


  public void insert(final TaskSizeSplitterVertex toInsert) {
    final Set<IRVertex> originalVertices = toInsert.getOriginalVertices();

    final Set<IREdge> incomingEdgesOfOriginalVertices = originalVertices
      .stream()
      .flatMap(ov -> modifiedDAG.getIncomingEdgesOf(ov).stream())
      .collect(Collectors.toSet());

    final Set<IREdge> outgoingEdgesOfOriginalVertices = originalVertices
      .stream()
      .flatMap(ov -> modifiedDAG.getOutgoingEdgesOf(ov).stream())
      .collect(Collectors.toSet());

    final Set<IREdge> fromOutsideToOriginal = toInsert.getEdgesFromOutsideToOriginal(modifiedDAG);
    final Set<IREdge> fromOriginalToOutside = toInsert.getEdgesFromOriginalToOutside(modifiedDAG);

    // make edges connected to splitter vertex
    final Set<IREdge> fromOutsideToSplitter = toInsert.getEdgesFromOutsideToSplitter(modifiedDAG);
    final Set<IREdge> fromSplitterToOutside = toInsert.getEdgesFromSplitterToOutside(modifiedDAG);

    //map splitter vertex connection to corresponding internal vertex connection
    for (IREdge splitterEdge : fromSplitterToOutside) {
      for (IREdge internalEdge : fromOriginalToOutside) {
        if (splitterEdge.getDst() instanceof TaskSizeSplitterVertex) {
          TaskSizeSplitterVertex nextSplitter = (TaskSizeSplitterVertex) splitterEdge.getDst();
          if (nextSplitter.getOriginalVertices().contains(internalEdge.getDst())) {
            toInsert.mapEdgeWithLoop(splitterEdge, internalEdge);
          }
        } else {
          if (splitterEdge.getDst().equals(internalEdge.getDst())) {
            toInsert.mapEdgeWithLoop(splitterEdge, internalEdge);
          }
        }
      }
    }

    for (IREdge splitterEdge : fromOutsideToSplitter) {
      for (IREdge internalEdge : fromOutsideToOriginal) {
        if (splitterEdge.getSrc().equals(internalEdge.getSrc())) {
          toInsert.mapEdgeWithLoop(splitterEdge, internalEdge);
        }
      }
    }

    fromOutsideToOriginal.forEach(toInsert::addDagIncomingEdge);
    fromOutsideToOriginal.forEach(toInsert::addNonIterativeIncomingEdge);
    fromOriginalToOutside.forEach(toInsert::addDagOutgoingEdge);

    // All preparation done. Insert splitter vertex.
    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();

    //insert vertex and edges irrelevant to splitter vertex
    modifiedDAG.topologicalDo(v -> {
      if (!originalVertices.contains(v)) {
        builder.addVertex(v);
        for (IREdge edge : modifiedDAG.getIncomingEdgesOf(v)) {
          if (!incomingEdgesOfOriginalVertices.contains(edge) && !outgoingEdgesOfOriginalVertices.contains(edge)) {
            builder.connectVertices(edge);
          }
        }
      }
    });
    //insert splitter vertices
    builder.addVertex(toInsert);

    //connect splitter to outside world
    fromOutsideToSplitter.forEach(builder::connectVertices);
    fromSplitterToOutside.forEach(builder::connectVertices);

    modifiedDAG = builder.build();
  }