in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs [65:94]
public KafkaListener(
ITriggeredFunctionExecutor executor,
bool singleDispatch,
KafkaOptions options,
KafkaListenerConfiguration kafkaListenerConfiguration,
bool requiresKey,
IDeserializer<TValue> valueDeserializer,
IDeserializer<TKey> keyDeserializer,
ILogger logger,
string functionId,
IDrainModeManager drainModeManager)
{
this.ValueDeserializer = valueDeserializer;
this.KeyDeserializer = keyDeserializer;
this.executor = executor;
this.singleDispatch = singleDispatch;
this.options = options;
this.listenerConfiguration = kafkaListenerConfiguration;
this.requiresKey = requiresKey;
this.logger = logger;
this.listenerCancellationTokenSource = new CancellationTokenSource();
this.consumerGroup = string.IsNullOrEmpty(this.listenerConfiguration.ConsumerGroup) ? "$Default" : this.listenerConfiguration.ConsumerGroup;
this.topicName = this.listenerConfiguration.Topic;
this.functionId = functionId;
this.drainModeManager = drainModeManager;
this.consumer = new Lazy<IConsumer<TKey, TValue>>(() => CreateConsumer());
this.metricsProvider = new Lazy<KafkaMetricsProvider<TKey, TValue>>(CreateMetricsProvider);
this.topicScaler = new Lazy<KafkaGenericTopicScaler<TKey, TValue>>(CreateTopicScaler);
this.targetScaler = new Lazy<KafkaGenericTargetScaler<TKey, TValue>>(CreateTargetScaler);
}