Status LogCache::AppendOperations()

in src/kudu/consensus/log_cache.cc [362:544]


// Inserts the batch 'msgs' into the in-memory cache and submits it to the log
// via Log::AsyncAppendReplicates().
//
// Outline:
//   1. Outside the lock: size every message, opportunistically compress
//      uncompressed WRITE_OP_EXT messages when a codec is configured, and
//      stamp a crc32 of the (possibly compressed) payload on the message that
//      will be cached.
//   2. Under 'lock_': truncate the cache if the batch does not start at
//      'next_sequential_op_index_', account for the memory (evicting if the
//      tracker refuses the reservation) and insert the entries.
//   3. After dropping the lock: bump metrics and submit the batch to the log.
//      Messages that arrived already compressed are uncompressed first.
//
// 'msgs' must be non-empty (enforced with a CHECK). 'callback' is bound into
// LogCache::LogCallback together with the last index of the batch and whether
// the parent tracker's limit was exceeded.
//
// Returns the status of AsyncAppendReplicates(). On failure the memory
// consumed from 'tracker_' for this batch is released before returning.
Status LogCache::AppendOperations(const vector<ReplicateRefPtr>& msgs,
                                  const StatusCallback& callback) {
  CHECK_GT(msgs.size(), 0);

  // SpaceUsed is relatively expensive, so do calculations outside the lock
  // and cache the result with each message.
  int64_t mem_required = 0;
  int64_t total_msg_size = 0;
  int64_t compressed_size = 0;
  int64_t uncompressed_size = 0;
  vector<CacheEntry> entries_to_insert;
  entries_to_insert.reserve(msgs.size());

  // NOTE(review): 'log_cache_compression_buf_' is used here and again after
  // 'lock_' has been dropped, i.e. never under 'lock_'. Presumably callers
  // serialize AppendOperations() -- confirm.
  bool found_compressed_msgs = false;
  for (const auto& msg : msgs) {
    CacheEntry e;
    // 'mem_usage' doubles as a sentinel: it stays 0 until a message has been
    // chosen for this entry.
    e.mem_usage = 0;
    e.msg_size = msg->get()->SpaceUsedLong();

    const auto op_type = msg->get()->op_type();
    const bool is_compressed =
      (msg->get()->write_payload().compression_codec() != NO_COMPRESSION);
    uncompressed_size += msg->get()->write_payload().payload().size();

    if (is_compressed) {
      // This batch has a compressed message. Remember this so the batch is
      // uncompressed before it is appended to the log below.
      found_compressed_msgs = true;
    }

    // Try to compress the message if:
    // (1) this msg is not already compressed (identified by 'is_compressed').
    // This can happen when this AppendOperations() is called on a secondary and
    // the primary has already compressed this msg
    // (2) Compression is configured for this instance (codec_ should not be
    // nullptr)
    // (3) This is a 'WRITE_OP_EXT' type (config-change/no-op/rotates are not
    // compressed today - these messages are small and can be left uncompressed)
    const CompressionCodec* codec = codec_.load();
    if (!is_compressed && codec && op_type == WRITE_OP_EXT) {
      std::unique_ptr<ReplicateMsg> compressed_msg;
      auto status =
        CompressMsg(msg->get(), log_cache_compression_buf_, &compressed_msg);
      if (status.ok()) {
        // Successfully compressed this msg, so cache the compressed copy.
        e.mem_usage = static_cast<int64_t>(compressed_msg->SpaceUsedLong());
        e.msg = make_scoped_refptr_replicate(compressed_msg.release());
      } else {
        KLOG_EVERY_N_SECS(WARNING, 300) << "Failed to compress replicate msg";
      }
    }

    if (e.mem_usage == 0) {
      // No compressed copy was produced (one of the three conditions above did
      // not hold, or compression failed): cache the original msg.
      e.mem_usage = e.msg_size;
      e.msg = msg;
    }

    // Payload of the message that is actually cached (compressed or not).
    const auto& cached_payload = e.msg->get()->write_payload().payload();
    compressed_size += cached_payload.size();

    // Update the crc32 checksum for the payload
    const uint32_t payload_crc32 =
        crc::Crc32c(cached_payload.c_str(), cached_payload.size());
    e.msg->get()->mutable_write_payload()->set_crc32(payload_crc32);

    total_msg_size += e.msg_size;
    mem_required += e.mem_usage;
    entries_to_insert.emplace_back(std::move(e));
  }

  const int64_t first_idx_in_batch = msgs.front()->get()->id().index();
  const int64_t last_idx_in_batch = msgs.back()->get()->id().index();

  std::unique_lock<Mutex> l(lock_);
  // If we're not appending a consecutive op we're likely overwriting and
  // need to replace operations in the cache.
  if (first_idx_in_batch != next_sequential_op_index_) {
    TruncateOpsAfterUnlocked(first_idx_in_batch - 1);
  }

  // Try to consume the memory. If it can't be consumed, we may need to evict.
  bool borrowed_memory = false;
  if (!tracker_->TryConsume(mem_required)) {
    // Keep these as 64-bit values: 'mem_required' is an int64_t byte count and
    // narrowing to 'int' would corrupt the eviction target for large values.
    const int64_t spare = tracker_->SpareCapacity();
    const int64_t need_to_free = mem_required - spare;
    VLOG_WITH_PREFIX_UNLOCKED(1) << "Memory limit would be exceeded trying to append "
                        << HumanReadableNumBytes::ToString(mem_required)
                        << " to log cache (available="
                        << HumanReadableNumBytes::ToString(spare)
                        << "): attempting to evict some operations...";

    // TODO: we should also try to evict from other tablets - probably better to
    // evict really old ops from another tablet than evict recent ops from this one.
    EvictSomeUnlocked(min_pinned_op_index_, need_to_free);

    // Force consuming, so that we don't refuse appending data. We might
    // blow past our limit a little bit (as much as the number of tablets times
    // the amount of in-flight data in the log), but until implementing the above TODO,
    // it's difficult to solve this issue.
    tracker_->Consume(mem_required);

    borrowed_memory = parent_tracker_->LimitExceeded();
  }

  for (auto& e : entries_to_insert) {
    auto index = e.msg->get()->id().index();
    EmplaceOrDie(&cache_, index, std::move(e));
    next_sequential_op_index_ = index + 1;
  }

  // We drop the lock during the AsyncAppendReplicates call, since it may block
  // if the queue is full, and the queue might not drain if it's trying to call
  // our callback and blocked on this lock.
  l.unlock();

  metrics_.log_cache_size->IncrementBy(mem_required);
  metrics_.log_cache_msg_size->IncrementBy(total_msg_size);
  metrics_.log_cache_num_ops->IncrementBy(msgs.size());
  metrics_.log_cache_payload_size->IncrementBy(uncompressed_size);
  metrics_.log_cache_compressed_payload_size->IncrementBy(compressed_size);

  VLOG(2) << "Compressed size: " << compressed_size <<
             ", Uncompressed size: " << uncompressed_size <<
             ", Total msg size: " << total_msg_size <<
             ", Msg Size: " << mem_required;

  // By default the original msgs are appended to the log. If the incoming
  // batch contained compressed msgs, an uncompressed copy of the batch is
  // built and appended instead.
  const vector<ReplicateRefPtr>* msgs_for_log = &msgs;
  vector<ReplicateRefPtr> uncompressed_msgs;
  if (found_compressed_msgs) {
    uncompressed_msgs.reserve(msgs.size());

    for (const auto& msg : msgs) {
      const auto compression_codec =
        msg->get()->write_payload().compression_codec();

      if (compression_codec == NO_COMPRESSION) {
        // msg is not compressed, use it to write to the log
        uncompressed_msgs.push_back(msg);
      } else {
        // msg needs to be uncompressed before writing to the log
        std::unique_ptr<ReplicateMsg> uncompressed_msg;
        auto status = UncompressMsg(
            msg, log_cache_compression_buf_, &uncompressed_msg);

        // Crash if uncompression failed
        CHECK_OK_PREPEND(status,
            Substitute("Uncompess failed when writing to log"));
        uncompressed_msgs.push_back(
            make_scoped_refptr_replicate(uncompressed_msg.release()));
      }
    }

    msgs_for_log = &uncompressed_msgs;
  }

  Status log_status = log_->AsyncAppendReplicates(
    *msgs_for_log, Bind(&LogCache::LogCallback,
                        Unretained(this),
                        last_idx_in_batch,
                        borrowed_memory,
                        callback));

  if (!log_status.ok()) {
    LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Couldn't append to log: " << log_status.ToString();
    // NOTE(review): only the tracker reservation is rolled back here; the
    // entries inserted into 'cache_' and the metric increments above are not
    // undone in this function -- confirm that is intended.
    tracker_->Release(mem_required);
    return log_status;
  }

  // Now signal any threads that might be waiting for Ops to be appended to the
  // log
  next_index_cond_.Broadcast();
  return Status::OK();
}