in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [216:296]
public void handleGetMemoryShuffleDataRequest(
TransportClient client, GetMemoryShuffleDataRequest req) {
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
int partitionId = req.getPartitionId();
long blockId = req.getLastBlockId();
int readBufferSize = req.getReadBufferSize();
long timestamp = req.getTimestamp();
if (timestamp > 0) {
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer
.getNettyMetrics()
.recordTransportTime(GetMemoryShuffleDataRequest.class.getName(), transportTime);
}
}
long start = System.currentTimeMillis();
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetMemoryShuffleDataResponse response;
String requestInfo =
"appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";
// todo: if can get the exact memory size?
if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(readBufferSize)) {
try {
ShuffleDataResult shuffleDataResult =
shuffleServer
.getShuffleTaskManager()
.getInMemoryShuffleData(
appId,
shuffleId,
partitionId,
blockId,
readBufferSize,
req.getExpectedTaskIdsBitmap());
ByteBuf data = Unpooled.EMPTY_BUFFER;
List<BufferSegment> bufferSegments = Lists.newArrayList();
if (shuffleDataResult != null) {
data = Unpooled.wrappedBuffer(shuffleDataResult.getDataBuffer());
bufferSegments = shuffleDataResult.getBufferSegments();
ShuffleServerMetrics.counterTotalReadDataSize.inc(data.readableBytes());
ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.readableBytes());
}
long costTime = System.currentTimeMillis() - start;
shuffleServer
.getNettyMetrics()
.recordProcessTime(GetMemoryShuffleDataRequest.class.getName(), costTime);
LOG.info(
"Successfully getInMemoryShuffleData cost {} ms with {} bytes shuffle" + " data for {}",
costTime,
data.readableBytes(),
requestInfo);
response =
new GetMemoryShuffleDataResponse(req.getRequestId(), status, msg, bufferSegments, data);
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg =
"Error happened when get in memory shuffle data for "
+ requestInfo
+ ", "
+ e.getMessage();
LOG.error(msg, e);
response =
new GetMemoryShuffleDataResponse(
req.getRequestId(), status, msg, Lists.newArrayList(), Unpooled.EMPTY_BUFFER);
} finally {
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
}
} else {
status = StatusCode.INTERNAL_ERROR;
msg = "Can't require memory to get in memory shuffle data";
LOG.error(msg + " for " + requestInfo);
response =
new GetMemoryShuffleDataResponse(
req.getRequestId(), status, msg, Lists.newArrayList(), Unpooled.EMPTY_BUFFER);
}
client.getChannel().writeAndFlush(response);
}