in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/Scaler/KafkaMetricsProvider.cs [37:64]
/// <summary>
/// Builds the current trigger metrics (total lag and partition count) for the topic.
/// </summary>
/// <remarks>
/// The computation runs synchronously and the result is wrapped in an already-completed task.
/// A failure while retrieving the lag is logged and not propagated; in that case the returned
/// metrics carry a lag of 0 together with the real partition count.
/// </remarks>
/// <returns>
/// A completed task holding the computed <see cref="KafkaTriggerMetrics"/>. When no partition
/// list is available, the metrics are created with a lag of 0 and a partition count of 0.
/// </returns>
public virtual Task<KafkaTriggerMetrics> GetMetricsAsync()
{
    var allPartitions = topicPartitions.Value;
    if (allPartitions == null)
    {
        // No partition list to measure: report empty metrics.
        // Note that this path does not update LastCalculatedMetrics.
        return Task.FromResult(new KafkaTriggerMetrics(0L, 0));
    }

    // Timeout handed to GetTotalLag for its lag lookup.
    var operationTimeout = TimeSpan.FromSeconds(5);

    // Stays 0 if the lag lookup below throws.
    long totalLag = 0;
    try
    {
        totalLag = GetTotalLag(allPartitions, operationTimeout);
    }
    catch (Exception ex)
    {
        // Best effort: log and continue so metrics are still returned.
        // A constant message template (rather than an interpolated string) keeps the
        // topic name available as a structured logging property.
        logger.LogError(ex, "Failed to retrieve lag from topic '{TopicName}'", this.topicName);
    }

    int partitionCount = allPartitions.Count;
    var metrics = new KafkaTriggerMetrics(totalLag, partitionCount);

    // Keep the most recent metrics on the provider. Presumably KafkaTriggerMetrics records
    // its own timestamp on construction; confirm against that type.
    this.LastCalculatedMetrics = metrics;
    return Task.FromResult(metrics);
}