in src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs [64:155]
/// <summary>
/// Computes a scale vote for the monitored Cosmos DB container from the current worker
/// count and the collected metrics samples.
/// </summary>
/// <param name="workerCount">The number of workers (instances) currently assigned.</param>
/// <param name="metrics">
/// The metrics samples, expected in chronological order (oldest first). May be <c>null</c> or empty,
/// in which case no vote is cast.
/// </param>
/// <returns>
/// A <see cref="ScaleStatus"/> whose <c>Vote</c> is <c>ScaleOut</c>, <c>ScaleIn</c>, or <c>None</c>
/// when no decision can be made or the load is steady.
/// </returns>
private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] metrics)
{
    const int NumberOfSamplesToConsider = 5;
    const long RemainingWorkPerWorker = 1000;

    ScaleStatus status = new ScaleStatus
    {
        Vote = ScaleVote.None
    };

    // Unable to determine the correct vote with no metrics.
    if (metrics == null || metrics.Length == 0)
    {
        return status;
    }

    // The most recent sample drives the partition-count and backlog checks below.
    CosmosDBTriggerMetrics latestMetrics = metrics[metrics.Length - 1];

    // We shouldn't assign more workers than there are partitions (Cosmos DB, Event Hub, Service Bus Queue/Topic).
    // This check is first, because it is independent of load or number of samples.
    int partitionCount = latestMetrics.PartitionCount;
    if (partitionCount > 0 && partitionCount < workerCount)
    {
        status.Vote = ScaleVote.ScaleIn;

        // NOTE: the messages are already fully interpolated. They must not be passed through
        // string.Format, which would throw FormatException if the container id contains '{' or '}'.
        _logger.LogInformation(Events.OnScaling, $"WorkerCount ({workerCount}) > PartitionCount ({partitionCount}).");
        _logger.LogInformation(Events.OnScaling, $"Number of instances ({workerCount}) is too high relative to number " +
            $"of partitions for container ({_monitoredContainer.Id}, {partitionCount}).");
        return status;
    }

    // At least 5 samples are required to make a scale decision for the rest of the checks.
    if (metrics.Length < NumberOfSamplesToConsider)
    {
        return status;
    }

    // Maintain a minimum ratio of 1 worker per 1,000 items of remaining work.
    // The product is computed in 64-bit arithmetic so it cannot wrap around for large worker counts.
    long latestRemainingWork = latestMetrics.RemainingWork;
    if (latestRemainingWork > workerCount * RemainingWorkPerWorker)
    {
        status.Vote = ScaleVote.ScaleOut;
        _logger.LogInformation(Events.OnScaling, $"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000.");
        _logger.LogInformation(Events.OnScaling, $"Remaining work for container ({_monitoredContainer.Id}, {latestRemainingWork}) " +
            $"is too high relative to the number of instances ({workerCount}).");
        return status;
    }

    // Every sample reported pending work and there are still partitions without a dedicated worker.
    bool documentsWaiting = metrics.All(m => m.RemainingWork > 0);
    if (documentsWaiting && partitionCount > 0 && partitionCount > workerCount)
    {
        status.Vote = ScaleVote.ScaleOut;
        _logger.LogInformation(Events.OnScaling, $"CosmosDB container '{_monitoredContainer.Id}' has documents waiting to be processed.");
        _logger.LogInformation(Events.OnScaling, $"There are {workerCount} instances relative to {partitionCount} partitions.");
        return status;
    }

    // Check to see if the trigger source has been empty for a while. Only if all trigger sources are empty do we scale down.
    bool isIdle = metrics.All(m => m.RemainingWork == 0);
    if (isIdle)
    {
        status.Vote = ScaleVote.ScaleIn;
        _logger.LogInformation(Events.OnScaling, $"'{_monitoredContainer.Id}' is idle.");
        return status;
    }

    // Samples are in chronological order. Check for a continuous increase in work remaining.
    // If detected, this results in an automatic scale out for the site container.
    // NOTE(review): IsTrueForLast is defined elsewhere; presumably it applies the predicate to each
    // consecutive pair within the last NumberOfSamplesToConsider samples - confirm.
    // NOTE(review): the extra guard inspects metrics[0] (the oldest sample in the array), which is not
    // necessarily the first of the samples considered above - confirm this is intended.
    bool remainingWorkIncreasing =
        IsTrueForLast(
            metrics,
            NumberOfSamplesToConsider,
            (prev, next) => prev.RemainingWork < next.RemainingWork) && metrics[0].RemainingWork > 0;
    if (remainingWorkIncreasing)
    {
        status.Vote = ScaleVote.ScaleOut;
        _logger.LogInformation(Events.OnScaling, $"Remaining work is increasing for '{_monitoredContainer.Id}'.");
        return status;
    }

    // A continuous decrease in remaining work means the current workers are keeping up, so vote to scale in.
    bool remainingWorkDecreasing =
        IsTrueForLast(
            metrics,
            NumberOfSamplesToConsider,
            (prev, next) => prev.RemainingWork > next.RemainingWork);
    if (remainingWorkDecreasing)
    {
        status.Vote = ScaleVote.ScaleIn;
        _logger.LogInformation(Events.OnScaling, $"Remaining work is decreasing for '{_monitoredContainer.Id}'.");
        return status;
    }

    // None of the rules fired: leave the vote at None.
    _logger.LogInformation(Events.OnScaling, $"CosmosDB container '{_monitoredContainer.Id}' is steady.");
    return status;
}