private async Task ExecuteOperationsAsync()

in Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadWriteQueryScenario.cs [91:232]


        private async Task ExecuteOperationsAsync(
            CTLConfig config,
            ILogger logger,
            IMetrics metrics,
            string loggingContextIdentifier,
            InitializationResult initializationResult,
            ReadWriteQueryPercentage readWriteQueryPercentage,
            CancellationToken cancellationToken)
        {
            logger.LogInformation("Initializing counters and metrics.");
            CounterOptions readSuccessMeter = new CounterOptions { Name = "#Read Successful Operations", Context = loggingContextIdentifier };
            CounterOptions readFailureMeter = new CounterOptions { Name = "#Read Unsuccessful Operations", Context = loggingContextIdentifier };
            CounterOptions writeSuccessMeter = new CounterOptions { Name = "#Write Successful Operations", Context = loggingContextIdentifier };
            CounterOptions writeFailureMeter = new CounterOptions { Name = "#Write Unsuccessful Operations", Context = loggingContextIdentifier };
            CounterOptions querySuccessMeter = new CounterOptions { Name = "#Query Successful Operations", Context = loggingContextIdentifier };
            CounterOptions queryFailureMeter = new CounterOptions { Name = "#Query Unsuccessful Operations", Context = loggingContextIdentifier };

            TimerOptions readLatencyTimer = new()
            {
                Name = "Read latency",
                MeasurementUnit = Unit.Requests,
                DurationUnit = TimeUnit.Milliseconds,
                RateUnit = TimeUnit.Seconds,
                Context = loggingContextIdentifier,
                Reservoir = () => ReservoirProvider.GetReservoir(config)
            };

            TimerOptions writeLatencyTimer = new ()
            {
                Name = "Write latency",
                MeasurementUnit = Unit.Requests,
                DurationUnit = TimeUnit.Milliseconds,
                RateUnit = TimeUnit.Seconds,
                Context = loggingContextIdentifier,
                Reservoir = () => ReservoirProvider.GetReservoir(config)
            };

            TimerOptions queryLatencyTimer = new ()
            {
                Name = "Query latency",
                MeasurementUnit = Unit.Requests,
                DurationUnit = TimeUnit.Milliseconds,
                RateUnit = TimeUnit.Seconds,
                Context = loggingContextIdentifier,
                Reservoir = () => ReservoirProvider.GetReservoir(config)
            };

            SemaphoreSlim concurrencyControlSemaphore = new SemaphoreSlim(config.Concurrency);
            Stopwatch stopwatch = Stopwatch.StartNew();
            int writeRange = readWriteQueryPercentage.ReadPercentage + readWriteQueryPercentage.WritePercentage;
            List<Task> operations = new List<Task>();
            for (long i = 0; ShouldContinue(stopwatch, i, config); i++)
            {
                await concurrencyControlSemaphore.WaitAsync(cancellationToken);
                long index = i % 100;
                if (index < readWriteQueryPercentage.ReadPercentage)
                {
                    operations.Add(CTLOperationHandler<ItemResponse<Dictionary<string, string>>>.PerformOperationAsync(
                        createTimerContext: () => metrics.Measure.Timer.Time(readLatencyTimer),
                        resultProducer: new SingleExecutionResultProducer<ItemResponse<Dictionary<string, string>>>(() => this.CreateReadOperation(
                            operation: i,
                            partitionKeyAttributeName: config.CollectionPartitionKey,
                            containers: initializationResult.Containers,
                            createdDocumentsPerContainer: this.createdDocuments)),
                        onSuccess: () =>
                        {
                            concurrencyControlSemaphore.Release();
                            metrics.Measure.Counter.Increment(readSuccessMeter);
                        },
                        onFailure: (Exception ex) =>
                        {
                            concurrencyControlSemaphore.Release();
                            metrics.Measure.Counter.Increment(readFailureMeter);
                            Utils.LogError(logger, loggingContextIdentifier, ex, "Failure during read operation");
                        },
                        logDiagnostics: (ItemResponse<Dictionary<string, string>> response, TimeSpan latency) => Utils.LogDiagnostics(
                            logger: logger,
                            operationName: "Read",
                            timerContextLatency: latency,
                            config: config,
                            cosmosDiagnostics: response.Diagnostics)));
                }
                else if (index < writeRange)
                {
                    operations.Add(CTLOperationHandler<ItemResponse<Dictionary<string, string>>>.PerformOperationAsync(
                        createTimerContext: () => metrics.Measure.Timer.Time(writeLatencyTimer),
                        resultProducer: new SingleExecutionResultProducer<ItemResponse<Dictionary<string, string>>>(() => this.CreateWriteOperation(
                            operation: i,
                            partitionKeyAttributeName: config.CollectionPartitionKey,
                            containers: initializationResult.Containers,
                            isContentResponseOnWriteEnabled: config.IsContentResponseOnWriteEnabled.Value)),
                        onSuccess: () =>
                        {
                            concurrencyControlSemaphore.Release();
                            metrics.Measure.Counter.Increment(writeSuccessMeter);
                        },
                        onFailure: (Exception ex) =>
                        {
                            concurrencyControlSemaphore.Release();
                            metrics.Measure.Counter.Increment(writeFailureMeter);
                            Utils.LogError(logger, loggingContextIdentifier, ex, "Failure during write operation");
                        },
                        logDiagnostics: (ItemResponse<Dictionary<string, string>> response, TimeSpan latency) => Utils.LogDiagnostics(
                            logger: logger,
                            operationName: "Write",
                            timerContextLatency: latency,
                            config: config,
                            cosmosDiagnostics: response.Diagnostics)));

                }
                else
                {
                    operations.Add(CTLOperationHandler<FeedResponse<Dictionary<string, string>>>.PerformOperationAsync(
                        createTimerContext: () => metrics.Measure.Timer.Time(queryLatencyTimer),
                        resultProducer: new IteratorResultProducer<Dictionary<string, string>>(this.CreateQueryOperation(
                            operation: i,
                            containers: initializationResult.Containers)),
                        onSuccess: () =>
                        {
                            concurrencyControlSemaphore.Release();
                            metrics.Measure.Counter.Increment(querySuccessMeter);
                        },
                        onFailure: (Exception ex) =>
                        {
                            concurrencyControlSemaphore.Release();
                            metrics.Measure.Counter.Increment(queryFailureMeter);
                            Utils.LogError(logger, loggingContextIdentifier, ex, "Failure during query operation");
                        },
                        logDiagnostics: (FeedResponse<Dictionary<string, string>> response, TimeSpan latency) => Utils.LogDiagnostics(
                            logger: logger,
                            operationName: "Query",
                            timerContextLatency: latency,
                            config: config,
                            cosmosDiagnostics: response.Diagnostics)));
                }
            }

            await Task.WhenAll(operations);
            stopwatch.Stop();
            logger.LogInformation("[{0}] operations performed in [{1}] seconds.",
                operations.Count, stopwatch.Elapsed.TotalSeconds);
        }