Microsoft.Azure.Cosmos.Samples/Usage/CFPullModelLatestVersionMode/Program.cs (124 lines of code) (raw):
namespace CFPullModelLatestVersionMode
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
class Program
{
private static readonly string databaseName = "db";
private static readonly string containerName = "container";
static async Task Main()
{
try
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();
string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentNullException("Please specify a valid EndPointUrl in the appSettings.json");
}
string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}
using (CosmosClient client = new CosmosClient(endpoint, authKey))
{
Console.WriteLine($"Getting container reference for {containerName}.");
ContainerProperties properties = new ContainerProperties(containerName, partitionKeyPath: "/id");
await client.CreateDatabaseIfNotExistsAsync(databaseName);
Container container = await client.GetDatabase(databaseName).CreateContainerIfNotExistsAsync(properties);
string latestVersionContinuationToken = await CreateLatestVersionChangeFeedIterator(container);
await IngestData(container);
await ReadLatestVersionChangeFeed(container, latestVersionContinuationToken);
}
}
finally
{
Console.WriteLine("End of demo.");
}
}
static async Task<string> CreateLatestVersionChangeFeedIterator(Container container)
{
Console.WriteLine("Creating ChangeFeedIterator to read the change feed in Latest Version mode.");
// <InitializeFeedIterator>
using (FeedIterator<Item> latestVersionIterator = container.GetChangeFeedIterator<Item>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion))
{
while (latestVersionIterator.HasMoreResults)
{
FeedResponse<Item> response = await latestVersionIterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
return response.ContinuationToken;
}
}
}
// <InitializeFeedIterator>
return null;
}
static async Task ReadLatestVersionChangeFeed(Container container, string latestVersionContinuationToken)
{
Console.ReadKey(true);
Console.Clear();
Console.WriteLine("Press any key to begin reading the change feed in Latest Version mode.");
Console.ReadKey(true);
Console.WriteLine("Press any key to stop.");
// <ReadLatestVersionChanges>
using (FeedIterator<Item> latestVersionIterator = container.GetChangeFeedIterator<Item>(ChangeFeedStartFrom.ContinuationToken(latestVersionContinuationToken), ChangeFeedMode.LatestVersion, new ChangeFeedRequestOptions { PageSizeHint = 10 }))
{
while (latestVersionIterator.HasMoreResults)
{
FeedResponse<Item> response = await latestVersionIterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(1000);
}
else
{
foreach (Item item in response)
{
Console.WriteLine($"Change in item: {item.Id}. New value: {item.Value}.");
}
}
if (Console.KeyAvailable)
{
break;
}
}
}
// <ReadLatestVersionChanges>
}
static async Task IngestData(Container container)
{
Console.Clear();
Console.WriteLine("Press any key to begin ingesting data.");
Console.ReadKey(true);
Console.WriteLine("Press any key to stop.");
while (!Console.KeyAvailable)
{
Item item = GenerateItem();
await container.UpsertItemAsync(item, new PartitionKey(item.Id));
Console.Write("*");
}
}
private static Item GenerateItem()
{
Random random = new Random();
return new Item
{
Id = random.Next(1, 999).ToString(),
Value = random.Next(1, 100000),
};
}
}
internal class Item
{
[JsonProperty("id")]
public string Id { get; set; }
public double Value { get; set; }
}
}