private async Task Monitor()

in src/DotPulsar/Internal/Consumer.cs [103:188]


    private async Task Monitor()
    {
        var userDefinedTopics = new List<string>(_consumerOptions.Topics);

        if (!string.IsNullOrEmpty(_consumerOptions.Topic))
            userDefinedTopics.Add(_consumerOptions.Topic);

        var topics = new List<string>();
        foreach (var topic in userDefinedTopics)
        {
            var numberOfPartitions = await _connectionPool.GetNumberOfPartitions(topic, _cts.Token).ConfigureAwait(false);
            if (numberOfPartitions == 0)
            {
                topics.Add(topic);
                continue;
            }

            for (var i = 0; i < numberOfPartitions; ++i)
            {
                topics.Add(GetPartitionedTopicName(topic, i));
            }
        }

        var pattern = _consumerOptions.TopicsPattern;
        if (pattern is not null)
        {
            var mode = (CommandGetTopicsOfNamespace.Mode) _consumerOptions.RegexSubscriptionMode;
            var foundTopics = await _connectionPool.GetTopicsOfNamespace(mode, pattern, _cts.Token).ConfigureAwait(false);
            topics.AddRange(foundTopics);
            if (topics.Count == 0)
                throw new TopicNotFoundException($"No topics were found using the pattern '{pattern}'");
        }

        _numberOfSubConsumers = topics.Count;
        var monitoringTasks = new Task<ConsumerStateChanged>[_numberOfSubConsumers];
        var states = new ConsumerState[_numberOfSubConsumers];

        for (var i = 0; i < _numberOfSubConsumers; ++i)
        {
            var topic = topics[i];
            var subConsumer = CreateSubConsumer(topic);
            _subConsumers[topic] = subConsumer;
            monitoringTasks[i] = subConsumer.StateChangedFrom(ConsumerState.Disconnected, _cts.Token).AsTask();
        }

        if (_numberOfSubConsumers == 1)
            _singleSubConsumer = _subConsumers.First().Value;

        _receiveEnumerator = _subConsumers.GetEnumerator();
        _allSubConsumersAreReady = true;
        _semaphoreSlim.Release();

        while (true)
        {
            await Task.WhenAny(monitoringTasks).ConfigureAwait(false);

            for (var i = 0; i < _numberOfSubConsumers; ++i)
            {
                var task = monitoringTasks[i];
                if (!task.IsCompleted)
                    continue;

                var consumerStateChanged = task.Result;
                var state = consumerStateChanged.ConsumerState;
                states[i] = state;
                monitoringTasks[i] = consumerStateChanged.Consumer.StateChangedFrom(state, _cts.Token).AsTask();
            }

            if (_singleSubConsumer is not null)
                _state.SetState(states[0]);
            else if (states.Any(x => x == ConsumerState.Faulted))
                _state.SetState(ConsumerState.Faulted);
            else if (states.All(x => x == ConsumerState.Active))
                _state.SetState(ConsumerState.Active);
            else if (states.All(x => x == ConsumerState.Inactive))
                _state.SetState(ConsumerState.Inactive);
            else if (states.All(x => x == ConsumerState.ReachedEndOfTopic))
                _state.SetState(ConsumerState.ReachedEndOfTopic);
            else if (states.All(x => x == ConsumerState.Disconnected))
                _state.SetState(ConsumerState.Disconnected);
            else if (states.Any(x => x == ConsumerState.Disconnected))
                _state.SetState(ConsumerState.PartiallyConnected);
            else
                _state.SetState(ConsumerState.Inactive);
        }
    }