fn decode()

in mqtt/mqtt3/src/proto/packet.rs [960:1030]


    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let (first_byte, src) = loop {
            match &mut self.decoder_state {
                PacketDecoderState::Empty => {
                    let first_byte = match src.try_get_u8() {
                        Ok(first_byte) => first_byte,
                        Err(_) => return Ok(None),
                    };
                    self.decoder_state = PacketDecoderState::HaveFirstByte {
                        first_byte,
                        remaining_length: Default::default(),
                    };
                }

                PacketDecoderState::HaveFirstByte {
                    first_byte,
                    remaining_length,
                } => match remaining_length.decode(src)? {
                    Some(remaining_length) => {
                        self.decoder_state = PacketDecoderState::HaveFixedHeader {
                            first_byte: *first_byte,
                            remaining_length,
                        }
                    }
                    None => return Ok(None),
                },

                PacketDecoderState::HaveFixedHeader {
                    first_byte,
                    remaining_length,
                } => {
                    if src.len() < *remaining_length {
                        return Ok(None);
                    }

                    let first_byte = *first_byte;
                    let src = src.split_to(*remaining_length);
                    self.decoder_state = PacketDecoderState::Empty;
                    break (first_byte, src);
                }
            }
        };

        let packet_type = first_byte & 0xF0;
        let flags = first_byte & 0x0F;
        match packet_type {
            ConnAck::PACKET_TYPE => Ok(Some(Packet::ConnAck(ConnAck::decode(flags, src)?))),
            Connect::PACKET_TYPE => Ok(Some(Packet::Connect(Connect::decode(flags, src)?))),
            Disconnect::PACKET_TYPE => {
                Ok(Some(Packet::Disconnect(Disconnect::decode(flags, src)?)))
            }
            PingReq::PACKET_TYPE => Ok(Some(Packet::PingReq(PingReq::decode(flags, src)?))),
            PingResp::PACKET_TYPE => Ok(Some(Packet::PingResp(PingResp::decode(flags, src)?))),
            PubAck::PACKET_TYPE => Ok(Some(Packet::PubAck(PubAck::decode(flags, src)?))),
            PubComp::PACKET_TYPE => Ok(Some(Packet::PubComp(PubComp::decode(flags, src)?))),
            Publish::PACKET_TYPE => Ok(Some(Packet::Publish(Publish::decode(flags, src)?))),
            PubRec::PACKET_TYPE => Ok(Some(Packet::PubRec(PubRec::decode(flags, src)?))),
            PubRel::PACKET_TYPE => Ok(Some(Packet::PubRel(PubRel::decode(flags, src)?))),
            SubAck::PACKET_TYPE => Ok(Some(Packet::SubAck(SubAck::decode(flags, src)?))),
            Subscribe::PACKET_TYPE => Ok(Some(Packet::Subscribe(Subscribe::decode(flags, src)?))),
            UnsubAck::PACKET_TYPE => Ok(Some(Packet::UnsubAck(UnsubAck::decode(flags, src)?))),
            Unsubscribe::PACKET_TYPE => {
                Ok(Some(Packet::Unsubscribe(Unsubscribe::decode(flags, src)?)))
            }
            packet_type => Err(super::DecodeError::UnrecognizedPacket {
                packet_type,
                flags,
                remaining_length: src.len(),
            }),
        }
    }