public CompletableFuture readBlock()

in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java [180:270]


  /**
   * Reads the block matching {@code blockIdWildcard}, either from this executor or from a remote one.
   *
   * <p>The block location is looked up through {@code blockLocationResponseCache}. The data fetch is
   * chained onto that lookup, so it only begins after the master has answered. If the owner reported by
   * the master is this executor or {@code REMOTE_FILE_STORE}, the block is read through
   * {@code getDataFromLocalBlock}. Otherwise it is fetched from the owning executor.
   *
   * @param blockIdWildcard id used to look up the block location.
   * @param runtimeEdgeId   id of the runtime edge the block is read for. It is also passed to the
   *                        serializer manager and to the transfer throttler.
   * @param edgeProperties  execution properties of the edge. They must contain a {@code DataStoreProperty}
   *                        value; a missing {@code BlockFetchFailureProperty} is treated as
   *                        {@code CANCEL_TASK}.
   * @param keyRange        key range of the block to read.
   * @return a future that completes with an iterator over the block's elements. The future completes
   *         exceptionally if the master's reply has an unexpected type or if the block has no known owner.
   * @throws RuntimeException if loading from the location cache fails (not expected, since only a future
   *                          is retrieved).
   */
  public CompletableFuture<DataUtil.IteratorWithNumBytes> readBlock(
    final String blockIdWildcard,
    final String runtimeEdgeId,
    final ExecutionPropertyMap<EdgeExecutionProperty> edgeProperties,
    final KeyRange keyRange) {
    final CompletableFuture<ControlMessage.Message> locationLookup;
    try {
      locationLookup = blockLocationResponseCache.get(blockIdWildcard);
    } catch (final ExecutionException cacheException) {
      // The cache only hands back a future, so a load failure here is not expected.
      throw new RuntimeException(cacheException);
    }

    // thenCompose: the actual data fetch is started only once the master has replied.
    return locationLookup.thenCompose(masterReply -> {
      if (masterReply.getType() != ControlMessage.MessageType.BlockLocationInfo) {
        throw new RuntimeException("Response message type mismatch!");
      }

      final ControlMessage.BlockLocationInfoMsg locationInfo = masterReply.getBlockLocationInfoMsg();
      if (!locationInfo.hasOwnerExecutorId()) {
        throw new BlockFetchException(new Throwable(
          "Block " + blockIdWildcard + " location unknown: "
            + "The block state is " + locationInfo.getState()));
      }

      final String resolvedBlockId = locationInfo.getBlockId();
      final String ownerExecutorId = locationInfo.getOwnerExecutorId();
      final DataStoreProperty.Value storeType = edgeProperties.get(DataStoreProperty.class).get();

      final boolean readableLocally =
        ownerExecutorId.equals(executorId) || ownerExecutorId.equals(REMOTE_FILE_STORE);
      if (readableLocally) {
        return getDataFromLocalBlock(resolvedBlockId, storeType, keyRange);
      }
      return fetchBlockFromRemoteExecutor(
        resolvedBlockId, ownerExecutorId, storeType, runtimeEdgeId, edgeProperties, keyRange);
    });
  }

  /**
   * Fetches a block from another executor through {@code byteTransfer}.
   *
   * <p>A transfer permission for {@code runtimeEdgeId} is first requested from
   * {@code blockTransferThrottler}; the input context is opened only after that request completes.
   * How the returned iterator consumes the streams depends on the edge's
   * {@code BlockFetchFailureProperty} (default {@code CANCEL_TASK}).
   *
   * @param blockId          id of the block to fetch.
   * @param targetExecutorId executor that owns the block.
   * @param blockStore       store type the block resides in on the owner.
   * @param runtimeEdgeId    runtime edge the block is read for.
   * @param edgeProperties   execution properties of that edge.
   * @param keyRange         key range of the block to read.
   * @return a future that completes with an iterator over the block's elements.
   */
  private CompletableFuture<DataUtil.IteratorWithNumBytes> fetchBlockFromRemoteExecutor(
    final String blockId,
    final String targetExecutorId,
    final DataStoreProperty.Value blockStore,
    final String runtimeEdgeId,
    final ExecutionPropertyMap<EdgeExecutionProperty> edgeProperties,
    final KeyRange keyRange) {
    final ControlMessage.BlockTransferContextDescriptor transferDescriptor =
      ControlMessage.BlockTransferContextDescriptor.newBuilder()
        .setBlockId(blockId)
        .setBlockStore(convertBlockStore(blockStore))
        .setRuntimeEdgeId(runtimeEdgeId)
        .setKeyRange(ByteString.copyFrom(SerializationUtils.serialize(keyRange)))
        .build();

    final CompletableFuture<ByteInputContext> connection = blockTransferThrottler
      .requestTransferPermission(runtimeEdgeId)
      .thenCompose(granted ->
        byteTransfer.newInputContext(targetExecutorId, transferDescriptor.toByteArray(), false));

    releaseTransferPermissionWhenDone(runtimeEdgeId, connection);

    final BlockFetchFailureProperty.Value failurePolicy = edgeProperties.get(BlockFetchFailureProperty.class)
      .orElse(BlockFetchFailureProperty.Value.CANCEL_TASK); // the default behavior.

    if (failurePolicy.equals(BlockFetchFailureProperty.Value.CANCEL_TASK)) {
      // Elements are handed to the task as soon as they arrive. Partial or duplicate processing is not
      // a concern here, because a fetch failure cancels the task and it is restarted from scratch.
      // This is likely the fastest option when nothing fails.
      return connection.thenApply(context -> new DataUtil.InputStreamIterator<>(
        context.getInputStreams(), serializerManager.getSerializer(runtimeEdgeId)));
    }

    // The task is NOT cancelled on fetch failure, so it could otherwise see a block only partially, or
    // see the same elements twice. Waiting for the whole block first means the task either processes
    // every element of the block, or none of them (it hits the fetch exception while waiting).
    return connection
      .thenCompose(ByteInputContext::getCompletedFuture)
      .thenApply(streams -> new DataUtil.InputStreamIterator<>(
        streams, serializerManager.getSerializer(runtimeEdgeId)));
  }

  /**
   * Ensures {@code blockTransferThrottler.onTransferFinished(runtimeEdgeId)} is called exactly once per
   * connection attempt.
   *
   * <p>The call happens immediately if the connection fails. Otherwise it happens when the context's
   * completed future finishes, whether it succeeds or fails. Failure handling and task retry are left to
   * the caller (DataFetcher).
   *
   * @param runtimeEdgeId runtime edge whose transfer permission should be released.
   * @param connection    future of the input context opened for the transfer.
   */
  private void releaseTransferPermissionWhenDone(final String runtimeEdgeId,
                                                 final CompletableFuture<ByteInputContext> connection) {
    connection.whenComplete((context, connectError) -> {
      if (connectError == null) {
        // Connected: release once the actual transfer is over, successful or not.
        context.getCompletedFuture().whenComplete((ignoredStreams, ignoredError) ->
          blockTransferThrottler.onTransferFinished(runtimeEdgeId));
      } else {
        // Could not connect at all: release right away.
        blockTransferThrottler.onTransferFinished(runtimeEdgeId);
      }
    });
  }