func()

in pulsar/consumer_partition.go [424:470]


// ackIDCommon acknowledges msgID on this partition consumer, optionally as part
// of the transaction txn.
//
// It fails fast with an error when the consumer is closing or closed. Chunked
// message IDs are delegated to unAckChunksTracker (with or without txn). For any
// other ID, the tracking ID's ack() is invoked first: when it reports true the
// acknowledgement is sent for the whole entry (ledger ID and entry ID only) and
// the ack metrics are recorded; when it reports false nothing is sent and nil is
// returned, unless enableBatchIndexAck is set.
//
// When withResponse is true the call blocks until the ack request completes and
// returns that request's error. Otherwise the ID is handed to ackGroupingTracker
// and nil is returned; note that txn is not consulted on that path. The
// OnAcknowledge interceptors run in both cases, regardless of the ack result.
func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn Transaction) error {
	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
		return errors.New("consumer state is closed")
	}

	if cmid, ok := msgID.(*chunkMessageID); ok {
		if txn == nil {
			return pc.unAckChunksTracker.ack(cmid)
		}
		return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
	}

	trackingID := toTrackingMessageID(msgID)

	if trackingID != nil && trackingID.ack() {
		// Capture the receive timestamp before trackingID is replaced below: the
		// replacement carries only ledger/entry IDs, so its receivedTime is the
		// zero time and must not be used for the processing-time metric.
		receivedTime := trackingID.receivedTime

		// All messages in the same batch have been acknowledged, we only need to acknowledge the
		// MessageID that represents the entry that stores the whole batch
		trackingID = &trackingMessageID{
			messageID: &messageID{
				ledgerID: trackingID.ledgerID,
				entryID:  trackingID.entryID,
			},
		}
		pc.metrics.AcksCounter.Inc()
		pc.metrics.ProcessingTime.Observe(time.Since(receivedTime).Seconds())
	} else if !pc.options.enableBatchIndexAck {
		return nil
	}
	// NOTE(review): if toTrackingMessageID returned nil and enableBatchIndexAck is
	// set, a nil trackingID is passed on below — confirm the callees tolerate that.

	var err error
	if withResponse {
		// Synchronous path: wait for the request to finish and surface its error.
		if txn != nil {
			ackReq := pc.sendIndividualAckWithTxn(trackingID, txn.(*transaction))
			<-ackReq.doneCh
			err = ackReq.err
		} else {
			ackReq := pc.sendIndividualAck(trackingID)
			<-ackReq.doneCh
			err = ackReq.err
		}
	} else {
		pc.ackGroupingTracker.add(trackingID)
	}
	pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
	return err
}