in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [380:476]
public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDataRequest req) {
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
int partitionId = req.getPartitionId();
int partitionNumPerRange = req.getPartitionNumPerRange();
int partitionNum = req.getPartitionNum();
long offset = req.getOffset();
int length = req.getLength();
long timestamp = req.getTimestamp();
if (timestamp > 0) {
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer
.getNettyMetrics()
.recordTransportTime(GetLocalShuffleDataRequest.class.getName(), transportTime);
}
}
String storageType =
shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE).name();
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetLocalShuffleDataResponse response;
ShuffleDataResult sdr;
String requestInfo =
"appId["
+ appId
+ "], shuffleId["
+ shuffleId
+ "], partitionId["
+ partitionId
+ "]"
+ "offset["
+ offset
+ "]"
+ "length["
+ length
+ "]";
int[] range =
ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum);
Storage storage =
shuffleServer
.getStorageManager()
.selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]));
if (storage != null) {
storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
}
if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(length)) {
try {
long start = System.currentTimeMillis();
sdr =
shuffleServer
.getShuffleTaskManager()
.getShuffleData(
appId,
shuffleId,
partitionId,
partitionNumPerRange,
partitionNum,
storageType,
offset,
length);
long readTime = System.currentTimeMillis() - start;
ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getData().length);
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getData().length);
shuffleServer
.getNettyMetrics()
.recordProcessTime(GetLocalShuffleDataRequest.class.getName(), readTime);
LOG.info(
"Successfully getShuffleData cost {} ms for shuffle" + " data with {}",
readTime,
requestInfo);
response =
new GetLocalShuffleDataResponse(
req.getRequestId(), status, msg, sdr.getManagedBuffer());
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
LOG.error(msg, e);
response =
new GetLocalShuffleDataResponse(
req.getRequestId(), status, msg, new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
} finally {
shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
}
} else {
status = StatusCode.INTERNAL_ERROR;
msg = "Can't require memory to get shuffle data";
LOG.error(msg + " for " + requestInfo);
response =
new GetLocalShuffleDataResponse(
req.getRequestId(), status, msg, new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
}
client.getChannel().writeAndFlush(response);
}