in common/src/main/java/org/apache/nemo/common/ir/IRDAG.java [227:303]
private void deleteRecursively(final IRVertex vertexToDelete, final Set<IRVertex> visited) {
  // Removes the vertex group of the given utility vertex from modifiedDAG, and recursively does the same for
  // the utility vertices directly connected to that group: parents first, then the group, then children.
  // 'visited' collects the vertices of every group handled so far, so that no group is processed twice.

  // Only utility vertices are accepted here.
  if (!Util.isUtilityVertex(vertexToDelete)) {
    throw new IllegalArgumentException(vertexToDelete.getId());
  }
  // Nothing to do if the group containing this vertex was already handled.
  if (visited.contains(vertexToDelete)) {
    return;
  }

  // Capture the group and its neighboring utility vertices now, before modifiedDAG is replaced below.
  final Set<IRVertex> group = getVertexGroupToDelete(vertexToDelete);
  final Set<IRVertex> parentUtilities = group.stream()
    .map(modifiedDAG::getIncomingEdgesOf)
    .flatMap(incoming -> incoming.stream())
    .map(IREdge::getSrc)
    .filter(Util::isUtilityVertex)
    .collect(Collectors.toSet());
  final Set<IRVertex> childUtilities = group.stream()
    .map(modifiedDAG::getOutgoingEdgesOf)
    .flatMap(outgoing -> outgoing.stream())
    .map(IREdge::getDst)
    .filter(Util::isUtilityVertex)
    .collect(Collectors.toSet());

  // Mark the whole group as visited before recursing, so that the recursive calls skip its members.
  visited.addAll(group);

  // STEP 1: Recurse into the utility parents that are outside of this group.
  // Utility vertices lying 'in between' members of the group are reached through this step as well.
  for (final IRVertex parent : Sets.difference(parentUtilities, group)) {
    deleteRecursively(parent, visited);
  }

  // STEP 2: Rebuild the DAG without the group, in the way that the type of the vertex requires.
  if (vertexToDelete instanceof RelayVertex) {
    final DAGBuilder<IRVertex, IREdge> builder = rebuildExcluding(modifiedDAG, group);

    // Bypass the relay: for every (source, destination) pair reachable through its non-control edges,
    // connect the two directly with a clone of the edge recorded for this vertex.
    // The edges are read from the DAG as it is before the reassignment below.
    for (final IREdge outEdge : modifiedDAG.getOutgoingEdgesOf(vertexToDelete)) {
      if (Util.isControlEdge(outEdge)) {
        continue;
      }
      final IRVertex dst = outEdge.getDst();
      for (final IREdge inEdge : modifiedDAG.getIncomingEdgesOf(vertexToDelete)) {
        if (Util.isControlEdge(inEdge)) {
          continue;
        }
        final IRVertex src = inEdge.getSrc();
        builder.connectVertices(Util.cloneEdge(streamVertexToOriginalEdge.get(vertexToDelete), src, dst));
      }
    }
    modifiedDAG = builder.buildWithoutSourceSinkCheck();
  } else if (vertexToDelete instanceof MessageAggregatorVertex || vertexToDelete instanceof MessageGeneratorVertex) {
    modifiedDAG = rebuildExcluding(modifiedDAG, group).buildWithoutSourceSinkCheck();

    // Look up the message id of an aggregator in the group (if there is one) ...
    final Optional<Integer> aggregatorMessageId = group.stream()
      .filter(MessageAggregatorVertex.class::isInstance)
      .map(v -> v.getPropertyValue(MessageIdVertexProperty.class)
        .<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(
          "MessageAggregatorVertex " + v.getId() + " does not have MessageIdVertexProperty.")))
      .findAny();
    // ... and drop that id from the message id sets of the edges of the rebuilt DAG.
    aggregatorMessageId.ifPresent(staleId ->
      modifiedDAG.getEdges().forEach(edge ->
        edge.getPropertyValue(MessageIdEdgeProperty.class).ifPresent(ids -> ids.remove(staleId))));
  } else if (vertexToDelete instanceof SamplingVertex) {
    modifiedDAG = rebuildExcluding(modifiedDAG, group).buildWithoutSourceSinkCheck();
  } else if (vertexToDelete instanceof SignalVertex) {
    modifiedDAG = rebuildExcluding(modifiedDAG, group).buildWithoutSourceSinkCheck();

    // Same cleanup as for the message vertices, using the message id of a vertex of this group.
    final Optional<Integer> signalMessageId = group.stream()
      .map(v -> v.getPropertyValue(MessageIdVertexProperty.class)
        .<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(
          "SignalVertex " + v.getId() + " does not have MessageIdVertexProperty.")))
      .findAny();
    signalMessageId.ifPresent(staleId ->
      modifiedDAG.getEdges().forEach(edge ->
        edge.getPropertyValue(MessageIdEdgeProperty.class).ifPresent(ids -> ids.remove(staleId))));
  } else if (vertexToDelete instanceof TaskSizeSplitterVertex) {
    modifiedDAG = rebuildExcludingSplitter(modifiedDAG, group).buildWithoutSourceSinkCheck();
  } else {
    // A utility vertex of a type that this method does not know how to delete.
    throw new IllegalArgumentException(vertexToDelete.getId());
  }

  // STEP 3: Recurse into the utility children that are outside of this group.
  for (final IRVertex child : Sets.difference(childUtilities, group)) {
    deleteRecursively(child, visited);
  }
}