in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [940:1025]
public void operationComplete(ChannelFuture future) {
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
long readTime = System.currentTimeMillis() - readStartedTime;
ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
shuffleServer.getNettyMetrics().recordProcessTime(request.getClass().getName(), readTime);
if (request instanceof GetLocalShuffleDataRequest) {
ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.dec();
ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.dec(readBufferSize);
} else if (request instanceof GetLocalShuffleIndexRequest) {
ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.dec();
ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.dec(readBufferSize);
} else if (request instanceof GetMemoryShuffleDataRequest) {
GetMemoryShuffleDataResponse getMemoryShuffleDataResponse =
(GetMemoryShuffleDataResponse) response;
if (CollectionUtils.isNotEmpty(getMemoryShuffleDataResponse.getBufferSegments())) {
ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.dec();
ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.dec(readBufferSize);
}
}
if (!future.isSuccess()) {
Throwable cause = future.cause();
String errorMsg =
"Error happened when executing "
+ request.getOperationType()
+ " for "
+ requestInfo
+ ", "
+ cause.getMessage();
if (future.channel().isWritable()) {
RpcResponse errorResponse;
if (request instanceof GetLocalShuffleDataRequest) {
errorResponse =
new GetLocalShuffleDataResponse(
request.getRequestId(),
StatusCode.INTERNAL_ERROR,
errorMsg,
new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
} else if (request instanceof GetLocalShuffleIndexRequest) {
errorResponse =
new GetLocalShuffleIndexResponse(
request.getRequestId(),
StatusCode.INTERNAL_ERROR,
errorMsg,
Unpooled.EMPTY_BUFFER,
0L);
} else if (request instanceof GetMemoryShuffleDataRequest) {
errorResponse =
new GetMemoryShuffleDataResponse(
request.getRequestId(),
StatusCode.INTERNAL_ERROR,
errorMsg,
Lists.newArrayList(),
Unpooled.EMPTY_BUFFER);
} else if (request instanceof GetSortedShuffleDataRequest) {
errorResponse =
new GetSortedShuffleDataResponse(
request.getRequestId(),
StatusCode.INTERNAL_ERROR,
errorMsg,
-1L,
MergeState.INTERNAL_ERROR.code(),
Unpooled.EMPTY_BUFFER);
} else {
LOG.error("Cannot handle request {}", request.type(), cause);
return;
}
client.getChannel().writeAndFlush(errorResponse);
}
LOG.error(
"Failed to execute {} for {}. Took {} ms and could not retrieve {} bytes of data",
request.getOperationType(),
requestInfo,
readTime,
dataSize,
cause);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Successfully executed {} for {}. Took {} ms and retrieved {} bytes of data",
request.getOperationType(),
requestInfo,
readTime,
dataSize);
}
}
}