in src/DotPulsar/Internal/Reader.cs [92:142]
/// <summary>
/// Sets up one sub-reader per partition (or a single sub-reader when the topic reports zero
/// partitions) and then loops, folding the sub-readers' individual states into this reader's
/// overall state.
/// </summary>
/// <remarks>
/// The monitoring loop has no exit condition of its own; it only ends when an exception
/// propagates out of it (for example from reading the result of a state-change task that did
/// not complete successfully).
/// </remarks>
private async Task Monitor()
{
    var partitionCount = await _connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
    _numberOfPartitions = Convert.ToInt32(partitionCount);
    _isPartitionedTopic = _numberOfPartitions != 0;

    // A topic reporting zero partitions is handled as a single sub-reader on the topic itself.
    var subReaderCount = _isPartitionedTopic ? _numberOfPartitions : 1;

    _receiveTasks = new Task<IMessage<TMessage>>[subReaderCount];
    _subReaders = new SubReader<TMessage>[subReaderCount];
    var pendingStateChanges = new Task<ReaderState>[subReaderCount];
    var latestStates = new ReaderState[subReaderCount];
    _subReaderIndex = _isPartitionedTopic ? -1 : 0;

    for (var index = 0; index < subReaderCount; index++)
    {
        _receiveTasks[index] = _emptyTaskCompletionSource.Task;
        _subReaders[index] = CreateSubReader(_isPartitionedTopic ? GetPartitionedTopicName(index) : Topic);

        // Every sub-reader is first watched for leaving the Disconnected state.
        pendingStateChanges[index] = _subReaders[index].State
            .OnStateChangeFrom(ReaderState.Disconnected, _cts.Token)
            .AsTask();
    }

    // Flag and semaphore are only touched once every sub-reader exists.
    // NOTE(review): presumably this unblocks callers waiting for setup to finish - confirm.
    _allSubReadersAreReady = true;
    _semaphoreSlim.Release();

    while (true)
    {
        await Task.WhenAny(pendingStateChanges).ConfigureAwait(false);

        // More than one sub-reader may have changed state, so sweep all of them.
        for (var index = 0; index < subReaderCount; ++index)
        {
            var stateChange = pendingStateChanges[index];
            if (stateChange.IsCompleted)
            {
                var observedState = stateChange.Result;
                latestStates[index] = observedState;

                // Re-arm the watch so the next change away from the observed state is seen.
                pendingStateChanges[index] = _subReaders[index].State
                    .OnStateChangeFrom(observedState, _cts.Token)
                    .AsTask();
            }
        }

        ReaderState overallState;
        if (!_isPartitionedTopic)
        {
            // Single sub-reader: its state is the reader's state.
            overallState = latestStates[0];
        }
        else if (Array.Exists(latestStates, s => s == ReaderState.Faulted))
        {
            // Any faulted partition faults the whole reader.
            overallState = ReaderState.Faulted;
        }
        else if (Array.TrueForAll(latestStates, s => s == ReaderState.Connected))
        {
            overallState = ReaderState.Connected;
        }
        else if (Array.TrueForAll(latestStates, s => s == ReaderState.ReachedEndOfTopic))
        {
            overallState = ReaderState.ReachedEndOfTopic;
        }
        else if (Array.TrueForAll(latestStates, s => s == ReaderState.Disconnected))
        {
            overallState = ReaderState.Disconnected;
        }
        else
        {
            // Mixed states without any fault.
            overallState = ReaderState.PartiallyConnected;
        }

        _state.SetState(overallState);
    }
}