Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo/Trigger/CosmosDBMongoTriggerListener.cs (194 lines of code) (raw):
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo
{
public class CosmosDBMongoTriggerListener : IListener
{
private readonly ITriggeredFunctionExecutor _executor;
private readonly MongoCollectionReference _reference;
private readonly ILogger _logger;
private IMongoDatabase _database;
private IMongoCollection<BsonDocument> _collection;
private MonitorLevel _triggerLevel;
private IChangeStreamCursor<BsonDocument> _cursor;
private CancellationTokenSource _cancellationTokenSource;
private bool _disposed = false;
private ActionBlock<ChangeStreamDocument<BsonDocument>> _workerPool;
private const int MaxConcurrency = 32; // Maximum number of concurrent workers
private CosmosDBMongoTriggerMetrics _currentMetrics;
private readonly object _metricsLock = new object();
public CosmosDBMongoTriggerListener(ITriggeredFunctionExecutor executor, MongoCollectionReference reference, ILogger logger)
{
this._executor = executor ?? throw new ArgumentNullException(nameof(executor));
this._reference = reference;
this._logger = logger;
this._cancellationTokenSource = new CancellationTokenSource();
this._currentMetrics = new CosmosDBMongoTriggerMetrics();
// Initialize the worker pool
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = MaxConcurrency,
CancellationToken = this._cancellationTokenSource.Token
};
this._workerPool = new ActionBlock<ChangeStreamDocument<BsonDocument>>(
async document => await ProcessChangeAsync(document),
executionDataflowBlockOptions);
if (string.IsNullOrEmpty(this._reference.databaseName))
{
this._triggerLevel = MonitorLevel.Cluster;
}
else if (string.IsNullOrEmpty(this._reference.collectionName))
{
this._triggerLevel = MonitorLevel.Database;
}
else
{
this._triggerLevel = MonitorLevel.Collection;
}
}
public void Cancel()
{
this.StopAsync(CancellationToken.None).Wait();
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_cancellationTokenSource?.Dispose();
_cursor?.Dispose();
_workerPool?.Complete();
}
_disposed = true;
}
}
public async Task StartAsync(CancellationToken cancellationToken)
{
try
{
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.OperationType == ChangeStreamOperationType.Insert ||
change.OperationType == ChangeStreamOperationType.Update ||
change.OperationType == ChangeStreamOperationType.Replace)
//;
.Project<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>>(
Builders<ChangeStreamDocument<BsonDocument>>.Projection
.Include(x => x.FullDocument)
.Include(x => x.DocumentKey)
.Include("_id")
.Include("ns")
);
var changeStreamOption = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
switch (this._triggerLevel)
{
case MonitorLevel.Cluster:
this._cursor = await this._reference.client.WatchAsync(
pipeline, changeStreamOption, cancellationToken);
break;
case MonitorLevel.Database:
this._database = this._reference.client.GetDatabase(this._reference.databaseName);
this._cursor = await this._database.WatchAsync(
pipeline, changeStreamOption, cancellationToken);
break;
case MonitorLevel.Collection:
this._database = this._reference.client.GetDatabase(this._reference.databaseName);
this._collection = this._database.GetCollection<BsonDocument>(this._reference.collectionName);
this._cursor = await this._collection.WatchAsync(
pipeline, changeStreamOption, cancellationToken);
break;
default:
throw new InvalidOperationException("Unknown trigger level.");
}
IBsonSerializer<BsonDocument> documentSerializer = BsonSerializer.SerializerRegistry.GetSerializer<BsonDocument>();
_ = Task.Run(async () =>
{
while (!this._cancellationTokenSource.Token.IsCancellationRequested)
{
while (await this._cursor.MoveNextAsync(this._cancellationTokenSource.Token))
{
var batch = this._cursor.Current;
foreach (var bsonDoc in batch)
{
var change = new ChangeStreamDocument<BsonDocument>(bsonDoc, documentSerializer);
await _workerPool.SendAsync(change, this._cancellationTokenSource.Token);
}
}
}
}, this._cancellationTokenSource.Token);
this._logger.LogDebug(Events.OnListenerStarted, "MongoDB trigger listener started.");
}
catch (Exception ex)
{
this._logger.LogError(Events.OnListenerStartError, $"Starting the listener failed. Exception: {ex.Message}");
throw;
}
}
public async Task StopAsync(CancellationToken cancellationToken)
{
try
{
_workerPool.Complete();
await _workerPool.Completion;
this._cursor.Dispose();
this._logger.LogDebug(Events.OnListenerStopped, "MongoDB trigger listener stopped.");
}
catch (Exception ex)
{
this._logger.LogError(Events.OnListenerStopError, $"Stopping the listener failed. Exception: {ex.Message}");
}
}
private async Task ProcessChangeAsync(ChangeStreamDocument<BsonDocument> change)
{
try
{
lock (_metricsLock)
{
_currentMetrics.PendingEventsCount++;
CosmosDBMongoMetricsStore.AddMetrics(_reference.functionId, _reference.databaseName, _reference.collectionName, _currentMetrics);
}
try
{
var triggerData = new TriggeredFunctionData
{
TriggerValue = change
};
var result = await this._executor.TryExecuteAsync(triggerData, this._cancellationTokenSource.Token);
if (!result.Succeeded)
{
_logger.LogWarning($"Function execution failed for document {change.DocumentKey}: {result.Exception}");
}
}
finally
{
lock (_metricsLock)
{
_currentMetrics.PendingEventsCount--;
CosmosDBMongoMetricsStore.AddMetrics(_reference.functionId, _reference.databaseName, _reference.collectionName, _currentMetrics);
}
}
}
catch (Exception ex)
{
_logger.LogError(Events.OnError, $"Error processing change. Exception: {ex.Message}");
throw;
}
}
}
}