ReplyT doRoute()

in mcrouter/routes/FailoverRoute.h [237:355]


  ReplyT<Request> doRoute(const Request& req, size_t& childIndex) {
    auto& proxy = fiber_local<RouterInfo>::getSharedCtx()->proxy();

    bool conditionalFailover = false;
    bool allFailed = false;
    SCOPE_EXIT {
      if (conditionalFailover) {
        proxy.stats().increment(failover_conditional_stat);
        proxy.stats().increment(failover_conditional_count_stat);
      }
      if (allFailed) {
        proxy.stats().increment(failover_all_failed_stat);
        proxy.stats().increment(failover_all_failed_count_stat);
        proxy.stats().increment(failoverPolicy_.getFailoverFailedStat());
      }
    };

    auto policyCtx = failoverPolicy_.context(req);
    auto iter = failoverPolicy_.begin(req);
    auto normalReply = iter->route(req, policyCtx);
    if (rateLimiter_) {
      rateLimiter_->bumpTotalReqs();
    }
    if (fiber_local<RouterInfo>::getSharedCtx()->failoverDisabled()) {
      return normalReply;
    }
    if (failoverTagging_) {
      setFailoverHopCount(normalReply, getFailoverHopCount(req));
    }
    if (LIKELY(processReply(
            normalReply, req, conditionalFailover, iter, *policyCtx))) {
      return normalReply;
    }
    if (++iter == failoverPolicy_.end(req)) {
      if (isErrorResult(*normalReply.result_ref())) {
        allFailed = true;
      }
      return normalReply;
    }
    proxy.stats().increment(failover_all_stat);
    proxy.stats().increment(failoverPolicy_.getFailoverStat());

    childIndex = 0;
    // Failover
    return fiber_local<RouterInfo>::runWithLocals([this,
                                                   iter,
                                                   &req,
                                                   &proxy,
                                                   &normalReply,
                                                   &policyCtx,
                                                   &childIndex,
                                                   &allFailed,
                                                   &conditionalFailover]() {
      fiber_local<RouterInfo>::addRequestClass(RequestClass::kFailover);
      auto doFailover = [this, &req, &proxy, &normalReply, &policyCtx](
                            auto& child) {
        uint32_t cnt =
            failoverTagging_ ? fiber_local<RouterInfo>::incFailoverCount() : 0;
        auto failoverReply = child->route(req, policyCtx);
        if (failoverTagging_) {
          setFailoverHopCount(failoverReply, getFailoverHopCount(req) + cnt);
        }
        FailoverContext failoverContext(
            child.getTrueIndex(),
            targets_.size() - 1,
            req,
            failoverPolicy_.getFailureDomainsEnabled(),
            normalReply,
            failoverReply);
        logFailover(proxy, failoverContext);
        carbon::setIsFailoverIfPresent(failoverReply, true);
        return failoverReply;
      };

      auto cur = iter;
      // set the index of the child that generated the reply.
      SCOPE_EXIT {
        childIndex = cur.getTrueIndex();
      };
      auto nx = cur;
      // Passive iterator does not do routing
      nx.setPassive();

      auto incFailureDomainStat =
          [&proxy](ReplyT<Request>& nReply, ReplyT<Request>& fReply) {
            if (nReply.destination() && fReply.destination() &&
                (nReply.destination()->getFailureDomain() ==
                 fReply.destination()->getFailureDomain())) {
              proxy.stats().increment(failover_same_failure_domain_stat, 1);
            }
          };

      ReplyT<Request> failoverReply;
      for (++nx; nx != failoverPolicy_.end(req) &&
           policyCtx->numTries_ < failoverPolicy_.maxErrorTries();
           ++cur, ++nx) {
        failoverReply = doFailover(cur);
        incFailureDomainStat(normalReply, failoverReply);
        if (LIKELY(processReply(
                failoverReply, req, conditionalFailover, cur, *policyCtx))) {
          return failoverReply;
        }
      }
      if (policyCtx->numTries_ < failoverPolicy_.maxErrorTries()) {
        failoverReply = doFailover(cur);
        incFailureDomainStat(normalReply, failoverReply);
        if (isErrorResult(*failoverReply.result_ref())) {
          allFailed = true;
        }
      }
      proxy.stats().increment(
          failover_num_collisions_stat, cur.getStats().num_collisions);
      proxy.stats().increment(
          failover_num_failed_domain_collisions_stat,
          cur.getStats().num_failed_domain_collisions);

      return failoverReply;
    });
  }