in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/Scaler/KafkaGenericTopicScaler.cs [24:43]
internal KafkaGenericTopicScaler(string topic, string consumerGroup, string functionId, IConsumer<TKey, TValue> consumer, KafkaMetricsProvider<TKey, TValue> metricsProvider, long lagThreshold, ILogger logger)
{
if (string.IsNullOrWhiteSpace(topic))
{
throw new ArgumentException("Invalid topic", nameof(topic));
}
if (string.IsNullOrWhiteSpace(consumerGroup))
{
throw new ArgumentException("Invalid consumer group", nameof(consumerGroup));
}
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
topicName = topic;
Descriptor = new ScaleMonitorDescriptor($"{functionId}-kafkatrigger-{topicName}-{consumerGroup}".ToLower(), functionId);
this.consumerGroup = consumerGroup;
this.lagThreshold = lagThreshold;
this.metricsProvider = metricsProvider;
this.logger.LogInformation($"Started Topic scaler - topic name: {topicName}, consumerGroup {consumerGroup}, functionID: {functionId}, lagThreshold: {lagThreshold}.");
}