in internal/encoding/decode.go [718:831]
func readComposite(r *buffer.Buffer) (any, error) {
buf := r.Bytes()
if len(buf) < 2 {
return nil, errors.New("invalid length for composite")
}
// compsites start with 0x0
if AMQPType(buf[0]) != 0x0 {
return nil, fmt.Errorf("invalid composite header %#02x", buf[0])
}
var compositeType uint64
switch AMQPType(buf[1]) {
case TypeCodeSmallUlong:
if len(buf) < 3 {
return nil, errors.New("invalid length for smallulong")
}
compositeType = uint64(buf[2])
case TypeCodeUlong:
if len(buf) < 10 {
return nil, errors.New("invalid length for ulong")
}
compositeType = binary.BigEndian.Uint64(buf[2:])
}
if compositeType > math.MaxUint8 {
// try as described type
var dt DescribedType
err := dt.Unmarshal(r)
return dt, err
}
switch AMQPType(compositeType) {
// Error
case TypeCodeError:
t := new(Error)
err := t.Unmarshal(r)
return t, err
// Lifetime Policies
case TypeCodeDeleteOnClose:
t := DeleteOnClose
err := t.Unmarshal(r)
return t, err
case TypeCodeDeleteOnNoMessages:
t := DeleteOnNoMessages
err := t.Unmarshal(r)
return t, err
case TypeCodeDeleteOnNoLinks:
t := DeleteOnNoLinks
err := t.Unmarshal(r)
return t, err
case TypeCodeDeleteOnNoLinksOrMessages:
t := DeleteOnNoLinksOrMessages
err := t.Unmarshal(r)
return t, err
// Delivery States
case TypeCodeStateAccepted:
t := new(StateAccepted)
err := t.Unmarshal(r)
return t, err
case TypeCodeStateModified:
t := new(StateModified)
err := t.Unmarshal(r)
return t, err
case TypeCodeStateReceived:
t := new(StateReceived)
err := t.Unmarshal(r)
return t, err
case TypeCodeStateRejected:
t := new(StateRejected)
err := t.Unmarshal(r)
return t, err
case TypeCodeStateReleased:
t := new(StateReleased)
err := t.Unmarshal(r)
return t, err
case TypeCodeOpen,
TypeCodeBegin,
TypeCodeAttach,
TypeCodeFlow,
TypeCodeTransfer,
TypeCodeDisposition,
TypeCodeDetach,
TypeCodeEnd,
TypeCodeClose,
TypeCodeSource,
TypeCodeTarget,
TypeCodeMessageHeader,
TypeCodeDeliveryAnnotations,
TypeCodeMessageAnnotations,
TypeCodeMessageProperties,
TypeCodeApplicationProperties,
TypeCodeApplicationData,
TypeCodeAMQPSequence,
TypeCodeAMQPValue,
TypeCodeFooter,
TypeCodeSASLMechanism,
TypeCodeSASLInit,
TypeCodeSASLChallenge,
TypeCodeSASLResponse,
TypeCodeSASLOutcome:
return nil, fmt.Errorf("readComposite unmarshal not implemented for %#02x", compositeType)
default:
// try as described type
var dt DescribedType
err := dt.Unmarshal(r)
return dt, err
}
}