in cachelib/benchmarks/MMTypeBench.cpp [56:146]
void runBench(typename MMType::Config config, BenchType benchType) {
// mutex for our lru
// Mutex mutex;
std::vector<std::thread> threads;
// thread local stats.
// std::vector<Stats> stats(FLAGS_num_threads);
// count of number of threads completed
std::atomic<unsigned int> nCompleted{0};
// main thread will wait on this to figure out when the benchmark is
// complete
std::mutex benchFinishMutex;
bool benchFinished = false;
std::condition_variable benchFinishCV;
// all benchmark threads will wait on this after completing the benchmark
// and before doing the thread cleanup.
bool cleanUpStart = false;
std::condition_variable cleanupCV;
std::mutex cleanupMutex;
// Create comman container and nodes for all threads.
// Also add nodes to the container except for tAdd benchmark.
std::unique_ptr<MMTypeBench<MMType>> myBench;
BENCHMARK_SUSPEND {
bool addNodesToContainer = (benchType != tAdd);
myBench = std::make_unique<MMTypeBench<MMType>>();
myBench->createContainer(config);
myBench->createNodes(FLAGS_num_nodes, FLAGS_num_threads,
addNodesToContainer);
}
auto runInThread = [&](unsigned int index) {
switch (benchType) {
case tAdd:
myBench->benchAdd(index);
break;
case tRemove:
myBench->benchRemove(FLAGS_num_nodes, index);
break;
case tRemoveIterator:
myBench->benchRemoveIterator(FLAGS_num_nodes);
break;
case tRecordAccessRead:
myBench->benchRecordAccessRead(FLAGS_num_nodes, index, FLAGS_num_access);
break;
case tRecordAccessWrite:
myBench->benchRecordAccessWrite(FLAGS_num_nodes, index, FLAGS_num_access);
break;
}
if (++nCompleted == FLAGS_num_threads) {
{
std::unique_lock<std::mutex> l(benchFinishMutex);
benchFinished = true;
}
benchFinishCV.notify_one();
}
std::unique_lock<std::mutex> l(cleanupMutex);
cleanupCV.wait(l, [&] { return cleanUpStart; });
};
BENCHMARK_SUSPEND {
for (unsigned int i = 0; i < FLAGS_num_threads; i++) {
threads.push_back(std::thread{runInThread, i});
}
}
BENCHMARK_SUSPEND {
// wait for benchmark to finish.
std::unique_lock<std::mutex> l(benchFinishMutex);
benchFinishCV.wait(l, [&] { return benchFinished; });
}
BENCHMARK_SUSPEND {
{
std::unique_lock<std::mutex> l(cleanupMutex);
cleanUpStart = true;
}
cleanupCV.notify_all();
for (auto& thread : threads) {
thread.join();
}
myBench->deleteContainer();
myBench.reset();
}
}