in pulsar/internal/connection.go [380:436]
// run is the connection's main event loop. It starts the reader goroutine,
// the ping-check goroutine and a request-forwarding goroutine, then services
// received commands, outgoing write requests and keep-alive pings until the
// close channel fires or a nil write request is received. On exit it stops
// the tickers, fails pending requests with errConnectionClosed and closes
// the connection.
func (c *connection) run() {
	// Both tickers fire at the keep-alive interval: one drives sending pings
	// from this loop, the other is handed to runPingCheck.
	sendTick := time.NewTicker(c.keepAliveInterval)
	checkTick := time.NewTicker(c.keepAliveInterval)

	teardown := func() {
		sendTick.Stop()
		checkTick.Stop()

		// Every access to pendingReqs, the final cleanup included, must happen
		// on this run-loop goroutine to avoid
		// https://github.com/apache/pulsar-client-go/issues/239
		c.failPendingRequests(errConnectionClosed)
		c.Close()
	}
	defer teardown()

	// The reader goroutine is the single source of reads from the connection.
	go c.reader.readFromConnection()
	go c.runPingCheck(checkTick)

	capacity := cap(c.incomingRequestsCh)
	queued := len(c.incomingRequestsCh)
	c.log.Debugf("Connection run starting with request capacity=%d queued=%d",
		capacity, queued)

	// forwardRequests hands each incoming request to internalSendRequest until
	// the connection is closed, at which point the requests still left are
	// failed via failLeftRequestsWhenClose.
	forwardRequests := func() {
		for {
			select {
			case <-c.closeCh:
				c.failLeftRequestsWhenClose()
				return
			case req := <-c.incomingRequestsCh:
				if req == nil {
					// TODO: per the original note, a nil request is not
					// expected to ever arrive here.
					return
				}
				c.internalSendRequest(req)
			}
		}
	}
	go forwardRequests()

	for {
		select {
		case <-c.closeCh:
			return
		case received := <-c.incomingCmdCh:
			c.internalReceivedCommand(received.cmd, received.headersAndPayload)
		case payload := <-c.writeRequestsCh:
			// A nil write request terminates the run loop.
			if payload != nil {
				c.internalWriteData(payload)
				continue
			}
			return
		case <-sendTick.C:
			c.sendPing()
		}
	}
}