func()

in pulsar/internal/connection.go [380:436]


// run is the connection's main event loop.
//
// It starts the reader goroutine, the ping-check goroutine and a goroutine
// that forwards queued requests, then services received commands, write
// requests and keep-alive ping ticks on the calling goroutine. The loop ends
// when closeCh is signalled or a nil write request is received; on the way
// out both tickers are stopped, pending requests are failed with
// errConnectionClosed and the connection is closed.
func (c *connection) run() {
	// Both tickers use the keep-alive interval: one paces outgoing pings in
	// the loop below, the other is handed to runPingCheck.
	sendTick := time.NewTicker(c.keepAliveInterval)
	checkTick := time.NewTicker(c.keepAliveInterval)

	shutdown := func() {
		sendTick.Stop()
		checkTick.Stop()

		// Every access to pendingReqs, the final cleanup included, has to
		// happen on this run-loop goroutine to avoid
		// https://github.com/apache/pulsar-client-go/issues/239
		c.failPendingRequests(errConnectionClosed)
		c.Close()
	}
	defer shutdown()

	// All reads come from the reader goroutine.
	go c.reader.readFromConnection()
	go c.runPingCheck(checkTick)

	queueCap, queueLen := cap(c.incomingRequestsCh), len(c.incomingRequestsCh)
	c.log.Debugf("Connection run starting with request capacity=%d queued=%d", queueCap, queueLen)

	// Incoming requests are sent from a dedicated goroutine so that they do
	// not share a select with the command/write/ping handling below.
	dispatchRequests := func() {
		for {
			select {
			case req := <-c.incomingRequestsCh:
				if req == nil {
					// TODO: this should never happen.
					return
				}
				c.internalSendRequest(req)

			case <-c.closeCh:
				c.failLeftRequestsWhenClose()
				return
			}
		}
	}
	go dispatchRequests()

	for {
		select {
		case <-sendTick.C:
			c.sendPing()

		case received := <-c.incomingCmdCh:
			c.internalReceivedCommand(received.cmd, received.headersAndPayload)

		case out := <-c.writeRequestsCh:
			if out != nil {
				c.internalWriteData(out)
				continue
			}
			// A nil write request terminates the run loop.
			return

		case <-c.closeCh:
			return
		}
	}
}