in link.go [124:238]
func (l *link) attach(ctx context.Context, beforeAttach func(*frames.PerformAttach), afterAttach func(*frames.PerformAttach)) error {
if err := l.session.freeAbandonedLinks(ctx); err != nil {
return err
}
// once the abandoned links have been cleaned up we can create our link
if err := l.session.allocateHandle(ctx, l); err != nil {
return err
}
attach := &frames.PerformAttach{
Name: l.key.name,
Handle: l.outputHandle,
ReceiverSettleMode: l.receiverSettleMode,
SenderSettleMode: l.senderSettleMode,
MaxMessageSize: l.maxMessageSize,
Source: l.source,
Target: l.target,
Properties: l.properties,
DesiredCapabilities: l.desiredCapabilities,
}
// link-specific configuration of the attach frame
beforeAttach(attach)
if err := l.txFrameAndWait(ctx, attach); err != nil {
return err
}
// wait for response
fr, err := l.waitForFrame(ctx)
if err != nil {
l.session.abandonLink(l)
return err
}
resp, ok := fr.(*frames.PerformAttach)
if !ok {
debug.Log(1, "RX (link %p): unexpected attach response frame %T", l, fr)
if err := l.session.conn.Close(); err != nil {
return err
}
return &ConnError{inner: fmt.Errorf("unexpected attach response: %#v", fr)}
}
// If the remote encounters an error during the attach it returns an Attach
// with no Source or Target. The remote then sends a Detach with an error.
//
// Note that if the application chooses not to create a terminus, the session
// endpoint will still create a link endpoint and issue an attach indicating
// that the link endpoint has no associated local terminus. In this case, the
// session endpoint MUST immediately detach the newly created link endpoint.
//
// http://docs.oasis-open.org/amqp/core/v1.0/csprd01/amqp-core-transport-v1.0-csprd01.html#doc-idp386144
if resp.Source == nil && resp.Target == nil {
// wait for detach
fr, err := l.waitForFrame(ctx)
if err != nil {
// we timed out waiting for the peer to close the link, this really isn't an abandoned link.
// however, we still need to send the detach performative to ack the peer.
l.session.abandonLink(l)
return err
}
detach, ok := fr.(*frames.PerformDetach)
if !ok {
if err := l.session.conn.Close(); err != nil {
return err
}
return &ConnError{inner: fmt.Errorf("unexpected frame while waiting for detach: %#v", fr)}
}
// send return detach
fr = &frames.PerformDetach{
Handle: l.outputHandle,
Closed: true,
}
if err := l.txFrameAndWait(ctx, fr); err != nil {
return err
}
if detach.Error == nil {
return fmt.Errorf("received detach with no error specified")
}
return detach.Error
}
if l.maxMessageSize == 0 || resp.MaxMessageSize < l.maxMessageSize {
l.maxMessageSize = resp.MaxMessageSize
}
// link-specific configuration post attach
afterAttach(resp)
if err := l.setSettleModes(resp); err != nil {
// close the link as there's a mismatch on requested/supported settlement modes
dr := &frames.PerformDetach{
Handle: l.outputHandle,
Closed: true,
}
if err := l.txFrameAndWait(ctx, dr); err != nil {
return err
}
return err
}
if len(resp.Properties) > 0 {
l.peerProperties = map[string]any{}
for k, v := range resp.Properties {
l.peerProperties[string(k)] = v
}
}
return nil
}