in pulsar/consumer_partition.go [424:470]
func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn Transaction) error {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
return errors.New("consumer state is closed")
}
if cmid, ok := msgID.(*chunkMessageID); ok {
if txn == nil {
return pc.unAckChunksTracker.ack(cmid)
}
return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
}
trackingID := toTrackingMessageID(msgID)
if trackingID != nil && trackingID.ack() {
// All messages in the same batch have been acknowledged, we only need to acknowledge the
// MessageID that represents the entry that stores the whole batch
trackingID = &trackingMessageID{
messageID: &messageID{
ledgerID: trackingID.ledgerID,
entryID: trackingID.entryID,
},
}
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
} else if !pc.options.enableBatchIndexAck {
return nil
}
var err error
if withResponse {
if txn != nil {
ackReq := pc.sendIndividualAckWithTxn(trackingID, txn.(*transaction))
<-ackReq.doneCh
err = ackReq.err
} else {
ackReq := pc.sendIndividualAck(trackingID)
<-ackReq.doneCh
err = ackReq.err
}
} else {
pc.ackGroupingTracker.add(trackingID)
}
pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
return err
}