private async Task Monitor()

in src/DotPulsar/Internal/Producer.cs [112:180]


    /// <summary>
    /// Creates the sub-producer(s) for this topic and then keeps the aggregate producer state in sync
    /// with the states they report.
    /// </summary>
    /// <remarks>
    /// A partition count of zero is treated as a non-partitioned topic: a single sub-producer is created
    /// for <c>Topic</c> with partition -1. Otherwise one sub-producer is created per partition, each on
    /// the topic "{Topic}-partition-{n}".
    /// The method returns as soon as any sub-producer reports Fenced or Faulted, after publishing that state.
    /// NOTE(review): no other exit from the loop is visible here; presumably cancelling _cts ends it by
    /// making the awaited state-change tasks fail - confirm against the caller.
    /// </remarks>
    private async Task Monitor()
    {
        var partitionCount = await GetNumberOfPartitions(_cts.Token).ConfigureAwait(false);
        var partitioned = partitionCount != 0;
        var stateChanges = new Task<ProducerState>[partitioned ? partitionCount : 1];
        var subProducerCount = stateChanges.Length;
        var baseTopic = Topic;

        for (var index = 0; index < subProducerCount; ++index)
        {
            var subTopic = partitioned ? $"{Topic}-partition-{index}" : baseTopic;
            var subProducer = CreateSubProducer(subTopic, partitioned ? index : -1);
            _ = _producers.TryAdd(index, subProducer);

            // Start by waiting for each sub-producer to leave the Disconnected state.
            stateChanges[index] = subProducer.OnStateChangeFrom(ProducerState.Disconnected, _cts.Token).AsTask();
        }

        Interlocked.Exchange(ref _producerCount, subProducerCount);

        // Sub-producers currently counted as connected (this includes those waiting for exclusive access).
        var connectedCount = 0;
        // Per sub-producer flag: true while its last reported state is WaitingForExclusive.
        var exclusivePending = new bool[subProducerCount];

        while (true)
        {
            await Task.WhenAny(stateChanges).ConfigureAwait(false);

            // More than one task may have completed, so every slot is inspected on each wake-up.
            for (var index = 0; index < subProducerCount; ++index)
            {
                var pendingChange = stateChanges[index];
                if (!pendingChange.IsCompleted)
                    continue;

                // The task has already completed, so reading Result does not block.
                var observed = pendingChange.Result;

                // Terminal states: publish them as the overall state and stop monitoring.
                if (observed == ProducerState.Fenced || observed == ProducerState.Faulted)
                {
                    _state.SetState(observed);
                    return;
                }

                if (observed == ProducerState.Connected)
                {
                    // A sub-producer that was waiting for exclusive access was already counted
                    // when it started waiting, so it must not be counted a second time.
                    if (!exclusivePending[index])
                        ++connectedCount;

                    exclusivePending[index] = false;
                }
                else if (observed == ProducerState.WaitingForExclusive)
                {
                    ++connectedCount;
                    exclusivePending[index] = true;
                }
                else if (observed == ProducerState.Disconnected)
                {
                    --connectedCount;
                    exclusivePending[index] = false;
                }

                // Any other state leaves the counters untouched; in every case wait for the next change.
                stateChanges[index] = _producers[index].OnStateChangeFrom(observed, _cts.Token).AsTask();
            }

            // Collapse the per-sub-producer bookkeeping into a single aggregate state.
            ProducerState aggregate;
            if (connectedCount == 0)
                aggregate = ProducerState.Disconnected;
            else if (exclusivePending.Any(pending => pending))
                aggregate = ProducerState.WaitingForExclusive;
            else if (connectedCount == subProducerCount)
                aggregate = ProducerState.Connected;
            else
                aggregate = ProducerState.PartiallyConnected;

            _state.SetState(aggregate);
        }
    }