boost::optional ConsumerImpl::processMessageChunk()

in lib/ConsumerImpl.cc [457:536]


boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
                                                                const proto::MessageMetadata& metadata,
                                                                const proto::MessageIdData& messageIdData,
                                                                const ClientConnectionPtr& cnx,
                                                                MessageId& messageId) {
    const auto chunkId = metadata.chunk_id();
    const auto uuid = metadata.uuid();
    LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
                                                 << ", messageId: " << messageId << ") of "
                                                 << payload.readableBytes() << " bytes");

    Lock lock(chunkProcessMutex_);

    // Lazy task scheduling to expire incomplete chunk message
    bool expected = false;
    if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
        expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) {
        triggerCheckExpiredChunkedTimer();
    }

    auto it = chunkedMessageCache_.find(uuid);

    if (chunkId == 0 && it == chunkedMessageCache_.end()) {
        if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) {
            chunkedMessageCache_.removeOldestValues(
                chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1,
                [this](const std::string& uuid, const ChunkedMessageCtx& ctx) {
                    for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
                        discardChunkMessages(uuid, msgId, autoAckOldestChunkedMessageOnQueueFull_);
                    }
                });
        }
        it = chunkedMessageCache_.putIfAbsent(
            uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
    }

    auto& chunkedMsgCtx = it->second;
    if (it == chunkedMessageCache_.end() || !chunkedMsgCtx.validateChunkId(chunkId)) {
        auto startMessageId = startMessageId_.get().value_or(MessageId::earliest());
        if (!config_.isStartMessageIdInclusive() && startMessageId.ledgerId() == messageId.ledgerId() &&
            startMessageId.entryId() == messageId.entryId()) {
            // When the start message id is not inclusive, the last chunk of the previous chunked message will
            // be delivered, which is expected and we only need to filter it out.
            chunkedMessageCache_.remove(uuid);
            LOG_INFO("Filtered the chunked message before the start message id (uuid: "
                     << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")");
        } else if (it == chunkedMessageCache_.end()) {
            LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId
                                                           << ", messageId: " << messageId << ")");
        } else {
            LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: "
                      << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")");
            chunkedMessageCache_.remove(uuid);
        }
        lock.unlock();
        increaseAvailablePermits(cnx);
        trackMessage(messageId);
        return boost::none;
    }

    chunkedMsgCtx.appendChunk(messageId, payload);
    if (!chunkedMsgCtx.isCompleted()) {
        lock.unlock();
        increaseAvailablePermits(cnx);
        return boost::none;
    }

    messageId = std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds())->build();

    LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
                                                    << ", sequenceId: " << metadata.sequence_id());

    auto wholePayload = chunkedMsgCtx.getBuffer();
    chunkedMessageCache_.remove(uuid);
    if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
        return wholePayload;
    } else {
        return boost::none;
    }
}