internal TargetScalerResult GetScaleResultInternal()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/Scaler/KafkaGenericTargetScaler.cs [72:105]


        internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, KafkaTriggerMetrics metrics)
        {
            var totalLag = metrics.TotalLag;
            var partitionCount = metrics.PartitionCount;

            // Since Kafka Extension supports only Premium plan for run time based scaling,
            // the targetWorkerCount is set to 1 even when the totalLag is 0.
            // This can be changed to 0 when the extension supports all plans.
            if (totalLag == 0)
            {
                return new TargetScalerResult
                {
                    TargetWorkerCount = 0
                };
            }

            var targetConcurrency = GetConcurrency(context, lagThreshold);

            int targetWorkerCount = (int)Math.Ceiling(totalLag / (decimal)targetConcurrency);

            targetWorkerCount = ValidateWithPartitionCount(targetWorkerCount, partitionCount);
            targetWorkerCount = ThrottleResultIfNecessary(targetWorkerCount);
            if (GetChangeInWorkerCount(targetWorkerCount) > 0)
            {
                lastScaleUpTime = DateTime.UtcNow;
            }

            logger.LogInformation($"Target worker count for function '{TargetScalerDescriptor.FunctionId}' is '{targetWorkerCount}' (Topic='{topicName}', Total Lag ='{totalLag}', Concurrency='{targetConcurrency}', Consumer Group='{consumerGroup}', Partition Count='{partitionCount}').");

            return new TargetScalerResult
            {
                TargetWorkerCount = targetWorkerCount
            };
        }