public static PhysicalPlan appendPlan()

in runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanAppender.java [61:130]


  /**
   * Appends a submitted physical plan to an original one and wires cached data to the stages that read it.
   *
   * <p>The result starts from the stage DAG of {@code originalPlan}; every stage and stage edge of
   * {@code planToAppend} is then added to it. For each cache id referred to by a {@link CachedSourceVertex}
   * of the plan to append, a new stage edge is built from the matching cached edge of the original plan
   * (its source IR vertex and source stage) toward the cached source vertex and its stage, and the group size
   * of the cached edge's duplicate edge group is increased by one.
   *
   * @param originalPlan the plan whose stage DAG forms the base of the result and is scanned for cached edges.
   * @param planToAppend the plan whose stages and stage edges are appended.
   * @return a new physical plan carrying the id of {@code originalPlan} and the merged stage DAG.
   * @throws PlanAppenderException if an edge leaving a cached source vertex carries no cache id, if a required
   *                               cache id has no cached edge in the original plan, or if that cached edge has
   *                               no duplicate edge group property.
   */
  public static PhysicalPlan appendPlan(final PhysicalPlan originalPlan,
                                        final PhysicalPlan planToAppend) {
    // The merged DAG is seeded with everything the original plan already contains.
    final DAG<Stage, StageEdge> originalDag = originalPlan.getStageDAG();
    final DAGBuilder<Stage, StageEdge> mergedDagBuilder = new DAGBuilder<>(originalDag);

    // Phase 1: index the cached edges of the original plan by their cache id.
    // Only an edge whose destination IR vertex is a temporary data receiver represents the cached data.
    final Map<UUID, StageEdge> cachedEdgeById = new HashMap<>();
    for (final Stage originalStage : originalDag.getVertices()) {
      for (final StageEdge incoming : originalDag.getIncomingEdgesOf(originalStage)) {
        final boolean towardTempReceiver = incoming.getDstIRVertex()
          .getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).isPresent();
        if (towardTempReceiver) {
          incoming.getPropertyValue(CacheIDProperty.class)
            .ifPresent(id -> cachedEdgeById.put(id, incoming));
        }
      }
    }

    // Phase 2: copy the plan to append into the builder while collecting, per cache id,
    // the cached source vertex and the stage that needs the cached data.
    final Map<UUID, Pair<IRVertex, Stage>> consumerById = new HashMap<>();
    final DAG<Stage, StageEdge> dagToAppend = planToAppend.getStageDAG();
    dagToAppend.topologicalDo(appendedStage -> {
      // Topological order guarantees the source stages of the incoming edges were added before.
      mergedDagBuilder.addVertex(appendedStage);
      for (final StageEdge edge : dagToAppend.getIncomingEdgesOf(appendedStage)) {
        mergedDagBuilder.connectVertices(edge);
        if (!(edge.getSrcIRVertex() instanceof CachedSourceVertex)) {
          continue;
        }
        // A stage edge that starts at a cached source vertex must say which cache it reads.
        final UUID cacheId = edge.getPropertyValue(CacheIDProperty.class)
          .orElseThrow(() -> new PlanAppenderException("No cache id in the cached edge " + edge.getId()));
        consumerById.put(cacheId, Pair.of(edge.getSrcIRVertex(), edge.getSrc()));
      }

      // The same lookup for edges inside the stage: IR edges going out of a cached source vertex.
      final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag = appendedStage.getIRDAG();
      for (final IRVertex irVertex : irDag.getVertices()) {
        if (!(irVertex instanceof CachedSourceVertex)) {
          continue;
        }
        for (final RuntimeEdge<IRVertex> irEdge : irDag.getOutgoingEdgesOf(irVertex)) {
          final UUID cacheId = irEdge.getPropertyValue(CacheIDProperty.class)
            .orElseThrow(() -> new PlanAppenderException("No cache id in the cached edge " + irEdge.getId()));
          consumerById.put(cacheId, Pair.of(irEdge.getSrc(), appendedStage));
        }
      }
    });

    // Phase 3: connect every consumer found above to the cached edge with the same cache id.
    for (final Map.Entry<UUID, Pair<IRVertex, Stage>> candidate : consumerById.entrySet()) {
      final StageEdge cachedEdge = cachedEdgeById.get(candidate.getKey());
      if (cachedEdge == null) {
        throw new PlanAppenderException("Cached edge is not found in the original plan.");
      }

      final Pair<IRVertex, Stage> consumer = candidate.getValue();
      // The new edge is constructed with the execution properties object of the cached edge.
      final StageEdge linkingEdge = new StageEdge(
        IdManager.newEdgeId(),
        cachedEdge.getExecutionProperties(),
        cachedEdge.getSrcIRVertex(),
        consumer.left(),
        cachedEdge.getSrc(),
        consumer.right());
      mergedDagBuilder.connectVertices(linkingEdge);

      // Account for the additional edge in the duplicate edge group of the cached edge.
      final DuplicateEdgeGroupPropertyValue groupValue =
        cachedEdge.getPropertyValue(DuplicateEdgeGroupProperty.class)
          .orElseThrow(() -> new PlanAppenderException("Cached edge does not have duplicated edge group property."));
      groupValue.setGroupSize(groupValue.getGroupSize() + 1);
      linkingEdge.getExecutionProperties().put(DuplicateEdgeGroupProperty.of(groupValue));
    }

    return new PhysicalPlan(originalPlan.getPlanId(), mergedDagBuilder.build());
  }