private static StateManager<ReaderState> CreateStateManager()

in src/DotPulsar/Internal/Reader.cs [268:299]


    /// <summary>
    /// Builds a new state manager for <see cref="ReaderState"/> values.
    /// </summary>
    /// <remarks>
    /// The manager is constructed with <see cref="ReaderState.Disconnected"/> as its first argument, followed by
    /// <see cref="ReaderState.Closed"/>, <see cref="ReaderState.ReachedEndOfTopic"/> and <see cref="ReaderState.Faulted"/>.
    /// NOTE(review): presumably the first argument is the initial state and the rest are terminal states —
    /// confirm against the StateManager constructor.
    /// </remarks>
    /// <returns>A freshly constructed state manager; no instance is shared between calls.</returns>
    private static StateManager<ReaderState> CreateStateManager()
    {
        return new StateManager<ReaderState>(
            ReaderState.Disconnected,
            ReaderState.Closed,
            ReaderState.ReachedEndOfTopic,
            ReaderState.Faulted);
    }

    /// <summary>
    /// Creates a <see cref="SubReader{TMessage}"/> for a single topic, registers its
    /// <see cref="ReaderProcess"/> with the process manager and starts that process.
    /// </summary>
    /// <param name="topic">The topic the sub-reader subscribes to.</param>
    /// <returns>The newly created sub-reader, whose process has already been started.</returns>
    private SubReader<TMessage> CreateSubReader(string topic)
    {
        // A single correlation id ties together the channel factory, executor, sub-reader and process.
        var correlationId = Guid.NewGuid();

        // Use the configured subscription name when present; otherwise derive one from the
        // role prefix and the correlation id (formatted without dashes).
        var subscription = _readerOptions.SubscriptionName ?? $"{_readerOptions.SubscriptionRolePrefix}-{correlationId:N}";

        var subscribe = new CommandSubscribe
        {
            // The consumer name falls back to the subscription name when no reader name is configured.
            ConsumerName = _readerOptions.ReaderName ?? subscription,
            Durable = false,
            ReadCompacted = _readerOptions.ReadCompacted,
            StartMessageId = _readerOptions.StartMessageId.ToMessageIdData(),
            Subscription = subscription,
            Topic = topic
        };
        var messagePrefetchCount = _readerOptions.MessagePrefetchCount;
        var messageFactory = new MessageFactory<TMessage>(_readerOptions.Schema);
        var batchHandler = new BatchHandler<TMessage>(false, messageFactory);
        var decompressorFactories = CompressionFactories.DecompressorFactories();
        var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories, topic);

        // Reuse the shared helper so every state manager in this class is configured with the same states.
        var stateManager = CreateStateManager();

        // The sub-reader is handed a NotReadyChannel as its initial channel.
        var initialChannel = new NotReadyChannel<TMessage>();
        var executor = new Executor(correlationId, _processManager, _exceptionHandler);
        var subReader = new SubReader<TMessage>(correlationId, ServiceUrl, topic, _processManager, initialChannel, executor, stateManager, factory);

        // Register the process before starting it.
        var process = new ReaderProcess(correlationId, stateManager, subReader);
        _processManager.Add(process);
        process.Start();
        return subReader;
    }