in src/hbase/client/async-batch-rpc-retrying-caller.cc [338:393]
void AsyncBatchRpcRetryingCaller<REQ, RESP>::Send(const ActionsByServer &actions_by_server,
                                                  int32_t tries) {
  // Builds one multi request per entry of `actions_by_server`, calls GetMultiResponse() for the
  // whole batch and routes every per-server outcome to OnComplete() or OnError().
  //
  // actions_by_server: entries whose `.first` identifies the server and whose `.second` exposes
  //                    actions_by_region().
  // tries:             attempt counter, forwarded unchanged to FailAll/OnComplete/OnError.

  // With an operation timeout configured, give up immediately once no time is left: every
  // pending action is handed to FailAll() and no request is issued.
  // NOTE(review): the remaining time is only used for this check; it is not passed on to
  // GetMultiResponse().
  if (operation_timeout_ns_.count() > 0) {
    const int64_t remaining_ns = RemainingTimeNs();
    if (remaining_ns <= 0) {
      std::vector<std::shared_ptr<Action>> failed_actions;
      for (const auto &action_by_server : actions_by_server) {
        for (const auto &region_actions : action_by_server.second->actions_by_region()) {
          for (const auto &failed_action : region_actions.second->actions()) {
            failed_actions.push_back(failed_action);
          }
        }
      }
      FailAll(failed_actions, tries);
      return;
    }
  }

  // Freeze the entries into one sequence shared by both callbacks. The callbacks pair entry i
  // with multi_reqv[i] and completed_responses[i]; iterating a *copy* of the container (which an
  // implicit [=] capture would do) only reproduces the original order for ordered containers.
  // NOTE(review): assumes GetMultiResponse() yields one response per entry, in the iteration
  // order of `actions_by_server` -- confirm against its implementation.
  using ServerEntry =
      std::pair<typename ActionsByServer::key_type, typename ActionsByServer::mapped_type>;
  const auto servers = std::make_shared<std::vector<ServerEntry>>(actions_by_server.begin(),
                                                                  actions_by_server.end());

  // Requests are kept so that each response can be decoded against the request it belongs to.
  const auto multi_reqv = std::make_shared<std::vector<std::shared_ptr<Request>>>();
  multi_reqv->reserve(servers->size());
  for (const auto &server : *servers) {
    multi_reqv->push_back(RequestConverter::ToMultiRequest(server.second->actions_by_region()));
  }

  GetMultiResponse(actions_by_server)
      .then([this, servers, multi_reqv, tries](
                const std::vector<Try<std::unique_ptr<hbase::Response>>> &completed_responses) {
        // Per-server results are processed under multi_mutex_.
        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
        for (std::size_t num = 0; num < servers->size(); ++num) {
          const ServerEntry &server = (*servers)[num];
          const auto &completed = completed_responses[num];
          if (completed.hasValue()) {
            const std::shared_ptr<Request> &multi_req = (*multi_reqv)[num];
            auto multi_response = ResponseConverter::GetResults(
                multi_req, *completed.value(), server.second->actions_by_region());
            OnComplete(server.second->actions_by_region(), tries, server.first,
                       std::move(multi_response));
          } else if (completed.hasException()) {
            const folly::exception_wrapper &ew = completed.exception();
            VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString()
                    << " from server for action index:" << num;
            OnError(server.second->actions_by_region(), tries, ew, server.first);
          }
        }
      })
      .onError([this, servers, tries](const folly::exception_wrapper &ew) {
        // Failure of the combined future: report the same error for every server in the batch.
        VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString();
        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
        for (const auto &server : *servers) {
          OnError(server.second->actions_by_region(), tries, ew, server.first);
        }
      });
}