private static async Task SendData()

in Hands-on lab/lab-files/TransactionGenerator/Program.cs [82:290]


        private static async Task SendData(List<Transaction> transactions,
            List<EventHubProducerClient> eventHubClients, int randomSeed, int waittime,
            CancellationToken externalCancellationToken, IProgress<Progress> progress,
            bool onlyWriteToCosmosDb)
        {
            if (transactions == null)
            {
                throw new ArgumentNullException(nameof(transactions));
            }

            if (waittime > 0)
            {
                var span = TimeSpan.FromMilliseconds(waittime);
                await Task.Delay(span, externalCancellationToken);
            }

            if (externalCancellationToken == null) throw new ArgumentNullException(nameof(externalCancellationToken));
            if (progress == null) throw new ArgumentNullException(nameof(progress));

            // Perform garbage collection prior to timing for statistics.
            GC.Collect();
            GC.WaitForPendingFinalizers();

            var internalCancellationTokenSource = new CancellationTokenSource();
            var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(externalCancellationToken, internalCancellationTokenSource.Token).Token;
            var random = new Random(randomSeed);
            var tasks = new List<Task>();
            var messages = new ConcurrentQueue<ColoredMessage>();
            var eventHubsTimer = new Stopwatch();
            var cosmosTimer = new Stopwatch();

            // Create the Cosmos DB database and container references:
            var database = _cosmosDbClient.GetDatabase(DatabaseName);
            var container = database.GetContainer(CollectionName);

            // Ensure none of what follows runs synchronously.
            await Task.FromResult(true).ConfigureAwait(false);

            // For each line, send to Event Hub and Cosmos DB.
            foreach (var transaction in transactions)
            {
                if (externalCancellationToken.IsCancellationRequested)
                {
                    return;
                }

                _totalMessages++;
                var thisRequest = _totalMessages;

                #region Write to Cosmos DB
                _cosmosRequestsMade++;
                tasks.Add(BulkheadForCosmosDbCalls.ExecuteAsync(async ct =>
                {
                    try
                    {
                        cosmosTimer.Start();

                        // Send to Cosmos DB:
                        // TODO 1: Complete this code to send the Transaction object to Cosmos DB. Capture the returned ResourceResponse object to a new variable.
                        // COMPLETE THIS CODE ... var response = await ...

                        cosmosTimer.Stop();
                        _cosmosElapsedTime = cosmosTimer.ElapsedMilliseconds;

                        // Keep running total of RUs consumed:
                        // TODO 2: Complete this code to append the number of RU/s consumed to the _cosmosRUsPerBatch variable.
                        // WRITE CODE HERE

                        _cosmosRequestsSucceededInBatch++;
                    }
                    catch (CosmosException de)
                    {
                        if (!ct.IsCancellationRequested) messages.Enqueue(new ColoredMessage($"Cosmos DB request {thisRequest} eventually failed with: {de.Message}; Retry-after: {de.RetryAfter} seconds.", Color.Red));

                        _cosmosRequestsFailed++;
                    }
                    catch (Exception e)
                    {
                        if (!ct.IsCancellationRequested) messages.Enqueue(new ColoredMessage($"Cosmos DB request {thisRequest} eventually failed with: {e.Message}", Color.Red));

                        _cosmosRequestsFailed++;
                    }
                }, combinedToken)
                    .ContinueWith((t, k) =>
                    {
                        if (t.IsFaulted) messages.Enqueue(new ColoredMessage($"Request to Cosmos DB failed with: {t.Exception?.Flatten().InnerExceptions.First().Message}", Color.Red));

                        _cosmosRequestsFailed++;
                    }, thisRequest, TaskContinuationOptions.NotOnRanToCompletion)
                );
                #endregion Write to Cosmos DB

                #region Write to Event Hub
                // Only send messages to Event Hub instances if onlyWriteToCosmosDb is set to false.
                if (onlyWriteToCosmosDb == false)
                {
                    _eventHubRequestsMade++;
                    tasks.Add(BulkheadForEventHubCalls.ExecuteAsync(async ct =>
                        {
                            try
                            {
                                var eventData = new EventData(new BinaryData(transaction.GetData()));
                                
                                eventHubsTimer.Start();

                                // Send to Event Hubs:
                                foreach (var eventHubClient in eventHubClients)
                                {
                                    // TODO 4: Complete code to send to Event Hub.
                                    // COMPLETE THIS CODE ... using var eventBatch = // Send eventData and set the partition key to the IpCountryCode field.
                                }

                                eventHubsTimer.Stop();
                                _eventHubElapsedTime = eventHubsTimer.ElapsedMilliseconds;

                                // TODO 5: Complete code to increment the count of number of Event Hub requests that succeeded.
                                // COMPLETE THIS CODE
                            }
                            catch (Exception e)
                            {
                                if (!ct.IsCancellationRequested) messages.Enqueue(new ColoredMessage($"Event Hubs request {thisRequest} eventually failed with: {e.Message}", Color.Red));

                                _eventHubRequestsFailed++;
                            }
                        }, combinedToken)
                        .ContinueWith((t, k) =>
                        {
                            if (t.IsFaulted) messages.Enqueue(new ColoredMessage($"Request to Event Hubs failed with: {t.Exception?.Flatten().InnerExceptions.First().Message}", Color.Red));

                            _eventHubRequestsFailed++;
                        }, thisRequest, TaskContinuationOptions.NotOnRanToCompletion)
                    );
                }
                #endregion Write to Event Hub

                if (_totalMessages % 1000 == 0)
                {
                    eventHubsTimer.Stop();
                    cosmosTimer.Stop();
                    _eventHubTotalElapsedTime += _eventHubElapsedTime;
                    _cosmosTotalElapsedTime += _cosmosElapsedTime;
                    _eventHubRequestsSucceeded += _eventHubRequestsSucceededInBatch;
                    _cosmosRequestsSucceeded += _cosmosRequestsSucceededInBatch;

                    // Calculate RUs/second/month:
                    var ruPerSecond = (_cosmosRUsPerBatch / (_cosmosElapsedTime * .001));
                    var ruPerMonth = ruPerSecond * 86400 * 30;

                    // Random delay every 1000 messages that are sent.
                    //await Task.Delay(random.Next(100, 1000), externalCancellationToken).ConfigureAwait(false);

                    // The obvious and recommended method for sending a lot of data is to do so in batches. This method can
                    // multiply the amount of data sent with each request by hundreds or thousands. However, the point of
                    // our exercise is not to maximize throughput and send as much data as possible, but to compare the
                    // relative performance between Event Hubs and Cosmos DB.

                    // Output statistics. Be on the lookout for the following:
                    //  - Compare Event Hub to Cosmos DB statistics. They should have similar processing times and successful calls.
                    //  - Inserted line shows successful inserts in this batch and throughput for writes/second with RU/s usage and estimated monthly ingestion rate added to Cosmos DB statistics.
                    //  - Processing time: Shows whether the processing time for the past 1,000 requested inserts is faster or slower than the other service.
                    //  - Total elapsed time: Running total of time taken to process all documents.
                    //      - If this value continues to be grow higher for Cosmos DB vs. Event Hubs, that's a good indicator that the Cosmos DB requests are being throttled. Consider increasing the RU/s for the container.
                    //  - Succeeded shows number of accumulative successful inserts to the service.
                    //  - Pending are items in the bulkhead queue. This amount will continue to grow if the service is unable to keep up with demand.
                    //  - Accumulative failed requests that encountered an exception.
                    messages.Enqueue(new ColoredMessage($"Total requests: requested {_totalMessages:00} ", Color.Cyan));
                    if (onlyWriteToCosmosDb == false)
                    {
                        messages.Enqueue(new ColoredMessage(string.Empty));
                        messages.Enqueue(new ColoredMessage($"Event Hub: inserted {_eventHubRequestsSucceededInBatch:00} docs @ {(_eventHubRequestsSucceededInBatch / (_eventHubElapsedTime * .001)):0.00} writes/s ", Color.White));
                        messages.Enqueue(new ColoredMessage($"Event Hub: processing time {_eventHubElapsedTime} ms ({(_eventHubElapsedTime > _cosmosElapsedTime ? "slower" : "faster")})", Color.Magenta));
                        messages.Enqueue(new ColoredMessage($"Event Hub: total elapsed time {(_eventHubTotalElapsedTime * .001):0.00} seconds ({(_eventHubTotalElapsedTime > _cosmosTotalElapsedTime ? "slower" : "faster")})", Color.Magenta));
                        messages.Enqueue(new ColoredMessage($"Event Hub: total succeeded {_eventHubRequestsSucceeded:00} ", Color.Green));
                        messages.Enqueue(new ColoredMessage($"Event Hub: total pending {_eventHubRequestsMade - _eventHubRequestsSucceeded - _eventHubRequestsFailed:00} ", Color.Yellow));
                        messages.Enqueue(new ColoredMessage($"Event Hub: total failed {_eventHubRequestsFailed:00}", Color.Red));

                        eventHubsTimer.Restart();
                        _eventHubElapsedTime = 0;
                        _eventHubRequestsSucceededInBatch = 0;
                    }

                    messages.Enqueue(new ColoredMessage(string.Empty));
                    messages.Enqueue(new ColoredMessage($"Cosmos DB: inserted {_cosmosRequestsSucceededInBatch:00} docs @ {(_cosmosRequestsSucceededInBatch / (_cosmosElapsedTime * .001)):0.00} writes/s, {ruPerSecond:0.00} RU/s ({(ruPerMonth / (1000 * 1000 * 1000)):0.00}B max monthly 1KB writes) ", Color.White));
                    messages.Enqueue(new ColoredMessage($"Cosmos DB: processing time {_cosmosElapsedTime} ms ({(_cosmosElapsedTime > _eventHubElapsedTime ? "slower" : "faster")})", Color.Magenta));
                    messages.Enqueue(new ColoredMessage($"Cosmos DB: total elapsed time {(_cosmosTotalElapsedTime * .001):0.00} seconds ({(_cosmosTotalElapsedTime > _eventHubTotalElapsedTime ? "slower" : "faster")})", Color.Magenta));
                    messages.Enqueue(new ColoredMessage($"Cosmos DB: total succeeded {_cosmosRequestsSucceeded:00} ", Color.Green));
                    messages.Enqueue(new ColoredMessage($"Cosmos DB: total pending {_cosmosRequestsMade - _cosmosRequestsSucceeded - _cosmosRequestsFailed:00} ", Color.Yellow));
                    messages.Enqueue(new ColoredMessage($"Cosmos DB: total failed {_cosmosRequestsFailed:00}", Color.Red));
                    messages.Enqueue(new ColoredMessage(string.Empty));

                    // Restart timers and reset batch settings:                    
                    cosmosTimer.Restart();
                    _cosmosElapsedTime = 0;
                    _cosmosRUsPerBatch = 0;
                    _cosmosRequestsSucceededInBatch = 0;

                    // Output all messages available right now, in one go.
                    progress.Report(ProgressWithMessages(ConsumeAsEnumerable(messages)));
                }
            }

            messages.Enqueue(new ColoredMessage("Data generation complete", Color.Magenta));
            progress.Report(ProgressWithMessages(ConsumeAsEnumerable(messages)));

            BulkheadForEventHubCalls.Dispose();
            BulkheadForCosmosDbCalls.Dispose();
            eventHubsTimer.Stop();
            cosmosTimer.Stop();
        }