fn decode()

in mqtt/mqtt3/src/proto/packet.rs [154:252]


    fn decode(flags: u8, mut src: bytes::BytesMut) -> Result<Self, super::DecodeError> {
        if flags != 0 {
            return Err(super::DecodeError::UnrecognizedPacket {
                packet_type: Self::PACKET_TYPE,
                flags,
                remaining_length: src.len(),
            });
        }

        let protocol_name = super::Utf8StringDecoder::default()
            .decode(&mut src)?
            .ok_or(super::DecodeError::IncompletePacket)?;
        if protocol_name != crate::PROTOCOL_NAME {
            return Err(super::DecodeError::UnrecognizedProtocolName(protocol_name));
        }

        let protocol_level = src.try_get_u8()?;

        let connect_flags = src.try_get_u8()?;
        if connect_flags & 0x01 != 0 {
            return Err(super::DecodeError::ConnectReservedSet);
        }

        let keep_alive = Duration::from_secs(u64::from(src.try_get_u16_be()?));

        let client_id = super::Utf8StringDecoder::default()
            .decode(&mut src)?
            .ok_or(super::DecodeError::IncompletePacket)?;
        let client_id = if client_id.is_empty() {
            if connect_flags & 0x02 == 0 {
                return Err(super::DecodeError::ConnectZeroLengthIdWithExistingSession);
            }
            super::ClientId::ServerGenerated
        } else if connect_flags & 0x02 == 0 {
            super::ClientId::IdWithExistingSession(client_id)
        } else {
            super::ClientId::IdWithCleanSession(client_id)
        };

        let will = if connect_flags & 0x04 == 0 {
            None
        } else {
            let topic_name = super::Utf8StringDecoder::default()
                .decode(&mut src)?
                .ok_or(super::DecodeError::IncompletePacket)?;

            let qos = match connect_flags & 0x18 {
                0x00 => QoS::AtMostOnce,
                0x08 => QoS::AtLeastOnce,
                0x10 => QoS::ExactlyOnce,
                qos => return Err(super::DecodeError::UnrecognizedQoS(qos >> 3)),
            };

            let retain = connect_flags & 0x20 != 0;

            let payload_len = usize::from(src.try_get_u16_be()?);
            if src.len() < payload_len {
                return Err(super::DecodeError::IncompletePacket);
            }
            let payload = src.split_to(payload_len).freeze();

            Some(Publication {
                topic_name,
                qos,
                retain,
                payload,
            })
        };

        let username = if connect_flags & 0x80 == 0 {
            None
        } else {
            Some(
                super::Utf8StringDecoder::default()
                    .decode(&mut src)?
                    .ok_or(super::DecodeError::IncompletePacket)?,
            )
        };

        let password = if connect_flags & 0x40 == 0 {
            None
        } else {
            Some(
                super::Utf8StringDecoder::default()
                    .decode(&mut src)?
                    .ok_or(super::DecodeError::IncompletePacket)?,
            )
        };

        Ok(Connect {
            username,
            password,
            will,
            client_id,
            keep_alive,
            protocol_name,
            protocol_level,
        })
    }