in src/kudu/tablet/tablet.cc [2010:2312]
Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompactionOrFlush &input,
int64_t mrs_being_flushed,
const vector<TxnInfoBeingFlushed>& txns_being_flushed) {
const char *op_name =
(mrs_being_flushed == TabletMetadata::kNoMrsFlushed) ? "Compaction" : "Flush";
TRACE_EVENT2("tablet", "Tablet::DoMergeCompactionOrFlush",
"tablet_id", tablet_id(),
"op", op_name);
// Save the total on-disk size of all deltas in the selected rowsets. This is
// gathered only for merge compactions, not for MRS flushes.
size_t deltas_on_disk_size = 0;
if (mrs_being_flushed == TabletMetadata::kNoMrsFlushed) {
deltas_on_disk_size = GetAllDeltasSizeOnDisk(input);
}
const auto& tid = tablet_id();
const IOContext io_context({ tid });
const SchemaPtr schema_ptr = schema();
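// Take the phase 1 MVCC snapshot: every op committed at this point is merged
// into the phase 1 output, while ops that commit later are picked up either
// via the DuplicatingRowSet or by ReupdateMissedDeltas() in phase 2.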
MvccSnapshot flush_snap(mvcc_);
VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 1 (flushing snapshot). "
"Phase 1 snapshot: $1",
op_name, flush_snap.ToString());
// Fault injection hook for testing and debugging purposes only.
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostTakeMvccSnapshot(),
"PostTakeMvccSnapshot hook failed");
}
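// Account for the memory used by the merge: a per-tablet tracker is parented
// to a process-wide 'rowset_merge_compaction' tracker. A limit of -1 means
// the trackers only track consumption and do not enforce a limit.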
std::shared_ptr<MemTracker> parent_tracker =
MemTracker::FindOrCreateGlobalTracker(-1, "rowset_merge_compaction");
std::shared_ptr<MemTracker> tracker =
MemTracker::CreateTracker(-1, Substitute("rowset_merge_compaction:$0", tid),
parent_tracker);
// Create the merge input by iterating over all the selected rowsets:
// - For compaction ops, build an input with an initialized base iterator and the
//   relevant REDO and UNDO delta iterators, used to read from persistent storage.
// - For flush ops, build an iterator over the in-memory tree holding the data updates.
shared_ptr<CompactionOrFlushInput> merge;
RETURN_NOT_OK(input.CreateCompactionOrFlushInput(flush_snap,
schema_ptr.get(),
&io_context,
parent_tracker,
tracker,
&merge));
// Initialize a rolling DiskRowSet writer, used below to write out the base data,
// REDO/UNDO deltas, delta stats, etc. It rolls over to a new DiskRowSet whenever
// the output reaches the compaction policy's target rowset size.
RollingDiskRowSetWriter drsw(metadata_.get(), merge->schema(), DefaultBloomSizing(),
compaction_policy_->target_rowset_size());
RETURN_NOT_OK_PREPEND(drsw.Open(), "Failed to open DiskRowSet for flush");
// Fetch the tablet's history GC options, used later for ancient history mark (AHM) checks.
HistoryGcOpts history_gc_opts = GetHistoryGcOpts();
// Apply REDO and UNDO deltas to the rows and merge the histories of rows with
// 'ghost' entries, writing the merged output to disk.
RETURN_NOT_OK_PREPEND(
FlushCompactionInput(
tid, metadata_->fs_manager()->block_manager()->error_manager(),
merge.get(), flush_snap, history_gc_opts, &drsw),
"Flush to disk failed");
RETURN_NOT_OK_PREPEND(drsw.Finish(), "Failed to finish DRS writer");
// Fault injection hook for testing and debugging purposes only.
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostWriteSnapshot(),
"PostWriteSnapshot hook failed");
}
// Though unlikely, it's possible that no rows were written because all of
// the input rows were GCed in this compaction. In that case, we don't
// actually want to reopen.
if (drsw.rows_written_count() == 0) {
LOG_WITH_PREFIX(INFO) << op_name << " resulted in no output rows (all input rows "
<< "were GCed!). Removing all input rowsets.";
return HandleEmptyCompactionOrFlush(input.rowsets(), mrs_being_flushed,
txns_being_flushed);
}
// The RollingDiskRowSetWriter wrote out one or more DiskRowSets as the
// output. Open these into 'new_disk_rowsets'.
const auto& new_drs_metas(drsw.GetWrittenRowSetMetadata());
DCHECK(!new_drs_metas.empty());
if (metrics_) {
metrics_->bytes_flushed->IncrementBy(drsw.written_size());
}
// Open all the rowsets written in this phase from disk and store a pointer
// to each in 'new_disk_rowsets'.
vector<shared_ptr<RowSet>> new_disk_rowsets;
new_disk_rowsets.reserve(new_drs_metas.size());
{
TRACE_EVENT0("tablet", "Opening compaction results");
for (const auto& meta : new_drs_metas) {
// TODO(awong): it'd be nice to plumb delta stats from the rowset writer
// into the new deltafile readers opened here.
shared_ptr<DiskRowSet> new_rowset;
Status s = DiskRowSet::Open(meta,
log_anchor_registry_.get(),
mem_trackers_,
&io_context,
&new_rowset);
if (PREDICT_FALSE(!s.ok())) {
LOG_WITH_PREFIX(WARNING) << "Unable to open snapshot " << op_name << " results "
<< meta->ToString() << ": " << s.ToString();
return s;
}
new_disk_rowsets.emplace_back(std::move(new_rowset));
}
}
// Setup for Phase 2: Start duplicating any new updates into the new on-disk
// rowsets.
//
// During Phase 1, we may have missed some updates which came into the input
// rowsets while we were writing. So, we can't immediately start reading from
// the on-disk rowsets alone. Starting here, we continue to read from the
// original rowset(s), but mirror updates to both the input and the output
// data.
//
// It's crucial that, during the rest of the compaction, we do not allow the
// output rowsets to flush their deltas to disk. This is to avoid the following
// bug:
// - during phase 1, timestamp 1 updates a flushed row. This is only reflected in the
//   input rowset (i.e. it is a "missed delta").
// - during phase 2, timestamp 2 updates the same row. This is reflected in both the
//   input and output, because of the DuplicatingRowSet.
// - now suppose the output rowset were allowed to flush deltas. This would create the
//   first DeltaFile for the output rowset, containing only timestamp 2.
// - next, "ReupdateMissedDeltas" runs and copies the missed delta (timestamp 1) into
//   the output DMS, which later flushes.
// The end result would be that redos[0] has timestamp 2 and redos[1] has timestamp 1.
// This breaks the invariant that the redo files are time-ordered, and we would probably
// reapply the deltas in the wrong order on the read path.
//
// We avoid this case by making DuplicatingRowSet's FlushDeltas method a no-op.
VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 2 (starting to "
"duplicate updates in new rowsets)",
op_name);
shared_ptr<DuplicatingRowSet> inprogress_rowset(
make_shared<DuplicatingRowSet>(input.rowsets(), new_disk_rowsets));
// The next step is to swap in the DuplicatingRowSet, and at the same time,
// determine an MVCC snapshot which includes all of the ops that saw a
// pre-DuplicatingRowSet version of components_.
MvccSnapshot non_duplicated_ops_snap;
vector<Timestamp> applying_during_swap;
{
TRACE_EVENT0("tablet", "Swapping DuplicatingRowSet");
// Taking component_lock_ in write mode ensures that no new ops can
// StartApplying() (or snapshot components_) during this block.
std::lock_guard lock(component_lock_);
AtomicSwapRowSetsUnlocked(input.rowsets(), { inprogress_rowset });
// NOTE: ops may *commit* in between these two lines.
// We need to make sure all such ops end up in the 'applying_during_swap'
// list, the 'non_duplicated_ops_snap' snapshot, or both. Thus it's crucial
// that these next two lines are in this order!
mvcc_.GetApplyingOpsTimestamps(&applying_during_swap);
non_duplicated_ops_snap = MvccSnapshot(mvcc_);
}
// All ops committed in 'non_duplicated_ops_snap' saw the pre-swap
// components_. Additionally, any ops that were APPLYING during the above
// block by definition _started_ doing so before the swap, so they must
// also be included in 'non_duplicated_ops_snap'. To do so, we wait for
// them to commit, and then manually add them to the snapshot.
if (VLOG_IS_ON(1) && !applying_during_swap.empty()) {
VLOG_WITH_PREFIX(1) << "Waiting for " << applying_during_swap.size()
<< " mid-APPLY ops to commit before finishing compaction...";
for (const Timestamp& ts : applying_during_swap) {
VLOG_WITH_PREFIX(1) << " " << ts.value();
}
}
// This wait is a little bit conservative - technically we only need to wait
// for those ops in 'applying_during_swap', but MVCC doesn't implement the
// ability to wait for a specific set. So instead we wait for all currently
// applying -- a bit more than we need, but still correct.
RETURN_NOT_OK(mvcc_.WaitForApplyingOpsToApply());
// Then we want to consider all those ops that were in-flight when we did the
// swap as committed in 'non_duplicated_ops_snap'.
non_duplicated_ops_snap.AddAppliedTimestamps(applying_during_swap);
// Fault injection hook for testing and debugging purposes only.
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapInDuplicatingRowSet(),
"PostSwapInDuplicatingRowSet hook failed");
}
// Store the stats on the max memory used for compaction phase 1.
const size_t peak_mem_usage_ph1 = merge->memory_footprint();
// Phase 2. Here we re-scan the compaction input, copying the missed updates into the
// new rowsets' DeltaTrackers.
VLOG_WITH_PREFIX(1) << Substitute("$0: Phase 2: carrying over any updates "
"which arrived during Phase 1. Snapshot: $1",
op_name, non_duplicated_ops_snap.ToString());
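// Re-fetch the schema for the phase 2 pass over the input; it may differ
// from the phase 1 schema if the tablet was altered in the meantime.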
const SchemaPtr schema_ptr2 = schema();
RETURN_NOT_OK_PREPEND(input.CreateCompactionOrFlushInput(non_duplicated_ops_snap,
schema_ptr2.get(),
&io_context,
parent_tracker,
tracker,
&merge),
Substitute("Failed to create $0 inputs", op_name).c_str());
// Update the output rowsets with the deltas that arrived during phase 1, before the
// DuplicatingRowSet was swapped in. At the end, this flushes the updated DeltaTrackers
// so that any data reported in the log as belonging to the input rowsets has actually
// been persisted.
RETURN_NOT_OK_PREPEND(ReupdateMissedDeltas(&io_context,
merge.get(),
history_gc_opts,
flush_snap,
non_duplicated_ops_snap,
new_disk_rowsets),
Substitute("Failed to re-update deltas missed during $0 phase 1",
op_name).c_str());
// Fault injection hook for testing and debugging purposes only.
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostReupdateMissedDeltas(),
"PostReupdateMissedDeltas hook failed");
}
// ------------------------------
// Flush was successful.
// Run fault points used by some integration tests.
if (input.num_rowsets() > 1) {
MAYBE_FAULT(FLAGS_fault_crash_before_flush_tablet_meta_after_compaction);
} else if (input.num_rowsets() == 1 &&
input.rowsets()[0]->OnDiskBaseDataSizeWithRedos() == 0) {
MAYBE_FAULT(FLAGS_fault_crash_before_flush_tablet_meta_after_flush_mrs);
}
// Write out the new Tablet Metadata and remove old rowsets.
RETURN_NOT_OK_PREPEND(FlushMetadata(input.rowsets(), new_drs_metas, mrs_being_flushed,
txns_being_flushed),
"Failed to flush new tablet metadata");
// Now that we've completed the operation, mark any rowsets that have been
// compacted, preventing them from being considered for future compactions.
for (const auto& rs : input.rowsets()) {
rs->set_has_been_compacted();
}
// Replace the DuplicatingRowSet with the new on-disk rowsets, making them visible
// now that their metadata has been written to disk.
AtomicSwapRowSets({ inprogress_rowset }, new_disk_rowsets);
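// Refresh the average rowset height statistic now that the set of live
// rowsets in the tablet has changed.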
UpdateAverageRowsetHeight();
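// The overall peak is the larger of the phase 1 and phase 2 footprints;
// 'merge' was re-created for phase 2, so its footprint at this point
// reflects phase 2 only.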
const size_t peak_mem_usage = std::max(peak_mem_usage_ph1,
merge->memory_footprint());
// For rowset merge compactions, update the stats on the peak memory usage
// and on the ratio of memory used to the total on-disk size of the deltas.
if (deltas_on_disk_size > 0) {
// Update the peak memory usage metric.
metrics_->compact_rs_mem_usage->Increment(peak_mem_usage);
// Update the ratio of the peak memory usage to the size of deltas on disk.
// To keep the stats relevant for larger rowsets, filter out rowsets with a
// relatively small amount of data in their deltas: update the memory-to-disk
// size ratio metric only when the on-disk size of deltas crosses the
// configured threshold.
const int64_t min_deltas_size_bytes =
FLAGS_rowset_compaction_estimate_min_deltas_size_mb * 1024 * 1024;
if (deltas_on_disk_size > min_deltas_size_bytes) {
// Round up the ratio. Since the ratio is used to estimate the amount of
// memory needed to perform merge rowset compaction based on the amount of
// data stored in rowsets' deltas, it's safer to provide an upper rather
// than a lower bound estimate.
metrics_->compact_rs_mem_usage_to_deltas_size_ratio->Increment(
(peak_mem_usage + deltas_on_disk_size - 1) / deltas_on_disk_size);
}
}
const auto rows_written = drsw.rows_written_count();
const auto drs_written = drsw.drs_written_count();
const auto bytes_written = drsw.written_size();
TRACE_COUNTER_INCREMENT("rows_written", rows_written);
TRACE_COUNTER_INCREMENT("drs_written", drs_written);
TRACE_COUNTER_INCREMENT("bytes_written", bytes_written);
TRACE_COUNTER_INCREMENT("peak_mem_usage", peak_mem_usage);
VLOG_WITH_PREFIX(1) << Substitute("$0 successful on $1 rows ($2 rowsets, $3 bytes)",
op_name,
rows_written,
drs_written,
bytes_written);
// Fault injection hook for testing and debugging purposes only.
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapNewRowSet(),
"PostSwapNewRowSet hook failed");
}
return Status::OK();
}