in src/hbase/client/async-batch-rpc-retrying-caller.cc [396:441]
// Handles the MultiResponse received from one server for a batch of actions grouped by region.
//
// For every region in `actions_by_region`:
//   - if the response carries results for the region, each action is dispatched to the
//     per-action OnComplete overload, which may append to the local list of failed actions;
//   - if the response carries neither results nor an exception for the region, all actions of
//     that region are failed with a std::runtime_error;
//   - if the response carries a region-level exception, the actions are either failed for good
//     (attempts exhausted or the exception is not retryable) or recorded as failed so that they
//     can be resubmitted.
// Once all regions have been examined, the collected failed actions (if any) are handed to
// TryResubmit.
//
// actions_by_region: actions that were sent, keyed by region name.
// tries:             attempt number of the request this response belongs to.
// server_name:       server that produced the response; dereferenced for the error message.
// multi_response:    the server's response; dereferenced unconditionally.
void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete(
    const ActionsByRegion &actions_by_region, int32_t tries,
    const std::shared_ptr<ServerName> server_name,
    const std::unique_ptr<hbase::MultiResponse> multi_response) {
  std::vector<std::shared_ptr<Action>> failed_actions;
  // Bind by reference: avoids copying the result container when RegionResults() returns a
  // reference, and is still valid (lifetime extension) if it returns by value.
  const auto &region_results = multi_response->RegionResults();

  for (const auto &action_by_region : actions_by_region) {
    const auto &region_name = action_by_region.first;
    const auto &region_request = action_by_region.second;

    auto region_result_itr = region_results.find(region_name);
    if (region_result_itr != region_results.end()) {
      // The server returned results for this region: settle every action individually.
      for (const auto &action : region_request->actions()) {
        OnComplete(action, region_request, tries, server_name, region_result_itr->second,
                   failed_actions);
      }
      continue;
    }

    auto region_exc = multi_response->RegionException(region_name);
    if (region_exc == nullptr) {
      // FailAll actions for this particular region as inconsistent server response. So we raise
      // this exception to the application
      std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() +
                            " sent us neither results nor exceptions for " + region_name;
      VLOG(1) << err_msg;
      auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg);
      FailAll(region_request->actions(), tries, ew, server_name);
      continue;
    }

    // Eg: org.apache.hadoop.hbase.NotServingRegionException:
    LogException(tries, region_request, *region_exc, server_name);
    if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*region_exc)) {
      FailAll(region_request->actions(), tries, *region_exc, server_name);
      // Only this region is finished. Keep going so that the remaining regions are processed
      // and the failed actions gathered so far still reach TryResubmit below; returning here
      // would silently drop them.
      continue;
    }

    // Retryable region-level failure: refresh the cached location, remember the error and queue
    // every action of the region for resubmission.
    location_cache_->UpdateCachedLocation(*region_request->region_location(), *region_exc);
    AddError(region_request->actions(), *region_exc, server_name);
    for (const auto &action : region_request->actions()) {
      failed_actions.push_back(action);
    }
  }

  if (!failed_actions.empty()) {
    TryResubmit(failed_actions, tries);
  }
}