in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [176:310]
// Handles a sendShuffleData RPC.
//
// Flow, as implemented below:
//   1. Record the client-to-server transport time when the request carries a timestamp.
//   2. Fetch and remove the pre-allocated buffer registered under the request's requireBufferId;
//      reply INTERNAL_ERROR if none is found.
//   3. Cache every partition's data through the ShuffleTaskManager, releasing the pre-allocated
//      size partition by partition, and stop at the first failure.
//   4. Release whatever part of the reservation is still outstanding, record the processing
//      time and reply with the final status.
//
// req: the request holding appId, shuffleId, requireBufferId, timestamp and the shuffle data.
// responseObserver: receives exactly one SendShuffleDataResponse followed by onCompleted().
public void sendShuffleData(
    SendShuffleDataRequest req, StreamObserver<SendShuffleDataResponse> responseObserver) {
  SendShuffleDataResponse reply;
  String appId = req.getAppId();
  int shuffleId = req.getShuffleId();
  long requireBufferId = req.getRequireBufferId();
  long timestamp = req.getTimestamp();
  if (timestamp > 0) {
    /*
     * Here we record the transport time, but we don't consider the impact of data size on
     * transport time. The amount of data will not cause great fluctuations in latency. For
     * example, 100K costs 1ms, and 1M costs 10ms. This seems like a normal fluctuation, but it
     * may rise to 10s when the server load is high.
     * In addition, the clock of the client machine and the clock of the Shuffle Server must be
     * kept in sync. TransportTime is accurate only if this condition is met.
     */
    long transportTime = System.currentTimeMillis() - timestamp;
    // A non-positive value (e.g. caused by clock skew) is not recorded.
    if (transportTime > 0) {
      shuffleServer
          .getGrpcMetrics()
          .recordTransportTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, transportTime);
    }
  }
  // Read the size before the buffer entry is removed below.
  int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
  StatusCode ret = StatusCode.SUCCESS;
  String responseMessage = "OK";
  if (req.getShuffleDataCount() > 0) {
    ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireSize);
    ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
    PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
    boolean isPreAllocated = info != null;
    if (!isPreAllocated) {
      String errorMsg =
          "Can't find requireBufferId["
              + requireBufferId
              + "] for appId["
              + appId
              + "], shuffleId["
              + shuffleId
              + "]";
      LOG.warn(errorMsg);
      responseMessage = errorMsg;
      reply =
          SendShuffleDataResponse.newBuilder()
              .setStatus(StatusCode.INTERNAL_ERROR.toProto())
              .setRetMsg(responseMessage)
              .build();
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
      return;
    }
    final long start = System.currentTimeMillis();
    List<ShufflePartitionedData> shufflePartitionedData;
    long alreadyReleasedSize = 0;
    try {
      shufflePartitionedData = toPartitionedData(req);
      for (ShufflePartitionedData spd : shufflePartitionedData) {
        String shuffleDataInfo =
            "appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "], partitionId["
                + spd.getPartitionId()
                + "]";
        try {
          ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
          if (ret != StatusCode.SUCCESS) {
            String errorMsg =
                "Error happened when shuffleEngine.write for "
                    + shuffleDataInfo
                    + ", statusCode="
                    + ret;
            LOG.error(errorMsg);
            responseMessage = errorMsg;
            break;
          } else {
            long toReleasedSize = spd.getTotalBlockSize();
            // after each cacheShuffleData call, the `preAllocatedSize` is updated timely.
            manager.releasePreAllocatedSize(toReleasedSize);
            alreadyReleasedSize += toReleasedSize;
            manager.updateCachedBlockIds(
                appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
          }
        } catch (Exception e) {
          String errorMsg =
              "Error happened when shuffleEngine.write for "
                  + shuffleDataInfo
                  + ": "
                  + e.getMessage();
          ret = StatusCode.INTERNAL_ERROR;
          responseMessage = errorMsg;
          // Pass the exception along so the stack trace is kept in the server log.
          LOG.error(errorMsg, e);
          break;
        }
      }
    } finally {
      // Since the required buffer id is only used once, the shuffle client would try to require
      // another buffer whether the current connection succeeded or not. The preAllocatedBuffer
      // has already been fetched and removed above, so the part of its size that was not
      // released while caching must be released here. Doing it in `finally` keeps the
      // accounting correct even when converting or caching the request throws unexpectedly.
      if (info.getRequireSize() > alreadyReleasedSize) {
        manager.releasePreAllocatedSize(info.getRequireSize() - alreadyReleasedSize);
      }
    }
    reply =
        SendShuffleDataResponse.newBuilder()
            .setStatus(ret.toProto())
            .setRetMsg(responseMessage)
            .build();
    long costTime = System.currentTimeMillis() - start;
    shuffleServer
        .getGrpcMetrics()
        .recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, costTime);
    // Only build the message when debug logging is enabled.
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "Cache Shuffle Data for appId["
              + appId
              + "], shuffleId["
              + shuffleId
              + "], cost "
              + costTime
              + " ms with "
              + shufflePartitionedData.size()
              + " blocks and "
              + requireSize
              + " bytes");
    }
  } else {
    // NOTE(review): on this path the buffer registered under requireBufferId is neither looked
    // up nor released here - confirm that it is reclaimed elsewhere.
    reply =
        SendShuffleDataResponse.newBuilder()
            .setStatus(StatusCode.INTERNAL_ERROR.toProto())
            .setRetMsg("No data in request")
            .build();
  }
  responseObserver.onNext(reply);
  responseObserver.onCompleted();
}