in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [408:621]
/**
 * Handles a {@code SendShuffleData} RPC: caches the shuffle blocks carried by {@code req} into
 * the shuffle server's buffers and replies with a status code.
 *
 * <p>Processing order:
 *
 * <ol>
 *   <li>Reply {@code APP_NOT_FOUND} if no task info is registered for {@code appId}.
 *   <li>Reply {@code STAGE_RETRY_IGNORE} if the request's stage attempt number is lower than the
 *       latest attempt recorded for this shuffle.
 *   <li>If the client supplied a positive timestamp, record the (positive) transport time.
 *   <li>If the request carries data: take and remove the pre-allocated buffer identified by
 *       {@code requireBufferId} (reply {@code INTERNAL_ERROR} if absent), cache each partition's
 *       data until the first failure, then release whatever part of the pre-allocated size was
 *       not already released per partition.
 *   <li>If the request carries no data, reply {@code INTERNAL_ERROR} ("No data in request").
 * </ol>
 *
 * <p>On every normal return path the observer receives one reply followed by {@code
 * onCompleted()}, and the audit context records the reply's status code.
 *
 * @param req request carrying app/shuffle ids, the pre-allocated buffer id, a client timestamp,
 *     the stage attempt number and the per-partition shuffle data
 * @param responseObserver observer that receives the single reply
 */
public void sendShuffleData(
    SendShuffleDataRequest req, StreamObserver<SendShuffleDataResponse> responseObserver) {
  try (ServerRpcAuditContext auditContext = createAuditContext("sendShuffleData")) {
    SendShuffleDataResponse reply;
    String appId = req.getAppId();
    int shuffleId = req.getShuffleId();
    long requireBufferId = req.getRequireBufferId();
    long timestamp = req.getTimestamp();
    int stageAttemptNumber = req.getStageAttemptNumber();
    auditContext.withAppId(appId).withShuffleId(shuffleId);
    auditContext.withArgs(
        "requireBufferId="
            + requireBufferId
            + ", timestamp="
            + timestamp
            + ", stageAttemptNumber="
            + stageAttemptNumber
            + ", shuffleDataSize="
            + req.getShuffleDataCount());
    ShuffleTaskInfo taskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
    if (taskInfo == null) {
      String errorMsg =
          "APP_NOT_FOUND error, requireBufferId["
              + requireBufferId
              + "] for appId["
              + appId
              + "], shuffleId["
              + shuffleId
              + "]";
      LOG.error(errorMsg);
      ShuffleServerMetrics.counterAppNotFound.inc();
      reply =
          SendShuffleDataResponse.newBuilder()
              .setStatus(StatusCode.APP_NOT_FOUND.toProto())
              .setRetMsg(errorMsg)
              .build();
      auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
      return;
    }
    Integer latestStageAttemptNumber = taskInfo.getLatestStageAttemptNumber(shuffleId);
    // A newer attempt of this stage is already known: data sent by an older attempt is stale,
    // so it is rejected without being cached.
    if (stageAttemptNumber < latestStageAttemptNumber) {
      String responseMessage = "A retry has occurred at the Stage, sending data is invalid.";
      reply =
          SendShuffleDataResponse.newBuilder()
              .setStatus(StatusCode.STAGE_RETRY_IGNORE.toProto())
              .setRetMsg(responseMessage)
              .build();
      auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
      return;
    }
    if (timestamp > 0) {
      /*
       * Here we record the transport time, but we don't consider the impact of data size on transport time.
       * The amount of data will not cause great fluctuations in latency. For example, 100K costs 1ms,
       * and 1M costs 10ms. This seems like a normal fluctuation, but it may rise to 10s when the server load is high.
       * In addition, we need to pay attention to that the time of the client machine and the machine
       * time of the Shuffle Server should be kept in sync. TransportTime is accurate only if this condition is met.
       * */
      long transportTime = System.currentTimeMillis() - timestamp;
      if (transportTime > 0) {
        shuffleServer
            .getGrpcMetrics()
            .recordTransportTime(
                ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, transportTime);
      }
    }
    int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
    StatusCode ret = StatusCode.SUCCESS;
    String responseMessage = "OK";
    if (req.getShuffleDataCount() > 0) {
      ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireSize);
      ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
      PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
      boolean isPreAllocated = info != null;
      if (!isPreAllocated) {
        String errorMsg =
            "Can't find requireBufferId["
                + requireBufferId
                + "] for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "]";
        LOG.warn(errorMsg);
        responseMessage = errorMsg;
        reply =
            SendShuffleDataResponse.newBuilder()
                .setStatus(StatusCode.INTERNAL_ERROR.toProto())
                .setRetMsg(responseMessage)
                .build();
        auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
        return;
      }
      final long start = System.currentTimeMillis();
      List<ShufflePartitionedData> shufflePartitionedDataList = toPartitionedDataList(req);
      long alreadyReleasedSize = 0;
      boolean hasFailureOccurred = false;
      for (ShufflePartitionedData spd : shufflePartitionedDataList) {
        String shuffleDataInfo =
            "appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "], partitionId["
                + spd.getPartitionId()
                + "]";
        try {
          ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
          if (ret != StatusCode.SUCCESS) {
            String errorMsg =
                "Error happened when shuffleEngine.write for "
                    + shuffleDataInfo
                    + ", statusCode="
                    + ret;
            LOG.error(errorMsg);
            responseMessage = errorMsg;
            hasFailureOccurred = true;
            break;
          } else {
            if (shuffleServer.isRemoteMergeEnable()) {
              shuffleServer.getShuffleMergeManager().setDirect(appId, shuffleId, false);
            }
            long toReleasedSize = spd.getTotalBlockEncodedLength();
            // after each cacheShuffleData call, the `preAllocatedSize` is updated timely.
            manager.releasePreAllocatedSize(toReleasedSize);
            alreadyReleasedSize += toReleasedSize;
            manager.updateCachedBlockIds(
                appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
          }
        } catch (ExceedHugePartitionHardLimitException e) {
          String errorMsg =
              "ExceedHugePartitionHardLimitException Error happened when shuffleEngine.write for "
                  + shuffleDataInfo
                  + ": "
                  + e.getMessage();
          ShuffleServerMetrics.counterTotalHugePartitionExceedHardLimitNum.inc();
          ret = StatusCode.EXCEED_HUGE_PARTITION_HARD_LIMIT;
          responseMessage = errorMsg;
          LOG.error(errorMsg);
          hasFailureOccurred = true;
          // Stop like the other failure branches. Without this, a later successful partition
          // would overwrite `ret` with SUCCESS (hiding the error from the client), and the
          // still-true `hasFailureOccurred` would make the `finally` release memory for
          // partitions that were actually cached.
          break;
        } catch (Exception e) {
          String errorMsg =
              "Error happened when shuffleEngine.write for "
                  + shuffleDataInfo
                  + ": "
                  + e.getMessage();
          ret = StatusCode.INTERNAL_ERROR;
          responseMessage = errorMsg;
          // Pass the exception so the stack trace of an unexpected failure is not lost.
          LOG.error(errorMsg, e);
          hasFailureOccurred = true;
          break;
        } finally {
          // Every failure branch breaks out of the loop, so this runs at most once: for the
          // partition whose write just failed.
          // NOTE(review): partitions after the failing one are never cached and get no
          // releaseMemory call here; only their pre-allocated size is released below. Confirm
          // this accounting is intended.
          if (hasFailureOccurred) {
            shuffleServer
                .getShuffleBufferManager()
                .releaseMemory(spd.getTotalBlockEncodedLength(), false, false);
          }
        }
      }
      // A requireBufferId is used only once: the client requires a new buffer for its next send
      // whether or not this one succeeded. The pre-allocated buffer was therefore taken and
      // removed up front, and any part of its size not already released per partition above is
      // released here.
      if (info.getRequireSize() > alreadyReleasedSize) {
        manager.releasePreAllocatedSize(info.getRequireSize() - alreadyReleasedSize);
      }
      reply =
          SendShuffleDataResponse.newBuilder()
              .setStatus(ret.toProto())
              .setRetMsg(responseMessage)
              .build();
      long costTime = System.currentTimeMillis() - start;
      shuffleServer
          .getGrpcMetrics()
          .recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, costTime);
      if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Cache Shuffle Data for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "], cost "
                + costTime
                + " ms with "
                + shufflePartitionedDataList.size()
                + " blocks and "
                + requireSize
                + " bytes");
      }
    } else {
      reply =
          SendShuffleDataResponse.newBuilder()
              .setStatus(StatusCode.INTERNAL_ERROR.toProto())
              .setRetMsg("No data in request")
              .build();
    }
    auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
    responseObserver.onNext(reply);
    responseObserver.onCompleted();
  }
}