in src/kudu/consensus/log_cache.cc [647:771]
Status LogCache::ReadOps(int64_t after_op_index,
                         int max_size_bytes,
                         const ReadContext& context,
                         std::vector<ReplicateRefPtr>* messages,
                         OpId* preceding_op) {
  DCHECK_GE(after_op_index, 0);

  // Look up the OpId of the op at 'after_op_index' to use as the preceding op.
  Status lookup_status = LookupOpId(after_op_index, preceding_op);
  if (!lookup_status.ok()) {
    // On error, return early.
    if (lookup_status.IsNotFound()) {
      // If it is a NotFound() error, do a dummy call into
      // ReadReplicatesInRange() to read a single op. This gives the log a
      // chance to update the error manager and report the error to the upper
      // layer; the status of the read itself is not checked.
      vector<ReplicateMsg*> raw_replicate_ptrs;
      log_->ReadReplicatesInRange(
          after_op_index, after_op_index + 1, max_size_bytes, context,
          &raw_replicate_ptrs);
      for (ReplicateMsg* msg : raw_replicate_ptrs) {
        delete msg;
      }
    }
    return lookup_status;
  }
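
  // Hold the lock while scanning the cache; it is released around reads from
  // the log below and re-acquired before the cache is examined again.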
  std::unique_lock<Mutex> l(lock_);
  int64_t next_index = after_op_index + 1;

  // Return as many operations as we can, up to the limit.
  int64_t remaining_space = max_size_bytes;
  while (remaining_space > 0 && next_index < next_sequential_op_index_) {
    // If the messages the peer needs haven't been loaded into the queue yet,
    // load them.
    MessageCache::const_iterator iter = cache_.lower_bound(next_index);
    if (iter == cache_.end() || iter->first != next_index) {
      int64_t up_to;
      if (iter == cache_.end()) {
        // Read all the way to the current op.
        up_to = next_sequential_op_index_ - 1;
      } else {
        // Read up to the next entry that's in the cache.
        up_to = iter->first - 1;
      }
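
      // Release the lock for the duration of the log read so that concurrent
      // appenders and other readers aren't blocked.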
      l.unlock();

      vector<ReplicateMsg*> raw_replicate_ptrs;
      RETURN_NOT_OK_PREPEND(
          log_->ReadReplicatesInRange(
              next_index, up_to, remaining_space, context, &raw_replicate_ptrs),
          Substitute("Failed to read ops $0..$1", next_index, up_to));
      VLOG_WITH_PREFIX_UNLOCKED(2)
          << "Successfully read " << raw_replicate_ptrs.size() << " ops "
          << "from disk (" << next_index << ".."
          << (next_index + raw_replicate_ptrs.size() - 1) << ")";
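
      // Post-process the messages just read from the log: optionally compress
      // them, and attach payload checksums for non-proxy requests.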
      if (enable_compression_on_cache_miss_ && !context.route_via_proxy) {
        // Compress messages read from the log if:
        // (1) the feature is enabled through the
        //     enable_compression_on_cache_miss_ flag, and
        // (2) the request is not destined for a proxy host (the payload is
        //     discarded for a proxy request, so it is wasteful to compress it
        //     here).
        vector<ReplicateMsg*> compressed_replicate_ptrs;
        (void) CompressMsgs(raw_replicate_ptrs, &compressed_replicate_ptrs);
        // TODO(vinay): Refactor this to avoid copying the pointers back into
        // the original vector.
        raw_replicate_ptrs.clear();
        raw_replicate_ptrs.reserve(compressed_replicate_ptrs.size());
        raw_replicate_ptrs.insert(
            raw_replicate_ptrs.end(),
            compressed_replicate_ptrs.begin(),
            compressed_replicate_ptrs.end());
      }

      if (!context.route_via_proxy) {
        // Compute CRC checksums for the payload that was read from the log.
        // Note that this is done _only_ for non-proxy requests because the
        // payload is discarded for proxy requests.
        for (ReplicateMsg* msg : raw_replicate_ptrs) {
          const std::string& payload = msg->write_payload().payload();
          uint32_t payload_crc32 = crc::Crc32c(payload.c_str(), payload.size());
          msg->mutable_write_payload()->set_crc32(payload_crc32);
        }
      }
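
      // Re-acquire the lock before appending the results; the next loop
      // iteration examines the cache again.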
      l.lock();
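
      // Charge each message against the remaining space, but always return at
      // least one message so the caller makes progress even if a single op
      // exceeds the size limit.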
      for (ReplicateMsg* msg : raw_replicate_ptrs) {
        CHECK_EQ(next_index, msg->id().index());
        remaining_space -= TotalByteSizeForMessage(*msg);
        if (remaining_space > 0 || messages->empty()) {
          messages->push_back(make_scoped_refptr_replicate(msg));
          next_index++;
        } else {
          delete msg;
        }
      }
    } else {
      // Pull contiguous messages from the cache until the size limit is
      // reached.
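      // Entries are keyed by op index; anything that doesn't match the next
      // expected index is skipped.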
      for (; iter != cache_.end(); ++iter) {
        const ReplicateRefPtr& msg = iter->second.msg;
        int64_t index = msg->get()->id().index();
        if (index != next_index) {
          continue;
        }
        remaining_space -= TotalByteSizeForMessage(*msg->get());
        if (remaining_space < 0 && !messages->empty()) {
          break;
        }
        messages->push_back(msg);
        next_index++;
      }
    }
  }

  return Status::OK();
}