in mcrouter/routes/FailoverRoute.h [237:355]
// Routes `req` to the first child selected by the failover policy and, when
// that reply is not accepted by processReply(), retries on the following
// children until one is accepted, the children run out, or the policy's
// error-try budget (maxErrorTries()) is used up.
//
// @param req         The request to route.
// @param childIndex  Out-parameter. Only written when failover is actually
//                    attempted: it is reset to 0 just before failover starts
//                    and then overwritten with cur.getTrueIndex() when the
//                    failover lambda exits. It is left untouched on the three
//                    early-return paths that return the first child's reply.
// @return The first child's reply if failover is disabled, if processReply()
//         accepts it, or if the policy has no further child; otherwise the
//         reply produced by the failover lambda below.
ReplyT<Request> doRoute(const Request& req, size_t& childIndex) {
auto& proxy = fiber_local<RouterInfo>::getSharedCtx()->proxy();
// Flags consumed by the SCOPE_EXIT block below. `conditionalFailover` is never
// assigned in this function; it is handed to processReply(), which presumably
// sets it through a reference parameter (confirm against processReply()).
bool conditionalFailover = false;
bool allFailed = false;
// Publish the summary stats once, on every exit path of this function. The
// failover lambda captures both flags by reference, so values it sets are
// visible here.
SCOPE_EXIT {
if (conditionalFailover) {
proxy.stats().increment(failover_conditional_stat);
proxy.stats().increment(failover_conditional_count_stat);
}
if (allFailed) {
proxy.stats().increment(failover_all_failed_stat);
proxy.stats().increment(failover_all_failed_count_stat);
proxy.stats().increment(failoverPolicy_.getFailoverFailedStat());
}
};
// The policy context is shared by the first attempt and all failover attempts.
auto policyCtx = failoverPolicy_.context(req);
auto iter = failoverPolicy_.begin(req);
// First attempt: route to the first child the policy yields.
auto normalReply = iter->route(req, policyCtx);
// Every request is counted by the rate limiter (when one is configured),
// including requests that never fail over.
if (rateLimiter_) {
rateLimiter_->bumpTotalReqs();
}
// With failover disabled the first reply is returned as-is: no hop-count
// tagging and no processReply() evaluation.
if (fiber_local<RouterInfo>::getSharedCtx()->failoverDisabled()) {
return normalReply;
}
// Copy the request's failover hop count onto the first reply.
if (failoverTagging_) {
setFailoverHopCount(normalReply, getFailoverHopCount(req));
}
// Common case: processReply() accepts the first reply, so no failover.
if (LIKELY(processReply(
normalReply, req, conditionalFailover, iter, *policyCtx))) {
return normalReply;
}
// Advance past the first child. If there is nothing left to fail over to,
// return the first reply; it only counts as "all failed" when its result is
// an error result.
if (++iter == failoverPolicy_.end(req)) {
if (isErrorResult(*normalReply.result_ref())) {
allFailed = true;
}
return normalReply;
}
// Counted once per request that enters failover, not once per attempt.
proxy.stats().increment(failover_all_stat);
proxy.stats().increment(failoverPolicy_.getFailoverStat());
childIndex = 0;
// Failover. The attempts run inside runWithLocals(); presumably this scopes
// the fiber-local changes made below (request class, failover count) to the
// lambda -- confirm against fiber_local. `iter` is captured by value;
// everything else is captured by reference.
return fiber_local<RouterInfo>::runWithLocals([this,
iter,
&req,
&proxy,
&normalReply,
&policyCtx,
&childIndex,
&allFailed,
&conditionalFailover]() {
fiber_local<RouterInfo>::addRequestClass(RequestClass::kFailover);
// Performs a single failover attempt on `child`: routes the request, tags
// the reply with the hop count when tagging is enabled, logs the attempt
// together with the original reply, and marks the reply as a failover reply.
auto doFailover = [this, &req, &proxy, &normalReply, &policyCtx](
auto& child) {
// The fiber-local failover counter is only bumped when tagging is enabled;
// otherwise `cnt` stays 0 and is unused.
uint32_t cnt =
failoverTagging_ ? fiber_local<RouterInfo>::incFailoverCount() : 0;
auto failoverReply = child->route(req, policyCtx);
if (failoverTagging_) {
setFailoverHopCount(failoverReply, getFailoverHopCount(req) + cnt);
}
FailoverContext failoverContext(
child.getTrueIndex(),
targets_.size() - 1,
req,
failoverPolicy_.getFailureDomainsEnabled(),
normalReply,
failoverReply);
logFailover(proxy, failoverContext);
carbon::setIsFailoverIfPresent(failoverReply, true);
return failoverReply;
};
// `cur` is the child being tried; it starts at the second child because
// `iter` was already advanced before this lambda was created.
auto cur = iter;
// Report the true index of wherever `cur` ends up, on every exit path of
// this lambda (accepted reply inside the loop or fall-through at the end).
SCOPE_EXIT {
childIndex = cur.getTrueIndex();
};
// `nx` is a look-ahead iterator kept one step ahead of `cur`, used only to
// detect when `cur` is the last child.
auto nx = cur;
// Passive iterator does not do routing
nx.setPassive();
// Counts failover attempts whose reply came from the same failure domain as
// the original reply. Both replies must carry a destination to be compared.
auto incFailureDomainStat =
[&proxy](ReplyT<Request>& nReply, ReplyT<Request>& fReply) {
if (nReply.destination() && fReply.destination() &&
(nReply.destination()->getFailureDomain() ==
fReply.destination()->getFailureDomain())) {
proxy.stats().increment(failover_same_failure_domain_stat, 1);
}
};
// NOTE(review): if the try budget is already exhausted when the loop below
// starts, neither the loop body nor the final attempt runs and this
// default-constructed reply is what gets returned -- confirm that is intended.
ReplyT<Request> failoverReply;
// Try every child except the last one (the loop stops once `nx` reaches the
// end, i.e. when `cur` is the last child) while the try budget allows.
// numTries_ is not modified in this function; presumably route() or
// processReply() updates it through the policy context -- confirm.
for (++nx; nx != failoverPolicy_.end(req) &&
policyCtx->numTries_ < failoverPolicy_.maxErrorTries();
++cur, ++nx) {
failoverReply = doFailover(cur);
incFailureDomainStat(normalReply, failoverReply);
if (LIKELY(processReply(
failoverReply, req, conditionalFailover, cur, *policyCtx))) {
return failoverReply;
}
}
// Final attempt on the child `cur` stopped at, if the budget still allows.
// Its reply is returned without going through processReply(); it only marks
// the request as "all failed" when its result is an error result. When the
// budget is exhausted instead, allFailed is left unset and the last reply
// stored in failoverReply is returned.
if (policyCtx->numTries_ < failoverPolicy_.maxErrorTries()) {
failoverReply = doFailover(cur);
incFailureDomainStat(normalReply, failoverReply);
if (isErrorResult(*failoverReply.result_ref())) {
allFailed = true;
}
}
// Collision counters accumulated by the iterator are only published on this
// fall-through path; the early return inside the loop skips them.
proxy.stats().increment(
failover_num_collisions_stat, cur.getStats().num_collisions);
proxy.stats().increment(
failover_num_failed_domain_collisions_stat,
cur.getStats().num_failed_domain_collisions);
return failoverReply;
});
}