void ClientConnection::processIncomingBuffer()

in lib/ClientConnection.cc [633:753]


void ClientConnection::processIncomingBuffer() {
    // Process all the available frames from the incoming buffer
    while (incomingBuffer_.readableBytes() >= sizeof(uint32_t)) {
        // Extract message frames from incoming buffer
        // At this point we have at least 4 bytes in the buffer
        uint32_t frameSize = incomingBuffer_.readUnsignedInt();

        if (frameSize > incomingBuffer_.readableBytes()) {
            // We don't have the entire frame yet
            const uint32_t bytesToReceive = frameSize - incomingBuffer_.readableBytes();

            // Rollback the reading of frameSize (when the frame will be complete,
            // we'll read it again
            incomingBuffer_.rollback(sizeof(uint32_t));

            if (bytesToReceive <= incomingBuffer_.writableBytes()) {
                // The rest of the frame still fits in the current buffer
                asyncReceive(incomingBuffer_.asio_buffer(),
                             customAllocReadHandler(std::bind(&ClientConnection::handleRead,
                                                              shared_from_this(), std::placeholders::_1,
                                                              std::placeholders::_2, bytesToReceive)));
                return;
            } else {
                // Need to allocate a buffer big enough for the frame
                uint32_t newBufferSize = std::max<uint32_t>(DefaultBufferSize, frameSize + sizeof(uint32_t));
                incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, newBufferSize);

                asyncReceive(incomingBuffer_.asio_buffer(),
                             customAllocReadHandler(std::bind(&ClientConnection::handleRead,
                                                              shared_from_this(), std::placeholders::_1,
                                                              std::placeholders::_2, bytesToReceive)));
                return;
            }
        }

        // At this point,  we have at least one complete frame available in the buffer
        uint32_t cmdSize = incomingBuffer_.readUnsignedInt();
        proto::BaseCommand incomingCmd;
        if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
            LOG_ERROR(cnxString_ << "Error parsing protocol buffer command");
            close();
            return;
        }

        incomingBuffer_.consume(cmdSize);

        if (incomingCmd.type() == BaseCommand::MESSAGE) {
            // Parse message metadata and extract payload
            proto::MessageMetadata msgMetadata;
            proto::BrokerEntryMetadata brokerEntryMetadata;

            // read checksum
            uint32_t remainingBytes = frameSize - (cmdSize + 4);

            auto readerIndex = incomingBuffer_.readerIndex();
            if (incomingBuffer_.readUnsignedShort() == Commands::magicBrokerEntryMetadata) {
                // broker entry metadata is present
                uint32_t brokerEntryMetadataSize = incomingBuffer_.readUnsignedInt();
                if (!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), brokerEntryMetadataSize)) {
                    LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id()
                                         << ", message ledger id "
                                         << incomingCmd.message().message_id().ledgerid() << ", entry id "
                                         << incomingCmd.message().message_id().entryid()
                                         << "] Error parsing broker entry metadata");
                    close();
                    return;
                }
                incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + brokerEntryMetadataSize);
                remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
            } else {
                incomingBuffer_.setReaderIndex(readerIndex);
            }

            bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd);

            uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
            if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
                LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id()  //
                                     << ", message ledger id "
                                     << incomingCmd.message().message_id().ledgerid()  //
                                     << ", entry id " << incomingCmd.message().message_id().entryid()
                                     << "] Error parsing message metadata");
                close();
                return;
            }

            incomingBuffer_.consume(metadataSize);
            remainingBytes -= (4 + metadataSize);

            uint32_t payloadSize = remainingBytes;
            SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize);
            incomingBuffer_.consume(payloadSize);
            handleIncomingMessage(incomingCmd.message(), isChecksumValid, brokerEntryMetadata, msgMetadata,
                                  payload);
        } else {
            handleIncomingCommand(incomingCmd);
        }
    }
    if (incomingBuffer_.readableBytes() > 0) {
        // We still have 1 to 3 bytes from the next frame
        assert(incomingBuffer_.readableBytes() < sizeof(uint32_t));

        // Restart with a new buffer and copy the few bytes at the beginning
        incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, DefaultBufferSize);

        // At least we need to read 4 bytes to have the complete frame size
        uint32_t minReadSize = sizeof(uint32_t) - incomingBuffer_.readableBytes();

        asyncReceive(
            incomingBuffer_.asio_buffer(),
            customAllocReadHandler(std::bind(&ClientConnection::handleRead, shared_from_this(),
                                             std::placeholders::_1, std::placeholders::_2, minReadSize)));
        return;
    }

    // We have read everything we had in the buffer
    // Rollback the indexes to reuse the same buffer
    incomingBuffer_.reset();

    readNextCommand();
}