in mqtt/mqtt3/src/proto/packet.rs [960:1030]
/// Attempts to decode a single packet from `src`, resuming from the state that a previous call
/// left in `self.decoder_state`.
///
/// Decoding moves through three states:
///
/// - `Empty`: one byte is read from `src` and remembered as the packet's first byte.
/// - `HaveFirstByte`: the remaining-length decoder is run against `src`; once it yields a
///   value, the first byte and that length are stored together.
/// - `HaveFixedHeader`: once `src` holds at least `remaining_length` bytes, exactly that many
///   are split off as the packet body and the state is reset to `Empty`.
///
/// Returns `Ok(None)` whenever `src` does not yet hold enough bytes to leave the current state;
/// the state reached so far is kept for the next call.
///
/// # Errors
///
/// Propagates errors from the remaining-length decoder and from the packet-specific decoders.
/// Returns `DecodeError::UnrecognizedPacket` when the upper four bits of the first byte match
/// none of the known packet types.
fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
    let (header_byte, body) = loop {
        match &mut self.decoder_state {
            PacketDecoderState::Empty => {
                // Any failure to read the byte is treated as "not enough data yet".
                if let Ok(header_byte) = src.try_get_u8() {
                    self.decoder_state = PacketDecoderState::HaveFirstByte {
                        first_byte: header_byte,
                        remaining_length: Default::default(),
                    };
                } else {
                    return Ok(None);
                }
            }

            PacketDecoderState::HaveFirstByte {
                first_byte,
                remaining_length,
            } => {
                // Copy the byte out first so the borrow of the current state ends before the
                // state is overwritten below.
                let header_byte = *first_byte;
                let body_len = match remaining_length.decode(src)? {
                    Some(body_len) => body_len,
                    None => return Ok(None),
                };
                self.decoder_state = PacketDecoderState::HaveFixedHeader {
                    first_byte: header_byte,
                    remaining_length: body_len,
                };
            }

            PacketDecoderState::HaveFixedHeader {
                first_byte,
                remaining_length,
            } => {
                let (header_byte, body_len) = (*first_byte, *remaining_length);
                if src.len() >= body_len {
                    // The whole body is buffered: detach it and get ready for the next packet
                    // before handing the body to the packet-specific decoder.
                    let body = src.split_to(body_len);
                    self.decoder_state = PacketDecoderState::Empty;
                    break (header_byte, body);
                }
                return Ok(None);
            }
        }
    };

    // Upper nibble selects the packet type; lower nibble carries the flags.
    let (packet_type, flags) = (header_byte & 0xF0, header_byte & 0x0F);

    let packet = match packet_type {
        ConnAck::PACKET_TYPE => Packet::ConnAck(ConnAck::decode(flags, body)?),
        Connect::PACKET_TYPE => Packet::Connect(Connect::decode(flags, body)?),
        Disconnect::PACKET_TYPE => Packet::Disconnect(Disconnect::decode(flags, body)?),
        PingReq::PACKET_TYPE => Packet::PingReq(PingReq::decode(flags, body)?),
        PingResp::PACKET_TYPE => Packet::PingResp(PingResp::decode(flags, body)?),
        PubAck::PACKET_TYPE => Packet::PubAck(PubAck::decode(flags, body)?),
        PubComp::PACKET_TYPE => Packet::PubComp(PubComp::decode(flags, body)?),
        Publish::PACKET_TYPE => Packet::Publish(Publish::decode(flags, body)?),
        PubRec::PACKET_TYPE => Packet::PubRec(PubRec::decode(flags, body)?),
        PubRel::PACKET_TYPE => Packet::PubRel(PubRel::decode(flags, body)?),
        SubAck::PACKET_TYPE => Packet::SubAck(SubAck::decode(flags, body)?),
        Subscribe::PACKET_TYPE => Packet::Subscribe(Subscribe::decode(flags, body)?),
        UnsubAck::PACKET_TYPE => Packet::UnsubAck(UnsubAck::decode(flags, body)?),
        Unsubscribe::PACKET_TYPE => Packet::Unsubscribe(Unsubscribe::decode(flags, body)?),
        unrecognized => {
            return Err(super::DecodeError::UnrecognizedPacket {
                packet_type: unrecognized,
                flags,
                remaining_length: body.len(),
            })
        }
    };

    Ok(Some(packet))
}