in common/src/main/java/org/apache/nemo/common/ir/IRDAG.java [383:456]
public void insert(final RelayVertex relayVertex, final IREdge edgeToStreamize) {
  // Purpose: split edgeToStreamize in two by placing relayVertex (or whatever vertex
  // wrapSamplingVertexIfNeeded returns for it) between the edge's endpoints, then replace
  // modifiedDAG with a freshly built DAG containing the new vertex and the two new edges.
  // The relay is also recorded in streamVertexToOriginalEdge.
  assertNonExistence(relayVertex);
  assertNonControlEdge(edgeToStreamize);

  // Integrity check: an edge carrying a non-empty MessageId is rejected outright.
  final boolean carriesMessageId = edgeToStreamize.getPropertyValue(MessageIdEdgeProperty.class)
    .filter(messageIds -> !messageIds.isEmpty())
    .isPresent();
  if (carriesMessageId) {
    throw new CompileTimeOptimizationException(edgeToStreamize.getId() + " has a MessageId, and cannot be removed");
  }

  final IRVertex upstream = edgeToStreamize.getSrc();
  final IRVertex downstream = edgeToStreamize.getDst();

  // RelayVertex should not be inserted before SplitterVertex: leave the DAG untouched.
  if (downstream instanceof TaskSizeSplitterVertex) {
    return;
  }

  // From here on a completely new DAG is assembled with the vertex inserted.
  final DAGBuilder<IRVertex, IREdge> rebuilt = new DAGBuilder<>();

  // Register the vertex to insert; it takes over the source's parallelism when one is set.
  final IRVertex inserted = wrapSamplingVertexIfNeeded(relayVertex, upstream);
  rebuilt.addVertex(inserted);
  upstream.getPropertyValue(ParallelismProperty.class)
    .ifPresent(parallelism -> inserted.setProperty(ParallelismProperty.of(parallelism)));

  // Replay the current topology into the builder, swapping out only the targeted edge.
  modifiedDAG.topologicalDo(vertex -> {
    rebuilt.addVertex(vertex); // None of the existing vertices are deleted.
    for (final IREdge incoming : modifiedDAG.getIncomingEdgesOf(vertex)) {
      if (!incoming.equals(edgeToStreamize)) {
        // Not the edge being split, so it is carried over unchanged.
        rebuilt.connectVertices(incoming);
        continue;
      }

      // First half: source -> inserted vertex, keeping the original communication pattern
      // and starting from a copy of the original edge's execution properties.
      final IREdge intoRelay = new IREdge(
        edgeToStreamize.getPropertyValue(CommunicationPatternProperty.class).get(),
        upstream,
        inserted);
      edgeToStreamize.copyExecutionPropertiesTo(intoRelay);

      // Second half: inserted vertex -> original destination, always one-to-one,
      // seeded with the original edge's encoder and decoder.
      final IREdge outOfRelay = new IREdge(CommunicationPatternProperty.Value.ONE_TO_ONE, inserted, vertex);
      outOfRelay.setProperty(EncoderProperty.of(edgeToStreamize.getPropertyValue(EncoderProperty.class).get()));
      outOfRelay.setProperty(DecoderProperty.of(edgeToStreamize.getPropertyValue(DecoderProperty.class).get()));

      // Annotations for efficient data transfers on the first half:
      // bytes decoder, LZ4 compression, no decompression.
      intoRelay.setPropertyPermanently(DecoderProperty.of(BytesDecoderFactory.of()));
      intoRelay.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.LZ4));
      intoRelay.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.NONE));

      // Annotations for efficient data transfers on the second half:
      // bytes encoder, no compression, LZ4 decompression, dedicated key per element.
      outOfRelay.setPropertyPermanently(EncoderProperty.of(BytesEncoderFactory.of()));
      outOfRelay.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.NONE));
      outOfRelay.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.LZ4));
      outOfRelay.setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Type.DEDICATED_KEY_PER_ELEMENT));

      // Track the new edges.
      rebuilt.connectVertices(intoRelay);
      rebuilt.connectVertices(outOfRelay);
    }
  });

  // Remember which edge this relay stands for. When the split edge already touches a relay,
  // reuse the edge recorded for that relay; the source side wins if both ends are relays.
  final boolean upstreamIsRelay = upstream instanceof RelayVertex;
  final boolean downstreamIsRelay = downstream instanceof RelayVertex;
  if (upstreamIsRelay || downstreamIsRelay) {
    final IRVertex adjacentRelay = upstreamIsRelay ? upstream : downstream;
    streamVertexToOriginalEdge.put(relayVertex, streamVertexToOriginalEdge.get(adjacentRelay));
  } else {
    streamVertexToOriginalEdge.put(relayVertex, edgeToStreamize);
  }

  modifiedDAG = rebuilt.build(); // update the DAG.
}