uint64_t Commands::serializeSingleMessageInBatchWithPayload()

in lib/Commands.cc [837:891]


uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad,
                                                            unsigned long maxMessageSizeInBytes) {
    const auto& msgMetadata = msg.impl_->metadata;
    SingleMessageMetadata metadata;
    if (msgMetadata.has_partition_key()) {
        metadata.set_partition_key(msgMetadata.partition_key());
    }
    if (msgMetadata.has_ordering_key()) {
        metadata.set_ordering_key(msgMetadata.ordering_key());
    }

    metadata.mutable_properties()->Reserve(msgMetadata.properties_size());
    for (int i = 0; i < msgMetadata.properties_size(); i++) {
        auto keyValue = proto::KeyValue().New();
        *keyValue = msgMetadata.properties(i);
        metadata.mutable_properties()->AddAllocated(keyValue);
    }

    if (msgMetadata.has_event_time()) {
        metadata.set_event_time(msgMetadata.event_time());
    }

    if (msgMetadata.has_sequence_id()) {
        metadata.set_sequence_id(msgMetadata.sequence_id());
    }

    // Format of batch message
    // Each Message = [METADATA_SIZE][METADATA] [PAYLOAD]

    int payloadSize = msg.impl_->payload.readableBytes();
    metadata.set_payload_size(payloadSize);

    int msgMetadataSize = metadata.ByteSize();

    unsigned long requiredSpace = sizeof(uint32_t) + msgMetadataSize + payloadSize;
    if (batchPayLoad.writableBytes() <= sizeof(uint32_t) + msgMetadataSize + payloadSize) {
        LOG_DEBUG("remaining size of batchPayLoad buffer ["
                  << batchPayLoad.writableBytes() << "] can't accomodate new payload [" << requiredSpace
                  << "] - expanding the batchPayload buffer");
        uint32_t new_size =
            std::min(batchPayLoad.readableBytes() * 2, static_cast<uint32_t>(maxMessageSizeInBytes));
        new_size = std::max(new_size, batchPayLoad.readableBytes() + static_cast<uint32_t>(requiredSpace));
        SharedBuffer buffer = SharedBuffer::allocate(new_size);
        // Adding batch created so far
        buffer.write(batchPayLoad.data(), batchPayLoad.readableBytes());
        batchPayLoad = buffer;
    }
    // Adding the new message
    batchPayLoad.writeUnsignedInt(msgMetadataSize);
    metadata.SerializeToArray(batchPayLoad.mutableData(), msgMetadataSize);
    batchPayLoad.bytesWritten(msgMetadataSize);
    batchPayLoad.write(msg.impl_->payload.data(), payloadSize);

    return msgMetadata.sequence_id();
}