bool GrpcRunner::Run()

in e2e-examples/gcs/benchmark/grpc_runner.cc [119:184]


bool GrpcRunner::Run() {
  std::function<std::shared_ptr<grpc::Channel>()> channel_creator = [&]() {
    return CreateBenchmarkGrpcChannel(parameters_);
  };
  if (parameters_.ctest > 0) {
    return run_ctest(channel_creator, parameters_);
  }
  if (parameters_.mtest > 0) {
    return run_mtest(channel_creator, parameters_);
  }

  // Initializes a gRPC channel pool.
  std::shared_ptr<StorageStubProvider> stub_pool;
  if (parameters_.cpolicy == "const") {
    stub_pool = CreateConstChannelPool(channel_creator);
  } else if (parameters_.cpolicy == "pool") {
    if (parameters_.carg <= 0) {
      std::cerr << "Invalid carg: " << parameters_.carg << std::endl;
      return false;
    }
    stub_pool = CreateRoundRobinChannelPool(channel_creator, parameters_.carg);
  } else if (parameters_.cpolicy == "bpool") {
    if (parameters_.carg <= 0) {
      std::cerr << "Invalid carg: " << parameters_.carg << std::endl;
      return false;
    }
    stub_pool =
        CreateRoundRobinPlusChannelPool(channel_creator, parameters_.carg);
  } else if (parameters_.cpolicy == "spool") {
    if (parameters_.carg <= 0) {
      std::cerr << "Invalid carg: " << parameters_.carg << std::endl;
      return false;
    }
    stub_pool =
        CreateSmartRoundRobinChannelPool(channel_creator, parameters_.carg);
  }

  // Spawns benchmark threads and waits until they're done.
  std::vector<std::thread> threads;
  std::vector<bool> returns(parameters_.threads);
  work_queue_.reset(new WorkQueue(parameters_.threads, parameters_.runs,
                                  parameters_.steal_work));
  for (int i = 1; i <= parameters_.threads; i++) {
    int thread_id = i;
    std::shared_ptr<StorageStubProvider> storage_stub_provider;
    if (stub_pool != nullptr) {
      storage_stub_provider = stub_pool;
    } else if (parameters_.cpolicy == "perthread") {
      storage_stub_provider = CreateConstChannelPool(channel_creator);
    } else if (parameters_.cpolicy == "percall") {
      storage_stub_provider =
          CreateCreateNewChannelStubProvider(channel_creator);
    }
    threads.emplace_back([thread_id, storage_stub_provider, &returns, this]() {
      bool r = this->DoOperation(thread_id, storage_stub_provider);
      if (!r && !parameters_.wait_threads) {
        std::cerr << "Thread id=" << thread_id << " stopped." << std::endl;
        exit(1);
      }
      returns[thread_id - 1] = r;
    });
  }
  std::for_each(threads.begin(), threads.end(),
                [](std::thread& t) { t.join(); });
  return std::all_of(returns.begin(), returns.end(), [](bool v) { return v; });
}