private async Task Monitor()

in src/DotPulsar/Internal/Reader.cs [92:142]


    private async Task Monitor()
    {
        _numberOfPartitions = Convert.ToInt32(await _connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
        _isPartitionedTopic = _numberOfPartitions != 0;
        var numberOfSubReaders = _isPartitionedTopic ? _numberOfPartitions : 1;
        _receiveTasks = new Task<IMessage<TMessage>>[numberOfSubReaders];
        _subReaders = new SubReader<TMessage>[numberOfSubReaders];
        var monitoringTasks = new Task<ReaderState>[numberOfSubReaders];
        var states = new ReaderState[numberOfSubReaders];
        _subReaderIndex = _isPartitionedTopic ? -1 : 0;

        for (var i = 0; i < numberOfSubReaders; i++)
        {
            _receiveTasks[i] = _emptyTaskCompletionSource.Task;
            var topicName = _isPartitionedTopic ? GetPartitionedTopicName(i) : Topic;
            _subReaders[i] = CreateSubReader(topicName);
            monitoringTasks[i] = _subReaders[i].State.OnStateChangeFrom(ReaderState.Disconnected, _cts.Token).AsTask();
        }

        _allSubReadersAreReady = true;
        _semaphoreSlim.Release();

        while (true)
        {
            await Task.WhenAny(monitoringTasks).ConfigureAwait(false);

            for (var i = 0; i < numberOfSubReaders; ++i)
            {
                var task = monitoringTasks[i];
                if (!task.IsCompleted)
                    continue;

                var state = task.Result;
                states[i] = state;
                monitoringTasks[i] = _subReaders[i].State.OnStateChangeFrom(state, _cts.Token).AsTask();
            }

            if (!_isPartitionedTopic)
                _state.SetState(states[0]);
            else if (states.Any(x => x == ReaderState.Faulted))
                _state.SetState(ReaderState.Faulted);
            else if (states.All(x => x == ReaderState.Connected))
                _state.SetState(ReaderState.Connected);
            else if (states.All(x => x == ReaderState.ReachedEndOfTopic))
                _state.SetState(ReaderState.ReachedEndOfTopic);
            else if (states.All(x => x == ReaderState.Disconnected))
                _state.SetState(ReaderState.Disconnected);
            else
                _state.SetState(ReaderState.PartiallyConnected);
        }
    }