Status DBImpl::CompactRangeInternal()

in db/db_impl/db_impl_compaction_flush.cc [970:1185]


Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
                                    ColumnFamilyHandle* column_family,
                                    const Slice* begin, const Slice* end,
                                    const std::string& trim_ts) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();

  if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  }

  bool flush_needed = true;

  // Update full_history_ts_low if it's set
  if (options.full_history_ts_low != nullptr &&
      !options.full_history_ts_low->empty()) {
    std::string ts_low = options.full_history_ts_low->ToString();
    if (begin != nullptr || end != nullptr) {
      return Status::InvalidArgument(
          "Cannot specify compaction range with full_history_ts_low");
    }
    Status s = IncreaseFullHistoryTsLowImpl(cfd, ts_low);
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

  Status s;
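  // When both ends of the range are given, check whether the range overlaps
  // any memtable; if it does not, the flush below can be skipped.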
  if (begin != nullptr && end != nullptr) {
    // TODO(ajkr): We could also optimize away the flush in certain cases where
    // one/both sides of the interval are unbounded. But it requires more
    // changes to RangesOverlapWithMemtables.
    Range range(*begin, *end);
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
    s = cfd->RangesOverlapWithMemtables(
        {range}, super_version, immutable_db_options_.allow_data_in_errors,
        &flush_needed);
    CleanupSuperVersion(super_version);
  }

  if (s.ok() && flush_needed) {
    FlushOptions fo;
    fo.allow_write_stall = options.allow_write_stall;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      mutex_.Lock();
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
                               false /* writes_stopped */);
    } else {
      s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
                        false /* writes_stopped */);
    }
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

  constexpr int kInvalidLevel = -1;
  int final_output_level = kInvalidLevel;
  bool exclusive = options.exclusive_manual_compaction;
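  // Universal compaction with more than one level compacts all files into
  // the last level in a single manual compaction; other styles are handled
  // level by level in the else branch below.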
  if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
      cfd->NumberLevels() > 1) {
    // Always compact all files together.
    final_output_level = cfd->NumberLevels() - 1;
    // If the bottommost level is reserved (allow_ingest_behind), output to
    // the level above it.
    if (immutable_db_options_.allow_ingest_behind) {
      final_output_level--;
    }
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                            final_output_level, options, begin, end, exclusive,
                            false, port::kMaxUint64, trim_ts);
  } else {
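    // Find the lowest and highest levels whose files overlap the requested
    // range; compaction is then run level by level over that span.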
    int first_overlapped_level = kInvalidLevel;
    int max_overlapped_level = kInvalidLevel;
    {
      SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
      Version* current_version = super_version->current;
      ReadOptions ro;
      ro.total_order_seek = true;
      bool overlap;
      for (int level = 0;
           level < current_version->storage_info()->num_non_empty_levels();
           level++) {
        overlap = true;
        if (begin != nullptr && end != nullptr) {
          Status status = current_version->OverlapWithLevelIterator(
              ro, file_options_, *begin, *end, level, &overlap);
          if (!status.ok()) {
            overlap = current_version->storage_info()->OverlapInLevel(
                level, begin, end);
          }
        } else {
          overlap = current_version->storage_info()->OverlapInLevel(level,
                                                                    begin, end);
        }
        if (overlap) {
          if (first_overlapped_level == kInvalidLevel) {
            first_overlapped_level = level;
          }
          max_overlapped_level = level;
        }
      }
      CleanupSuperVersion(super_version);
    }
    if (s.ok() && first_overlapped_level != kInvalidLevel) {
      // max_file_num_to_ignore can be used to filter out newly created SST
      // files; this is useful for the bottommost-level compaction in a manual
      // compaction.
      uint64_t max_file_num_to_ignore = port::kMaxUint64;
      uint64_t next_file_number = versions_->current_next_file_number();
      final_output_level = max_overlapped_level;
      int output_level;
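      // Compact each overlapping level in turn, choosing the output level
      // based on the compaction style and the bottommost-level options.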
      for (int level = first_overlapped_level; level <= max_overlapped_level;
           level++) {
        bool disallow_trivial_move = false;
        // In case the compaction is universal or we're compacting the
        // bottommost level, the output level is the same as the input one.
        // Level 0 can never be the bottommost level (i.e. if all files are in
        // level 0, we will compact to level 1).
        if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
            cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
          output_level = level;
        } else if (level == max_overlapped_level && level > 0) {
          if (options.bottommost_level_compaction ==
              BottommostLevelCompaction::kSkip) {
            // Skip bottommost level compaction
            continue;
          } else if (options.bottommost_level_compaction ==
                         BottommostLevelCompaction::kIfHaveCompactionFilter &&
                     cfd->ioptions()->compaction_filter == nullptr &&
                     cfd->ioptions()->compaction_filter_factory == nullptr) {
            // Skip bottommost level compaction since we don't have a compaction
            // filter
            continue;
          }
          output_level = level;
          // Update max_file_num_to_ignore only for the bottommost-level
          // compaction, because data in newly compacted files in middle
          // levels may still need to be pushed down.
          max_file_num_to_ignore = next_file_number;
        } else {
          output_level = level + 1;
          if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
              cfd->ioptions()->level_compaction_dynamic_level_bytes &&
              level == 0) {
            output_level = ColumnFamilyData::kCompactToBaseLevel;
          }
          // If this is the bottommost-level compaction and a kForce*
          // bottommost_level_compaction option is set, disallow trivial moves.
          if (level == max_overlapped_level &&
              (options.bottommost_level_compaction ==
                   BottommostLevelCompaction::kForce ||
               options.bottommost_level_compaction ==
                   BottommostLevelCompaction::kForceOptimized)) {
            disallow_trivial_move = true;
          }
        }
        // A non-empty trim_ts requires a real compaction to remove the newest
        // records.
        if (!trim_ts.empty()) {
          disallow_trivial_move = true;
        }
        s = RunManualCompaction(cfd, level, output_level, options, begin, end,
                                exclusive, disallow_trivial_move,
                                max_file_num_to_ignore, trim_ts);
        if (!s.ok()) {
          break;
        }
        if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
          final_output_level = cfd->NumberLevels() - 1;
        } else if (output_level > final_output_level) {
          final_output_level = output_level;
        }
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
      }
    }
  }
  if (!s.ok() || final_output_level == kInvalidLevel) {
    LogFlush(immutable_db_options_.info_log);
    return s;
  }

  if (options.change_level) {
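    // ReFitLevel() moves the compaction output to options.target_level;
    // manual compactions must be disabled and background work paused first.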
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:1");
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:2");

    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[RefitLevel] waiting for background threads to stop");
    DisableManualCompaction();
    s = PauseBackgroundWork();
    if (s.ok()) {
      TEST_SYNC_POINT("DBImpl::CompactRange:PreRefitLevel");
      s = ReFitLevel(cfd, final_output_level, options.target_level);
      TEST_SYNC_POINT("DBImpl::CompactRange:PostRefitLevel");
      // ContinueBackgroundWork always returns Status::OK().
      Status temp_s = ContinueBackgroundWork();
      assert(temp_s.ok());
    }
    EnableManualCompaction();
    TEST_SYNC_POINT(
        "DBImpl::CompactRange:PostRefitLevel:ManualCompactionEnabled");
  }
  LogFlush(immutable_db_options_.info_log);

  {
    InstrumentedMutexLock l(&mutex_);
    // An automatic compaction that was already scheduled may have been
    // preempted by the manual compaction; schedule it again.
    MaybeScheduleFlushOrCompaction();
  }

  return s;
}
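
For context, this path is normally reached through the public DB::CompactRange() API. Below is a minimal caller sketch, assuming an already-open database; the key range, target level, and option values are illustrative, not defaults taken from the source above.

#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"

void ManualCompactExample(rocksdb::DB* db) {
  rocksdb::CompactRangeOptions cro;
  // Recompact the bottommost level even when no compaction filter is set.
  cro.bottommost_level_compaction =
      rocksdb::BottommostLevelCompaction::kForceOptimized;
  // Let automatic compactions run concurrently with this manual one.
  cro.exclusive_manual_compaction = false;
  // After compacting, move the result to level 1 (this exercises the
  // change_level / ReFitLevel() branch at the end of the function).
  cro.change_level = true;
  cro.target_level = 1;

  rocksdb::Slice begin("key000");
  rocksdb::Slice end("key999");
  rocksdb::Status s = db->CompactRange(cro, &begin, &end);
  assert(s.ok());
}

Passing nullptr for begin and end compacts the whole key space; when both bounds are given, the memtable-overlap check near the top of the function may allow the pre-compaction flush to be skipped.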