in fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/NettyClientHandler.java [55:144]
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
boolean hasReleased = false;
try {
int frameLength = buffer.readInt();
ResponseType respType = ResponseType.forId(buffer.readByte());
if (respType == ResponseType.SUCCESS_RESPONSE) {
int requestId = buffer.readInt();
int messageSize = frameLength - RESPONSE_HEADER_LENGTH;
if (messageSize < 0) {
callback.onRequestFailure(
requestId,
new CorruptMessageException(
"Invalid response frame length "
+ frameLength
+ " which is must greater than "
+ RESPONSE_HEADER_LENGTH));
}
ApiMethod apiMethod = callback.getRequestApiMethod(requestId);
if (apiMethod == null) {
callback.onRequestFailure(
requestId,
new IllegalStateException(
"Unknown request, this might be caused by the"
+ " request has been timeout."));
return;
}
ApiMessage response = apiMethod.getResponseConstructor().get();
if (response.isLazilyParsed()) {
// copy the buffer into a heap buffer, this can avoid the network buffer
// being released before the bytes fields of the response are lazily parsed.
ByteBuf copiedBuffer = Unpooled.buffer(messageSize, messageSize);
copiedBuffer.writeBytes(buffer, messageSize);
// response parsed from the copied buffer can be safely cached in user queues.
response.parseFrom(copiedBuffer, messageSize);
} else {
response.parseFrom(buffer, messageSize);
// eagerly release the buffer to make the buffer recycle faster
buffer.release();
hasReleased = true;
}
callback.onRequestResult(requestId, response);
} else if (respType == ResponseType.ERROR_RESPONSE) {
int requestId = buffer.readInt();
int messageSize = frameLength - RESPONSE_HEADER_LENGTH;
if (messageSize < 0) {
callback.onRequestFailure(
requestId,
new CorruptMessageException(
"Invalid response frame length "
+ frameLength
+ " which is must greater than "
+ RESPONSE_HEADER_LENGTH));
}
ErrorResponse errorResponse = new ErrorResponse();
errorResponse.parseFrom(buffer, messageSize);
ApiError error = ApiError.fromErrorMessage(errorResponse);
callback.onRequestFailure(requestId, error.exception());
} else if (respType == ResponseType.SERVER_FAILURE) {
int messageSize = frameLength - SERVER_FAILURE_HEADER_LENGTH;
if (messageSize < 0) {
throw new CorruptMessageException(
"Invalid server failure frame length "
+ frameLength
+ " which is must greater than "
+ SERVER_FAILURE_HEADER_LENGTH);
}
ErrorResponse errorResponse = new ErrorResponse();
errorResponse.parseFrom(buffer, messageSize);
ApiError error = ApiError.fromErrorMessage(errorResponse);
throw error.exception();
} else {
throw new IllegalStateException("Unexpected response type '" + respType + "'");
}
} catch (Throwable t1) {
try {
callback.onFailure(t1);
} catch (Throwable t2) {
LOG.error("Failed to notify callback about failure", t2);
}
} finally {
if (!hasReleased) {
buffer.release();
}
}
}