in receiver.go [405:503]
func newReceiver(source string, session *Session, opts *ReceiverOptions) (*Receiver, error) {
l := newLink(session, encoding.RoleReceiver)
l.source = &frames.Source{Address: source}
l.target = new(frames.Target)
l.linkCredit = defaultLinkCredit
r := &Receiver{
l: l,
autoSendFlow: true,
receiverReady: make(chan struct{}, 1),
txDisposition: make(chan frameBodyEnvelope),
}
r.messagesQ = queue.NewHolder(queue.New[Message](int(session.incomingWindow)))
if opts == nil {
return r, nil
}
for _, v := range opts.Capabilities {
r.l.target.Capabilities = append(r.l.target.Capabilities, encoding.Symbol(v))
}
if opts.Credit > 0 {
r.l.linkCredit = uint32(opts.Credit)
} else if opts.Credit < 0 {
r.l.linkCredit = 0
r.autoSendFlow = false
}
if opts.DesiredCapabilities != nil {
r.l.desiredCapabilities = make([]encoding.Symbol, 0, len(opts.DesiredCapabilities))
for _, capabilityStr := range opts.DesiredCapabilities {
r.l.desiredCapabilities = append(r.l.desiredCapabilities, encoding.Symbol(capabilityStr))
}
}
if opts.Durability > DurabilityUnsettledState {
return nil, fmt.Errorf("invalid Durability %d", opts.Durability)
}
r.l.target.Durable = opts.Durability
if opts.DynamicAddress {
r.l.source.Address = ""
r.l.dynamicAddr = opts.DynamicAddress
}
if opts.ExpiryPolicy != "" {
if err := encoding.ValidateExpiryPolicy(opts.ExpiryPolicy); err != nil {
return nil, err
}
r.l.target.ExpiryPolicy = opts.ExpiryPolicy
}
r.l.target.Timeout = opts.ExpiryTimeout
if opts.Filters != nil {
r.l.source.Filter = make(encoding.Filter)
for _, f := range opts.Filters {
f(r.l.source.Filter)
}
}
if opts.MaxMessageSize > 0 {
r.l.maxMessageSize = opts.MaxMessageSize
}
if opts.Name != "" {
r.l.key.name = opts.Name
}
if opts.Properties != nil {
r.l.properties = make(map[encoding.Symbol]any)
for k, v := range opts.Properties {
if k == "" {
return nil, errors.New("link property key must not be empty")
}
r.l.properties[encoding.Symbol(k)] = v
}
}
if opts.RequestedSenderSettleMode != nil {
if rsm := *opts.RequestedSenderSettleMode; rsm > SenderSettleModeMixed {
return nil, fmt.Errorf("invalid RequestedSenderSettleMode %d", rsm)
}
r.l.senderSettleMode = opts.RequestedSenderSettleMode
}
if opts.SettlementMode != nil {
if rsm := *opts.SettlementMode; rsm > ReceiverSettleModeSecond {
return nil, fmt.Errorf("invalid SettlementMode %d", rsm)
}
r.l.receiverSettleMode = opts.SettlementMode
}
r.l.target.Address = opts.TargetAddress
for _, v := range opts.SourceCapabilities {
r.l.source.Capabilities = append(r.l.source.Capabilities, encoding.Symbol(v))
}
if opts.SourceDurability != DurabilityNone {
r.l.source.Durable = opts.SourceDurability
}
if opts.SourceExpiryPolicy != ExpiryPolicySessionEnd {
r.l.source.ExpiryPolicy = opts.SourceExpiryPolicy
}
if opts.SourceExpiryTimeout != 0 {
r.l.source.Timeout = opts.SourceExpiryTimeout
}
return r, nil
}