src/DotPulsar/Internal/Consumer.cs (392 lines of code) (raw):

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace DotPulsar.Internal;

using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Compression;
using DotPulsar.Internal.PulsarApi;

/// <summary>
/// Consumer that creates one <see cref="SubConsumer{TMessage}"/> per resolved topic (or topic partition),
/// forwards operations to them and exposes a single state aggregated from the sub-consumer states.
/// </summary>
/// <typeparam name="TMessage">The type of the message values produced by the configured schema.</typeparam>
public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
    private readonly IConnectionPool _connectionPool;
    private readonly ProcessManager _processManager;
    private readonly StateManager<ConsumerState> _state;
    private readonly ConsumerOptions<TMessage> _consumerOptions;
    private readonly CancellationTokenSource _cts;
    private readonly IHandleException _exceptionHandler;
    private readonly Executor _executor;

    // Held by Setup/Monitor until the sub-consumers exist (or setup faulted); Guard waits on it.
    private readonly SemaphoreSlim _semaphoreSlim;

    // Serializes the multi-sub-consumer path of Receive.
    private readonly AsyncLock _lock;

    // Keyed by the topic name each sub-consumer was created for.
    private readonly Dictionary<string, SubConsumer<TMessage>> _subConsumers;

    // Receive operations that were started on sub-consumers but had not completed when they were started.
    private readonly LinkedList<Task<IMessage<TMessage>>> _receiveTasks;

    // Remembers where the previous Receive call stopped iterating the sub-consumers.
    private Dictionary<string, SubConsumer<TMessage>>.Enumerator _receiveEnumerator;

    // Set only when exactly one sub-consumer exists; lets every operation skip the fan-out path.
    private SubConsumer<TMessage>? _singleSubConsumer;
    private bool _allSubConsumersAreReady;
    private int _isDisposed;
    private int _numberOfSubConsumers;
    private Exception? _faultException;

    public Uri ServiceUrl { get; }
    public string SubscriptionName { get; }
    public SubscriptionType SubscriptionType { get; }
    public string Topic { get; }

    public IState<ConsumerState> State => _state;

    /// <summary>
    /// Creates the consumer and starts the setup of its sub-consumers in the background.
    /// </summary>
    /// <param name="serviceUrl">The service URL exposed through <see cref="ServiceUrl"/> and passed to the sub-consumers.</param>
    /// <param name="consumerOptions">The options describing the subscription and the topic(s) to consume.</param>
    /// <param name="connectionPool">The connection pool used for lookups and by the sub-consumer channels.</param>
    /// <param name="exceptionHandler">The exception handler given to the executors.</param>
    public Consumer(
        Uri serviceUrl,
        ConsumerOptions<TMessage> consumerOptions,
        IConnectionPool connectionPool,
        IHandleException exceptionHandler)
    {
        _lock = new AsyncLock();
        _state = CreateStateManager();
        ServiceUrl = serviceUrl;
        SubscriptionName = consumerOptions.SubscriptionName;
        SubscriptionType = consumerOptions.SubscriptionType;

        // Topic is descriptive only: the single topic, else the pattern, else the comma-separated topic list.
        if (!string.IsNullOrEmpty(consumerOptions.Topic))
            Topic = consumerOptions.Topic;
        else if (consumerOptions.TopicsPattern is not null)
            Topic = consumerOptions.TopicsPattern.ToString();
        else
            Topic = string.Join(",", consumerOptions.Topics);

        _receiveTasks = [];
        _cts = new CancellationTokenSource();
        _exceptionHandler = exceptionHandler;
        _semaphoreSlim = new SemaphoreSlim(1);
        _processManager = new ProcessManager();
        _executor = new Executor(Guid.Empty, _processManager, _exceptionHandler);
        _consumerOptions = consumerOptions;
        _connectionPool = connectionPool;
        _allSubConsumersAreReady = false;
        _isDisposed = 0;
        _subConsumers = [];
        _receiveEnumerator = _subConsumers.GetEnumerator();
        _singleSubConsumer = null;

        // Fire and forget: Setup catches every exception itself.
        _ = Setup();
    }

    /// <summary>
    /// Takes the setup semaphore and runs <see cref="Monitor"/> through the executor.
    /// On failure the exception is stored, the state becomes <see cref="ConsumerState.Faulted"/>
    /// and the semaphore is released so callers waiting in <see cref="Guard"/> can observe the fault.
    /// </summary>
    private async Task Setup()
    {
        try
        {
            await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
            await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
        }
        catch (Exception exception)
        {
            // The token is cancelled by DisposeAsync; a failure after that is not reported
            // (and the semaphore is not released on this path).
            if (_cts.IsCancellationRequested)
                return;

            _faultException = exception;
            _state.SetState(ConsumerState.Faulted);
            _semaphoreSlim.Release();
        }
    }

    /// <summary>
    /// Resolves the topics to consume, creates a sub-consumer for each of them, signals readiness
    /// and then keeps the aggregated consumer state up to date for as long as the loop runs.
    /// </summary>
    /// <exception cref="TopicNotFoundException">A topics pattern is configured and no topics were resolved.</exception>
    private async Task Monitor()
    {
        var userDefinedTopics = new List<string>(_consumerOptions.Topics);

        if (!string.IsNullOrEmpty(_consumerOptions.Topic))
            userDefinedTopics.Add(_consumerOptions.Topic);

        var topics = new List<string>();

        foreach (var topic in userDefinedTopics)
        {
            var numberOfPartitions = await _connectionPool.GetNumberOfPartitions(topic, _cts.Token).ConfigureAwait(false);

            // Zero partitions: consume the topic under its own name.
            if (numberOfPartitions == 0)
            {
                topics.Add(topic);
                continue;
            }

            // Otherwise consume every partition as its own topic.
            for (var i = 0; i < numberOfPartitions; ++i)
            {
                topics.Add(GetPartitionedTopicName(topic, i));
            }
        }

        var pattern = _consumerOptions.TopicsPattern;

        if (pattern is not null)
        {
            var mode = (CommandGetTopicsOfNamespace.Mode) _consumerOptions.RegexSubscriptionMode;
            var foundTopics = await _connectionPool.GetTopicsOfNamespace(mode, pattern, _cts.Token).ConfigureAwait(false);
            topics.AddRange(foundTopics);

            if (topics.Count == 0)
                throw new TopicNotFoundException($"No topics were found using the pattern '{pattern}'");
        }

        _numberOfSubConsumers = topics.Count;
        var monitoringTasks = new Task<ConsumerStateChanged>[_numberOfSubConsumers];
        var states = new ConsumerState[_numberOfSubConsumers];

        for (var i = 0; i < _numberOfSubConsumers; ++i)
        {
            var topic = topics[i];
            var subConsumer = CreateSubConsumer(topic);
            _subConsumers[topic] = subConsumer;
            monitoringTasks[i] = subConsumer.StateChangedFrom(ConsumerState.Disconnected, _cts.Token).AsTask();
        }

        if (_numberOfSubConsumers == 1)
            _singleSubConsumer = _subConsumers.First().Value;

        // The dictionary was modified above, so the enumerator taken in the constructor must be replaced.
        _receiveEnumerator = _subConsumers.GetEnumerator();
        _allSubConsumersAreReady = true;

        // From here on callers blocked in Guard may proceed.
        _semaphoreSlim.Release();

        while (true)
        {
            await Task.WhenAny(monitoringTasks).ConfigureAwait(false);

            for (var i = 0; i < _numberOfSubConsumers; ++i)
            {
                var task = monitoringTasks[i];
                if (!task.IsCompleted)
                    continue;

                // Record the new state and start waiting for the next change away from it.
                var consumerStateChanged = task.Result;
                var state = consumerStateChanged.ConsumerState;
                states[i] = state;
                monitoringTasks[i] = consumerStateChanged.Consumer.StateChangedFrom(state, _cts.Token).AsTask();
            }

            // Aggregate the sub-consumer states; the order of the checks decides which state wins.
            if (_singleSubConsumer is not null)
                _state.SetState(states[0]);
            else if (states.Any(x => x == ConsumerState.Faulted))
                _state.SetState(ConsumerState.Faulted);
            else if (states.All(x => x == ConsumerState.Active))
                _state.SetState(ConsumerState.Active);
            else if (states.All(x => x == ConsumerState.Inactive))
                _state.SetState(ConsumerState.Inactive);
            else if (states.All(x => x == ConsumerState.ReachedEndOfTopic))
                _state.SetState(ConsumerState.ReachedEndOfTopic);
            else if (states.All(x => x == ConsumerState.Disconnected))
                _state.SetState(ConsumerState.Disconnected);
            else if (states.Any(x => x == ConsumerState.Disconnected))
                _state.SetState(ConsumerState.PartiallyConnected);
            else
                _state.SetState(ConsumerState.Inactive);
        }
    }

    /// <summary>
    /// Cancels the internal token, disposes the process manager, every sub-consumer and the lock,
    /// and finally sets the state to <see cref="ConsumerState.Closed"/>.
    /// Only the first call does any work; later calls return immediately.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
            return;

        _cts.Cancel();
        _cts.Dispose();

        await _processManager.DisposeAsync().ConfigureAwait(false);

        foreach (var subConsumer in _subConsumers)
        {
            await subConsumer.Value.DisposeAsync().ConfigureAwait(false);
        }

        await _lock.DisposeAsync().ConfigureAwait(false);
        _state.SetState(ConsumerState.Closed);
    }

    /// <summary>
    /// Receives the next message from any of the sub-consumers.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait for a message.</param>
    /// <returns>The received message.</returns>
    /// <exception cref="ConsumerFaultedException">The setup of the consumer failed.</exception>
    /// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was cancelled while waiting.</exception>
    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken)
    {
        await Guard(cancellationToken).ConfigureAwait(false);

        if (_singleSubConsumer is not null)
            return await _singleSubConsumer.Receive(cancellationToken).ConfigureAwait(false);

        using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
        {
            while (true)
            {
                // First hand out the result of a receive that was started earlier and has completed since.
                var receiveTaskNode = _receiveTasks.First;
                while (receiveTaskNode is not null)
                {
                    var pendingReceive = receiveTaskNode.Value;
                    if (pendingReceive.IsCompleted)
                    {
                        _receiveTasks.Remove(receiveTaskNode);

                        // Awaiting (instead of reading Task.Result) surfaces the original exception of a
                        // faulted or cancelled receive rather than an AggregateException, which matches
                        // what the single sub-consumer path above throws.
                        return await pendingReceive.ConfigureAwait(false);
                    }

                    receiveTaskNode = receiveTaskNode.Next;
                }

                // Start a receive on each sub-consumer, continuing from where the enumerator last stopped.
                for (var i = 0; i < _numberOfSubConsumers; ++i)
                {
                    if (!_receiveEnumerator.MoveNext())
                    {
                        // Reached the end: wrap around to the first sub-consumer.
                        _receiveEnumerator = _subConsumers.GetEnumerator();
                        _receiveEnumerator.MoveNext();
                    }

                    var subConsumer = _receiveEnumerator.Current.Value;

                    // The internal token is used because the operation may outlive this call.
                    var receiveTask = subConsumer.Receive(_cts.Token);
                    if (receiveTask.IsCompleted)
                        return receiveTask.Result;

                    _receiveTasks.AddLast(receiveTask.AsTask());
                }

                // Temporarily append a task that completes when the caller cancels, so that WhenAny wakes up.
                // RunContinuationsAsynchronously keeps the rest of this method from running inline on
                // the thread that cancels the token.
                var tcs = new TaskCompletionSource<IMessage<TMessage>>(TaskCreationOptions.RunContinuationsAsynchronously);
                using var registration = cancellationToken.Register(() => tcs.TrySetCanceled());
                _receiveTasks.AddLast(tcs.Task);
                await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
                _receiveTasks.RemoveLast();
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
    }

    /// <summary>
    /// Acknowledges a single message on the sub-consumer of the message's topic.
    /// </summary>
    /// <param name="messageId">The id of the message to acknowledge.</param>
    /// <param name="cancellationToken">Cancels the operation.</param>
    /// <exception cref="ConsumerFaultedException">The setup of the consumer failed.</exception>
    public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken)
    {
        await Guard(cancellationToken).ConfigureAwait(false);

        if (_singleSubConsumer is not null)
            await _singleSubConsumer.Acknowledge(messageId, cancellationToken).ConfigureAwait(false);
        else
            await _subConsumers[messageId.Topic].Acknowledge(messageId, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Acknowledges multiple messages, grouped by topic so each sub-consumer gets one call.
    /// </summary>
    /// <param name="messageIds">The ids of the messages to acknowledge.</param>
    /// <param name="cancellationToken">Cancels the operation.</param>
    /// <exception cref="ConsumerFaultedException">The setup of the consumer failed.</exception>
    public async ValueTask Acknowledge(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken = default)
    {
        await Guard(cancellationToken).ConfigureAwait(false);

        if (_singleSubConsumer is not null)
        {
            await _singleSubConsumer.Acknowledge(messageIds, cancellationToken).ConfigureAwait(false);
            return;
        }

        var groupedMessageIds = messageIds.GroupBy(messageId => messageId.Topic);
        var acknowledgeTasks = new List<Task>();

        foreach (var group in groupedMessageIds)
        {
            acknowledgeTasks.Add(_subConsumers[group.Key].Acknowledge(group, cancellationToken).AsTask());
        }

        await Task.WhenAll(acknowledgeTasks).ConfigureAwait(false);
    }

    /// <summary>
    /// Cumulatively acknowledges on the sub-consumer of the message's topic.
    /// </summary>
    /// <param name="messageId">The id of the message to acknowledge cumulatively.</param>
    /// <param name="cancellationToken">Cancels the operation.</param>
    /// <exception cref="ConsumerFaultedException">The setup of the consumer failed.</exception>
    public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
    {
        await Guard(cancellationToken).ConfigureAwait(false);

        if (_singleSubConsumer is not null)
            await _singleSubConsumer.AcknowledgeCumulative(messageId, cancellationToken).ConfigureAwait(false);
        else
            await _subConsumers[messageId.Topic].AcknowledgeCumulative(messageId, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Requests redelivery of the given messages, grouped by topic so each sub-consumer gets one call.
    /// </summary>
    /// <param name="messageIds">The ids of the messages to redeliver.</param>
    /// <param name="cancellationToken">Cancels the operation.</param>
    /// <exception cref="ConsumerFaultedException">The setup of the consumer failed.</exception>
    public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
    {
        await Guard(cancellationToken).ConfigureAwait(false);

        if (_singleSubConsumer is not null)
        {
            await _singleSubConsumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false);
            return;
        }

        var groupedMessageIds = messageIds.GroupBy(messageId => messageId.Topic);
        var redeliverTasks = new List<Task>();

        foreach (var group in groupedMessageIds)
        {
            redeliverTasks.Add(_subConsumers[group.Key].RedeliverUnacknowledgedMessages(group, cancellationToken).AsTask());
        }

        await Task.WhenAll(redeliverTasks).ConfigureAwait(false);
    }

    /// <summary>
    /// Requests redelivery of all unacknowledged messages on every sub-consumer.
    /// </summary>
    /// <param name="cancellationToken">Cancels the operation.</param>
    /// <exception cref="ConsumerFaultedException">The setup of the consumer failed.</exception>
    public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
    {
        await Guard(cancellationToken).ConfigureAwait(false);

        if (_singleSubConsumer is not null)
        {
            await _singleSubConsumer.RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
            return;
        }

        var redeliverTasks = new List<Task>(_numberOfSubConsumers);

        foreach (var subConsumer in _subConsumers)
        {
            redeliverTasks.Add(subConsumer.Value.RedeliverUnacknowledgedMessages(cancellationToken).AsTask());
        }

        await Task.WhenAll(redeliverTasks).ConfigureAwait(false);
    }

    /// <summary>
    /// Unsubscribes every sub-consumer and then sets the state to <see cref="ConsumerState.Unsubscribed"/>.
    /// </summary>
    /// <param name="cancellationToken">Cancels the operation.</param>
    /// <exception cref="ConsumerFaultedException">The setup of the consumer failed.</exception>
    public async ValueTask Unsubscribe(CancellationToken cancellationToken)
    {
        await Guard(cancellationToken).ConfigureAwait(false);

        if (_singleSubConsumer is not null)
        {
            await _singleSubConsumer.Unsubscribe(cancellationToken).ConfigureAwait(false);
        }
        else
        {
            var unsubscribeTasks = new List<Task>(_numberOfSubConsumers);

            foreach (var subConsumer in _subConsumers)
            {
                var unsubscribeTask = subConsumer.Value.Unsubscribe(cancellationToken);
                unsubscribeTasks.Add(unsubscribeTask.AsTask());
            }

            await Task.WhenAll(unsubscribeTasks).ConfigureAwait(false);
        }

        _state.SetState(ConsumerState.Unsubscribed);
    }

    /// <summary>
    /// Seeks every sub-consumer to the given message id.
    /// </summary>
    /// <param name="messageId">The message id to seek to.</param>
    /// <param name="cancellationToken">Cancels the operation.</param>
    /// <exception cref="ConsumerFaultedException">The setup of the consumer failed.</exception>
    public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
    {
        await Guard(cancellationToken).ConfigureAwait(false);

        if (_singleSubConsumer is not null)
        {
            await _singleSubConsumer.Seek(messageId, cancellationToken).ConfigureAwait(false);
            return;
        }

        var seekTasks = new List<Task>(_numberOfSubConsumers);

        foreach (var subConsumer in _subConsumers)
        {
            var seekTask = subConsumer.Value.Seek(messageId, cancellationToken);
            seekTasks.Add(seekTask.AsTask());
        }

        await Task.WhenAll(seekTasks).ConfigureAwait(false);
    }

    /// <summary>
    /// Seeks every sub-consumer to the given publish time.
    /// </summary>
    /// <param name="publishTime">The publish time to seek to.</param>
    /// <param name="cancellationToken">Cancels the operation.</param>
    /// <exception cref="ConsumerFaultedException">The setup of the consumer failed.</exception>
    public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
    {
        await Guard(cancellationToken).ConfigureAwait(false);

        if (_singleSubConsumer is not null)
        {
            await _singleSubConsumer.Seek(publishTime, cancellationToken).ConfigureAwait(false);
            return;
        }

        var seekTasks = new List<Task>(_numberOfSubConsumers);

        foreach (var subConsumer in _subConsumers)
        {
            var seekTask = subConsumer.Value.Seek(publishTime, cancellationToken);
            seekTasks.Add(seekTask.AsTask());
        }

        await Task.WhenAll(seekTasks).ConfigureAwait(false);
    }

    /// <summary>
    /// Gets the last message id of every sub-consumer.
    /// </summary>
    /// <param name="cancellationToken">Cancels the operation.</param>
    /// <returns>One message id per sub-consumer, in the enumeration order of the sub-consumers.</returns>
    /// <exception cref="ConsumerFaultedException">The setup of the consumer failed.</exception>
    public async ValueTask<IEnumerable<MessageId>> GetLastMessageIds(CancellationToken cancellationToken)
    {
        await Guard(cancellationToken).ConfigureAwait(false);

        if (_singleSubConsumer is not null)
            return [await _singleSubConsumer.GetLastMessageId(cancellationToken).ConfigureAwait(false)];

        var getLastMessageIdsTasks = new List<Task<MessageId>>(_numberOfSubConsumers);

        foreach (var subConsumer in _subConsumers)
        {
            var getLastMessageIdTask = subConsumer.Value.GetLastMessageId(cancellationToken);
            getLastMessageIdsTasks.Add(getLastMessageIdTask.AsTask());
        }

        // WhenAll returns the results in the order of the supplied tasks.
        var lastMessageIds = await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
        return new List<MessageId>(lastMessageIds);
    }

    /// <summary>
    /// Builds the subscribe command and all collaborators for one topic, registers and starts
    /// the consumer process, and returns the new sub-consumer.
    /// </summary>
    /// <param name="topic">The topic (or partition topic) the sub-consumer subscribes to.</param>
    private SubConsumer<TMessage> CreateSubConsumer(string topic)
    {
        var correlationId = Guid.NewGuid();
        var consumerName = _consumerOptions.ConsumerName ?? $"Consumer-{correlationId:N}";

        var subscribe = new CommandSubscribe
        {
            ConsumerName = consumerName,
            InitialPosition = (CommandSubscribe.InitialPositionType) _consumerOptions.InitialPosition,
            PriorityLevel = _consumerOptions.PriorityLevel,
            ReadCompacted = _consumerOptions.ReadCompacted,
            ReplicateSubscriptionState = _consumerOptions.ReplicateSubscriptionState,
            Subscription = _consumerOptions.SubscriptionName,
            Topic = topic,
            Type = (CommandSubscribe.SubType) _consumerOptions.SubscriptionType
        };

        // A schema is only sent along when one is actually configured.
        if (_consumerOptions.Schema.SchemaInfo.Type != SchemaType.None)
            subscribe.Schema = _consumerOptions.Schema.SchemaInfo.PulsarSchema;

        foreach (var property in _consumerOptions.SubscriptionProperties)
        {
            var keyValue = new KeyValue { Key = property.Key, Value = property.Value };
            subscribe.SubscriptionProperties.Add(keyValue);
        }

        var messagePrefetchCount = _consumerOptions.MessagePrefetchCount;
        var messageFactory = new MessageFactory<TMessage>(_consumerOptions.Schema);
        var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
        var decompressorFactories = CompressionFactories.DecompressorFactories();
        var consumerChannelFactory = new ConsumerChannelFactory<TMessage>(
            correlationId,
            _processManager,
            _connectionPool,
            subscribe,
            messagePrefetchCount,
            batchHandler,
            messageFactory,
            decompressorFactories,
            topic);
        var stateManager = CreateStateManager();
        var initialChannel = new NotReadyChannel<TMessage>();
        var executor = new Executor(correlationId, _processManager, _exceptionHandler);
        var subConsumer = new SubConsumer<TMessage>(
            correlationId,
            ServiceUrl,
            _consumerOptions.SubscriptionName,
            _consumerOptions.SubscriptionType,
            topic,
            _processManager,
            initialChannel,
            executor,
            stateManager,
            consumerChannelFactory);
        var process = new ConsumerProcess(
            correlationId,
            stateManager,
            subConsumer,
            _consumerOptions.SubscriptionType == SubscriptionType.Failover);
        _processManager.Add(process);
        process.Start();
        return subConsumer;
    }

    /// <summary>Builds the topic name used for one partition: "{topic}-partition-{partitionNumber}".</summary>
    private static string GetPartitionedTopicName(string topic, int partitionNumber)
        => $"{topic}-partition-{partitionNumber}";

    /// <summary>
    /// Creates a state manager starting in <see cref="ConsumerState.Disconnected"/>, with
    /// Closed, ReachedEndOfTopic and Faulted passed as its remaining arguments.
    /// </summary>
    private static StateManager<ConsumerState> CreateStateManager()
        => new(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);

    /// <summary>
    /// Waits until setup has either created all sub-consumers or faulted, and throws if it faulted.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait.</param>
    /// <exception cref="ConsumerFaultedException">The setup of the consumer failed.</exception>
    private async Task Guard(CancellationToken cancellationToken)
    {
        if (!_allSubConsumersAreReady)
        {
            // The semaphore is only used as a gate: pass through and immediately let the next caller in.
            await _semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
            _semaphoreSlim.Release();
        }

        if (_faultException is not null)
            throw new ConsumerFaultedException(_faultException);
    }
}