in src/DotPulsar/PulsarClient.cs [132:162]
/// <summary>
/// Creates a reader for the topic named in <paramref name="options"/>.
/// </summary>
/// <remarks>
/// The reader subscribes non-durably under a generated <c>Reader-{guid}</c> subscription name,
/// which doubles as the consumer name when <c>options.ReaderName</c> is null. The reader is handed
/// a <see cref="NotReadyChannel{TMessage}"/> as its initial channel, and a <see cref="ReaderProcess"/>
/// wrapping it is added to the process manager and started before the reader is returned.
/// </remarks>
/// <typeparam name="TMessage">The message value type; its schema is taken from <c>options.Schema</c>.</typeparam>
/// <param name="options">Topic, start position, prefetch count, schema and optional state-changed handler for the reader.</param>
/// <returns>The newly created reader.</returns>
public IReader<TMessage> CreateReader<TMessage>(ReaderOptions<TMessage> options)
{
    // Presumably throws when the client has already been disposed; see ThrowIfDisposed.
    ThrowIfDisposed();

    // One id ties together the subscription name, channel factory, executor, reader and process.
    var readerId = Guid.NewGuid();
    var subscriptionName = $"Reader-{readerId:N}";

    var subscribeCommand = new CommandSubscribe
    {
        // Fall back to the generated subscription name when the caller gave no reader name.
        ConsumerName = options.ReaderName ?? subscriptionName,
        Durable = false,
        ReadCompacted = options.ReadCompacted,
        StartMessageId = options.StartMessageId.ToMessageIdData(),
        Subscription = subscriptionName,
        Topic = options.Topic
    };

    var prefetchCount = options.MessagePrefetchCount;
    var messages = new MessageFactory<TMessage>(options.Schema);
    // NOTE(review): the meaning of the 'false' flag is defined by BatchHandler - confirm there.
    var batches = new BatchHandler<TMessage>(false, messages);
    var decompressors = CompressionFactories.DecompressorFactories();
    var channelFactory = new ConsumerChannelFactory<TMessage>(
        readerId,
        _processManager,
        _connectionPool,
        subscribeCommand,
        prefetchCount,
        batches,
        messages,
        decompressors);

    // NOTE(review): Disconnected looks like the initial state and the remaining three the final
    // states - confirm against StateManager's constructor.
    StateManager<ReaderState> states = new(
        ReaderState.Disconnected,
        ReaderState.Closed,
        ReaderState.ReachedEndOfTopic,
        ReaderState.Faulted);
    NotReadyChannel<TMessage> placeholderChannel = new();
    var executor = new Executor(readerId, _processManager, _exceptionHandler);
    var reader = new Reader<TMessage>(
        readerId,
        ServiceUrl,
        options.Topic,
        _processManager,
        placeholderChannel,
        executor,
        states,
        channelFactory);

    // State monitoring is optional; the returned task is deliberately discarded.
    if (options.StateChangedHandler is { } stateChangedHandler)
    {
        _ = StateMonitor.MonitorReader(reader, stateChangedHandler);
    }

    var readerProcess = new ReaderProcess(readerId, states, reader);
    _processManager.Add(readerProcess);
    readerProcess.Start();

    return reader;
}