in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/Scaler/KafkaGenericTopicScaler.cs [65:178]
private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] metrics)
{
var status = new ScaleStatus
{
Vote = ScaleVote.None,
};
const int NumberOfSamplesToConsider = 5;
// At least NumberOfSamplesToConsider (5) samples are required before any of the remaining checks can make a scale decision.
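// Until enough history accumulates, returning ScaleVote.None defers the decision rather than voting on a short, noisy signal.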
if (metrics == null || metrics.Length < NumberOfSamplesToConsider)
{
return status;
}
var lastMetrics = metrics.Last();
long totalLag = lastMetrics.TotalLag;
long partitionCount = lastMetrics.PartitionCount;
long lagThreshold = this.lagThreshold;
// We shouldn't assign more workers than there are partitions.
// This check comes first because it depends on neither load nor the number of samples.
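// Illustrative example (values assumed): with 8 instances consuming a 4-partition topic, Kafka assigns each partition to at most one consumer in the group, so at least 4 instances sit idle and we vote ScaleIn.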
if (partitionCount > 0 && partitionCount < workerCount)
{
status.Vote = ScaleVote.ScaleIn;
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation($"Number of instances ({workerCount}) is too high relative to the number of partitions ({partitionCount}) for topic '{topicName}' and consumer group '{consumerGroup}'.");
}
return status;
}
// Check to see if the Kafka consumer has been empty for a while. Only if all metrics samples are empty do we scale down.
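// A single zero-lag sample could be a transient dip; requiring every sample to be zero avoids flapping between ScaleIn and ScaleOut votes.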
bool topicIsIdle = metrics.All(p => p.TotalLag == 0);
if (topicIsIdle)
{
status.Vote = ScaleVote.ScaleIn;
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation($"Topic '{topicName}' for consumer group '{consumerGroup}' is idle.");
}
return status;
}
// Maintain a minimum ratio of one worker per lagThreshold (e.g., 1,000) unprocessed messages.
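// Illustrative example (values assumed): with lagThreshold = 1000 and workerCount = 2, the budget is 2000 messages; totalLag = 2500 exceeds it, so we vote ScaleOut unless we already have one worker per partition.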
if (totalLag > workerCount * lagThreshold)
{
if (workerCount < partitionCount)
{
status.Vote = ScaleVote.ScaleOut;
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation($"Total lag ({totalLag}) exceeds the scale-out threshold ({workerCount * lagThreshold}) for {workerCount} instance(s). Scaling out for topic '{topicName}' and consumer group '{consumerGroup}'.");
}
}
return status;
}
// Samples are in chronological order. Check for a continuous increase in unprocessed message count.
// If detected, this results in an automatic scale out for the site container.
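// Illustrative example (values assumed): lags of 100, 150, 200, 260, 300 across the five samples are strictly increasing, indicating the current workers are not keeping up.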
if (metrics[0].TotalLag > 0)
{
if (workerCount < partitionCount)
{
// The enclosing check already guarantees metrics[0].TotalLag > 0.
bool queueLengthIncreasing = IsTrueForLast(
metrics,
NumberOfSamplesToConsider,
(prev, next) => prev.TotalLag < next.TotalLag);
if (queueLengthIncreasing)
{
status.Vote = ScaleVote.ScaleOut;
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation($"Total lag ({totalLag}) is continuously increasing for topic '{topicName}' and consumer group '{consumerGroup}'. Scaling out.");
}
return status;
}
}
}
if (workerCount > 1)
{
bool queueLengthDecreasing = IsTrueForLast(
metrics,
NumberOfSamplesToConsider,
(prev, next) => prev.TotalLag > next.TotalLag);
if (queueLengthDecreasing)
{
// Only vote to scale in if the lag per worker after removing one worker is still below the threshold.
// Example: with 4 workers and lagThreshold = 1000, scale in only if totalLag <= 2999, since 2999 / 3 = 999 < 1000 (integer division).
var proposedWorkerCount = workerCount - 1;
var proposedLagPerWorker = totalLag / proposedWorkerCount;
if (proposedLagPerWorker < lagThreshold)
{
status.Vote = ScaleVote.ScaleIn;
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation($"Total lag is decreasing and the projected lag per worker ({proposedLagPerWorker}) is below the threshold ({lagThreshold}) for topic '{topicName}' and consumer group '{consumerGroup}'. Scaling in.");
}
}
}
}
return status;
}