in src/client/meta/MetaClient.cc [340:469]
// Sends `req` to a meta server through the stub member function `func`, retrying per `retryConfig`.
//
// - Stamps `req.client` with `clientId_` and validates the request (CHECK_REQUEST) up front.
// - A server is obtained from getServerNode() and reused across attempts until it is failed over,
//   which happens once it accumulates `max_failures_before_failover()` server errors
//   (ErrorHandling::serverError); failed-over nodes are added to `errNodes_`.
// - Each attempt acquires `concurrentReqSemaphore_` and uses `retryConfig.rpc_timeout()` as the RPC timeout.
// - If ErrorHandling::success(result) holds (this may include some error results, which are logged at
//   INFO), the node is removed from `errNodes_`, waitRoutingInfo() is awaited when the result holds a
//   value, and the result is returned.
// - Otherwise `onError` (if set) is invoked with the error; the error is returned if it is not
//   retryable, or if the backoff yields a zero wait / `retry_total_time` has elapsed. Else the call
//   sleeps and retries. kRequestRefused, kSendFailed and kBusy use the shorter
//   min(retry_init_wait, retry_fast) wait instead of the backoff interval.
auto MetaClient::retry(Func &&func, Req &&req, RetryConfig retryConfig, std::function<void(const Status &)> onError)
    -> std::invoke_result_t<Func, Stub::IStub, Req &&, const net::UserRequestOptions &, serde::Timestamp *> {
  req.client = clientId_;
  CHECK_REQUEST(req);
  // auto reqInfo = RequestInfo::get();
  auto opName = MetaSerde<>::getRpcName(req);
  ExponentialBackoffRetry backoff(retryConfig.retry_init_wait().asMs(),
                                  retryConfig.retry_max_wait().asMs(),
                                  retryConfig.retry_total_time().asMs());
  auto options = net::UserRequestOptions();
  options.timeout = retryConfig.rpc_timeout();
  OperationRecorder::Guard record(OperationRecorder::client(), opName, req.user.uid);

  // Server for the current attempt; reset to nullopt on failover so a new one is selected.
  std::optional<ServerNode> server;

  // Populates `server` if it is empty. On lookup failure it sleeps for the next backoff interval and
  // returns OK with `server` still empty (the loop below retries); when the backoff yields a zero
  // wait it gives up and returns the lookup error.
  auto getServer = [&]() -> CoTryTask<void> {
    if (server.has_value()) {
      co_return Void{};
    }
    auto result = co_await getServerNode();
    if (!result.hasError()) {
      server.emplace(std::move(*result));
      co_return Void{};
    }
    auto waitTime = backoff.getWaitTime();
    XLOGF(ERR,
          "Op {} get server failed, req {}, error {}, retried {}, wait {}",
          opName,
          req,
          result.error(),
          record.retry(),
          waitTime);
    if (waitTime.count() == 0) {
      CO_RETURN_ERROR(result);
    }
    co_await folly::coro::sleep(waitTime);
    co_return Void{};
  };

  while (true) {
    CO_RETURN_ON_ERROR(co_await getServer());
    if (!server.has_value()) {
      continue;
    }
    Status error(MetaCode::kFoundBug);
    {
      // Bound the number of in-flight requests; the guard is scoped to this single attempt.
      SemaphoreGuard concurrentReq(concurrentReqSemaphore_);
      co_await concurrentReq.coWait();
      // if (reqInfo && reqInfo->canceled()) {
      //   XLOGF(WARN, "Op {}{} canceled by client {}", opName, req, reqInfo->describe());
      //   record.finish(Status(MetaCode::kRequestCanceled));
      //   co_return makeError(MetaCode::kRequestCanceled, "req canceled");
      // }
      serde::Timestamp timestamp;
      // Pass `req` as an lvalue (not std::forward): the same request is re-sent on every retry and
      // logged after a failure, so it must never be moved from inside this loop.
      auto result = co_await (server->stub.get()->*func)(req, options, &timestamp);
      RECORD_TIMESATAMP(opName, result, timestamp);
      if (ErrorHandling::success(result)) {
        // success (possibly with an error result that ErrorHandling does not treat as a failure)
        record.finish(result);
        errNodes_.wlock()->erase(server->node.nodeId);
        XLOGF_IF(INFO, result.hasError(), "Op {}{}, result {}", opName, reqDescribe(req), rspDescribe(result.error()));
        XLOGF_IF(DBG, !result.hasError(), "Op {}{}, result {}", opName, req, *result);
        if (result.hasValue()) {
          CO_RETURN_ON_ERROR(co_await waitRoutingInfo(*result, retryConfig));
        }
        co_return result;
      }
      XLOGF_IF(FATAL, !result.hasError(), "Result is ok");
      error = std::move(result.error());
    }
    if (onError) {
      onError(error);
    }

    // retry
    auto waitTime = backoff.getWaitTime();
    switch (error.code()) {
      // retry fast: these codes use the shorter retry_fast wait instead of the backoff interval
      case RPCCode::kRequestRefused:
        reqRejected.addSample(1, {{"tag", fmt::format("{}", server->node.nodeId.toUnderType())}});
        [[fallthrough]];
      case RPCCode::kSendFailed:
      case MetaCode::kBusy:
        waitTime = std::min(retryConfig.retry_init_wait(), retryConfig.retry_fast());
        break;
      // retry default
      default:
        break;
    }

    auto retryable = ErrorHandling::retryable(error);
    bool failover = false;
    if (ErrorHandling::serverError(error)) {
      serverError.addSample(1, {{"tag", fmt::format("{}", server->node.nodeId.toUnderType())}});
      // Switch to another server once this one has failed too many times.
      failover = (++server->failure) >= retryConfig.max_failures_before_failover();
    }
    XLOGF(ERR,
          "Op {} failed on {}, req {}, error {}, retryable {}, failover {}, retried {}, elapsed {}",
          opName,
          server->node,
          req,
          error,
          retryable,
          failover,
          record.retry(),
          backoff.getElapsedTime());
    if (failover) {
      errNodes_.wlock()->insert(server->node.nodeId);
      server = std::nullopt;
    }
    if (!retryable) {
      // can't retry
      record.finish(error);
      co_return makeError(std::move(error));
    }
    if (waitTime.count() == 0 || backoff.getElapsedTime() > retryConfig.retry_total_time().asMs()) {
      XLOGF(CRITICAL, "Op {}{} failed {}, retry timeout", opName, req, error);
      record.finish(error);
      co_return makeError(std::move(error));
    }
    co_await folly::coro::sleep(waitTime);
    record.retry()++;
  }
}