in e2e-examples/gcs/benchmark/grpc_runner.cc [119:184]
// Runs the benchmark configured by parameters_.
//
// If parameters_.ctest or parameters_.mtest is positive, the run is delegated
// to run_ctest / run_mtest (in that order of precedence) and their result is
// returned. Otherwise a stub provider is selected from parameters_.cpolicy,
// the work queue is (re)created, and parameters_.threads worker threads are
// started, each calling DoOperation with a 1-based thread id.
//
// Returns false if a pooled policy ("pool", "bpool", "spool") is requested
// with a non-positive parameters_.carg, or if any worker's DoOperation
// returned false; returns true otherwise. When parameters_.wait_threads is
// false, the first failing worker terminates the whole process via exit(1)
// instead of reporting back.
bool GrpcRunner::Run() {
  std::function<std::shared_ptr<grpc::Channel>()> channel_creator = [&]() {
    return CreateBenchmarkGrpcChannel(parameters_);
  };

  if (parameters_.ctest > 0) {
    return run_ctest(channel_creator, parameters_);
  }
  if (parameters_.mtest > 0) {
    return run_mtest(channel_creator, parameters_);
  }

  // Initializes a gRPC channel pool.
  const auto& policy = parameters_.cpolicy;

  // The three round-robin style pools all take carg as an argument and reject
  // non-positive values, so validate it once up front.
  const bool needs_carg =
      policy == "pool" || policy == "bpool" || policy == "spool";
  if (needs_carg && parameters_.carg <= 0) {
    std::cerr << "Invalid carg: " << parameters_.carg << std::endl;
    return false;
  }

  std::shared_ptr<StorageStubProvider> stub_pool;
  if (policy == "const") {
    stub_pool = CreateConstChannelPool(channel_creator);
  } else if (policy == "pool") {
    stub_pool = CreateRoundRobinChannelPool(channel_creator, parameters_.carg);
  } else if (policy == "bpool") {
    stub_pool =
        CreateRoundRobinPlusChannelPool(channel_creator, parameters_.carg);
  } else if (policy == "spool") {
    stub_pool =
        CreateSmartRoundRobinChannelPool(channel_creator, parameters_.carg);
  }

  // Spawns benchmark threads and waits until they're done.
  //
  // One byte per worker rather than std::vector<bool>: vector<bool> packs its
  // elements into shared storage words, so concurrent writes to different
  // indices from different threads would be a data race.
  std::vector<char> returns(parameters_.threads, 0);
  std::vector<std::thread> threads;
  threads.reserve(returns.size());

  work_queue_.reset(new WorkQueue(parameters_.threads, parameters_.runs,
                                  parameters_.steal_work));

  for (int i = 1; i <= parameters_.threads; i++) {
    int thread_id = i;

    // A shared pool (if one was built above) is handed to every thread;
    // otherwise "perthread" and "percall" get a provider created per thread.
    // NOTE(review): for any other cpolicy value the provider stays null and is
    // passed to DoOperation as-is - confirm cpolicy is validated by the caller.
    std::shared_ptr<StorageStubProvider> storage_stub_provider;
    if (stub_pool != nullptr) {
      storage_stub_provider = stub_pool;
    } else if (policy == "perthread") {
      storage_stub_provider = CreateConstChannelPool(channel_creator);
    } else if (policy == "percall") {
      storage_stub_provider =
          CreateCreateNewChannelStubProvider(channel_creator);
    }

    threads.emplace_back([thread_id, storage_stub_provider, &returns, this]() {
      bool r = this->DoOperation(thread_id, storage_stub_provider);
      if (!r && !parameters_.wait_threads) {
        // Fail fast: abort the whole benchmark without waiting for the
        // remaining workers.
        std::cerr << "Thread id=" << thread_id << " stopped." << std::endl;
        exit(1);
      }
      // Each worker writes only its own slot (thread ids are 1-based).
      returns[thread_id - 1] = r ? 1 : 0;
    });
  }

  std::for_each(threads.begin(), threads.end(),
                [](std::thread& t) { t.join(); });

  return std::all_of(returns.begin(), returns.end(),
                     [](char v) { return v != 0; });
}