in src/DotPulsar/Internal/ProducerChannel.cs [62:110]
public async Task Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, TaskCompletionSource<BaseCommand> responseTcs, CancellationToken cancellationToken)
{
var sendPackage = _sendPackagePool.Get();
var resetSchema = false;
var resetCompression = false;
try
{
metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
metadata.ProducerName = _name;
if (metadata.SchemaVersion is null && _schemaVersion is not null)
{
metadata.SchemaVersion = _schemaVersion;
resetSchema = true;
}
sendPackage.Command ??= new CommandSend { ProducerId = _id, NumMessages = 1 };
sendPackage.Command.SequenceId = metadata.SequenceId;
sendPackage.Metadata = metadata;
if (_compressorFactory is null || metadata.Compression != CompressionType.None)
sendPackage.Payload = payload;
else
{
sendPackage.Metadata.Compression = _compressorFactory.CompressionType;
sendPackage.Metadata.UncompressedSize = (uint) payload.Length;
using var compressor = _compressorFactory.Create();
sendPackage.Payload = compressor.Compress(payload);
resetCompression = true;
}
await _connection.Send(sendPackage, responseTcs, cancellationToken).ConfigureAwait(false);
}
finally
{
if (resetSchema)
metadata.SchemaVersion = null;
if (resetCompression)
{
metadata.Compression = CompressionType.None;
metadata.UncompressedSize = 0;
}
_sendPackagePool.Return(sendPackage);
}
}