public IReader CreateReader()

in src/DotPulsar/PulsarClient.cs [132:162]


    /// <summary>
    /// Builds a reader for the topic named in <paramref name="options"/>, registers its
    /// background process with the process manager, starts that process and returns the reader.
    /// </summary>
    /// <typeparam name="TMessage">The message value type; it is decoded with <c>options.Schema</c>.</typeparam>
    /// <param name="options">
    /// Reader settings. This method reads <c>ReaderName</c>, <c>ReadCompacted</c>, <c>StartMessageId</c>,
    /// <c>Topic</c>, <c>MessagePrefetchCount</c>, <c>Schema</c> and <c>StateChangedHandler</c>.
    /// </param>
    /// <returns>The newly created reader. It is handed an initial <c>NotReadyChannel</c>.</returns>
    /// <remarks>
    /// <c>ThrowIfDisposed()</c> is invoked before anything else; presumably it throws when this
    /// client has already been disposed (its implementation is not visible here).
    /// </remarks>
    public IReader<TMessage> CreateReader<TMessage>(ReaderOptions<TMessage> options)
    {
        ThrowIfDisposed();

        // A single GUID identifies the reader everywhere: the generated subscription name,
        // the channel factory, the executor, the reader itself and its process.
        var readerId = Guid.NewGuid();
        var subscriptionName = $"Reader-{readerId:N}";

        var subscribeCommand = new CommandSubscribe
        {
            // Use the generated subscription name when the caller did not supply a reader name.
            ConsumerName = options.ReaderName ?? subscriptionName,
            Durable = false,
            ReadCompacted = options.ReadCompacted,
            StartMessageId = options.StartMessageId.ToMessageIdData(),
            Subscription = subscriptionName,
            Topic = options.Topic
        };

        var messages = new MessageFactory<TMessage>(options.Schema);
        var batches = new BatchHandler<TMessage>(false, messages);
        var channelFactory = new ConsumerChannelFactory<TMessage>(
            readerId,
            _processManager,
            _connectionPool,
            subscribeCommand,
            options.MessagePrefetchCount,
            batches,
            messages,
            CompressionFactories.DecompressorFactories());

        // NOTE(review): the first argument looks like the initial state and the rest like
        // final states - confirm against StateManager's constructor.
        var readerStates = new StateManager<ReaderState>(
            ReaderState.Disconnected,
            ReaderState.Closed,
            ReaderState.ReachedEndOfTopic,
            ReaderState.Faulted);

        var reader = new Reader<TMessage>(
            readerId,
            ServiceUrl,
            options.Topic,
            _processManager,
            new NotReadyChannel<TMessage>(),
            new Executor(readerId, _processManager, _exceptionHandler),
            readerStates,
            channelFactory);

        // State monitoring is optional; the value returned by MonitorReader is deliberately discarded.
        if (options.StateChangedHandler is { } stateChangedHandler)
        {
            _ = StateMonitor.MonitorReader(reader, stateChangedHandler);
        }

        var readerProcess = new ReaderProcess(readerId, readerStates, reader);
        _processManager.Add(readerProcess);
        readerProcess.Start();

        return reader;
    }