Status DoDeterministicCompact()

in tools/db_bench_tool.cc [5100:5403]


  // Fills the target DB(s) into a deterministic LSM shape by alternating
  // DoWrite() + Flush() rounds with manual compactions
  // ("filldeterministic").
  //
  // The targets are db_.db when it is set, otherwise every entry of
  // multi_dbs_. Auto compactions are disabled on each target while this runs
  // (and, for non-FIFO styles, the level-0 slowdown/stop triggers are raised);
  // the previous values of those three options are put back before returning,
  // including on error paths.
  //
  //  * Level: the level-0 files added by each round form one sorted run, and
  //    writes_ is divided by max_bytes_for_level_multiplier between rounds.
  //    Run j is then compacted into level (num_levels - 1 - j).
  //  * Universal: same scheme, but writes_ is scaled by
  //    100 / (size_ratio + 200) between rounds, and runs that do not map to a
  //    level above 0 are compacted into level 0.
  //  * FIFO: single DB only; writes until the reported level-0 size reaches
  //    compaction_options_fifo.max_table_files_size, then issues a
  //    CompactRange() over the whole key range.
  //
  // thread:           forwarded to DoWrite().
  // compaction_style: selects one of the strategies above; any other style is
  //                   rejected.
  // write_mode:       mode passed to DoWrite() while the first DB has no
  //                   recorded sorted run; later rounds use UNIQUE_RANDOM.
  //
  // Returns OK on success, InvalidArgument for unsupported configurations
  // (checked before any DB option is modified), or the failing Status of
  // SetOptions/Flush/CompactFiles/CompactRange. Terminates the process with
  // exit(1) when too few sorted runs were produced to fill the levels.
  // ROCKSDB_LITE builds always return NotSupported.
  Status DoDeterministicCompact(ThreadState* thread,
                                CompactionStyle compaction_style,
                                WriteMode write_mode) {
#ifndef ROCKSDB_LITE
    ColumnFamilyMetaData meta;
    std::vector<DB*> db_list;
    if (db_.db != nullptr) {
      db_list.push_back(db_.db);
    } else {
      for (auto& db : multi_dbs_) {
        db_list.push_back(db.db);
      }
    }
    assert(!db_list.empty());
    auto num_db = db_list.size();
    size_t num_levels = static_cast<size_t>(open_options_.num_levels);
    size_t output_level = open_options_.num_levels - 1;

    // Reject unsupported configurations before any DB option is touched, so
    // that an early return cannot leave auto compactions disabled.
    if (compaction_style == kCompactionStyleLevel) {
      // With a single level, (output_level - j) below would underflow for the
      // second sorted run, so at least two levels are required.
      if (open_options_.num_levels <= 1) {
        return Status::InvalidArgument("num_levels should be larger than 1");
      }
      // writes_ is divided by the truncated multiplier after every round; a
      // multiplier below 1 would be an integer division by zero.
      if (static_cast<int64_t>(open_options_.max_bytes_for_level_multiplier) <
          1) {
        return Status::InvalidArgument(
            "max_bytes_for_level_multiplier should be at least 1");
      }
    } else if (compaction_style == kCompactionStyleFIFO) {
      if (num_levels != 1) {
        return Status::InvalidArgument(
          "num_levels should be 1 for FIFO compaction");
      }
      if (FLAGS_num_multi_db != 0) {
        return Status::InvalidArgument("Doesn't support multiDB");
      }
    } else if (compaction_style != kCompactionStyleUniversal) {
      fprintf(stdout,
              "%-12s : skipped (-compaction_style=kCompactionStyleNone)\n",
              "filldeterministic");
      return Status::InvalidArgument("None compaction is not supported");
    }

    // Options captured before modification; entry i belongs to db_list[i].
    std::vector<Options> options_list;
    options_list.reserve(num_db);

    // Puts back the option values captured in options_list. Only DBs whose
    // options were captured are touched, so this is safe to call after a
    // partial setup. Failures are reported but do not abort the restore.
    auto restore_options = [&]() {
      for (size_t i = 0; i < options_list.size(); i++) {
        Status restore_status = db_list[i]->SetOptions(
            {{"disable_auto_compactions",
              std::to_string(options_list[i].disable_auto_compactions)},
             {"level0_slowdown_writes_trigger",
              std::to_string(options_list[i].level0_slowdown_writes_trigger)},
             {"level0_stop_writes_trigger",
              std::to_string(options_list[i].level0_stop_writes_trigger)}});
        if (!restore_status.ok()) {
          fprintf(stderr,
                  "filldeterministic: failed to restore options of DB "
                  "%" ROCKSDB_PRIszt ": %s\n",
                  i, restore_status.ToString().c_str());
        }
      }
    };

    // Reports a failed DB operation, restores the original options and hands
    // the error back so the caller can simply `return fail(...)`.
    auto fail = [&](const char* operation, const Status& error) -> Status {
      fprintf(stderr, "filldeterministic: %s failed: %s\n", operation,
              error.ToString().c_str());
      restore_options();
      return error;
    };

    for (auto db : db_list) {
      options_list.push_back(db->GetOptions());
      Status s;
      if (compaction_style != kCompactionStyleFIFO) {
        s = db->SetOptions({{"disable_auto_compactions", "1"},
                            {"level0_slowdown_writes_trigger", "400000000"},
                            {"level0_stop_writes_trigger", "400000000"}});
      } else {
        s = db->SetOptions({{"disable_auto_compactions", "1"}});
      }
      if (!s.ok()) {
        return fail("SetOptions", s);
      }
    }

    // sorted_runs[db][run] holds the metadata of the files that made up one
    // sorted run at the time it was flushed.
    std::vector<std::vector<std::vector<SstFileMetaData>>> sorted_runs(num_db);
    std::vector<size_t> num_files_at_level0(num_db, 0);
    if (compaction_style == kCompactionStyleLevel) {
      bool should_stop = false;
      while (!should_stop) {
        if (sorted_runs[0].empty()) {
          DoWrite(thread, write_mode);
        } else {
          DoWrite(thread, UNIQUE_RANDOM);
        }
        for (size_t i = 0; i < num_db; i++) {
          auto db = db_list[i];
          Status s = db->Flush(FlushOptions());
          if (!s.ok()) {
            return fail("Flush", s);
          }
          db->GetColumnFamilyMetaData(&meta);
          // Stop once a round adds no level-0 file or nothing is left to
          // write.
          if (num_files_at_level0[i] == meta.levels[0].files.size() ||
              writes_ == 0) {
            should_stop = true;
            continue;
          }
          // The leading entries of levels[0].files are taken as this round's
          // new files (presumably level 0 is listed newest first -- confirm).
          sorted_runs[i].emplace_back(
              meta.levels[0].files.begin(),
              meta.levels[0].files.end() - num_files_at_level0[i]);
          num_files_at_level0[i] = meta.levels[0].files.size();
          if (sorted_runs[i].back().size() == 1) {
            should_stop = true;
            continue;
          }
          if (sorted_runs[i].size() == output_level) {
            // Last run (destined for level 1): drop the first third of its
            // files from the run before stopping.
            auto& L1 = sorted_runs[i].back();
            L1.erase(L1.begin(), L1.begin() + L1.size() / 3);
            should_stop = true;
            continue;
          }
        }
        writes_ /= static_cast<int64_t>(open_options_.max_bytes_for_level_multiplier);
      }
      for (size_t i = 0; i < num_db; i++) {
        if (sorted_runs[i].size() < num_levels - 1) {
          fprintf(stderr, "n is too small to fill %" ROCKSDB_PRIszt " levels\n", num_levels);
          exit(1);
        }
      }
      for (size_t i = 0; i < num_db; i++) {
        auto db = db_list[i];
        auto compactionOptions = CompactionOptions();
        compactionOptions.compression = FLAGS_compression_type_e;
        auto options = db->GetOptions();
        MutableCFOptions mutable_cf_options(options);
        for (size_t j = 0; j < sorted_runs[i].size(); j++) {
          compactionOptions.output_file_size_limit =
              MaxFileSizeForLevel(mutable_cf_options,
                  static_cast<int>(output_level), compaction_style);
          std::cout << sorted_runs[i][j].size() << std::endl;
          // Only the two boundary files of the run are named as inputs.
          Status s = db->CompactFiles(
              compactionOptions,
              {sorted_runs[i][j].back().name, sorted_runs[i][j].front().name},
              static_cast<int>(output_level - j) /*level*/);
          if (!s.ok()) {
            return fail("CompactFiles", s);
          }
        }
      }
    } else if (compaction_style == kCompactionStyleUniversal) {
      auto ratio = open_options_.compaction_options_universal.size_ratio;
      bool should_stop = false;
      while (!should_stop) {
        if (sorted_runs[0].empty()) {
          DoWrite(thread, write_mode);
        } else {
          DoWrite(thread, UNIQUE_RANDOM);
        }
        for (size_t i = 0; i < num_db; i++) {
          auto db = db_list[i];
          Status s = db->Flush(FlushOptions());
          if (!s.ok()) {
            return fail("Flush", s);
          }
          db->GetColumnFamilyMetaData(&meta);
          if (num_files_at_level0[i] == meta.levels[0].files.size() ||
              writes_ == 0) {
            should_stop = true;
            continue;
          }
          sorted_runs[i].emplace_back(
              meta.levels[0].files.begin(),
              meta.levels[0].files.end() - num_files_at_level0[i]);
          num_files_at_level0[i] = meta.levels[0].files.size();
          if (sorted_runs[i].back().size() == 1) {
            should_stop = true;
            continue;
          }
        }
        writes_ =  static_cast<int64_t>(writes_* static_cast<double>(100) / (ratio + 200));
      }
      for (size_t i = 0; i < num_db; i++) {
        if (sorted_runs[i].size() < num_levels) {
          fprintf(stderr, "n is too small to fill %" ROCKSDB_PRIszt  " levels\n", num_levels);
          exit(1);
        }
      }
      for (size_t i = 0; i < num_db; i++) {
        auto db = db_list[i];
        auto compactionOptions = CompactionOptions();
        compactionOptions.compression = FLAGS_compression_type_e;
        auto options = db->GetOptions();
        MutableCFOptions mutable_cf_options(options);
        for (size_t j = 0; j < sorted_runs[i].size(); j++) {
          compactionOptions.output_file_size_limit =
              MaxFileSizeForLevel(mutable_cf_options,
                  static_cast<int>(output_level), compaction_style);
          // Runs that do not map to a level above 0 are compacted into
          // level 0.
          Status s = db->CompactFiles(
              compactionOptions,
              {sorted_runs[i][j].back().name, sorted_runs[i][j].front().name},
              (output_level > j ? static_cast<int>(output_level - j)
                                : 0) /*level*/);
          if (!s.ok()) {
            return fail("CompactFiles", s);
          }
        }
      }
    } else {
      // FIFO: the only style left after the validation above.
      auto db = db_list[0];
      // Collected for the CompactFiles() call that is disabled below.
      std::vector<std::string> file_names;
      while (true) {
        if (sorted_runs[0].empty()) {
          DoWrite(thread, write_mode);
        } else {
          DoWrite(thread, UNIQUE_RANDOM);
        }
        Status s = db->Flush(FlushOptions());
        if (!s.ok()) {
          return fail("Flush", s);
        }
        db->GetColumnFamilyMetaData(&meta);
        auto total_size = meta.levels[0].size;
        if (total_size >=
          db->GetOptions().compaction_options_fifo.max_table_files_size) {
          for (const auto& file_meta : meta.levels[0].files) {
            file_names.emplace_back(file_meta.name);
          }
          break;
        }
      }
      // TODO(shuzhang1989): Investigate why CompactFiles not working
      // auto compactionOptions = CompactionOptions();
      // db->CompactFiles(compactionOptions, file_names, 0);
      auto compactionOptions = CompactRangeOptions();
      Status s = db->CompactRange(compactionOptions, nullptr, nullptr);
      if (!s.ok()) {
        return fail("CompactRange", s);
      }
    }

    // Verify seqno and key range
    // Note: the seqno get changed at the max level by implementation
    // optimization, so skip the check of the max level.
#ifndef NDEBUG
    for (size_t k = 0; k < num_db; k++) {
      auto db = db_list[k];
      db->GetColumnFamilyMetaData(&meta);
      // verify the number of sorted runs
      if (compaction_style == kCompactionStyleLevel) {
        assert(num_levels - 1 == sorted_runs[k].size());
      } else if (compaction_style == kCompactionStyleUniversal) {
        assert(meta.levels[0].files.size() + num_levels - 1 ==
               sorted_runs[k].size());
      } else if (compaction_style == kCompactionStyleFIFO) {
        // TODO(gzh): FIFO compaction
        auto total_size = meta.levels[0].size;
        assert(total_size <=
          db->GetOptions().compaction_options_fifo.max_table_files_size);
        // FIFO runs on a single DB and records no sorted runs, so there is
        // nothing further to verify.
        break;
      }

      // verify smallest/largest seqno and key range of each sorted run
      auto max_level = num_levels - 1;
      int level;
      for (size_t i = 0; i < sorted_runs[k].size(); i++) {
        level = static_cast<int>(max_level - i);
        SequenceNumber sorted_run_smallest_seqno = kMaxSequenceNumber;
        SequenceNumber sorted_run_largest_seqno = 0;
        std::string sorted_run_smallest_key, sorted_run_largest_key;
        bool first_key = true;
        for (const auto& fileMeta : sorted_runs[k][i]) {
          sorted_run_smallest_seqno =
              std::min(sorted_run_smallest_seqno, fileMeta.smallest_seqno);
          sorted_run_largest_seqno =
              std::max(sorted_run_largest_seqno, fileMeta.largest_seqno);
          if (first_key ||
              db->DefaultColumnFamily()->GetComparator()->Compare(
                  fileMeta.smallestkey, sorted_run_smallest_key) < 0) {
            sorted_run_smallest_key = fileMeta.smallestkey;
          }
          if (first_key ||
              db->DefaultColumnFamily()->GetComparator()->Compare(
                  fileMeta.largestkey, sorted_run_largest_key) > 0) {
            sorted_run_largest_key = fileMeta.largestkey;
          }
          first_key = false;
        }
        if (compaction_style == kCompactionStyleLevel ||
            (compaction_style == kCompactionStyleUniversal && level > 0)) {
          SequenceNumber level_smallest_seqno = kMaxSequenceNumber;
          SequenceNumber level_largest_seqno = 0;
          for (const auto& fileMeta : meta.levels[level].files) {
            level_smallest_seqno =
                std::min(level_smallest_seqno, fileMeta.smallest_seqno);
            level_largest_seqno =
                std::max(level_largest_seqno, fileMeta.largest_seqno);
          }
          assert(sorted_run_smallest_key ==
                 meta.levels[level].files.front().smallestkey);
          assert(sorted_run_largest_key ==
                 meta.levels[level].files.back().largestkey);
          if (level != static_cast<int>(max_level)) {
            // compaction at max_level would change sequence number
            assert(sorted_run_smallest_seqno == level_smallest_seqno);
            assert(sorted_run_largest_seqno == level_largest_seqno);
          }
        } else if (compaction_style == kCompactionStyleUniversal) {
          // level <= 0 means sorted runs on level 0
          const auto& level0_file =
              meta.levels[0].files[sorted_runs[k].size() - 1 - i];
          assert(sorted_run_smallest_key == level0_file.smallestkey);
          assert(sorted_run_largest_key == level0_file.largestkey);
          if (level != static_cast<int>(max_level)) {
            assert(sorted_run_smallest_seqno == level0_file.smallest_seqno);
            assert(sorted_run_largest_seqno == level0_file.largest_seqno);
          }
        }
      }
    }
#endif
    // print the size of each sorted_run
    for (size_t k = 0; k < num_db; k++) {
      auto db = db_list[k];
      fprintf(stdout,
              "---------------------- DB %" ROCKSDB_PRIszt " LSM ---------------------\n", k);
      db->GetColumnFamilyMetaData(&meta);
      for (auto& levelMeta : meta.levels) {
        if (levelMeta.files.empty()) {
          continue;
        }
        if (levelMeta.level == 0) {
          // Level 0 is printed file by file.
          for (auto& fileMeta : levelMeta.files) {
            fprintf(stdout, "Level[%d]: %s(size: %" PRIi64 " bytes)\n",
                    levelMeta.level, fileMeta.name.c_str(), fileMeta.size);
          }
        } else {
          // Other levels are summarized by first/last file and total size.
          fprintf(stdout, "Level[%d]: %s - %s(total size: %" PRIi64 " bytes)\n",
                  levelMeta.level, levelMeta.files.front().name.c_str(),
                  levelMeta.files.back().name.c_str(), levelMeta.size);
        }
      }
    }
    restore_options();
    return Status::OK();
#else
    (void)thread;
    (void)compaction_style;
    (void)write_mode;
    fprintf(stderr, "Rocksdb Lite doesn't support filldeterministic\n");
    return Status::NotSupported(
        "Rocksdb Lite doesn't support filldeterministic");
#endif  // ROCKSDB_LITE
  }