in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [1268:1393]
// Handles the getLocalShuffleIndex RPC.
//
// Flow:
//   1. Record the request in the audit context and verify the app via verifyRequest.
//   2. Select the storage for the partition and update its read metrics.
//   3. Reserve read memory (sized by SERVER_SHUFFLE_INDEX_SIZE_HINT), read the index through
//      the shuffle task manager and build the reply.
//   4. Always send exactly one reply followed by onCompleted.
//
// Reply status:
//   - the verifyRequest status when verification fails;
//   - SUCCESS with index data, data file length and storage ids on a successful read;
//   - SUCCESS without index data when the index file is not found;
//   - INTERNAL_ERROR when any other exception is raised while reading;
//   - NO_BUFFER when the read memory cannot be reserved.
public void getLocalShuffleIndex(
    GetLocalShuffleIndexRequest request,
    StreamObserver<GetLocalShuffleIndexResponse> responseObserver) {
  try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleIndex")) {
    String appId = request.getAppId();
    int shuffleId = request.getShuffleId();
    int partitionId = request.getPartitionId();
    int partitionNumPerRange = request.getPartitionNumPerRange();
    int partitionNum = request.getPartitionNum();
    auditContext.withAppId(appId).withShuffleId(shuffleId);
    auditContext.withArgs(
        "partitionId="
            + partitionId
            + ", partitionNumPerRange="
            + partitionNumPerRange
            + ", partitionNum="
            + partitionNum);

    StatusCode status = verifyRequest(appId);
    if (status != StatusCode.SUCCESS) {
      // Verification failed: answer immediately with the failing status.
      auditContext.withStatusCode(status);
      GetLocalShuffleIndexResponse reply =
          GetLocalShuffleIndexResponse.newBuilder()
              .setStatus(status.toProto())
              .setRetMsg(status.toString())
              .build();
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
      return;
    }

    String msg = "OK";
    GetLocalShuffleIndexResponse reply;
    String requestInfo =
        "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";

    int[] range =
        ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum);
    Storage storage =
        shuffleServer
            .getStorageManager()
            .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]));
    if (storage != null) {
      storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
    }

    // The index file is expected to be small, so reserving the assumed size should not cause
    // OOM problems. An index segment is 40B; with the default size hint of 2MB this covers
    // about 50k blocks of shuffle data.
    long assumedFileSize =
        shuffleServer
            .getShuffleServerConf()
            .getLong(ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT);

    if (shuffleServer.getShuffleBufferManager().requireReadMemory(assumedFileSize)) {
      ShuffleIndexResult shuffleIndexResult = null;
      // Tracks whether the read gauges were actually incremented, so the finally block only
      // decrements what was incremented. Without it, a failure between obtaining the result
      // and the increments would push the gauges below their real value.
      boolean readGaugesIncremented = false;
      try {
        final long start = System.currentTimeMillis();
        shuffleIndexResult =
            shuffleServer
                .getShuffleTaskManager()
                .getShuffleIndex(
                    appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);

        ByteBuffer data = shuffleIndexResult.getIndexData();
        ShuffleServerMetrics.counterTotalReadDataSize.inc(data.remaining());
        ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.remaining());
        ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.inc();
        ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.inc(assumedFileSize);
        readGaugesIncremented = true;

        GetLocalShuffleIndexResponse.Builder builder =
            GetLocalShuffleIndexResponse.newBuilder().setStatus(status.toProto()).setRetMsg(msg);
        // NOTE(review): unsafeWrap does not copy the buffer, and the reply is only sent after
        // shuffleIndexResult.release() runs in the finally block below. Confirm that release()
        // cannot invalidate the memory backing this ByteBuffer.
        builder.setIndexData(UnsafeByteOperations.unsafeWrap(data));
        builder.setDataFileLen(shuffleIndexResult.getDataFileLen());
        builder.addAllStorageIds(
            Arrays.stream(shuffleIndexResult.getStorageIds())
                .boxed()
                .collect(Collectors.toList()));

        long readTime = System.currentTimeMillis() - start;
        shuffleServer
            .getGrpcMetrics()
            .recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_INDEX_METHOD, readTime);
        LOG.info(
            "Successfully getShuffleIndex cost {} ms for {} bytes with {}",
            readTime,
            data.remaining(),
            requestInfo);
        auditContext.withReturnValue("len=" + shuffleIndexResult.getDataFileLen());
        reply = builder.build();
      } catch (FileNotFoundException indexFileNotFoundException) {
        // A missing index file is not reported as an error: the status stays SUCCESS and the
        // reply carries no index data.
        LOG.warn(
            "Index file for {} is not found, maybe the data has been flushed to cold storage "
                + "or still in memory buffer pool.",
            requestInfo,
            indexFileNotFoundException);
        reply = GetLocalShuffleIndexResponse.newBuilder().setStatus(status.toProto()).build();
      } catch (Exception e) {
        status = StatusCode.INTERNAL_ERROR;
        msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage();
        LOG.error(msg, e);
        reply =
            GetLocalShuffleIndexResponse.newBuilder()
                .setStatus(status.toProto())
                .setRetMsg(msg)
                .build();
      } finally {
        if (shuffleIndexResult != null) {
          shuffleIndexResult.release();
        }
        if (readGaugesIncremented) {
          ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.dec();
          ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.dec(assumedFileSize);
        }
        // Return the reservation taken by requireReadMemory above.
        shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
      }
    } else {
      status = StatusCode.NO_BUFFER;
      msg = "Can't require memory to get shuffle index";
      LOG.warn("{} for {}", msg, requestInfo);
      reply =
          GetLocalShuffleIndexResponse.newBuilder()
              .setStatus(status.toProto())
              .setRetMsg(msg)
              .build();
    }

    auditContext.withStatusCode(status);
    responseObserver.onNext(reply);
    responseObserver.onCompleted();
  }
}