in pulsar/consumer_partition.go [1519:1563]
func (pc *partitionConsumer) runEventsLoop() {
defer func() {
pc.log.Debug("exiting events loop")
}()
pc.log.Debug("get into runEventsLoop")
go func() {
for {
select {
case <-pc.closeCh:
pc.log.Info("close consumer, exit reconnect")
return
case <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker()
}
}
}()
for {
for i := range pc.eventsCh {
switch v := i.(type) {
case *ackRequest:
pc.internalAck(v)
case *ackWithTxnRequest:
pc.internalAckWithTxn(v)
case []*pb.MessageIdData:
pc.internalAckList(v)
case *redeliveryRequest:
pc.internalRedeliver(v)
case *unsubscribeRequest:
pc.internalUnsubscribe(v)
case *getLastMsgIDRequest:
pc.internalGetLastMessageID(v)
case *seekRequest:
pc.internalSeek(v)
case *seekByTimeRequest:
pc.internalSeekByTime(v)
case *closeRequest:
pc.internalClose(v)
return
}
}
}
}