auto MetaClient::retry()

in src/client/meta/MetaClient.cc [340:469]


/// Sends `req` to a meta server via the stub member function `func`, retrying with exponential backoff.
///
/// Flow of each loop iteration:
///   1. Resolve a server with getServerNode(); the result is cached in `server` across iterations and is
///      only dropped on failover. If resolution fails, wait for the backoff interval and try again; once the
///      backoff yields a zero wait, the resolution error is returned.
///   2. Hold a slot of concurrentReqSemaphore_ and invoke `func` on the server's stub, with
///      `options.timeout = retryConfig.rpc_timeout()`.
///   3. If ErrorHandling::success() accepts the result: finish the operation record, remove the node from
///      errNodes_, wait for routing info when the result carries a value, and return the result.
///   4. Otherwise: call `onError` (if set) and pick a wait time (shortened for kRequestRefused, kSendFailed
///      and kBusy). Count server errors towards failover. Then either return the error (not retryable, or
///      retry budget exhausted) or sleep and retry.
///
/// @param func         Pointer to a member function of Stub::IStub, invoked as
///                     `(stub->*func)(req, options, &timestamp)`.
/// @param req          Request to send; its `client` field is overwritten with clientId_ before validation.
/// @param retryConfig  Backoff timings, per-RPC timeout, fast-retry wait and failover threshold.
/// @param onError      Optional callback invoked with the status of every failed RPC attempt
///                     (not invoked for server-resolution failures).
/// @return The stub call's result when ErrorHandling::success() accepts it, otherwise the last error.
auto MetaClient::retry(Func &&func, Req &&req, RetryConfig retryConfig, std::function<void(const Status &)> onError)
    -> std::invoke_result_t<Func, Stub::IStub, Req &&, const net::UserRequestOptions &, serde::Timestamp *> {
  req.client = clientId_;
  CHECK_REQUEST(req);

  // auto reqInfo = RequestInfo::get();
  auto opName = MetaSerde<>::getRpcName(req);
  ExponentialBackoffRetry backoff(retryConfig.retry_init_wait().asMs(),
                                  retryConfig.retry_max_wait().asMs(),
                                  retryConfig.retry_total_time().asMs());
  auto options = net::UserRequestOptions();
  options.timeout = retryConfig.rpc_timeout();

  OperationRecorder::Guard record(OperationRecorder::client(), opName, req.user.uid);
  // Server used for the current attempt; kept across retries until a failover resets it.
  std::optional<ServerNode> server;

  // Ensures `server` is populated. On lookup failure it sleeps for the backoff interval and returns Void
  // (leaving `server` empty so the caller loops), or returns the error once the backoff is exhausted.
  auto getServer = [&]() -> CoTryTask<void> {
    if (server.has_value()) {
      co_return Void{};
    }
    auto result = co_await getServerNode();
    if (!result.hasError()) {
      server.emplace(std::move(*result));
      co_return Void{};
    }
    auto waitTime = backoff.getWaitTime();
    XLOGF(ERR,
          "Op {} get server failed, req {}, error {}, retried {}, wait {}",
          opName,
          req,
          result.error(),
          record.retry(),
          waitTime);
    if (waitTime.count() == 0) {
      // NOTE(review): unlike the other exits of retry(), this path does not call record.finish();
      // confirm that OperationRecorder::Guard handles unfinished operations as intended.
      CO_RETURN_ERROR(result);
    }
    co_await folly::coro::sleep(waitTime);
    co_return Void{};
  };

  while (true) {
    CO_RETURN_ON_ERROR(co_await getServer());
    if (!server.has_value()) {
      // Lookup failed but the backoff allows another try.
      continue;
    }

    Status error(MetaCode::kFoundBug);
    {
      // The concurrency slot is held for the RPC (and routing-info wait on success) only, not for backoff sleeps.
      SemaphoreGuard concurrentReq(concurrentReqSemaphore_);
      co_await concurrentReq.coWait();

      // if (reqInfo && reqInfo->canceled()) {
      //   XLOGF(WARN, "Op {}{} canceled by client {}", opName, req, reqInfo->describe());
      //   record.finish(Status(MetaCode::kRequestCanceled));
      //   co_return makeError(MetaCode::kRequestCanceled, "req canceled");
      // }

      serde::Timestamp timestamp;
      // NOTE(review): `req` is forwarded on every attempt of this loop; that is only safe while the stub
      // methods take the request by (const) reference rather than consuming it — confirm for all `func`s.
      auto result = co_await (server->stub.get()->*func)(std::forward<Req>(req), options, &timestamp);
      RECORD_TIMESATAMP(opName, result, timestamp);
      if (ErrorHandling::success(result)) {
        // success (ErrorHandling::success may also accept some error codes, hence the hasError() logging)
        record.finish(result);
        errNodes_.wlock()->erase(server->node.nodeId);
        XLOGF_IF(INFO, result.hasError(), "Op {}{}, result {}", opName, reqDescribe(req), rspDescribe(result.error()));
        XLOGF_IF(DBG, !result.hasError(), "Op {}{}, result {}", opName, req, *result);

        if (result.hasValue()) {
          CO_RETURN_ON_ERROR(co_await waitRoutingInfo(*result, retryConfig));
        }

        co_return result;
      }
      XLOGF_IF(FATAL, !result.hasError(), "Result is ok");
      error = std::move(result.error());
    }
    if (onError) {
      onError(error);
    }

    // retry
    auto waitTime = backoff.getWaitTime();
    switch (error.code()) {
      // retry fast
      case RPCCode::kRequestRefused:
        reqRejected.addSample(1, {{"tag", fmt::format("{}", server->node.nodeId.toUnderType())}});
        [[fallthrough]];  // refused requests are counted, then retried fast like the cases below
      case RPCCode::kSendFailed:
      case MetaCode::kBusy:
        waitTime = std::min(retryConfig.retry_init_wait(), retryConfig.retry_fast());
        break;
      // retry default
      default:
        break;
    }
    auto retryable = ErrorHandling::retryable(error);
    bool failover = false;
    if (ErrorHandling::serverError(error)) {
      serverError.addSample(1, {{"tag", fmt::format("{}", server->node.nodeId.toUnderType())}});
      // Switch servers only after max_failures_before_failover server errors on the same node.
      failover = (++server->failure) >= retryConfig.max_failures_before_failover();
    }
    XLOGF(ERR,
          "Op {} failed on {}, req {}, error {}, retryable {}, failover {}, retried {}, elapsed {}",
          opName,
          server->node,
          req,
          error,
          retryable,
          failover,
          record.retry(),
          backoff.getElapsedTime());
    if (failover) {
      // Mark the node as bad and drop it so the next iteration resolves a (possibly different) server.
      errNodes_.wlock()->insert(server->node.nodeId);
      server = std::nullopt;
    }
    if (!retryable) {
      // can't retry
      record.finish(error);
      co_return makeError(std::move(error));
    }
    // A fast-retry code overrides waitTime above, so the elapsed-time check is what bounds those retries.
    if (waitTime.count() == 0 || backoff.getElapsedTime() > retryConfig.retry_total_time().asMs()) {
      XLOGF(CRITICAL, "Op {}{} failed {}, retry timeout", opName, req, error);
      record.finish(error);
      co_return makeError(std::move(error));
    }

    co_await folly::coro::sleep(waitTime);
    record.retry()++;
  }
}