in src/client/storage/StorageClientImpl.cc [1144:1300]
/**
 * Sends `ops` through `sendOps` and re-sends the ones that did not succeed,
 * backing off between attempts, until every op has a final result or the
 * ExponentialBackoffRetry budget is exhausted.
 *
 * @param requestCtx    per-call context. Before every attempt its retryCount and
 *                      requestTimeout are set and initDebugFlags() is called;
 *                      numFailures counts, per target-on-chain, how often the
 *                      target was reported temporarily unavailable.
 * @param chanAllocator ops that are given up on (budget exhausted, or permanent
 *                      error when retry_permanent_error() is false) have their
 *                      channels released back to it via releaseChannelsForOp().
 *                      NOTE(review): ops that succeed are not released here —
 *                      presumably sendOps/callers handle that; confirm.
 * @param ops           ops to send; every op's result is reset up front, and on
 *                      return each op's result carries its final outcome.
 * @param sendOps       callable whose result is co_awaited; invoked with the ops
 *                      still pending in the current attempt.
 * @param options       backoff timings (init/max wait, max retry time) and
 *                      whether permanent errors are retried.
 *
 * The task itself always completes with Void{}: per-op failures are reported
 * only through each op's result.
 */
CoTryTask<void> sendOpsWithRetry(ClientRequestContext &requestCtx,
                                 UpdateChannelAllocator &chanAllocator,
                                 const std::vector<Op *> &ops,
                                 auto &&sendOps,
                                 const RetryOptions &options) {
  ExponentialBackoffRetry backoffRetry(options.init_wait_time().asMs(),
                                       options.max_wait_time().asMs(),
                                       options.max_retry_time().asMs());
  uint32_t retryCount = 0;
  std::vector<Op *> pendingOps(begin(ops), end(ops));
  for (auto &op : pendingOps) op->resetResult();

  while (true) {
    // The backoff object supplies this attempt's timeout; a zero timeout means
    // it will not allow another attempt (presumably max_retry_time is used up).
    const std::chrono::milliseconds requestTimeout = backoffRetry.getWaitTime();
    if (requestTimeout.count() == 0) {
      XLOGF(ERR,
            "Give up retrying {}/{} ops {} after #{} retries, "
            "elapsed time: {}, max retry time: {}, user: {}/{}, usercall: #{}",
            pendingOps.size(),
            ops.size(),
            fmt::ptr(&ops),
            retryCount,
            backoffRetry.getElapsedTime(),
            options.max_retry_time().asMs(),
            requestCtx.userInfo.uid,
            requestCtx.userInfo.gid,
            requestCtx.userCallId);
      for (const auto &op : pendingOps) {
        releaseChannelsForOp(chanAllocator, op);
        // Ops still carrying kNotInitialized are surfaced to callers as timeouts.
        if (op->statusCode() == StorageClientCode::kNotInitialized) {
          op->result = StorageClientCode::kTimeout;
        }
      }
      co_return Void{};
    }

    XLOGF_IF(INFO,
             retryCount > 0,
             "Sending request with {}/{} ops {}, #{} retry, "
             "timeout {}, elapsed time: {}, max retry time: {}, user: {}/{}, usercall: #{}",
             pendingOps.size(),
             ops.size(),
             fmt::ptr(&ops),
             retryCount,
             requestTimeout,
             backoffRetry.getElapsedTime(),
             options.max_retry_time().asMs(),
             requestCtx.userInfo.uid,
             requestCtx.userInfo.gid,
             requestCtx.userCallId);

    requestCtx.retryCount = retryCount;
    requestCtx.requestTimeout = Duration(requestTimeout);
    requestCtx.initDebugFlags();

    const auto requestStartTime = SteadyClock::now();
    co_await sendOps(pendingOps);
    const auto requestLatency =
        std::chrono::duration_cast<std::chrono::milliseconds>(SteadyClock::now() - requestStartTime);
    // Only wait for whatever part of this attempt's timeout the request did not use.
    auto waitTime = std::max(std::chrono::milliseconds(0), requestTimeout - requestLatency);

    XLOGF_IF(INFO,
             (retryCount > 0 || requestLatency > requestTimeout * 2),
             "Completed request with {}/{} ops {}, #{} retry, timeout {}, latency {}, user: {}/{}, usercall: #{}",
             pendingOps.size(),
             ops.size(),
             fmt::ptr(&ops),
             retryCount,
             requestTimeout,
             requestLatency,
             requestCtx.userInfo.uid,
             requestCtx.userInfo.gid,
             requestCtx.userCallId);

    std::vector<Op *> remainingOps;
    std::unordered_set<TargetOnChain> failedTargets;
    for (auto *op : pendingOps) {
      // Normalize the returned status into a StorageClientCode; op result types
      // keep their status either in `statusCode` or in `lengthInfo`.
      if constexpr (requires { op->result.statusCode; }) {
        op->result.statusCode = StatusCodeConversion::convertToStorageClientCode(op->result.statusCode);
      } else {
        op->result.lengthInfo = StatusCodeConversion::convertToStorageClientCode(op->result.lengthInfo);
      }

      if (!options.retry_permanent_error() && isPermanentError(op->statusCode())) {
        // Not retried: hand the channel back now.
        releaseChannelsForOp(chanAllocator, op);
      } else if (op->statusCode() != StatusCode::kOK) {
        remainingOps.push_back(op);
        if (isTemporarilyUnavailable(op->statusCode())) {
          failedTargets.emplace(op->routingTarget.targetInfo.targetId,
                                op->routingTarget.chainId,
                                op->routingTarget.chainVer);
        }
        // Some errors are worth retrying quickly: cap the wait at half the initial backoff.
        if (isFastRetryError(op->statusCode())) {
          waitTime = std::min(waitTime, options.init_wait_time().asMs() / 2);
        }
      }
    }

    for (const auto &failedTarget : failedTargets) {
      const auto &[targetId, chainId, chainVer] = failedTarget;
      auto &failureCount = requestCtx.numFailures[failedTarget];
      ++failureCount;
      XLOGF(INFO,
            "Cannot access storage target {} on {}@{} for {} times during processing of {}/{} ops {}, "
            "user: {}/{}, usercall: #{}",
            targetId,
            chainId,
            chainVer,
            failureCount,
            pendingOps.size(),
            ops.size(),
            fmt::ptr(&ops),
            requestCtx.userInfo.uid,
            requestCtx.userInfo.gid,
            requestCtx.userCallId);
    }

    if (remainingOps.empty()) {
      XLOGF_IF(INFO,
               retryCount > 0,
               "All {}/{} pending ops {} processed after #{} retries, elapsed time: {}, user: {}/{}, usercall: #{}",
               pendingOps.size(),
               ops.size(),
               fmt::ptr(&ops),
               retryCount,
               backoffRetry.getElapsedTime(),
               requestCtx.userInfo.uid,
               requestCtx.userInfo.gid,
               requestCtx.userCallId);
      co_return Void{};
    }

    num_retried_ops.addSample(remainingOps.size(), requestCtx.requestTagSet);
    num_retried_ops_per_user.addSample(remainingOps.size(), requestCtx.userTagSet);
    pendingOps.swap(remainingOps);

    // Increment outside the log macro: folly XLOG* arguments are not evaluated
    // when the log level is disabled, which would leave retryCount stuck at 0.
    ++retryCount;
    XLOGF(INFO,
          "Waiting {} before #{} retrying {}/{} remaining ops {}, user: {}/{}, usercall: #{}",
          waitTime,
          retryCount,
          pendingOps.size(),
          ops.size(),
          fmt::ptr(&ops),
          requestCtx.userInfo.uid,
          requestCtx.userInfo.gid,
          requestCtx.userCallId);
    if (waitTime.count() > 0) co_await folly::coro::sleep(waitTime);
  }
}