Status LogCache::ReadOps()

in src/kudu/consensus/log_cache.cc [647:771]


// Reads ops with index > 'after_op_index' into 'messages', up to
// 'max_size_bytes' total, serving from the in-memory cache where possible and
// falling back to disk (via log_) for gaps. Sets 'preceding_op' to the OpId of
// the op at 'after_op_index'. Returns a non-OK status if that OpId cannot be
// resolved or a disk read fails.
Status LogCache::ReadOps(int64_t after_op_index,
                         int max_size_bytes,
                         const ReadContext& context,
                         std::vector<ReplicateRefPtr>* messages,
                         OpId* preceding_op) {
  DCHECK_GE(after_op_index, 0);

  // Try to look up the OpId of the op preceding the requested range; the
  // caller needs it to build a well-formed consensus request.
  Status lookup_status = LookupOpId(after_op_index, preceding_op);
  if (!lookup_status.ok()) {
    // On error return early.
    if (lookup_status.IsNotFound()) {
      // If it is a NotFound() error, do a dummy call into
      // ReadReplicatesInRange() to read a single op. This is so that it gets a
      // chance to update the error manager and report the error to the upper
      // layer. The status and any ops produced by this read are deliberately
      // discarded.
      vector<ReplicateMsg*> raw_replicate_ptrs;
      log_->ReadReplicatesInRange(
          after_op_index, after_op_index + 1, max_size_bytes, context,
          &raw_replicate_ptrs);
      for (ReplicateMsg* msg : raw_replicate_ptrs) {
        delete msg;
      }
    }

    return lookup_status;
  }

  std::unique_lock<Mutex> l(lock_);
  int64_t next_index = after_op_index + 1;

  // Return as many operations as we can, up to the size limit.
  int64_t remaining_space = max_size_bytes;
  while (remaining_space > 0 && next_index < next_sequential_op_index_) {
    // If the messages the peer needs haven't been loaded into the queue yet,
    // load them from the log.
    MessageCache::const_iterator iter = cache_.lower_bound(next_index);
    if (iter == cache_.end() || iter->first != next_index) {
      int64_t up_to;
      if (iter == cache_.end()) {
        // Read all the way to the current op.
        up_to = next_sequential_op_index_ - 1;
      } else {
        // Read up to the next entry that's in the cache.
        up_to = iter->first - 1;
      }

      // Drop the lock while performing (potentially slow) disk I/O so we
      // don't block concurrent cache users.
      l.unlock();

      vector<ReplicateMsg*> raw_replicate_ptrs;
      RETURN_NOT_OK_PREPEND(
        log_->ReadReplicatesInRange(
          next_index, up_to, remaining_space, context, &raw_replicate_ptrs),
        Substitute("Failed to read ops $0..$1", next_index, up_to));

      VLOG_WITH_PREFIX_UNLOCKED(2)
          << "Successfully read " << raw_replicate_ptrs.size() << " ops "
          << "from disk (" << next_index << ".."
          << (next_index + raw_replicate_ptrs.size() - 1) << ")";

      if (enable_compression_on_cache_miss_ && !context.route_via_proxy) {
        // Compress messages read from the log if:
        // (1) the feature is enabled through the
        //     enable_compression_on_cache_miss_ flag, and
        // (2) the request is not for a proxy host (the payload is discarded
        //     for a proxy request and it is wasteful to compress it here).
        vector<ReplicateMsg*> compressed_replicate_ptrs;
        (void) CompressMsgs(raw_replicate_ptrs, &compressed_replicate_ptrs);

        // Adopt the compressed pointers in place of the raw ones. swap() is
        // O(1) and avoids the element-by-element copy a clear()+insert()
        // sequence would perform.
        raw_replicate_ptrs.swap(compressed_replicate_ptrs);
      }

      if (!context.route_via_proxy) {
        // Compute CRC32 checksums for the payload that was read from the log.
        // Note that this is done _only_ for non-proxy requests because the
        // payload is discarded for proxy requests.
        for (ReplicateMsg* msg : raw_replicate_ptrs) {
          const std::string& payload = msg->write_payload().payload();
          uint32_t payload_crc32 = crc::Crc32c(payload.c_str(), payload.size());
          msg->mutable_write_payload()->set_crc32(payload_crc32);
        }
      }

      // Re-acquire the lock before touching shared state again.
      l.lock();

      for (ReplicateMsg* msg : raw_replicate_ptrs) {
        CHECK_EQ(next_index, msg->id().index());

        remaining_space -= TotalByteSizeForMessage(*msg);
        // Always return at least one message even if it blows the size
        // budget; otherwise a caller with a small budget could never make
        // progress.
        if (remaining_space > 0 || messages->empty()) {
          messages->push_back(make_scoped_refptr_replicate(msg));
          next_index++;
        } else {
          // This op was read from disk but won't be returned, so we own it
          // and must free it here.
          delete msg;
        }
      }

    } else {
      // Pull contiguous messages from the cache until the size limit is
      // reached. (NOTE: this path stops only when remaining_space goes
      // strictly negative, whereas the disk path above stops at zero; this
      // boundary difference is long-standing behavior and is preserved.)
      for (; iter != cache_.end(); ++iter) {
        const ReplicateRefPtr& msg = iter->second.msg;
        int64_t index = msg->get()->id().index();
        if (index != next_index) {
          continue;
        }

        remaining_space -= TotalByteSizeForMessage(*msg->get());
        // As above, always return at least one message.
        if (remaining_space < 0 && !messages->empty()) {
          break;
        }

        messages->push_back(msg);
        next_index++;
      }
    }
  }
  return Status::OK();
}