in Hands-on lab/lab-files/TransactionGenerator/Program.cs [82:290]
private static async Task SendData(List<Transaction> transactions,
List<EventHubProducerClient> eventHubClients, int randomSeed, int waittime,
CancellationToken externalCancellationToken, IProgress<Progress> progress,
bool onlyWriteToCosmosDb)
{
if (transactions == null)
{
throw new ArgumentNullException(nameof(transactions));
}
if (waittime > 0)
{
var span = TimeSpan.FromMilliseconds(waittime);
await Task.Delay(span, externalCancellationToken);
}
if (externalCancellationToken == null) throw new ArgumentNullException(nameof(externalCancellationToken));
if (progress == null) throw new ArgumentNullException(nameof(progress));
// Perform garbage collection prior to timing for statistics.
GC.Collect();
GC.WaitForPendingFinalizers();
var internalCancellationTokenSource = new CancellationTokenSource();
var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(externalCancellationToken, internalCancellationTokenSource.Token).Token;
var random = new Random(randomSeed);
var tasks = new List<Task>();
var messages = new ConcurrentQueue<ColoredMessage>();
var eventHubsTimer = new Stopwatch();
var cosmosTimer = new Stopwatch();
// Create the Cosmos DB database and container references:
var database = _cosmosDbClient.GetDatabase(DatabaseName);
var container = database.GetContainer(CollectionName);
// Ensure none of what follows runs synchronously.
await Task.FromResult(true).ConfigureAwait(false);
// For each line, send to Event Hub and Cosmos DB.
foreach (var transaction in transactions)
{
if (externalCancellationToken.IsCancellationRequested)
{
return;
}
_totalMessages++;
var thisRequest = _totalMessages;
#region Write to Cosmos DB
_cosmosRequestsMade++;
tasks.Add(BulkheadForCosmosDbCalls.ExecuteAsync(async ct =>
{
try
{
cosmosTimer.Start();
// Send to Cosmos DB:
// TODO 1: Complete this code to send the Transaction object to Cosmos DB. Capture the returned ResourceResponse object to a new variable.
// COMPLETE THIS CODE ... var response = await ...
cosmosTimer.Stop();
_cosmosElapsedTime = cosmosTimer.ElapsedMilliseconds;
// Keep running total of RUs consumed:
// TODO 2: Complete this code to append the number of RU/s consumed to the _cosmosRUsPerBatch variable.
// WRITE CODE HERE
_cosmosRequestsSucceededInBatch++;
}
catch (CosmosException de)
{
if (!ct.IsCancellationRequested) messages.Enqueue(new ColoredMessage($"Cosmos DB request {thisRequest} eventually failed with: {de.Message}; Retry-after: {de.RetryAfter} seconds.", Color.Red));
_cosmosRequestsFailed++;
}
catch (Exception e)
{
if (!ct.IsCancellationRequested) messages.Enqueue(new ColoredMessage($"Cosmos DB request {thisRequest} eventually failed with: {e.Message}", Color.Red));
_cosmosRequestsFailed++;
}
}, combinedToken)
.ContinueWith((t, k) =>
{
if (t.IsFaulted) messages.Enqueue(new ColoredMessage($"Request to Cosmos DB failed with: {t.Exception?.Flatten().InnerExceptions.First().Message}", Color.Red));
_cosmosRequestsFailed++;
}, thisRequest, TaskContinuationOptions.NotOnRanToCompletion)
);
#endregion Write to Cosmos DB
#region Write to Event Hub
// Only send messages to Event Hub instances if onlyWriteToCosmosDb is set to false.
if (onlyWriteToCosmosDb == false)
{
_eventHubRequestsMade++;
tasks.Add(BulkheadForEventHubCalls.ExecuteAsync(async ct =>
{
try
{
var eventData = new EventData(new BinaryData(transaction.GetData()));
eventHubsTimer.Start();
// Send to Event Hubs:
foreach (var eventHubClient in eventHubClients)
{
// TODO 4: Complete code to send to Event Hub.
// COMPLETE THIS CODE ... using var eventBatch = // Send eventData and set the partition key to the IpCountryCode field.
}
eventHubsTimer.Stop();
_eventHubElapsedTime = eventHubsTimer.ElapsedMilliseconds;
// TODO 5: Complete code to increment the count of number of Event Hub requests that succeeded.
// COMPLETE THIS CODE
}
catch (Exception e)
{
if (!ct.IsCancellationRequested) messages.Enqueue(new ColoredMessage($"Event Hubs request {thisRequest} eventually failed with: {e.Message}", Color.Red));
_eventHubRequestsFailed++;
}
}, combinedToken)
.ContinueWith((t, k) =>
{
if (t.IsFaulted) messages.Enqueue(new ColoredMessage($"Request to Event Hubs failed with: {t.Exception?.Flatten().InnerExceptions.First().Message}", Color.Red));
_eventHubRequestsFailed++;
}, thisRequest, TaskContinuationOptions.NotOnRanToCompletion)
);
}
#endregion Write to Event Hub
if (_totalMessages % 1000 == 0)
{
eventHubsTimer.Stop();
cosmosTimer.Stop();
_eventHubTotalElapsedTime += _eventHubElapsedTime;
_cosmosTotalElapsedTime += _cosmosElapsedTime;
_eventHubRequestsSucceeded += _eventHubRequestsSucceededInBatch;
_cosmosRequestsSucceeded += _cosmosRequestsSucceededInBatch;
// Calculate RUs/second/month:
var ruPerSecond = (_cosmosRUsPerBatch / (_cosmosElapsedTime * .001));
var ruPerMonth = ruPerSecond * 86400 * 30;
// Random delay every 1000 messages that are sent.
//await Task.Delay(random.Next(100, 1000), externalCancellationToken).ConfigureAwait(false);
// The obvious and recommended method for sending a lot of data is to do so in batches. This method can
// multiply the amount of data sent with each request by hundreds or thousands. However, the point of
// our exercise is not to maximize throughput and send as much data as possible, but to compare the
// relative performance between Event Hubs and Cosmos DB.
// Output statistics. Be on the lookout for the following:
// - Compare Event Hub to Cosmos DB statistics. They should have similar processing times and successful calls.
// - Inserted line shows successful inserts in this batch and throughput for writes/second with RU/s usage and estimated monthly ingestion rate added to Cosmos DB statistics.
// - Processing time: Shows whether the processing time for the past 1,000 requested inserts is faster or slower than the other service.
// - Total elapsed time: Running total of time taken to process all documents.
// - If this value continues to be grow higher for Cosmos DB vs. Event Hubs, that's a good indicator that the Cosmos DB requests are being throttled. Consider increasing the RU/s for the container.
// - Succeeded shows number of accumulative successful inserts to the service.
// - Pending are items in the bulkhead queue. This amount will continue to grow if the service is unable to keep up with demand.
// - Accumulative failed requests that encountered an exception.
messages.Enqueue(new ColoredMessage($"Total requests: requested {_totalMessages:00} ", Color.Cyan));
if (onlyWriteToCosmosDb == false)
{
messages.Enqueue(new ColoredMessage(string.Empty));
messages.Enqueue(new ColoredMessage($"Event Hub: inserted {_eventHubRequestsSucceededInBatch:00} docs @ {(_eventHubRequestsSucceededInBatch / (_eventHubElapsedTime * .001)):0.00} writes/s ", Color.White));
messages.Enqueue(new ColoredMessage($"Event Hub: processing time {_eventHubElapsedTime} ms ({(_eventHubElapsedTime > _cosmosElapsedTime ? "slower" : "faster")})", Color.Magenta));
messages.Enqueue(new ColoredMessage($"Event Hub: total elapsed time {(_eventHubTotalElapsedTime * .001):0.00} seconds ({(_eventHubTotalElapsedTime > _cosmosTotalElapsedTime ? "slower" : "faster")})", Color.Magenta));
messages.Enqueue(new ColoredMessage($"Event Hub: total succeeded {_eventHubRequestsSucceeded:00} ", Color.Green));
messages.Enqueue(new ColoredMessage($"Event Hub: total pending {_eventHubRequestsMade - _eventHubRequestsSucceeded - _eventHubRequestsFailed:00} ", Color.Yellow));
messages.Enqueue(new ColoredMessage($"Event Hub: total failed {_eventHubRequestsFailed:00}", Color.Red));
eventHubsTimer.Restart();
_eventHubElapsedTime = 0;
_eventHubRequestsSucceededInBatch = 0;
}
messages.Enqueue(new ColoredMessage(string.Empty));
messages.Enqueue(new ColoredMessage($"Cosmos DB: inserted {_cosmosRequestsSucceededInBatch:00} docs @ {(_cosmosRequestsSucceededInBatch / (_cosmosElapsedTime * .001)):0.00} writes/s, {ruPerSecond:0.00} RU/s ({(ruPerMonth / (1000 * 1000 * 1000)):0.00}B max monthly 1KB writes) ", Color.White));
messages.Enqueue(new ColoredMessage($"Cosmos DB: processing time {_cosmosElapsedTime} ms ({(_cosmosElapsedTime > _eventHubElapsedTime ? "slower" : "faster")})", Color.Magenta));
messages.Enqueue(new ColoredMessage($"Cosmos DB: total elapsed time {(_cosmosTotalElapsedTime * .001):0.00} seconds ({(_cosmosTotalElapsedTime > _eventHubTotalElapsedTime ? "slower" : "faster")})", Color.Magenta));
messages.Enqueue(new ColoredMessage($"Cosmos DB: total succeeded {_cosmosRequestsSucceeded:00} ", Color.Green));
messages.Enqueue(new ColoredMessage($"Cosmos DB: total pending {_cosmosRequestsMade - _cosmosRequestsSucceeded - _cosmosRequestsFailed:00} ", Color.Yellow));
messages.Enqueue(new ColoredMessage($"Cosmos DB: total failed {_cosmosRequestsFailed:00}", Color.Red));
messages.Enqueue(new ColoredMessage(string.Empty));
// Restart timers and reset batch settings:
cosmosTimer.Restart();
_cosmosElapsedTime = 0;
_cosmosRUsPerBatch = 0;
_cosmosRequestsSucceededInBatch = 0;
// Output all messages available right now, in one go.
progress.Report(ProgressWithMessages(ConsumeAsEnumerable(messages)));
}
}
messages.Enqueue(new ColoredMessage("Data generation complete", Color.Magenta));
progress.Report(ProgressWithMessages(ConsumeAsEnumerable(messages)));
BulkheadForEventHubCalls.Dispose();
BulkheadForCosmosDbCalls.Dispose();
eventHubsTimer.Stop();
cosmosTimer.Stop();
}