public async Task StartAsync()

in Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo/Trigger/CosmosDBMongoTriggerListener.cs [91:157]


        /// <summary>
        /// Opens a change stream at the configured monitor level (cluster, database or collection)
        /// and schedules a background task that forwards every received change to the worker pool.
        /// </summary>
        /// <param name="cancellationToken">
        /// Token observed while opening the change stream cursor. The background pump does not use it;
        /// it observes the listener's own cancellation token source instead.
        /// </param>
        /// <returns>A task that completes once the cursor is open and the background pump has been scheduled.</returns>
        /// <exception cref="InvalidOperationException">The configured trigger level is not a handled <c>MonitorLevel</c> value.</exception>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            try
            {
                // Only insert, update and replace events are let through, and each event is trimmed
                // to its full document, document key, "_id" and "ns" fields.
                var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
                    .Match(change =>
                        change.OperationType == ChangeStreamOperationType.Insert ||
                        change.OperationType == ChangeStreamOperationType.Update ||
                        change.OperationType == ChangeStreamOperationType.Replace)
                    .Project<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>>(
                        Builders<ChangeStreamDocument<BsonDocument>>.Projection
                            .Include(x => x.FullDocument)
                            .Include(x => x.DocumentKey)
                            .Include("_id")
                            .Include("ns")
                    );

                // UpdateLookup requests the looked-up full document for update events as well.
                var changeStreamOption = new ChangeStreamOptions
                    {
                        FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
                    };

                // Open the cursor on the client, the database or the collection, depending on the trigger level.
                switch (this._triggerLevel)
                {
                    case MonitorLevel.Cluster:
                        this._cursor = await this._reference.client.WatchAsync(
                            pipeline, changeStreamOption, cancellationToken);
                        break;
                    case MonitorLevel.Database:
                        this._database = this._reference.client.GetDatabase(this._reference.databaseName);
                        this._cursor = await this._database.WatchAsync(
                            pipeline, changeStreamOption, cancellationToken);
                        break;
                    case MonitorLevel.Collection:
                        this._database = this._reference.client.GetDatabase(this._reference.databaseName);
                        this._collection = this._database.GetCollection<BsonDocument>(this._reference.collectionName);
                        this._cursor = await this._collection.WatchAsync(
                            pipeline, changeStreamOption, cancellationToken);
                        break;
                    default:
                        throw new InvalidOperationException("Unknown trigger level.");
                }

                IBsonSerializer<BsonDocument> documentSerializer = BsonSerializer.SerializerRegistry.GetSerializer<BsonDocument>();

                // The pump is governed by the listener's own token, not by the start-up token.
                CancellationToken pumpToken = this._cancellationTokenSource.Token;

                // Pause between attempts once the cursor reports that it has no further batches.
                TimeSpan cursorRetryDelay = TimeSpan.FromSeconds(1);

                // Fire-and-forget: the task is never awaited, so it has to observe and log its own failures.
                _ = Task.Run(async () =>
                {
                    try
                    {
                        while (!pumpToken.IsCancellationRequested)
                        {
                            while (await this._cursor.MoveNextAsync(pumpToken))
                            {
                                foreach (var bsonDoc in this._cursor.Current)
                                {
                                    var change = new ChangeStreamDocument<BsonDocument>(bsonDoc, documentSerializer);
                                    await _workerPool.SendAsync(change, pumpToken);
                                }
                            }

                            // MoveNextAsync returned false: wait before retrying so that a finished
                            // cursor does not turn this outer loop into a busy spin.
                            await Task.Delay(cursorRetryDelay, pumpToken);
                        }
                    }
                    catch (OperationCanceledException) when (pumpToken.IsCancellationRequested)
                    {
                        // Cancellation of the listener's token is the normal way to stop the pump.
                    }
                    catch (Exception ex)
                    {
                        // Without this handler the failure would vanish with the discarded task.
                        this._logger.LogError(ex, "MongoDB trigger listener stopped processing the change stream because of an unhandled exception.");
                    }
                }, pumpToken);
                this._logger.LogDebug(Events.OnListenerStarted, "MongoDB trigger listener started.");
            }
            catch (Exception ex)
            {
                // Pass the exception object so the stack trace and inner exceptions reach the log sink.
                this._logger.LogError(Events.OnListenerStartError, ex, "Starting the listener failed. Exception: {ExceptionMessage}", ex.Message);
                throw;
            }
        }