public void channelRead()

in fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/NettyClientHandler.java [55:144]


    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buffer = (ByteBuf) msg;
        boolean hasReleased = false;
        try {
            int frameLength = buffer.readInt();
            ResponseType respType = ResponseType.forId(buffer.readByte());
            if (respType == ResponseType.SUCCESS_RESPONSE) {
                int requestId = buffer.readInt();
                int messageSize = frameLength - RESPONSE_HEADER_LENGTH;
                if (messageSize < 0) {
                    callback.onRequestFailure(
                            requestId,
                            new CorruptMessageException(
                                    "Invalid response frame length "
                                            + frameLength
                                            + " which is must greater than "
                                            + RESPONSE_HEADER_LENGTH));
                }
                ApiMethod apiMethod = callback.getRequestApiMethod(requestId);
                if (apiMethod == null) {
                    callback.onRequestFailure(
                            requestId,
                            new IllegalStateException(
                                    "Unknown request, this might be caused by the"
                                            + " request has been timeout."));
                    return;
                }
                ApiMessage response = apiMethod.getResponseConstructor().get();
                if (response.isLazilyParsed()) {
                    // copy the buffer into a heap buffer, this can avoid the network buffer
                    // being released before the bytes fields of the response are lazily parsed.
                    ByteBuf copiedBuffer = Unpooled.buffer(messageSize, messageSize);
                    copiedBuffer.writeBytes(buffer, messageSize);
                    // response parsed from the copied buffer can be safely cached in user queues.
                    response.parseFrom(copiedBuffer, messageSize);
                } else {
                    response.parseFrom(buffer, messageSize);
                    // eagerly release the buffer to make the buffer recycle faster
                    buffer.release();
                    hasReleased = true;
                }
                callback.onRequestResult(requestId, response);

            } else if (respType == ResponseType.ERROR_RESPONSE) {
                int requestId = buffer.readInt();
                int messageSize = frameLength - RESPONSE_HEADER_LENGTH;
                if (messageSize < 0) {
                    callback.onRequestFailure(
                            requestId,
                            new CorruptMessageException(
                                    "Invalid response frame length "
                                            + frameLength
                                            + " which is must greater than "
                                            + RESPONSE_HEADER_LENGTH));
                }
                ErrorResponse errorResponse = new ErrorResponse();
                errorResponse.parseFrom(buffer, messageSize);
                ApiError error = ApiError.fromErrorMessage(errorResponse);
                callback.onRequestFailure(requestId, error.exception());

            } else if (respType == ResponseType.SERVER_FAILURE) {
                int messageSize = frameLength - SERVER_FAILURE_HEADER_LENGTH;
                if (messageSize < 0) {
                    throw new CorruptMessageException(
                            "Invalid server failure frame length "
                                    + frameLength
                                    + " which is must greater than "
                                    + SERVER_FAILURE_HEADER_LENGTH);
                }
                ErrorResponse errorResponse = new ErrorResponse();
                errorResponse.parseFrom(buffer, messageSize);
                ApiError error = ApiError.fromErrorMessage(errorResponse);
                throw error.exception();

            } else {
                throw new IllegalStateException("Unexpected response type '" + respType + "'");
            }

        } catch (Throwable t1) {
            try {
                callback.onFailure(t1);
            } catch (Throwable t2) {
                LOG.error("Failed to notify callback about failure", t2);
            }
        } finally {
            if (!hasReleased) {
                buffer.release();
            }
        }
    }