in src/shell/commands/local_partition_split.cpp [338:545]
bool split_partition(const LocalPartitionSplitContext &lpsc,
const ToSplitPatition &tsp,
const std::string &dst_replicas_dir,
const std::string &tmp_split_replicas_dir,
PartitionSplitResult &psr)
{
static const std::string kRdbDirPostfix =
dsn::utils::filesystem::path_combine(dsn::replication::replication_app_base::kDataDir,
dsn::replication::replication_app_base::kRdbDir);
const auto rdb_dir = dsn::utils::filesystem::path_combine(tsp.replica_dir, kRdbDirPostfix);
fmt::print(stdout, " start to split '{}'\n", rdb_dir);
// 1. Open the original rocksdb in read-only mode.
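// Opening read-only guarantees the original replica's data is never modified by this tool.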
rocksdb::DBOptions db_opts;
// The following options need to be set for Pegasus 2.0 and lower versions.
// db_opts.pegasus_data = true;
// db_opts.pegasus_data_version = pegasus::PEGASUS_DATA_VERSION_MAX;
const std::vector<rocksdb::ColumnFamilyDescriptor> cf_dscs(
{{pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, {}},
{pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, {}}});
std::vector<rocksdb::ColumnFamilyHandle *> cf_hdls;
rocksdb::DB *db = nullptr;
RETURN_FALSE_IF_NOT(open_rocksdb(db_opts, rdb_dir, true, cf_dscs, &cf_hdls, &db), "");
// 2. Get metadata from rocksdb.
// - In Pegasus versions lower than 2.0, the metadata is only stored in the MANIFEST
// file.
// - In Pegasus 2.0, the metadata is stored both in the metadata column family and
// MANIFEST file.
// - Since Pegasus 2.1, the metadata is only stored in the metadata column family.
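// 'cf_hdls' is returned in the same order as 'cf_dscs', so cf_hdls[0] is the data
// column family and cf_hdls[1] is the meta column family.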
auto ms = std::make_unique<pegasus::server::meta_store>(rdb_dir.c_str(), db, cf_hdls[1]);
uint64_t last_committed_decree;
RETURN_FALSE_IF_NON_OK(ms->get_last_flushed_decree(&last_committed_decree),
"get_last_flushed_decree from '{}' failed",
rdb_dir);
uint32_t pegasus_data_version;
RETURN_FALSE_IF_NON_OK(
ms->get_data_version(&pegasus_data_version), "get_data_version from '{}' failed", rdb_dir);
uint64_t last_manual_compact_finish_time;
RETURN_FALSE_IF_NON_OK(
ms->get_last_manual_compact_finish_time(&last_manual_compact_finish_time),
"get_last_manual_compact_finish_time from '{}' failed",
rdb_dir);
// 3. Get all live sst files.
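// GetLiveFilesMetaData() reports the name, path, column family and key range of
// every live SST file in the DB.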
std::vector<rocksdb::LiveFileMetaData> files;
db->GetLiveFilesMetaData(&files);
// 4. Close rocksdb.
release_db(&cf_hdls, &db);
// 5. Split the sst files.
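// Each data SST file is handed to 'split_file' on a thread pool of
// 'threads_per_partition' workers; the per-file results are collected in 'psr.fsrs'.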
auto files_thread_pool = std::unique_ptr<rocksdb::ThreadPool>(
rocksdb::NewThreadPool(static_cast<int>(lpsc.threads_per_partition)));
psr.fsrs.reserve(files.size());
for (const auto &file : files) {
// Skip metadata column family files; the metadata will be written manually to the
// new DBs later.
if (file.column_family_name == pegasus::server::meta_store::META_COLUMN_FAMILY_NAME) {
fmt::print(
stdout, " skip [{}]: {}: {}\n", file.column_family_name, file.db_path, file.name);
continue;
}
// Record the split result statistics for this file.
psr.fsrs.emplace_back();
auto &sfr = psr.fsrs.back();
sfr.filename = file.name;
sfr.split_counts.resize(lpsc.split_count);
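// 'psr.fsrs' was reserved above, so the reference to 'sfr' remains valid while
// later elements are appended.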
files_thread_pool->SubmitJob([=, &sfr]() {
sfr.success =
split_file(lpsc, tsp, file, tmp_split_replicas_dir, pegasus_data_version, sfr);
});
}
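// Block until all split jobs have finished before building the new rocksdb instances.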
files_thread_pool->WaitForJobsAndJoinAllThreads();
files_thread_pool.reset();
// 6. Create new rocksdb instances for the new partitions.
// TODO(yingchun): parallelize the following operations with a thread pool if necessary.
for (int i = 0; i < lpsc.split_count; i++) {
// The new replica is placed in 'dst_replicas_dir'.
const auto new_replica_dir =
construct_split_directory(dst_replicas_dir, tsp, lpsc.dst_app_id, i);
const auto new_rdb_dir =
dsn::utils::filesystem::path_combine(new_replica_dir, kRdbDirPostfix);
// i. Create the directory for the split rocksdb.
// TODO(yingchun): make sure it does not exist!
RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(new_rdb_dir),
"create directory '{}' failed",
new_rdb_dir);
// ii. Open new rocksdb.
rocksdb::DBOptions new_db_opts;
new_db_opts.create_if_missing = true;
// Create the 'pegasus_meta_cf' column family.
new_db_opts.create_missing_column_families = true;
RETURN_FALSE_IF_NOT(open_rocksdb(new_db_opts, new_rdb_dir, false, cf_dscs, &cf_hdls, &db),
"");
const auto count_of_new_replica =
psr.key_count_by_dst_replica_dirs.insert({new_replica_dir, -1});
CHECK_TRUE(count_of_new_replica.second);
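// The key count is initialized to -1 and is only filled in below when 'post_count'
// is enabled.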
// iii. Ingest the split sst files to the new rocksdb.
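// The do-while(false) block allows breaking out early when there is nothing to
// ingest for this new partition.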
do {
// Skip the directory if it does not exist.
const auto dst_tmp_rdb_dir =
construct_split_directory(tmp_split_replicas_dir, tsp, lpsc.dst_app_id, i);
if (!dsn::utils::filesystem::directory_exists(dst_tmp_rdb_dir)) {
break;
}
// Gather all files.
rocksdb::IngestExternalFileArg arg;
arg.column_family = cf_hdls[0];
RETURN_FALSE_IF_NOT(
dsn::utils::filesystem::get_subfiles(dst_tmp_rdb_dir, arg.external_files, false),
"get sub-files from '{}' failed",
dst_tmp_rdb_dir);
// Skip the directory if it contains no files.
if (arg.external_files.empty()) {
break;
}
// Ingest files.
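// The split SST files are added to the data column family as-is, without being
// rewritten through the memtable.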
RETURN_FALSE_IF_NON_RDB_OK(db->IngestExternalFiles({arg}),
"ingest files from '{}' to '{}' failed",
dst_tmp_rdb_dir,
new_rdb_dir);
// Optional full compaction.
if (lpsc.post_full_compact) {
RETURN_FALSE_IF_NON_RDB_OK(
db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr),
"full compact rocksdb in '{}' failed",
new_rdb_dir);
}
// Optional data counting.
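// Count the keys that landed in this new partition by scanning the data column
// family with an iterator.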
if (lpsc.post_count) {
std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator({}));
int new_total_count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
new_total_count++;
}
count_of_new_replica.first->second = new_total_count;
}
} while (false);
// iv. Set metadata to rocksdb.
// - In Pegasus versions lower than 2.0, the metadata is only stored in the MANIFEST
// file.
// - In Pegasus 2.0, the metadata is stored both in the metadata column family and
// MANIFEST file.
// - Since Pegasus 2.1, the metadata is only stored in the metadata column family.
// TODO(yingchun): the metadata is only written to the metadata column family,
// not to the MANIFEST file, so this tool does not support Pegasus versions lower
// than 2.0.
// For Pegasus 2.0, it is necessary to set [pegasus.server]get_meta_store_type =
// "metacf" when restarting replica servers after using this tool.
auto new_ms =
std::make_unique<pegasus::server::meta_store>(new_rdb_dir.c_str(), db, cf_hdls[1]);
new_ms->set_data_version(pegasus_data_version);
new_ms->set_last_flushed_decree(last_committed_decree);
new_ms->set_last_manual_compact_finish_time(last_manual_compact_finish_time);
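// Flush both column families so the metadata written above is persisted before
// the DB is closed.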
rocksdb::FlushOptions options;
options.wait = true;
RETURN_FALSE_IF_NON_RDB_OK(
db->Flush(options, cf_hdls), "flush rocksdb in '{}' failed", new_rdb_dir);
// v. Close rocksdb.
release_db(&cf_hdls, &db);
// vi. Generate new ".app-info".
dsn::app_info new_ai(tsp.ai);
new_ai.app_name = lpsc.dst_app_name;
new_ai.app_id = static_cast<int32_t>(lpsc.dst_app_id);
new_ai.partition_count = static_cast<int32_t>(lpsc.dst_partition_count);
// Note that the 'init_partition_count' field, which is used by online partition
// split, will be reset.
new_ai.init_partition_count = -1;
dsn::replication::replica_app_info rai(&new_ai);
const auto rai_path = dsn::utils::filesystem::path_combine(
new_replica_dir, dsn::replication::replica_app_info::kAppInfo);
RETURN_FALSE_IF_NON_OK(rai.store(rai_path), "write replica_app_info '{}' failed", rai_path);
// vii. Generate new ".init-info".
dsn::replication::replica_init_info new_rii(tsp.rii);
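// Reset both log offsets since the new replica starts with fresh shared/private logs.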
new_rii.init_offset_in_shared_log = 0;
new_rii.init_offset_in_private_log = 0;
const auto rii_path =
dsn::utils::filesystem::path_combine(new_replica_dir, replica_init_info::kInitInfo);
RETURN_FALSE_IF_NON_OK(dsn::utils::dump_rjobj_to_file(new_rii, rii_path),
"write replica_init_info '{}' failed",
rii_path);
}
return std::all_of(psr.fsrs.begin(), psr.fsrs.end(), [](const FileSplitResult &fsr) {
    return fsr.success;
});
}