in lib/Commands.cc [196:266]
PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint64_t producerId,
uint64_t sequenceId, ChecksumType checksumType,
const proto::MessageMetadata& metadata, const SharedBuffer& payload) {
cmd.set_type(BaseCommand::SEND);
CommandSend* send = cmd.mutable_send();
send->set_producer_id(producerId);
send->set_sequence_id(sequenceId);
if (metadata.has_num_messages_in_batch()) {
send->set_num_messages(metadata.num_messages_in_batch());
}
if (metadata.has_chunk_id()) {
send->set_is_chunk(true);
}
// / Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
int cmdSize = cmd.ByteSize();
int msgMetadataSize = metadata.ByteSize();
int payloadSize = payload.readableBytes();
int magicAndChecksumLength = (Crc32c == (checksumType)) ? (2 + 4 /* magic + checksumLength*/) : 0;
bool includeChecksum = magicAndChecksumLength > 0;
int headerContentSize =
4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize; // cmdLength + cmdSize + magicLength +
// checksumSize + msgMetadataLength + msgMetadataSize
int totalSize = headerContentSize + payloadSize;
int checksumReaderIndex = -1;
headers.reset();
assert(headers.writableBytes() >= (4 + headerContentSize)); // totalSize + headerLength
headers.writeUnsignedInt(totalSize); // External frame
// Write cmd
headers.writeUnsignedInt(cmdSize);
cmd.SerializeToArray(headers.mutableData(), cmdSize);
headers.bytesWritten(cmdSize);
// Create checksum placeholder
if (includeChecksum) {
headers.writeUnsignedShort(magicCrc32c);
checksumReaderIndex = headers.writerIndex();
headers.skipBytes(checksumSize); // skip 4 bytes of checksum
}
// Write metadata
headers.writeUnsignedInt(msgMetadataSize);
metadata.SerializeToArray(headers.mutableData(), msgMetadataSize);
headers.bytesWritten(msgMetadataSize);
PairSharedBuffer composite;
composite.set(0, headers);
composite.set(1, payload);
// Write checksum at created checksum-placeholder
if (includeChecksum) {
int writeIndex = headers.writerIndex();
int metadataStartIndex = checksumReaderIndex + checksumSize;
uint32_t metadataChecksum =
computeChecksum(0, headers.data() + metadataStartIndex, (writeIndex - metadataStartIndex));
uint32_t computedChecksum =
computeChecksum(metadataChecksum, payload.data(), payload.readableBytes());
// set computed checksum
headers.setWriterIndex(checksumReaderIndex);
headers.writeUnsignedInt(computedChecksum);
headers.setWriterIndex(writeIndex);
}
cmd.clear_send();
return composite;
}