in graphql/handler/transport/websocket.go [238:323]
// run drives the connection: it starts the background goroutines that match
// the connection's subprotocol and configured intervals, then reads incoming
// messages in a loop until reading fails or a message ends the session.
func (c *wsConnection) run() {
	// Every goroutine started below receives this derived context; leaving
	// run cancels it.
	ctx, cancel := context.WithCancel(c.ctx)
	defer cancel()

	proto := c.conn.Subprotocol()
	isGraphQLWS := proto == "" || proto == graphqlwsSubprotocol
	isTransportWS := proto == graphqltransportwsSubprotocol

	// graphql-ws mode (also chosen when no subprotocol is reported): run the
	// keep-alive loop on a ticker firing every KeepAlivePingInterval.
	if isGraphQLWS && c.KeepAlivePingInterval != 0 {
		c.mu.Lock()
		c.keepAliveTicker = time.NewTicker(c.KeepAlivePingInterval)
		c.mu.Unlock()

		go c.keepAlive(ctx)
	}

	// graphql-transport-ws mode: run the pong-only loop on a ticker firing
	// every PongOnlyInterval.
	if isTransportWS && c.PongOnlyInterval != 0 {
		c.mu.Lock()
		c.pongOnlyTicker = time.NewTicker(c.PongOnlyInterval)
		c.mu.Unlock()

		go c.keepAlivePongOnly(ctx)
	}

	// graphql-transport-ws mode: run the ping loop on a ticker firing every
	// PingPongInterval; a pong is expected in reply.
	if isTransportWS && c.PingPongInterval != 0 {
		c.mu.Lock()
		c.pingPongTicker = time.NewTicker(c.PingPongInterval)
		c.mu.Unlock()

		if !c.MissingPongOk {
			// Allow two intervals for the pong to arrive. Note: when this
			// deadline closes the connection, the client will receive an
			// "invalid close code". The error is ignored on purpose; arming
			// the deadline is best effort.
			deadline := time.Now().UTC().Add(2 * c.PingPongInterval)
			_ = c.conn.SetReadDeadline(deadline)
		}

		go c.ping(ctx)
	}

	// Close the connection once the context is cancelled, optionally sending
	// a "close reason" taken from the context.
	go c.closeOnCancel(ctx)

	for {
		// Timestamp taken before blocking on the next message; it is handed
		// to subscribe for start messages.
		receivedAt := graphql.Now()

		msg, err := c.me.NextMessage()
		if err != nil {
			// net.ErrClosed means we closed the connection ourselves, so
			// there is nothing to report.
			if errors.Is(err, net.ErrClosed) {
				return
			}
			c.handlePossibleError(err, true)
			return
		}

		switch msg.t {
		case startMessageType:
			c.subscribe(receivedAt, &msg)

		case stopMessageType:
			// Look the closer up under the lock, but invoke it only after
			// the lock has been released.
			c.mu.Lock()
			stop := c.active[msg.id]
			c.mu.Unlock()

			if stop != nil {
				stop()
			}

		case connectionCloseMessageType:
			c.close(websocket.CloseNormalClosure, "terminated")
			return

		case pingMessageType:
			// Answer with a pong carrying the ping's payload.
			c.write(&message{t: pongMessageType, payload: msg.payload})

		case pongMessageType:
			c.mu.Lock()
			c.receivedPong = true
			c.mu.Unlock()

			// The zero time.Time clears the read deadline.
			_ = c.conn.SetReadDeadline(time.Time{})

		default:
			c.sendConnectionError("unexpected message %s", msg.t)
			c.close(websocket.CloseProtocolError, "unexpected message")
			return
		}
	}
}