in src/DotPulsar/Internal/Consumer.cs [103:188]
private async Task Monitor()
{
var userDefinedTopics = new List<string>(_consumerOptions.Topics);
if (!string.IsNullOrEmpty(_consumerOptions.Topic))
userDefinedTopics.Add(_consumerOptions.Topic);
var topics = new List<string>();
foreach (var topic in userDefinedTopics)
{
var numberOfPartitions = await _connectionPool.GetNumberOfPartitions(topic, _cts.Token).ConfigureAwait(false);
if (numberOfPartitions == 0)
{
topics.Add(topic);
continue;
}
for (var i = 0; i < numberOfPartitions; ++i)
{
topics.Add(GetPartitionedTopicName(topic, i));
}
}
var pattern = _consumerOptions.TopicsPattern;
if (pattern is not null)
{
var mode = (CommandGetTopicsOfNamespace.Mode) _consumerOptions.RegexSubscriptionMode;
var foundTopics = await _connectionPool.GetTopicsOfNamespace(mode, pattern, _cts.Token).ConfigureAwait(false);
topics.AddRange(foundTopics);
if (topics.Count == 0)
throw new TopicNotFoundException($"No topics were found using the pattern '{pattern}'");
}
_numberOfSubConsumers = topics.Count;
var monitoringTasks = new Task<ConsumerStateChanged>[_numberOfSubConsumers];
var states = new ConsumerState[_numberOfSubConsumers];
for (var i = 0; i < _numberOfSubConsumers; ++i)
{
var topic = topics[i];
var subConsumer = CreateSubConsumer(topic);
_subConsumers[topic] = subConsumer;
monitoringTasks[i] = subConsumer.StateChangedFrom(ConsumerState.Disconnected, _cts.Token).AsTask();
}
if (_numberOfSubConsumers == 1)
_singleSubConsumer = _subConsumers.First().Value;
_receiveEnumerator = _subConsumers.GetEnumerator();
_allSubConsumersAreReady = true;
_semaphoreSlim.Release();
while (true)
{
await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
for (var i = 0; i < _numberOfSubConsumers; ++i)
{
var task = monitoringTasks[i];
if (!task.IsCompleted)
continue;
var consumerStateChanged = task.Result;
var state = consumerStateChanged.ConsumerState;
states[i] = state;
monitoringTasks[i] = consumerStateChanged.Consumer.StateChangedFrom(state, _cts.Token).AsTask();
}
if (_singleSubConsumer is not null)
_state.SetState(states[0]);
else if (states.Any(x => x == ConsumerState.Faulted))
_state.SetState(ConsumerState.Faulted);
else if (states.All(x => x == ConsumerState.Active))
_state.SetState(ConsumerState.Active);
else if (states.All(x => x == ConsumerState.Inactive))
_state.SetState(ConsumerState.Inactive);
else if (states.All(x => x == ConsumerState.ReachedEndOfTopic))
_state.SetState(ConsumerState.ReachedEndOfTopic);
else if (states.All(x => x == ConsumerState.Disconnected))
_state.SetState(ConsumerState.Disconnected);
else if (states.Any(x => x == ConsumerState.Disconnected))
_state.SetState(ConsumerState.PartiallyConnected);
else
_state.SetState(ConsumerState.Inactive);
}
}