func()

in pulsar/consumer_partition.go [730:798]


func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
	if !pc.options.ackWithResponse {
		for _, msgID := range msgIDs {
			if err := pc.AckID(msgID); err != nil {
				return err
			}
		}
		return nil
	}

	chunkedMsgIDs := make([]*chunkMessageID, 0) // we need to remove them after acknowledging
	pendingAcks := make(map[position]*bitset.BitSet)
	validMsgIDs := make([]MessageID, 0, len(msgIDs))

	// They might be complete after the whole for loop
	for _, msgID := range msgIDs {
		if msgID.PartitionIdx() != pc.partitionIdx {
			pc.log.Errorf("%v inconsistent partition index %v (current: %v)", msgID, msgID.PartitionIdx(), pc.partitionIdx)
		} else if msgID.BatchIdx() >= 0 && msgID.BatchSize() > 0 &&
			msgID.BatchIdx() >= msgID.BatchSize() {
			pc.log.Errorf("%v invalid batch index %v (size: %v)", msgID, msgID.BatchIdx(), msgID.BatchSize())
		} else {
			valid := true
			switch convertedMsgID := msgID.(type) {
			case *trackingMessageID:
				position := newPosition(msgID)
				if convertedMsgID.ack() {
					pendingAcks[position] = nil
				} else if pc.options.enableBatchIndexAck {
					pendingAcks[position] = convertedMsgID.tracker.getAckBitSet()
				}
			case *chunkMessageID:
				for _, id := range pc.unAckChunksTracker.get(convertedMsgID) {
					pendingAcks[newPosition(id)] = nil
				}
				chunkedMsgIDs = append(chunkedMsgIDs, convertedMsgID)
			case *messageID:
				pendingAcks[newPosition(msgID)] = nil
			default:
				pc.log.Errorf("invalid message id type %T: %v", msgID, msgID)
				valid = false
			}
			if valid {
				validMsgIDs = append(validMsgIDs, msgID)
			}
		}
	}

	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
		return toAckError(map[error][]MessageID{errors.New("consumer state is closed"): validMsgIDs})
	}

	req := &ackListRequest{
		errCh:  make(chan error),
		msgIDs: toMsgIDDataList(pendingAcks),
	}
	pc.eventsCh <- req
	if err := <-req.errCh; err != nil {
		return toAckError(map[error][]MessageID{err: validMsgIDs})
	}
	for _, id := range chunkedMsgIDs {
		pc.unAckChunksTracker.remove(id)
	}
	for _, id := range msgIDs {
		pc.options.interceptors.OnAcknowledge(pc.parentConsumer, id)
	}
	return nil
}