in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [774:875]
public void handleGetSortedShuffleDataRequest(
TransportClient client, GetSortedShuffleDataRequest req) {
final long start = System.currentTimeMillis();
long requestId = req.getRequestId();
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
int partitionId = req.getPartitionId();
long blockId = req.getBlockId();
long timestamp = req.getTimestamp();
if (timestamp > 0) {
long transportTime = start - timestamp;
if (transportTime > 0) {
shuffleServer
.getNettyMetrics()
.recordTransportTime(GetSortedShuffleDataRequest.class.getName(), transportTime);
}
}
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetSortedShuffleDataResponse response;
String requestInfo =
"appId["
+ appId
+ "], shuffleId["
+ shuffleId
+ "], partitionId["
+ partitionId
+ "], blockId["
+ blockId
+ "]";
if (!shuffleServer.isRemoteMergeEnable()) {
msg = "Remote merge is disabled";
status = StatusCode.INTERNAL_ERROR;
response =
new GetSortedShuffleDataResponse(
requestId, status, msg, -1, MergeState.INTERNAL_ERROR.code(), Unpooled.EMPTY_BUFFER);
client.getChannel().writeAndFlush(response);
return;
}
MergeStatus mergeStatus =
shuffleServer.getShuffleMergeManager().tryGetBlock(appId, shuffleId, partitionId, blockId);
MergeState mergeState = mergeStatus.getState();
long readBlockSize = mergeStatus.getSize();
if (mergeState == MergeState.INITED
|| (mergeState == MergeState.MERGING && readBlockSize == -1)
|| (mergeState == MergeState.DONE && readBlockSize == -1)
|| mergeState == MergeState.INTERNAL_ERROR) {
msg = mergeState.name();
response =
new GetSortedShuffleDataResponse(
requestId, status, msg, -1, mergeState.code(), Unpooled.EMPTY_BUFFER);
client.getChannel().writeAndFlush(response);
return;
}
if (shuffleServer.getShuffleBufferManager().requireReadMemory(readBlockSize)) {
ShuffleDataResult sdr = null;
try {
sdr =
shuffleServer
.getShuffleMergeManager()
.getShuffleData(appId, shuffleId, partitionId, blockId);
response =
new GetSortedShuffleDataResponse(
requestId, status, msg, blockId + 1, mergeState.code(), sdr.getManagedBuffer());
ReleaseMemoryAndRecordReadTimeListener listener =
new ReleaseMemoryAndRecordReadTimeListener(
start, readBlockSize, sdr.getDataLength(), requestInfo, req, response, client);
client.getChannel().writeAndFlush(response).addListener(listener);
} catch (Exception e) {
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBlockSize);
if (sdr != null) {
sdr.release();
}
status = StatusCode.INTERNAL_ERROR;
msg = "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
LOG.error(msg, e);
response =
new GetSortedShuffleDataResponse(
requestId,
status,
msg,
-1,
MergeState.INTERNAL_ERROR.code(),
Unpooled.EMPTY_BUFFER);
client.getChannel().writeAndFlush(response);
}
} else {
status = StatusCode.NO_BUFFER;
msg = "Can't require read memory to get sorted shuffle data";
LOG.error(msg + " for " + requestInfo);
response =
new GetSortedShuffleDataResponse(
requestId, status, msg, -1, mergeState.code(), Unpooled.EMPTY_BUFFER);
client.getChannel().writeAndFlush(response);
}
}