in pulsar/consumer_partition.go [730:798]
// AckIDList acknowledges every message ID in msgIDs.
//
// When the ackWithResponse option is off, each ID is handed to AckID in order
// and the first error aborts the iteration and is returned as-is.
//
// When it is on, the IDs are validated and folded into a single
// ackListRequest that is pushed onto eventsCh; the call then blocks until a
// result arrives on the request's errCh. IDs that fail validation (wrong
// partition index, batch index not smaller than the batch size, unsupported
// concrete type) are logged and left out of the request. On failure the
// returned error is built by toAckError from the IDs that passed validation.
func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
	// Without broker responses there is nothing to aggregate: ack one by one.
	if !pc.options.ackWithResponse {
		for i := range msgIDs {
			if err := pc.AckID(msgIDs[i]); err != nil {
				return err
			}
		}
		return nil
	}

	var (
		// Chunk IDs seen in the input; their tracker entries are dropped only
		// once the acknowledgment has succeeded.
		chunkIDs []*chunkMessageID
		// Position -> ack bit set. A nil bit set is stored for positions that
		// are acknowledged without a per-index bit set.
		acks = make(map[position]*bitset.BitSet)
		// IDs that passed validation; reported back to the caller on error.
		accepted = make([]MessageID, 0, len(msgIDs))
	)

	for _, id := range msgIDs {
		if id.PartitionIdx() != pc.partitionIdx {
			pc.log.Errorf("%v inconsistent partition index %v (current: %v)", id, id.PartitionIdx(), pc.partitionIdx)
			continue
		}
		if id.BatchIdx() >= 0 && id.BatchSize() > 0 && id.BatchIdx() >= id.BatchSize() {
			pc.log.Errorf("%v invalid batch index %v (size: %v)", id, id.BatchIdx(), id.BatchSize())
			continue
		}

		switch typed := id.(type) {
		case *trackingMessageID:
			pos := newPosition(id)
			switch {
			case typed.ack():
				// ack() reported true: record the position with no bit set.
				acks[pos] = nil
			case pc.options.enableBatchIndexAck:
				// ack() reported false: send the tracker's current bit set.
				acks[pos] = typed.tracker.getAckBitSet()
			}
		case *chunkMessageID:
			// Record every ID the chunk tracker returns for this chunked message.
			for _, part := range pc.unAckChunksTracker.get(typed) {
				acks[newPosition(part)] = nil
			}
			chunkIDs = append(chunkIDs, typed)
		case *messageID:
			acks[newPosition(id)] = nil
		default:
			pc.log.Errorf("invalid message id type %T: %v", id, id)
			continue
		}
		accepted = append(accepted, id)
	}

	// The state is inspected only after the IDs have been processed above, so
	// the error can carry the list of accepted IDs.
	switch state := pc.getConsumerState(); state {
	case consumerClosed, consumerClosing:
		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
		return toAckError(map[error][]MessageID{errors.New("consumer state is closed"): accepted})
	}

	request := &ackListRequest{
		errCh:  make(chan error),
		msgIDs: toMsgIDDataList(acks),
	}
	pc.eventsCh <- request
	err := <-request.errCh
	if err != nil {
		return toAckError(map[error][]MessageID{err: accepted})
	}

	for i := range chunkIDs {
		pc.unAckChunksTracker.remove(chunkIDs[i])
	}
	// NOTE(review): interceptors are invoked for every ID passed in, including
	// those skipped by validation above — confirm this is intended.
	for i := range msgIDs {
		pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgIDs[i])
	}
	return nil
}