in receiver.go [770:882]
// muxReceive handles one incoming transfer frame for this receiver.
//
// A message may span several transfer frames. Payloads are accumulated in
// r.msgBuf while r.more is set; once a frame arrives with More unset, the
// buffered bytes are unmarshaled into r.msg, the message is placed on
// r.messagesQ, and the link's deliveryCount and linkCredit are adjusted.
//
// Missing required fields on the first frame, fields on a continuation
// frame that disagree with the first frame, a payload that would push the
// message past a non-zero maxMessageSize, and unmarshal failures are all
// reported through r.l.closeWithError, after which the frame is dropped.
func (r *Receiver) muxReceive(fr frames.PerformTransfer) {
	if r.more {
		// Continuation frame of a multipart message. Fields may be left
		// out here, but any that are present have to agree with what the
		// first frame carried.
		switch {
		case fr.DeliveryID != nil && *fr.DeliveryID != r.msg.deliveryID:
			r.l.closeWithError(ErrCondNotAllowed, fmt.Sprintf(
				"received continuation transfer with inconsistent delivery-id: %d != %d",
				*fr.DeliveryID, r.msg.deliveryID,
			))
			return
		case fr.MessageFormat != nil && *fr.MessageFormat != r.msg.Format:
			r.l.closeWithError(ErrCondNotAllowed, fmt.Sprintf(
				"received continuation transfer with inconsistent message-format: %d != %d",
				*fr.MessageFormat, r.msg.Format,
			))
			return
		case fr.DeliveryTag != nil && !bytes.Equal(fr.DeliveryTag, r.msg.DeliveryTag):
			r.l.closeWithError(ErrCondNotAllowed, fmt.Sprintf(
				"received continuation transfer with inconsistent delivery-tag: %q != %q",
				fr.DeliveryTag, r.msg.DeliveryTag,
			))
			return
		}
	} else {
		// First frame of a message: capture whichever identifying fields
		// are present before checking that none of them is missing.
		if id := fr.DeliveryID; id != nil {
			r.msg.deliveryID = *id
		}
		if format := fr.MessageFormat; format != nil {
			r.msg.Format = *format
		}
		r.msg.DeliveryTag = fr.DeliveryTag

		// All three fields are mandatory on the first frame.
		switch {
		case fr.DeliveryID == nil:
			r.l.closeWithError(ErrCondNotAllowed, "received message without a delivery-id")
			return
		case fr.MessageFormat == nil:
			r.l.closeWithError(ErrCondNotAllowed, "received message without a message-format")
			return
		case fr.DeliveryTag == nil:
			r.l.closeWithError(ErrCondNotAllowed, "received message without a delivery-tag")
			return
		}
	}

	// An aborted message is thrown away along with anything buffered so far.
	if fr.Aborted {
		r.msgBuf.Reset()
		r.msg = Message{}
		r.more = false
		return
	}

	// A zero limit means no size check; otherwise the bytes already
	// buffered plus this payload must not exceed it.
	if limit := r.l.maxMessageSize; limit != 0 {
		total := uint64(r.msgBuf.Len()) + uint64(len(fr.Payload))
		if total > limit {
			r.l.closeWithError(ErrCondMessageSizeExceeded, fmt.Sprintf("received message larger than max size of %d", r.l.maxMessageSize))
			return
		}
	}

	// Append this frame's payload to the buffer.
	r.msgBuf.Append(fr.Payload)

	// A single settled frame is enough to make the whole message settled.
	if fr.Settled {
		r.msg.settled = true
	}

	// Remember whether more frames are expected; if so, wait for them.
	if r.more = fr.More; r.more {
		return
	}

	// Final frame: decode the accumulated bytes into the message.
	if err := r.msg.Unmarshal(&r.msgBuf); err != nil {
		r.l.closeWithError(ErrCondInternalError, err.Error())
		return
	}

	// Unsettled messages are registered and given a reference to this
	// receiver before being handed over.
	if !r.msg.settled {
		r.addUnsettled()
		r.msg.rcv = r
		debug.Log(3, "RX (Receiver %p): add unsettled delivery ID %d", r, r.msg.deliveryID)
	}

	// Hand the completed message to the queue.
	queue := r.messagesQ.Acquire()
	queue.Enqueue(r.msg)
	queued := queue.Len()
	r.messagesQ.Release(queue)

	// Clear the in-progress state so the next frame starts a new message.
	r.msgBuf.Reset()
	r.msg = Message{}

	// Credit is consumed only once the entire message has been received.
	r.l.deliveryCount++
	r.l.linkCredit--
	debug.Log(3, "RX (Receiver %p) link %s - deliveryCount: %d, linkCredit: %d, len(messages): %d", r, r.l.key.name, r.l.deliveryCount, r.l.linkCredit, queued)
}