in src/hbase/client/async-rpc-retrying-caller.cc [159:200]
void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) {
  // Issues a single RPC attempt against the region described by `loc`.
  //
  // The per-call timeout is the RPC timeout, capped by the remaining operation
  // time when an operation timeout is configured (> 0). If no operation time
  // is left, the call is completed exceptionally and no RPC is issued.
  // On success the response is set on promise_; on failure the error is
  // forwarded to OnError together with a lazily built diagnostic message and
  // a callback that updates the cached region location.
  int64_t call_timeout_ns = rpc_timeout_nanos_.count();
  if (operation_timeout_nanos_.count() > 0) {
    const int64_t remaining_ns = this->RemainingTimeNs();
    if (remaining_ns <= 0) {
      this->CompleteExceptionally();
      return;
    }
    call_timeout_ns = std::min<int64_t>(remaining_ns, call_timeout_ns);
  }

  std::shared_ptr<RpcClient> rpc_client = conn_->rpc_client();
  ResetController(controller_, call_timeout_ns);

  // TODO: RegionLocation should propagate through these method chains as a shared_ptr.
  // Otherwise, it may get deleted underneath us. We are just copying for now.
  auto loc_ptr = std::make_shared<RegionLocation>(loc);

  // The continuations below may run after Call() has returned, so they must
  // not refer to `loc` or to any local of this function. Every capture is
  // therefore explicit and by value; loc_ptr is captured to keep the copied
  // location alive for as long as a continuation exists.
  // NOTE(review): `this` is captured as a raw pointer; presumably the caller
  // object is kept alive until the future completes -- confirm.
  callable_(controller_, loc_ptr, rpc_client)
      // `resp` is a const reference, so it is copied into the promise.
      .then([loc_ptr, this](const RESP& resp) { this->promise_->setValue(resp); })
      .onError([loc_ptr, this](const exception_wrapper& e) {
        OnError(
            e,
            // Message supplier: holds its own copies of loc_ptr and the
            // exception so it stays valid even if it is invoked later.
            [this, loc_ptr, e]() -> std::string {
              return "Call to " + folly::sformat("{0}:{1}", loc_ptr->server_name().host_name(),
                                                 loc_ptr->server_name().port()) +
                     " for '" + row_ + "' in " + loc_ptr->DebugString() + " of " +
                     table_name_->namespace_() + "::" + table_name_->qualifier() +
                     " failed with e.what()=" + e.what().toStdString() + ", tries = " +
                     std::to_string(tries_) + ", maxAttempts = " + std::to_string(max_attempts_) +
                     ", timeout = " + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) +
                     " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms";
            },
            // Reports the failure against the location this attempt used.
            [this, loc_ptr](const exception_wrapper& error) {
              conn_->region_locator()->UpdateCachedLocation(*loc_ptr, error);
            });
      });
}