in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/Scaler/KafkaMetricsProvider.cs [120:160]
private long GetTotalLag(List<TopicPartition> allPartitions, TimeSpan operationTimeout)
{
long totalLag = 0;
var ownedCommittedOffset = consumer.Committed(allPartitions, operationTimeout);
var partitionWithHighestLag = Partition.Any;
long highestPartitionLag = 0L;
// List of partitions that the consumer is reading from.
var currentPartitions = LoadAssignedPartitions();
// List of partitions that the consumer is not reading from.
var unassignedPartitions = allPartitions.Except(currentPartitions).ToList();
foreach (var topicPartition in currentPartitions)
{
var watermark = consumer.GetWatermarkOffsets(topicPartition);
var committed = ownedCommittedOffset.FirstOrDefault(x => x.Partition == topicPartition.Partition);
bool bothWatermarksUnset = watermark.High.Value == Offset.Unset && watermark.Low.Value == Offset.Unset;
bool lowWatermarkZeroAndCommittedIsUnSet = watermark.Low.Value == 0 && committed.Offset.Value == Offset.Unset;
// if GetWatermarkOffsets fails to return valid values, use QueryWatermarkOffsets.
if (bothWatermarksUnset || lowWatermarkZeroAndCommittedIsUnSet)
{
watermark = consumer.QueryWatermarkOffsets(topicPartition, operationTimeout);
}
UpdateTotalLag(watermark, committed, ref totalLag, ref partitionWithHighestLag, ref highestPartitionLag);
}
foreach (var topicPartition in unassignedPartitions)
{
var watermark = consumer.QueryWatermarkOffsets(topicPartition, operationTimeout);
var committed = ownedCommittedOffset.FirstOrDefault(x => x.Partition == topicPartition.Partition);
UpdateTotalLag(watermark, committed, ref totalLag, ref partitionWithHighestLag, ref highestPartitionLag);
}
// This log is only for customer reference to show calculation of total lag.
if (partitionWithHighestLag != Partition.Any)
{
logger.LogInformation($"Total lag in '{this.topicName}' is {totalLag}, highest partition lag found in {partitionWithHighestLag.Value} with value of {highestPartitionLag}.");
}
return totalLag;
}