in cs/playground/AsyncStress/Program.cs [120:278]
/// <summary>
/// Populates <paramref name="store"/> with <c>numOperations</c> generated records, then runs the configured
/// operation phase (Read, Upsert or RMW) over the same records, printing the elapsed time of each phase.
/// </summary>
/// <remarks>
/// Read results are verified against the generated values; Upsert/RMW operations are not verified.
/// The store is disposed once the run completes successfully.
/// </remarks>
/// <typeparam name="TStore">Wrapper type through which all store operations are issued.</typeparam>
/// <typeparam name="TKey">Key type of the records.</typeparam>
/// <typeparam name="TValue">Value type of the records.</typeparam>
/// <param name="store">The store to populate and exercise.</param>
/// <param name="keyGen">Produces the key for the record at a given index.</param>
/// <param name="valueGen">Produces the value for the record at a given index.</param>
/// <exception cref="ApplicationException">
/// <c>insertThreadingMode</c> is <c>ThreadingMode.None</c> (the initial population cannot be skipped),
/// or a threading mode has no description.
/// </exception>
/// <exception cref="InvalidOperationException">A phase is asked to run with an unsupported threading mode.</exception>
private static async Task ProfileStore<TStore, TKey, TValue>(TStore store, Func<int, TKey> keyGen, Func<int, TValue> valueGen)
    where TStore : IFasterWrapper<TKey, TValue>
{
    // Human-readable description of a threading mode, used in the progress banners.
    static string threadingModeString(ThreadingMode threadingMode)
        => threadingMode switch
        {
            ThreadingMode.Single => "Single threading",
            ThreadingMode.ParallelFor => "Parallel.For issuing async operations",
            ThreadingMode.Chunks => $"Chunks partitioned across {numChunks} tasks",
            _ => throw new ApplicationException($"Unknown threading mode {threadingMode}")
        };

    Console.WriteLine(" Creating database");
    (TKey, TValue)[] database = new (TKey, TValue)[numOperations];
    TKey[] keys = new TKey[numOperations];
    for (int i = 0; i < numOperations; i++)
    {
        database[i] = (keyGen(i), valueGen(i));
        keys[i] = database[i].Item1;
    }
    Console.WriteLine(" Creation complete");

    // Chunked modes hand out equally sized slices, so the record count must divide evenly.
    Assert.True(numOperations % numChunks == 0, $"Number of operations {numOperations:N0} should be a multiple of number of chunks {numChunks}");
    int chunkSize = numOperations / numChunks;

    // Writes every record in the database via RMW (when useRmw is set) or Upsert, using the given threading mode.
    // The mode is a parameter so that the insert phase and the operation phase each run with their own setting.
    async ValueTask doUpdates(ThreadingMode threadingMode)
    {
        switch (threadingMode)
        {
            case ThreadingMode.Single:
                // The useRmw test is kept outside the loops so the per-record path has no extra branch.
                if (useRmw)
                {
                    for (int i = 0; i < numOperations; i++)
                    {
                        await store.RMWAsync(database[i].Item1, database[i].Item2);
                    }
                }
                else
                {
                    for (int i = 0; i < numOperations; i++)
                    {
                        await store.UpsertAsync(database[i].Item1, database[i].Item2);
                    }
                }
                break;

            case ThreadingMode.ParallelFor:
                // Issue all operations from Parallel.For, then await each returned task exactly once.
                var writeTasks = new ValueTask[numOperations];
                if (useRmw)
                {
                    Parallel.For(0, numOperations, i => writeTasks[i] = store.RMWAsync(database[i].Item1, database[i].Item2));
                }
                else
                {
                    Parallel.For(0, numOperations, i => writeTasks[i] = store.UpsertAsync(database[i].Item1, database[i].Item2));
                }
                foreach (var task in writeTasks)
                {
                    await task;
                }
                break;

            case ThreadingMode.Chunks:
                // Start one task per chunk of chunkSize records, then await them in order.
                var chunkTasks = new ValueTask[numChunks];
                if (useRmw)
                {
                    for (int i = 0; i < numChunks; i++)
                    {
                        chunkTasks[i] = store.RMWChunkAsync(database, i * chunkSize, chunkSize);
                    }
                }
                else
                {
                    for (int i = 0; i < numChunks; i++)
                    {
                        chunkTasks[i] = store.UpsertChunkAsync(database, i * chunkSize, chunkSize);
                    }
                }
                foreach (var chunkTask in chunkTasks)
                {
                    await chunkTask;
                }
                break;

            default:
                throw new InvalidOperationException($"Invalid threading mode {threadingMode}");
        }
    }

    // Insert phase: the operation phase needs a populated store, so this phase cannot be skipped.
    if (insertThreadingMode == ThreadingMode.None)
    {
        throw new ApplicationException("Cannot Skip initial population");
    }

    Console.WriteLine($" Inserting {numOperations:N0} records via {(useRmw ? "RMW" : "Upsert")} with {threadingModeString(insertThreadingMode)} ...");
    var insertTimer = Stopwatch.StartNew();
    await doUpdates(insertThreadingMode);
    insertTimer.Stop();
    Console.WriteLine($" Insertion complete in {insertTimer.ElapsedMilliseconds / 1000.0:N3} sec; TailAddress = {store.TailAddress}, Pending = {store.PendingCount:N0}");

    // Reset the counter so the operation phase reports only its own pending operations.
    store.ClearPendingCount();

    // Operation phase: Read, Upsert or RMW over the records inserted above.
    Console.WriteLine();
    if (operationThreadingMode == ThreadingMode.None)
    {
        Console.WriteLine(" Skipping Operations");
    }
    else
    {
        var opString = (useRmw, useUpsert) switch
        {
            (true, _) => "RMW",
            (_, true) => "Upsert",
            _ => "Read"
        };
        Console.WriteLine($" Performing {numOperations:N0} {opString}s with {threadingModeString(operationThreadingMode)} (OS buffering: {store.UseOsReadBuffering}) ...");

        if (useRmw || useUpsert)
        {
            // Just update with the same values, using the threading mode announced in the banner above.
            var opTimer = Stopwatch.StartNew();
            await doUpdates(operationThreadingMode);
            opTimer.Stop();
            Console.WriteLine($" Operations complete in {opTimer.ElapsedMilliseconds / 1000.0:N3} sec; TailAddress = {store.TailAddress}, Pending = {store.PendingCount:N0}");
            Console.WriteLine(" Operation was not Read so skipping verification");
        }
        else
        {
            // Allocate the result buffer before starting the timer so the allocation is not measured.
            (Status, TValue)[] results = new (Status, TValue)[numOperations];
            var opTimer = Stopwatch.StartNew();

            if (operationThreadingMode == ThreadingMode.Single)
            {
                for (int i = 0; i < numOperations; i++)
                {
                    results[i] = await store.ReadAsync(database[i].Item1);
                }
            }
            else if (operationThreadingMode == ThreadingMode.ParallelFor)
            {
                var readTasks = new ValueTask<(Status, TValue)>[numOperations];
                Parallel.For(0, numOperations, i => readTasks[i] = store.ReadAsync(database[i].Item1));
                for (int i = 0; i < numOperations; i++)
                {
                    results[i] = await readTasks[i];
                }
            }
            else if (operationThreadingMode == ThreadingMode.Chunks)
            {
                var chunkTasks = new ValueTask<(Status, TValue)[]>[numChunks];
                for (int i = 0; i < numChunks; i++)
                {
                    chunkTasks[i] = store.ReadChunkAsync(keys, i * chunkSize, chunkSize);
                }
                for (int i = 0; i < numChunks; i++)
                {
                    // Copy each chunk's results into its slice of the combined result array.
                    var result = await chunkTasks[i];
                    Array.Copy(result, 0, results, i * chunkSize, chunkSize);
                }
            }
            else
            {
                throw new InvalidOperationException($"Invalid threading mode {operationThreadingMode}");
            }

            opTimer.Stop();
            Console.WriteLine($" {opString}s complete in {opTimer.ElapsedMilliseconds / 1000.0:N3} sec");

            Console.WriteLine(" Verifying read results ...");
            Parallel.For(0, numOperations, i =>
            {
                Assert.Equal(Status.OK, results[i].Item1);
                Assert.Equal(database[i].Item2, results[i].Item2);
            });
            Console.WriteLine(" Results verified");
        }
    }

    store.Dispose();
}