in src/DotPulsar/Internal/ConsumerChannel.cs [40:72]
/// <summary>
/// Initializes the channel: stores the supplied collaborators, creates one decompressor per
/// supplied factory, and prepares the flow command kept in <c>_cachedCommandFlow</c>.
/// </summary>
/// <param name="id">Consumer id; stored on the channel and written to the cached flow command.</param>
/// <param name="messagePrefetchCount">Value used as the message permits of the cached flow command.</param>
/// <param name="queue">Queue of message packages stored on the channel.</param>
/// <param name="connection">Connection stored on the channel.</param>
/// <param name="batchHandler">Batch handler stored on the channel.</param>
/// <param name="messageFactory">Message factory stored on the channel.</param>
/// <param name="decompressorFactories">
/// Factories enumerated once; each one contributes a decompressor placed at the index given by
/// its compression type.
/// </param>
public ConsumerChannel(
    ulong id,
    uint messagePrefetchCount,
    AsyncQueue<MessagePackage> queue,
    IConnection connection,
    BatchHandler<TMessage> batchHandler,
    IMessageFactory<TMessage> messageFactory,
    IEnumerable<IDecompressorFactory> decompressorFactories)
{
    // Plain collaborator references.
    _id = id;
    _connection = connection;
    _queue = queue;
    _messageFactory = messageFactory;
    _batchHandler = batchHandler;

    // Lookup table indexed by the numeric value of the compression type.
    // NOTE(review): a size of 5 presumably covers every compression type value - confirm against the enum.
    var decompressors = new IDecompress[5];
    foreach (var factory in decompressorFactories)
    {
        var slot = (int) factory.CompressionType;
        decompressors[slot] = factory.Create();
    }
    _decompressors = decompressors;

    _lock = new AsyncLock();

    // Flow command built once here with this consumer's id and the prefetch count as permits.
    _cachedCommandFlow = new CommandFlow
    {
        ConsumerId = id,
        MessagePermits = messagePrefetchCount
    };

    // Initial flow-tracking state.
    _firstFlow = true;
    _sendWhenZero = 0;
}