CoTryTask sendOpsWithRetry()

in src/client/storage/StorageClientImpl.cc [1144:1300]


// Sends `ops` through `sendOps` and keeps re-sending the ones that did not
// succeed until none remain, or until the backoff tracked by
// ExponentialBackoffRetry reports a zero wait time (treated as "retry budget
// exhausted").
//
// Each round:
//   * the backoff's current wait time is used as the request timeout;
//   * `sendOps` is awaited on the still-pending ops;
//   * each op's status is normalized with
//     StatusCodeConversion::convertToStorageClientCode;
//   * ops with a permanent error (unless options.retry_permanent_error() is
//     set) get their channels released and are dropped; other non-OK ops are
//     kept for the next round;
//   * targets reporting a "temporarily unavailable" error are counted in
//     requestCtx.numFailures.
// Between rounds it sleeps for the unused part of the request timeout,
// capped at init_wait_time / 2 if any op hit a "fast retry" error.
//
// Parameters:
//   requestCtx    - per-call context; retryCount and requestTimeout are
//                   written and initDebugFlags() is called before every send;
//                   numFailures is incremented per failing target.
//   chanAllocator - used to release channels of ops that are abandoned
//                   (permanent error, or retry budget exhausted).
//   ops           - all ops of this call; their results are reset up front.
//   sendOps       - invoked as `co_await sendOps(pendingOps)`; expected to
//                   store each op's outcome in `op->result`.
//   options       - backoff timings and the retry_permanent_error switch.
//
// Returns Void{} unconditionally; per-op outcomes live in `op->result`.
// Ops still StorageClientCode::kNotInitialized when the budget runs out are
// marked StorageClientCode::kTimeout.
CoTryTask<void> sendOpsWithRetry(ClientRequestContext &requestCtx,
                                 UpdateChannelAllocator &chanAllocator,
                                 const std::vector<Op *> &ops,
                                 auto &&sendOps,
                                 const RetryOptions &options) {
  ExponentialBackoffRetry backoffRetry(options.init_wait_time().asMs(),
                                       options.max_wait_time().asMs(),
                                       options.max_retry_time().asMs());
  uint32_t retryCount = 0;

  std::vector<Op *> pendingOps(begin(ops), end(ops));
  for (auto &op : pendingOps) op->resetResult();

  while (true) {
    const std::chrono::milliseconds requestTimeout = backoffRetry.getWaitTime();

    if (requestTimeout.count() == 0) {
      XLOGF(ERR,
            "Give up retrying {}/{} ops {} after #{} retries, "
            "elapsed time: {}, max retry time: {}, user: {}/{}, usercall: #{}",
            pendingOps.size(),
            ops.size(),
            fmt::ptr(&ops),
            retryCount,
            backoffRetry.getElapsedTime(),
            options.max_retry_time().asMs(),
            requestCtx.userInfo.uid,
            requestCtx.userInfo.gid,
            requestCtx.userCallId);

      for (const auto &op : pendingOps) {
        releaseChannelsForOp(chanAllocator, op);
        // Ops that never received any response are reported as timed out;
        // ops that did fail keep their last error code.
        if (op->statusCode() == StorageClientCode::kNotInitialized) {
          op->result = StorageClientCode::kTimeout;
        }
      }

      co_return Void{};
    }

    XLOGF_IF(INFO,
             retryCount > 0,
             "Sending request with {}/{} ops {}, #{} retry, "
             "timeout {}, elapsed time: {}, max retry time: {}, user: {}/{}, usercall: #{}",
             pendingOps.size(),
             ops.size(),
             fmt::ptr(&ops),
             retryCount,
             requestTimeout,
             backoffRetry.getElapsedTime(),
             options.max_retry_time().asMs(),
             requestCtx.userInfo.uid,
             requestCtx.userInfo.gid,
             requestCtx.userCallId);

    requestCtx.retryCount = retryCount;
    requestCtx.requestTimeout = Duration(requestTimeout);
    requestCtx.initDebugFlags();

    auto requestStartTime = SteadyClock::now();
    co_await sendOps(pendingOps);
    auto requestLatency = std::chrono::duration_cast<std::chrono::milliseconds>(SteadyClock::now() - requestStartTime);
    // Sleep only for whatever is left of this round's timeout budget.
    auto waitTime = std::max(std::chrono::milliseconds(0), requestTimeout - requestLatency);

    XLOGF_IF(INFO,
             (retryCount > 0 || requestLatency > requestTimeout * 2),
             "Completed request with {}/{} ops {}, #{} retry, timeout {}, latency {}, user: {}/{}, usercall: #{}",
             pendingOps.size(),
             ops.size(),
             fmt::ptr(&ops),
             retryCount,
             requestTimeout,
             requestLatency,
             requestCtx.userInfo.uid,
             requestCtx.userInfo.gid,
             requestCtx.userCallId);

    std::vector<Op *> remainingOps;
    remainingOps.reserve(pendingOps.size());
    std::unordered_set<TargetOnChain> failedTargets;

    for (const auto op : pendingOps) {
      // Read ops carry their status in `lengthInfo`, other ops in `statusCode`.
      if constexpr (requires { op->result.statusCode; }) {
        op->result.statusCode = StatusCodeConversion::convertToStorageClientCode(op->result.statusCode);
      } else {
        op->result.lengthInfo = StatusCodeConversion::convertToStorageClientCode(op->result.lengthInfo);
      }

      if (!options.retry_permanent_error() && isPermanentError(op->statusCode())) {
        // Not retried: free its channels now, keep the error in op->result.
        releaseChannelsForOp(chanAllocator, op);
      } else if (op->statusCode() != StatusCode::kOK) {
        remainingOps.push_back(op);

        if (isTemporarilyUnavailable(op->statusCode()))
          failedTargets.emplace(op->routingTarget.targetInfo.targetId,
                                op->routingTarget.chainId,
                                op->routingTarget.chainVer);

        if (isFastRetryError(op->statusCode())) waitTime = std::min(waitTime, options.init_wait_time().asMs() / 2);
      }
      // NOTE(review): ops that completed with kOK are not released here;
      // presumably `sendOps` handles that on success — confirm.
    }

    for (const auto &failedTarget : failedTargets) {
      const auto &[targetId, chainId, chainVer] = failedTarget;
      requestCtx.numFailures[failedTarget]++;
      XLOGF(INFO,
            "Cannot access storage target {} on {}@{} for {} times during processing of {}/{} ops {}, "
            "user: {}/{}, usercall: #{}",
            targetId,
            chainId,
            chainVer,
            requestCtx.numFailures[failedTarget],
            pendingOps.size(),
            ops.size(),
            fmt::ptr(&ops),
            requestCtx.userInfo.uid,
            requestCtx.userInfo.gid,
            requestCtx.userCallId);
    }

    if (remainingOps.empty()) {
      XLOGF_IF(INFO,
               retryCount > 0,
               "All {}/{} pending ops {} processed after #{} retries, elapsed time: {}, user: {}/{}, usercall: #{}",
               pendingOps.size(),
               ops.size(),
               fmt::ptr(&ops),
               retryCount,
               backoffRetry.getElapsedTime(),
               requestCtx.userInfo.uid,
               requestCtx.userInfo.gid,
               requestCtx.userCallId);
      co_return Void{};
    }

    num_retried_ops.addSample(remainingOps.size(), requestCtx.requestTagSet);
    num_retried_ops_per_user.addSample(remainingOps.size(), requestCtx.userTagSet);
    pendingOps.swap(remainingOps);

    // Must NOT be done inside the XLOGF arguments: folly's XLOG macros only
    // evaluate their arguments when the log level is enabled, so the counter
    // would silently stay at 0 whenever INFO logging is off.
    ++retryCount;
    XLOGF(INFO,
          "Waiting {} before #{} retrying {}/{} remaining ops {}, user: {}/{}, usercall: #{}",
          waitTime,
          retryCount,
          pendingOps.size(),
          ops.size(),
          fmt::ptr(&ops),
          requestCtx.userInfo.uid,
          requestCtx.userInfo.gid,
          requestCtx.userCallId);

    if (waitTime.count() > 0) co_await folly::coro::sleep(waitTime);
  }
}