in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/Scaler/KafkaGenericTargetScaler.cs [72:105]
internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, KafkaTriggerMetrics metrics)
{
var totalLag = metrics.TotalLag;
var partitionCount = metrics.PartitionCount;
// Since Kafka Extension supports only Premium plan for run time based scaling,
// the targetWorkerCount is set to 1 even when the totalLag is 0.
// This can be changed to 0 when the extension supports all plans.
if (totalLag == 0)
{
return new TargetScalerResult
{
TargetWorkerCount = 0
};
}
var targetConcurrency = GetConcurrency(context, lagThreshold);
int targetWorkerCount = (int)Math.Ceiling(totalLag / (decimal)targetConcurrency);
targetWorkerCount = ValidateWithPartitionCount(targetWorkerCount, partitionCount);
targetWorkerCount = ThrottleResultIfNecessary(targetWorkerCount);
if (GetChangeInWorkerCount(targetWorkerCount) > 0)
{
lastScaleUpTime = DateTime.UtcNow;
}
logger.LogInformation($"Target worker count for function '{TargetScalerDescriptor.FunctionId}' is '{targetWorkerCount}' (Topic='{topicName}', Total Lag ='{totalLag}', Concurrency='{targetConcurrency}', Consumer Group='{consumerGroup}', Partition Count='{partitionCount}').");
return new TargetScalerResult
{
TargetWorkerCount = targetWorkerCount
};
}