Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo/Trigger/CosmosDBMongoTargetScaler.cs (83 lines of code) (raw):

// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Logging; using System; using System.Threading.Tasks; namespace Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo { internal class CosmosDBMongoTargetScaler : ITargetScaler { private readonly string _functionId; private readonly string _databaseName; private readonly string _collectionName; private readonly CosmosDBMongoMetricsProvider _cosmosDBMongoMetricsProvider; private readonly TargetScalerDescriptor _targetScalerDescriptor; private readonly ILogger _logger; private int _maxWorkPerInstance; private int _maxWorkInstance; public CosmosDBMongoTargetScaler( string functionName, string databaseName, string collectionName, ILoggerFactory loggerFactory, int maxWorkPerInstance = 1000, int maxWorkInstance = 3 ) { this._functionId = functionName; this._databaseName = databaseName; this._collectionName = collectionName; this._cosmosDBMongoMetricsProvider = new CosmosDBMongoMetricsProvider(this._functionId, this._databaseName, this._collectionName, loggerFactory); this._targetScalerDescriptor = new TargetScalerDescriptor(_functionId); this._maxWorkPerInstance = maxWorkPerInstance; this._maxWorkInstance = maxWorkInstance; this._logger = loggerFactory.CreateLogger<CosmosDBMongoTargetScaler>(); } public TargetScalerDescriptor TargetScalerDescriptor => _targetScalerDescriptor; public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext context) { try { CosmosDBMongoTriggerMetrics metrics = await _cosmosDBMongoMetricsProvider.GetMetricsAsync(); return GetScaleResultInternal(context, metrics.PendingEventsCount); } catch (UnauthorizedAccessException ex) { this._logger.LogError(Events.OnScalingError, $"Target scaler is not supported. Exception: {ex}"); throw new NotSupportedException("Target scaler is not supported.", ex); } } internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, long pendingWorkCount) { int concurrency; if (!context.InstanceConcurrency.HasValue) { concurrency = _maxWorkPerInstance; } else { concurrency = context.InstanceConcurrency.Value; } if (concurrency < 1) { throw new ArgumentOutOfRangeException($"Unexpected concurrency='{concurrency}' - the value must be > 0."); } int targetWorkerCount = 1; try { checked { targetWorkerCount = (int)Math.Ceiling(pendingWorkCount / (decimal)_maxWorkPerInstance); } } catch (OverflowException) { targetWorkerCount = int.MaxValue; } _logger.LogDebug($"Target worker count for function '{_functionId}-{_databaseName}-{_collectionName}' is '{targetWorkerCount}' Concurrency='{concurrency}')."); return new TargetScalerResult { TargetWorkerCount = targetWorkerCount > _maxWorkInstance ? _maxWorkInstance : targetWorkerCount, }; } } }