McLeaseGetReply BigValueRoute::doLeaseGetRoute()

in mcrouter/routes/BigValueRoute.cpp [53:148]


/**
 * Routes a lease-get whose reply may be flagged as a big value, i.e. a
 * metadata item that describes chunks which have to be fetched separately.
 *
 * Control flow:
 *  1. Route `req` to the child. A reply without MC_MSG_FLAG_BIG_VALUE is
 *     returned unchanged.
 *  2. A flagged reply that is not a hit is reduced to result + lease token.
 *  3. If the chunk info parsed from the reply value is invalid, a NOTFOUND
 *     reply with lease token -1 is returned.
 *  4. Otherwise a gets for the metadata key and gets for every chunk are
 *     issued together, and the chunk replies are merged.
 *  5. A merged reply that is not a miss (hit or error) is returned directly.
 *  6. On a merged miss: an error on the metadata gets is retried or
 *     surfaced; a hit on the metadata gets triggers a CAS invalidation of
 *     the metadata item; then the whole lookup is retried while
 *     `retriesLeft` permits.
 *
 * @param req          The lease-get request to serve.
 * @param retriesLeft  How many further attempts may be made after this one.
 * @return The reply chosen according to the flow above; REMOTE_ERROR with a
 *         descriptive message once retries are exhausted.
 */
McLeaseGetReply BigValueRoute::doLeaseGetRoute(
    const McLeaseGetRequest& req,
    size_t retriesLeft) const {
  auto initialReply = ch_->route(req);
  bool isBigValue = ((*initialReply.flags_ref() & MC_MSG_FLAG_BIG_VALUE) != 0);
  if (!isBigValue) {
    return initialReply;
  }

  if (!isHitResult(*initialReply.result_ref())) {
    // if bigValue item, create a new reply with result, lease-token and return
    // so that we don't send any meta data that may be present in initialReply
    McLeaseGetReply reply(*initialReply.result_ref());
    reply.leaseToken_ref() = *initialReply.leaseToken_ref();
    return reply;
  }

  ChunksInfo chunksInfo(coalesceAndGetRange(initialReply.value_ref()));
  if (!chunksInfo.valid()) {
    // We cannot return carbon::Result::NOTFOUND without a valid lease token. We
    // err on the side of allowing clients to make progress by returning a lease
    // token of -1.
    McLeaseGetReply missReply(carbon::Result::NOTFOUND);
    missReply.leaseToken_ref() = static_cast<uint64_t>(-1);
    return missReply;
  }

  // Send a gets request for the metadata while sending ordinary get requests
  // to fetch the subpieces. We may need to use the returned CAS token to
  // invalidate the metadata piece later on.
  const auto key = req.key_ref()->fullKey();
  McGetsRequest getsMetadataReq(key);
  const auto reqs = chunkGetRequests(req, chunksInfo);
  std::vector<std::function<McGetReply()>> fs;
  fs.reserve(reqs.size());

  // The closures below hold references to `reqs` elements and to the locals
  // of this frame; all of them are run by the collectAll call further down,
  // before this function returns.
  auto& target = *ch_;
  for (const auto& chunkReq : reqs) {
    fs.push_back([&target, &chunkReq]() { return target.route(chunkReq); });
  }

  McGetsReply getsMetadataReply;
  std::vector<McGetReply> replies;
  std::vector<std::function<void()>> tasks;
  tasks.emplace_back([&getsMetadataReq, &getsMetadataReply, &target]() {
    getsMetadataReply = target.route(getsMetadataReq);
  });
  tasks.emplace_back([this, &replies, fs = std::move(fs)]() mutable {
    replies = collectAllByBatches(fs.begin(), fs.end());
  });

  folly::fibers::collectAll(tasks.begin(), tasks.end());
  // Deliberately non-const: a const local cannot be implicitly moved from in
  // the return statement below and would be copied instead.
  auto reducedReply = mergeChunkGetReplies(
      replies.begin(), replies.end(), std::move(initialReply));

  // Return reducedReply on hit or error
  if (!isMissResult(*reducedReply.result_ref())) {
    return reducedReply;
  }

  if (isErrorResult(*getsMetadataReply.result_ref())) {
    if (retriesLeft > 0) {
      return doLeaseGetRoute(req, retriesLeft - 1);
    }
    McLeaseGetReply errorReply(*getsMetadataReply.result_ref());
    errorReply.message_ref() = std::move(*getsMetadataReply.message_ref());
    return errorReply;
  }

  // This is the tricky part with leases. There was a hit on the metadata,
  // but the merged subpiece reply is a miss. One of the clients needs to
  // invalidate the metadata. Then one (possibly the same) client will be able
  // to get a valid lease token.
  // TODO: Consider also firing off async deletes for the subpieces for better
  // cache use.
  if (isHitResult(*getsMetadataReply.result_ref())) {
    // CAS with the token obtained from the gets issued above, using an
    // exptime of -1, as the invalidation of the metadata item.
    McCasRequest invalidateReq(key);
    invalidateReq.exptime_ref() = -1;
    invalidateReq.casToken_ref() = *getsMetadataReply.casToken_ref();
    auto invalidateReply = ch_->route(invalidateReq);
    if (isErrorResult(*invalidateReply.result_ref())) {
      McLeaseGetReply errorReply(*invalidateReply.result_ref());
      errorReply.message_ref() = std::move(*invalidateReply.message_ref());
      return errorReply;
    }
  }

  // Start over from the initial lease-get while attempts remain.
  if (retriesLeft > 0) {
    return doLeaseGetRoute(req, retriesLeft - 1);
  }

  McLeaseGetReply reply(carbon::Result::REMOTE_ERROR);
  reply.message_ref() = folly::sformat(
      "BigValueRoute: exhausted retries for lease-get for key {}", key);
  return reply;
}