in Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ReadWriteQueryScenario.cs [91:232]
/// <summary>
/// Starts a mix of read, write and query operations against the initialized containers
/// for as long as <c>ShouldContinue</c> returns <c>true</c>, then awaits every started
/// operation and logs how many were started and the elapsed time.
/// </summary>
/// <remarks>
/// The operation kind is chosen from <c>i % 100</c>: values below
/// <c>ReadPercentage</c> are reads, values below <c>ReadPercentage + WritePercentage</c>
/// are writes, and everything else is a query.
/// A semaphore sized by <c>config.Concurrency</c> is awaited before each operation is
/// started and is released from that operation's success or failure callback.
/// </remarks>
/// <param name="config">Run configuration; supplies the concurrency, the partition key attribute name,
/// the content-response-on-write flag, and is forwarded to the reservoir, diagnostics and loop-condition helpers.</param>
/// <param name="logger">Receives the informational messages and is forwarded to the error/diagnostics helpers.</param>
/// <param name="metrics">Metrics root on which the counters are incremented and the latency timers are started.</param>
/// <param name="loggingContextIdentifier">Used as the <c>Context</c> of every counter and timer and passed to the error logger.</param>
/// <param name="initializationResult">Supplies the containers the operations are created against.</param>
/// <param name="readWriteQueryPercentage">Supplies the read and write percentages that partition the 0..99 range.</param>
/// <param name="cancellationToken">Observed while waiting for a free concurrency slot.</param>
/// <returns>A task that completes once every started operation has completed.</returns>
/// <exception cref="OperationCanceledException">
/// <paramref name="cancellationToken"/> was cancelled while waiting for a concurrency slot.
/// </exception>
private async Task ExecuteOperationsAsync(
    CTLConfig config,
    ILogger logger,
    IMetrics metrics,
    string loggingContextIdentifier,
    InitializationResult initializationResult,
    ReadWriteQueryPercentage readWriteQueryPercentage,
    CancellationToken cancellationToken)
{
    logger.LogInformation("Initializing counters and metrics.");

    CounterOptions readSuccessMeter = new CounterOptions { Name = "#Read Successful Operations", Context = loggingContextIdentifier };
    CounterOptions readFailureMeter = new CounterOptions { Name = "#Read Unsuccessful Operations", Context = loggingContextIdentifier };
    CounterOptions writeSuccessMeter = new CounterOptions { Name = "#Write Successful Operations", Context = loggingContextIdentifier };
    CounterOptions writeFailureMeter = new CounterOptions { Name = "#Write Unsuccessful Operations", Context = loggingContextIdentifier };
    CounterOptions querySuccessMeter = new CounterOptions { Name = "#Query Successful Operations", Context = loggingContextIdentifier };
    CounterOptions queryFailureMeter = new CounterOptions { Name = "#Query Unsuccessful Operations", Context = loggingContextIdentifier };

    TimerOptions readLatencyTimer = CreateLatencyTimer("Read latency");
    TimerOptions writeLatencyTimer = CreateLatencyTimer("Write latency");
    TimerOptions queryLatencyTimer = CreateLatencyTimer("Query latency");

    SemaphoreSlim concurrencyControlSemaphore = new SemaphoreSlim(config.Concurrency);
    Stopwatch stopwatch = Stopwatch.StartNew();
    int writeRange = readWriteQueryPercentage.ReadPercentage + readWriteQueryPercentage.WritePercentage;
    List<Task> operations = new List<Task>();

    for (long i = 0; ShouldContinue(stopwatch, i, config); i++)
    {
        await concurrencyControlSemaphore.WaitAsync(cancellationToken);

        // A `for` loop variable is a single variable shared by every iteration, so a lambda
        // that captures `i` would observe later increments if it runs after the loop advances.
        // Capturing a per-iteration copy pins the value regardless of when the producer runs.
        long operationNumber = i;
        long index = operationNumber % 100;

        if (index < readWriteQueryPercentage.ReadPercentage)
        {
            operations.Add(CTLOperationHandler<ItemResponse<Dictionary<string, string>>>.PerformOperationAsync(
                createTimerContext: () => metrics.Measure.Timer.Time(readLatencyTimer),
                resultProducer: new SingleExecutionResultProducer<ItemResponse<Dictionary<string, string>>>(() => this.CreateReadOperation(
                    operation: operationNumber,
                    partitionKeyAttributeName: config.CollectionPartitionKey,
                    containers: initializationResult.Containers,
                    createdDocumentsPerContainer: this.createdDocuments)),
                onSuccess: () => ReleaseAndCount(readSuccessMeter),
                onFailure: (Exception ex) => ReleaseAndReportFailure(readFailureMeter, ex, "Failure during read operation"),
                logDiagnostics: (ItemResponse<Dictionary<string, string>> response, TimeSpan latency) => Utils.LogDiagnostics(
                    logger: logger,
                    operationName: "Read",
                    timerContextLatency: latency,
                    config: config,
                    cosmosDiagnostics: response.Diagnostics)));
        }
        else if (index < writeRange)
        {
            operations.Add(CTLOperationHandler<ItemResponse<Dictionary<string, string>>>.PerformOperationAsync(
                createTimerContext: () => metrics.Measure.Timer.Time(writeLatencyTimer),
                resultProducer: new SingleExecutionResultProducer<ItemResponse<Dictionary<string, string>>>(() => this.CreateWriteOperation(
                    operation: operationNumber,
                    partitionKeyAttributeName: config.CollectionPartitionKey,
                    containers: initializationResult.Containers,
                    isContentResponseOnWriteEnabled: config.IsContentResponseOnWriteEnabled.Value)),
                onSuccess: () => ReleaseAndCount(writeSuccessMeter),
                onFailure: (Exception ex) => ReleaseAndReportFailure(writeFailureMeter, ex, "Failure during write operation"),
                logDiagnostics: (ItemResponse<Dictionary<string, string>> response, TimeSpan latency) => Utils.LogDiagnostics(
                    logger: logger,
                    operationName: "Write",
                    timerContextLatency: latency,
                    config: config,
                    cosmosDiagnostics: response.Diagnostics)));
        }
        else
        {
            // Unlike reads and writes, the query operation is created eagerly here rather
            // than inside a deferred lambda.
            operations.Add(CTLOperationHandler<FeedResponse<Dictionary<string, string>>>.PerformOperationAsync(
                createTimerContext: () => metrics.Measure.Timer.Time(queryLatencyTimer),
                resultProducer: new IteratorResultProducer<Dictionary<string, string>>(this.CreateQueryOperation(
                    operation: operationNumber,
                    containers: initializationResult.Containers)),
                onSuccess: () => ReleaseAndCount(querySuccessMeter),
                onFailure: (Exception ex) => ReleaseAndReportFailure(queryFailureMeter, ex, "Failure during query operation"),
                logDiagnostics: (FeedResponse<Dictionary<string, string>> response, TimeSpan latency) => Utils.LogDiagnostics(
                    logger: logger,
                    operationName: "Query",
                    timerContextLatency: latency,
                    config: config,
                    cosmosDiagnostics: response.Diagnostics)));
        }
    }

    await Task.WhenAll(operations);
    stopwatch.Stop();
    logger.LogInformation("[{0}] operations performed in [{1}] seconds.",
        operations.Count, stopwatch.Elapsed.TotalSeconds);

    // Builds the latency timer definition shared by all three operation kinds; only the name differs.
    TimerOptions CreateLatencyTimer(string name)
    {
        return new TimerOptions
        {
            Name = name,
            MeasurementUnit = Unit.Requests,
            DurationUnit = TimeUnit.Milliseconds,
            RateUnit = TimeUnit.Seconds,
            Context = loggingContextIdentifier,
            Reservoir = () => ReservoirProvider.GetReservoir(config)
        };
    }

    // Returns the concurrency slot first so the loop can start the next operation, then counts the success.
    // NOTE(review): if the handler invokes a callback more than once per operation (for example once per
    // page of a query iterator), more slots are released than were taken - confirm against CTLOperationHandler.
    void ReleaseAndCount(CounterOptions successMeter)
    {
        concurrencyControlSemaphore.Release();
        metrics.Measure.Counter.Increment(successMeter);
    }

    // Returns the concurrency slot, counts the failure and logs the exception with the given message.
    void ReleaseAndReportFailure(CounterOptions failureMeter, Exception ex, string message)
    {
        concurrencyControlSemaphore.Release();
        metrics.Measure.Counter.Increment(failureMeter);
        Utils.LogError(logger, loggingContextIdentifier, ex, message);
    }
}