in pulsar/consumer_partition.go [830:879]
func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withResponse bool) error {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
return errors.New("consumer state is closed")
}
if !pc.isAllowAckCumulative() {
return errors.Wrap(ErrInvalidAck, "cumulative ack is not allowed for the Shared/KeyShared subscription type")
}
// chunk message id will be converted to tracking message id
trackingID := toTrackingMessageID(msgID)
if trackingID == nil {
return errors.New("failed to convert trackingMessageID")
}
var msgIDToAck *trackingMessageID
if trackingID.ackCumulative() || pc.options.enableBatchIndexAck {
msgIDToAck = trackingID
} else if !trackingID.tracker.hasPrevBatchAcked() {
// get previous batch message id
msgIDToAck = trackingID.prev()
trackingID.tracker.setPrevBatchAcked()
} else {
// waiting for all the msgs are acked in this batch
return nil
}
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
var ackReq *ackRequest
if withResponse {
ackReq = pc.sendCumulativeAck(msgIDToAck)
<-ackReq.doneCh
} else {
pc.ackGroupingTracker.addCumulative(msgIDToAck)
}
pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
if cmid, ok := msgID.(*chunkMessageID); ok {
pc.unAckChunksTracker.remove(cmid)
}
if ackReq == nil {
return nil
}
return ackReq.err
}