in src/DotPulsar/Internal/Reader.cs [268:299]
/// <summary>
/// Creates the <see cref="StateManager{TState}"/> used to track <see cref="ReaderState"/>.
/// </summary>
/// <remarks>
/// NOTE(review): the first argument (<see cref="ReaderState.Disconnected"/>) is presumably the initial
/// state and the remaining ones the terminal states - confirm against the StateManager constructor.
/// </remarks>
/// <returns>A new state manager instance; nothing is cached or shared between calls.</returns>
private static StateManager<ReaderState> CreateStateManager()
{
    return new StateManager<ReaderState>(
        ReaderState.Disconnected,
        ReaderState.Closed,
        ReaderState.ReachedEndOfTopic,
        ReaderState.Faulted);
}
/// <summary>
/// Builds a <see cref="SubReader{TMessage}"/> for a single topic, registers its
/// <see cref="ReaderProcess"/> with the process manager and starts that process.
/// </summary>
/// <param name="topic">
/// The topic name; it is placed in the subscribe command and handed to both the channel factory
/// and the sub-reader.
/// </param>
/// <returns>The newly created sub-reader. Its process has already been added and started.</returns>
private SubReader<TMessage> CreateSubReader(string topic)
{
    // A single correlation id ties together the channel factory, executor, sub-reader and process.
    var correlationId = Guid.NewGuid();

    // Prefer the configured subscription name; otherwise derive one from the role prefix and the correlation id.
    var subscription = _readerOptions.SubscriptionName ?? $"{_readerOptions.SubscriptionRolePrefix}-{correlationId:N}";

    var subscribe = new CommandSubscribe
    {
        // Fall back to the subscription name when no explicit reader name was configured.
        ConsumerName = _readerOptions.ReaderName ?? subscription,
        Durable = false,
        ReadCompacted = _readerOptions.ReadCompacted,
        StartMessageId = _readerOptions.StartMessageId.ToMessageIdData(),
        Subscription = subscription,
        Topic = topic
    };

    var messageFactory = new MessageFactory<TMessage>(_readerOptions.Schema);
    // NOTE(review): the meaning of the 'false' flag is defined by BatchHandler and not visible here - confirm.
    var batchHandler = new BatchHandler<TMessage>(false, messageFactory);
    var decompressorFactories = CompressionFactories.DecompressorFactories();

    var factory = new ConsumerChannelFactory<TMessage>(
        correlationId,
        _processManager,
        _connectionPool,
        subscribe,
        _readerOptions.MessagePrefetchCount,
        batchHandler,
        messageFactory,
        decompressorFactories,
        topic);

    // Reuse the shared helper so the sub-reader's state list cannot drift from the one defined there.
    var stateManager = CreateStateManager();
    var initialChannel = new NotReadyChannel<TMessage>();
    var executor = new Executor(correlationId, _processManager, _exceptionHandler);
    var subReader = new SubReader<TMessage>(correlationId, ServiceUrl, topic, _processManager, initialChannel, executor, stateManager, factory);

    // The process is registered with the process manager before it is started.
    var process = new ReaderProcess(correlationId, stateManager, subReader);
    _processManager.Add(process);
    process.Start();

    return subReader;
}