public CompletableFuture read()

in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java [76:135]


  /**
   * Asynchronously obtains an iterator over the data produced by a source task on the given edge.
   *
   * <p>A {@code RequestPipeLoc} message is sent to the master to find the executor that hosts the
   * source task. When the reply names this executor, the read is wired up through a local
   * output/input context pair; otherwise a remote input context is opened via byte transfer.
   * This method does not wait for the reply itself: all follow-up work is chained on the future.
   *
   * @param srcTaskIndex index of the source task whose pipe is read.
   * @param runtimeEdge  edge whose id identifies the pipe (and the serializer for remote reads).
   * @param dstTaskIndex index of the destination task performing the read.
   * @return a future of the iterator. Exceptions thrown while handling the master's reply (wrong
   *         message type, missing executor id) complete this future exceptionally.
   */
  public CompletableFuture<DataUtil.IteratorWithNumBytes> read(final int srcTaskIndex,
                                                               final RuntimeEdge runtimeEdge,
                                                               final int dstTaskIndex) {
    final String runtimeEdgeId = runtimeEdge.getId();
    // Ask the master where the src task lives; the reply is delivered through the future.
    final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = toMaster
      .getMessageSender(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
        ControlMessage.Message.newBuilder()
          .setId(RuntimeIdManager.generateMessageId())
          .setListenerId(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID)
          .setType(ControlMessage.MessageType.RequestPipeLoc)
          .setRequestPipeLocMsg(
            ControlMessage.RequestPipeLocationMessage.newBuilder()
              .setExecutorId(executorId)
              .setRuntimeEdgeId(runtimeEdgeId)
              .setSrcTaskIndex(srcTaskIndex)
              .build())
          .build());
    return responseFromMasterFuture.thenCompose(responseFromMaster -> {
      // Validate the reply before extracting the executor id.
      // Anything thrown here surfaces to the caller as an exceptional completion of the future.
      if (responseFromMaster.getType() != ControlMessage.MessageType.PipeLocInfo) {
        throw new RuntimeException("Response message type mismatch! Expected "
          + ControlMessage.MessageType.PipeLocInfo + " but got " + responseFromMaster.getType());
      }
      final ControlMessage.PipeLocationInfoMessage pipeLocInfo = responseFromMaster.getPipeLocInfoMsg();
      if (!pipeLocInfo.hasExecutorId()) {
        throw new IllegalStateException("PipeLocInfo response carries no executor id (runtime edge "
          + runtimeEdgeId + ", src task index " + srcTaskIndex + ")");
      }
      final String targetExecutorId = pipeLocInfo.getExecutorId();

      if (targetExecutorId.equals(executorId)) {
        // Read from the local executor
        final Pair<String, Long> pairKey = Pair.of(runtimeEdgeId, Long.valueOf(srcTaskIndex));
        pipeContainer.putPipeListIfAbsent(pairKey, getNumOfPipeToWait(runtimeEdge));

        // Initialize a local output context and register it under the destination task index
        final LocalOutputContext outputContext =
          new LocalOutputContext(executorId, runtimeEdgeId, srcTaskIndex, dstTaskIndex);
        pipeContainer.putPipe(pairKey, dstTaskIndex, outputContext);

        // Initialize a local input context and connect it to the corresponding local output context
        final LocalInputContext inputContext = new LocalInputContext(outputContext);
        // Nothing left to wait for locally, so hand back an already-completed future.
        return CompletableFuture.completedFuture(
          DataUtil.IteratorWithNumBytes.of(inputContext.getIterator()));
      } else {
        // Read from the remote executor
        final ControlMessage.PipeTransferContextDescriptor descriptor =
          ControlMessage.PipeTransferContextDescriptor.newBuilder()
            .setRuntimeEdgeId(runtimeEdgeId)
            .setSrcTaskIndex(srcTaskIndex)
            .setDstTaskIndex(dstTaskIndex)
            .setNumPipeToWait(getNumOfPipeToWait(runtimeEdge))
            .build();

        // NOTE(review): the trailing `true` presumably marks this as a pipe transfer - confirm
        // against the newInputContext signature.
        return byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray(), true)
          .thenApply(context -> new DataUtil.InputStreamIterator(context.getInputStreams(),
            serializerManager.getSerializer(runtimeEdgeId)));
      }
    });
  }