in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [764:863]
public void getMemoryShuffleData(
GetMemoryShuffleDataRequest request,
StreamObserver<GetMemoryShuffleDataResponse> responseObserver) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
int partitionId = request.getPartitionId();
long blockId = request.getLastBlockId();
int readBufferSize = request.getReadBufferSize();
long timestamp = request.getTimestamp();
if (timestamp > 0) {
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer
.getGrpcMetrics()
.recordTransportTime(
ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD, transportTime);
}
}
long start = System.currentTimeMillis();
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetMemoryShuffleDataResponse reply;
String requestInfo =
"appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";
// todo: if can get the exact memory size?
if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(readBufferSize)) {
try {
Roaring64NavigableMap expectedTaskIds = null;
if (request.getSerializedExpectedTaskIdsBitmap() != null
&& !request.getSerializedExpectedTaskIdsBitmap().isEmpty()) {
expectedTaskIds =
RssUtils.deserializeBitMap(
request.getSerializedExpectedTaskIdsBitmap().toByteArray());
}
ShuffleDataResult shuffleDataResult =
shuffleServer
.getShuffleTaskManager()
.getInMemoryShuffleData(
appId, shuffleId, partitionId, blockId, readBufferSize, expectedTaskIds);
byte[] data = new byte[] {};
List<BufferSegment> bufferSegments = Lists.newArrayList();
if (shuffleDataResult != null) {
data = shuffleDataResult.getData();
bufferSegments = shuffleDataResult.getBufferSegments();
ShuffleServerMetrics.counterTotalReadDataSize.inc(data.length);
ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.length);
}
long costTime = System.currentTimeMillis() - start;
shuffleServer
.getGrpcMetrics()
.recordProcessTime(ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD, costTime);
LOG.info(
"Successfully getInMemoryShuffleData cost {} ms with {} bytes shuffle" + " data for {}",
costTime,
data.length,
requestInfo);
reply =
GetMemoryShuffleDataResponse.newBuilder()
.setStatus(status.toProto())
.setRetMsg(msg)
.setData(UnsafeByteOperations.unsafeWrap(data))
.addAllShuffleDataBlockSegments(toShuffleDataBlockSegments(bufferSegments))
.build();
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg =
"Error happened when get in memory shuffle data for "
+ requestInfo
+ ", "
+ e.getMessage();
LOG.error(msg, e);
reply =
GetMemoryShuffleDataResponse.newBuilder()
.setData(UnsafeByteOperations.unsafeWrap(new byte[] {}))
.addAllShuffleDataBlockSegments(Lists.newArrayList())
.setStatus(status.toProto())
.setRetMsg(msg)
.build();
} finally {
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
}
} else {
status = StatusCode.INTERNAL_ERROR;
msg = "Can't require memory to get in memory shuffle data";
LOG.error(msg + " for " + requestInfo);
reply =
GetMemoryShuffleDataResponse.newBuilder()
.setData(UnsafeByteOperations.unsafeWrap(new byte[] {}))
.addAllShuffleDataBlockSegments(Lists.newArrayList())
.setStatus(status.toProto())
.setRetMsg(msg)
.build();
}
responseObserver.onNext(reply);
responseObserver.onCompleted();
}