private ScaleStatus GetScaleStatusCore()

in extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs [201:288]


    /// <summary>
    /// Produces a scale vote from the most recent message-count samples and the current worker count.
    /// </summary>
    /// <param name="workerCount">Current number of workers; multiplied by the per-worker limit to get the combined limit.</param>
    /// <param name="metrics">
    /// Message-count samples. Only the last five entries are examined; they are treated as ordered from
    /// oldest to newest (the final entry is used as the latest sample).
    /// </param>
    /// <returns>
    /// A <see cref="ScaleStatus"/> whose vote is <see cref="ScaleVote.ScaleOut"/> when the latest (or linearly
    /// extrapolated) message count exceeds the combined limit, <see cref="ScaleVote.ScaleIn"/> when the counts
    /// are continuously decreasing (or zero) and one fewer worker would still be within the limit, and
    /// <see cref="ScaleVote.None"/> otherwise, including when fewer than five samples are available.
    /// </returns>
    private ScaleStatus GetScaleStatusCore(int workerCount, RabbitMQTriggerMetrics[] metrics)
    {
        // We require minimum 5 samples to estimate the trend of variations in message count with certain reliability.
        // These samples roughly cover the timespan of past 40 seconds.
        int minSamplesForScaling = 5;

        // TODO: Make this value configurable.
        // Upper limit on the count of messages that needs to be maintained per worker.
        // Declared as long so that every "workers * limit" product below is evaluated in 64-bit arithmetic
        // and cannot silently wrap around in an unchecked context.
        long maxMessagesPerWorker = 1000;

        var status = new ScaleStatus
        {
            Vote = ScaleVote.None,
        };

        // Do not make a scale decision unless we have enough samples.
        if (metrics == null || metrics.Length < minSamplesForScaling)
        {
            this.logger.LogInformation($"Requesting no-scaling: Insufficient metrics for making scale decision for {this.logDetails}.");
            return status;
        }

        // Consider only the most recent batch of samples in the rest of the method.
        var samples = new RabbitMQTriggerMetrics[minSamplesForScaling];
        Array.Copy(metrics, metrics.Length - minSamplesForScaling, samples, 0, minSamplesForScaling);

        string counts = string.Join(", ", samples.Select(metric => metric.MessageCount));
        this.logger.LogInformation($"Message counts: [{counts}], worker count: {workerCount}, maximum messages per worker: {maxMessagesPerWorker}.");

        RabbitMQTriggerMetrics lastSample = samples[samples.Length - 1];
        long lastMessageCount = lastSample.MessageCount;
        long combinedLimit = workerCount * maxMessagesPerWorker;

        // Add worker if the count of messages per worker exceeds the maximum limit.
        if (lastMessageCount > combinedLimit)
        {
            status.Vote = ScaleVote.ScaleOut;
            this.logger.LogInformation($"Requesting scale-out: Found too many messages for {this.logDetails} relative to the number of workers.");
            return status;
        }

        // Check if there is a continuous increase or decrease in the count of messages.
        // A sample of zero is accepted as "decreasing" even when its predecessor is also zero, so an idle
        // queue counts as a decreasing trend.
        bool isIncreasing = true;
        bool isDecreasing = true;
        for (int index = 0; index < samples.Length - 1; index++)
        {
            isIncreasing = isIncreasing && samples[index].MessageCount < samples[index + 1].MessageCount;
            isDecreasing = isDecreasing && (samples[index + 1].MessageCount == 0 || samples[index].MessageCount > samples[index + 1].MessageCount);
        }

        if (isIncreasing)
        {
            // Scale out only if the expected count of messages would exceed the combined limit after 30 seconds.
            // The last sample always lies inside the 30-second window, so First() cannot come up empty.
            DateTime referenceTime = lastSample.Timestamp - TimeSpan.FromSeconds(30);
            RabbitMQTriggerMetrics referenceMetric = samples.First(metric => metric.Timestamp > referenceTime);

            // Linear extrapolation: latest + (latest - reference). Uses the already-widened long value so the
            // doubling is done in 64-bit arithmetic.
            long expectedMessageCount = (2 * lastMessageCount) - referenceMetric.MessageCount;

            if (expectedMessageCount > combinedLimit)
            {
                status.Vote = ScaleVote.ScaleOut;
                this.logger.LogInformation($"Requesting scale-out: Found the messages for {this.logDetails} to be continuously increasing" +
                    " and may exceed the maximum limit set for the workers.");
                return status;
            }
            else
            {
                this.logger.LogDebug($"Avoiding scale-out: Found the messages for {this.logDetails} to be increasing" +
                    " but they may not exceed the maximum limit set for the workers.");
            }
        }

        if (isDecreasing)
        {
            // Scale in only if the count of messages will not exceed the combined limit post the scale-in operation.
            if (lastMessageCount <= (workerCount - 1) * maxMessagesPerWorker)
            {
                status.Vote = ScaleVote.ScaleIn;
                this.logger.LogInformation($"Requesting scale-in: Found {this.logDetails} to be either idle or the messages to be continuously decreasing.");
                return status;
            }
            else
            {
                this.logger.LogDebug($"Avoiding scale-in: Found the messages for {this.logDetails} to be decreasing" +
                    " but they are high enough to require all existing workers for processing.");
            }
        }

        this.logger.LogInformation($"Requesting no-scaling: Found {this.logDetails} to not require scaling.");
        return status;
    }