Microsoft.Azure.Cosmos.Samples/Usage/CFPullModelAllVersionsAndDeletesMode/Program.cs (156 lines of code) (raw):
namespace CFPullModelAllVersionsAndDeletesMode
{
using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
class Program
{
private static readonly string databaseName = "db";
private static readonly string containerName = "container";
static async Task Main()
{
try
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();
string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentNullException("Please specify a valid EndPointUrl in the appSettings.json");
}
string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}
using (CosmosClient client = new CosmosClient(endpoint, authKey))
{
Console.WriteLine($"Getting container reference for {containerName}.");
ContainerProperties properties = new ContainerProperties(containerName, partitionKeyPath: "/id");
await client.CreateDatabaseIfNotExistsAsync(databaseName);
Container container = await client.GetDatabase(databaseName).CreateContainerIfNotExistsAsync(properties);
string allVersionsContinuationToken = await CreateAllVersionsAndDeletesChangeFeedIterator(container);
await IngestData(container);
await DeleteData(container);
await ReadAllVersionsAndDeletesChangeFeed(container, allVersionsContinuationToken);
}
}
finally
{
Console.WriteLine("End of demo.");
}
}
static async Task<string> CreateAllVersionsAndDeletesChangeFeedIterator(Container container)
{
Console.WriteLine("Creating ChangeFeedIterator to read the change feed in All Versions and Deletes mode.");
// <InitializeFeedIterator>
using (FeedIterator<dynamic> allVersionsIterator = container
.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes))
{
while (allVersionsIterator.HasMoreResults)
{
FeedResponse<dynamic> response = await allVersionsIterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
return response.ContinuationToken;
}
}
}
// <InitializeFeedIterator>
return null;
}
static async Task IngestData(Container container)
{
Console.Clear();
Console.WriteLine("Press any key to begin ingesting data.");
Console.ReadKey(true);
Console.WriteLine("Press any key to stop.");
while (!Console.KeyAvailable)
{
Item item = GenerateItem();
await container.UpsertItemAsync(item, new PartitionKey(item.Id));
Console.Write("*");
}
}
static async Task DeleteData(Container container)
{
Console.ReadKey(true);
Console.Clear();
Console.WriteLine("Press any key to begin deleting data.");
Console.ReadKey(true);
Console.WriteLine("Press any key to stop");
int deleteItemCounter = 0;
while (!Console.KeyAvailable)
{
deleteItemCounter++;
try
{
await container.DeleteItemAsync<Item>(
partitionKey: new PartitionKey(deleteItemCounter.ToString()),
id: deleteItemCounter.ToString());
Console.Write("-");
}
catch (CosmosException cosmosException) when (cosmosException.StatusCode == HttpStatusCode.NotFound)
{
// Deleting by a random id that might not exist in the container will likely throw errors that are safe to ignore for this purpose
}
}
}
static async Task ReadAllVersionsAndDeletesChangeFeed(Container container, string allVersionsContinuationToken)
{
Console.ReadKey(true);
Console.Clear();
Console.WriteLine("Press any key to start reading the change feed in All Versions and Deletes mode.");
Console.ReadKey(true);
Console.WriteLine("Press any key to stop.");
// <ReadAllVersionsAndDeletesChanges>
using (FeedIterator<dynamic> allVersionsIterator = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.ContinuationToken(allVersionsContinuationToken), ChangeFeedMode.AllVersionsAndDeletes, new ChangeFeedRequestOptions { PageSizeHint = 10 }))
{
while (allVersionsIterator.HasMoreResults)
{
FeedResponse<dynamic> response = await allVersionsIterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(1000);
}
else
{
foreach (dynamic item in response)
{
// if operaiton is delete
if (item.metadata.operationType == "delete")
{
bool isTTL = (item.metadata?.timeToLiveExpired == null) ? false : true;
Console.WriteLine($"Operation: {item.metadata.operationType}. Item id: {item.metadata.id}. Due to ttl: {isTTL}");
}
// if operation is create or replace
else
{
Console.WriteLine($"Operation: {item.metadata.operationType}. Item id: {item.current.Id}. Current value: {item.current.Value}");
}
Console.WriteLine($"{item}");
}
}
if (Console.KeyAvailable)
{
break;
}
}
}
// <ReadAllVersionsAndDeletesChanges>
}
private static Item GenerateItem()
{
Random random = new Random();
return new Item
{
Id = random.Next(1, 999).ToString(),
Value = random.Next(1, 100000),
};
}
}
internal class Item
{
[JsonProperty("id")]
public string Id { get; set; }
public double Value { get; set; }
}
}