Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo/Binding/CosmosDBMongoBindingAsyncCollector.cs (80 lines of code) (raw):

// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Extensions.Logging;
using MongoDB.Driver;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Extensions.AzureCosmosDb.Mongo
{
    /// <summary>
    /// <see cref="IAsyncCollector{T}"/> implementation that writes each added item to the
    /// MongoDB collection described by a <see cref="MongoCollectionReference"/>.
    /// </summary>
    /// <typeparam name="T">Type of the documents written to the collection.</typeparam>
    public class CosmosDBMongoBindingAsyncCollector<T> : IAsyncCollector<T>
    {
        private readonly ILogger _logger;
        private readonly CosmosDBMongoAttribute _attribute;
        private readonly MongoCollectionReference _reference;

        /// <summary>
        /// Creates a collector bound to the given collection reference.
        /// </summary>
        /// <param name="attribute">The binding attribute this collector was created for.</param>
        /// <param name="reference">Client, database name and collection name to write to.</param>
        /// <param name="logger">Logger that receives debug and error events.</param>
        public CosmosDBMongoBindingAsyncCollector(CosmosDBMongoAttribute attribute, MongoCollectionReference reference, ILogger logger)
        {
            this._attribute = attribute;
            this._reference = reference;
            this._logger = logger;
        }

        /// <summary>
        /// Writes <paramref name="item"/> to the target collection, creating the collection
        /// first when the reference has <c>createIfNotExists</c> set.
        /// </summary>
        /// <param name="item">The document to write.</param>
        /// <param name="cancellationToken">Token forwarded to the MongoDB write operation.</param>
        /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception>
        /// <exception cref="OperationCanceledException">The write was cancelled.</exception>
        /// <remarks>
        /// Failures other than cancellation are logged with <c>Events.OnBindingDataError</c>
        /// and are not rethrown.
        /// </remarks>
        public async Task AddAsync(T item, CancellationToken cancellationToken = default(CancellationToken))
        {
            if (item == null)
            {
                throw new ArgumentNullException(nameof(item));
            }

            try
            {
                // No-op unless the reference asks for the collection to be created.
                await InitializeCollection(this._reference);

                await UpsertDocument(this._reference, item, cancellationToken);
                this._logger.LogDebug(Events.OnBindingDataAdded, "Document upserted successfully.");
            }
            catch (Exception ex) when (!(ex is OperationCanceledException))
            {
                // Cancellation is excluded by the filter so it propagates to the caller.
                // The exception object is passed along so the stack trace reaches the log sink.
                this._logger.LogError(Events.OnBindingDataError, ex, $"Error upserting document: {ex.Message}");
            }
        }

        /// <summary>
        /// No-op: every item is written immediately by <see cref="AddAsync"/>, so nothing is buffered.
        /// </summary>
        /// <param name="cancellationToken">Unused.</param>
        /// <returns>An already-completed task.</returns>
        public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
        {
            return Task.CompletedTask;
        }

        /// <summary>
        /// When <c>reference.createIfNotExists</c> is set, delegates to
        /// <c>MongoUtility.CreateCollectionIfNotExistAsync</c> and logs the outcome.
        /// Does nothing otherwise.
        /// </summary>
        /// <param name="reference">The collection reference to initialize.</param>
        private async Task InitializeCollection(MongoCollectionReference reference)
        {
            if (!reference.createIfNotExists)
            {
                return;
            }

            await MongoUtility.CreateCollectionIfNotExistAsync(reference);
            this._logger.LogDebug(Events.OnIntializedCollection, $"Collection {reference.collectionName} created successfully.");
        }

        /// <summary>
        /// Writes <paramref name="doc"/> to the referenced collection.
        /// </summary>
        /// <remarks>
        /// If <typeparamref name="T"/> exposes a public property named <c>_id</c> whose value
        /// is non-null, the stored document with that <c>_id</c> is replaced by
        /// <paramref name="doc"/>, or inserted when no such document exists. Otherwise the
        /// document is inserted as-is. Exceptions are not caught here so that the caller can
        /// log them and does not report a failed write as a success.
        /// </remarks>
        /// <param name="reference">Client, database name and collection name to write to.</param>
        /// <param name="doc">The document to write.</param>
        /// <param name="cancellationToken">Token forwarded to the driver call.</param>
        private async Task UpsertDocument(MongoCollectionReference reference, T doc, CancellationToken cancellationToken = default)
        {
            var database = reference.client.GetDatabase(reference.databaseName);
            var collection = database.GetCollection<T>(reference.collectionName);

            var idProperty = typeof(T).GetProperty("_id");
            var idValue = idProperty?.GetValue(doc);

            if (idValue != null)
            {
                // Replace the whole document rather than issuing a partial update:
                // a $set-only upsert would persist none of the item's own fields.
                var filter = Builders<T>.Filter.Eq("_id", idValue);
                var options = new ReplaceOptions { IsUpsert = true };
                await collection.ReplaceOneAsync(filter, doc, options, cancellationToken);
            }
            else
            {
                // No usable id to match on: plain insert.
                await collection.InsertOneAsync(doc, null, cancellationToken);
            }
        }
    }
}