in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [298:378]
public void handleGetLocalShuffleIndexRequest(
TransportClient client, GetLocalShuffleIndexRequest req) {
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
int partitionId = req.getPartitionId();
int partitionNumPerRange = req.getPartitionNumPerRange();
int partitionNum = req.getPartitionNum();
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetLocalShuffleIndexResponse response;
String requestInfo =
"appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";
int[] range =
ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum);
Storage storage =
shuffleServer
.getStorageManager()
.selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]));
if (storage != null) {
storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
}
// Index file is expected small size and won't cause oom problem with the assumed size. An index
// segment is 40B,
// with the default size - 2MB, it can support 50k blocks for shuffle data.
long assumedFileSize =
shuffleServer
.getShuffleServerConf()
.getLong(ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT);
if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(assumedFileSize)) {
try {
final long start = System.currentTimeMillis();
ShuffleIndexResult shuffleIndexResult =
shuffleServer
.getShuffleTaskManager()
.getShuffleIndex(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
ByteBuffer data = shuffleIndexResult.getIndexData();
ShuffleServerMetrics.counterTotalReadDataSize.inc(data.remaining());
ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.remaining());
response =
new GetLocalShuffleIndexResponse(
req.getRequestId(),
status,
msg,
Unpooled.wrappedBuffer(data),
shuffleIndexResult.getDataFileLen());
long readTime = System.currentTimeMillis() - start;
LOG.info(
"Successfully getShuffleIndex cost {} ms for {}" + " bytes with {}",
readTime,
data.remaining(),
requestInfo);
} catch (FileNotFoundException indexFileNotFoundException) {
LOG.warn(
"Index file for {} is not found, maybe the data has been flushed to cold storage.",
requestInfo,
indexFileNotFoundException);
response =
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage();
LOG.error(msg, e);
response =
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
} finally {
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
}
} else {
status = StatusCode.INTERNAL_ERROR;
msg = "Can't require memory to get shuffle index";
LOG.error(msg + " for " + requestInfo);
response =
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
}
client.getChannel().writeAndFlush(response);
}