in lib/Commands.cc [194:272]
// Builds the frame for a SEND command and returns it as a pair of buffers: the serialized headers at
// index 0 and the message payload at index 1.
//
// Wire format:
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
//
// The [MAGIC_NUMBER][CHECKSUM] section is only written when checksumType is Crc32c. The checksum covers
// everything in the headers after the checksum field (metadata size + metadata) followed by the payload.
//
// originalHeaders is reset and reused for the headers when it has enough writable room; cmd is populated
// as a SEND command, serialized, and has its send field cleared again before returning.
PairSharedBuffer Commands::newSend(SharedBuffer& originalHeaders, BaseCommand& cmd, ChecksumType checksumType,
                                   const SendArguments& args) {
    const auto& msgMetadata = args.metadata;
    const auto& payloadBuffer = args.payload;

    // Populate the SEND command from the arguments.
    cmd.set_type(BaseCommand::SEND);
    CommandSend* sendCmd = cmd.mutable_send();
    sendCmd->set_producer_id(args.producerId);
    sendCmd->set_sequence_id(args.sequenceId);
    if (msgMetadata.has_num_messages_in_batch()) {
        sendCmd->set_num_messages(msgMetadata.num_messages_in_batch());
    }
    if (msgMetadata.has_chunk_id()) {
        sendCmd->set_is_chunk(true);
    }

    // Sizes of the individual frame sections.
    const int commandBytes = cmd.ByteSizeLong();
    const int metadataBytes = msgMetadata.ByteSizeLong();
    const int payloadBytes = payloadBuffer.readableBytes();

    const bool withChecksum = (Crc32c == checksumType);
    int checksumSectionBytes = 0;
    if (withChecksum) {
        checksumSectionBytes = 2 + 4;  // magic number + checksum
    }

    // cmdLength + cmd + (magic + checksum) + msgMetadataLength + msgMetadata
    const int headerBytes = 4 + commandBytes + checksumSectionBytes + 4 + metadataBytes;
    const int frameBytes = headerBytes + payloadBytes;

    // By default the headers live in a static buffer whose capacity is 64KB, which is reused to avoid
    // frequent memory allocation. If users configure many properties the headers may not fit, and
    // writing them would overflow that buffer; in that case a new, larger buffer is allocated instead.
    originalHeaders.reset();
    auto frameHeaders = originalHeaders;
    const int requiredBytes = 4 /* header length */ + headerBytes;
    if (frameHeaders.writableBytes() < requiredBytes) {
        frameHeaders = SharedBuffer::allocate(requiredBytes);
    }

    // External frame: total size of everything that follows.
    frameHeaders.writeUnsignedInt(frameBytes);

    // Command section.
    frameHeaders.writeUnsignedInt(commandBytes);
    cmd.SerializeToArray(frameHeaders.mutableData(), commandBytes);
    frameHeaders.bytesWritten(commandBytes);

    // Magic number followed by a placeholder for the checksum, which is filled in further below.
    int checksumIndex = -1;
    if (withChecksum) {
        frameHeaders.writeUnsignedShort(magicCrc32c);
        checksumIndex = frameHeaders.writerIndex();
        frameHeaders.skipBytes(checksumSize);  // leave room for the checksum
    }

    // Metadata section.
    frameHeaders.writeUnsignedInt(metadataBytes);
    msgMetadata.SerializeToArray(frameHeaders.mutableData(), metadataBytes);
    frameHeaders.bytesWritten(metadataBytes);

    PairSharedBuffer result;
    result.set(0, frameHeaders);
    result.set(1, payloadBuffer);

    // Fill in the checksum placeholder reserved above.
    if (withChecksum) {
        const int endOfHeaders = frameHeaders.writerIndex();
        const int checksummedFrom = checksumIndex + checksumSize;
        const uint32_t headerCrc =
            computeChecksum(0, frameHeaders.data() + checksummedFrom, (endOfHeaders - checksummedFrom));
        const uint32_t fullCrc =
            computeChecksum(headerCrc, payloadBuffer.data(), payloadBuffer.readableBytes());

        // Temporarily move the writer back to the placeholder, write the checksum, then restore it.
        frameHeaders.setWriterIndex(checksumIndex);
        frameHeaders.writeUnsignedInt(fullCrc);
        frameHeaders.setWriterIndex(endOfHeaders);
    }

    cmd.clear_send();
    return result;
}