in src/hbase/client/async-batch-rpc-retrying-caller.cc [246:319]
void AsyncBatchRpcRetryingCaller<REQ, RESP>::GroupAndSend(
const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
int64_t locate_timeout_ns;
if (operation_timeout_ns_.count() > 0) {
locate_timeout_ns = RemainingTimeNs();
if (locate_timeout_ns <= 0) {
FailAll(actions, tries);
return;
}
} else {
locate_timeout_ns = -1L;
}
GetRegionLocations(actions, locate_timeout_ns)
.then([=](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) {
std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
ActionsByServer actions_by_server;
std::vector<std::shared_ptr<Action>> locate_failed;
for (uint64_t i = 0; i < loc.size(); ++i) {
auto action = actions[i];
if (loc[i].hasValue()) {
auto region_loc = loc[i].value();
// Add it to actions_by_server;
auto search =
actions_by_server.find(std::make_shared<ServerName>(region_loc->server_name()));
if (search != actions_by_server.end()) {
search->second->AddActionsByRegion(region_loc, action);
} else {
auto server_request = std::make_shared<ServerRequest>(region_loc);
server_request->AddActionsByRegion(region_loc, action);
auto server_name = std::make_shared<ServerName>(region_loc->server_name());
actions_by_server[server_name] = server_request;
}
VLOG(5) << "rowkey [" << action->action()->row() << "] of table["
<< table_name_->ShortDebugString() << "] found in ["
<< region_loc->region_name() << "]; RS["
<< region_loc->server_name().host_name() << ":"
<< region_loc->server_name().port() << "];";
} else if (loc[i].hasException()) {
folly::exception_wrapper ew = loc[i].exception();
VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
<< "for index:" << i << "; tries: " << tries
<< "; max_attempts_: " << max_attempts_;
// We might receive runtime error from location-cache.cc too, we are doing FailOne and
// continue next one
if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
FailOne(action, tries, ew, TimeUtil::GetNowNanos(), ew.what().toStdString());
} else {
AddError(action, loc[i].exception(), nullptr);
locate_failed.push_back(action);
}
}
}
if (!actions_by_server.empty()) {
Send(actions_by_server, tries);
}
if (!locate_failed.empty()) {
TryResubmit(locate_failed, tries);
}
})
.onError([=](const folly::exception_wrapper &ew) {
VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
<< "tries: " << tries << "; max_attempts_: " << max_attempts_;
std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
FailAll(actions, tries, ew, nullptr);
} else {
TryResubmit(actions, tries);
}
});
return;
}