in pulsar/consumer_partition.go [1760:1798]
// runEventsLoop blocks in a loop, waiting on three channels of the partition
// consumer and reacting to whichever one delivers first:
//
//   - closeCh: logs and returns, ending the loop.
//   - connectClosedCh: passes the received value to reconnectToBroker.
//   - eventsCh: dispatches the received request to the matching internal*
//     handler, chosen by the request's dynamic type.
//
// A *closeRequest is handed to internalClose and then ends the loop. Events
// whose type matches none of the known request types are dropped silently.
// A debug message is logged on entry and again whenever the function exits.
func (pc *partitionConsumer) runEventsLoop() {
	defer func() {
		pc.log.Debug("exiting events loop")
	}()
	pc.log.Debug("get into runEventsLoop")

	for {
		select {
		case <-pc.closeCh:
			pc.log.Info("close consumer, exit reconnect")
			return

		case closed := <-pc.connectClosedCh:
			pc.log.Debug("runEventsLoop will reconnect")
			pc.reconnectToBroker(closed)

		case ev := <-pc.eventsCh:
			// The close request is the only event that terminates the loop,
			// so it is handled up front, apart from the ordinary dispatch.
			if closeReq, ok := ev.(*closeRequest); ok {
				pc.internalClose(closeReq)
				return
			}

			switch req := ev.(type) {
			case *ackRequest:
				pc.internalAck(req)
			case *ackWithTxnRequest:
				pc.internalAckWithTxn(req)
			case *ackListRequest:
				pc.internalAckList(req)
			case *redeliveryRequest:
				pc.internalRedeliver(req)
			case *unsubscribeRequest:
				pc.internalUnsubscribe(req)
			case *getLastMsgIDRequest:
				pc.internalGetLastMessageID(req)
			case *seekRequest:
				pc.internalSeek(req)
			case *seekByTimeRequest:
				pc.internalSeekByTime(req)
			}
		}
	}
}