public IConsumer<TMessage> CreateConsumer<TMessage>(ConsumerOptions<TMessage> options)

in src/DotPulsar/PulsarClient.cs [88:127]


    /// <summary>
    /// Builds a consumer from the supplied options, registers its process with the process
    /// manager, starts that process and returns the consumer.
    /// </summary>
    /// <typeparam name="TMessage">
    /// Type of the consumed messages; <c>options.Schema</c> is passed to the message factory for it.
    /// </typeparam>
    /// <param name="options">
    /// Consumer settings. This method reads the consumer name, initial position, priority level,
    /// read-compacted and replicate-subscription-state flags, subscription name, properties and type,
    /// topic, message prefetch count, schema and the optional state-changed handler.
    /// </param>
    /// <returns>
    /// The new consumer. It is constructed with a <c>NotReadyChannel</c> as its initial channel and a
    /// state manager whose initial state is <c>ConsumerState.Disconnected</c>.
    /// </returns>
    public IConsumer<TMessage> CreateConsumer<TMessage>(ConsumerOptions<TMessage> options)
    {
        ThrowIfDisposed();

        // A single id is handed to the channel factory, executor, consumer and process created below.
        var id = Guid.NewGuid();

        // When no name was configured, derive one from the id ("N" = 32 hex digits, no dashes).
        var name = options.ConsumerName ?? $"Consumer-{id:N}";

        var subscribeCommand = new CommandSubscribe
        {
            ConsumerName = name,
            InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
            PriorityLevel = options.PriorityLevel,
            ReadCompacted = options.ReadCompacted,
            ReplicateSubscriptionState = options.ReplicateSubscriptionState,
            Subscription = options.SubscriptionName,
            Topic = options.Topic,
            Type = (CommandSubscribe.SubType) options.SubscriptionType
        };

        // Copy every user-supplied subscription property onto the subscribe command.
        foreach (var entry in options.SubscriptionProperties)
        {
            subscribeCommand.SubscriptionProperties.Add(new KeyValue { Key = entry.Key, Value = entry.Value });
        }

        var prefetchCount = options.MessagePrefetchCount;
        var messages = new MessageFactory<TMessage>(options.Schema);
        var batches = new BatchHandler<TMessage>(true, messages);
        var decompressors = CompressionFactories.DecompressorFactories();
        var channelFactory = new ConsumerChannelFactory<TMessage>(id, _processManager, _connectionPool, subscribeCommand, prefetchCount, batches, messages, decompressors);

        // Starts in Disconnected; the remaining states are presumably the final ones (confirm against StateManager).
        var states = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
        var placeholderChannel = new NotReadyChannel<TMessage>();
        var executor = new Executor(id, _processManager, _exceptionHandler);
        var consumer = new Consumer<TMessage>(id, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, placeholderChannel, executor, states, channelFactory);

        // Monitoring is only started when a handler was supplied; its result is intentionally discarded.
        if (options.StateChangedHandler is { } handler)
        {
            _ = StateMonitor.MonitorConsumer(consumer, handler);
        }

        // The process is told whether this is a failover subscription.
        var isFailover = options.SubscriptionType == SubscriptionType.Failover;
        var consumerProcess = new ConsumerProcess(id, states, consumer, isFailover);
        _processManager.Add(consumerProcess);
        consumerProcess.Start();

        return consumer;
    }