in src/DotPulsar/Internal/Producer.cs [55:92]
/// <summary>
/// Initializes the producer's fields from the supplied options and collaborators,
/// then invokes <c>Setup()</c> and discards its result.
/// </summary>
/// <param name="serviceUrl">Service URL; exposed through <c>ServiceUrl</c> and recorded as the "messaging.url" activity tag.</param>
/// <param name="options">Producer options; supplies the topic, trace-info flag, initial sequence id and message router.</param>
/// <param name="processManager">Process manager retained by this producer.</param>
/// <param name="exceptionHandler">Exception handler retained by this producer and passed to its executor.</param>
/// <param name="connectionPool">Connection pool retained by this producer.</param>
/// <param name="compressorFactory">Compressor factory retained by this producer; may be <c>null</c>.</param>
public Producer(
    Uri serviceUrl,
    ProducerOptions<TMessage> options,
    ProcessManager processManager,
    IHandleException exceptionHandler,
    IConnectionPool connectionPool,
    ICompressorFactory? compressorFactory)
{
    // Tag sets built once here from the topic and service URL.
    _activityTags = new[]
    {
        new KeyValuePair<string, object?>("messaging.destination", options.Topic),
        new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
        new KeyValuePair<string, object?>("messaging.system", "pulsar"),
        new KeyValuePair<string, object?>("messaging.url", serviceUrl),
    };
    _meterTags = new[]
    {
        new KeyValuePair<string, object?>("topic", options.Topic)
    };
    _operationName = $"{options.Topic} send";
    _attachTraceInfoToMessages = options.AttachTraceInfoToMessages;

    // Sequence tracking and state handling.
    _sequenceId = new SequenceId(options.InitialSequenceId);
    _state = CreateStateManager();

    // Publicly visible identity of this producer.
    Topic = options.Topic;
    ServiceUrl = serviceUrl;

    // Collaborators and configuration handed in by the caller.
    _options = options;
    _messageRouter = options.MessageRouter;
    _processManager = processManager;
    _connectionPool = connectionPool;
    _compressorFactory = compressorFactory;
    _exceptionHandler = exceptionHandler;

    // Lifetime bookkeeping.
    _isDisposed = 0;
    _cts = new CancellationTokenSource();

    // These receive 'this', so they are created only after the fields above are assigned.
    _executor = new Executor(Guid.Empty, this, _exceptionHandler);
    _producers = new ConcurrentDictionary<int, SubProducer>();
    SendChannel = new SendChannel<TMessage>(this);

    // The value returned by Setup() is deliberately discarded.
    // NOTE(review): presumably Setup() returns a Task that continues in the background - confirm.
    _ = Setup();
}