private DAGBuilder rebuildExcludingSplitter()

in common/src/main/java/org/apache/nemo/common/ir/IRDAG.java [319:369]


  /**
   * Rebuilds {@code dag} without the {@code excluded} vertices, replacing every excluded
   * {@link TaskSizeSplitterVertex} with the non-signal contents of its internal DAG.
   *
   * <p>The rebuild happens in two phases:
   * <ol>
   *   <li>Every vertex not in {@code excluded}, and every edge whose endpoints are both not in
   *       {@code excluded}, is copied into a fresh builder.</li>
   *   <li>For each excluded splitter, the vertices and edges of its internal DAG (except
   *       {@link SignalVertex} instances and edges touching them) are added, and each edge that was
   *       attached to the splitter in {@code dag} is replaced by the corresponding internal edge.</li>
   * </ol>
   *
   * <p>Excluded vertices that are not splitters are simply dropped. They are skipped individually
   * rather than ending the loop, because {@code excluded} is a {@link Set} whose iteration order is
   * not guaranteed: stopping at the first non-splitter would make the result depend on that order.
   *
   * @param dag      the DAG to rebuild.
   * @param excluded the vertices to leave out of the rebuilt DAG.
   * @return a builder holding the rebuilt vertices and edges.
   */
  private DAGBuilder<IRVertex, IREdge> rebuildExcludingSplitter(final DAG<IRVertex, IREdge> dag,
                                                                final Set<IRVertex> excluded) {
    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();

    // Phase 1: keep everything that is untouched by the exclusion.
    dag.getVertices().stream().filter(v -> !excluded.contains(v)).forEach(builder::addVertex);
    dag.getEdges().stream()
      .filter(e -> !(excluded.contains(e.getSrc()) || excluded.contains(e.getDst())))
      .forEach(builder::connectVertices);

    // Phase 2: unroll each excluded splitter.
    for (final IRVertex vertex : excluded) {
      if (!(vertex instanceof TaskSizeSplitterVertex)) {
        // Only splitters carry an internal DAG to restore; skip this one and keep going.
        continue;
      }
      final TaskSizeSplitterVertex splitter = (TaskSizeSplitterVertex) vertex;

      // First, restore the original vertices and edges wrapped by the splitter,
      // leaving out signal vertices and any edge attached to one.
      final DAG<IRVertex, IREdge> internalDag = splitter.getDAG();
      internalDag.getVertices().stream().filter(v -> !(v instanceof SignalVertex)).forEach(builder::addVertex);
      internalDag.getEdges().stream()
        .filter(e -> !(e.getSrc() instanceof SignalVertex || e.getDst() instanceof SignalVertex))
        .forEach(builder::connectVertices);

      // Second, take care of the edges that were connected to the splitter vertex itself.
      // NOTE(review): the exact contracts of getEdgeWithInternalVertex and mapEdgeWithLoop live in
      // TaskSizeSplitterVertex and are not visible here - confirm against that class.
      for (final IREdge edgeToSplitter : dag.getIncomingEdgesOf(splitter)) {
        if (edgeToSplitter.getSrc() instanceof TaskSizeSplitterVertex) {
          // The upstream neighbour is another splitter: clone its internal edge so that the clone
          // starts at that splitter, and register the clone with it.
          final TaskSizeSplitterVertex prevSp = (TaskSizeSplitterVertex) edgeToSplitter.getSrc();
          final IREdge internalEdge = prevSp.getEdgeWithInternalVertex(edgeToSplitter);
          final IREdge edgeFromPrevSp = Util.cloneEdge(internalEdge, prevSp, internalEdge.getDst());
          prevSp.mapEdgeWithLoop(edgeFromPrevSp, internalEdge);

          builder.connectVertices(edgeFromPrevSp);
        } else {
          // Ordinary upstream neighbour: use the internal edge this splitter holds for it.
          final IREdge internalEdge = splitter.getEdgeWithInternalVertex(edgeToSplitter);
          builder.connectVertices(internalEdge);
        }
      }

      for (final IREdge edgeFromSplitter : dag.getOutgoingEdgesOf(splitter)) {
        if (edgeFromSplitter.getDst() instanceof TaskSizeSplitterVertex) {
          // The downstream neighbour is another splitter: clone its internal edge so that the clone
          // ends at that splitter, and register the clone with it.
          final TaskSizeSplitterVertex nextSp = (TaskSizeSplitterVertex) edgeFromSplitter.getDst();
          final IREdge internalEdge = nextSp.getEdgeWithInternalVertex(edgeFromSplitter);
          final IREdge edgeToNextSp = Util.cloneEdge(internalEdge, internalEdge.getSrc(), nextSp);
          nextSp.mapEdgeWithLoop(edgeToNextSp, internalEdge);

          builder.connectVertices(edgeToNextSp);
        } else {
          // Ordinary downstream neighbour: use the internal edge this splitter holds for it.
          final IREdge internalEdge = splitter.getEdgeWithInternalVertex(edgeFromSplitter);
          builder.connectVertices(internalEdge);
        }
      }
    }
    return builder;
  }