in sender.go [184:303]
// send encodes msg and transmits it on the link as one or more transfer
// frames, returning a receipt for the delivery.
//
// The encoded message is split into payloads of at most the peer's max frame
// size minus the transfer frame header. Each frame is handed to the mux via
// s.transfers, and send waits for that frame's context to complete before
// moving on to the next one. s.mu is held for the whole call.
//
// If msg carries no delivery tag, one is generated from s.nextDeliveryTag.
// When opts is non-nil and opts.Settled is set, the message is sent settled,
// unless the link's sender settle mode is unsettled, which is an error.
//
// Errors returned:
//   - *Error with ErrCondMessageSizeExceeded when the delivery tag is longer
//     than 32 bytes or the encoded message exceeds s.l.maxMessageSize
//   - whatever msg.Marshal returns
//   - s.l.doneErr when s.l.done fires while sending
//   - *Error with ErrCondTransferLimitExceeded when ctx is done before a
//     frame could be handed to the mux
//   - the frame context's Err when a handed-off frame completes with an error
func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (SendReceipt, error) {
	const (
		maxDeliveryTagLength   = 32
		maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader
	)

	if tagLen := len(msg.DeliveryTag); tagLen > maxDeliveryTagLength {
		return SendReceipt{}, &Error{
			Condition:   ErrCondMessageSizeExceeded,
			Description: fmt.Sprintf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, tagLen),
		}
	}

	// s.buf and s.nextDeliveryTag are both touched below; keep the lock until we return
	s.mu.Lock()
	defer s.mu.Unlock()

	s.buf.Reset()
	if err := msg.Marshal(&s.buf); err != nil {
		return SendReceipt{}, err
	}

	// a max message size of zero means no limit is enforced here
	if limit := s.l.maxMessageSize; limit != 0 && uint64(s.buf.Len()) > limit {
		return SendReceipt{}, &Error{
			Condition:   ErrCondMessageSizeExceeded,
			Description: fmt.Sprintf("encoded message size exceeds max of %d", limit),
		}
	}

	// start from the link's settle mode, then apply the per-send override if requested
	settle := senderSettleModeValue(s.l.senderSettleMode) == SenderSettleModeSettled
	if opts != nil && opts.Settled {
		if senderSettleModeValue(s.l.senderSettleMode) == SenderSettleModeUnsettled {
			return SendReceipt{}, errors.New("can't send message as settled when sender settlement mode is unsettled")
		}
		settle = true
	}

	// room left for payload in a single frame once the transfer header is accounted for
	maxPayloadSize := int64(s.l.session.conn.peerMaxFrameSize) - maxTransferFrameHeader

	tag := msg.DeliveryTag
	if len(tag) == 0 {
		// no tag supplied: use the next counter value as an 8-byte big-endian tag
		tag = make([]byte, 8)
		binary.BigEndian.PutUint64(tag, s.nextDeliveryTag)
		s.nextDeliveryTag++
	}

	transfer := frames.PerformTransfer{
		Handle:        s.l.outputHandle,
		DeliveryID:    &needsDeliveryID,
		DeliveryTag:   tag,
		MessageFormat: &msg.Format,
		More:          s.buf.Len() > 0,
	}

	for transfer.More {
		chunk, _ := s.buf.Next(maxPayloadSize)
		// copy the chunk so the frame does not alias the slice returned by s.buf
		transfer.Payload = append([]byte(nil), chunk...)

		moreToSend := s.buf.Len() > 0
		transfer.More = moreToSend
		if !moreToSend {
			// SSM=settled: overrides RSM; no acks.
			// SSM=unsettled: sender should wait for receiver to ack
			// RSM=first: receiver considers it settled immediately, but must still send ack (SSM=unsettled only)
			// RSM=second: receiver sends ack and waits for return ack from sender (SSM=unsettled only)

			// the final transfer is marked settled when sending as settled
			transfer.Settled = settle

			// Done is only set on the last frame
			transfer.Done = make(chan encoding.DeliveryState, 1)
		}

		fctx := &frameContext{
			Ctx:  ctx,
			Done: make(chan struct{}),
		}

		// NOTE: the envelope MUST carry a copy of transfer since it is modified after the send
		envelope := transferEnvelope{FrameCtx: fctx, InputHandle: s.l.inputHandle, Frame: transfer}

		select {
		case s.transfers <- envelope:
			// the mux has the frame
		case <-s.l.done:
			return SendReceipt{}, s.l.doneErr
		case <-ctx.Done():
			return SendReceipt{}, &Error{Condition: ErrCondTransferLimitExceeded, Description: fmt.Sprintf("credit limit exceeded for sending link %s", s.l.key.name)}
		}

		select {
		case <-fctx.Done:
			if fctx.Err != nil {
				if !moreToSend {
					select {
					case s.rollback <- struct{}{}:
						// the write never happened, so tell the mux to roll back the delivery count and link credit
					case <-s.l.close:
						// the link is going down
					}
				}
				return SendReceipt{}, fctx.Err
			}
			// the frame made it onto the network
		case <-s.l.done:
			return SendReceipt{}, s.l.doneErr
		}

		// these fields are only required on the first frame of the message
		transfer.DeliveryID = nil
		transfer.DeliveryTag = nil
		transfer.MessageFormat = nil
	}

	return SendReceipt{
		l:    &s.l,
		tag:  tag,
		done: transfer.Done,
	}, nil
}