func()

in sender.go [184:303]


// send encodes msg into the sender's buffer and hands it to the link mux as
// one or more transfer frames, waiting for each frame to be written before
// queueing the next. The sender's mutex is held for the entire call.
//
// On success it returns a SendReceipt carrying the link, the delivery tag that
// was used (msg.DeliveryTag, or a generated 8-byte big-endian counter when the
// message has none) and the Done channel attached to the final frame.
// NOTE: if the marshalled message is empty no frame is sent and the receipt's
// done channel is left at its zero value.
//
// Errors returned:
//   - *Error with ErrCondMessageSizeExceeded when the delivery tag is longer
//     than 32 bytes, or the encoded message exceeds a non-zero link
//     maxMessageSize;
//   - whatever msg.Marshal returns;
//   - a plain error when opts.Settled is requested on a link whose sender
//     settle mode is unsettled;
//   - s.l.doneErr when the link's done channel fires while waiting;
//   - *Error with ErrCondTransferLimitExceeded when ctx is done before the
//     frame could be handed to the mux;
//   - the frame context's Err when the frame write reports a failure.
func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (SendReceipt, error) {
	const (
		maxDeliveryTagLength   = 32
		maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader
	)

	if tagLen := len(msg.DeliveryTag); tagLen > maxDeliveryTagLength {
		return SendReceipt{}, &Error{
			Condition:   ErrCondMessageSizeExceeded,
			Description: fmt.Sprintf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, tagLen),
		}
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// encode the whole message up front; it is split into frames below
	s.buf.Reset()
	if err := msg.Marshal(&s.buf); err != nil {
		return SendReceipt{}, err
	}

	// a max message size of zero means no limit is enforced here
	if limit := s.l.maxMessageSize; limit != 0 && uint64(s.buf.Len()) > limit {
		return SendReceipt{}, &Error{
			Condition:   ErrCondMessageSizeExceeded,
			Description: fmt.Sprintf("encoded message size exceeds max of %d", limit),
		}
	}

	// the link's settle mode gives the default; opts may upgrade to settled
	// but only when the link was not explicitly configured as unsettled
	settled := senderSettleModeValue(s.l.senderSettleMode) == SenderSettleModeSettled
	if opts != nil && opts.Settled {
		if senderSettleModeValue(s.l.senderSettleMode) == SenderSettleModeUnsettled {
			return SendReceipt{}, errors.New("can't send message as settled when sender settlement mode is unsettled")
		}
		settled = true
	}

	// payload bytes available per frame once the transfer header is accounted for
	payloadLimit := int64(s.l.session.conn.peerMaxFrameSize) - maxTransferFrameHeader

	tag := msg.DeliveryTag
	if len(tag) == 0 {
		// no caller-supplied tag: use the next counter value encoded as 8 bytes
		tag = make([]byte, 8)
		binary.BigEndian.PutUint64(tag, s.nextDeliveryTag)
		s.nextDeliveryTag++
	}

	transfer := frames.PerformTransfer{
		Handle:        s.l.outputHandle,
		DeliveryID:    &needsDeliveryID,
		DeliveryTag:   tag,
		MessageFormat: &msg.Format,
		More:          s.buf.Len() > 0,
	}

	for transfer.More {
		// copy the next chunk out of the shared buffer so the frame owns its payload
		chunk, _ := s.buf.Next(payloadLimit)
		transfer.Payload = append([]byte(nil), chunk...)

		lastFrame := s.buf.Len() <= 0
		transfer.More = !lastFrame
		if lastFrame {
			// SSM=settled: overrides RSM; no acks.
			// SSM=unsettled: sender should wait for receiver to ack
			// RSM=first: receiver considers it settled immediately, but must still send ack (SSM=unsettled only)
			// RSM=second: receiver sends ack and waits for return ack from sender (SSM=unsettled only)

			// only the final transfer carries the settled flag
			transfer.Settled = settled

			// and only the final transfer carries the done channel
			transfer.Done = make(chan encoding.DeliveryState, 1)
		}

		fc := frameContext{
			Ctx:  ctx,
			Done: make(chan struct{}),
		}

		// NOTE: the envelope MUST hold a copy of transfer since it is modified after the send
		env := transferEnvelope{FrameCtx: &fc, InputHandle: s.l.inputHandle, Frame: transfer}

		// step 1: hand the frame to the mux
		select {
		case s.transfers <- env:
			// frame was sent to our mux
		case <-s.l.done:
			return SendReceipt{}, s.l.doneErr
		case <-ctx.Done():
			return SendReceipt{}, &Error{Condition: ErrCondTransferLimitExceeded, Description: fmt.Sprintf("credit limit exceeded for sending link %s", s.l.key.name)}
		}

		// step 2: wait for the outcome of the write
		select {
		case <-s.l.done:
			return SendReceipt{}, s.l.doneErr
		case <-fc.Done:
			if fc.Err != nil {
				if lastFrame {
					select {
					case s.rollback <- struct{}{}:
						// the write never happened so signal the mux to roll back the delivery count and link credit
					case <-s.l.close:
						// the link is going down
					}
				}
				return SendReceipt{}, fc.Err
			}
			// frame was written to the network
		}

		// these fields are only required on the first frame of the delivery
		transfer.DeliveryID = nil
		transfer.DeliveryTag = nil
		transfer.MessageFormat = nil
	}

	receipt := SendReceipt{
		l:    &s.l,
		tag:  tag,
		done: transfer.Done,
	}
	return receipt, nil
}