in mcrouter/CarbonRouterClient-inl.h [110:193]
bool CarbonRouterClient<RouterInfo>::sendMultiImpl(
    size_t nreqs,
    F&& makeNextPreq,
    G&& failRemaining) {
  // Dispatches `nreqs` requests, each produced by one call to
  // `makeNextPreq(inBatch)`, using one of three strategies:
  //   1. maxOutstanding() == 0: send everything; no semaphore is consulted.
  //   2. maxOutstandingError(): acquire slots without blocking; once no slot
  //      can be acquired, call `failRemaining()` and stop.
  //   3. otherwise: acquire slots via counting_sem_lazy_wait() and send in
  //      waves until all `nreqs` requests have been handed off.
  // Returns false only when router_.lock() yields nothing; true otherwise.

  // Held in a local so the locked handle stays alive for the whole call.
  auto lockedRouter = router_.lock();
  if (UNLIKELY(!lockedRouter)) {
    return false;
  }

  // Wakes up the proxy queue(s) that received requests written with a
  // delayed notification. Only meaningful for the remote-thread modes.
  auto notify = [this]() {
    assert(mode_ != ThreadMode::SameThread);
    if (mode_ != ThreadMode::FixedRemoteThread) {
      assert(mode_ == ThreadMode::AffinitizedRemoteThread);
      // Notify every proxy flagged in proxiesToNotify_ and clear its flag.
      for (size_t idx = 0; idx < proxies_.size(); ++idx) {
        if (!proxiesToNotify_[idx]) {
          continue;
        }
        proxies_[idx]->messageQueue_->notifyRelaxed();
        proxiesToNotify_[idx] = false;
      }
      return;
    }
    proxies_[proxyIdx_]->messageQueue_->notifyRelaxed();
  };

  // Sends `count` freshly created requests through sendSameThread().
  auto sendInline = [&](size_t count) {
    while (count-- > 0) {
      sendSameThread(makeNextPreq(/* inBatch */ false));
    }
  };

  // Sends `count` freshly created requests through sendRemoteThread(); when
  // `delayNotification` is set, a single notify() follows the whole wave.
  auto sendRemoteWave = [&](size_t count, bool delayNotification) {
    for (size_t sent = 0; sent < count; ++sent) {
      sendRemoteThread(makeNextPreq(delayNotification), delayNotification);
    }
    if (delayNotification) {
      notify();
    }
  };

  // Strategy 1: nothing to acquire, send all requests in one go.
  if (maxOutstanding() == 0) {
    if (mode_ == ThreadMode::SameThread) {
      sendInline(nreqs);
    } else {
      sendRemoteWave(nreqs, shouldDelayNotification(nreqs));
    }
    return true;
  }

  // Strategy 2: non-blocking acquisition; fail whatever cannot get a slot.
  if (maxOutstandingError()) {
    size_t handled = 0;
    while (handled < nreqs) {
      const size_t granted = counting_sem_lazy_nonblocking(
          outstandingReqsSem(), nreqs - handled);
      if (granted == 0) {
        // No slot could be acquired: hand the rest to the caller's failure
        // path and stop sending.
        failRemaining();
        break;
      }
      if (mode_ == ThreadMode::SameThread) {
        sendInline(granted);
      } else {
        sendRemoteWave(granted, shouldDelayNotification(granted));
      }
      handled += granted;
    }
    return true;
  }

  // Strategy 3: acquire via counting_sem_lazy_wait(); remote-thread modes
  // only.
  assert(mode_ != ThreadMode::SameThread);
  size_t dispatched = 0; // requests already handed to sendRemoteThread()
  size_t acquired = 0; // total slots obtained from the semaphore so far
  while (dispatched < nreqs) {
    acquired +=
        counting_sem_lazy_wait(outstandingReqsSem(), nreqs - acquired);
    // NOTE(review): the delay decision uses the cumulative `acquired` count,
    // whereas the non-blocking path above uses the size of the current wave
    // -- confirm this difference is intended.
    const bool delayNotification = shouldDelayNotification(acquired);
    sendRemoteWave(acquired - dispatched, delayNotification);
    dispatched = acquired;
  }
  return true;
}