func()

in receiver.go [770:882]


// muxReceive handles a single incoming transfer frame for this receiver.
//
// A message may be split across several transfers. Payloads are accumulated
// in r.msgBuf while r.more is set; once a frame arrives with More unset the
// buffered bytes are unmarshaled into r.msg, the message is placed on
// r.messagesQ, and the link's delivery count and credit are adjusted.
//
// Frames that break the rules checked below cause the link to be closed via
// r.l.closeWithError and the frame is dropped.
func (r *Receiver) muxReceive(fr frames.PerformTransfer) {
	// reason stays empty while the frame is acceptable; any non-empty value
	// closes the link with ErrCondNotAllowed.
	var reason string

	if r.more {
		// Continuation of a multipart message. The identifying fields are
		// optional here, but when supplied they have to agree with the
		// values captured from the first transfer.
		switch {
		case fr.DeliveryID != nil && *fr.DeliveryID != r.msg.deliveryID:
			reason = fmt.Sprintf(
				"received continuation transfer with inconsistent delivery-id: %d != %d",
				*fr.DeliveryID, r.msg.deliveryID,
			)
		case fr.MessageFormat != nil && *fr.MessageFormat != r.msg.Format:
			reason = fmt.Sprintf(
				"received continuation transfer with inconsistent message-format: %d != %d",
				*fr.MessageFormat, r.msg.Format,
			)
		case fr.DeliveryTag != nil && !bytes.Equal(fr.DeliveryTag, r.msg.DeliveryTag):
			reason = fmt.Sprintf(
				"received continuation transfer with inconsistent delivery-tag: %q != %q",
				fr.DeliveryTag, r.msg.DeliveryTag,
			)
		}
	} else {
		// First transfer of a message: capture whichever identifying
		// fields are present before validating them.
		if id := fr.DeliveryID; id != nil {
			r.msg.deliveryID = *id
		}
		if format := fr.MessageFormat; format != nil {
			r.msg.Format = *format
		}
		r.msg.DeliveryTag = fr.DeliveryTag

		// All three fields are mandatory on the first transfer.
		switch {
		case fr.DeliveryID == nil:
			reason = "received message without a delivery-id"
		case fr.MessageFormat == nil:
			reason = "received message without a message-format"
		case fr.DeliveryTag == nil:
			reason = "received message without a delivery-tag"
		}
	}

	if reason != "" {
		r.l.closeWithError(ErrCondNotAllowed, reason)
		return
	}

	// An aborted message is thrown away along with anything buffered so far.
	if fr.Aborted {
		r.msgBuf.Reset()
		r.msg = Message{}
		r.more = false
		return
	}

	// Refuse the payload if appending it would push the buffered message
	// past the configured maximum (zero means no limit is enforced).
	if limit := r.l.maxMessageSize; limit != 0 && uint64(r.msgBuf.Len())+uint64(len(fr.Payload)) > limit {
		r.l.closeWithError(ErrCondMessageSizeExceeded, fmt.Sprintf("received message larger than max size of %d", limit))
		return
	}

	// Add the payload to the buffer.
	r.msgBuf.Append(fr.Payload)

	// One settled frame is enough to treat the whole message as settled.
	if fr.Settled {
		r.msg.settled = true
	}

	// Remember whether further transfers are expected; if so, wait for them.
	r.more = fr.More
	if r.more {
		return
	}

	// Final frame of the message: decode what has been buffered.
	if err := r.msg.Unmarshal(&r.msgBuf); err != nil {
		r.l.closeWithError(ErrCondInternalError, err.Error())
		return
	}

	// Unsettled messages are tracked and tied back to this receiver.
	if !r.msg.settled {
		r.addUnsettled()
		r.msg.rcv = r
		debug.Log(3, "RX (Receiver %p): add unsettled delivery ID %d", r, r.msg.deliveryID)
	}

	// Hand the completed message over to the receiver's queue.
	queue := r.messagesQ.Acquire()
	queue.Enqueue(r.msg)
	queued := queue.Len()
	r.messagesQ.Release(queue)

	// Clear in-progress state ready for the next message.
	r.msgBuf.Reset()
	r.msg = Message{}

	// Credit is consumed only once the entire message has been received.
	r.l.deliveryCount++
	r.l.linkCredit--
	debug.Log(3, "RX (Receiver %p) link %s - deliveryCount: %d, linkCredit: %d, len(messages): %d", r, r.l.key.name, r.l.deliveryCount, r.l.linkCredit, queued)
}