in src/DotPulsar/Internal/Consumer.cs [423:462]
private SubConsumer<TMessage> CreateSubConsumer(string topic)
{
var correlationId = Guid.NewGuid();
var consumerName = _consumerOptions.ConsumerName ?? $"Consumer-{correlationId:N}";
var subscribe = new CommandSubscribe
{
ConsumerName = consumerName,
InitialPosition = (CommandSubscribe.InitialPositionType) _consumerOptions.InitialPosition,
PriorityLevel = _consumerOptions.PriorityLevel,
ReadCompacted = _consumerOptions.ReadCompacted,
ReplicateSubscriptionState = _consumerOptions.ReplicateSubscriptionState,
Subscription = _consumerOptions.SubscriptionName,
Topic = topic,
Type = (CommandSubscribe.SubType) _consumerOptions.SubscriptionType
};
if (_consumerOptions.Schema.SchemaInfo.Type != SchemaType.None)
subscribe.Schema = _consumerOptions.Schema.SchemaInfo.PulsarSchema;
foreach (var property in _consumerOptions.SubscriptionProperties)
{
var keyValue = new KeyValue { Key = property.Key, Value = property.Value };
subscribe.SubscriptionProperties.Add(keyValue);
}
var messagePrefetchCount = _consumerOptions.MessagePrefetchCount;
var messageFactory = new MessageFactory<TMessage>(_consumerOptions.Schema);
var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
var decompressorFactories = CompressionFactories.DecompressorFactories();
var consumerChannelFactory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories, topic);
var stateManager = CreateStateManager();
var initialChannel = new NotReadyChannel<TMessage>();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, _consumerOptions.SubscriptionName, _consumerOptions.SubscriptionType, topic, _processManager, initialChannel, executor, stateManager, consumerChannelFactory);
var process = new ConsumerProcess(correlationId, stateManager, subConsumer, _consumerOptions.SubscriptionType == SubscriptionType.Failover);
_processManager.Add(process);
process.Start();
return subConsumer;
}