Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo/Trigger/CosmosDBMongoMetricsStore.cs (114 lines of code) (raw):
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo
{
internal static class CosmosDBMongoMetricsStore
{
private static readonly ConcurrentDictionary<string, Queue<CosmosDBMongoTriggerMetrics>> _metricsHistory
= new ConcurrentDictionary<string, Queue<CosmosDBMongoTriggerMetrics>>();
private static readonly ConcurrentDictionary<string, CosmosDBMongoTriggerMetrics> _currentMetrics
= new ConcurrentDictionary<string, CosmosDBMongoTriggerMetrics>();
private static readonly CancellationTokenSource _cleanupTokenSource = new CancellationTokenSource();
private const int MaxSampleCount = 100;
private static readonly TimeSpan CleanupInterval = TimeSpan.FromMinutes(10);
private static readonly TimeSpan MetricsSnapshotInterval = TimeSpan.FromSeconds(5);
static CosmosDBMongoMetricsStore()
{
StartCleanupTask();
StartMetricsSnapshotTask();
}
public static CosmosDBMongoTriggerMetrics GetMetrics(string functionId, string databaseName, string collectionName)
{
string key = $"{functionId}-{databaseName}-{collectionName}";
return _currentMetrics.GetOrAdd(key, _ => new CosmosDBMongoTriggerMetrics());
}
public static void AddMetrics(string functionId, string databaseName, string collectionName, CosmosDBMongoTriggerMetrics metrics)
{
string key = $"{functionId}-{databaseName}-{collectionName}";
_currentMetrics.AddOrUpdate(key, metrics, (_, existing) =>
{
// We don't need to update the existing metrics since it's the same instance
// being updated by the listener
return existing;
});
}
public static CosmosDBMongoTriggerMetrics[] GetMetricsHistory(string functionId, string databaseName, string collectionName)
{
string key = $"{functionId}-{databaseName}-{collectionName}";
if (_metricsHistory.TryGetValue(key, out var queue))
{
lock (queue)
{
return queue.ToArray();
}
}
return Array.Empty<CosmosDBMongoTriggerMetrics>();
}
private static void StartCleanupTask()
{
Task.Run(async () =>
{
while (!_cleanupTokenSource.Token.IsCancellationRequested)
{
await Task.Delay(CleanupInterval, _cleanupTokenSource.Token);
CleanupOldMetrics();
}
}, _cleanupTokenSource.Token);
}
private static void StartMetricsSnapshotTask()
{
Task.Run(async () =>
{
while (!_cleanupTokenSource.Token.IsCancellationRequested)
{
await Task.Delay(MetricsSnapshotInterval, _cleanupTokenSource.Token);
TakeMetricsSnapshot();
}
}, _cleanupTokenSource.Token);
}
private static void TakeMetricsSnapshot()
{
foreach (var kvp in _currentMetrics)
{
var queue = _metricsHistory.GetOrAdd(kvp.Key, _ => new Queue<CosmosDBMongoTriggerMetrics>());
var snapshot = new CosmosDBMongoTriggerMetrics
{
PendingEventsCount = kvp.Value.PendingEventsCount,
Timestamp = DateTime.UtcNow
};
lock (queue)
{
queue.Enqueue(snapshot);
while (queue.Count > MaxSampleCount)
{
queue.Dequeue();
}
}
}
}
private static void CleanupOldMetrics()
{
var keysToRemove = new List<string>();
foreach (var key in _metricsHistory.Keys)
{
if (_metricsHistory.TryGetValue(key, out var queue))
{
lock (queue)
{
if (queue.Count == 0 || (DateTime.UtcNow - queue.Last().Timestamp) > TimeSpan.FromHours(1))
{
keysToRemove.Add(key);
}
}
}
}
foreach (var key in keysToRemove)
{
_metricsHistory.TryRemove(key, out _);
_currentMetrics.TryRemove(key, out _);
}
}
}
}