in common/src/main/java/org/apache/nemo/common/ir/IRDAG.java [479:578]
/**
 * Inserts message-generating "trigger" vertices and a message aggregator vertex into the DAG.
 *
 * <p>For every edge in {@code edgesToGetStatisticsOf}, a new {@link MessageGeneratorVertex} is created from the
 * message function of {@code messageGeneratorVertex} (the passed-in instance itself is not added to the DAG),
 * passed through {@code wrapSamplingVertexIfNeeded}, and connected to the edge's source with a ONE_TO_ONE clone
 * of the edge. Every such trigger is connected to {@code messageAggregatorVertex}, which in turn gets a control
 * edge to the common destination of {@code edgesToGetStatisticsOf}. Finally, the message id of the aggregator is
 * added to the {@code MessageIdEdgeProperty} of each edge in {@code edgesToOptimize} that is an incoming edge in
 * the current DAG, the inserted vertices are recorded as a group, and the DAG is replaced by the rebuilt one.
 *
 * @param messageGeneratorVertex  supplies the message function for the newly created trigger vertices.
 * @param messageAggregatorVertex the aggregator vertex to add to the DAG.
 * @param triggerOutputEncoder    encoder passed to {@code edgeToMessageAggregator} for trigger-to-aggregator edges.
 * @param triggerOutputDecoder    decoder passed to {@code edgeToMessageAggregator} for trigger-to-aggregator edges.
 * @param edgesToGetStatisticsOf  edges whose sources feed the triggers; must all share one destination vertex.
 * @param edgesToOptimize         edges to annotate with the message id; must all share one destination vertex.
 * @throws IllegalArgumentException if either edge set is not destined to exactly one vertex.
 */
public void insert(final MessageGeneratorVertex messageGeneratorVertex,
                   final MessageAggregatorVertex messageAggregatorVertex,
                   final EncoderProperty triggerOutputEncoder,
                   final DecoderProperty triggerOutputDecoder,
                   final Set<IREdge> edgesToGetStatisticsOf,
                   final Set<IREdge> edgesToOptimize) {
  // Original author's note: "edge case: when the destination of mav is splitter, do not insert!"
  // NOTE(review): no such guard exists in this method; a TaskSizeSplitterVertex destination is
  // instead connected through connectSplitterVertexWithoutReplacing below. Confirm which is intended.
  assertNonExistence(messageGeneratorVertex);
  assertNonExistence(messageAggregatorVertex);
  edgesToGetStatisticsOf.forEach(this::assertNonControlEdge);
  edgesToOptimize.forEach(this::assertNonControlEdge);

  // Each edge set must point to exactly one destination vertex (an empty set is rejected as well).
  if (edgesToGetStatisticsOf.stream().map(edge -> edge.getDst().getId()).collect(Collectors.toSet()).size() != 1) {
    // Report the set that actually failed the check.
    throw new IllegalArgumentException("Not destined to the same vertex: " + edgesToGetStatisticsOf);
  }
  if (edgesToOptimize.stream().map(edge -> edge.getDst().getId()).collect(Collectors.toSet()).size() != 1) {
    throw new IllegalArgumentException("Not destined to the same vertex: " + edgesToOptimize);
  }

  // Create a completely new DAG with the vertex inserted.
  final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();

  // All of the existing vertices and edges remain intact
  modifiedDAG.topologicalDo(v -> {
    builder.addVertex(v);
    modifiedDAG.getIncomingEdgesOf(v).forEach(builder::connectVertices);
  });

  ////////////////////////////////// STEP 1: Insert new vertices and edges (src - trigger - agg - dst)

  // From src to trigger
  final List<IRVertex> triggerList = new ArrayList<>();
  for (final IREdge edge : edgesToGetStatisticsOf) {
    final IRVertex triggerToAdd = wrapSamplingVertexIfNeeded(
      new MessageGeneratorVertex<>(messageGeneratorVertex.getMessageFunction()), edge.getSrc());
    builder.addVertex(triggerToAdd);
    triggerList.add(triggerToAdd);
    // The trigger takes the parallelism of the source vertex when the source has one.
    edge.getSrc().getPropertyValue(ParallelismProperty.class)
      .ifPresent(p -> triggerToAdd.setProperty(ParallelismProperty.of(p)));

    // When either endpoint is a RelayVertex, clone the edge recorded for that relay vertex
    // in streamVertexToOriginalEdge instead of the given edge.
    final IREdge edgeToClone;
    if (edge.getSrc() instanceof RelayVertex) {
      edgeToClone = streamVertexToOriginalEdge.get(edge.getSrc());
    } else if (edge.getDst() instanceof RelayVertex) {
      edgeToClone = streamVertexToOriginalEdge.get(edge.getDst());
    } else {
      edgeToClone = edge;
    }

    final IREdge clone = Util.cloneEdge(
      CommunicationPatternProperty.Value.ONE_TO_ONE, edgeToClone, edge.getSrc(), triggerToAdd);
    if (edge.getSrc() instanceof TaskSizeSplitterVertex) {
      builder.connectSplitterVertexWithoutReplacing(edgeToClone, clone);
    } else {
      builder.connectVertices(clone);
    }
  }

  // Add agg (no need to wrap inside sampling vertices)
  builder.addVertex(messageAggregatorVertex);

  // From trigger to agg
  for (final IRVertex trigger : triggerList) {
    final IREdge edgeToMav = edgeToMessageAggregator(
      trigger, messageAggregatorVertex, triggerOutputEncoder, triggerOutputDecoder);
    builder.connectVertices(edgeToMav);
  }

  // From agg to dst
  // Add a control dependency (no output) from the messageAggregatorVertex to the destination.
  // All edges share one destination (validated above), so any edge of the set is representative.
  final IREdge representativeEdge = edgesToGetStatisticsOf.iterator().next();
  final IREdge aggToDst = Util.createControlEdge(messageAggregatorVertex, representativeEdge.getDst());
  if (representativeEdge.getDst() instanceof TaskSizeSplitterVertex) {
    builder.connectSplitterVertexWithoutReplacing(representativeEdge, aggToDst);
  } else {
    builder.connectVertices(aggToDst);
  }

  ////////////////////////////////// STEP 2: Annotate the MessageId on optimization target edges
  modifiedDAG.topologicalDo(v ->
    modifiedDAG.getIncomingEdgesOf(v).forEach(inEdge -> {
      if (edgesToOptimize.contains(inEdge)) {
        // Extend the ids already on the edge (if any) with the aggregator's message id.
        final HashSet<Integer> msgEdgeIds =
          inEdge.getPropertyValue(MessageIdEdgeProperty.class).orElse(new HashSet<>(0));
        msgEdgeIds.add(messageAggregatorVertex.getPropertyValue(MessageIdVertexProperty.class).get());
        inEdge.setProperty(MessageIdEdgeProperty.of(msgEdgeIds));
      }
    })
  );

  // Register every inserted vertex (all triggers plus the aggregator) under one shared group.
  final Set<IRVertex> insertedVertices = new HashSet<>();
  insertedVertices.addAll(triggerList);
  insertedVertices.add(messageAggregatorVertex);
  triggerList.forEach(trigger -> messageVertexToGroup.put(trigger, insertedVertices));
  messageVertexToGroup.put(messageAggregatorVertex, insertedVertices);

  modifiedDAG = builder.build(); // update the DAG.
}