in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala [792:840]
def decodeControlPacket(maxPacketSize: Int): Either[DecodeError, ControlPacket] =
try {
val b = v.getByte & 0xFF
v.decodeRemainingLength() match {
case Right(l) if l <= maxPacketSize =>
(ControlPacketType(b >> 4), ControlPacketFlags(b & 0xF)) match {
case (ControlPacketType.Reserved1, ControlPacketFlags.ReservedGeneral) =>
Right(Reserved1)
case (ControlPacketType.Reserved2, ControlPacketFlags.ReservedGeneral) =>
Right(Reserved2)
case (ControlPacketType.CONNECT, ControlPacketFlags.ReservedGeneral) =>
v.decodeConnect()
case (ControlPacketType.CONNACK, ControlPacketFlags.ReservedGeneral) =>
v.decodeConnAck()
case (ControlPacketType.PUBLISH, flags) =>
v.decodePublish(l, flags)
case (ControlPacketType.PUBACK, ControlPacketFlags.ReservedGeneral) =>
v.decodePubAck()
case (ControlPacketType.PUBREC, ControlPacketFlags.ReservedGeneral) =>
v.decodePubRec()
case (ControlPacketType.PUBREL, ControlPacketFlags.ReservedPubRel) =>
v.decodePubRel()
case (ControlPacketType.PUBCOMP, ControlPacketFlags.ReservedGeneral) =>
v.decodePubComp()
case (ControlPacketType.SUBSCRIBE, ControlPacketFlags.ReservedSubscribe) =>
v.decodeSubscribe(l)
case (ControlPacketType.SUBACK, ControlPacketFlags.ReservedGeneral) =>
v.decodeSubAck(l)
case (ControlPacketType.UNSUBSCRIBE, ControlPacketFlags.ReservedUnsubscribe) =>
v.decodeUnsubscribe(l)
case (ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedUnsubAck) =>
v.decodeUnsubAck()
case (ControlPacketType.PINGREQ, ControlPacketFlags.ReservedGeneral) =>
Right(PingReq)
case (ControlPacketType.PINGRESP, ControlPacketFlags.ReservedGeneral) =>
Right(PingResp)
case (ControlPacketType.DISCONNECT, ControlPacketFlags.ReservedGeneral) =>
Right(Disconnect)
case (packetType, flags) =>
Left(UnknownPacketType(packetType, flags))
}
case Right(l) =>
Left(InvalidPacketSize(l, maxPacketSize))
case Left(BufferUnderflow) => Left(BufferUnderflow)
case other @ Left(_) => throw new MatchError(other)
}
} catch {
case _: NoSuchElementException => Left(BufferUnderflow)
}