in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [94:214]
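// Handles a SendShuffleData request: validates the pre-allocated buffer reserved by an
// earlier RequireBuffer RPC, caches the shuffle blocks partition by partition, releases
// the pre-allocated size, and replies with a single RpcResponse on the client channel.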
public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
RpcResponse rpcResponse;
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
long requireBufferId = req.getRequireId();
long timestamp = req.getTimestamp();
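// A non-positive timestamp presumably means the client did not attach its send time,
// so the transport-time metric is skipped in that case.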
if (timestamp > 0) {
/*
* Record the transport time here, ignoring the impact of data size on transport time.
* Data size alone causes only minor latency fluctuations: for example, 100KB costs ~1ms
* and 1MB costs ~10ms. However, latency may rise to ~10s when the server load is high.
* Also note that the clocks of the client machine and the shuffle server machine must be
* kept in sync; transportTime is accurate only if this condition is met.
*/
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer
.getNettyMetrics()
.recordTransportTime(SendShuffleDataRequest.class.getName(), transportTime);
}
}
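// Look up how many bytes were reserved for this requireBufferId by the earlier
// RequireBuffer RPC.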
int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
StatusCode ret = StatusCode.SUCCESS;
String responseMessage = "OK";
if (!req.getPartitionToBlocks().isEmpty()) {
ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireSize);
ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
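// Claim the pre-allocated buffer and remove it so the requireBufferId cannot be reused;
// a null result means the reservation is missing or has already expired.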
PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
boolean isPreAllocated = info != null;
if (!isPreAllocated) {
String errorMsg =
"Can't find requireBufferId["
+ requireBufferId
+ "] for appId["
+ appId
+ "], shuffleId["
+ shuffleId
+ "]";
LOG.warn(errorMsg);
responseMessage = errorMsg;
rpcResponse =
new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, responseMessage);
client.getChannel().writeAndFlush(rpcResponse);
return;
}
final long start = System.currentTimeMillis();
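// Decode the request body into per-partition block data before caching it.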
List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(req);
long alreadyReleasedSize = 0;
for (ShufflePartitionedData spd : shufflePartitionedData) {
String shuffleDataInfo =
"appId["
+ appId
+ "], shuffleId["
+ shuffleId
+ "], partitionId["
+ spd.getPartitionId()
+ "]";
try {
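// Cache this partition's blocks in the server-side buffer; any non-SUCCESS status
// aborts the remaining partitions.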
ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
if (ret != StatusCode.SUCCESS) {
String errorMsg =
"Error happened when shuffleEngine.write for "
+ shuffleDataInfo
+ ", statusCode="
+ ret;
LOG.error(errorMsg);
responseMessage = errorMsg;
break;
} else {
long toReleasedSize = spd.getTotalBlockSize();
// release the pre-allocated size incrementally after each successful cacheShuffleData call
manager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
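// Record the cached block ids so that later commit and verification steps can
// account for them.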
manager.updateCachedBlockIds(
appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
}
} catch (Exception e) {
String errorMsg =
"Error happened when shuffleEngine.write for "
+ shuffleDataInfo
+ ": "
+ e.getMessage();
ret = StatusCode.INTERNAL_ERROR;
responseMessage = errorMsg;
LOG.error(errorMsg);
break;
}
}
// Since a requireBufferId is only used once, the shuffle client will require another
// buffer regardless of whether the current request succeeds. Therefore, the
// preAllocatedBuffer is fetched and removed up front; once cacheShuffleData finishes,
// any remaining pre-allocated size must be released here so it is not leaked.
if (info.getRequireSize() > alreadyReleasedSize) {
manager.releasePreAllocatedSize(info.getRequireSize() - alreadyReleasedSize);
}
rpcResponse = new RpcResponse(req.getRequestId(), ret, responseMessage);
long costTime = System.currentTimeMillis() - start;
shuffleServer
.getNettyMetrics()
.recordProcessTime(SendShuffleDataRequest.class.getName(), costTime);
LOG.debug(
"Cache Shuffle Data for appId[{}], shuffleId[{}], cost {} ms with {} blocks and {} bytes",
appId,
shuffleId,
costTime,
shufflePartitionedData.size(),
requireSize);
} else {
rpcResponse =
new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, "No data in request");
}
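// Both the success and the no-data paths reply on the same channel here;
// the missing-buffer path already returned above.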
client.getChannel().writeAndFlush(rpcResponse);
}