in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [413:522]
/**
 * Serves a {@code GetMemoryShuffleDataRequest} by reading shuffle data through the shuffle task
 * manager and writing a {@code GetMemoryShuffleDataResponse} to the client's channel.
 *
 * <p>Exactly one response is written per call:
 *
 * <ul>
 *   <li>request verification does not return {@code SUCCESS}: the verification status is
 *       returned with no segments and an empty buffer;
 *   <li>read memory cannot be required for {@code readBufferSize}: {@code NO_BUFFER} is
 *       returned with no segments and an empty buffer;
 *   <li>reading the data or building/sending the reply throws: the required read memory and any
 *       obtained result are released and {@code INTERNAL_ERROR} is returned;
 *   <li>otherwise: the data and its buffer segments are returned, with a {@code
 *       ReleaseMemoryAndRecordReadTimeListener} attached to the write future.
 * </ul>
 *
 * @param client transport client whose channel receives the response
 * @param req the memory shuffle data request being served
 */
public void handleGetMemoryShuffleDataRequest(
    TransportClient client, GetMemoryShuffleDataRequest req) {
  try (ServerRpcAuditContext auditContext = createAuditContext("getMemoryShuffleData", client)) {
    final String appId = req.getAppId();
    final int shuffleId = req.getShuffleId();
    final int partitionId = req.getPartitionId();
    final long lastBlockId = req.getLastBlockId();
    final int readBufferSize = req.getReadBufferSize();

    // Fill in the audit record before any early exit so every outcome is attributed.
    auditContext.withAppId(appId).withShuffleId(shuffleId);
    final String auditArgs =
        "requestId="
            + req.getRequestId()
            + ", partitionId="
            + partitionId
            + ", blockId="
            + lastBlockId
            + ", readBufferSize="
            + readBufferSize;
    auditContext.withArgs(auditArgs);

    // Reject requests that fail verification with an empty payload.
    final StatusCode verifyStatus = verifyRequest(appId);
    if (StatusCode.SUCCESS != verifyStatus) {
      auditContext.withStatusCode(verifyStatus);
      GetMemoryShuffleDataResponse rejected =
          new GetMemoryShuffleDataResponse(
              req.getRequestId(),
              verifyStatus,
              verifyStatus.toString(),
              Lists.newArrayList(),
              Unpooled.EMPTY_BUFFER);
      client.getChannel().writeAndFlush(rejected);
      return;
    }

    // Transport time is only recorded when the request carries a timestamp and the
    // computed difference is positive.
    final long sentAt = req.getTimestamp();
    final long transportTime = sentAt > 0 ? System.currentTimeMillis() - sentAt : 0L;
    if (transportTime > 0) {
      shuffleServer
          .getNettyMetrics()
          .recordTransportTime(GetMemoryShuffleDataRequest.class.getName(), transportTime);
    }

    final String requestInfo =
        "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";

    // TODO: can the exact memory size be obtained instead of using readBufferSize?
    if (!shuffleServer.getShuffleBufferManager().requireReadMemory(readBufferSize)) {
      final String noBufferMsg = "Can't require memory to get in memory shuffle data";
      LOG.warn("{} for {}", noBufferMsg, requestInfo);
      GetMemoryShuffleDataResponse noBuffer =
          new GetMemoryShuffleDataResponse(
              req.getRequestId(),
              StatusCode.NO_BUFFER,
              noBufferMsg,
              Lists.newArrayList(),
              Unpooled.EMPTY_BUFFER);
      auditContext.withStatusCode(noBuffer.getStatusCode());
      client.getChannel().writeAndFlush(noBuffer);
      return;
    }

    // Read memory has been required from here on; the catch block below gives it back
    // if anything fails before the reply is handed to the channel.
    ShuffleDataResult readResult = null;
    try {
      final long readStartMillis = System.currentTimeMillis();
      readResult =
          shuffleServer
              .getShuffleTaskManager()
              .getInMemoryShuffleData(
                  appId,
                  shuffleId,
                  partitionId,
                  lastBlockId,
                  readBufferSize,
                  req.getExpectedTaskIdsBitmap());

      final ManagedBuffer payload;
      final List<BufferSegment> segments;
      if (readResult == null) {
        // Nothing was returned: reply successfully with an empty payload.
        payload = NettyManagedBuffer.EMPTY_BUFFER;
        segments = Lists.newArrayList();
      } else {
        payload = readResult.getManagedBuffer();
        segments = readResult.getBufferSegments();
        ShuffleServerMetrics.counterTotalReadDataSize.inc(payload.size());
        ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(payload.size());
        ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
        ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
      }

      auditContext.withStatusCode(verifyStatus);
      auditContext.withReturnValue(
          "len=" + payload.size() + ", bufferSegments=" + segments.size());
      GetMemoryShuffleDataResponse success =
          new GetMemoryShuffleDataResponse(
              req.getRequestId(), verifyStatus, "OK", segments, payload);
      // The read memory is not released on this path here.
      // NOTE(review): presumably the listener does that on write completion - confirm.
      ReleaseMemoryAndRecordReadTimeListener onWritten =
          new ReleaseMemoryAndRecordReadTimeListener(
              readStartMillis, readBufferSize, payload.size(), requestInfo, req, success, client);
      client.getChannel().writeAndFlush(success).addListener(onWritten);
    } catch (Exception e) {
      // Undo the memory requirement and drop any data obtained before the failure.
      shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
      if (readResult != null) {
        readResult.release();
      }
      final String errorMsg =
          "Error happened when get in memory shuffle data for "
              + requestInfo
              + ", "
              + e.getMessage();
      LOG.error(errorMsg, e);
      GetMemoryShuffleDataResponse failed =
          new GetMemoryShuffleDataResponse(
              req.getRequestId(),
              StatusCode.INTERNAL_ERROR,
              errorMsg,
              Lists.newArrayList(),
              Unpooled.EMPTY_BUFFER);
      auditContext.withStatusCode(failed.getStatusCode());
      client.getChannel().writeAndFlush(failed);
    }
  }
}