func()

in pulsar/consumer_partition.go [1760:1798]


func (pc *partitionConsumer) runEventsLoop() {
	defer func() {
		pc.log.Debug("exiting events loop")
	}()
	pc.log.Debug("get into runEventsLoop")

	for {
		select {
		case <-pc.closeCh:
			pc.log.Info("close consumer, exit reconnect")
			return
		case connectionClosed := <-pc.connectClosedCh:
			pc.log.Debug("runEventsLoop will reconnect")
			pc.reconnectToBroker(connectionClosed)
		case event := <-pc.eventsCh:
			switch v := event.(type) {
			case *ackRequest:
				pc.internalAck(v)
			case *ackWithTxnRequest:
				pc.internalAckWithTxn(v)
			case *ackListRequest:
				pc.internalAckList(v)
			case *redeliveryRequest:
				pc.internalRedeliver(v)
			case *unsubscribeRequest:
				pc.internalUnsubscribe(v)
			case *getLastMsgIDRequest:
				pc.internalGetLastMessageID(v)
			case *seekRequest:
				pc.internalSeek(v)
			case *seekByTimeRequest:
				pc.internalSeekByTime(v)
			case *closeRequest:
				pc.internalClose(v)
				return
			}
		}
	}
}