in lib/ConsumerImpl.cc [538:641]
void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
bool& isChecksumValid, proto::BrokerEntryMetadata& brokerEntryMetadata,
proto::MessageMetadata& metadata, SharedBuffer& payload) {
LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes());
if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
// Message was discarded or not consumed due to decryption failure
return;
}
if (!isChecksumValid) {
// Message discarded for checksum error
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_ChecksumMismatch);
return;
}
auto redeliveryCount = msg.redelivery_count();
const bool isMessageUndecryptable =
metadata.encryption_keys_size() > 0 && !config_.getCryptoKeyReader().get() &&
config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME;
const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
if (!isMessageUndecryptable && !isChunkedMessage) {
if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, payload, true)) {
// Message was discarded on decompression error
return;
}
}
const auto& messageIdData = msg.message_id();
auto messageId = MessageIdBuilder::from(messageIdData).batchIndex(-1).build();
// Only a non-batched messages can be a chunk
if (!metadata.has_num_messages_in_batch() && isChunkedMessage) {
auto optionalPayload = processMessageChunk(payload, metadata, messageIdData, cnx, messageId);
if (optionalPayload) {
payload = optionalPayload.value();
} else {
return;
}
}
Message m(messageId, brokerEntryMetadata, metadata, payload);
m.impl_->cnx_ = cnx.get();
m.impl_->setTopicName(getTopicPtr());
m.impl_->setRedeliveryCount(msg.redelivery_count());
if (metadata.has_schema_version()) {
m.impl_->setSchemaVersion(metadata.schema_version());
}
LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
<< metadata.has_num_messages_in_batch());
uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch();
if (this->ackGroupingTrackerPtr_->isDuplicate(m.getMessageId())) {
LOG_DEBUG(getName() << " Ignoring message as it was ACKed earlier by same consumer.");
increaseAvailablePermits(cnx, numOfMessageReceived);
return;
}
if (metadata.has_num_messages_in_batch()) {
BitSet::Data words(msg.ack_set_size());
for (int i = 0; i < words.size(); i++) {
words[i] = msg.ack_set(i);
}
BitSet ackSet{std::move(words)};
Lock lock(mutex_);
numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count());
} else {
// try convert key value data.
m.impl_->convertPayloadToKeyValue(config_.getSchema());
const auto startMessageId = startMessageId_.get();
if (isPersistent_ && startMessageId &&
m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
m.getMessageId().entryId() == startMessageId.value().entryId() &&
isPriorEntryIndex(m.getMessageId().entryId())) {
LOG_DEBUG(getName() << " Ignoring message from before the startMessageId: "
<< startMessageId.value());
return;
}
if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages_.put(m.getMessageId(), std::vector<Message>{m});
if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages({m.getMessageId()});
increaseAvailablePermits(cnx);
return;
}
}
executeNotifyCallback(m);
}
if (messageListener_) {
if (!messageListenerRunning_) {
return;
}
// Trigger message listener callback in a separate thread
while (numOfMessageReceived--) {
listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
}
}
}