in common/src/main/java/org/apache/nemo/common/ir/IRDAG.java [661:724]
/**
 * Inserts a group of sampling vertices, each of which mirrors an existing ("original") vertex,
 * and forces the group to finish before the given vertices run.
 *
 * <p>The DAG is rebuilt from scratch. Every existing vertex and edge is kept as-is, the sampling
 * vertices are added, and three kinds of new edges are created:
 * <ol>
 *   <li>For each edge between two originals of the group, a clone of it between the corresponding
 *       sampling vertices (created with {@code Util.cloneEdge} only; no explicit
 *       {@code copyExecutionPropertiesTo} call is made for these).</li>
 *   <li>For each edge from outside the group into an original, a clone from the same source into
 *       the corresponding sampling vertex, with all execution properties copied over.</li>
 *   <li>A control edge from every sampling vertex whose original is a sink within the group to
 *       every vertex in {@code executeAfter}.</li>
 * </ol>
 * Outgoing data edges of the originals are NOT mirrored onto the sampling vertices.
 *
 * <p>The group is recorded in {@code samplingVertexToGroup} only after the new DAG has been built
 * successfully, so a failed build leaves this IRDAG's state unchanged.
 *
 * @param toInsert     sampling vertices to insert. None may already exist in the DAG, and no two
 *                     of them may share the same original vertex ({@code Collectors.toMap} throws
 *                     {@link IllegalStateException} on such duplicates). What happens when an
 *                     original id is absent depends on {@code getVertexById} (not shown here).
 * @param executeAfter existing vertices that must execute after the inserted group.
 */
public void insert(final Set<SamplingVertex> toInsert,
                   final Set<IRVertex> executeAfter) {
  toInsert.forEach(this::assertNonExistence);
  executeAfter.forEach(this::assertExistence);

  // Create a completely new DAG with the vertices inserted.
  final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();

  // All of the existing vertices and edges remain intact.
  modifiedDAG.topologicalDo(v -> {
    builder.addVertex(v);
    modifiedDAG.getIncomingEdgesOf(v).forEach(builder::connectVertices);
  });

  // Add the sampling vertices.
  toInsert.forEach(builder::addVertex);

  // Map each original vertex to the sampling vertex that mirrors it.
  final Map<IRVertex, IRVertex> originalToSampling = toInsert.stream()
    .collect(Collectors.toMap(sv -> modifiedDAG.getVertexById(sv.getOriginalVertexId()), Function.identity()));
  final Set<IREdge> inEdgesOfOriginals = originalToSampling.keySet()
    .stream()
    .flatMap(ov -> modifiedDAG.getIncomingEdgesOf(ov).stream())
    .collect(Collectors.toSet());

  // [EDGE TYPE 1] Between sampling vertices
  final Set<IREdge> betweenOriginals = inEdgesOfOriginals
    .stream()
    .filter(ovInEdge -> originalToSampling.containsKey(ovInEdge.getSrc()))
    .collect(Collectors.toSet());
  betweenOriginals.stream().map(boEdge -> Util.cloneEdge(
    boEdge,
    originalToSampling.get(boEdge.getSrc()),
    originalToSampling.get(boEdge.getDst()))).forEach(builder::connectVertices);

  // [EDGE TYPE 2] From original IRDAG to sampling vertices
  final Set<IREdge> notBetweenOriginals = inEdgesOfOriginals
    .stream()
    .filter(ovInEdge -> !originalToSampling.containsKey(ovInEdge.getSrc()))
    .collect(Collectors.toSet());
  notBetweenOriginals.stream().map(nboEdge -> {
    final IREdge cloneEdge = Util.cloneEdge(
      nboEdge,
      nboEdge.getSrc(), // sampling vertices consume a subset of original data partitions here
      originalToSampling.get(nboEdge.getDst()));
    nboEdge.copyExecutionPropertiesTo(cloneEdge); // exec properties must be exactly the same
    return cloneEdge;
  }).forEach(builder::connectVertices);

  // [EDGE TYPE 3] From sampling vertices to vertices that should be executed after
  final Set<IRVertex> sinks = getSinksWithinVertexSet(modifiedDAG, originalToSampling.keySet())
    .stream()
    .map(originalToSampling::get)
    .collect(Collectors.toSet());
  for (final IRVertex ea : executeAfter) {
    for (final IRVertex sink : sinks) {
      // Control edge that enforces execution ordering
      builder.connectVertices(Util.createControlEdge(sink, ea));
    }
  }

  // Build first: if build() throws, neither the DAG nor the group bookkeeping is modified.
  modifiedDAG = builder.build(); // update the DAG.

  // Record the group using a private copy so later mutation of the caller's set cannot
  // corrupt the mapping. All members share the same group instance.
  final Set<SamplingVertex> group = toInsert.stream().collect(Collectors.toSet());
  group.forEach(member -> samplingVertexToGroup.put(member, group));
}