func()

in pulsar/consumer_partition.go [830:879]


func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withResponse bool) error {
	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
		return errors.New("consumer state is closed")
	}

	if !pc.isAllowAckCumulative() {
		return errors.Wrap(ErrInvalidAck, "cumulative ack is not allowed for the Shared/KeyShared subscription type")
	}

	// chunk message id will be converted to tracking message id
	trackingID := toTrackingMessageID(msgID)
	if trackingID == nil {
		return errors.New("failed to convert trackingMessageID")
	}

	var msgIDToAck *trackingMessageID
	if trackingID.ackCumulative() || pc.options.enableBatchIndexAck {
		msgIDToAck = trackingID
	} else if !trackingID.tracker.hasPrevBatchAcked() {
		// get previous batch message id
		msgIDToAck = trackingID.prev()
		trackingID.tracker.setPrevBatchAcked()
	} else {
		// waiting for all the msgs are acked in this batch
		return nil
	}

	pc.metrics.AcksCounter.Inc()
	pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)

	var ackReq *ackRequest
	if withResponse {
		ackReq = pc.sendCumulativeAck(msgIDToAck)
		<-ackReq.doneCh
	} else {
		pc.ackGroupingTracker.addCumulative(msgIDToAck)
	}

	pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)

	if cmid, ok := msgID.(*chunkMessageID); ok {
		pc.unAckChunksTracker.remove(cmid)
	}

	if ackReq == nil {
		return nil
	}
	return ackReq.err
}