func serializeMessage()

in pulsar/internal/commands.go [252:327]


// serializeMessage appends one framed send command, its metadata and its
// payload to wb.
//
// The readable bytes of payload are compressed with compressionProvider when
// doCompress is true, then handed to encryptor.Encrypt together with
// msgMetadata. The result is written using the wire format:
//
//	[TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
//
// TOTAL_SIZE counts every byte that follows the TOTAL_SIZE field itself.
// CHECKSUM is Crc32cCheckSum over the bytes from METADATA_SIZE through the end
// of the payload.
//
// An error is returned, before anything has been written to wb, when:
//   - encryptor.Encrypt fails; the underlying error is wrapped and can be
//     inspected with errors.Is / errors.As;
//   - the serialized metadata plus the encrypted payload is larger than
//     maxMessageSize and the metadata reports a total chunk message size of 0;
//     the error wraps ErrExceedMaxMessageSize.
//
// The function panics if MarshalToSizedBuffer fails for cmdSend or
// msgMetadata, because at that point the frame is already partially written.
func serializeMessage(wb Buffer,
	cmdSend *pb.BaseCommand,
	msgMetadata *pb.MessageMetadata,
	payload Buffer,
	compressionProvider compression.Provider,
	encryptor crypto.Encryptor,
	maxMessageSize uint32,
	doCompress bool) error {
	// Compress the payload only when requested; otherwise use it as-is.
	compressedPayload := payload.ReadableSlice()
	if doCompress {
		compressedPayload = compressionProvider.Compress(nil, compressedPayload)
	}

	// Encrypt the (possibly compressed) payload.
	encryptedPayload, err := encryptor.Encrypt(compressedPayload, msgMetadata)
	if err != nil {
		// error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail
		return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%w", err)
	}

	// NOTE(review): sizes are taken after Encrypt because Encrypt receives
	// msgMetadata and presumably may modify it — keep this ordering.
	cmdSize := uint32(proto.Size(cmdSend))
	msgMetadataSize := uint32(proto.Size(msgMetadata))
	msgSize := len(encryptedPayload) + int(msgMetadataSize)

	// The maxMessageSize check for batched messages happens here. Messages
	// whose metadata carries a non-zero total chunk size are exempt.
	notChunked := msgMetadata.GetTotalChunkMsgSize() == 0
	if notChunked && msgSize > int(maxMessageSize) {
		return fmt.Errorf("%w, size: %d, MaxMessageSize: %d",
			ErrExceedMaxMessageSize, msgSize, maxMessageSize)
	}

	frameSizeIdx := wb.WriterIndex()
	wb.WriteUint32(0) // placeholder for the frame size until we know it
	frameStartIdx := wb.WriterIndex()

	// Write cmd: size prefix followed by the marshalled command.
	wb.WriteUint32(cmdSize)
	wb.ResizeIfNeeded(cmdSize)
	if err = MarshalToSizedBuffer(cmdSend, wb.WritableSlice()[:cmdSize]); err != nil {
		panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err))
	}
	wb.WrittenBytes(cmdSize)

	// Magic number followed by a 4-byte checksum placeholder.
	wb.WriteUint16(magicCrc32c)
	checksumIdx := wb.WriterIndex()
	wb.WriteUint32(0)

	// Write metadata: size prefix followed by the marshalled metadata.
	metadataStartIdx := wb.WriterIndex()
	wb.WriteUint32(msgMetadataSize)
	wb.ResizeIfNeeded(msgMetadataSize)
	if err = MarshalToSizedBuffer(msgMetadata, wb.WritableSlice()[:msgMetadataSize]); err != nil {
		panic(fmt.Sprintf("Protobuf error when serializing msgMetadata: %v", err))
	}
	wb.WrittenBytes(msgMetadataSize)

	// Append the payload.
	wb.Write(encryptedPayload)

	// The checksum covers everything from the metadata size prefix to the end
	// of the frame.
	frameEndIdx := wb.WriterIndex()
	checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx))

	// Back-fill the frame size and checksum placeholders in the header.
	wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
	wb.PutUint32(checksum, checksumIdx)
	return nil
}