in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [145:384]
/**
 * Handles a {@code sendShuffleData} request: validates it, caches the blocks of every partition
 * through the {@link ShuffleTaskManager}, and writes exactly one {@link RpcResponse} to the
 * client's channel.
 *
 * <p>The pre-allocated buffer identified by {@code req.getRequireId()} is fetched and removed up
 * front, so it is consumed by this call regardless of the outcome.
 *
 * <p>Status codes written back:
 *
 * <ul>
 *   <li>{@code APP_NOT_FOUND} - no {@link ShuffleTaskInfo} is registered for the app id.
 *   <li>{@code STAGE_RETRY_IGNORE} - the request's stage attempt number is lower than the latest
 *       one recorded for the shuffle.
 *   <li>{@code INTERNAL_ERROR} - the pre-allocated buffer could not be found, the request carries
 *       no partitions, or an unexpected exception was thrown while caching.
 *   <li>{@code EXCEED_HUGE_PARTITION_HARD_LIMIT} - caching threw {@link
 *       ExceedHugePartitionHardLimitException}.
 *   <li>Otherwise the status returned by {@code cacheShuffleData} ({@code SUCCESS} when every
 *       partition was cached).
 * </ul>
 *
 * @param client the transport client whose channel receives the response
 * @param req the decoded request holding the blocks grouped by partition
 */
public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequestV1 req) {
  try (ServerRpcAuditContext auditContext = createAuditContext("sendShuffleData", client)) {
    RpcResponse rpcResponse;
    String appId = req.getAppId();
    int shuffleId = req.getShuffleId();
    long requireBufferId = req.getRequireId();
    int stageAttemptNumber = req.getStageAttemptNumber();
    ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
    // A null info means the pre-allocated buffer has already been removed by the
    // preAllocatedBufferCheck thread; otherwise the required size must be released by this
    // handler.
    PreAllocatedBufferInfo info =
        shuffleTaskManager.getAndRemovePreAllocatedBuffer(requireBufferId);
    int requireSize = info == null ? 0 : info.getRequireSize();
    // Size attributed to the blocks themselves: the required size minus the decoded request
    // length, clamped at zero.
    int requireBlocksSize = Math.max(requireSize - req.getDecodedLength(), 0);
    boolean isPreAllocated = info != null;
    auditContext.withAppId(appId).withShuffleId(shuffleId);
    auditContext.withArgs(
        "requireBufferId="
            + requireBufferId
            + ", requireSize="
            + requireSize
            + ", isPreAllocated="
            + isPreAllocated
            + ", requireBlocksSize="
            + requireBlocksSize
            + ", stageAttemptNumber="
            + stageAttemptNumber
            + ", partitionCount="
            + req.getPartitionToBlocks().size());

    // Reuse the manager fetched above instead of asking the server for it a second time.
    ShuffleTaskInfo taskInfo = shuffleTaskManager.getShuffleTaskInfo(appId);
    if (taskInfo == null) {
      rpcResponse =
          new RpcResponse(
              req.getRequestId(), StatusCode.APP_NOT_FOUND, "appId: " + appId + " not found");
      String errorMsg =
          "APP_NOT_FOUND error, requireBufferId["
              + requireBufferId
              + "] for appId["
              + appId
              + "], shuffleId["
              + shuffleId
              + "], isPreAllocated["
              + isPreAllocated
              + "]";
      LOG.error(errorMsg);
      ShuffleServerMetrics.counterAppNotFound.inc();
      releaseNettyBufferAndMetrics(
          req,
          appId,
          shuffleId,
          requireBufferId,
          requireBlocksSize,
          shuffleBufferManager,
          info,
          isPreAllocated);
      auditContext.withStatusCode(rpcResponse.getStatusCode());
      client.getChannel().writeAndFlush(rpcResponse);
      return;
    }

    Integer latestStageAttemptNumber = taskInfo.getLatestStageAttemptNumber(shuffleId);
    // A stage retry has occurred: data sent by a task that belongs to an older stage attempt is
    // ignored rather than processed.
    if (stageAttemptNumber < latestStageAttemptNumber) {
      String responseMessage = "A retry has occurred at the Stage, sending data is invalid.";
      rpcResponse =
          new RpcResponse(req.getRequestId(), StatusCode.STAGE_RETRY_IGNORE, responseMessage);
      LOG.warn(
          "Stage retry occurred, appId["
              + appId
              + "], shuffleId["
              + shuffleId
              + "], stageAttemptNumber["
              + stageAttemptNumber
              + "], latestStageAttemptNumber["
              + latestStageAttemptNumber
              + "]");
      releaseNettyBufferAndMetrics(
          req,
          appId,
          shuffleId,
          requireBufferId,
          requireBlocksSize,
          shuffleBufferManager,
          info,
          isPreAllocated);
      auditContext.withStatusCode(rpcResponse.getStatusCode());
      client.getChannel().writeAndFlush(rpcResponse);
      return;
    }

    long timestamp = req.getTimestamp();
    if (timestamp > 0) {
      /*
       * Record the transport time without taking the data size into account. The amount of data
       * does not cause great fluctuations in latency: for example, 100K costs 1ms and 1M costs
       * 10ms, which is a normal fluctuation, whereas the latency may rise to 10s when the server
       * load is high.
       * Note that the clocks of the client machine and of the Shuffle Server machine must be
       * kept in sync; the transport time is accurate only if this condition is met.
       */
      long transportTime = System.currentTimeMillis() - timestamp;
      if (transportTime > 0) {
        shuffleServer
            .getNettyMetrics()
            .recordTransportTime(SendShuffleDataRequest.class.getName(), transportTime);
      }
    }

    StatusCode ret = StatusCode.SUCCESS;
    String responseMessage = "OK";
    if (req.getPartitionToBlocks().size() > 0) {
      ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize);
      if (!isPreAllocated) {
        // Without a pre-allocated buffer nothing can be cached; release the data held by every
        // block of the request before answering with an error.
        req.getPartitionToBlocks().values().stream()
            .flatMap(Collection::stream)
            .forEach(block -> block.getData().release());
        String errorMsg =
            "Can't find requireBufferId["
                + requireBufferId
                + "] for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "], probably because the pre-allocated buffer has expired. "
                + "Please increase the expiration time using "
                + ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED.key()
                + " in ShuffleServer's configuration";
        LOG.warn(errorMsg);
        rpcResponse = new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, errorMsg);
        auditContext.withStatusCode(rpcResponse.getStatusCode());
        client.getChannel().writeAndFlush(rpcResponse);
        return;
      }

      final long start = System.currentTimeMillis();
      // Release the part of the required memory that corresponds to the decoded request length.
      shuffleBufferManager.releaseMemory(req.getDecodedLength(), false, true);
      List<ShufflePartitionedData> shufflePartitionedDataList = toPartitionedDataList(req);
      long alreadyReleasedSize = 0;
      boolean hasFailureOccurred = false;
      for (ShufflePartitionedData spd : shufflePartitionedDataList) {
        String shuffleDataInfo =
            "appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "], partitionId["
                + spd.getPartitionId()
                + "]";
        try {
          // After the first failure the remaining partitions are not cached. The `continue`
          // still passes through the finally block below, which releases their data.
          if (hasFailureOccurred) {
            continue;
          }
          ret = shuffleTaskManager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
          if (ret != StatusCode.SUCCESS) {
            String errorMsg =
                "Error happened when shuffleEngine.write for "
                    + shuffleDataInfo
                    + ", statusCode="
                    + ret;
            LOG.error(errorMsg);
            responseMessage = errorMsg;
            hasFailureOccurred = true;
          } else {
            if (shuffleServer.isRemoteMergeEnable()) {
              shuffleServer.getShuffleMergeManager().setDirect(appId, shuffleId, true);
            }
            long toReleasedSize = spd.getTotalBlockEncodedLength();
            // After each successful cacheShuffleData call, `preAllocatedSize` is updated
            // immediately.
            shuffleTaskManager.releasePreAllocatedSize(toReleasedSize);
            alreadyReleasedSize += toReleasedSize;
            shuffleTaskManager.updateCachedBlockIds(
                appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
          }
        } catch (ExceedHugePartitionHardLimitException e) {
          String errorMsg =
              "ExceedHugePartitionHardLimitException Error happened when shuffleEngine.write for "
                  + shuffleDataInfo
                  + ": "
                  + e.getMessage();
          ShuffleServerMetrics.counterTotalHugePartitionExceedHardLimitNum.inc();
          ret = StatusCode.EXCEED_HUGE_PARTITION_HARD_LIMIT;
          responseMessage = errorMsg;
          LOG.error(errorMsg);
          hasFailureOccurred = true;
        } catch (Exception e) {
          String errorMsg =
              "Error happened when shuffleEngine.write for "
                  + shuffleDataInfo
                  + ": "
                  + e.getMessage();
          ret = StatusCode.INTERNAL_ERROR;
          responseMessage = errorMsg;
          // Pass the throwable to the logger so the stack trace of an unexpected failure is not
          // lost; e.getMessage() alone may be null or carry no location information.
          LOG.error(errorMsg, e);
          hasFailureOccurred = true;
        } finally {
          // Once a cache failure has occurred, explicitly release the data held by the ByteBufs
          // of this partition and the memory accounted for its blocks.
          if (hasFailureOccurred) {
            Arrays.stream(spd.getBlockList()).forEach(block -> block.getData().release());
            shuffleBufferManager.releaseMemory(spd.getTotalBlockEncodedLength(), false, false);
          }
        }
      }
      // The required buffer id is used only once: the shuffle client requires another buffer
      // whether or not the current request succeeded. Therefore the pre-allocated buffer is
      // fetched and removed first, and once caching has finished the remaining part of
      // `preAllocatedSize` that was not released per partition is released here.
      if (requireBlocksSize > alreadyReleasedSize) {
        shuffleTaskManager.releasePreAllocatedSize(requireBlocksSize - alreadyReleasedSize);
      }
      rpcResponse = new RpcResponse(req.getRequestId(), ret, responseMessage);
      long costTime = System.currentTimeMillis() - start;
      shuffleServer
          .getNettyMetrics()
          .recordProcessTime(SendShuffleDataRequest.class.getName(), costTime);
      if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Cache Shuffle Data for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "], cost "
                + costTime
                + " ms with "
                + shufflePartitionedDataList.size()
                + " blocks and "
                + requireBlocksSize
                + " bytes");
      }
    } else {
      // NOTE(review): when a pre-allocated buffer was found (info != null), this branch answers
      // without calling any release method for it - confirm whether that is intended.
      rpcResponse =
          new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, "No data in request");
    }
    auditContext.withStatusCode(rpcResponse.getStatusCode());
    client.getChannel().writeAndFlush(rpcResponse);
  }
}