src/DotPulsar/Internal/MessageProcessor.cs (174 lines of code) (raw):

/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace DotPulsar.Internal; using DotPulsar.Abstractions; using DotPulsar.Internal.Extensions; using Microsoft.Extensions.ObjectPool; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; public sealed class MessageProcessor<TMessage> : IDisposable { private readonly string _operationName; private readonly KeyValuePair<string, object?>[] _activityTags; private readonly KeyValuePair<string, object?>[] _meterTags; private readonly IConsumer<TMessage> _consumer; private readonly Func<IMessage<TMessage>, CancellationToken, ValueTask> _processor; private readonly LinkedList<Task> _processorTasks; private readonly ConcurrentQueue<ProcessInfo> _processingQueue; private readonly SemaphoreSlim _receiveLock; private readonly SemaphoreSlim _acknowledgeLock; private readonly ObjectPool<ProcessInfo> _processInfoPool; private readonly bool _linkTraces; private readonly bool _ensureOrderedAcknowledgment; private readonly int _maxDegreeOfParallelism; private readonly int _maxMessagesPerTask; private readonly TaskScheduler _taskScheduler; public MessageProcessor(IConsumer<TMessage> consumer, Func<IMessage<TMessage>, CancellationToken, ValueTask> processor, ProcessingOptions options) { const string operation = "process"; _operationName = $"{consumer.Topic} {operation}"; _activityTags = new KeyValuePair<string, object?>[] { new KeyValuePair<string, object?>("messaging.destination", consumer.Topic), new KeyValuePair<string, object?>("messaging.destination_kind", "topic"), new KeyValuePair<string, object?>("messaging.operation", operation), new KeyValuePair<string, object?>("messaging.system", "pulsar"), new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl), new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName) }; _meterTags = new KeyValuePair<string, object?>[] { new KeyValuePair<string, object?>("topic", consumer.Topic), new KeyValuePair<string, object?>("subscription", consumer.SubscriptionName) }; _consumer = consumer; _processor = processor; _processorTasks = new LinkedList<Task>(); _processingQueue = new ConcurrentQueue<ProcessInfo>(); _receiveLock = new SemaphoreSlim(1, 1); _acknowledgeLock = new SemaphoreSlim(1, 1); _processInfoPool = new DefaultObjectPool<ProcessInfo>(new DefaultPooledObjectPolicy<ProcessInfo>()); _linkTraces = options.LinkTraces; _ensureOrderedAcknowledgment = options.EnsureOrderedAcknowledgment; _maxDegreeOfParallelism = options.MaxDegreeOfParallelism; _maxMessagesPerTask = options.MaxMessagesPerTask; _taskScheduler = options.TaskScheduler; } public void Dispose() { _receiveLock.Dispose(); _acknowledgeLock.Dispose(); } public async ValueTask Process(CancellationToken cancellationToken) { for (var i = 1; i < _maxDegreeOfParallelism; ++i) { StartNewProcessorTask(cancellationToken); } while (true) { StartNewProcessorTask(cancellationToken); var completedTask = await Task.WhenAny(_processorTasks).ConfigureAwait(false); if (completedTask.IsFaulted) ExceptionDispatchInfo.Capture(completedTask.Exception!.InnerException!).Throw(); _processorTasks.Remove(completedTask); cancellationToken.ThrowIfCancellationRequested(); } } private async ValueTask Processor(CancellationToken cancellationToken) { var messagesProcessed = 0; var processInfo = new ProcessInfo(); var needToEnsureOrderedAcknowledgement = _ensureOrderedAcknowledgment && _maxDegreeOfParallelism > 1; var isUnbounded = _maxMessagesPerTask == ProcessingOptions.Unbounded; while (!cancellationToken.IsCancellationRequested) { if (needToEnsureOrderedAcknowledgement) { processInfo = _processInfoPool.Get(); await _receiveLock.WaitAsync(cancellationToken).ConfigureAwait(false); } var message = await _consumer.Receive(cancellationToken).ConfigureAwait(false); if (needToEnsureOrderedAcknowledgement) { processInfo.MessageId = message.MessageId; processInfo.IsProcessed = false; _processingQueue.Enqueue(processInfo); _receiveLock.Release(); } var activity = DotPulsarActivitySource.StartConsumerActivity(message, _operationName, _activityTags, _linkTraces); if (activity is not null && activity.IsAllDataRequested) { activity.SetMessageId(message.MessageId); activity.SetPayloadSize(message.Data.Length); activity.SetStatus(ActivityStatusCode.Ok); } var startTimestamp = DotPulsarMeter.MessageProcessedEnabled ? Stopwatch.GetTimestamp() : 0; try { await _processor(message, cancellationToken).ConfigureAwait(false); } catch (Exception exception) { if (activity is not null && activity.IsAllDataRequested) activity.AddException(exception); } if (startTimestamp != 0) DotPulsarMeter.MessageProcessed(startTimestamp, _meterTags); activity?.Dispose(); if (needToEnsureOrderedAcknowledgement) { await _acknowledgeLock.WaitAsync(cancellationToken).ConfigureAwait(false); processInfo.IsProcessed = true; await AcknowledgeProcessedMessages(cancellationToken).ConfigureAwait(false); _acknowledgeLock.Release(); } else await _consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false); if (!isUnbounded && ++messagesProcessed == _maxMessagesPerTask) return; } } private async ValueTask AcknowledgeProcessedMessages(CancellationToken cancellationToken) { var messagesToAcknowledge = 0; var messageId = MessageId.Earliest; while (_processingQueue.TryPeek(out var processInfo)) { if (!processInfo.IsProcessed) break; ++messagesToAcknowledge; if (_processingQueue.TryDequeue(out processInfo)) { messageId = processInfo.MessageId; _processInfoPool.Return(processInfo); } } if (messagesToAcknowledge == 1) await _consumer.Acknowledge(messageId, cancellationToken).ConfigureAwait(false); else if (messagesToAcknowledge > 1) await _consumer.AcknowledgeCumulative(messageId, cancellationToken).ConfigureAwait(false); } private void StartNewProcessorTask(CancellationToken cancellationToken) { var processorTask = Task.Factory.StartNew( async () => await Processor(cancellationToken).ConfigureAwait(false), cancellationToken, TaskCreationOptions.DenyChildAttach, _taskScheduler).Unwrap(); _processorTasks.AddLast(processorTask); } private sealed class ProcessInfo { public ProcessInfo() { MessageId = MessageId.Earliest; IsProcessed = false; } public MessageId MessageId { get; set; } public bool IsProcessed { get; set; } } }