boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk()

in lib/ConsumerImpl.cc [422:496]


/**
 * Feeds one chunk of a chunked message into the reassembly cache, which is keyed by the uuid carried
 * in the chunk's metadata.
 *
 * The cache is accessed while holding chunkProcessMutex_. The lock is released explicitly before
 * increaseAvailablePermits()/trackMessage() on the early-return paths; on the completion path it is
 * held until the function returns.
 *
 * @param payload the bytes of this chunk
 * @param metadata metadata of this chunk; supplies the chunk id and uuid and, when a new context is
 * created for chunk 0, the number of chunks and the total chunked message size
 * @param messageIdData forwarded to uncompressMessageIfNeeded() once all chunks have been received
 * @param cnx connection forwarded to increaseAvailablePermits() and uncompressMessageIfNeeded()
 * @param messageId in: the id of this chunk; out: overwritten with the id built from the first and
 * last chunk ids when this chunk completes the message
 * @return the reassembled payload when this chunk completes the message and
 * uncompressMessageIfNeeded() returns true; boost::none in every other case (chunk rejected, message
 * still incomplete, or uncompressMessageIfNeeded() returned false)
 */
boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
                                                                const proto::MessageMetadata& metadata,
                                                                const proto::MessageIdData& messageIdData,
                                                                const ClientConnectionPtr& cnx,
                                                                MessageId& messageId) {
    const auto chunkId = metadata.chunk_id();
    // Bind by reference: `metadata` outlives this call, so there is no need to copy the uuid string.
    const auto& uuid = metadata.uuid();
    LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
                                                 << ", messageId: " << messageId << ") of "
                                                 << payload.readableBytes() << " bytes");

    Lock lock(chunkProcessMutex_);

    // Lazy task scheduling to expire incomplete chunk message
    bool expected = false;
    if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
        expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) {
        triggerCheckExpiredChunkedTimer();
    }

    auto it = chunkedMessageCache_.find(uuid);

    if (chunkId == 0 && it == chunkedMessageCache_.end()) {
        // First chunk of a message that is not cached yet: make room if the cache is at its limit,
        // then create the context for this uuid.
        if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) {
            // Evict enough of the oldest entries that one free slot remains for the new context.
            chunkedMessageCache_.removeOldestValues(
                chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1,
                [this](const std::string& evictedUuid, const ChunkedMessageCtx& ctx) {
                    for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
                        discardChunkMessages(evictedUuid, msgId, autoAckOldestChunkedMessageOnQueueFull_);
                    }
                });
        }
        it = chunkedMessageCache_.putIfAbsent(
            uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
    }

    // The end iterator must be tested BEFORE `it->second` is touched: dereferencing end() is
    // undefined behavior, even when the resulting reference is never read.
    if (it == chunkedMessageCache_.end()) {
        LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId
                                                       << ", messageId: " << messageId << ")");
        lock.unlock();
        increaseAvailablePermits(cnx);
        trackMessage(messageId);
        return boost::none;
    }

    auto& chunkedMsgCtx = it->second;
    if (!chunkedMsgCtx.validateChunkId(chunkId)) {
        LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: "
                  << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")");
        // Drop the whole context; `chunkedMsgCtx` must not be used after this point.
        chunkedMessageCache_.remove(uuid);
        lock.unlock();
        increaseAvailablePermits(cnx);
        trackMessage(messageId);
        return boost::none;
    }

    chunkedMsgCtx.appendChunk(messageId, payload);
    if (!chunkedMsgCtx.isCompleted()) {
        // More chunks are still expected for this uuid.
        lock.unlock();
        increaseAvailablePermits(cnx);
        return boost::none;
    }

    // All chunks received: report the message under an id built from the first and last chunk ids.
    ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
    chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
    chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
    messageId = chunkMsgId->build();

    LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
                                                    << ", sequenceId: " << metadata.sequence_id());

    // Take the buffer before removing the cache entry; `chunkedMsgCtx` is not used after the removal.
    auto wholePayload = chunkedMsgCtx.getBuffer();
    chunkedMessageCache_.remove(uuid);
    if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
        return wholePayload;
    } else {
        return boost::none;
    }
}