in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java [180:270]
public CompletableFuture<DataUtil.IteratorWithNumBytes> readBlock(
    final String blockIdWildcard,
    final String runtimeEdgeId,
    final ExecutionPropertyMap<EdgeExecutionProperty> edgeProperties,
    final KeyRange keyRange) {
  // Flow: look up the block's owner (the master's answer is obtained through
  // blockLocationResponseCache), then read the block locally or fetch it from the owner executor.
  // All failures after the lookup surface through the returned future:
  // - RuntimeException if the master's reply is not a BlockLocationInfo message,
  // - BlockFetchException if the master reports no owner for the block,
  // - IllegalStateException if the edge has no DataStoreProperty.
  final CompletableFuture<ControlMessage.Message> blockLocationFuture;
  try {
    blockLocationFuture = blockLocationResponseCache.get(blockIdWildcard);
  } catch (ExecutionException e) {
    throw new RuntimeException(e); // This should never happen, since we're only getting a "future"
  }
  // Using thenCompose so that fetching block data starts after getting response from master.
  return blockLocationFuture.thenCompose(responseFromMaster -> {
    if (responseFromMaster.getType() != ControlMessage.MessageType.BlockLocationInfo) {
      throw new RuntimeException("Response message type mismatch!");
    }
    final ControlMessage.BlockLocationInfoMsg blockLocationInfoMsg =
        responseFromMaster.getBlockLocationInfoMsg();
    if (!blockLocationInfoMsg.hasOwnerExecutorId()) {
      throw new BlockFetchException(new Throwable(
          "Block " + blockIdWildcard + " location unknown: "
              + "The block state is " + blockLocationInfoMsg.getState()));
    }
    // The concrete block id and the executor that currently owns it, as reported by the master.
    final String blockId = blockLocationInfoMsg.getBlockId();
    final String targetExecutorId = blockLocationInfoMsg.getOwnerExecutorId();
    // Fail with a message naming the edge instead of Optional.get()'s bare "No value present".
    final DataStoreProperty.Value blockStore = edgeProperties.get(DataStoreProperty.class)
        .orElseThrow(() -> new IllegalStateException(
            "DataStoreProperty is not set for edge " + runtimeEdgeId));
    if (targetExecutorId.equals(executorId) || targetExecutorId.equals(REMOTE_FILE_STORE)) {
      // Readable without a network transfer: this executor owns the block,
      // or the owner is the REMOTE_FILE_STORE sentinel.
      return getDataFromLocalBlock(blockId, blockStore, keyRange);
    }
    return readBlockFromRemoteExecutor(
        blockId, targetExecutorId, runtimeEdgeId, blockStore, edgeProperties, keyRange);
  });
}

/**
 * Fetches a block from the executor that owns it over a byte-transfer input context.
 *
 * <p>A transfer permission for {@code runtimeEdgeId} is requested from
 * {@code blockTransferThrottler} before connecting, and is given back once the connection fails
 * or the transfer finishes.
 *
 * @param blockId          id of the block to fetch.
 * @param targetExecutorId id of the executor that owns the block.
 * @param runtimeEdgeId    id of the edge the block belongs to; also selects the serializer.
 * @param blockStore       the store holding the block on the owner executor.
 * @param edgeProperties   execution properties of the edge; {@link BlockFetchFailureProperty}
 *                         decides whether elements are streamed or the whole block is awaited.
 * @param keyRange         the range of keys to fetch.
 * @return a future of an iterator over the fetched elements.
 */
private CompletableFuture<DataUtil.IteratorWithNumBytes> readBlockFromRemoteExecutor(
    final String blockId,
    final String targetExecutorId,
    final String runtimeEdgeId,
    final DataStoreProperty.Value blockStore,
    final ExecutionPropertyMap<EdgeExecutionProperty> edgeProperties,
    final KeyRange keyRange) {
  final ControlMessage.BlockTransferContextDescriptor descriptor =
      ControlMessage.BlockTransferContextDescriptor.newBuilder()
          .setBlockId(blockId)
          .setBlockStore(convertBlockStore(blockStore))
          .setRuntimeEdgeId(runtimeEdgeId)
          .setKeyRange(ByteString.copyFrom(SerializationUtils.serialize(keyRange)))
          .build();
  final CompletableFuture<ByteInputContext> contextFuture = blockTransferThrottler
      .requestTransferPermission(runtimeEdgeId)
      .thenCompose(obj -> byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray(), false));
  releaseTransferPermissionWhenDone(contextFuture, runtimeEdgeId);

  final BlockFetchFailureProperty.Value fetchFailure = edgeProperties.get(BlockFetchFailureProperty.class)
      .orElse(BlockFetchFailureProperty.Value.CANCEL_TASK); // the default behavior.
  if (!fetchFailure.equals(BlockFetchFailureProperty.Value.CANCEL_TASK)) {
    /*
    Wait until fetching "all elements" of each block.
    Problem: If the task won't be cancelled upon fetch failure, then the task can potentially
    process blocks partially or process the same elements more than once.
    Solution: With this waiting, a task that fetches a block either
    - Processes all elements of the block
    - Processes no element of the block (i.e., Runs into a block fetch exception while waiting)
    */
    return contextFuture
        .thenCompose(ByteInputContext::getCompletedFuture)
        // thenApply waits for the future.
        .thenApply(streams -> new DataUtil.InputStreamIterator<>(
            streams, serializerManager.getSerializer(runtimeEdgeId)));
  }
  /*
  Process "each element" of a block as soon as the element comes in.
  No worries about partial/duplicate processing here, as the task will be cancelled and restarted cleanly.
  Probably best performance when there is no failure.
  */
  return contextFuture
      .thenApply(context -> new DataUtil.InputStreamIterator<>(context.getInputStreams(),
          serializerManager.getSerializer(runtimeEdgeId)));
}

/**
 * Arranges for {@code blockTransferThrottler.onTransferFinished(runtimeEdgeId)} to be called
 * exactly once for the given connection attempt: immediately if the connection fails, otherwise
 * when the connection's completed-future finishes (successfully or not).
 * Failure handling and Task retry are left to DataFetcher.
 *
 * @param contextFuture the future of the input context being opened.
 * @param runtimeEdgeId id of the edge whose transfer permission is to be released.
 */
private void releaseTransferPermissionWhenDone(
    final CompletableFuture<ByteInputContext> contextFuture,
    final String runtimeEdgeId) {
  contextFuture.whenComplete((connectionContext, connectionThrowable) -> {
    if (connectionThrowable != null) {
      // Something wrong with the connection. Notify blockTransferThrottler immediately.
      blockTransferThrottler.onTransferFinished(runtimeEdgeId);
    } else {
      // Connection is okay. Notify blockTransferThrottler when the actual transfer is done, or fails.
      connectionContext.getCompletedFuture().whenComplete((streams, transferThrowable) ->
          blockTransferThrottler.onTransferFinished(runtimeEdgeId));
    }
  });
}