Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo/Trigger/CosmosDBMongoScaleMonitor.cs (112 lines of code) (raw):
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo
{
internal class CosmosDBMongoScaleMonitor : IScaleMonitor<CosmosDBMongoTriggerMetrics>
{
private readonly string _functionId;
private readonly string _databaseName;
private readonly string _collectionName;
private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor;
private readonly ILogger<CosmosDBMongoScaleMonitor> _logger;
private readonly CosmosDBMongoMetricsProvider _cosmosDBMongoMetricsProvider;
private int _maxWorkPerInstance;
private int _minSampleCount;
public CosmosDBMongoScaleMonitor(
string functionName,
string databaseName,
string collectionName,
ILoggerFactory loggerFactory,
int maxWorkPerInstance = 1000,
int minSampleCount = 5
)
{
_functionId = functionName;
_databaseName = databaseName;
_collectionName = collectionName;
_scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-{_databaseName}-{_collectionName}", _functionId);
_logger = loggerFactory.CreateLogger<CosmosDBMongoScaleMonitor>();
_cosmosDBMongoMetricsProvider = new CosmosDBMongoMetricsProvider(_functionId, _databaseName, _collectionName, loggerFactory);
_maxWorkPerInstance = maxWorkPerInstance;
_minSampleCount = minSampleCount;
}
public ScaleMonitorDescriptor Descriptor
{
get
{
return _scaleMonitorDescriptor;
}
}
async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
{
return await GetMetricsAsync().ConfigureAwait(false);
}
public async Task<CosmosDBMongoTriggerMetrics> GetMetricsAsync()
{
return await _cosmosDBMongoMetricsProvider.GetMetricsAsync().ConfigureAwait(false);
}
ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast<CosmosDBMongoTriggerMetrics>().ToArray());
}
public ScaleStatus GetScaleStatus(ScaleStatusContext<CosmosDBMongoTriggerMetrics> context)
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray());
}
private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBMongoTriggerMetrics[] metrics)
{
ScaleStatus status = new ScaleStatus
{
Vote = ScaleVote.None
};
// Unable to determine the correct vote with no metrics.
if (metrics == null || metrics.Length == 0)
{
return status;
}
// At least _minSampleCount samples are required to make a scale decision for the rest of the checks.
if (metrics.Length < _minSampleCount)
{
return status;
}
// Samples are in chronological order. Check for a continuous increase in message count.
// If detected, this results in an automatic scale out for the site container.
if (metrics[metrics.Length-1].PendingEventsCount > 0)
{
bool PendingWorkCountHitMaxForN =
IsTrueForLastN(
metrics,
_minSampleCount,
(prev, next) => prev.PendingEventsCount <= next.PendingEventsCount && next.PendingEventsCount >= _maxWorkPerInstance);
if (PendingWorkCountHitMaxForN)
{
status.Vote = ScaleVote.ScaleOut;
return status;
}
}
bool PendingWorkCountDecreasing =
IsTrueForLastN(
metrics,
_minSampleCount,
(prev, next) => prev.PendingEventsCount >= next.PendingEventsCount);
if (PendingWorkCountDecreasing)
{
status.Vote = ScaleVote.ScaleIn;
return status;
}
_logger.LogDebug($"CosmosDB Mongo trigger function '{_functionId}-{_databaseName}-{_collectionName}' is steady.");
return status;
}
private static bool IsTrueForLastN(IList<CosmosDBMongoTriggerMetrics> samples, int count, Func<CosmosDBMongoTriggerMetrics, CosmosDBMongoTriggerMetrics, bool> predicate)
{
// Walks through the list from left to right starting at len(samples) - count.
for (int i = samples.Count - count - 1; i < samples.Count - 1; i++)
{
if (!predicate(samples[i], samples[i + 1]))
{
return false;
}
}
return true;
}
}
}