func()

in link.go [124:238]


// attach performs the attach handshake for this link on its session.
//
// The session is first asked to free abandoned links and to allocate a handle
// for l. A PerformAttach frame is then built from the link's configuration,
// passed to beforeAttach for link-specific adjustment, sent, and the peer's
// reply awaited. When the peer replies with an attach that carries a Source
// or Target, afterAttach is invoked with that frame, the settle modes are
// applied and any peer properties are recorded on the link.
//
// A non-nil error is returned if any step fails. If the peer's attach has
// neither Source nor Target, the detach that follows is acknowledged and its
// error (or a generic error when it carries none) is returned. A reply of an
// unexpected frame type closes the connection and yields a *ConnError, or the
// error from closing the connection if that fails.
func (l *link) attach(ctx context.Context, beforeAttach func(*frames.PerformAttach), afterAttach func(*frames.PerformAttach)) error {
	// abandoned links are cleaned up first; only then is a handle claimed for this link
	if cleanupErr := l.session.freeAbandonedLinks(ctx); cleanupErr != nil {
		return cleanupErr
	}
	if allocErr := l.session.allocateHandle(ctx, l); allocErr != nil {
		return allocErr
	}

	// outgoing attach frame, populated from the link's own settings
	outgoing := &frames.PerformAttach{
		Name:                l.key.name,
		Handle:              l.outputHandle,
		ReceiverSettleMode:  l.receiverSettleMode,
		SenderSettleMode:    l.senderSettleMode,
		MaxMessageSize:      l.maxMessageSize,
		Source:              l.source,
		Target:              l.target,
		Properties:          l.properties,
		DesiredCapabilities: l.desiredCapabilities,
	}

	// let the concrete link type adjust the frame before it goes out
	beforeAttach(outgoing)

	if sendErr := l.txFrameAndWait(ctx, outgoing); sendErr != nil {
		return sendErr
	}

	// the peer is expected to answer with its own attach
	reply, waitErr := l.waitForFrame(ctx)
	if waitErr != nil {
		l.session.abandonLink(l)
		return waitErr
	}

	peerAttach, isAttach := reply.(*frames.PerformAttach)
	if !isAttach {
		debug.Log(1, "RX (link %p): unexpected attach response frame %T", l, reply)
		if closeErr := l.session.conn.Close(); closeErr != nil {
			return closeErr
		}
		return &ConnError{inner: fmt.Errorf("unexpected attach response: %#v", reply)}
	}

	// A remote that hits an error during the attach replies with an Attach
	// that has neither Source nor Target, and then follows up with a Detach
	// carrying the error.
	//
	//   Note that if the application chooses not to create a terminus, the session
	//   endpoint will still create a link endpoint and issue an attach indicating
	//   that the link endpoint has no associated local terminus. In this case, the
	//   session endpoint MUST immediately detach the newly created link endpoint.
	//
	// http://docs.oasis-open.org/amqp/core/v1.0/csprd01/amqp-core-transport-v1.0-csprd01.html#doc-idp386144
	if peerAttach.Source == nil && peerAttach.Target == nil {
		// the peer's detach should be the next frame
		next, nextErr := l.waitForFrame(ctx)
		if nextErr != nil {
			// the wait failed (e.g. timed out) before the peer closed the link; this
			// isn't really an abandoned link, but the peer's detach still has to be
			// acked, so the link is handed to the session as abandoned.
			l.session.abandonLink(l)
			return nextErr
		}

		peerDetach, isDetach := next.(*frames.PerformDetach)
		if !isDetach {
			if closeErr := l.session.conn.Close(); closeErr != nil {
				return closeErr
			}
			return &ConnError{inner: fmt.Errorf("unexpected frame while waiting for detach: %#v", next)}
		}

		// answer the peer's detach with our own closing detach
		ack := &frames.PerformDetach{
			Handle: l.outputHandle,
			Closed: true,
		}
		if ackErr := l.txFrameAndWait(ctx, ack); ackErr != nil {
			return ackErr
		}

		// surface the peer's reason when it supplied one
		if peerDetach.Error != nil {
			return peerDetach.Error
		}
		return fmt.Errorf("received detach with no error specified")
	}

	// adopt the peer's max message size when no local limit is set or the peer's is smaller
	// NOTE(review): if these sizes are unsigned, a peer value of 0 also replaces a
	// non-zero local limit — confirm that is intended.
	if localMax := l.maxMessageSize; localMax == 0 || peerAttach.MaxMessageSize < localMax {
		l.maxMessageSize = peerAttach.MaxMessageSize
	}

	// let the concrete link type consume the peer's attach
	afterAttach(peerAttach)

	settleErr := l.setSettleModes(peerAttach)
	if settleErr != nil {
		// requested and supported settlement modes don't match, so close the link
		closing := &frames.PerformDetach{
			Handle: l.outputHandle,
			Closed: true,
		}
		if txErr := l.txFrameAndWait(ctx, closing); txErr != nil {
			return txErr
		}
		return settleErr
	}

	// keep the peer's properties on the link, keyed by plain strings
	if len(peerAttach.Properties) > 0 {
		l.peerProperties = make(map[string]any)
		for name, value := range peerAttach.Properties {
			l.peerProperties[string(name)] = value
		}
	}

	return nil
}