uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch()

in lib/ConsumerImpl.cc [692:760]


uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx,
                                                          Message& batchedMessage, const BitSet& ackSet,
                                                          int redeliveryCount) {
    auto batchSize = batchedMessage.impl_->metadata.num_messages_in_batch();
    LOG_DEBUG("Received Batch messages of size - " << batchSize
                                                   << " -- msgId: " << batchedMessage.getMessageId());
    const auto startMessageId = startMessageId_.get();

    int skippedMessages = 0;

    auto acker = BatchMessageAckerImpl::create(batchSize);
    std::vector<Message> possibleToDeadLetter;
    for (int i = 0; i < batchSize; i++) {
        // This is a cheap copy since message contains only one shared pointer (impl_)
        Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
        msg.impl_->setRedeliveryCount(redeliveryCount);
        msg.impl_->setTopicName(batchedMessage.impl_->topicName_);
        msg.impl_->convertPayloadToKeyValue(config_.getSchema());
        if (msg.impl_->brokerEntryMetadata.has_index()) {
            msg.impl_->brokerEntryMetadata.set_index(msg.impl_->brokerEntryMetadata.index() - batchSize + i +
                                                     1);
        }

        if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
            possibleToDeadLetter.emplace_back(msg);
            if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
                skippedMessages++;
                continue;
            }
        }

        if (startMessageId) {
            const MessageId& msgId = msg.getMessageId();

            // If we are receiving a batch message, we need to discard messages that were prior
            // to the startMessageId
            if (isPersistent_ && msgId.ledgerId() == startMessageId.value().ledgerId() &&
                msgId.entryId() == startMessageId.value().entryId() &&
                isPriorBatchIndex(msgId.batchIndex())) {
                LOG_DEBUG(getName() << "Ignoring message from before the startMessageId"
                                    << msg.getMessageId());
                ++skippedMessages;
                continue;
            }
        }

        if (!ackSet.isEmpty() && !ackSet.get(i)) {
            LOG_DEBUG(getName() << "Ignoring message from " << i
                                << "th message, which has been acknowledged");
            ++skippedMessages;
            continue;
        }

        executeNotifyCallback(msg);
    }

    if (!possibleToDeadLetter.empty()) {
        possibleSendToDeadLetterTopicMessages_.emplace(batchedMessage.getMessageId(), possibleToDeadLetter);
        if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
            redeliverUnacknowledgedMessages({batchedMessage.getMessageId()});
        }
    }

    if (skippedMessages > 0) {
        increaseAvailablePermits(cnx, skippedMessages);
    }

    return batchSize - skippedMessages;
}