in mqtt/mqtt3/src/proto/packet.rs [154:252]
fn decode(flags: u8, mut src: bytes::BytesMut) -> Result<Self, super::DecodeError> {
if flags != 0 {
return Err(super::DecodeError::UnrecognizedPacket {
packet_type: Self::PACKET_TYPE,
flags,
remaining_length: src.len(),
});
}
let protocol_name = super::Utf8StringDecoder::default()
.decode(&mut src)?
.ok_or(super::DecodeError::IncompletePacket)?;
if protocol_name != crate::PROTOCOL_NAME {
return Err(super::DecodeError::UnrecognizedProtocolName(protocol_name));
}
let protocol_level = src.try_get_u8()?;
let connect_flags = src.try_get_u8()?;
if connect_flags & 0x01 != 0 {
return Err(super::DecodeError::ConnectReservedSet);
}
let keep_alive = Duration::from_secs(u64::from(src.try_get_u16_be()?));
let client_id = super::Utf8StringDecoder::default()
.decode(&mut src)?
.ok_or(super::DecodeError::IncompletePacket)?;
let client_id = if client_id.is_empty() {
if connect_flags & 0x02 == 0 {
return Err(super::DecodeError::ConnectZeroLengthIdWithExistingSession);
}
super::ClientId::ServerGenerated
} else if connect_flags & 0x02 == 0 {
super::ClientId::IdWithExistingSession(client_id)
} else {
super::ClientId::IdWithCleanSession(client_id)
};
let will = if connect_flags & 0x04 == 0 {
None
} else {
let topic_name = super::Utf8StringDecoder::default()
.decode(&mut src)?
.ok_or(super::DecodeError::IncompletePacket)?;
let qos = match connect_flags & 0x18 {
0x00 => QoS::AtMostOnce,
0x08 => QoS::AtLeastOnce,
0x10 => QoS::ExactlyOnce,
qos => return Err(super::DecodeError::UnrecognizedQoS(qos >> 3)),
};
let retain = connect_flags & 0x20 != 0;
let payload_len = usize::from(src.try_get_u16_be()?);
if src.len() < payload_len {
return Err(super::DecodeError::IncompletePacket);
}
let payload = src.split_to(payload_len).freeze();
Some(Publication {
topic_name,
qos,
retain,
payload,
})
};
let username = if connect_flags & 0x80 == 0 {
None
} else {
Some(
super::Utf8StringDecoder::default()
.decode(&mut src)?
.ok_or(super::DecodeError::IncompletePacket)?,
)
};
let password = if connect_flags & 0x40 == 0 {
None
} else {
Some(
super::Utf8StringDecoder::default()
.decode(&mut src)?
.ok_or(super::DecodeError::IncompletePacket)?,
)
};
Ok(Connect {
username,
password,
will,
client_id,
keep_alive,
protocol_name,
protocol_level,
})
}