in db/flush_job.cc [356:627]
Status FlushJob::MemPurge() {
Status s;
db_mutex_->AssertHeld();
db_mutex_->Unlock();
assert(!mems_.empty());
// Measure purging time.
const uint64_t start_micros = clock_->NowMicros();
const uint64_t start_cpu_micros = clock_->CPUMicros();
MemTable* new_mem = nullptr;
// For performance/log investigation purposes:
// look at how much useful payload we harvest in the new_mem.
// This value is then printed to the DB log.
double new_mem_capacity = 0.0;
// Create two iterators, one for the memtable data (contains
// info from puts + deletes), and one for the memtable
// Range Tombstones (from DeleteRanges).
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
std::vector<InternalIterator*> memtables;
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
for (MemTable* m : mems_) {
memtables.push_back(m->NewIterator(ro, &arena));
auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
}
assert(!memtables.empty());
SequenceNumber first_seqno = kMaxSequenceNumber;
SequenceNumber earliest_seqno = kMaxSequenceNumber;
// Pick first and earliest seqno as min of all first_seqno
// and earliest_seqno of the mempurged memtables.
for (const auto& mem : mems_) {
first_seqno = mem->GetFirstSequenceNumber() < first_seqno
? mem->GetFirstSequenceNumber()
: first_seqno;
earliest_seqno = mem->GetEarliestSequenceNumber() < earliest_seqno
? mem->GetEarliestSequenceNumber()
: earliest_seqno;
}
ScopedArenaIterator iter(
NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(),
static_cast<int>(memtables.size()), &arena));
auto* ioptions = cfd_->ioptions();
// Place iterator at the First (meaning most recent) key node.
iter->SeekToFirst();
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
new CompactionRangeDelAggregator(&(cfd_->internal_comparator()),
existing_snapshots_));
for (auto& rd_iter : range_del_iters) {
range_del_agg->AddTombstones(std::move(rd_iter));
}
// If there is valid data in the memtable,
// or at least range tombstones, copy over the info
// to the new memtable.
if (iter->Valid() || !range_del_agg->IsEmpty()) {
// MaxSize is the size of a memtable.
size_t maxSize = mutable_cf_options_.write_buffer_size;
std::unique_ptr<CompactionFilter> compaction_filter;
if (ioptions->compaction_filter_factory != nullptr &&
ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
TableFileCreationReason::kFlush)) {
CompactionFilter::Context ctx;
ctx.is_full_compaction = false;
ctx.is_manual_compaction = false;
ctx.column_family_id = cfd_->GetID();
ctx.reason = TableFileCreationReason::kFlush;
compaction_filter =
ioptions->compaction_filter_factory->CreateCompactionFilter(ctx);
if (compaction_filter != nullptr &&
!compaction_filter->IgnoreSnapshots()) {
s = Status::NotSupported(
"CompactionFilter::IgnoreSnapshots() = false is not supported "
"anymore.");
return s;
}
}
new_mem = new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()),
mutable_cf_options_, cfd_->write_buffer_mgr(),
earliest_seqno, cfd_->GetID());
assert(new_mem != nullptr);
Env* env = db_options_.env;
assert(env);
MergeHelper merge(
env, (cfd_->internal_comparator()).user_comparator(),
(ioptions->merge_operator).get(), compaction_filter.get(),
ioptions->logger, true /* internal key corruption is not ok */,
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
snapshot_checker_);
CompactionIterator c_iter(
iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
kMaxSequenceNumber, &existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_, env,
ShouldReportDetailedTime(env, ioptions->stats),
true /* internal key corruption is not ok */, range_del_agg.get(),
nullptr, ioptions->allow_data_in_errors,
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr,
/*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, ioptions->info_log,
&(cfd_->GetFullHistoryTsLow()));
// Set earliest sequence number in the new memtable
// to be equal to the earliest sequence number of the
// memtable being flushed (See later if there is a need
// to update this number!).
new_mem->SetEarliestSequenceNumber(earliest_seqno);
// Likewise for first seq number.
new_mem->SetFirstSequenceNumber(first_seqno);
SequenceNumber new_first_seqno = kMaxSequenceNumber;
c_iter.SeekToFirst();
// Key transfer
for (; c_iter.Valid(); c_iter.Next()) {
const ParsedInternalKey ikey = c_iter.ikey();
const Slice value = c_iter.value();
new_first_seqno =
ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno;
// Should we update "OldestKeyTime" ???? -> timestamp appear
// to still be an "experimental" feature.
s = new_mem->Add(
ikey.sequence, ikey.type, ikey.user_key, value,
nullptr, // KV protection info set as nullptr since it
// should only be useful for the first add to
// the original memtable.
false, // : allow concurrent_memtable_writes_
// Not seen as necessary for now.
nullptr, // get_post_process_info(m) must be nullptr
// when concurrent_memtable_writes is switched off.
nullptr); // hint, only used when concurrent_memtable_writes_
// is switched on.
if (!s.ok()) {
break;
}
// If new_mem has size greater than maxSize,
// then rollback to regular flush operation,
// and destroy new_mem.
if (new_mem->ApproximateMemoryUsage() > maxSize) {
s = Status::Aborted("Mempurge filled more than one memtable.");
new_mem_capacity = 1.0;
break;
}
}
// Check status and propagate
// potential error status from c_iter
if (!s.ok()) {
c_iter.status().PermitUncheckedError();
} else if (!c_iter.status().ok()) {
s = c_iter.status();
}
// Range tombstone transfer.
if (s.ok()) {
auto range_del_it = range_del_agg->NewIterator();
for (range_del_it->SeekToFirst(); range_del_it->Valid();
range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone();
new_first_seqno =
tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno;
s = new_mem->Add(
tombstone.seq_, // Sequence number
kTypeRangeDeletion, // KV type
tombstone.start_key_, // Key is start key.
tombstone.end_key_, // Value is end key.
nullptr, // KV protection info set as nullptr since it
// should only be useful for the first add to
// the original memtable.
false, // : allow concurrent_memtable_writes_
// Not seen as necessary for now.
nullptr, // get_post_process_info(m) must be nullptr
// when concurrent_memtable_writes is switched off.
nullptr); // hint, only used when concurrent_memtable_writes_
// is switched on.
if (!s.ok()) {
break;
}
// If new_mem has size greater than maxSize,
// then rollback to regular flush operation,
// and destroy new_mem.
if (new_mem->ApproximateMemoryUsage() > maxSize) {
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
new_mem_capacity = 1.0;
break;
}
}
}
// If everything happened smoothly and new_mem contains valid data,
// decide if it is flushed to storage or kept in the imm()
// memtable list (memory).
if (s.ok() && (new_first_seqno != kMaxSequenceNumber)) {
// Rectify the first sequence number, which (unlike the earliest seq
// number) needs to be present in the new memtable.
new_mem->SetFirstSequenceNumber(new_first_seqno);
// The new_mem is added to the list of immutable memtables
// only if it filled at less than 100% capacity and isn't flagged
// as in need of being flushed.
if (new_mem->ApproximateMemoryUsage() < maxSize &&
!(new_mem->ShouldFlushNow())) {
db_mutex_->Lock();
uint64_t new_mem_id = mems_[0]->GetID();
new_mem->SetID(new_mem_id);
new_mem->SetNextLogNumber(mems_[0]->GetNextLogNumber());
// This addition will not trigger another flush, because
// we do not call SchedulePendingFlush().
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
new_mem->Ref();
#ifndef ROCKSDB_LITE
// Piggyback FlushJobInfo on the first flushed memtable.
db_mutex_->AssertHeld();
meta_.fd.file_size = 0;
mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif // !ROCKSDB_LITE
db_mutex_->Unlock();
} else {
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
new_mem_capacity = 1.0;
if (new_mem) {
job_context_->memtables_to_free.push_back(new_mem);
}
}
} else {
// In this case, the newly allocated new_mem is empty.
assert(new_mem != nullptr);
job_context_->memtables_to_free.push_back(new_mem);
}
}
// Reacquire the mutex for WriteLevel0 function.
db_mutex_->Lock();
// If mempurge successful, don't write input tables to level0,
// but write any full output table to level0.
if (s.ok()) {
TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeSuccessful");
} else {
TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
}
const uint64_t micros = clock_->NowMicros() - start_micros;
const uint64_t cpu_micros = clock_->CPUMicros() - start_cpu_micros;
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Mempurge lasted %" PRIu64
" microseconds, and %" PRIu64
" cpu "
"microseconds. Status is %s ok. Perc capacity: %f\n",
cfd_->GetName().c_str(), job_context_->job_id, micros,
cpu_micros, s.ok() ? "" : "not", new_mem_capacity);
return s;
}