in lib/ConsumerImpl.cc [692:760]
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx,
Message& batchedMessage, const BitSet& ackSet,
int redeliveryCount) {
auto batchSize = batchedMessage.impl_->metadata.num_messages_in_batch();
LOG_DEBUG("Received Batch messages of size - " << batchSize
<< " -- msgId: " << batchedMessage.getMessageId());
const auto startMessageId = startMessageId_.get();
int skippedMessages = 0;
auto acker = BatchMessageAckerImpl::create(batchSize);
std::vector<Message> possibleToDeadLetter;
for (int i = 0; i < batchSize; i++) {
// This is a cheap copy since message contains only one shared pointer (impl_)
Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
msg.impl_->setRedeliveryCount(redeliveryCount);
msg.impl_->setTopicName(batchedMessage.impl_->topicName_);
msg.impl_->convertPayloadToKeyValue(config_.getSchema());
if (msg.impl_->brokerEntryMetadata.has_index()) {
msg.impl_->brokerEntryMetadata.set_index(msg.impl_->brokerEntryMetadata.index() - batchSize + i +
1);
}
if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
possibleToDeadLetter.emplace_back(msg);
if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
skippedMessages++;
continue;
}
}
if (startMessageId) {
const MessageId& msgId = msg.getMessageId();
// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
if (isPersistent_ && msgId.ledgerId() == startMessageId.value().ledgerId() &&
msgId.entryId() == startMessageId.value().entryId() &&
isPriorBatchIndex(msgId.batchIndex())) {
LOG_DEBUG(getName() << "Ignoring message from before the startMessageId"
<< msg.getMessageId());
++skippedMessages;
continue;
}
}
if (!ackSet.isEmpty() && !ackSet.get(i)) {
LOG_DEBUG(getName() << "Ignoring message from " << i
<< "th message, which has been acknowledged");
++skippedMessages;
continue;
}
executeNotifyCallback(msg);
}
if (!possibleToDeadLetter.empty()) {
possibleSendToDeadLetterTopicMessages_.emplace(batchedMessage.getMessageId(), possibleToDeadLetter);
if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages({batchedMessage.getMessageId()});
}
}
if (skippedMessages > 0) {
increaseAvailablePermits(cnx, skippedMessages);
}
return batchSize - skippedMessages;
}