void McClientRequestContextQueue::reply()

in mcrouter/lib/network/McClientRequestContext-inl.h [134:200]


void McClientRequestContextQueue::reply(
    uint64_t id,
    Reply&& r,
    RpcStatsContext rpcStatsContext) {
  // Get the context and erase it from the queue and map.
  McClientRequestContextBase* ctx{nullptr};
  if (outOfOrder_) {
    auto iter = getContextById(id);
    if (iter != set_.end()) {
      ctx = &(*iter);
      if (iter->state() == State::PENDING_REPLY_QUEUE) {
        pendingReplyQueue_.erase(pendingReplyQueue_.iterator_to(*iter));
        set_.erase(iter);
      } else if (iter->state() == State::WRITE_QUEUE) {
        // We didn't get write callback yet, so need to properly handle that.
        set_.erase(iter);
      } else {
        LOG_FAILURE(
            "AsyncMcClient",
            failure::Category::kOther,
            "Received reply for a request in an unexpected state {}!",
            static_cast<uint64_t>(iter->state()));
        return;
      }

      auto oldState = ctx->state();
      ctx->reply(std::move(r));
      ctx->setRpcStatsContext(rpcStatsContext);
      ctx->setState(State::COMPLETE);

      if (oldState == State::PENDING_REPLY_QUEUE) {
        ctx->baton_.post();
      }
    }
  } else {
    // First we're going to receive replies for timed out requests.
    if (!timedOutInitializers_.empty()) {
      timedOutInitializers_.pop();
    } else if (!pendingReplyQueue_.empty()) {
      ctx = &pendingReplyQueue_.front();
      pendingReplyQueue_.pop_front();
    } else if (!writeQueue_.empty()) {
      ctx = &writeQueue_.front();
      writeQueue_.pop_front();
    } else {
      // With old mc_parser it's possible to receive unexpected replies, we need
      // to ignore them. But we need to log this.
      LOG_FAILURE(
          "AsyncMcClient",
          failure::Category::kOther,
          "Received unexpected reply from server!");
    }

    if (ctx) {
      ctx->reply(std::move(r));
      ctx->setRpcStatsContext(rpcStatsContext);
      if (ctx->state() == State::PENDING_REPLY_QUEUE) {
        ctx->setState(State::COMPLETE);
        ctx->baton_.post();
      } else {
        // Move the request to the replied queue.
        ctx->setState(State::REPLIED_QUEUE);
        repliedQueue_.push_back(*ctx);
      }
    }
  }
}