in conn.go [790:871]
func (c *Conn) connWriter() {
defer func() {
close(c.txDone)
c.close()
}()
var (
// keepalives are sent at a rate of 1/2 idle timeout
keepaliveInterval = c.peerIdleTimeout / 2
// 0 disables keepalives
keepalivesEnabled = keepaliveInterval > 0
// set if enable, nil if not; nil channels block forever
keepalive <-chan time.Time
)
if keepalivesEnabled {
ticker := time.NewTicker(keepaliveInterval)
defer ticker.Stop()
keepalive = ticker.C
}
var err error
for {
if err != nil {
debug.Log(0, "TX (connWriter %p): terminal error: %v", c, err)
c.txErr = err
return
}
select {
// frame write request
case env := <-c.txFrame:
timeout, ctxErr := c.getWriteTimeout(env.FrameCtx.Ctx)
if ctxErr != nil {
debug.Log(1, "TX (connWriter %p) getWriteTimeout: %s: %s", c, ctxErr.Error(), env.Frame)
if env.FrameCtx.Done != nil {
// the error MUST be set before closing the channel
env.FrameCtx.Err = ctxErr
close(env.FrameCtx.Done)
}
continue
}
debug.Log(0, "TX (connWriter %p) timeout %s: %s", c, timeout, env.Frame)
err = c.writeFrame(timeout, env.Frame)
if err == nil && env.FrameCtx.Done != nil {
close(env.FrameCtx.Done)
}
// in the event of write failure, Conn will close and a
// *ConnError will be propagated to all of the sessions/link.
// keepalive timer
case <-keepalive:
debug.Log(3, "TX (connWriter %p): sending keep-alive frame", c)
_ = c.net.SetWriteDeadline(time.Now().Add(c.writeTimeout))
if _, err = c.net.Write(keepaliveFrame); err != nil {
err = &ConnError{inner: err}
}
// It would be slightly more efficient in terms of network
// resources to reset the timer each time a frame is sent.
// However, keepalives are small (8 bytes) and the interval
// is usually on the order of minutes. It does not seem
// worth it to add extra operations in the write path to
// avoid. (To properly reset a timer it needs to be stopped,
// possibly drained, then reset.)
// connection complete
case <-c.rxtxExit:
// send close performative. note that the spec says we
// SHOULD wait for the ack but we don't HAVE to, in order
// to be resilient to bad actors etc. so we just send
// the close performative and exit.
fr := frames.Frame{
Type: frames.TypeAMQP,
Body: &frames.PerformClose{},
}
debug.Log(1, "TX (connWriter %p): %s", c, fr)
c.txErr = c.writeFrame(c.writeTimeout, fr)
return
}
}
}