in Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo/Trigger/CosmosDBMongoTriggerListener.cs [91:157]
public async Task StartAsync(CancellationToken cancellationToken)
{
try
{
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.OperationType == ChangeStreamOperationType.Insert ||
change.OperationType == ChangeStreamOperationType.Update ||
change.OperationType == ChangeStreamOperationType.Replace)
//;
.Project<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>>(
Builders<ChangeStreamDocument<BsonDocument>>.Projection
.Include(x => x.FullDocument)
.Include(x => x.DocumentKey)
.Include("_id")
.Include("ns")
);
var changeStreamOption = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
switch (this._triggerLevel)
{
case MonitorLevel.Cluster:
this._cursor = await this._reference.client.WatchAsync(
pipeline, changeStreamOption, cancellationToken);
break;
case MonitorLevel.Database:
this._database = this._reference.client.GetDatabase(this._reference.databaseName);
this._cursor = await this._database.WatchAsync(
pipeline, changeStreamOption, cancellationToken);
break;
case MonitorLevel.Collection:
this._database = this._reference.client.GetDatabase(this._reference.databaseName);
this._collection = this._database.GetCollection<BsonDocument>(this._reference.collectionName);
this._cursor = await this._collection.WatchAsync(
pipeline, changeStreamOption, cancellationToken);
break;
default:
throw new InvalidOperationException("Unknown trigger level.");
}
IBsonSerializer<BsonDocument> documentSerializer = BsonSerializer.SerializerRegistry.GetSerializer<BsonDocument>();
_ = Task.Run(async () =>
{
while (!this._cancellationTokenSource.Token.IsCancellationRequested)
{
while (await this._cursor.MoveNextAsync(this._cancellationTokenSource.Token))
{
var batch = this._cursor.Current;
foreach (var bsonDoc in batch)
{
var change = new ChangeStreamDocument<BsonDocument>(bsonDoc, documentSerializer);
await _workerPool.SendAsync(change, this._cancellationTokenSource.Token);
}
}
}
}, this._cancellationTokenSource.Token);
this._logger.LogDebug(Events.OnListenerStarted, "MongoDB trigger listener started.");
}
catch (Exception ex)
{
this._logger.LogError(Events.OnListenerStartError, $"Starting the listener failed. Exception: {ex.Message}");
throw;
}
}