in pulsar/internal/commands.go [252:327]
// serializeMessage appends a single framed send command to wb.
//
// The payload is compressed with compressionProvider when doCompress is true,
// then handed to encryptor together with msgMetadata. The frame written to wb
// has the layout:
//
//	[TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
//
// TOTAL_SIZE counts every byte that follows the TOTAL_SIZE field itself, and
// CHECKSUM is computed over the span from METADATA_SIZE through the end of
// PAYLOAD.
//
// An error is returned, before anything is written to wb, when encryption
// fails or when the metadata plus encrypted payload is larger than
// maxMessageSize (the latter wraps ErrExceedMaxMessageSize and is skipped when
// msgMetadata reports a non-zero total chunk message size). A failure to
// marshal cmdSend or msgMetadata panics.
func serializeMessage(wb Buffer,
	cmdSend *pb.BaseCommand,
	msgMetadata *pb.MessageMetadata,
	payload Buffer,
	compressionProvider compression.Provider,
	encryptor crypto.Encryptor,
	maxMessageSize uint32,
	doCompress bool) error {
	// Start from the raw payload and swap in the compressed form on request.
	body := payload.ReadableSlice()
	if doCompress {
		body = compressionProvider.Compress(nil, body)
	}

	// Encrypt after compression. The sizes below are taken only after this
	// call because the encryptor is also given msgMetadata.
	sealed, err := encryptor.Encrypt(body, msgMetadata)
	if err != nil {
		// error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail
		return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error: %w", err)
	}

	cmdLen := uint32(proto.Size(cmdSend))
	metaLen := uint32(proto.Size(msgMetadata))
	total := len(sealed) + int(metaLen)

	// The maxMessageSize check for batched messages happens here; it is
	// bypassed when the metadata carries a non-zero total chunk message size.
	if msgMetadata.GetTotalChunkMsgSize() == 0 && total > int(maxMessageSize) {
		return fmt.Errorf("%w, size: %d, MaxMessageSize: %d",
			ErrExceedMaxMessageSize, total, maxMessageSize)
	}

	// Reserve the frame-size slot; it is back-filled once the size is known.
	sizeSlot := wb.WriterIndex()
	wb.WriteUint32(0)
	frameBegin := wb.WriterIndex()

	// Command: length prefix followed by the marshalled command.
	wb.WriteUint32(cmdLen)
	wb.ResizeIfNeeded(cmdLen)
	if err := MarshalToSizedBuffer(cmdSend, wb.WritableSlice()[:cmdLen]); err != nil {
		panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err))
	}
	wb.WrittenBytes(cmdLen)

	// Magic number, then a 4-byte checksum slot to be back-filled below.
	wb.WriteUint16(magicCrc32c)
	crcSlot := wb.WriterIndex()
	wb.WriteUint32(0)

	// Metadata: length prefix followed by the marshalled metadata.
	metaBegin := wb.WriterIndex()
	wb.WriteUint32(metaLen)
	wb.ResizeIfNeeded(metaLen)
	if err := MarshalToSizedBuffer(msgMetadata, wb.WritableSlice()[:metaLen]); err != nil {
		panic(fmt.Sprintf("Protobuf error when serializing msgMetadata: %v", err))
	}
	wb.WrittenBytes(metaLen)

	// Payload goes last.
	wb.Write(sealed)

	// Checksum everything from the metadata length prefix to the frame end.
	frameEnd := wb.WriterIndex()
	crc := Crc32cCheckSum(wb.Get(metaBegin, frameEnd-metaBegin))

	// Back-fill the two reserved header slots.
	wb.PutUint32(frameEnd-frameBegin, sizeSlot) // External frame
	wb.PutUint32(crc, crcSlot)
	return nil
}