bool CarbonRouterClient::sendMultiImpl()

in mcrouter/CarbonRouterClient-inl.h [110:193]


// Sends `nreqs` requests, creating each one on demand through `makeNextPreq`.
//
// How requests are admitted depends on the client's outstanding-request
// configuration:
//   * maxOutstanding() == 0: every request is sent right away, without
//     touching the outstanding-requests semaphore.
//   * maxOutstandingError() is true: slots are requested from the semaphore
//     without blocking. As soon as an attempt yields no slots,
//     `failRemaining()` is invoked once and no further requests are sent.
//   * otherwise: the call waits on the semaphore until all `nreqs` requests
//     have been handed off. This path asserts that the client is not in
//     ThreadMode::SameThread.
//
// @param nreqs          Number of requests to send.
// @param makeNextPreq   Callable invoked once per request sent; it receives a
//                       bool (labelled `inBatch` at the call sites) and its
//                       result is passed to sendSameThread()/sendRemoteThread().
// @param failRemaining  Callable invoked (with no arguments) when the
//                       non-blocking path cannot obtain any more slots.
// @return false if `router_` could not be locked (nothing is sent in that
//         case); true otherwise, including when `failRemaining()` was called.
bool CarbonRouterClient<RouterInfo>::sendMultiImpl(
    size_t nreqs,
    F&& makeNextPreq,
    G&& failRemaining) {
  // The locked handle is held in a local for the rest of the call
  // (presumably to keep the router alive while requests are enqueued).
  auto router = router_.lock();
  if (UNLIKELY(!router)) {
    return false;
  }

  // Issues the notifications that were postponed while a batch was enqueued
  // with delayNotification == true. Only meaningful for remote-thread modes.
  auto notify = [this]() {
    assert(mode_ != ThreadMode::SameThread);
    if (mode_ == ThreadMode::FixedRemoteThread) {
      // All requests went to a single proxy; one notification is enough.
      proxies_[proxyIdx_]->messageQueue_->notifyRelaxed();
    } else {
      assert(mode_ == ThreadMode::AffinitizedRemoteThread);
      // Notify only the proxies flagged in proxiesToNotify_, clearing each
      // flag so the next batch starts clean.
      for (size_t i = 0; i < proxies_.size(); ++i) {
        if (proxiesToNotify_[i]) {
          proxies_[i]->messageQueue_->notifyRelaxed();
          proxiesToNotify_[i] = false;
        }
      }
    }
  };

  if (maxOutstanding() == 0) {
    // No limit on outstanding requests: send everything as one batch.
    if (mode_ == ThreadMode::SameThread) {
      for (size_t i = 0; i < nreqs; ++i) {
        sendSameThread(makeNextPreq(/* inBatch */ false));
      }
    } else {
      bool delayNotification = shouldDelayNotification(nreqs);
      for (size_t i = 0; i < nreqs; ++i) {
        sendRemoteThread(makeNextPreq(delayNotification), delayNotification);
      }
      if (delayNotification) {
        notify();
      }
    }
  } else if (maxOutstandingError()) {
    // Non-blocking admission: send whatever the semaphore grants, and fail
    // the rest as soon as an attempt grants nothing.
    for (size_t begin = 0; begin < nreqs;) {
      auto end = begin +
          counting_sem_lazy_nonblocking(outstandingReqsSem(), nreqs - begin);
      if (begin == end) {
        failRemaining();
        break;
      }

      if (mode_ == ThreadMode::SameThread) {
        for (size_t i = begin; i < end; ++i) {
          sendSameThread(makeNextPreq(/*  inBatch */ false));
        }
      } else {
        bool delayNotification = shouldDelayNotification(end - begin);
        for (size_t i = begin; i < end; ++i) {
          sendRemoteThread(makeNextPreq(delayNotification), delayNotification);
        }
        if (delayNotification) {
          notify();
        }
      }

      begin = end;
    }
  } else {
    // Blocking admission: wait on the semaphore until every request is sent.
    assert(mode_ != ThreadMode::SameThread);

    size_t i = 0; // first request of the current batch
    size_t n = 0; // total number of slots acquired so far

    while (i < nreqs) {
      n += counting_sem_lazy_wait(outstandingReqsSem(), nreqs - n);
      // Base the decision on the size of this batch only (n - i), not on the
      // cumulative count n, matching the non-blocking branch above, which
      // passes (end - begin).
      bool delayNotification = shouldDelayNotification(n - i);
      for (size_t j = i; j < n; ++j) {
        sendRemoteThread(makeNextPreq(delayNotification), delayNotification);
      }
      if (delayNotification) {
        notify();
      }
      i = n;
    }
  }

  return true;
}