in src/DotPulsar/Internal/Producer.cs [112:180]
/// <summary>
/// Creates the sub-producers for the topic and then keeps folding their individual state
/// changes into the aggregated state published through <c>_state</c>.
/// </summary>
/// <remarks>
/// A partition count of zero is treated as a non-partitioned topic: a single sub-producer is
/// created for <c>Topic</c> itself with partition index -1. Otherwise one sub-producer is created
/// per partition, named <c>{Topic}-partition-{n}</c>.
/// The method returns once any sub-producer reports <see cref="ProducerState.Fenced"/> or
/// <see cref="ProducerState.Faulted"/>; that state is forwarded unchanged.
/// </remarks>
private async Task Monitor()
{
    var numberOfPartitions = await GetNumberOfPartitions(_cts.Token).ConfigureAwait(false);
    var isPartitionedTopic = numberOfPartitions != 0;
    var subProducerCount = isPartitionedTopic ? numberOfPartitions : 1;
    var stateChanges = new Task<ProducerState>[subProducerCount];

    for (var partition = 0; partition < subProducerCount; ++partition)
    {
        var subTopic = isPartitionedTopic ? $"{Topic}-partition-{partition}" : Topic;
        var partitionIndex = isPartitionedTopic ? partition : -1;
        var subProducer = CreateSubProducer(subTopic, partitionIndex);

        // Sub-producers are keyed by their loop index, which is also the slot in stateChanges.
        _ = _producers.TryAdd(partition, subProducer);
        stateChanges[partition] = subProducer.OnStateChangeFrom(ProducerState.Disconnected, _cts.Token).AsTask();
    }

    Interlocked.Exchange(ref _producerCount, subProducerCount);

    // Number of sub-producers currently counted as connected; a sub-producer that is waiting
    // for exclusive access is included in this count and additionally flagged below.
    var connected = 0;
    var awaitingExclusive = new bool[subProducerCount];

    while (true)
    {
        await Task.WhenAny(stateChanges).ConfigureAwait(false);

        for (var index = 0; index < stateChanges.Length; ++index)
        {
            var change = stateChanges[index];
            if (change.IsCompleted)
            {
                // The task has already completed, so reading Result does not block.
                var newState = change.Result;

                if (newState == ProducerState.Fenced || newState == ProducerState.Faulted)
                {
                    _state.SetState(newState);
                    return;
                }

                if (newState == ProducerState.Connected)
                {
                    // Coming from WaitingForExclusive the sub-producer was already counted.
                    if (!awaitingExclusive[index])
                    {
                        ++connected;
                    }

                    awaitingExclusive[index] = false;
                }
                else if (newState == ProducerState.Disconnected)
                {
                    --connected;
                    awaitingExclusive[index] = false;
                }
                else if (newState == ProducerState.WaitingForExclusive)
                {
                    ++connected;
                    awaitingExclusive[index] = true;
                }

                // Re-arm the watch so the next transition away from the new state is observed.
                stateChanges[index] = _producers[index].OnStateChangeFrom(newState, _cts.Token).AsTask();
            }
        }

        var anyAwaitingExclusive = awaitingExclusive.Any(waiting => waiting);

        ProducerState aggregate;
        if (connected == 0)
        {
            aggregate = ProducerState.Disconnected;
        }
        else if (anyAwaitingExclusive)
        {
            aggregate = ProducerState.WaitingForExclusive;
        }
        else if (connected == stateChanges.Length)
        {
            aggregate = ProducerState.Connected;
        }
        else
        {
            aggregate = ProducerState.PartiallyConnected;
        }

        _state.SetState(aggregate);
    }
}