void AsyncBatchRpcRetryingCaller::GroupAndSend()

in src/hbase/client/async-batch-rpc-retrying-caller.cc [246:319]


// Looks up the region location of every action in `actions`, groups the located actions by
// region server and hands the groups to Send(). Actions whose lookup failed are either failed
// immediately (attempts exhausted or the error is not retryable) or passed to TryResubmit().
//
// @param actions  actions to locate and dispatch; copied into the continuations below.
// @param tries    current attempt counter; compared against max_attempts_ and forwarded to
//                 Send() / TryResubmit() / FailOne() / FailAll().
//
// The work is done in continuations attached to the future returned by GetRegionLocations(),
// so this function returns before the actions are sent. Both continuations capture `this` by
// pointer. NOTE(review): the owner must keep this object alive until the future completes --
// confirm against the call sites.
// NOTE(review): the `template <...>` header introducing REQ and RESP is not visible in this
// excerpt.
void AsyncBatchRpcRetryingCaller<REQ, RESP>::GroupAndSend(
    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
  // -1 is what GetRegionLocations() receives when no operation timeout is configured
  // (presumably "no limit" -- confirm against GetRegionLocations()).
  int64_t locate_timeout_ns = -1L;
  if (operation_timeout_ns_.count() > 0) {
    locate_timeout_ns = RemainingTimeNs();
    if (locate_timeout_ns <= 0) {
      // RemainingTimeNs() reports nothing left: fail every action without locating.
      FailAll(actions, tries);
      return;
    }
  }

  GetRegionLocations(actions, locate_timeout_ns)
      .then([this, actions, tries](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) {
        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
        ActionsByServer actions_by_server;
        std::vector<std::shared_ptr<Action>> locate_failed;

        // loc[i] is treated as the lookup result for actions[i]. An entry that holds neither a
        // value nor an exception is skipped.
        for (uint64_t i = 0; i < loc.size(); ++i) {
          auto action = actions[i];
          if (loc[i].hasValue()) {
            auto region_loc = loc[i].value();
            // Build the map key once and use it for both the lookup and the insertion.
            auto server_name = std::make_shared<ServerName>(region_loc->server_name());
            auto search = actions_by_server.find(server_name);
            if (search != actions_by_server.end()) {
              search->second->AddActionsByRegion(region_loc, action);
            } else {
              // First action seen for this server: create its request and register it.
              auto server_request = std::make_shared<ServerRequest>(region_loc);
              server_request->AddActionsByRegion(region_loc, action);
              actions_by_server[server_name] = server_request;
            }
            VLOG(5) << "rowkey [" << action->action()->row() << "] of table["
                    << table_name_->ShortDebugString() << "] found in ["
                    << region_loc->region_name() << "]; RS["
                    << region_loc->server_name().host_name() << ":"
                    << region_loc->server_name().port() << "];";
          } else if (loc[i].hasException()) {
            folly::exception_wrapper ew = loc[i].exception();
            VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
                    << "; for index: " << i << "; tries: " << tries
                    << "; max_attempts_: " << max_attempts_;
            // We might receive a runtime error from location-cache.cc too; handle this action
            // on its own and continue with the next one.
            if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
              FailOne(action, tries, ew, TimeUtil::GetNowNanos(), ew.what().toStdString());
            } else {
              AddError(action, loc[i].exception(), nullptr);
              locate_failed.push_back(action);
            }
          }
        }

        if (!actions_by_server.empty()) {
          Send(actions_by_server, tries);
        }

        if (!locate_failed.empty()) {
          TryResubmit(locate_failed, tries);
        }
      })
      .onError([this, actions, tries](const folly::exception_wrapper &ew) {
        // The lookup as a whole failed: treat every action the same way.
        VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
                << "; tries: " << tries << "; max_attempts_: " << max_attempts_;
        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
        if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
          FailAll(actions, tries, ew, nullptr);
        } else {
          TryResubmit(actions, tries);
        }
      });
}