public async ValueTask Enqueue()

in src/DotPulsar/Internal/Producer.cs [226:315]


    public async ValueTask Enqueue(
        MessageMetadata metadata,
        TMessage message,
        Func<MessageId, ValueTask>? onMessageSent = default,
        CancellationToken cancellationToken = default)
        => await InternalSend(metadata, message, false, null, onMessageSent, cancellationToken: cancellationToken).ConfigureAwait(false);

    private async ValueTask InternalSend(
        MessageMetadata metadata,
        TMessage message,
        bool sendOpCancelable,
        TaskCompletionSource<MessageId>? tcs = default,
        Func<MessageId, ValueTask>? onMessageSent = default,
        Action<Exception>? onFailed = default,
        CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();

        var autoAssignSequenceId = metadata.SequenceId == 0;
        if (autoAssignSequenceId)
            metadata.SequenceId = _sequenceId.FetchNext();

        var activity = DotPulsarActivitySource.StartProducerActivity(metadata, _operationName, _activityTags);
        if (activity is not null && _attachTraceInfoToMessages)
        {
            metadata[Constants.TraceParent] = activity.Id;
            if (activity.TraceStateString is not null)
                metadata[Constants.TraceState] = activity.TraceStateString;
        }

        var startTimestamp = DotPulsarMeter.MessageSentEnabled ? Stopwatch.GetTimestamp() : 0;

        try
        {
            var partition = await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false);
            var subProducer = _producers[partition];
            var data = _options.Schema.Encode(message);

            tcs ??= new TaskCompletionSource<MessageId>();
            var sendOp = new SendOp(metadata.Metadata, data, tcs, sendOpCancelable ? cancellationToken : CancellationToken.None);
            await subProducer.Send(sendOp, cancellationToken).ConfigureAwait(false);

            _ = tcs.Task.ContinueWith(async task =>
            {
                if (startTimestamp != 0)
                    DotPulsarMeter.MessageSent(startTimestamp, _meterTags);

                if (task.IsFaulted || task.IsCanceled)
                {
                    Exception exception = task.IsCanceled ? new OperationCanceledException() : task.Exception!;
                    FailActivity(exception, activity);

                    if (autoAssignSequenceId)
                        metadata.SequenceId = 0;

                    try
                    {
                        onFailed?.Invoke(exception);
                    }
                    catch
                    {
                        // Ignore
                    }

                    return;
                }

                CompleteActivity(task.Result, data.Length, activity);

                try
                {
                    if (onMessageSent is not null)
                        await onMessageSent.Invoke(task.Result).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    // ignored
                }
            }, CancellationToken.None).ConfigureAwait(false);
        }
        catch (Exception exception)
        {
            FailActivity(exception, activity);

            if (autoAssignSequenceId)
                metadata.SequenceId = 0;

            throw;
        }
    }