private void deleteRecursively()

in common/src/main/java/org/apache/nemo/common/ir/IRDAG.java [227:303]


  /**
   * Deletes the given utility vertex along with its vertex group, cascading the deletion to the
   * utility vertices that are directly connected to that group.
   *
   * The work is done in three steps: the utility parents of the group are deleted first, then the
   * group itself is removed by rebuilding {@code modifiedDAG} without it, and finally the utility
   * children of the group are deleted. A vertex that is already contained in {@code visited} is skipped.
   *
   * @param vertexToDelete the utility vertex to delete.
   * @param visited        the vertices handled so far; the group of {@code vertexToDelete} is added to it.
   * @throws IllegalArgumentException if {@code vertexToDelete} is not a utility vertex, if it is of a type
   *                                  this method has no deletion rule for, or if a vertex that is expected
   *                                  to carry a MessageIdVertexProperty does not have one.
   */
  private void deleteRecursively(final IRVertex vertexToDelete, final Set<IRVertex> visited) {
    if (!Util.isUtilityVertex(vertexToDelete)) {
      throw new IllegalArgumentException(vertexToDelete.getId());
    }
    if (visited.contains(vertexToDelete)) {
      return;
    }

    // Collect the group and its utility neighbours up front, from the DAG as it is right now:
    // the recursive calls below replace modifiedDAG.
    final Set<IRVertex> group = getVertexGroupToDelete(vertexToDelete);

    final Set<IRVertex> parentUtilities = Sets.newHashSet();
    for (final IRVertex member : group) {
      for (final IREdge inEdge : modifiedDAG.getIncomingEdgesOf(member)) {
        final IRVertex src = inEdge.getSrc();
        if (Util.isUtilityVertex(src)) {
          parentUtilities.add(src);
        }
      }
    }

    final Set<IRVertex> childUtilities = Sets.newHashSet();
    for (final IRVertex member : group) {
      for (final IREdge outEdge : modifiedDAG.getOutgoingEdgesOf(member)) {
        final IRVertex dst = outEdge.getDst();
        if (Util.isUtilityVertex(dst)) {
          childUtilities.add(dst);
        }
      }
    }

    // Mark the whole group as handled before recursing, so that the recursion does not come back to it.
    visited.addAll(group);

    // STEP 1: utility parents outside of the group go first.
    // Utility vertices sitting 'in between' members of the group are removed by this step as well.
    for (final IRVertex parent : Sets.difference(parentUtilities, group)) {
      deleteRecursively(parent, visited);
    }

    // STEP 2: remove the group itself, following the rule for the type of the requested vertex.
    if (vertexToDelete instanceof RelayVertex) {
      final DAGBuilder<IRVertex, IREdge> bridgingBuilder = rebuildExcluding(modifiedDAG, group);

      // Connect every non-control source of the relay vertex directly to every non-control destination,
      // using a clone of the edge that is recorded for this vertex in streamVertexToOriginalEdge.
      for (final IREdge outEdge : modifiedDAG.getOutgoingEdgesOf(vertexToDelete)) {
        if (Util.isControlEdge(outEdge)) {
          continue;
        }
        final IRVertex dstVertex = outEdge.getDst();
        for (final IREdge inEdge : modifiedDAG.getIncomingEdgesOf(vertexToDelete)) {
          if (Util.isControlEdge(inEdge)) {
            continue;
          }
          final IRVertex srcVertex = inEdge.getSrc();
          bridgingBuilder.connectVertices(
            Util.cloneEdge(streamVertexToOriginalEdge.get(vertexToDelete), srcVertex, dstVertex));
        }
      }
      modifiedDAG = bridgingBuilder.buildWithoutSourceSinkCheck();
    } else if (vertexToDelete instanceof MessageAggregatorVertex
      || vertexToDelete instanceof MessageGeneratorVertex) {
      modifiedDAG = rebuildExcluding(modifiedDAG, group).buildWithoutSourceSinkCheck();
      // The message id is read from a MessageAggregatorVertex of the group, if the group has one.
      group.stream()
        .filter(member -> member instanceof MessageAggregatorVertex)
        .map(member -> member.getPropertyValue(MessageIdVertexProperty.class).<IllegalArgumentException>orElseThrow(
          () -> new IllegalArgumentException(
            "MessageAggregatorVertex " + member.getId() + " does not have MessageIdVertexProperty.")))
        .findAny()
        .ifPresent(this::eraseMessageIdFromEdges);
    } else if (vertexToDelete instanceof SamplingVertex) {
      modifiedDAG = rebuildExcluding(modifiedDAG, group).buildWithoutSourceSinkCheck();
    } else if (vertexToDelete instanceof SignalVertex) {
      modifiedDAG = rebuildExcluding(modifiedDAG, group).buildWithoutSourceSinkCheck();
      group.stream()
        .map(member -> member.getPropertyValue(MessageIdVertexProperty.class).<IllegalArgumentException>orElseThrow(
          () -> new IllegalArgumentException(
            "SignalVertex " + member.getId() + " does not have MessageIdVertexProperty.")))
        .findAny()
        .ifPresent(this::eraseMessageIdFromEdges);
    } else if (vertexToDelete instanceof TaskSizeSplitterVertex) {
      modifiedDAG = rebuildExcludingSplitter(modifiedDAG, group).buildWithoutSourceSinkCheck();
    } else {
      throw new IllegalArgumentException(vertexToDelete.getId());
    }

    // STEP 3: utility children outside of the group go last.
    for (final IRVertex child : Sets.difference(childUtilities, group)) {
      deleteRecursively(child, visited);
    }
  }

  /**
   * Removes the given message id from the MessageIdEdgeProperty value of every edge of the
   * current {@code modifiedDAG} that has this property.
   *
   * @param messageId the message id to remove.
   */
  private void eraseMessageIdFromEdges(final Integer messageId) {
    modifiedDAG.getEdges().forEach(edge ->
      edge.getPropertyValue(MessageIdEdgeProperty.class).ifPresent(
        messageIds -> messageIds.remove(messageId)));
  }