in src/hbase/client/response-converter.cc [127:203]
std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(
std::shared_ptr<Request> req, const Response& resp,
const ServerRequest::ActionsByRegion& actions_by_region) {
auto multi_req = std::static_pointer_cast<hbase::pb::MultiRequest>(req->req_msg());
auto multi_resp = std::static_pointer_cast<hbase::pb::MultiResponse>(resp.resp_msg());
VLOG(3) << "GetResults:" << multi_resp->ShortDebugString();
int req_region_action_count = multi_req->regionaction_size();
int res_region_action_count = multi_resp->regionactionresult_size();
if (req_region_action_count != res_region_action_count) {
throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) +
" does not match response mutation result count=" +
std::to_string(res_region_action_count));
}
auto multi_response = std::make_unique<hbase::MultiResponse>();
for (int32_t num = 0; num < res_region_action_count; num++) {
hbase::pb::RegionAction actions = multi_req->regionaction(num);
hbase::pb::RegionActionResult action_result = multi_resp->regionactionresult(num);
hbase::pb::RegionSpecifier rs = actions.region();
if (rs.has_type() && rs.type() != hbase::pb::RegionSpecifier::REGION_NAME) {
throw std::runtime_error("We support only encoded types for protobuf multi response.");
}
auto region_name = rs.value();
if (action_result.has_exception()) {
auto ew = ResponseConverter::GetRemoteException(action_result.exception());
VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region["
<< region_name << "];";
multi_response->AddRegionException(region_name, ew);
continue;
}
if (actions.action_size() != action_result.resultorexception_size()) {
throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) +
", action_result.resultorexception_size=" +
std::to_string(action_result.resultorexception_size()) +
" for region " + actions.region().value());
}
auto multi_actions = actions_by_region.at(region_name)->actions();
uint64_t multi_actions_num = 0;
for (hbase::pb::ResultOrException roe : action_result.resultorexception()) {
std::shared_ptr<Result> result;
std::shared_ptr<folly::exception_wrapper> ew;
if (roe.has_exception()) {
auto ew = ResponseConverter::GetRemoteException(roe.exception());
VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region["
<< region_name << "];";
multi_response->AddRegionException(region_name, ew);
} else if (roe.has_result()) {
result = ToResult(roe.result(), resp.cell_scanner());
} else if (roe.has_service_result()) {
// TODO Not processing Coprocessor Service Result;
} else {
// Sometimes, the response is just "it was processed". Generally, this occurs for things
// like mutateRows where either we get back 'processed' (or not) and optionally some
// statistics about the regions we touched.
std::vector<std::shared_ptr<Cell>> empty_cells;
result = std::make_shared<Result>(empty_cells, multi_resp->processed() ? true : false,
false, false);
}
// We add the original index of the multi-action so that when populating the response back we
// do it as per the action index
multi_response->AddRegionResult(
region_name, multi_actions[multi_actions_num]->original_index(), std::move(result), ew);
multi_actions_num++;
}
}
if (multi_resp->has_regionstatistics()) {
hbase::pb::MultiRegionLoadStats stats = multi_resp->regionstatistics();
for (int i = 0; i < stats.region_size(); i++) {
multi_response->AddStatistic(stats.region(i).value(),
std::make_shared<RegionLoadStats>(stats.stat(i)));
}
}
return multi_response;
}