in src/hbase/client/async-rpc-retrying-caller.cc [110:156]
/**
 * Handles a failed attempt: records the error, then either fails the whole call or schedules
 * another attempt after a backoff delay.
 *
 * The error is always appended to exceptions_ together with the current timestamp. The call is
 * completed exceptionally when ExceptionUtil::ShouldRetry() rejects the error, when tries_ has
 * reached max_retries_, or when an operation timeout is configured and the remaining time minus
 * ConnectionUtils::kSleepDeltaNs is not positive. Otherwise the cached location is updated,
 * tries_ is incremented and LocateThenCall() is scheduled.
 *
 * @param error the failure reported for the current attempt
 * @param err_msg builds the log message; invoked only for logging, after the error has been
 *        accepted as retryable
 * @param update_cached_location callback invoked with the error right before the retry is
 *        scheduled
 */
void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
    const exception_wrapper& error, Supplier<std::string> err_msg,
    Consumer<exception_wrapper> update_cached_location) {
  // Keep a timestamped record of every failure, including the one that ends the call.
  ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
  exceptions_->push_back(twec);

  if (!ExceptionUtil::ShouldRetry(error) || tries_ >= max_retries_) {
    CompleteExceptionally();
    return;
  }

  // Early failures are logged at verbose level only; once tries_ exceeds
  // start_log_errors_count_ they are reported as warnings.
  if (tries_ > start_log_errors_count_) {
    LOG(WARNING) << err_msg();
  } else {
    VLOG(1) << err_msg();
  }

  // With an operation timeout, the sleep is capped by the time that is left (minus the sleep
  // delta). If nothing is left there is no point in retrying.
  const bool has_operation_timeout = operation_timeout_nanos_.count() > 0;
  int64_t max_delay_ns = 0;
  if (has_operation_timeout) {
    max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
    if (max_delay_ns <= 0) {
      CompleteExceptionally();
      return;
    }
  }
  const int64_t pause_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
  const int64_t delay_ns = has_operation_timeout ? std::min(max_delay_ns, pause_ns) : pause_ns;

  update_cached_location(error);
  tries_++;

  /*
   * HHWheelTimer::scheduleTimeout() fails with an assertion from
   * EventBase::isInEventBaseThread() if the timeout is scheduled from an arbitrary thread, or
   * from one of the IOThreadPool threads (with num threads > 1). There may be a bug in using the
   * retry timer from IOThreadPool threads; it only works when executed from a single-thread
   * pool, which retry_executor() is. The scheduled work itself (LocateThenCall()) should still
   * run in a thread pool, which is why it is submitted to the CPUThreadPool. The IOThreadPool
   * cannot be used without first fixing the blocking call made at TCP connection establishment
   * time (see ConnectionFactory::Connect()); otherwise the IOThreadPool thread deadlocks itself
   * and hangs.
   *
   * Captures are spelled out explicitly because these callbacks run after OnError() has
   * returned: only `this` and a copy of delay_ns are held, never a reference to a local.
   * NOTE(review): the callbacks hold a raw `this`, which assumes this caller object outlives the
   * scheduled retry -- confirm against how instances are owned.
   */
  conn_->retry_executor()->add([this, delay_ns]() {
    retry_timer_->scheduleTimeoutFn(
        [this]() { conn_->cpu_executor()->add([this]() { LocateThenCall(); }); },
        std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
  });
}