void AsyncBatchRpcRetryingCaller::Send()

in src/hbase/client/async-batch-rpc-retrying-caller.cc [338:393]


void AsyncBatchRpcRetryingCaller<REQ, RESP>::Send(const ActionsByServer &actions_by_server,
                                                  int32_t tries) {
  int64_t remaining_ns;
  if (operation_timeout_ns_.count() > 0) {
    remaining_ns = RemainingTimeNs();
    if (remaining_ns <= 0) {
      std::vector<std::shared_ptr<Action>> failed_actions;
      for (const auto &action_by_server : actions_by_server) {
        for (auto &value : action_by_server.second->actions_by_region()) {
          for (const auto &failed_action : value.second->actions()) {
            failed_actions.push_back(failed_action);
          }
        }
      }
      FailAll(failed_actions, tries);
      return;
    }
  } else {
    remaining_ns = std::numeric_limits<int64_t>::max();
  }

  std::vector<std::shared_ptr<Request>> multi_reqv;
  for (const auto &action_by_server : actions_by_server)
    multi_reqv.push_back(
        std::move(RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region())));

  GetMultiResponse(actions_by_server)
      .then([=](const std::vector<Try<std::unique_ptr<hbase::Response>>> &completed_responses) {
        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
        uint64_t num = 0;
        for (const auto &action_by_server : actions_by_server) {
          if (completed_responses[num].hasValue()) {
            auto multi_response =
                ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value(),
                                              action_by_server.second->actions_by_region());
            OnComplete(action_by_server.second->actions_by_region(), tries, action_by_server.first,
                       std::move(multi_response));
          } else if (completed_responses[num].hasException()) {
            folly::exception_wrapper ew = completed_responses[num].exception();
            VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString()
                    << " from server for action index:" << num;
            OnError(action_by_server.second->actions_by_region(), tries, ew,
                    action_by_server.first);
          }
          num++;
        }
      })
      .onError([=](const folly::exception_wrapper &ew) {
        VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString();
        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
        for (const auto &action_by_server : actions_by_server) {
          OnError(action_by_server.second->actions_by_region(), tries, ew, action_by_server.first);
        }
      });
  return;
}