in src/hbase/client/async-connection.cc [25:53]
/**
 * Builds the runtime state of the connection from conf_: the connection
 * configuration, the CPU / IO / retry executors, the retry timer, the RPC
 * client, the location cache and the retrying-caller factory.
 *
 * NOTE(review): this calls shared_from_this(); presumably the class derives
 * from std::enable_shared_from_this, in which case Init() must only run once
 * the object is already owned by a std::shared_ptr (i.e. not from the
 * constructor) -- confirm against the class declaration.
 */
void AsyncConnectionImpl::Init() {
  connection_conf_ = std::make_shared<hbase::ConnectionConfiguration>(*conf_);

  // sysconf() reports failure by returning -1. Query it once and fall back to a
  // single processor so the default pool sizes below can never be negative.
  long online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
  if (online_cpus < 1) {
    online_cpus = 1;
  }

  // start thread pools
  // Defaults: one IO thread per online processor, two CPU threads per online
  // processor. Explicitly configured values are used as-is.
  auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, online_cpus);
  auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * online_cpus);
  cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_threads);
  io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_threads);

  /*
   * We need a retry_executor for a thread pool of size 1 due to a possible bug in wangle/folly.
   * Otherwise, Assertion 'isInEventBaseThread()' always fails. See the comments
   * in async-rpc-retrying-caller.cc.
   */
  retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
  // The retry timer is created on the retry executor's event base.
  retry_timer_ = folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());

  // Use the KeyValue cell codec when the configured codec class (which defaults
  // to KeyValueCodec) matches it; for any other value the codec stays null and
  // a warning is logged.
  std::shared_ptr<Codec> codec = nullptr;
  if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
      std::string(KeyValueCodec::kJavaClassName)) {
    codec = std::make_shared<hbase::KeyValueCodec>();
  } else {
    LOG(WARNING) << "Not using RPC Cell Codec";
  }

  rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, cpu_executor_, codec, conf_,
                                                   connection_conf_->connect_timeout());
  // The location cache is given the RPC client's connection pool.
  location_cache_ = std::make_shared<hbase::LocationCache>(conf_, io_executor_, cpu_executor_,
                                                           rpc_client_->connection_pool());
  caller_factory_ =
      std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
}