public void insert()

in common/src/main/java/org/apache/nemo/common/ir/IRDAG.java [479:578]


  /**
   * Inserts message-generating (trigger) vertices and a message aggregator vertex into this DAG.
   *
   * <p>For every edge in {@code edgesToGetStatisticsOf}, a fresh {@link MessageGeneratorVertex} is created
   * from the message function of {@code messageGeneratorVertex} (optionally wrapped by
   * {@code wrapSamplingVertexIfNeeded}) and connected from the edge's source through a ONE_TO_ONE clone of
   * the edge. Note that {@code messageGeneratorVertex} itself is not added to the DAG; only its message
   * function is reused. Every trigger is then connected to {@code messageAggregatorVertex}, and a control
   * edge is added from the aggregator to the common destination of {@code edgesToGetStatisticsOf}.
   * Finally, every edge of the current DAG contained in {@code edgesToOptimize} gets the aggregator's
   * message id added to its {@code MessageIdEdgeProperty}.
   *
   * @param messageGeneratorVertex  template whose message function is used to build the trigger vertices.
   * @param messageAggregatorVertex aggregator vertex to insert.
   * @param triggerOutputEncoder    encoder passed to the trigger-to-aggregator edges.
   * @param triggerOutputDecoder    decoder passed to the trigger-to-aggregator edges.
   * @param edgesToGetStatisticsOf  edges to attach triggers to; all must share a single destination vertex.
   * @param edgesToOptimize         edges to annotate with the message id; all must share a single
   *                                destination vertex.
   * @throws IllegalArgumentException if either edge set is not destined to exactly one vertex
   *                                  (an empty set is rejected as well).
   */
  public void insert(final MessageGeneratorVertex messageGeneratorVertex,
                     final MessageAggregatorVertex messageAggregatorVertex,
                     final EncoderProperty triggerOutputEncoder,
                     final DecoderProperty triggerOutputDecoder,
                     final Set<IREdge> edgesToGetStatisticsOf,
                     final Set<IREdge> edgesToOptimize) {
    // Edge case (note to callers): when the destination of the aggregator is a splitter, do not insert!
    // NOTE(review): the assert* helpers are defined elsewhere; presumably they throw on violation.
    assertNonExistence(messageGeneratorVertex);
    assertNonExistence(messageAggregatorVertex);
    edgesToGetStatisticsOf.forEach(this::assertNonControlEdge);
    edgesToOptimize.forEach(this::assertNonControlEdge);

    // Each edge set must point to exactly one destination vertex. Report the set that actually failed.
    if (edgesToGetStatisticsOf.stream().map(edge -> edge.getDst().getId()).collect(Collectors.toSet()).size() != 1) {
      throw new IllegalArgumentException("Not destined to the same vertex: " + edgesToGetStatisticsOf);
    }
    if (edgesToOptimize.stream().map(edge -> edge.getDst().getId()).collect(Collectors.toSet()).size() != 1) {
      throw new IllegalArgumentException("Not destined to the same vertex: " + edgesToOptimize);
    }

    // Create a completely new DAG with the vertex inserted.
    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();

    // All of the existing vertices and edges remain intact
    modifiedDAG.topologicalDo(v -> {
      builder.addVertex(v);
      modifiedDAG.getIncomingEdgesOf(v).forEach(builder::connectVertices);
    });

    ////////////////////////////////// STEP 1: Insert new vertices and edges (src - trigger - agg - dst)

    // From src to trigger
    final List<IRVertex> triggerList = new ArrayList<>();
    for (final IREdge edge : edgesToGetStatisticsOf) {
      final IRVertex triggerToAdd = wrapSamplingVertexIfNeeded(
        new MessageGeneratorVertex<>(messageGeneratorVertex.getMessageFunction()), edge.getSrc());
      builder.addVertex(triggerToAdd);
      triggerList.add(triggerToAdd);
      // The trigger inherits the parallelism of the source vertex, when the source has one.
      edge.getSrc().getPropertyValue(ParallelismProperty.class)
        .ifPresent(p -> triggerToAdd.setProperty(ParallelismProperty.of(p)));

      // If either endpoint is a relay vertex, clone the edge recorded for that relay vertex instead.
      final IREdge edgeToClone;
      if (edge.getSrc() instanceof RelayVertex) {
        edgeToClone = streamVertexToOriginalEdge.get(edge.getSrc());
      } else if (edge.getDst() instanceof RelayVertex) {
        edgeToClone = streamVertexToOriginalEdge.get(edge.getDst());
      } else {
        edgeToClone = edge;
      }

      final IREdge clone = Util.cloneEdge(
        CommunicationPatternProperty.Value.ONE_TO_ONE, edgeToClone, edge.getSrc(), triggerToAdd);
      if (edge.getSrc() instanceof TaskSizeSplitterVertex) {
        builder.connectSplitterVertexWithoutReplacing(edgeToClone, clone);
      } else {
        builder.connectVertices(clone);
      }
    }

    // Add agg (no need to wrap inside sampling vertices)
    builder.addVertex(messageAggregatorVertex);

    // From trigger to agg
    for (final IRVertex trigger : triggerList) {
      final IREdge edgeToMav = edgeToMessageAggregator(
        trigger, messageAggregatorVertex, triggerOutputEncoder, triggerOutputDecoder);
      builder.connectVertices(edgeToMav);
    }

    // From agg to dst
    // Add a control dependency (no output) from the messageAggregatorVertex to the destination.
    // The set was validated above to be non-empty with a single shared destination,
    // so any one of its edges can stand in for the destination lookup.
    final IREdge representativeEdge = edgesToGetStatisticsOf.iterator().next();
    final IRVertex statisticsDst = representativeEdge.getDst();
    final IREdge aggToDst = Util.createControlEdge(messageAggregatorVertex, statisticsDst);
    if (statisticsDst instanceof TaskSizeSplitterVertex) {
      builder.connectSplitterVertexWithoutReplacing(representativeEdge, aggToDst);
    } else {
      builder.connectVertices(aggToDst);
    }


    ////////////////////////////////// STEP 2: Annotate the MessageId on optimization target edges

    // Only edges that are part of the current DAG are annotated; the edge objects are shared with the
    // builder, so the annotation is visible in the rebuilt DAG as well.
    modifiedDAG.topologicalDo(v ->
      modifiedDAG.getIncomingEdgesOf(v).forEach(inEdge -> {
        if (edgesToOptimize.contains(inEdge)) {
          final HashSet<Integer> msgEdgeIds =
            inEdge.getPropertyValue(MessageIdEdgeProperty.class).orElse(new HashSet<>(0));
          msgEdgeIds.add(messageAggregatorVertex.getPropertyValue(MessageIdVertexProperty.class).get());
          inEdge.setProperty(MessageIdEdgeProperty.of(msgEdgeIds));
        }
      })
    );

    // Register every inserted vertex (triggers + aggregator) under one shared group.
    final Set<IRVertex> insertedVertices = new HashSet<>(triggerList);
    insertedVertices.add(messageAggregatorVertex);
    triggerList.forEach(trigger -> messageVertexToGroup.put(trigger, insertedVertices));
    messageVertexToGroup.put(messageAggregatorVertex, insertedVertices);

    modifiedDAG = builder.build(); // update the DAG.
  }