in src/DotPulsar/Internal/Consumer.cs [50:83]
public Consumer(
Uri serviceUrl,
ConsumerOptions<TMessage> consumerOptions,
IConnectionPool connectionPool,
IHandleException exceptionHandler)
{
_lock = new AsyncLock();
_state = CreateStateManager();
ServiceUrl = serviceUrl;
SubscriptionName = consumerOptions.SubscriptionName;
SubscriptionType = consumerOptions.SubscriptionType;
if (!string.IsNullOrEmpty(consumerOptions.Topic))
Topic = consumerOptions.Topic;
else if (consumerOptions.TopicsPattern is not null)
Topic = consumerOptions.TopicsPattern.ToString();
else
Topic = string.Join(",", consumerOptions.Topics);
_receiveTasks = [];
_cts = new CancellationTokenSource();
_exceptionHandler = exceptionHandler;
_semaphoreSlim = new SemaphoreSlim(1);
_processManager = new ProcessManager();
_executor = new Executor(Guid.Empty, _processManager, _exceptionHandler);
_consumerOptions = consumerOptions;
_connectionPool = connectionPool;
_exceptionHandler = exceptionHandler;
_allSubConsumersAreReady = false;
_isDisposed = 0;
_subConsumers = [];
_receiveEnumerator = _subConsumers.GetEnumerator();
_singleSubConsumer = null;
_ = Setup();
}