Status Tablet::DoMergeCompactionOrFlush()

in src/kudu/tablet/tablet.cc [2010:2312]


Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompactionOrFlush &input,
                                        int64_t mrs_being_flushed,
                                        const vector<TxnInfoBeingFlushed>& txns_being_flushed) {
  const char *op_name =
        (mrs_being_flushed == TabletMetadata::kNoMrsFlushed) ? "Compaction" : "Flush";
  TRACE_EVENT2("tablet", "Tablet::DoMergeCompactionOrFlush",
               "tablet_id", tablet_id(),
               "op", op_name);

  // Save the stats on the total on-disk size of all deltas in the selected
  // rowsets. This is relevant only for compactions: an MRS flush has no
  // on-disk input deltas.
  size_t deltas_on_disk_size = 0;
  if (mrs_being_flushed == TabletMetadata::kNoMrsFlushed) {
    deltas_on_disk_size = GetAllDeltasSizeOnDisk(input);
  }

  const auto& tid = tablet_id();
  const IOContext io_context({ tid });

  const SchemaPtr schema_ptr = schema();
  MvccSnapshot flush_snap(mvcc_);
  VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 1 (flushing snapshot). "
                                    "Phase 1 snapshot: $1",
                                    op_name, flush_snap.ToString());
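  // Everything visible in 'flush_snap' is written out by phase 1 below;
  // updates that land after this snapshot are caught up in phase 2 via the
  // DuplicatingRowSet and ReupdateMissedDeltas() (see below).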

  // Fault injection hook for testing and debugging purposes only.
  if (common_hooks_) {
    RETURN_NOT_OK_PREPEND(common_hooks_->PostTakeMvccSnapshot(),
                          "PostTakeMvccSnapshot hook failed");
  }

  std::shared_ptr<MemTracker> parent_tracker =
      MemTracker::FindOrCreateGlobalTracker(-1, "rowset_merge_compaction");
  std::shared_ptr<MemTracker> tracker =
      MemTracker::CreateTracker(-1, Substitute("rowset_merge_compaction:$0", tid),
                                parent_tracker);
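  // The child/parent hierarchy attributes this op's memory both to the global
  // 'rowset_merge_compaction' tracker and to a per-tablet entry under it; a
  // limit of -1 means neither tracker imposes a memory limit of its own.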

  // Create the op's input by iterating through all input rowsets; for each rowset:
  //   - For compaction ops, create an input that contains the initialized base
  //     iterator and the relevant REDO and UNDO delta iterators, used to read
  //     from persistent storage.
  //   - For flush ops, create an iterator over the in-memory tree holding the
  //     data updates.
  shared_ptr<CompactionOrFlushInput> merge;
  RETURN_NOT_OK(input.CreateCompactionOrFlushInput(flush_snap,
                                                   schema_ptr.get(),
                                                   &io_context,
                                                   parent_tracker,
                                                   tracker,
                                                   &merge));

  // Initialize a DRS writer, to be used later for writing base data, REDO and
  // UNDO deltas, delta stats, etc.
  RollingDiskRowSetWriter drsw(metadata_.get(), merge->schema(), DefaultBloomSizing(),
                               compaction_policy_->target_rowset_size());
  RETURN_NOT_OK_PREPEND(drsw.Open(), "Failed to open DiskRowSet for flush");
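  // The "rolling" writer starts a fresh DiskRowSet whenever the current output
  // reaches the compaction policy's target rowset size, so a single pass may
  // produce several output rowsets (collected via GetWrittenRowSetMetadata()
  // below).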

  // Get the tablet's history GC options, to be used later for ancient history
  // mark (AHM) validation checks.
  HistoryGcOpts history_gc_opts = GetHistoryGcOpts();

  // Apply REDO and UNDO deltas to the rows, merge histories of rows with 'ghost' entries.
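  // (A "ghost" entry arises when a row is deleted and the same key is later
  // re-inserted, leaving entries for that key in more than one input rowset;
  // these are stitched into a single linear history for the output row.)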
  RETURN_NOT_OK_PREPEND(
      FlushCompactionInput(
          tid, metadata_->fs_manager()->block_manager()->error_manager(),
          merge.get(), flush_snap, history_gc_opts, &drsw),
      "Flush to disk failed");
  RETURN_NOT_OK_PREPEND(drsw.Finish(), "Failed to finish DRS writer");

  // Fault injection hook for testing and debugging purposes only.
  if (common_hooks_) {
    RETURN_NOT_OK_PREPEND(common_hooks_->PostWriteSnapshot(),
                          "PostWriteSnapshot hook failed");
  }

  // Though unlikely, it's possible that no rows were written because all of
  // the input rows were GCed in this compaction. In that case, we don't
  // actually want to reopen.
  if (drsw.rows_written_count() == 0) {
    LOG_WITH_PREFIX(INFO) << op_name << " resulted in no output rows (all input "
                          << "rows were GCed). Removing all input rowsets.";
    return HandleEmptyCompactionOrFlush(input.rowsets(), mrs_being_flushed,
                                        txns_being_flushed);
  }

  // The RollingDiskRowSet writer wrote out one or more RowSets as the
  // output. Open these into 'new_rowsets'.
  const auto& new_drs_metas(drsw.GetWrittenRowSetMetadata());
  DCHECK(!new_drs_metas.empty());

  if (metrics_) {
    metrics_->bytes_flushed->IncrementBy(drsw.written_size());
  }

  // Open all the rowsets (that were processed in this stage) from disk and
  // store a pointer to each one in 'new_disk_rowsets'.
  vector<shared_ptr<RowSet>> new_disk_rowsets;
  new_disk_rowsets.reserve(new_drs_metas.size());
  {
    TRACE_EVENT0("tablet", "Opening compaction results");
    for (const auto& meta : new_drs_metas) {
      // TODO(awong): it'd be nice to plumb delta stats from the rowset writer
      // into the new deltafile readers opened here.
      shared_ptr<DiskRowSet> new_rowset;
      Status s = DiskRowSet::Open(meta,
                                  log_anchor_registry_.get(),
                                  mem_trackers_,
                                  &io_context,
                                  &new_rowset);
      if (PREDICT_FALSE(!s.ok())) {
        LOG_WITH_PREFIX(WARNING) << "Unable to open snapshot " << op_name << " results "
                                 << meta->ToString() << ": " << s.ToString();
        return s;
      }
      new_disk_rowsets.emplace_back(std::move(new_rowset));
    }
  }

  // Setup for Phase 2: Start duplicating any new updates into the new on-disk
  // rowsets.
  //
  // During Phase 1, we may have missed some updates which came into the input
  // rowsets while we were writing. So, we can't immediately start reading from
  // the on-disk rowsets alone. Starting here, we continue to read from the
  // original rowset(s), but mirror updates to both the input and the output
  // data.
  //
  // It's crucial that, during the rest of the compaction, we do not allow the
  // output rowsets to flush their deltas to disk. This is to avoid the following
  // bug:
  // - during phase 1, timestamp 1 updates a flushed row. This is only reflected in the
  //   input rowset (i.e. it is a "missed delta").
  // - during phase 2, timestamp 2 updates the same row. This is reflected in both the
  //   input and output, because of the DuplicatingRowSet.
  // - now suppose the output rowset were allowed to flush deltas. This would create the
  //   first DeltaFile for the output rowset, with only timestamp 2.
  // - now we run ReupdateMissedDeltas and copy the missed timestamp 1 update
  //   into the output rowset's DMS, which later flushes.
  // The end result would be that redos[0] has timestamp 2, and redos[1] has timestamp 1.
  // This breaks an invariant that the redo files are time-ordered, and we would probably
  // reapply the deltas in the wrong order on the read path.
  //
  // The way that we avoid this case is that DuplicatingRowSet's FlushDeltas method is a
  // no-op.
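  //
  // For reference, that no-op can be as simple as the following sketch (the
  // exact override in the DuplicatingRowSet class may differ):
  //
  //   Status DuplicatingRowSet::FlushDeltas(const IOContext* /*io_context*/) override {
  //     // Deliberately do nothing; see the redo-ordering invariant above.
  //     return Status::OK();
  //   }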
  VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 2 (starting to "
                                    "duplicate updates in new rowsets)",
                                    op_name);
  shared_ptr<DuplicatingRowSet> inprogress_rowset(
      make_shared<DuplicatingRowSet>(input.rowsets(), new_disk_rowsets));

  // The next step is to swap in the DuplicatingRowSet, and at the same time,
  // determine an MVCC snapshot which includes all of the ops that saw a
  // pre-DuplicatingRowSet version of components_.
  MvccSnapshot non_duplicated_ops_snap;
  vector<Timestamp> applying_during_swap;
  {
    TRACE_EVENT0("tablet", "Swapping DuplicatingRowSet");
    // Taking component_lock_ in write mode ensures that no new ops can
    // StartApplying() (or snapshot components_) during this block.
    std::lock_guard lock(component_lock_);
    AtomicSwapRowSetsUnlocked(input.rowsets(), { inprogress_rowset });

    // NOTE: ops may *commit* in between these two lines.
    // We need to make sure all such ops end up in the 'applying_during_swap'
    // list, the 'non_duplicated_ops_snap' snapshot, or both. Thus it's crucial
    // that these next two lines are in this order!
    mvcc_.GetApplyingOpsTimestamps(&applying_during_swap);
    non_duplicated_ops_snap = MvccSnapshot(mvcc_);
  }

  // All ops committed in 'non_duplicated_ops_snap' saw the pre-swap
  // components_. Additionally, any ops that were APPLYING during the above
  // block by definition _started_ doing so before the swap. Hence those ops
  // also need to get included in non_duplicated_ops_snap. To do so, we wait
  // for them to commit, and then manually include them in our snapshot.
  if (VLOG_IS_ON(1) && !applying_during_swap.empty()) {
    VLOG_WITH_PREFIX(1) << "Waiting for " << applying_during_swap.size()
                        << " mid-APPLY ops to commit before finishing compaction...";
    for (const Timestamp& ts : applying_during_swap) {
      VLOG_WITH_PREFIX(1) << "  " << ts.value();
    }
  }

  // This wait is a little bit conservative - technically we only need to wait
  // for those ops in 'applying_during_swap', but MVCC doesn't implement the
  // ability to wait for a specific set. So instead we wait for all currently
  // applying -- a bit more than we need, but still correct.
  RETURN_NOT_OK(mvcc_.WaitForApplyingOpsToApply());

  // Then we want to consider all those ops that were in-flight when we did the
  // swap as committed in 'non_duplicated_ops_snap'.
  non_duplicated_ops_snap.AddAppliedTimestamps(applying_during_swap);
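  // Example: suppose op T enters APPLYING just before the swap above and
  // commits only after the raw snapshot is taken. T's mutations went to the
  // pre-swap (input-only) components, so it is a "missed delta": it appears in
  // 'applying_during_swap' rather than in the raw snapshot, and the line above
  // folds it into 'non_duplicated_ops_snap' so that phase 2 re-applies it to
  // the output rowsets.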

  // Fault injection hook for testing and debugging purposes only.
  if (common_hooks_) {
    RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapInDuplicatingRowSet(),
                          "PostSwapInDuplicatingRowSet hook failed");
  }

  // Store the stats on the max memory used for compaction phase 1.
  const size_t peak_mem_usage_ph1 = merge->memory_footprint();

  // Phase 2. Here we re-scan the compaction input, copying the missed updates
  // into the new rowsets' DeltaTrackers.
  VLOG_WITH_PREFIX(1) << Substitute("$0: Phase 2: carrying over any updates "
                                    "which arrived during Phase 1. Snapshot: $1",
                                    op_name, non_duplicated_ops_snap.ToString());
  const SchemaPtr schema_ptr2 = schema();
  RETURN_NOT_OK_PREPEND(input.CreateCompactionOrFlushInput(non_duplicated_ops_snap,
                                                           schema_ptr2.get(),
                                                           &io_context,
                                                           parent_tracker,
                                                           tracker,
                                                           &merge),
                        Substitute("Failed to create $0 inputs", op_name).c_str());

  // Update the output rowsets with the deltas that arrived during phase 1,
  // before we swapped in the DuplicatingRowSet. This ends with a flush of the
  // updated DeltaTrackers so that the data reported in the log as belonging to
  // the input rowsets is flushed.
  RETURN_NOT_OK_PREPEND(ReupdateMissedDeltas(&io_context,
                                             merge.get(),
                                             history_gc_opts,
                                             flush_snap,
                                             non_duplicated_ops_snap,
                                             new_disk_rowsets),
        Substitute("Failed to re-update deltas missed during $0 phase 1",
                     op_name).c_str());

  // Fault injection hook for testing and debugging purposes only.
  if (common_hooks_) {
    RETURN_NOT_OK_PREPEND(common_hooks_->PostReupdateMissedDeltas(),
                          "PostReupdateMissedDeltas hook failed");
  }

  // ------------------------------
  // The flush or compaction was successful.

  // Run fault points used by some integration tests.
  if (input.num_rowsets() > 1) {
    MAYBE_FAULT(FLAGS_fault_crash_before_flush_tablet_meta_after_compaction);
  } else if (input.num_rowsets() == 1 &&
      input.rowsets()[0]->OnDiskBaseDataSizeWithRedos() == 0) {
    MAYBE_FAULT(FLAGS_fault_crash_before_flush_tablet_meta_after_flush_mrs);
  }

  // Write out the new Tablet Metadata and remove old rowsets.
  RETURN_NOT_OK_PREPEND(FlushMetadata(input.rowsets(), new_drs_metas, mrs_being_flushed,
                                      txns_being_flushed),
                        "Failed to flush new tablet metadata");

  // Now that we've completed the operation, mark any rowsets that have been
  // compacted, preventing them from being considered for future compactions.
  for (const auto& rs : input.rowsets()) {
    rs->set_has_been_compacted();
  }

  // Replace the compacted rowsets with the new on-disk rowsets, making them visible now that
  // their metadata was written to disk.
  AtomicSwapRowSets({ inprogress_rowset }, new_disk_rowsets);
  UpdateAverageRowsetHeight();

  const size_t peak_mem_usage = std::max(peak_mem_usage_ph1,
                                         merge->memory_footprint());
  // For rowset merge compactions, update the stats on the max peak memory used
  // and ratio of the amount of memory used to the size of all deltas on disk.
  if (deltas_on_disk_size > 0) {
    // Update the peak memory usage metric.
    metrics_->compact_rs_mem_usage->Increment(peak_mem_usage);

    // Update the ratio of the peak memory usage to the size of deltas on disk.
    // To keep the stats relevant for larger rowsets, filter out rowsets with
    // relatively small amount of data in deltas. Update the memory-to-disk size
    // ratio metric only when the on-disk size of deltas crosses the configured
    // threshold.
    const int64_t min_deltas_size_bytes =
        FLAGS_rowset_compaction_estimate_min_deltas_size_mb * 1024 * 1024;
    if (deltas_on_disk_size > min_deltas_size_bytes) {
      // Round up the ratio. Since the ratio is used to estimate the amount of
      // memory needed to perform merge rowset compaction based on the amount of
      // data stored in rowsets' deltas, it's safer to provide an upper rather
      // than a lower bound estimate.
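      // For example, a peak usage of 5 MiB against 2 MiB of on-disk deltas
      // records ceil(5 / 2) = 3 rather than the truncated value 2.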
      metrics_->compact_rs_mem_usage_to_deltas_size_ratio->Increment(
          (peak_mem_usage + deltas_on_disk_size - 1) / deltas_on_disk_size);
    }
  }

  const auto rows_written = drsw.rows_written_count();
  const auto drs_written = drsw.drs_written_count();
  const auto bytes_written = drsw.written_size();
  TRACE_COUNTER_INCREMENT("rows_written", rows_written);
  TRACE_COUNTER_INCREMENT("drs_written", drs_written);
  TRACE_COUNTER_INCREMENT("bytes_written", bytes_written);
  TRACE_COUNTER_INCREMENT("peak_mem_usage", peak_mem_usage);
  VLOG_WITH_PREFIX(1) << Substitute("$0 successful on $1 rows ($2 rowsets, $3 bytes)",
                                    op_name,
                                    rows_written,
                                    drs_written,
                                    bytes_written);

  // Fault injection hook for testing and debugging purposes only.
  if (common_hooks_) {
    RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapNewRowSet(),
                          "PostSwapNewRowSet hook failed");
  }

  return Status::OK();
}