in lib/Commands.cc [837:891]
// Appends one message to a batch payload buffer.
//
// A SingleMessageMetadata is built from the message's own metadata (partition key,
// ordering key, properties, event time and sequence id are copied when present, and
// the payload size is always set). The entry is then appended to `batchPayLoad` as:
//
//     [METADATA_SIZE][METADATA][PAYLOAD]
//
// If `batchPayLoad` does not have more writable bytes than the entry requires, it is
// replaced by a larger buffer that holds the bytes written so far.
//
// @param msg                    message whose metadata and payload are serialized
// @param batchPayLoad           batch buffer to append to; may be reassigned to a new,
//                               larger buffer
// @param maxMessageSizeInBytes  upper bound used when doubling the buffer; it is a soft
//                               cap, because the new buffer is always made large enough
//                               to hold the existing content plus the new entry
// @return the value of `sequence_id()` on the message's metadata
uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad,
                                                            unsigned long maxMessageSizeInBytes) {
    const auto& msgMetadata = msg.impl_->metadata;

    SingleMessageMetadata metadata;
    if (msgMetadata.has_partition_key()) {
        metadata.set_partition_key(msgMetadata.partition_key());
    }
    if (msgMetadata.has_ordering_key()) {
        metadata.set_ordering_key(msgMetadata.ordering_key());
    }

    // Copy the properties element by element. add_properties() lets the repeated field
    // construct and own each element, so no raw owning pointer is held in between.
    metadata.mutable_properties()->Reserve(msgMetadata.properties_size());
    for (int i = 0; i < msgMetadata.properties_size(); i++) {
        *metadata.add_properties() = msgMetadata.properties(i);
    }

    if (msgMetadata.has_event_time()) {
        metadata.set_event_time(msgMetadata.event_time());
    }
    if (msgMetadata.has_sequence_id()) {
        metadata.set_sequence_id(msgMetadata.sequence_id());
    }

    // Format of batch message
    // Each Message = [METADATA_SIZE][METADATA] [PAYLOAD]
    int payloadSize = msg.impl_->payload.readableBytes();
    metadata.set_payload_size(payloadSize);

    // The payload size must be set before measuring, since it is part of the metadata.
    // NOTE(review): newer protobuf releases deprecate ByteSize() in favour of
    // ByteSizeLong() - confirm the minimum supported protobuf version before switching.
    int msgMetadataSize = metadata.ByteSize();

    // sizeof(uint32_t) accounts for the metadata-size prefix written below.
    unsigned long requiredSpace = sizeof(uint32_t) + msgMetadataSize + payloadSize;
    if (batchPayLoad.writableBytes() <= requiredSpace) {
        LOG_DEBUG("remaining size of batchPayLoad buffer ["
                  << batchPayLoad.writableBytes() << "] can't accomodate new payload [" << requiredSpace
                  << "] - expanding the batchPayload buffer");
        // Try doubling the used size (bounded by maxMessageSizeInBytes), but never
        // allocate less than what is needed for the existing content plus this entry.
        uint32_t new_size =
            std::min(batchPayLoad.readableBytes() * 2, static_cast<uint32_t>(maxMessageSizeInBytes));
        new_size = std::max(new_size, batchPayLoad.readableBytes() + static_cast<uint32_t>(requiredSpace));
        SharedBuffer buffer = SharedBuffer::allocate(new_size);
        // Adding batch created so far
        buffer.write(batchPayLoad.data(), batchPayLoad.readableBytes());
        batchPayLoad = buffer;
    }

    // Adding the new message
    batchPayLoad.writeUnsignedInt(msgMetadataSize);
    // SerializeToArray writes into the buffer's storage directly, so the write position
    // has to be advanced explicitly afterwards.
    metadata.SerializeToArray(batchPayLoad.mutableData(), msgMetadataSize);
    batchPayLoad.bytesWritten(msgMetadataSize);
    batchPayLoad.write(msg.impl_->payload.data(), payloadSize);

    return msgMetadata.sequence_id();
}