in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [1581:1749]
/**
 * Handles the {@code getSortedShuffleData} RPC, which returns one merged block of a partition.
 *
 * <p>The method registers the request in the audit context, records the transport time when the
 * request carries a positive timestamp, builds the reply and sends it followed by completion.
 * Failures while reading the block are reported through the reply's status and message rather
 * than by failing the call.
 *
 * @param request carries the app id, shuffle id, partition id, merged block id and send timestamp
 * @param responseObserver receives the reply and then the completion signal
 */
public void getSortedShuffleData(
    RssProtos.GetSortedShuffleDataRequest request,
    StreamObserver<RssProtos.GetSortedShuffleDataResponse> responseObserver) {
  try (ServerRpcAuditContext auditContext = createAuditContext("getSortedShuffleData")) {
    String appId = request.getAppId();
    int shuffleId = request.getShuffleId();
    int partitionId = request.getPartitionId();
    long blockId = request.getMergedBlockId();
    long timestamp = request.getTimestamp();
    auditContext
        .withAppId(appId)
        .withShuffleId(shuffleId)
        .withArgs(String.format("partitionId=%d, blockId=%d", partitionId, blockId));

    // Only record a transport time when the client supplied a timestamp and the difference is
    // positive.
    if (timestamp > 0) {
      long transportTime = System.currentTimeMillis() - timestamp;
      if (transportTime > 0) {
        shuffleServer
            .getGrpcMetrics()
            .recordTransportTime(
                ShuffleServerGrpcMetrics.GET_SORTED_SHUFFLE_DATA_METHOD, transportTime);
      }
    }

    // NOTE(review): the final status is never recorded on the audit context in this method;
    // confirm whether the audit API offers a status setter that should be called here.
    RssProtos.GetSortedShuffleDataResponse reply =
        buildSortedShuffleDataReply(appId, shuffleId, partitionId, blockId, auditContext);
    responseObserver.onNext(reply);
    responseObserver.onCompleted();
  }
}

/**
 * Computes the reply for a sorted shuffle data request without touching the response stream.
 *
 * <p>Replies without payload are produced when remote merge is disabled, when the merge state is
 * {@code INITED} or {@code INTERNAL_ERROR}, when the state is {@code MERGING} or {@code DONE}
 * with a block size of {@code -1} (next block id is then set to {@code -1}), when read memory
 * cannot be acquired, and when reading the block throws. Otherwise the block is read and
 * returned with {@code blockId + 1} as the next block id.
 *
 * @param appId application id of the request
 * @param shuffleId shuffle id of the request
 * @param partitionId partition id of the request
 * @param blockId merged block id to fetch
 * @param auditContext audit context that receives the returned data length on success
 * @return the reply to send to the client
 */
private RssProtos.GetSortedShuffleDataResponse buildSortedShuffleDataReply(
    String appId,
    int shuffleId,
    int partitionId,
    long blockId,
    ServerRpcAuditContext auditContext) {
  String requestInfo =
      "appId["
          + appId
          + "], shuffleId["
          + shuffleId
          + "], partitionId["
          + partitionId
          + "], blockId["
          + blockId
          + "]";

  if (!shuffleServer.isRemoteMergeEnable()) {
    return statusOnlySortedShuffleDataReply(
        StatusCode.INTERNAL_ERROR, "Remote merge is disabled");
  }

  MergeStatus mergeStatus =
      shuffleServer.getShuffleMergeManager().tryGetBlock(appId, shuffleId, partitionId, blockId);
  MergeState mergeState = mergeStatus.getState();
  long blockSize = mergeStatus.getSize();

  // A size of -1 while MERGING means all merged data has been read but more may still be merged;
  // a size of -1 when DONE means all data has been read.
  boolean mergedDataExhausted =
      blockSize == -1 && (mergeState == MergeState.MERGING || mergeState == MergeState.DONE);
  if (mergeState == MergeState.INITED
      || mergeState == MergeState.INTERNAL_ERROR
      || mergedDataExhausted) {
    StatusCode stateStatus =
        mergeState == MergeState.INTERNAL_ERROR ? StatusCode.INTERNAL_ERROR : StatusCode.SUCCESS;
    RssProtos.GetSortedShuffleDataResponse.Builder stateReply =
        RssProtos.GetSortedShuffleDataResponse.newBuilder()
            .setStatus(stateStatus.toProto())
            .setRetMsg(mergeState.name())
            .setMState(mergeState.code());
    if (mergedDataExhausted) {
      stateReply.setNextBlockId(-1);
    }
    return stateReply.build();
  }

  if (!shuffleServer.getShuffleBufferManager().requireReadMemory(blockSize)) {
    String noBufferMsg = "Can't require read memory to get sorted shuffle data";
    LOG.error("{} for {}", noBufferMsg, requestInfo);
    return statusOnlySortedShuffleDataReply(StatusCode.NO_BUFFER, noBufferMsg);
  }

  RssProtos.GetSortedShuffleDataResponse reply;
  ShuffleDataResult sdr = null;
  try {
    long start = System.currentTimeMillis();
    sdr =
        shuffleServer
            .getShuffleMergeManager()
            .getShuffleData(appId, shuffleId, partitionId, blockId);
    long readTime = System.currentTimeMillis() - start;
    ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
    int dataLength = sdr.getDataLength();
    ShuffleServerMetrics.counterTotalReadDataSize.inc(dataLength);
    ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(dataLength);
    shuffleServer
        .getGrpcMetrics()
        .recordProcessTime(ShuffleServerGrpcMetrics.GET_SORTED_SHUFFLE_DATA_METHOD, readTime);
    LOG.info(
        "Successfully getSortedShuffleData cost {} ms for shuffle"
            + " data with {}, length is {}, state is {}",
        readTime,
        requestInfo,
        dataLength,
        mergeState);
    auditContext.withReturnValue("len=" + dataLength);
    // NOTE(review): the payload is wrapped without copying and sdr is released in the finally
    // block before the reply is sent; confirm the backing array stays valid after release.
    reply =
        RssProtos.GetSortedShuffleDataResponse.newBuilder()
            .setNextBlockId(blockId + 1) // next block id
            .setMState(mergeState.code())
            .setStatus(StatusCode.SUCCESS.toProto())
            .setRetMsg("OK")
            .setData(UnsafeByteOperations.unsafeWrap(sdr.getData(), 0, dataLength))
            .build();
  } catch (Exception e) {
    String errorMsg =
        "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
    LOG.error(errorMsg, e);
    reply = statusOnlySortedShuffleDataReply(StatusCode.INTERNAL_ERROR, errorMsg);
  } finally {
    if (sdr != null) {
      sdr.release();
    }
    // Always hand back the read memory acquired above, whether or not the read succeeded.
    shuffleServer.getShuffleBufferManager().releaseReadMemory(blockSize);
  }
  return reply;
}

/** Builds a reply that carries only a status and a message (no merge state, no data). */
private RssProtos.GetSortedShuffleDataResponse statusOnlySortedShuffleDataReply(
    StatusCode status, String msg) {
  return RssProtos.GetSortedShuffleDataResponse.newBuilder()
      .setStatus(status.toProto())
      .setRetMsg(msg)
      .build();
}