private SubConsumer CreateSubConsumer()

in src/DotPulsar/Internal/Consumer.cs [423:462]


    private SubConsumer<TMessage> CreateSubConsumer(string topic)
    {
        var correlationId = Guid.NewGuid();
        var consumerName = _consumerOptions.ConsumerName ?? $"Consumer-{correlationId:N}";

        var subscribe = new CommandSubscribe
        {
            ConsumerName = consumerName,
            InitialPosition = (CommandSubscribe.InitialPositionType) _consumerOptions.InitialPosition,
            PriorityLevel = _consumerOptions.PriorityLevel,
            ReadCompacted = _consumerOptions.ReadCompacted,
            ReplicateSubscriptionState = _consumerOptions.ReplicateSubscriptionState,
            Subscription = _consumerOptions.SubscriptionName,
            Topic = topic,
            Type = (CommandSubscribe.SubType) _consumerOptions.SubscriptionType
        };

        if (_consumerOptions.Schema.SchemaInfo.Type != SchemaType.None)
            subscribe.Schema = _consumerOptions.Schema.SchemaInfo.PulsarSchema;

        foreach (var property in _consumerOptions.SubscriptionProperties)
        {
            var keyValue = new KeyValue { Key = property.Key, Value = property.Value };
            subscribe.SubscriptionProperties.Add(keyValue);
        }

        var messagePrefetchCount = _consumerOptions.MessagePrefetchCount;
        var messageFactory = new MessageFactory<TMessage>(_consumerOptions.Schema);
        var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
        var decompressorFactories = CompressionFactories.DecompressorFactories();
        var consumerChannelFactory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories, topic);
        var stateManager = CreateStateManager();
        var initialChannel = new NotReadyChannel<TMessage>();
        var executor = new Executor(correlationId, _processManager, _exceptionHandler);
        var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, _consumerOptions.SubscriptionName, _consumerOptions.SubscriptionType, topic, _processManager, initialChannel, executor, stateManager, consumerChannelFactory);
        var process = new ConsumerProcess(correlationId, stateManager, subConsumer, _consumerOptions.SubscriptionType == SubscriptionType.Failover);
        _processManager.Add(process);
        process.Start();
        return subConsumer;
    }