in mcrouter/lib/carbon/connection/ExternalCarbonConnectionImpl-inl.h [257:326]
void sendRequestMulti(
    std::vector<std::reference_wrapper<const Request>>&& reqs,
    F&& f) {
  // Sends every request in `reqs` through the client and reports each outcome
  // by calling `f(request, reply)` exactly once per request.
  //
  // Requests are handed out in batches whose size is whatever
  // `limitRequests()` grants for the remaining count:
  //   * a grant of 0 means the outstanding limit was hit; every remaining
  //     request is answered right here with carbon::Result::LOCAL_ERROR;
  //   * otherwise the batch is posted via `addTaskRemote()`. If the client can
  //     no longer be locked when that task runs, each request of the batch is
  //     answered with carbon::Result::UNKNOWN.
  //
  // Throws CarbonConnectionRecreateException when `threadInfo_` has expired.
  //
  // The vector only stores references: the Request objects themselves are
  // not copied, so they presumably must outlive all callbacks — TODO confirm
  // against callers.
  using Reply = facebook::memcache::ReplyT<Request>;
  using RequestRefs = std::vector<std::reference_wrapper<const Request>>;

  auto poolThread = threadInfo_.lock();
  if (!poolThread) {
    throw CarbonConnectionRecreateException(
        "Singleton<ThreadPool> was destroyed!");
  }

  // NOTE(review): this check disappears when NDEBUG is defined, in which case
  // a null client would be dereferenced below — confirm the invariant holds.
  auto liveClient = client_.lock();
  assert(liveClient);

  // The reference list is shared with every posted task so that it stays
  // alive after this function returns.
  auto batch = std::make_shared<RequestRefs>(std::move(reqs));
  const size_t total = batch->size();

  size_t next = 0;
  while (next < total) {
    auto granted = liveClient->limitRequests(total - next);

    if (granted == 0) {
      // Outstanding limit reached: fail everything that is still pending.
      while (next < total) {
        f((*batch)[next].get(), Reply(carbon::Result::LOCAL_ERROR));
        ++next;
      }
      break;
    }

    poolThread->addTaskRemote(
        [clientWeak = client_, batch, first = next, granted, f]() mutable {
          auto client = clientWeak.lock();
          if (!client) {
            // Client is gone: answer the whole batch from the main context.
            folly::fibers::runInMainContext(
                [&batch, first, granted, &f]() mutable {
                  const size_t stop = first + granted;
                  for (size_t idx = first; idx < stop; ++idx) {
                    f((*batch)[idx].get(), Reply(carbon::Result::UNKNOWN));
                  }
                });
            return;
          }

          // All requests but the last one get a task of their own; `granted`
          // is at least 1 here, so `last` cannot underflow.
          const size_t last = first + granted - 1;
          for (size_t idx = first; idx < last; ++idx) {
            const Request& pending = (*batch)[idx];
            folly::fibers::addTaskFinally(
                [clientWeak, &pending] {
                  if (auto c = clientWeak.lock()) {
                    return c->sendRequest(pending);
                  }
                  return Reply(carbon::Result::UNKNOWN);
                },
                [f, &pending](folly::Try<Reply>&& outcome) mutable {
                  f(pending, std::move(outcome.value()));
                });
          }

          // The final request of the batch is sent on the current fiber.
          const auto& tail = (*batch)[last].get();
          auto tailReply = client->sendRequest(tail);
          folly::fibers::runInMainContext([&tail, &f, &tailReply]() mutable {
            f(tail, std::move(tailReply));
          });
        });

    next += granted;
  }
}