public RpcMessage decodeFrame()

in core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java [76:143]


    /**
     * Decodes one v0 protocol frame from {@code in} into an {@link RpcMessage}.
     *
     * <p>Header fields are read in this order: a 2-byte protocol field (consumed but not
     * otherwise used here), a 2-byte flag field, a 2-byte field that holds the body length
     * for non-Seata-codec frames or the message type code for Seata-codec frames, and an
     * 8-byte message id. Heartbeat frames return right after the header with a PING
     * (request) or PONG (response) body. For every other frame, all remaining readable
     * bytes are taken as the payload, prefixed with the 2-byte type code and handed to the
     * serializer selected by the codec flag.
     *
     * @param in buffer positioned at the start of a frame
     * @return the decoded message, converted through {@code protocolMsg2RpcMsg()}
     * @throws IllegalArgumentException if fewer than {@code ProtocolConstantsV0.HEAD_LENGTH}
     *         bytes are readable, or if a positive body length exceeds the bytes remaining
     *         after the header (the reader index is reset to the frame start in that case)
     */
    public RpcMessage decodeFrame(ByteBuf in) {
        ProtocolRpcMessageV0 rpcMessage = new ProtocolRpcMessageV0();
        if (in.readableBytes() < ProtocolConstantsV0.HEAD_LENGTH) {
            throw new IllegalArgumentException("Nothing to decode.");
        }

        // Remember the frame start so an incomplete body can rewind the buffer.
        in.markReaderIndex();
        // Protocol field: must be consumed to advance the reader index; its value is not used.
        in.readShort();
        int flag = (int) in.readShort();

        boolean isHeartbeat = (ProtocolConstantsV0.FLAG_HEARTBEAT & flag) > 0;
        boolean isRequest = (ProtocolConstantsV0.FLAG_REQUEST & flag) > 0;
        boolean isSeataCodec = (ProtocolConstantsV0.FLAG_SEATA_CODEC & flag) > 0;
        rpcMessage.setSeataCodec(isSeataCodec);

        // The same 2-byte slot carries the body length (non-Seata codec) or the type code
        // (Seata codec); the one that is not present stays 0.
        short bodyLength = 0;
        short typeCode = 0;
        if (!isSeataCodec) {
            bodyLength = in.readShort();
        } else {
            typeCode = in.readShort();
        }
        long msgId = in.readLong();
        rpcMessage.setId(msgId);
        if (isHeartbeat) {
            // Heartbeats carry no payload to deserialize: the body is a fixed PING/PONG marker.
            rpcMessage.setAsync(true);
            rpcMessage.setHeartbeat(isHeartbeat);
            rpcMessage.setRequest(isRequest);
            if (isRequest) {
                rpcMessage.setBody(HeartbeatMessage.PING);
            } else {
                rpcMessage.setBody(HeartbeatMessage.PONG);
            }

            return rpcMessage.protocolMsg2RpcMsg();
        }

        if (bodyLength > 0 && in.readableBytes() < bodyLength) {
            in.resetReaderIndex();
            throw new IllegalArgumentException("readableBytes < bodyLength");
        }

        rpcMessage.setAsync((ProtocolConstantsV0.FLAG_ASYNC & flag) > 0);
        rpcMessage.setHeartbeat(false);
        rpcMessage.setRequest(isRequest);

        try {
            int length = in.readableBytes();

            // fill messageType in v0: two leading bytes hold the type code (high byte first),
            // and the payload is read straight in behind them so no temporary array or
            // extra copy is needed.
            byte[] payload = new byte[2 + length];
            payload[0] = (byte) (0x00FF & (typeCode >> 8));
            payload[1] = (byte) (0x00FF & typeCode);
            in.readBytes(payload, 2, length);

            byte codecType = isSeataCodec ? SerializerType.SEATA.getCode() : SerializerType.HESSIAN.getCode();
            Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codecType), ProtocolConstants.VERSION_0);
            rpcMessage.setBody(serializer.deserialize(payload));
        } catch (Exception e) {
            // Log here for context, then propagate unchanged so the caller still sees the failure.
            LOGGER.error("decode error", e);
            throw e;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Receive:" + rpcMessage.getBody() + ", messageId:" + msgId);
        }
        return rpcMessage.protocolMsg2RpcMsg();
    }