Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedProcessorScenario.cs (162 lines of code) (raw):

//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ namespace CosmosCTL { using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using App.Metrics; using App.Metrics.Counter; using App.Metrics.Gauge; using Microsoft.Azure.Cosmos; using Microsoft.Extensions.Logging; using Newtonsoft.Json; internal class ChangeFeedProcessorScenario : ICTLScenario { private Utils.InitializationResult initializationResult; public async Task InitializeAsync( CTLConfig config, CosmosClient cosmosClient, ILogger logger) { this.initializationResult = await Utils.CreateDatabaseAndContainerAsync(config, cosmosClient); if (this.initializationResult.CreatedDatabase) { logger.LogInformation("Created database for execution"); } if (this.initializationResult.CreatedContainer) { logger.LogInformation("Created collection for execution"); } if (config.PreCreatedDocuments > 0) { logger.LogInformation("Pre-populating {0} documents", config.PreCreatedDocuments); IReadOnlyDictionary<string, IReadOnlyList<Dictionary<string, string>>> insertedDocuments = await Utils.PopulateDocumentsAsync(config, logger, new List<Container>() { cosmosClient.GetContainer(config.Database, config.Collection) }); this.initializationResult.InsertedDocuments = insertedDocuments[config.Collection].Count; } } public async Task RunAsync( CTLConfig config, CosmosClient cosmosClient, ILogger logger, IMetrics metrics, string loggingContextIdentifier, CancellationToken cancellationToken) { logger.LogInformation("Initializing counters and metrics."); CounterOptions documentCounter = new CounterOptions { Name = "#Documents received", Context = loggingContextIdentifier }; GaugeOptions leaseGauge = new GaugeOptions { Name = "#Leases created", Context = loggingContextIdentifier }; string leaseContainerId = Guid.NewGuid().ToString(); Container leaseContainer = await cosmosClient.GetDatabase(config.Database).CreateContainerAsync(leaseContainerId, "/id"); logger.LogInformation("Created lease container {0}", leaseContainerId); try { object lockObject = new object(); long documentTotal = 0; ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(config.Database, config.Collection) .GetChangeFeedProcessorBuilder<SimpleItem>("ctlProcessor", (IReadOnlyCollection<SimpleItem> docs, CancellationToken token) => { lock (lockObject) { documentTotal += docs.Count; } metrics.Measure.Counter.Increment(documentCounter, docs.Count); return Task.CompletedTask; }) .WithLeaseContainer(leaseContainer) .WithInstanceName(Guid.NewGuid().ToString()) .WithErrorNotification((string leaseToken, Exception ex) => { Utils.LogError(logger, loggingContextIdentifier, ex); return Task.CompletedTask; }) .WithStartTime(DateTime.MinValue.ToUniversalTime()) .Build(); await changeFeedProcessor.StartAsync(); logger.LogInformation("Started change feed processor"); await Task.Delay(config.RunningTimeDurationAsTimespan, cancellationToken); logger.LogInformation("Stopping change feed processor"); await changeFeedProcessor.StopAsync(); // List leases using FeedIterator<LeaseSchema> leaseIterator = leaseContainer.GetItemQueryIterator<LeaseSchema>(); int leaseTotal = 0; List<FeedRange> ranges = new List<FeedRange>(); while (leaseIterator.HasMoreResults) { FeedResponse<LeaseSchema> response = await leaseIterator.ReadNextAsync(); foreach (LeaseSchema lease in response) { if (lease.LeaseToken != null) { logger.LogInformation($"Lease for range {lease.LeaseToken} - {lease.FeedRange.EffectiveRange.Min} - {lease.FeedRange.EffectiveRange.Max}"); ranges.Add(lease.FeedRange.EffectiveRange); leaseTotal++; } } } logger.LogInformation($"Total count of leases {leaseTotal}."); metrics.Measure.Gauge.SetValue(leaseGauge, leaseTotal); string previousMin = ""; foreach (FeedRange sortedRange in ranges.OrderBy(range => range.Min)) { if (previousMin != sortedRange.Min) { Utils.LogError(logger, loggingContextIdentifier, $"Expected a sorted range with Min <{previousMin}> but encountered range <{sortedRange.Min}>:<{sortedRange.Max}>"); } previousMin = sortedRange.Max; } if (config.PreCreatedDocuments > 0) { if (this.initializationResult.InsertedDocuments != documentTotal) { Utils.LogError(logger, loggingContextIdentifier, $"Expected to receive {this.initializationResult.InsertedDocuments} documents and got {documentTotal}"); } } } catch (Exception ex) { Utils.LogError(logger, loggingContextIdentifier, ex); } finally { await leaseContainer.DeleteContainerAsync(); if (this.initializationResult.CreatedDatabase) { await cosmosClient.GetDatabase(config.Database).DeleteAsync(); } if (this.initializationResult.CreatedContainer) { await cosmosClient.GetContainer(config.Database, config.Collection).DeleteContainerAsync(); } } } private class SimpleItem { [JsonProperty("id")] public string Id { get; set; } } internal class LeaseSchema { [JsonProperty("id")] public string LeaseId { get; set; } [JsonProperty("LeaseToken")] public string LeaseToken { get; set; } [JsonProperty("FeedRange")] public Range FeedRange { get; set; } } internal class Range { [JsonProperty("Range")] public FeedRange EffectiveRange { get; set; } } internal class FeedRange { [JsonProperty("min")] public string Min { get; set; } [JsonProperty("max")] public string Max { get; set; } } } }