protected Object decodeBody()

in dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java [92:229]


    /**
     * Decodes the body of one frame into either a {@link Response} or a {@link Request}.
     *
     * <p>The header is consulted as follows: {@code header[2]} carries the flag bits
     * ({@code FLAG_REQUEST}, {@code FLAG_TWOWAY}, {@code FLAG_EVENT}) combined with the
     * serialization id (extracted with {@code SERIALIZATION_MASK}); the request id is read
     * with {@code Bytes.bytes2long(header, 4)}.
     *
     * <p>Failures raised while reading the payload are caught inside the helpers, logged at
     * warn level and reported through the returned object instead of being thrown.
     *
     * @param channel channel the frame arrived on; its URL supplies decoding options
     * @param is      stream from which the body bytes are read
     * @param header  header bytes of the frame being decoded
     * @return a {@link Response} when {@code FLAG_REQUEST} is not set, otherwise a {@link Request}
     * @throws IOException declared by the signature; the payload-reading code below catches
     *                     every {@code Throwable} itself
     */
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        // get request id.
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {
            return decodeResponseBody(channel, is, header, flag, proto, id);
        }
        return decodeRequestBody(channel, is, header, flag, proto, id);
    }

    /**
     * Builds the {@link Response} for a frame whose {@code FLAG_REQUEST} bit is clear.
     *
     * <p>On any {@code Throwable} during payload handling the response status is overwritten
     * with {@code Response.CLIENT_ERROR} and the error message is set from the throwable.
     */
    private Response decodeResponseBody(
            Channel channel, InputStream is, byte[] header, byte flag, byte proto, long id) {
        Response res = new Response(id);
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(true);
        }
        // get status.
        byte status = header[3];
        res.setStatus(status);
        try {
            if (status == Response.OK) {
                Object data;
                if (res.isEvent()) {
                    byte[] eventPayload = CodecSupport.getPayload(is);
                    if (CodecSupport.isHeartBeat(eventPayload, proto)) {
                        // heart beat response data is always null;
                        data = null;
                    } else {
                        ObjectInput in = CodecSupport.deserialize(
                                channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
                        data = decodeEventData(channel, in, eventPayload);
                    }
                } else {
                    Invocation inv = (Invocation) getRequestData(channel, res, id);
                    // Looked up before the body is read, matching the original statement order.
                    boolean decodeInIoThread =
                            channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD);
                    UnsafeByteArrayInputStream body = new UnsafeByteArrayInputStream(readMessageData(is));
                    DecodeableRpcResult result = customByteAccessor != null
                            ? customByteAccessor.getRpcResult(channel, res, body, inv, proto)
                            : new DecodeableRpcResult(channel, res, body, inv, proto);
                    // The result object is built the same way in both modes; only the
                    // eager decode() call depends on the setting.
                    if (decodeInIoThread) {
                        result.decode();
                    }
                    data = result;
                }
                res.setResult(data);
            } else {
                // Non-OK responses carry only an error message string.
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                res.setErrorMessage(in.readUTF());
            }
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn(PROTOCOL_FAILED_DECODE, "", "", "Decode response failed: " + t.getMessage(), t);
            }
            res.setStatus(Response.CLIENT_ERROR);
            res.setErrorMessage(StringUtils.toString(t));
        }
        return res;
    }

    /**
     * Builds the {@link Request} for a frame whose {@code FLAG_REQUEST} bit is set.
     *
     * <p>On any {@code Throwable} during payload handling a fresh {@code HeartBeatRequest}
     * with the same id is returned, flagged broken and carrying the throwable as its data.
     */
    private Request decodeRequestBody(
            Channel channel, InputStream is, byte[] header, byte flag, byte proto, long id) {
        boolean twoWay = (flag & FLAG_TWOWAY) != 0;
        Request req;
        try {
            Object data;
            if ((flag & FLAG_EVENT) != 0) {
                byte[] eventPayload = CodecSupport.getPayload(is);
                if (CodecSupport.isHeartBeat(eventPayload, proto)) {
                    // heart beat request data is always null;
                    HeartBeatRequest heartBeat = new HeartBeatRequest(id);
                    heartBeat.setVersion(Version.getProtocolVersion());
                    heartBeat.setTwoWay(twoWay);
                    heartBeat.setProto(proto);
                    req = heartBeat;
                    data = null;
                } else {
                    req = new Request(id);
                    req.setVersion(Version.getProtocolVersion());
                    req.setTwoWay(twoWay);

                    ObjectInput in = CodecSupport.deserialize(
                            channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
                    data = decodeEventData(channel, in, eventPayload);
                }
                req.setEvent(true);
            } else {
                req = new Request(id);
                req.setVersion(Version.getProtocolVersion());
                req.setTwoWay(twoWay);

                // get data length.
                int len = Bytes.bytes2int(header, 12);
                req.setPayload(len);

                // Looked up before the body is read, matching the original statement order.
                boolean decodeInIoThread = isDecodeDataInIoThread(channel);
                UnsafeByteArrayInputStream body = new UnsafeByteArrayInputStream(readMessageData(is));
                DecodeableRpcInvocation inv = customByteAccessor != null
                        ? customByteAccessor.getRpcInvocation(channel, req, body, proto)
                        : new DecodeableRpcInvocation(frameworkModel, channel, req, body, proto);
                // The invocation is built the same way in both modes; only the eager
                // decode() call depends on the setting.
                if (decodeInIoThread) {
                    inv.decode();
                }
                data = inv;
            }
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn(PROTOCOL_FAILED_DECODE, "", "", "Decode request failed: " + t.getMessage(), t);
            }
            // bad request
            // NOTE(review): this replaces any request built above, so the version and
            // two-way flag set in the try block are not carried over - confirm intended.
            req = new HeartBeatRequest(id);
            req.setBroken(true);
            req.setData(t);
        }

        return req;
    }