PairSharedBuffer Commands::newSend()

in lib/Commands.cc [194:272]


/**
 * Build the frame for a SEND command.
 *
 * Wire format:
 *   [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
 *
 * The [MAGIC_NUMBER][CHECKSUM] section is emitted only when `checksumType` is Crc32c. The checksum
 * covers every header byte written after the checksum field (metadata size + metadata) followed by
 * the readable bytes of the payload.
 *
 * @param originalHeaders buffer that is reset and reused for the headers when it has enough writable
 *        bytes; otherwise a new buffer of the required size is allocated instead
 * @param cmd command object that is filled in as a SEND command and serialized into the headers; its
 *        `send` sub-message is cleared again before returning
 * @param checksumType selects whether the magic number and checksum are included
 * @param args supplies the producer id, sequence id, message metadata and payload
 * @return a composite whose slot 0 is the headers buffer and whose slot 1 is the payload
 */
PairSharedBuffer Commands::newSend(SharedBuffer& originalHeaders, BaseCommand& cmd, ChecksumType checksumType,
                                   const SendArguments& args) {
    const auto& metadata = args.metadata;
    const auto& payload = args.payload;

    // Fill in the SEND command from the arguments.
    cmd.set_type(BaseCommand::SEND);
    CommandSend* sendCmd = cmd.mutable_send();
    sendCmd->set_producer_id(args.producerId);
    sendCmd->set_sequence_id(args.sequenceId);
    if (metadata.has_num_messages_in_batch()) {
        sendCmd->set_num_messages(metadata.num_messages_in_batch());
    }
    if (metadata.has_chunk_id()) {
        sendCmd->set_is_chunk(true);
    }

    // Sizes of the individual sections of the frame.
    const int commandBytes = cmd.ByteSizeLong();
    const int metadataBytes = metadata.ByteSizeLong();
    const int payloadBytes = payload.readableBytes();

    // 2 bytes of magic number + 4 bytes of checksum, present only for Crc32c.
    const int checksumSectionBytes = (Crc32c == (checksumType)) ? (2 + 4) : 0;
    const bool withChecksum = checksumSectionBytes > 0;

    // Everything that follows the leading total-size field, excluding the payload:
    // 4 (cmd length field) + cmd + magic/checksum section + 4 (metadata length field) + metadata.
    const int headerBodyBytes = 4 + commandBytes + checksumSectionBytes + 4 + metadataBytes;
    const int frameBytes = headerBodyBytes + payloadBytes;
    // Bytes the headers buffer must be able to hold: the total-size field plus the header body.
    const int requiredBytes = 4 + headerBodyBytes;

    // By default the caller-provided buffer is a static one (64KB capacity) that is reused across calls
    // to avoid frequent memory allocation. When users configure many properties the headers may not fit,
    // in which case the only option is to allocate a new, larger buffer.
    originalHeaders.reset();
    SharedBuffer frame = originalHeaders;
    if (frame.writableBytes() < requiredBytes) {
        frame = SharedBuffer::allocate(requiredBytes);
    }

    // External frame: total size of everything after this field.
    frame.writeUnsignedInt(frameBytes);

    // Length-prefixed command.
    frame.writeUnsignedInt(commandBytes);
    cmd.SerializeToArray(frame.mutableData(), commandBytes);
    frame.bytesWritten(commandBytes);

    // Magic number followed by a gap reserved for the checksum, which is filled in further down once
    // the metadata has been written.
    int checksumSlot = -1;
    if (withChecksum) {
        frame.writeUnsignedShort(magicCrc32c);
        checksumSlot = frame.writerIndex();
        frame.skipBytes(checksumSize);
    }

    // Length-prefixed metadata.
    frame.writeUnsignedInt(metadataBytes);
    metadata.SerializeToArray(frame.mutableData(), metadataBytes);
    frame.bytesWritten(metadataBytes);

    PairSharedBuffer composite;
    composite.set(0, frame);
    composite.set(1, payload);

    if (withChecksum) {
        // Checksum the header bytes located after the checksum field, then continue over the payload.
        const int headersEnd = frame.writerIndex();
        const int checksummedFrom = checksumSlot + checksumSize;
        const uint32_t headerPartChecksum =
            computeChecksum(0, frame.data() + checksummedFrom, (headersEnd - checksummedFrom));
        const uint32_t fullChecksum =
            computeChecksum(headerPartChecksum, payload.data(), payload.readableBytes());

        // Rewind to the reserved gap, store the checksum, then restore the writer index.
        frame.setWriterIndex(checksumSlot);
        frame.writeUnsignedInt(fullChecksum);
        frame.setWriterIndex(headersEnd);
    }

    cmd.clear_send();
    return composite;
}