func()

in conn.go [1058:1114]


func (c *Conn) openAMQP(ctx context.Context) (stateFunc, error) {
	// send open frame
	open := &frames.PerformOpen{
		ContainerID:  c.containerID,
		Hostname:     c.hostname,
		MaxFrameSize: c.maxFrameSize,
		ChannelMax:   c.channelMax,
		IdleTimeout:  c.idleTimeout / 2, // per spec, advertise half our idle timeout
		Properties:   c.properties,
	}
	fr := frames.Frame{
		Type:    frames.TypeAMQP,
		Body:    open,
		Channel: 0,
	}
	debug.Log(1, "TX (openAMQP %p): %s", c, fr)
	timeout, err := c.getWriteTimeout(ctx)
	if err != nil {
		return nil, err
	}
	if err = c.writeFrame(timeout, fr); err != nil {
		return nil, err
	}

	// get the response
	fr, err = c.readSingleFrame()
	if err != nil {
		return nil, err
	}
	debug.Log(1, "RX (openAMQP %p): %s", c, fr)
	o, ok := fr.Body.(*frames.PerformOpen)
	if !ok {
		return nil, fmt.Errorf("openAMQP: unexpected frame type %T", fr.Body)
	}

	// update peer settings
	if o.MaxFrameSize > 0 {
		c.peerMaxFrameSize = o.MaxFrameSize
	}
	if o.IdleTimeout > 0 {
		// TODO: reject very small idle timeouts
		c.peerIdleTimeout = o.IdleTimeout
	}
	if o.ChannelMax < c.channelMax {
		c.channelMax = o.ChannelMax
	}

	if len(o.Properties) > 0 {
		c.peerProperties = map[string]any{}
		for k, v := range o.Properties {
			c.peerProperties[string(k)] = v
		}
	}

	// connection established, exit state machine
	return nil, nil
}