public void insert()

in common/src/main/java/org/apache/nemo/common/ir/IRDAG.java [383:456]


  /**
   * Splices a relay vertex into the DAG in place of the given edge.
   *
   * Before: src - edgeToStreamize - dst
   * After: src - (edge with edgeToStreamize's communication pattern and copied properties)
   *        - inserted vertex - (one-to-one edge) - dst
   *
   * The vertex that is actually added is the result of {@code wrapSamplingVertexIfNeeded} for the relay
   * vertex and the source of the edge. When the source has a {@code ParallelismProperty}, the same value
   * is set on the added vertex. If the destination of the edge is a {@code TaskSizeSplitterVertex}, the
   * method returns without changing the DAG.
   *
   * @param relayVertex     the relay vertex to insert.
   * @param edgeToStreamize the edge to replace with the two new edges around the inserted vertex.
   * @throws CompileTimeOptimizationException if the edge has a non-empty {@code MessageIdEdgeProperty} value.
   */
  public void insert(final RelayVertex relayVertex, final IREdge edgeToStreamize) {
    assertNonExistence(relayVertex);
    assertNonControlEdge(edgeToStreamize);

    final IRVertex upstream = edgeToStreamize.getSrc();
    final IRVertex downstream = edgeToStreamize.getDst();

    // Integrity check: reject an edge whose MessageIdEdgeProperty value is present and non-empty.
    final boolean carriesMessageId = edgeToStreamize.getPropertyValue(MessageIdEdgeProperty.class)
      .filter(messageIds -> !messageIds.isEmpty())
      .isPresent();
    if (carriesMessageId) {
      throw new CompileTimeOptimizationException(edgeToStreamize.getId() + " has a MessageId, and cannot be removed");
    }

    // A RelayVertex is never placed right in front of a SplitterVertex.
    if (downstream instanceof TaskSizeSplitterVertex) {
      return;
    }

    // The DAG is rebuilt from scratch; start with the (possibly wrapped) vertex to add.
    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
    final IRVertex inserted = wrapSamplingVertexIfNeeded(relayVertex, upstream);
    dagBuilder.addVertex(inserted);
    upstream.getPropertyValue(ParallelismProperty.class)
      .ifPresent(parallelism -> inserted.setProperty(ParallelismProperty.of(parallelism)));

    // Replay the current topology into the builder, swapping the target edge for two new ones.
    modifiedDAG.topologicalDo(vertex -> {
      dagBuilder.addVertex(vertex); // Every existing vertex is kept.

      for (final IREdge incoming : modifiedDAG.getIncomingEdgesOf(vertex)) {
        if (!incoming.equals(edgeToStreamize)) {
          // Not the edge being replaced: carry it over unchanged.
          dagBuilder.connectVertices(incoming);
          continue;
        }

        // Inbound edge (src -> inserted vertex): same communication pattern and execution properties
        // as the replaced edge, then permanently set to a bytes decoder, LZ4 compression, no decompression.
        final IREdge inbound = new IREdge(
          edgeToStreamize.getPropertyValue(CommunicationPatternProperty.class).get(),
          upstream,
          inserted);
        edgeToStreamize.copyExecutionPropertiesTo(inbound);
        inbound.setPropertyPermanently(DecoderProperty.of(BytesDecoderFactory.of()));
        inbound.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.LZ4));
        inbound.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.NONE));

        // Outbound edge (inserted vertex -> dst): one-to-one, seeded with the replaced edge's
        // encoder/decoder, then permanently set to a bytes encoder, no compression, LZ4 decompression
        // and the DEDICATED_KEY_PER_ELEMENT partitioner.
        final IREdge outbound = new IREdge(CommunicationPatternProperty.Value.ONE_TO_ONE, inserted, vertex);
        outbound.setProperty(EncoderProperty.of(edgeToStreamize.getPropertyValue(EncoderProperty.class).get()));
        outbound.setProperty(DecoderProperty.of(edgeToStreamize.getPropertyValue(DecoderProperty.class).get()));
        outbound.setPropertyPermanently(EncoderProperty.of(BytesEncoderFactory.of()));
        outbound.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.NONE));
        outbound.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.LZ4));
        outbound.setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Type.DEDICATED_KEY_PER_ELEMENT));

        // Register both replacement edges.
        dagBuilder.connectVertices(inbound);
        dagBuilder.connectVertices(outbound);
      }
    });

    // Record which edge this relay vertex stands for. When the replaced edge already touches a
    // RelayVertex (source checked first, then destination), reuse the entry recorded for that vertex.
    if (upstream instanceof RelayVertex) {
      streamVertexToOriginalEdge.put(relayVertex, streamVertexToOriginalEdge.get(upstream));
    } else if (downstream instanceof RelayVertex) {
      streamVertexToOriginalEdge.put(relayVertex, streamVertexToOriginalEdge.get(downstream));
    } else {
      streamVertexToOriginalEdge.put(relayVertex, edgeToStreamize);
    }

    modifiedDAG = dagBuilder.build(); // Swap in the rebuilt DAG.
  }