in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java [369:417]
public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
final ControlMessage.BlockTransferContextDescriptor descriptor =
ControlMessage.BlockTransferContextDescriptor.PARSER.parseFrom(outputContext.getContextDescriptor());
final DataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
final String blockId = descriptor.getBlockId();
final KeyRange keyRange = SerializationUtils.deserialize(descriptor.getKeyRange().toByteArray());
backgroundExecutorService.submit(new Runnable() {
@Override
public void run() {
try {
final Optional<Block> optionalBlock = getBlockStore(blockStore).readBlock(blockId);
if (optionalBlock.isPresent()) {
if (DataStoreProperty.Value.LOCAL_FILE_STORE.equals(blockStore)
|| DataStoreProperty.Value.GLUSTER_FILE_STORE.equals(blockStore)) {
final List<FileArea> fileAreas = ((FileBlock) optionalBlock.get()).asFileAreas(keyRange);
for (final FileArea fileArea : fileAreas) {
try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
os.writeFileArea(fileArea);
}
}
} else {
final Iterable<SerializedPartition> partitions = optionalBlock.get().readSerializedPartitions(keyRange);
for (final SerializedPartition partition : partitions) {
try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
if (optionalBlock.get().getClass() == SerializedMemoryBlock.class) {
os.writeSerializedPartitionBuffer(partition, false);
} else {
// For NonSerializedMemoryBlock, the serialized partition to be sent is transient and needs
// to be released right after the data transfer.
os.writeSerializedPartitionBuffer(partition, true);
}
}
}
}
handleDataPersistence(blockStore, blockId);
outputContext.close();
} else {
// We don't have the block here...
throw new RuntimeException(String.format("Block %s not found in local BlockManagerWorker", blockId));
}
} catch (final IOException | BlockFetchException e) {
LOG.error("Closing a block request exceptionally", e);
outputContext.onChannelError(e);
}
}
});
}