in src/DotPulsar/PulsarClient.cs [88:127]
public IConsumer<TMessage> CreateConsumer<TMessage>(ConsumerOptions<TMessage> options)
{
    // Builds a consumer from the supplied options, registers its background process
    // with the process manager, starts that process and returns the consumer.
    ThrowIfDisposed();

    // The same id is handed to the channel factory, executor, consumer and process.
    var correlationId = Guid.NewGuid();

    // Subscribe command populated from the options; when no consumer name is
    // supplied, one is derived from the correlation id.
    var subscribeCommand = new CommandSubscribe
    {
        ConsumerName = options.ConsumerName ?? $"Consumer-{correlationId:N}",
        InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
        PriorityLevel = options.PriorityLevel,
        ReadCompacted = options.ReadCompacted,
        ReplicateSubscriptionState = options.ReplicateSubscriptionState,
        Subscription = options.SubscriptionName,
        Topic = options.Topic,
        Type = (CommandSubscribe.SubType) options.SubscriptionType
    };

    // Copy every subscription property onto the command as a KeyValue pair.
    foreach (var entry in options.SubscriptionProperties)
    {
        subscribeCommand.SubscriptionProperties.Add(new KeyValue { Key = entry.Key, Value = entry.Value });
    }

    // Message pipeline: the message factory is built from the schema and shared
    // between the batch handler and the channel factory.
    var messages = new MessageFactory<TMessage>(options.Schema);
    var batches = new BatchHandler<TMessage>(true, messages);
    var channelFactory = new ConsumerChannelFactory<TMessage>(
        correlationId,
        _processManager,
        _connectionPool,
        subscribeCommand,
        options.MessagePrefetchCount,
        batches,
        messages,
        CompressionFactories.DecompressorFactories());

    // NOTE(review): presumably the first state is the initial one and the remaining
    // ones are final states - confirm against StateManager.
    var states = new StateManager<ConsumerState>(
        ConsumerState.Disconnected,
        ConsumerState.Closed,
        ConsumerState.ReachedEndOfTopic,
        ConsumerState.Faulted);

    // The consumer is created with a NotReadyChannel as its initial channel.
    var consumer = new Consumer<TMessage>(
        correlationId,
        ServiceUrl,
        options.SubscriptionName,
        options.Topic,
        _processManager,
        new NotReadyChannel<TMessage>(),
        new Executor(correlationId, _processManager, _exceptionHandler),
        states,
        channelFactory);

    // Monitoring is only set up when a handler was supplied; the returned value
    // is intentionally discarded.
    var stateChangedHandler = options.StateChangedHandler;
    if (stateChangedHandler is not null)
    {
        _ = StateMonitor.MonitorConsumer(consumer, stateChangedHandler);
    }

    // The process is told whether the subscription type is Failover.
    var isFailover = options.SubscriptionType == SubscriptionType.Failover;
    var consumerProcess = new ConsumerProcess(correlationId, states, consumer, isFailover);
    _processManager.Add(consumerProcess);
    consumerProcess.Start();

    return consumer;
}