in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [1117:1265]
/**
 * Serves a {@code getLocalShuffleData} RPC: reads {@code length} bytes starting at {@code offset}
 * from the requested partition's shuffle data through the shuffle task manager and returns them
 * to the client.
 *
 * <p>Flow:
 *
 * <ol>
 *   <li>The app id is checked with {@code verifyRequest}; a non-SUCCESS status is returned to the
 *       client as-is.
 *   <li>Read memory for {@code length} bytes is reserved from the shuffle buffer manager; if the
 *       reservation fails the call completes with {@link StatusCode#NO_BUFFER}.
 *   <li>Any exception while reading or building the reply is reported to the client as {@link
 *       StatusCode#INTERNAL_ERROR}.
 * </ol>
 *
 * <p>The successful reply wraps the read data without copying ({@code
 * UnsafeByteOperations.unsafeWrap}), so the read result, the read gauges and the read-memory
 * reservation are released only after the reply has been handed to {@code responseObserver}.
 *
 * @param request the client request (app/shuffle/partition ids, offset, length, storage id)
 * @param responseObserver receives exactly one response followed by completion
 */
public void getLocalShuffleData(
    GetLocalShuffleDataRequest request,
    StreamObserver<GetLocalShuffleDataResponse> responseObserver) {
  try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleData")) {
    String appId = request.getAppId();
    int shuffleId = request.getShuffleId();
    int partitionId = request.getPartitionId();
    int partitionNumPerRange = request.getPartitionNumPerRange();
    int partitionNum = request.getPartitionNum();
    long offset = request.getOffset();
    int length = request.getLength();
    int storageId = request.getStorageId();
    auditContext.withAppId(appId).withShuffleId(shuffleId);
    auditContext.withArgs(
        "partitionId="
            + partitionId
            + ", partitionNumPerRange="
            + partitionNumPerRange
            + ", partitionNum="
            + partitionNum
            + ", offset="
            + offset
            + ", length="
            + length
            + ", storageId="
            + storageId);
    StatusCode status = verifyRequest(appId);
    if (status != StatusCode.SUCCESS) {
      auditContext.withStatusCode(status);
      GetLocalShuffleDataResponse response =
          GetLocalShuffleDataResponse.newBuilder()
              .setStatus(status.toProto())
              .setRetMsg(status.toString())
              .build();
      responseObserver.onNext(response);
      responseObserver.onCompleted();
      return;
    }
    // Client-side send timestamp; only record a transport time when it is set and positive.
    long timestamp = request.getTimestamp();
    if (timestamp > 0) {
      long transportTime = System.currentTimeMillis() - timestamp;
      if (transportTime > 0) {
        shuffleServer
            .getGrpcMetrics()
            .recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, transportTime);
      }
    }
    String storageType =
        shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE).name();
    String requestInfo =
        "appId["
            + appId
            + "], shuffleId["
            + shuffleId
            + "], partitionId["
            + partitionId
            + "], offset["
            + offset
            + "], length["
            + length
            + "]";
    int[] range =
        ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum);
    Storage storage =
        shuffleServer
            .getStorageManager()
            .selectStorage(
                new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], storageId));
    if (storage != null) {
      storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
    }

    if (!shuffleServer.getShuffleBufferManager().requireReadMemory(length)) {
      status = StatusCode.NO_BUFFER;
      String msg = "Can't require memory to get shuffle data";
      LOG.warn("{} for {}", msg, requestInfo);
      auditContext.withStatusCode(status);
      auditContext.withReturnValue("len=" + 0);
      GetLocalShuffleDataResponse reply =
          GetLocalShuffleDataResponse.newBuilder()
              .setStatus(status.toProto())
              .setRetMsg(msg)
              .build();
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
      return;
    }

    // From here on `length` bytes of read memory are reserved and must be released exactly once.
    ShuffleDataResult sdr = null;
    boolean readGaugesIncremented = false;
    try {
      GetLocalShuffleDataResponse reply;
      try {
        long start = System.currentTimeMillis();
        sdr =
            shuffleServer
                .getShuffleTaskManager()
                .getShuffleData(
                    appId,
                    shuffleId,
                    partitionId,
                    partitionNumPerRange,
                    partitionNum,
                    storageType,
                    offset,
                    length,
                    storageId);
        // Increment right after the read so the decrement in the outer finally is always paired
        // with it, even if building the reply below throws.
        ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.inc();
        ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.inc(length);
        readGaugesIncremented = true;
        // Zero-copy wrap: sdr must not be released until the reply has been sent.
        reply =
            GetLocalShuffleDataResponse.newBuilder()
                .setStatus(status.toProto())
                .setRetMsg("OK")
                .setData(UnsafeByteOperations.unsafeWrap(sdr.getData()))
                .build();
        long readTime = System.currentTimeMillis() - start;
        ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
        ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
        ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
        shuffleServer
            .getGrpcMetrics()
            .recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime);
        LOG.info(
            "Successfully getShuffleData cost {} ms for shuffle data with {}",
            readTime,
            requestInfo);
      } catch (Exception e) {
        status = StatusCode.INTERNAL_ERROR;
        String msg =
            "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
        LOG.error(msg, e);
        reply =
            GetLocalShuffleDataResponse.newBuilder()
                .setStatus(status.toProto())
                .setRetMsg(msg)
                .build();
      }
      auditContext.withStatusCode(status);
      auditContext.withReturnValue("len=" + (sdr == null ? 0 : sdr.getDataLength()));
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
    } finally {
      // Released only after the reply (which may reference sdr's bytes) was handed to gRPC,
      // and also when sending fails.
      if (sdr != null) {
        sdr.release();
      }
      if (readGaugesIncremented) {
        ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.dec();
        ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.dec(length);
      }
      shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
    }
  }
}