private ScaleStatus GetScaleStatusCore()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/Scaler/KafkaGenericTopicScaler.cs [65:178]


        /// <summary>
        /// Produces a scale vote for the topic from the current worker count and the collected lag samples.
        /// </summary>
        /// <param name="workerCount">Number of worker instances currently assigned.</param>
        /// <param name="metrics">
        /// Lag samples in chronological order (oldest first). The last sample supplies the
        /// current total lag and partition count.
        /// </param>
        /// <returns>
        /// A <see cref="ScaleStatus"/> whose vote is <see cref="ScaleVote.ScaleIn"/>,
        /// <see cref="ScaleVote.ScaleOut"/>, or <see cref="ScaleVote.None"/> when there are too few
        /// samples or no rule applies.
        /// </returns>
        private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] metrics)
        {
            var status = new ScaleStatus
            {
                Vote = ScaleVote.None,
            };

            const int NumberOfSamplesToConsider = 5;

            // At least 5 samples are required to make any scale decision; with fewer, abstain.
            if (metrics == null || metrics.Length < NumberOfSamplesToConsider)
            {
                return status;
            }

            // The most recent sample describes the current state of the topic.
            var lastMetrics = metrics[metrics.Length - 1];
            long totalLag = lastMetrics.TotalLag;
            long partitionCount = lastMetrics.PartitionCount;
            long lagThreshold = this.lagThreshold;

            // We shouldn't assign more workers than there are partitions.
            // This runs before the load-based checks because it depends only on the latest
            // partition count, not on lag. A partition count of 0 is skipped by this check.
            if (partitionCount > 0 && partitionCount < workerCount)
            {
                status.Vote = ScaleVote.ScaleIn;

                if (logger.IsEnabled(LogLevel.Information))
                {
                    logger.LogInformation($"Number of instances ({workerCount}) is too high relative to number of partitions ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.");
                }

                return status;
            }

            // Check to see if the Kafka consumer has been empty for a while.
            // Only if all metrics samples report zero lag do we scale down.
            bool partitionIsIdle = metrics.All(p => p.TotalLag == 0);

            if (partitionIsIdle)
            {
                status.Vote = ScaleVote.ScaleIn;

                if (logger.IsEnabled(LogLevel.Information))
                {
                    logger.LogInformation($"Topic '{topicName}', for consumer group '{consumerGroup}' is idle.");
                }

                return status;
            }

            // Maintain a minimum ratio of 1 worker per lagThreshold unprocessed messages.
            if (totalLag > workerCount * lagThreshold)
            {
                // Only vote to scale out while there are still partitions without a dedicated worker.
                // Otherwise return the neutral vote without evaluating the trend checks below.
                if (workerCount < partitionCount)
                {
                    status.Vote = ScaleVote.ScaleOut;

                    if (logger.IsEnabled(LogLevel.Information))
                    {
                        logger.LogInformation($"Total lag ({totalLag}) is greater than the number of instances ({workerCount}) times the lag threshold ({lagThreshold}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.");
                    }
                }

                return status;
            }

            // Samples are in chronological order. Check for a continuous increase in unprocessed message count.
            // The oldest sample must already show lag, and there must be room to add a worker.
            if (metrics[0].TotalLag > 0 && workerCount < partitionCount)
            {
                // NOTE(review): IsTrueForLast is defined elsewhere; presumably it applies the predicate
                // to consecutive pairs among the last NumberOfSamplesToConsider samples - confirm.
                bool queueLengthIncreasing = IsTrueForLast(
                    metrics,
                    NumberOfSamplesToConsider,
                    (prev, next) => prev.TotalLag < next.TotalLag);

                if (queueLengthIncreasing)
                {
                    status.Vote = ScaleVote.ScaleOut;

                    if (logger.IsEnabled(LogLevel.Information))
                    {
                        logger.LogInformation($"Total lag ({totalLag}) is continuously increasing. Scale out, for topic {topicName}, for consumer group {consumerGroup}.");
                    }

                    return status;
                }
            }

            // Never vote to remove the last worker based on a decreasing trend.
            if (workerCount > 1)
            {
                bool queueLengthDecreasing = IsTrueForLast(
                    metrics,
                    NumberOfSamplesToConsider,
                    (prev, next) => prev.TotalLag > next.TotalLag);

                if (queueLengthDecreasing)
                {
                    // Only vote down if the remaining workers would each stay below the lag threshold.
                    // Example with a threshold of 1000 and 4 workers: scale in only if totalLag <= 2999,
                    // because 2999 / 3 (integer division) is 999, which is below 1000.
                    var proposedWorkerCount = workerCount - 1;
                    var proposedLagPerWorker = totalLag / proposedWorkerCount;
                    if (proposedLagPerWorker < lagThreshold)
                    {
                        status.Vote = ScaleVote.ScaleIn;

                        if (logger.IsEnabled(LogLevel.Information))
                        {
                            logger.LogInformation($"Total lag length is decreasing for topic {topicName}, for consumer group {consumerGroup}.");
                        }
                    }
                }
            }

            return status;
        }