func()

in conn.go [790:871]


// connWriter is the connection's write loop. Each iteration waits on one of
// three event sources:
//
//   - c.txFrame: a frame write request. The frame is written with the
//     timeout returned by c.getWriteTimeout for the request's context.
//   - the keepalive ticker (armed only when c.peerIdleTimeout/2 is
//     positive): keepaliveFrame is written directly to c.net.
//   - c.rxtxExit: a close performative is written, its result is stored in
//     c.txErr, and the loop returns.
//
// A failed frame or keepalive write is recorded in c.txErr and ends the loop.
// On return, c.txDone is closed and then c.close is called.
func (c *Conn) connWriter() {
	defer func() {
		close(c.txDone)
		c.close()
	}()

	// Keepalives go out at half the peer's idle timeout. A non-positive
	// interval disables them: tick stays nil, and since receiving from a
	// nil channel blocks forever, that select case simply never fires.
	var tick <-chan time.Time
	if interval := c.peerIdleTimeout / 2; interval > 0 {
		t := time.NewTicker(interval)
		defer t.Stop()
		tick = t.C
	}

	for {
		// outcome of this iteration's network write, if one was attempted
		var werr error

		select {
		case req := <-c.txFrame:
			// a caller asked for a frame to be written
			timeout, ctxErr := c.getWriteTimeout(req.FrameCtx.Ctx)
			if ctxErr != nil {
				debug.Log(1, "TX (connWriter %p) getWriteTimeout: %s: %s", c, ctxErr.Error(), req.Frame)
				if req.FrameCtx.Done != nil {
					// publish the error first; closing Done is what
					// signals the waiter that Err may be read
					req.FrameCtx.Err = ctxErr
					close(req.FrameCtx.Done)
				}
				// nothing was written, so there is no write error to check
				continue
			}

			debug.Log(0, "TX (connWriter %p) timeout %s: %s", c, timeout, req.Frame)
			// Done is closed only on success. On a failed write the Conn
			// shuts down and a *ConnError is propagated to all of the
			// sessions/links instead.
			if werr = c.writeFrame(timeout, req.Frame); werr == nil && req.FrameCtx.Done != nil {
				close(req.FrameCtx.Done)
			}

		case <-tick:
			// keepalive interval elapsed
			debug.Log(3, "TX (connWriter %p): sending keep-alive frame", c)
			_ = c.net.SetWriteDeadline(time.Now().Add(c.writeTimeout))
			if _, e := c.net.Write(keepaliveFrame); e != nil {
				werr = &ConnError{inner: e}
			}
			// The ticker is deliberately not reset whenever a regular
			// frame is sent. Doing so would save a few keepalives, but
			// they are tiny (8 bytes) and the interval is typically
			// minutes, so the extra stop/drain/reset work in the write
			// path is not considered worth it.

		case <-c.rxtxExit:
			// The connection is finished. The spec says we SHOULD wait
			// for the peer's ack of the close performative but does not
			// require it; to stay resilient to misbehaving peers we send
			// the close and exit without waiting.
			closeFrame := frames.Frame{
				Type: frames.TypeAMQP,
				Body: &frames.PerformClose{},
			}
			debug.Log(1, "TX (connWriter %p): %s", c, closeFrame)
			c.txErr = c.writeFrame(c.writeTimeout, closeFrame)
			return
		}

		// any write failure is terminal for the writer
		if werr != nil {
			debug.Log(0, "TX (connWriter %p): terminal error: %v", c, werr)
			c.txErr = werr
			return
		}
	}
}