private async Task Monitor()

in src/DotPulsar/Internal/Producer.cs [106:154]


    /// <summary>
    /// Creates one sub-producer per partition (or a single sub-producer when the reported partition count is 0)
    /// and then loops, folding the sub-producers' state changes into this producer's aggregate state.
    /// </summary>
    /// <remarks>
    /// The loop has no normal exit; the returned task only finishes when an awaited call or a completed
    /// monitoring task throws.
    /// NOTE(review): presumably that happens when <c>_cts</c> is cancelled - confirm against OnStateChangeFrom.
    /// </remarks>
    /// <returns>A task representing the monitoring loop.</returns>
    private async Task Monitor()
    {
        var numberOfPartitions = await _connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);

        // A partition count of 0 is treated as a non-partitioned topic, served by exactly one sub-producer.
        var isPartitionedTopic = numberOfPartitions != 0;
        var numberOfSubProducers = isPartitionedTopic ? numberOfPartitions : 1;
        var monitoringTasks = new Task<ProducerState>[numberOfSubProducers];
        var states = new ProducerState[numberOfSubProducers];

        for (var i = 0; i < numberOfSubProducers; ++i)
        {
            var topicName = isPartitionedTopic ? GetPartitionedTopicName(i) : Topic;

            // Non-partitioned topics pass -1 as the partition index.
            var producer = CreateSubProducer(topicName, isPartitionedTopic ? i : -1);
            _ = _producers.TryAdd(i, producer);

            // Seed the slot with the same baseline we start monitoring from. Without this, a sub-producer that
            // has not reported yet would contribute default(ProducerState) to the aggregation below, which is
            // not guaranteed to be Disconnected and could push the aggregate into the wrong branch.
            states[i] = ProducerState.Disconnected;
            monitoringTasks[i] = producer.State.OnStateChangeFrom(ProducerState.Disconnected, _cts.Token).AsTask();
        }

        Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);

        while (true)
        {
            await Task.WhenAny(monitoringTasks).ConfigureAwait(false);

            // More than one task may have completed by the time we resume, so sweep all of them.
            for (var i = 0; i < monitoringTasks.Length; ++i)
            {
                var task = monitoringTasks[i];
                if (!task.IsCompleted)
                    continue;

                // The task is already completed, so reading Result does not block.
                var state = task.Result;
                states[i] = state;

                // Re-subscribe from the state just observed so the next change of this sub-producer is seen.
                monitoringTasks[i] = _producers[i].State.OnStateChangeFrom(state, _cts.Token).AsTask();
            }

            // Aggregate in priority order: the first matching rule wins.
            if (!isPartitionedTopic)
                _state.SetState(states[0]);
            else if (states.Any(x => x == ProducerState.Faulted))
                _state.SetState(ProducerState.Faulted);
            else if (states.Any(x => x == ProducerState.Fenced))
                _state.SetState(ProducerState.Fenced);
            else if (states.All(x => x == ProducerState.Connected))
                _state.SetState(ProducerState.Connected);
            else if (states.All(x => x == ProducerState.Disconnected))
                _state.SetState(ProducerState.Disconnected);
            else if (states.Any(x => x == ProducerState.Disconnected))
                _state.SetState(ProducerState.PartiallyConnected);
            else
                _state.SetState(ProducerState.WaitingForExclusive);
        }
    }