private long GetTotalLag()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/Scaler/KafkaMetricsProvider.cs [120:160]


        private long GetTotalLag(List<TopicPartition> allPartitions, TimeSpan operationTimeout)
        {
            long totalLag = 0;
            var ownedCommittedOffset = consumer.Committed(allPartitions, operationTimeout);
            var partitionWithHighestLag = Partition.Any;
            long highestPartitionLag = 0L;
            // List of partitions that the consumer is reading from.
            var currentPartitions = LoadAssignedPartitions();
            // List of partitions that the consumer is not reading from.
            var unassignedPartitions = allPartitions.Except(currentPartitions).ToList();

            foreach (var topicPartition in currentPartitions)
            {
                var watermark = consumer.GetWatermarkOffsets(topicPartition);
                var committed = ownedCommittedOffset.FirstOrDefault(x => x.Partition == topicPartition.Partition);

                bool bothWatermarksUnset = watermark.High.Value == Offset.Unset && watermark.Low.Value == Offset.Unset;
                bool lowWatermarkZeroAndCommittedIsUnSet = watermark.Low.Value == 0 && committed.Offset.Value == Offset.Unset;
                // if GetWatermarkOffsets fails to return valid values, use QueryWatermarkOffsets.
                if (bothWatermarksUnset || lowWatermarkZeroAndCommittedIsUnSet)
                {
                    watermark = consumer.QueryWatermarkOffsets(topicPartition, operationTimeout);
                }

                UpdateTotalLag(watermark, committed, ref totalLag, ref partitionWithHighestLag, ref highestPartitionLag);
            }
            foreach (var topicPartition in unassignedPartitions)
            {
                var watermark = consumer.QueryWatermarkOffsets(topicPartition, operationTimeout);
                var committed = ownedCommittedOffset.FirstOrDefault(x => x.Partition == topicPartition.Partition);

                UpdateTotalLag(watermark, committed, ref totalLag, ref partitionWithHighestLag, ref highestPartitionLag);
            }

            // This log is only for customer reference to show calculation of total lag.
            if (partitionWithHighestLag != Partition.Any)
            {
                logger.LogInformation($"Total lag in '{this.topicName}' is {totalLag}, highest partition lag found in {partitionWithHighestLag.Value} with value of {highestPartitionLag}.");
            }
            return totalLag;
        }