public async Task Send()

in src/DotPulsar/Internal/ProducerChannel.cs [62:110]


    /// <summary>
    /// Stamps the metadata with the current publish time and this channel's producer name,
    /// applies the channel's schema version and compression when the metadata does not already
    /// specify them, and sends the resulting package over the connection.
    /// </summary>
    /// <remarks>
    /// The schema version, compression type and uncompressed size are reverted on the supplied
    /// <paramref name="metadata"/> before this method completes (successfully or not), but only
    /// if this method was the one that filled them in. The publish time and producer name are
    /// not reverted.
    /// </remarks>
    /// <param name="metadata">The message metadata; mutated in place while sending.</param>
    /// <param name="payload">
    /// The message body. It is compressed before sending when a compressor factory is configured
    /// and the metadata's compression type is <see cref="CompressionType.None"/>; otherwise it is
    /// sent as given.
    /// </param>
    /// <param name="responseTcs">Completion source passed through to the connection's send call.</param>
    /// <param name="cancellationToken">Cancellation token passed through to the connection's send call.</param>
    /// <returns>A task that completes when the connection's send call has completed.</returns>
    public async Task Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, TaskCompletionSource<BaseCommand> responseTcs, CancellationToken cancellationToken)
    {
        var sendPackage = _sendPackagePool.Get();
        var resetSchema = false;
        var resetCompression = false;

        try
        {
            metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
            metadata.ProducerName = _name;

            if (metadata.SchemaVersion is null && _schemaVersion is not null)
            {
                // Arm the rollback before mutating so the finally block always undoes it.
                resetSchema = true;
                metadata.SchemaVersion = _schemaVersion;
            }

            // The pooled package keeps its command between uses; only create it the first time.
            sendPackage.Command ??= new CommandSend { ProducerId = _id, NumMessages = 1 };

            sendPackage.Command.SequenceId = metadata.SequenceId;
            sendPackage.Metadata = metadata;

            if (_compressorFactory is null || metadata.Compression != CompressionType.None)
                sendPackage.Payload = payload;
            else
            {
                // Arm the rollback BEFORE touching the metadata: if creating the compressor or
                // compressing throws, the finally block must still restore the compression
                // fields. Otherwise the caller's metadata would stay marked as compressed and a
                // retry would send the raw payload labelled as compressed.
                resetCompression = true;
                sendPackage.Metadata.Compression = _compressorFactory.CompressionType;
                sendPackage.Metadata.UncompressedSize = (uint) payload.Length;

                // NOTE(review): the compressor is disposed at the end of this else block, i.e.
                // before the send below; this assumes the sequence returned by Compress does not
                // depend on the compressor staying alive - confirm.
                using var compressor = _compressorFactory.Create();
                sendPackage.Payload = compressor.Compress(payload);
            }

            await _connection.Send(sendPackage, responseTcs, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            // Undo only the fields this method filled in, so the caller's metadata comes back
            // with the schema version and compression settings it was supplied with.
            if (resetSchema)
                metadata.SchemaVersion = null;

            if (resetCompression)
            {
                metadata.Compression = CompressionType.None;
                metadata.UncompressedSize = 0;
            }

            _sendPackagePool.Return(sendPackage);
        }
    }