in src/DotPulsar/Internal/Producer.cs [106:154]
private async Task Monitor()
{
var numberOfPartitions = await _connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
var isPartitionedTopic = numberOfPartitions != 0;
var numberOfSubProducers = isPartitionedTopic ? numberOfPartitions : 1;
var monitoringTasks = new Task<ProducerState>[numberOfSubProducers];
var states = new ProducerState[numberOfSubProducers];
for (var i = 0; i < numberOfSubProducers; ++i)
{
var topicName = isPartitionedTopic ? GetPartitionedTopicName(i) : Topic;
var producer = CreateSubProducer(topicName, isPartitionedTopic ? i : -1);
_ = _producers.TryAdd(i, producer);
monitoringTasks[i] = producer.State.OnStateChangeFrom(ProducerState.Disconnected, _cts.Token).AsTask();
}
Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
while (true)
{
await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
for (var i = 0; i < monitoringTasks.Length; ++i)
{
var task = monitoringTasks[i];
if (!task.IsCompleted)
continue;
var state = task.Result;
states[i] = state;
monitoringTasks[i] = _producers[i].State.OnStateChangeFrom(state, _cts.Token).AsTask();
}
if (!isPartitionedTopic)
_state.SetState(states[0]);
else if (states.Any(x => x == ProducerState.Faulted))
_state.SetState(ProducerState.Faulted);
else if (states.Any(x => x == ProducerState.Fenced))
_state.SetState(ProducerState.Fenced);
else if (states.All(x => x == ProducerState.Connected))
_state.SetState(ProducerState.Connected);
else if (states.All(x => x == ProducerState.Disconnected))
_state.SetState(ProducerState.Disconnected);
else if (states.Any(x => x == ProducerState.Disconnected))
_state.SetState(ProducerState.PartiallyConnected);
else
_state.SetState(ProducerState.WaitingForExclusive);
}
}