in src/kudu/consensus/log_cache.cc [362:544]
Status LogCache::AppendOperations(const vector<ReplicateRefPtr>& msgs,
                                  const StatusCallback& callback) {
  CHECK_GT(msgs.size(), 0);

  // SpaceUsed is relatively expensive, so do calculations outside the lock
  // and cache the result with each message.
  int64_t mem_required = 0;
  int64_t total_msg_size = 0;
  int64_t compressed_size = 0;
  int64_t uncompressed_size = 0;
  vector<CacheEntry> entries_to_insert;
  entries_to_insert.reserve(msgs.size());
  bool found_compressed_msgs = false;
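  // Build a CacheEntry per message: compress WRITE_OP_EXT payloads where
  // possible, checksum each payload, and tally the memory this batch needs.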
  for (const auto& msg : msgs) {
    CacheEntry e;
    e.mem_usage = 0;
    e.msg_size = msg->get()->SpaceUsedLong();
    const auto op_type = msg->get()->op_type();
    bool is_compressed =
        (msg->get()->write_payload().compression_codec() != NO_COMPRESSION);
    uncompressed_size += msg->get()->write_payload().payload().size();
    if (is_compressed) {
      // This batch has a compressed message. Store this fact to be used later
      // when appending this batch to the log.
      found_compressed_msgs = true;
    }
    // Try to compress the message if:
    // (1) this msg is not already compressed (identified by 'is_compressed').
    //     This can happen when AppendOperations() is called on a secondary and
    //     the primary has already compressed this msg.
    // (2) Compression is configured for this instance (codec_ should not be
    //     nullptr).
    // (3) This is a 'WRITE_OP_EXT' type (config-change/no-op/rotates are not
    //     compressed today - these messages are small and can be left
    //     uncompressed).
    const CompressionCodec* codec = codec_.load();
    if (!is_compressed && codec && op_type == WRITE_OP_EXT) {
      std::unique_ptr<ReplicateMsg> compressed_msg;
      auto status =
          CompressMsg(msg->get(), log_cache_compression_buf_, &compressed_msg);
      if (status.ok()) {
        // Successfully compressed this msg, so cache the compressed copy
        // instead of the original.
        e.mem_usage = static_cast<int64_t>(compressed_msg->SpaceUsedLong());
        e.msg = make_scoped_refptr_replicate(compressed_msg.release());
      } else {
        KLOG_EVERY_N_SECS(WARNING, 300) << "Failed to compress replicate msg";
      }
    }
    if (e.mem_usage == 0) {
      // Cache the original msg: at least one of the three conditions above
      // did not hold, or compression failed.
      e.mem_usage = e.msg_size;
      e.msg = msg;
    }
    compressed_size += e.msg->get()->write_payload().payload().size();
    // Update the crc32 checksum for the payload.
    uint32_t payload_crc32 = crc::Crc32c(
        e.msg->get()->write_payload().payload().c_str(),
        e.msg->get()->write_payload().payload().size());
    e.msg->get()->mutable_write_payload()->set_crc32(payload_crc32);
    total_msg_size += e.msg_size;
    mem_required += e.mem_usage;
    entries_to_insert.emplace_back(std::move(e));
  }
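  // Record the index range covered by this batch. The last index is handed to
  // LogCallback below.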
  int64_t first_idx_in_batch = msgs.front()->get()->id().index();
  int64_t last_idx_in_batch = msgs.back()->get()->id().index();

  std::unique_lock<Mutex> l(lock_);
  // If we're not appending a consecutive op we're likely overwriting and
  // need to replace operations in the cache.
  if (first_idx_in_batch != next_sequential_op_index_) {
    TruncateOpsAfterUnlocked(first_idx_in_batch - 1);
  }
  // Try to consume the memory. If it can't be consumed, we may need to evict.
  bool borrowed_memory = false;
  if (!tracker_->TryConsume(mem_required)) {
    int spare = tracker_->SpareCapacity();
    int need_to_free = mem_required - spare;
    VLOG_WITH_PREFIX_UNLOCKED(1) << "Memory limit would be exceeded trying to append "
                                 << HumanReadableNumBytes::ToString(mem_required)
                                 << " to log cache (available="
                                 << HumanReadableNumBytes::ToString(spare)
                                 << "): attempting to evict some operations...";

    // TODO: we should also try to evict from other tablets - probably better to
    // evict really old ops from another tablet than evict recent ops from this one.
    EvictSomeUnlocked(min_pinned_op_index_, need_to_free);

    // Force consuming, so that we don't refuse appending data. We might
    // blow past our limit a little bit (as much as the number of tablets times
    // the amount of in-flight data in the log), but until implementing the above TODO,
    // it's difficult to solve this issue.
    tracker_->Consume(mem_required);
    borrowed_memory = parent_tracker_->LimitExceeded();
  }
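  // Insert the (possibly compressed) entries into the cache and advance the
  // next expected sequential index.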
  for (auto& e : entries_to_insert) {
    auto index = e.msg->get()->id().index();
    EmplaceOrDie(&cache_, index, std::move(e));
    next_sequential_op_index_ = index + 1;
  }
  // We drop the lock during the AsyncAppendReplicates call, since it may block
  // if the queue is full, and the queue might not drain if it's trying to call
  // our callback and blocked on this lock.
  l.unlock();
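  // Update cache metrics now that the lock has been dropped.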
  metrics_.log_cache_size->IncrementBy(mem_required);
  metrics_.log_cache_msg_size->IncrementBy(total_msg_size);
  metrics_.log_cache_num_ops->IncrementBy(msgs.size());
  metrics_.log_cache_payload_size->IncrementBy(uncompressed_size);
  metrics_.log_cache_compressed_payload_size->IncrementBy(compressed_size);

  VLOG(2) << "Compressed size: " << compressed_size
          << ", Uncompressed size: " << uncompressed_size
          << ", Total msg size: " << total_msg_size
          << ", Mem required: " << mem_required;
  Status log_status;
  if (!found_compressed_msgs) {
    // No compressed messages were found, so append the original msgs to the log.
    log_status = log_->AsyncAppendReplicates(
        msgs, Bind(&LogCache::LogCallback,
                   Unretained(this),
                   last_idx_in_batch,
                   borrowed_memory,
                   callback));
  } else {
    // The batch contains some compressed msgs. They need to be uncompressed
    // before being appended to the log.
    vector<ReplicateRefPtr> uncompressed_msgs;
    uncompressed_msgs.reserve(msgs.size());
    for (const auto& msg : msgs) {
      const auto compression_codec =
          msg->get()->write_payload().compression_codec();
      if (compression_codec == NO_COMPRESSION) {
        // msg is not compressed, use it to write to the log.
        uncompressed_msgs.push_back(msg);
      } else {
        // msg needs to be uncompressed before writing to the log.
        std::unique_ptr<ReplicateMsg> uncompressed_msg;
        auto status = UncompressMsg(
            msg, log_cache_compression_buf_, &uncompressed_msg);
        // Crash if uncompression failed.
        CHECK_OK_PREPEND(status,
                         Substitute("Uncompress failed when writing to log"));
        uncompressed_msgs.push_back(
            make_scoped_refptr_replicate(uncompressed_msg.release()));
      }
    }
    log_status = log_->AsyncAppendReplicates(
        uncompressed_msgs, Bind(&LogCache::LogCallback,
                                Unretained(this),
                                last_idx_in_batch,
                                borrowed_memory,
                                callback));
  }
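  // If the append could not be queued, give back the memory consumed for this
  // batch and surface the error to the caller.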
  if (!log_status.ok()) {
    LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Couldn't append to log: " << log_status.ToString();
    tracker_->Release(mem_required);
    return log_status;
  }

  // Now signal any threads that might be waiting for Ops to be appended to the
  // log.
  next_index_cond_.Broadcast();
  return Status::OK();
}