void ConsumerImpl::messageReceived()

in lib/ConsumerImpl.cc [538:641]


void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
                                   bool& isChecksumValid, proto::BrokerEntryMetadata& brokerEntryMetadata,
                                   proto::MessageMetadata& metadata, SharedBuffer& payload) {
    LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes());

    if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
        // Message was discarded or not consumed due to decryption failure
        return;
    }

    if (!isChecksumValid) {
        // Message discarded for checksum error
        discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_ChecksumMismatch);
        return;
    }

    auto redeliveryCount = msg.redelivery_count();
    const bool isMessageUndecryptable =
        metadata.encryption_keys_size() > 0 && !config_.getCryptoKeyReader().get() &&
        config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME;

    const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
    if (!isMessageUndecryptable && !isChunkedMessage) {
        if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, payload, true)) {
            // Message was discarded on decompression error
            return;
        }
    }

    const auto& messageIdData = msg.message_id();
    auto messageId = MessageIdBuilder::from(messageIdData).batchIndex(-1).build();

    // Only a non-batched messages can be a chunk
    if (!metadata.has_num_messages_in_batch() && isChunkedMessage) {
        auto optionalPayload = processMessageChunk(payload, metadata, messageIdData, cnx, messageId);
        if (optionalPayload) {
            payload = optionalPayload.value();
        } else {
            return;
        }
    }

    Message m(messageId, brokerEntryMetadata, metadata, payload);
    m.impl_->cnx_ = cnx.get();
    m.impl_->setTopicName(getTopicPtr());
    m.impl_->setRedeliveryCount(msg.redelivery_count());

    if (metadata.has_schema_version()) {
        m.impl_->setSchemaVersion(metadata.schema_version());
    }

    LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
    LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
                        << metadata.has_num_messages_in_batch());

    uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch();
    if (this->ackGroupingTrackerPtr_->isDuplicate(m.getMessageId())) {
        LOG_DEBUG(getName() << " Ignoring message as it was ACKed earlier by same consumer.");
        increaseAvailablePermits(cnx, numOfMessageReceived);
        return;
    }

    if (metadata.has_num_messages_in_batch()) {
        BitSet::Data words(msg.ack_set_size());
        for (int i = 0; i < words.size(); i++) {
            words[i] = msg.ack_set(i);
        }
        BitSet ackSet{std::move(words)};
        Lock lock(mutex_);
        numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count());
    } else {
        // try convert key value data.
        m.impl_->convertPayloadToKeyValue(config_.getSchema());

        const auto startMessageId = startMessageId_.get();
        if (isPersistent_ && startMessageId &&
            m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
            m.getMessageId().entryId() == startMessageId.value().entryId() &&
            isPriorEntryIndex(m.getMessageId().entryId())) {
            LOG_DEBUG(getName() << " Ignoring message from before the startMessageId: "
                                << startMessageId.value());
            return;
        }
        if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
            possibleSendToDeadLetterTopicMessages_.put(m.getMessageId(), std::vector<Message>{m});
            if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
                redeliverUnacknowledgedMessages({m.getMessageId()});
                increaseAvailablePermits(cnx);
                return;
            }
        }
        executeNotifyCallback(m);
    }

    if (messageListener_) {
        if (!messageListenerRunning_) {
            return;
        }
        // Trigger message listener callback in a separate thread
        while (numOfMessageReceived--) {
            listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
        }
    }
}