func()

in pulsar/consumer_partition.go [1519:1563]


func (pc *partitionConsumer) runEventsLoop() {
	defer func() {
		pc.log.Debug("exiting events loop")
	}()
	pc.log.Debug("get into runEventsLoop")

	go func() {
		for {
			select {
			case <-pc.closeCh:
				pc.log.Info("close consumer, exit reconnect")
				return
			case <-pc.connectClosedCh:
				pc.log.Debug("runEventsLoop will reconnect")
				pc.reconnectToBroker()
			}
		}
	}()

	for {
		for i := range pc.eventsCh {
			switch v := i.(type) {
			case *ackRequest:
				pc.internalAck(v)
			case *ackWithTxnRequest:
				pc.internalAckWithTxn(v)
			case []*pb.MessageIdData:
				pc.internalAckList(v)
			case *redeliveryRequest:
				pc.internalRedeliver(v)
			case *unsubscribeRequest:
				pc.internalUnsubscribe(v)
			case *getLastMsgIDRequest:
				pc.internalGetLastMessageID(v)
			case *seekRequest:
				pc.internalSeek(v)
			case *seekByTimeRequest:
				pc.internalSeekByTime(v)
			case *closeRequest:
				pc.internalClose(v)
				return
			}
		}
	}
}