in src/DotPulsar/Internal/Producer.cs [226:315]
/// <summary>
/// Hands a message to the shared send path without tying the send operation to
/// <paramref name="cancellationToken"/> and without a caller-supplied completion source.
/// </summary>
/// <param name="metadata">Metadata of the message to send.</param>
/// <param name="message">The message payload.</param>
/// <param name="onMessageSent">
/// Optional callback that receives the resulting <see cref="MessageId"/> once the send has
/// completed successfully. Exceptions thrown by the callback are swallowed by the send path.
/// </param>
/// <param name="cancellationToken">Token forwarded to the send path.</param>
public async ValueTask Enqueue(
    MessageMetadata metadata,
    TMessage message,
    Func<MessageId, ValueTask>? onMessageSent = default,
    CancellationToken cancellationToken = default)
{
    // sendOpCancelable is false: the SendOp itself is created with CancellationToken.None,
    // the token only applies to the work done while handing the message over.
    await InternalSend(
        metadata,
        message,
        sendOpCancelable: false,
        tcs: null,
        onMessageSent: onMessageSent,
        cancellationToken: cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Shared send path: assigns a sequence id when none is set, starts a producer activity,
/// picks a partition, encodes the message and passes a <c>SendOp</c> to the chosen sub-producer.
/// The final outcome is handled in a continuation on the completion source's task, so this
/// method returns once the sub-producer has accepted the operation.
/// </summary>
/// <param name="metadata">
/// Message metadata. When its <c>SequenceId</c> is 0 a new id is fetched and written to it;
/// that id is reset to 0 again if the send fails.
/// </param>
/// <param name="message">The message to encode with the configured schema.</param>
/// <param name="sendOpCancelable">
/// When <c>true</c> the <c>SendOp</c> is created with <paramref name="cancellationToken"/>,
/// otherwise with <see cref="CancellationToken.None"/>.
/// </param>
/// <param name="tcs">
/// Completion source handed to the <c>SendOp</c>; created here when <c>null</c>. Whoever
/// processes the <c>SendOp</c> is expected to complete it (not visible in this method).
/// </param>
/// <param name="onMessageSent">Optional callback invoked with the message id on success; its exceptions are ignored.</param>
/// <param name="onFailed">
/// Optional callback invoked when the completion task faults or is canceled; its exceptions are ignored.
/// It receives the task's <see cref="AggregateException"/> on fault, or a new
/// <see cref="OperationCanceledException"/> on cancellation.
/// </param>
/// <param name="cancellationToken">Token used for partition selection and for handing the operation to the sub-producer.</param>
/// <remarks>
/// Exceptions thrown while choosing the partition, encoding, or handing over the operation are
/// reported to the activity, the auto-assigned sequence id is reset, and the exception is rethrown.
/// </remarks>
private async ValueTask InternalSend(
    MessageMetadata metadata,
    TMessage message,
    bool sendOpCancelable,
    TaskCompletionSource<MessageId>? tcs = default,
    Func<MessageId, ValueTask>? onMessageSent = default,
    Action<Exception>? onFailed = default,
    CancellationToken cancellationToken = default)
{
    ThrowIfDisposed();

    // A sequence id of 0 means "not set by the caller": assign the next one and remember
    // that we did, so it can be cleared again if the send does not succeed.
    var autoAssignSequenceId = metadata.SequenceId == 0;
    if (autoAssignSequenceId)
        metadata.SequenceId = _sequenceId.FetchNext();

    var activity = DotPulsarActivitySource.StartProducerActivity(metadata, _operationName, _activityTags);
    if (activity is not null && _attachTraceInfoToMessages)
    {
        // Copy the trace context into the message metadata.
        metadata[Constants.TraceParent] = activity.Id;
        if (activity.TraceStateString is not null)
            metadata[Constants.TraceState] = activity.TraceStateString;
    }

    // 0 doubles as "metering disabled" so the continuation can skip the metric call.
    var startTimestamp = DotPulsarMeter.MessageSentEnabled ? Stopwatch.GetTimestamp() : 0;

    try
    {
        var partition = await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false);
        var subProducer = _producers[partition];
        var data = _options.Schema.Encode(message);

        // RunContinuationsAsynchronously keeps awaiters of this task from running inline on
        // the thread that completes it. A caller-supplied tcs is used as-is.
        tcs ??= new TaskCompletionSource<MessageId>(TaskCreationOptions.RunContinuationsAsynchronously);

        var sendOp = new SendOp(metadata.Metadata, data, tcs, sendOpCancelable ? cancellationToken : CancellationToken.None);
        await subProducer.Send(sendOp, cancellationToken).ConfigureAwait(false);

        // Fire-and-forget: the outcome is reported through the activity and the callbacks.
        // The scheduler is passed explicitly so the continuation never binds to an ambient
        // TaskScheduler.Current supplied by the caller.
        _ = tcs.Task.ContinueWith(async task =>
        {
            if (startTimestamp != 0)
                DotPulsarMeter.MessageSent(startTimestamp, _meterTags);

            if (task.IsFaulted || task.IsCanceled)
            {
                Exception exception = task.IsCanceled ? new OperationCanceledException() : task.Exception!;
                FailActivity(exception, activity);

                if (autoAssignSequenceId)
                    metadata.SequenceId = 0;

                try
                {
                    onFailed?.Invoke(exception);
                }
                catch
                {
                    // Deliberately ignored: a throwing failure callback must not surface
                    // as an unobserved exception from this detached continuation.
                }

                return;
            }

            CompleteActivity(task.Result, data.Length, activity);

            try
            {
                if (onMessageSent is not null)
                    await onMessageSent.Invoke(task.Result).ConfigureAwait(false);
            }
            catch (Exception)
            {
                // Deliberately ignored: the message is already sent; a throwing success
                // callback must not fault this detached continuation.
            }
        }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
    }
    catch (Exception exception)
    {
        FailActivity(exception, activity);

        if (autoAssignSequenceId)
            metadata.SequenceId = 0;

        throw;
    }
}