in src/hbase/client/async-scan-rpc-retrying-caller.cc [299:352]
// Handles a failed scan RPC attempt.
//
// The error is optionally logged, appended to exceptions_, and then the method
// either finishes the scan (via CompleteExceptionally / CompleteWhenError) or
// increments tries_ and schedules another Call() after a delay obtained from
// ConnectionUtils::GetPauseTime.
//
// @param error the failure reported for the current attempt.
void AsyncScanRpcRetryingCaller::OnError(const folly::exception_wrapper& error) {
  VLOG(5) << "Scan: OnError, scanner_id:" << scanner_id_;
  // Only warn once the attempt count passes the configured threshold, unless
  // verbose logging is enabled.
  if (tries_ > start_log_errors_count_ || VLOG_IS_ON(5)) {
    LOG(WARNING) << "Call to " << region_location_->server_name().ShortDebugString()
                 << " for scanner id = " << scanner_id_ << " for "
                 << region_location_->region_info().ShortDebugString()
                 << " failed, tries = " << tries_ << ", maxAttempts = " << max_attempts_
                 << ", timeout = " << TimeUtil::ToMillis(scan_timeout_nanos_).count()
                 << " ms, time elapsed = " << TimeUtil::ElapsedMillis(start_ns_)
                 << " ms, error = " << error.what().toStdString();
  }

  // NOTE(review): the bool handed to CompleteExceptionally below is
  // !scanner_closed -- presumably "the scanner still needs closing"; confirm
  // against CompleteExceptionally's declaration.
  bool scanner_closed = ExceptionUtil::IsScannerClosed(error);

  // Record this failure together with the time it was observed.
  ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
  exceptions_->push_back(twec);

  // Retry budget exhausted: give up.
  if (tries_ >= max_retries_) {
    CompleteExceptionally(!scanner_closed);
    return;
  }

  int64_t delay_ns = 0;
  if (scan_timeout_nanos_.count() > 0) {
    // A scan timeout is configured: never sleep past the remaining time
    // (less kSleepDeltaNs), and give up if nothing is left.
    int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
    if (max_delay_ns <= 0) {
      CompleteExceptionally(!scanner_closed);
      return;
    }
    delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
  } else {
    delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
  }

  // These checks intentionally run after the timeout check above, so an
  // expired timeout takes precedence over the error classification.
  if (scanner_closed) {
    CompleteWhenError(false);
    return;
  }
  if (ExceptionUtil::IsScannerOutOfOrder(error)) {
    CompleteWhenError(true);
    return;
  }
  if (!ExceptionUtil::ShouldRetry(error)) {
    CompleteExceptionally(true);
    return;
  }

  tries_++;

  // Every closure below runs after this function has returned, so each one
  // must own what it uses: `self` and `delay_ns` are captured by value.
  // Capturing by reference here would leave the closures pointing at this
  // function's dead stack frame (or at an already-destroyed outer closure).
  auto self(shared_from_this());
  conn_->retry_executor()->add([self, delay_ns]() {
    self->retry_timer_->scheduleTimeoutFn(
        [self]() { self->conn_->cpu_executor()->add([self]() { self->Call(); }); },
        std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
  });
}