in conn.go [1058:1114]
func (c *Conn) openAMQP(ctx context.Context) (stateFunc, error) {
// send open frame
open := &frames.PerformOpen{
ContainerID: c.containerID,
Hostname: c.hostname,
MaxFrameSize: c.maxFrameSize,
ChannelMax: c.channelMax,
IdleTimeout: c.idleTimeout / 2, // per spec, advertise half our idle timeout
Properties: c.properties,
}
fr := frames.Frame{
Type: frames.TypeAMQP,
Body: open,
Channel: 0,
}
debug.Log(1, "TX (openAMQP %p): %s", c, fr)
timeout, err := c.getWriteTimeout(ctx)
if err != nil {
return nil, err
}
if err = c.writeFrame(timeout, fr); err != nil {
return nil, err
}
// get the response
fr, err = c.readSingleFrame()
if err != nil {
return nil, err
}
debug.Log(1, "RX (openAMQP %p): %s", c, fr)
o, ok := fr.Body.(*frames.PerformOpen)
if !ok {
return nil, fmt.Errorf("openAMQP: unexpected frame type %T", fr.Body)
}
// update peer settings
if o.MaxFrameSize > 0 {
c.peerMaxFrameSize = o.MaxFrameSize
}
if o.IdleTimeout > 0 {
// TODO: reject very small idle timeouts
c.peerIdleTimeout = o.IdleTimeout
}
if o.ChannelMax < c.channelMax {
c.channelMax = o.ChannelMax
}
if len(o.Properties) > 0 {
c.peerProperties = map[string]any{}
for k, v := range o.Properties {
c.peerProperties[string(k)] = v
}
}
// connection established, exit state machine
return nil, nil
}