in extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs [201:288]
/// <summary>
/// Computes a scale vote (out, in, or none) for the trigger's queue based on the
/// recent history of message-count samples and the current worker count.
/// </summary>
/// <param name="workerCount">Number of workers currently processing the queue.</param>
/// <param name="metrics">Chronologically ordered message-count samples; may be null or short.</param>
/// <returns>A <see cref="ScaleStatus"/> whose <c>Vote</c> reflects the scaling decision.</returns>
private ScaleStatus GetScaleStatusCore(int workerCount, RabbitMQTriggerMetrics[] metrics)
{
    // Minimum number of samples needed before the message-count trend can be
    // estimated with reasonable confidence; five samples roughly cover the
    // past 40 seconds of collection.
    const int requiredSamples = 5;

    // TODO: Make this value configurable.
    // Ceiling on the number of messages each worker is expected to handle.
    const int messagesPerWorkerLimit = 1000;

    // Without enough history there is no basis for a decision; vote for no change.
    if (metrics == null || metrics.Length < requiredSamples)
    {
        this.logger.LogInformation($"Requesting no-scaling: Insufficient metrics for making scale decision for {this.logDetails}.");
        return new ScaleStatus { Vote = ScaleVote.None };
    }

    // Only the newest batch of samples participates in the decision below.
    RabbitMQTriggerMetrics[] samples = metrics.Skip(metrics.Length - requiredSamples).ToArray();

    string counts = string.Join(", ", samples.Select(sample => sample.MessageCount));
    this.logger.LogInformation($"Message counts: [{counts}], worker count: {workerCount}, maximum messages per worker: {messagesPerWorkerLimit}.");

    // Scale out immediately when the current backlog already exceeds the
    // fleet's combined per-worker capacity.
    long latestMessageCount = samples[samples.Length - 1].MessageCount;
    if (latestMessageCount > workerCount * messagesPerWorkerLimit)
    {
        this.logger.LogInformation($"Requesting scale-out: Found too many messages for {this.logDetails} relative to the number of workers.");
        return new ScaleStatus { Vote = ScaleVote.ScaleOut };
    }

    // Determine whether the message count is strictly trending in one direction.
    // A sample that drops to zero still counts toward a decreasing trend, so an
    // idle queue is eligible for scale-in.
    bool trendingUp = Enumerable.Range(0, samples.Length - 1)
        .All(index => samples[index].MessageCount < samples[index + 1].MessageCount);
    bool trendingDown = Enumerable.Range(0, samples.Length - 1)
        .All(index => samples[index + 1].MessageCount == 0 || samples[index].MessageCount > samples[index + 1].MessageCount);

    if (trendingUp)
    {
        // Project the count 30 seconds ahead by doubling the growth observed
        // over the trailing 30-second window; scale out only if the projection
        // would exceed the fleet's combined capacity.
        DateTime windowStart = samples[samples.Length - 1].Timestamp - TimeSpan.FromSeconds(30);
        RabbitMQTriggerMetrics baseline = samples.First(sample => sample.Timestamp > windowStart);
        long projectedMessageCount = (2 * samples[samples.Length - 1].MessageCount) - baseline.MessageCount;

        if (projectedMessageCount > workerCount * messagesPerWorkerLimit)
        {
            this.logger.LogInformation($"Requesting scale-out: Found the messages for {this.logDetails} to be continuously increasing" +
                " and may exceed the maximum limit set for the workers.");
            return new ScaleStatus { Vote = ScaleVote.ScaleOut };
        }

        this.logger.LogDebug($"Avoiding scale-out: Found the messages for {this.logDetails} to be increasing" +
            " but they may not exceed the maximum limit set for the workers.");
    }

    if (trendingDown)
    {
        // Scale in only when the remaining workers could still absorb the
        // backlog after one worker is removed.
        if (latestMessageCount <= (workerCount - 1) * messagesPerWorkerLimit)
        {
            this.logger.LogInformation($"Requesting scale-in: Found {this.logDetails} to be either idle or the messages to be continuously decreasing.");
            return new ScaleStatus { Vote = ScaleVote.ScaleIn };
        }

        this.logger.LogDebug($"Avoiding scale-in: Found the messages for {this.logDetails} to be decreasing" +
            " but they are high enough to require all existing workers for processing.");
    }

    this.logger.LogInformation($"Requesting no-scaling: Found {this.logDetails} to not require scaling.");
    return new ScaleStatus { Vote = ScaleVote.None };
}