in tools/db_bench_tool.cc [5100:5403]
// Builds a deterministic LSM shape for the "filldeterministic" benchmark.
//
// For every open DB (db_ if set, otherwise every entry of multi_dbs_) the
// function disables auto compactions, then repeatedly writes and flushes.
// After each flush the level-0 files that were not present in the previous
// round are recorded as one "sorted run". Once enough runs are collected,
// each run is handed to CompactFiles() (level/universal styles) or the whole
// DB is compacted with CompactRange() (FIFO style). In debug builds the
// resulting per-level key ranges and sequence numbers are checked with
// asserts. Finally the LSM layout is printed to stdout and the option values
// saved at the start are written back.
//
// thread           - forwarded unchanged to DoWrite().
// compaction_style - selects the level / universal / FIFO procedure.
// write_mode       - write mode of the first write round; every later round
//                    uses UNIQUE_RANDOM.
//
// Returns Status::OK() on success and Status::InvalidArgument() when the
// configuration is rejected. All argument validation happens before any DB
// option is modified, so a rejected call leaves the DBs untouched. If too few
// sorted runs could be produced the process is terminated with exit(1).
// In ROCKSDB_LITE builds the function only returns Status::NotSupported().
//
// NOTE(review): the Status results of Flush(), CompactFiles(), CompactRange()
// and SetOptions() are ignored, as in the original code; whether failures
// there should abort the benchmark needs confirming.
Status DoDeterministicCompact(ThreadState* thread,
                              CompactionStyle compaction_style,
                              WriteMode write_mode) {
#ifndef ROCKSDB_LITE
  ColumnFamilyMetaData meta;
  std::vector<DB*> db_list;
  if (db_.db != nullptr) {
    db_list.push_back(db_.db);
  } else {
    for (auto& db : multi_dbs_) {
      db_list.push_back(db.db);
    }
  }
  // Everything below indexes db_list[0] / sorted_runs[0].
  assert(!db_list.empty());
  const auto num_db = db_list.size();
  const size_t num_levels = static_cast<size_t>(open_options_.num_levels);
  const size_t output_level = open_options_.num_levels - 1;

  // Validate the request up front, before any option is changed, so that an
  // early return cannot leave auto compactions disabled on the DBs.
  if (compaction_style == kCompactionStyleLevel) {
    // With fewer than two levels output_level is 0 (or wraps around), the
    // "enough sorted runs" stop condition below can never be met and
    // `output_level - j` would underflow when compacting.
    if (num_levels < 2) {
      return Status::InvalidArgument("num_levels should be larger than 1");
    }
  } else if (compaction_style == kCompactionStyleFIFO) {
    if (num_levels != 1) {
      return Status::InvalidArgument(
          "num_levels should be 1 for FIFO compaction");
    }
    if (FLAGS_num_multi_db != 0) {
      return Status::InvalidArgument("Doesn't support multiDB");
    }
  } else if (compaction_style != kCompactionStyleUniversal) {
    // NOTE(review): "compaction_stype" looks like a typo for
    // "compaction_style"; kept byte-identical because it is program output.
    fprintf(stdout,
            "%-12s : skipped (-compaction_stype=kCompactionStyleNone)\n",
            "filldeterministic");
    return Status::InvalidArgument("None compaction is not supported");
  }

  // Remember the current options of every DB (restored at the end) and turn
  // off auto compactions. For non-FIFO styles the level-0 slowdown/stop
  // triggers are raised as well.
  std::vector<Options> options_list;
  options_list.reserve(num_db);
  for (auto db : db_list) {
    options_list.push_back(db->GetOptions());
    if (compaction_style != kCompactionStyleFIFO) {
      db->SetOptions({{"disable_auto_compactions", "1"},
                      {"level0_slowdown_writes_trigger", "400000000"},
                      {"level0_stop_writes_trigger", "400000000"}});
    } else {
      db->SetOptions({{"disable_auto_compactions", "1"}});
    }
  }

  // sorted_runs[db][run] holds the metadata of the files making up one run.
  std::vector<std::vector<std::vector<SstFileMetaData>>> sorted_runs(num_db);
  // Number of level-0 files seen for each DB after the previous round.
  std::vector<size_t> num_files_at_level0(num_db, 0);

  if (compaction_style == kCompactionStyleLevel) {
    bool should_stop = false;
    while (!should_stop) {
      if (sorted_runs[0].empty()) {
        DoWrite(thread, write_mode);
      } else {
        DoWrite(thread, UNIQUE_RANDOM);
      }
      for (size_t i = 0; i < num_db; i++) {
        auto db = db_list[i];
        db->Flush(FlushOptions());
        db->GetColumnFamilyMetaData(&meta);
        // Stop when the flush produced no new level-0 file or there is
        // nothing left to write.
        if (num_files_at_level0[i] == meta.levels[0].files.size() ||
            writes_ == 0) {
          should_stop = true;
          continue;
        }
        // Record the level-0 entries that precede the ones already counted
        // (presumably level-0 files are listed newest first - confirm).
        sorted_runs[i].emplace_back(
            meta.levels[0].files.begin(),
            meta.levels[0].files.end() - num_files_at_level0[i]);
        num_files_at_level0[i] = meta.levels[0].files.size();
        if (sorted_runs[i].back().size() == 1) {
          should_stop = true;
          continue;
        }
        if (sorted_runs[i].size() == output_level) {
          // Enough runs collected: drop the first third of the files of the
          // most recently recorded run and stop writing.
          auto& L1 = sorted_runs[i].back();
          L1.erase(L1.begin(), L1.begin() + L1.size() / 3);
          should_stop = true;
          continue;
        }
      }
      // Shrink the amount written in the next round.
      writes_ /=
          static_cast<int64_t>(open_options_.max_bytes_for_level_multiplier);
    }
    for (size_t i = 0; i < num_db; i++) {
      if (sorted_runs[i].size() < num_levels - 1) {
        fprintf(stderr, "n is too small to fill %" ROCKSDB_PRIszt " levels\n", num_levels);
        exit(1);
      }
    }
    for (size_t i = 0; i < num_db; i++) {
      auto db = db_list[i];
      auto compactionOptions = CompactionOptions();
      compactionOptions.compression = FLAGS_compression_type_e;
      auto options = db->GetOptions();
      MutableCFOptions mutable_cf_options(options);
      for (size_t j = 0; j < sorted_runs[i].size(); j++) {
        compactionOptions.output_file_size_limit =
            MaxFileSizeForLevel(mutable_cf_options,
                                static_cast<int>(output_level),
                                compaction_style);
        std::cout << sorted_runs[i][j].size() << std::endl;
        // Run j goes to level (output_level - j); only the names of the last
        // and first file of the run are passed.
        db->CompactFiles(compactionOptions,
                         {sorted_runs[i][j].back().name,
                          sorted_runs[i][j].front().name},
                         static_cast<int>(output_level - j) /*level*/);
      }
    }
  } else if (compaction_style == kCompactionStyleUniversal) {
    auto ratio = open_options_.compaction_options_universal.size_ratio;
    bool should_stop = false;
    while (!should_stop) {
      if (sorted_runs[0].empty()) {
        DoWrite(thread, write_mode);
      } else {
        DoWrite(thread, UNIQUE_RANDOM);
      }
      for (size_t i = 0; i < num_db; i++) {
        auto db = db_list[i];
        db->Flush(FlushOptions());
        db->GetColumnFamilyMetaData(&meta);
        if (num_files_at_level0[i] == meta.levels[0].files.size() ||
            writes_ == 0) {
          should_stop = true;
          continue;
        }
        sorted_runs[i].emplace_back(
            meta.levels[0].files.begin(),
            meta.levels[0].files.end() - num_files_at_level0[i]);
        num_files_at_level0[i] = meta.levels[0].files.size();
        if (sorted_runs[i].back().size() == 1) {
          should_stop = true;
          continue;
        }
      }
      // Shrink the amount written in the next round based on size_ratio.
      writes_ = static_cast<int64_t>(writes_ * static_cast<double>(100) /
                                     (ratio + 200));
    }
    for (size_t i = 0; i < num_db; i++) {
      if (sorted_runs[i].size() < num_levels) {
        fprintf(stderr, "n is too small to fill %" ROCKSDB_PRIszt " levels\n", num_levels);
        exit(1);
      }
    }
    for (size_t i = 0; i < num_db; i++) {
      auto db = db_list[i];
      auto compactionOptions = CompactionOptions();
      compactionOptions.compression = FLAGS_compression_type_e;
      auto options = db->GetOptions();
      MutableCFOptions mutable_cf_options(options);
      for (size_t j = 0; j < sorted_runs[i].size(); j++) {
        compactionOptions.output_file_size_limit =
            MaxFileSizeForLevel(mutable_cf_options,
                                static_cast<int>(output_level),
                                compaction_style);
        // Runs beyond the number of non-zero levels are compacted to level 0.
        db->CompactFiles(
            compactionOptions,
            {sorted_runs[i][j].back().name, sorted_runs[i][j].front().name},
            (output_level > j ? static_cast<int>(output_level - j)
                              : 0) /*level*/);
      }
    }
  } else if (compaction_style == kCompactionStyleFIFO) {
    auto db = db_list[0];
    std::vector<std::string> file_names;
    // Keep writing until level 0 reaches the configured FIFO size limit.
    while (true) {
      if (sorted_runs[0].empty()) {
        DoWrite(thread, write_mode);
      } else {
        DoWrite(thread, UNIQUE_RANDOM);
      }
      db->Flush(FlushOptions());
      db->GetColumnFamilyMetaData(&meta);
      auto total_size = meta.levels[0].size;
      if (total_size >=
          db->GetOptions().compaction_options_fifo.max_table_files_size) {
        for (const auto& file_meta : meta.levels[0].files) {
          file_names.emplace_back(file_meta.name);
        }
        break;
      }
    }
    // TODO(shuzhang1989): Investigate why CompactFiles not working
    // auto compactionOptions = CompactionOptions();
    // db->CompactFiles(compactionOptions, file_names, 0);
    auto compactionOptions = CompactRangeOptions();
    db->CompactRange(compactionOptions, nullptr, nullptr);
  }

  // Verify seqno and key range
  // Note: the seqno get changed at the max level by implementation
  // optimization, so skip the check of the max level.
#ifndef NDEBUG
  for (size_t k = 0; k < num_db; k++) {
    auto db = db_list[k];
    db->GetColumnFamilyMetaData(&meta);
    // verify the number of sorted runs
    if (compaction_style == kCompactionStyleLevel) {
      assert(num_levels - 1 == sorted_runs[k].size());
    } else if (compaction_style == kCompactionStyleUniversal) {
      assert(meta.levels[0].files.size() + num_levels - 1 ==
             sorted_runs[k].size());
    } else if (compaction_style == kCompactionStyleFIFO) {
      // TODO(gzh): FIFO compaction
      db->GetColumnFamilyMetaData(&meta);
      auto total_size = meta.levels[0].size;
      assert(total_size <=
             db->GetOptions().compaction_options_fifo.max_table_files_size);
      break;
    }
    // verify smallest/largest seqno and key range of each sorted run
    const auto max_level = num_levels - 1;
    const auto* comparator = db->DefaultColumnFamily()->GetComparator();
    for (size_t i = 0; i < sorted_runs[k].size(); i++) {
      // Run i is expected on level (max_level - i); values <= 0 refer to
      // runs that stayed on level 0.
      const int level = static_cast<int>(max_level - i);
      SequenceNumber sorted_run_smallest_seqno = kMaxSequenceNumber;
      SequenceNumber sorted_run_largest_seqno = 0;
      std::string sorted_run_smallest_key, sorted_run_largest_key;
      bool first_key = true;
      for (const auto& fileMeta : sorted_runs[k][i]) {
        sorted_run_smallest_seqno =
            std::min(sorted_run_smallest_seqno, fileMeta.smallest_seqno);
        sorted_run_largest_seqno =
            std::max(sorted_run_largest_seqno, fileMeta.largest_seqno);
        if (first_key ||
            comparator->Compare(fileMeta.smallestkey,
                                sorted_run_smallest_key) < 0) {
          sorted_run_smallest_key = fileMeta.smallestkey;
        }
        if (first_key ||
            comparator->Compare(fileMeta.largestkey,
                                sorted_run_largest_key) > 0) {
          sorted_run_largest_key = fileMeta.largestkey;
        }
        first_key = false;
      }
      if (compaction_style == kCompactionStyleLevel ||
          (compaction_style == kCompactionStyleUniversal && level > 0)) {
        SequenceNumber level_smallest_seqno = kMaxSequenceNumber;
        SequenceNumber level_largest_seqno = 0;
        for (const auto& fileMeta : meta.levels[level].files) {
          level_smallest_seqno =
              std::min(level_smallest_seqno, fileMeta.smallest_seqno);
          level_largest_seqno =
              std::max(level_largest_seqno, fileMeta.largest_seqno);
        }
        assert(sorted_run_smallest_key ==
               meta.levels[level].files.front().smallestkey);
        assert(sorted_run_largest_key ==
               meta.levels[level].files.back().largestkey);
        if (level != static_cast<int>(max_level)) {
          // compaction at max_level would change sequence number
          assert(sorted_run_smallest_seqno == level_smallest_seqno);
          assert(sorted_run_largest_seqno == level_largest_seqno);
        }
      } else if (compaction_style == kCompactionStyleUniversal) {
        // level <= 0 means sorted runs on level 0
        const auto& level0_file =
            meta.levels[0].files[sorted_runs[k].size() - 1 - i];
        assert(sorted_run_smallest_key == level0_file.smallestkey);
        assert(sorted_run_largest_key == level0_file.largestkey);
        if (level != static_cast<int>(max_level)) {
          assert(sorted_run_smallest_seqno == level0_file.smallest_seqno);
          assert(sorted_run_largest_seqno == level0_file.largest_seqno);
        }
      }
    }
  }
#endif
  // print the size of each sorted_run
  for (size_t k = 0; k < num_db; k++) {
    auto db = db_list[k];
    fprintf(stdout,
            "---------------------- DB %" ROCKSDB_PRIszt " LSM ---------------------\n", k);
    db->GetColumnFamilyMetaData(&meta);
    for (auto& levelMeta : meta.levels) {
      if (levelMeta.files.empty()) {
        continue;
      }
      if (levelMeta.level == 0) {
        // Level 0: one line per file.
        for (auto& fileMeta : levelMeta.files) {
          fprintf(stdout, "Level[%d]: %s(size: %" PRIi64 " bytes)\n",
                  levelMeta.level, fileMeta.name.c_str(), fileMeta.size);
        }
      } else {
        // Other levels: first and last file name plus the level's size.
        fprintf(stdout, "Level[%d]: %s - %s(total size: %" PRIi64 " bytes)\n",
                levelMeta.level, levelMeta.files.front().name.c_str(),
                levelMeta.files.back().name.c_str(), levelMeta.size);
      }
    }
  }
  // Write back the option values captured before they were overridden.
  for (size_t i = 0; i < num_db; i++) {
    db_list[i]->SetOptions(
        {{"disable_auto_compactions",
          std::to_string(options_list[i].disable_auto_compactions)},
         {"level0_slowdown_writes_trigger",
          std::to_string(options_list[i].level0_slowdown_writes_trigger)},
         {"level0_stop_writes_trigger",
          std::to_string(options_list[i].level0_stop_writes_trigger)}});
  }
  return Status::OK();
#else
  (void)thread;
  (void)compaction_style;
  (void)write_mode;
  fprintf(stderr, "Rocksdb Lite doesn't support filldeterministic\n");
  return Status::NotSupported(
      "Rocksdb Lite doesn't support filldeterministic");
#endif  // ROCKSDB_LITE
}