in mcrouter/lib/network/McClientRequestContext-inl.h [134:200]
void McClientRequestContextQueue::reply(
    uint64_t id,
    Reply&& r,
    RpcStatsContext rpcStatsContext) {
  // Delivers a reply to the request context it belongs to.
  //
  // id              - request id; only consulted in out-of-order mode.
  // r               - the reply, moved into the matched context.
  // rpcStatsContext - stats recorded on the matched context.
  //
  // Step 1 finds the context and unlinks it from the queue (and, in
  // out-of-order mode, the id set) that currently holds it. Step 2 hands the
  // reply over and either completes the request or parks it in repliedQueue_.
  McClientRequestContextBase* ctx{nullptr};
  if (outOfOrder_) {
    auto iter = getContextById(id);
    // A reply whose id is not in the set is dropped without logging.
    if (iter != set_.end()) {
      if (iter->state() == State::PENDING_REPLY_QUEUE) {
        pendingReplyQueue_.erase(pendingReplyQueue_.iterator_to(*iter));
      } else if (iter->state() == State::WRITE_QUEUE) {
        // We didn't get the write callback yet. The context must leave
        // writeQueue_ here: it is parked in repliedQueue_ below, exactly as
        // the in-order path does, rather than being marked COMPLETE while it
        // is still linked into writeQueue_.
        // NOTE(review): assumes writeQueue_ supports erase()/iterator_to()
        // like pendingReplyQueue_ does - confirm against the declaration.
        writeQueue_.erase(writeQueue_.iterator_to(*iter));
      } else {
        LOG_FAILURE(
            "AsyncMcClient",
            failure::Category::kOther,
            "Received reply for a request in an unexpected state {}!",
            static_cast<uint64_t>(iter->state()));
        return;
      }
      // Take the address before erasing the iterator from the set.
      ctx = &(*iter);
      set_.erase(iter);
    }
  } else {
    // First we're going to receive replies for timed out requests.
    if (!timedOutInitializers_.empty()) {
      // The reply is consumed by the timed out entry; ctx stays null, so
      // nothing is delivered.
      timedOutInitializers_.pop();
    } else if (!pendingReplyQueue_.empty()) {
      ctx = &pendingReplyQueue_.front();
      pendingReplyQueue_.pop_front();
    } else if (!writeQueue_.empty()) {
      ctx = &writeQueue_.front();
      writeQueue_.pop_front();
    } else {
      // With old mc_parser it's possible to receive unexpected replies, we need
      // to ignore them. But we need to log this.
      LOG_FAILURE(
          "AsyncMcClient",
          failure::Category::kOther,
          "Received unexpected reply from server!");
    }
  }

  if (ctx) {
    ctx->reply(std::move(r));
    ctx->setRpcStatsContext(rpcStatsContext);
    // The state still reflects the queue the context was just removed from.
    if (ctx->state() == State::PENDING_REPLY_QUEUE) {
      ctx->setState(State::COMPLETE);
      ctx->baton_.post();
    } else {
      // The context came from writeQueue_: move the request to the replied
      // queue instead of completing it here.
      ctx->setState(State::REPLIED_QUEUE);
      repliedQueue_.push_back(*ctx);
    }
  }
}