// bool split_partition()
//
// in src/shell/commands/local_partition_split.cpp [338:545]


// Split one source partition ('tsp') into 'lpsc.split_count' new partitions.
//
// Overall flow:
//   1-4. Open the source rocksdb read-only, read its metadata (last flushed
//        decree, data version, last manual-compact finish time) from the meta
//        column family, snapshot the live SST file list, then close it.
//   5.   Split each data-CF SST file concurrently (one thread pool per
//        partition) into per-destination SST files under
//        'tmp_split_replicas_dir'.
//   6.   For each destination partition: create a fresh rocksdb under
//        'dst_replicas_dir', ingest the split SSTs, optionally full-compact
//        and/or count rows, write the metadata back via meta_store, flush,
//        and generate new ".app-info" and ".init-info" files.
//
// Parameters:
//   lpsc                   - split settings (threads per partition, split
//                            count, destination app id/name/partition count,
//                            post_full_compact / post_count options).
//   tsp                    - the source partition to split (replica dir,
//                            app info 'ai', init info 'rii').
//   dst_replicas_dir       - directory that will hold the new replicas.
//   tmp_split_replicas_dir - scratch directory holding the split SST files
//                            produced by split_file().
//   psr                    - [out] per-file split results ('fsrs') and
//                            per-destination-replica key counts.
//
// Returns true iff every step succeeded, including every per-file split job.
//
// NOTE(review): on early-return error paths taken after open_rocksdb()
// succeeds (e.g. a failed create_directory or get_subfiles inside the
// step-6 loop), release_db() is not called, so the open DB handles appear
// to leak — confirm whether callers exit the process right after a failure,
// otherwise consider an RAII guard.
bool split_partition(const LocalPartitionSplitContext &lpsc,
                     const ToSplitPatition &tsp,
                     const std::string &dst_replicas_dir,
                     const std::string &tmp_split_replicas_dir,
                     PartitionSplitResult &psr)
{
    // Relative path from a replica dir to its rocksdb data dir
    // (i.e. "<kDataDir>/<kRdbDir>"), shared by source and destinations.
    static const std::string kRdbDirPostfix =
        dsn::utils::filesystem::path_combine(dsn::replication::replication_app_base::kDataDir,
                                             dsn::replication::replication_app_base::kRdbDir);
    const auto rdb_dir = dsn::utils::filesystem::path_combine(tsp.replica_dir, kRdbDirPostfix);
    fmt::print(stdout, " start to split '{}'\n", rdb_dir);

    // 1. Open the original rocksdb in read-only mode.
    rocksdb::DBOptions db_opts;
    // The following options should be set in Pegasus 2.0 and lower versions.
    // db_opts.pegasus_data = true;
    // db_opts.pegasus_data_version = pegasus::PEGASUS_DATA_VERSION_MAX;
    //
    // Column families are listed data-first, meta-second; the handle order in
    // 'cf_hdls' is expected to match, so cf_hdls[0] is the data CF and
    // cf_hdls[1] is the meta CF throughout this function.
    const std::vector<rocksdb::ColumnFamilyDescriptor> cf_dscs(
        {{pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, {}},
         {pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, {}}});
    std::vector<rocksdb::ColumnFamilyHandle *> cf_hdls;
    rocksdb::DB *db = nullptr;
    RETURN_FALSE_IF_NOT(open_rocksdb(db_opts, rdb_dir, true, cf_dscs, &cf_hdls, &db), "");

    // 2. Get metadata from rocksdb.
    // - In Pegasus versions lower than 2.0, the metadata is only stored in the MANIFEST
    //   file.
    // - In Pegasus 2.0, the metadata is stored both in the metadata column family and
    //   MANIFEST file.
    // - Since Pegasus 2.1, the metadata is only stored in the metadata column family.
    auto ms = std::make_unique<pegasus::server::meta_store>(rdb_dir.c_str(), db, cf_hdls[1]);
    uint64_t last_committed_decree;
    RETURN_FALSE_IF_NON_OK(ms->get_last_flushed_decree(&last_committed_decree),
                           "get_last_flushed_decree from '{}' failed",
                           rdb_dir);

    uint32_t pegasus_data_version;
    RETURN_FALSE_IF_NON_OK(
        ms->get_data_version(&pegasus_data_version), "get_data_version from '{}' failed", rdb_dir);

    uint64_t last_manual_compact_finish_time;
    RETURN_FALSE_IF_NON_OK(
        ms->get_last_manual_compact_finish_time(&last_manual_compact_finish_time),
        "get_last_manual_compact_finish_time from '{}' failed",
        rdb_dir);

    // 3. Get all live sst files.
    std::vector<rocksdb::LiveFileMetaData> files;
    db->GetLiveFilesMetaData(&files);

    // 4. Close rocksdb. The metadata and the file list gathered above are all
    // we need from the source DB.
    release_db(&cf_hdls, &db);

    // 5. Split the sst files, fanning the per-file work out to a thread pool.
    auto files_thread_pool = std::unique_ptr<rocksdb::ThreadPool>(
        rocksdb::NewThreadPool(static_cast<int>(lpsc.threads_per_partition)));
    // reserve() guarantees 'psr.fsrs' never reallocates below, so the '&sfr'
    // references captured by the submitted jobs stay valid while they run.
    psr.fsrs.reserve(files.size());
    for (const auto &file : files) {
        // Skip metadata column family files, we will write metadata manually later in
        // the new DB.
        if (file.column_family_name == pegasus::server::meta_store::META_COLUMN_FAMILY_NAME) {
            fmt::print(
                stdout, "  skip [{}]: {}: {}\n", file.column_family_name, file.db_path, file.name);
            continue;
        }

        // Statistic the file split result.
        psr.fsrs.emplace_back();
        auto &sfr = psr.fsrs.back();
        sfr.filename = file.name;
        sfr.split_counts.resize(lpsc.split_count);

        // '[=]' copies lpsc/tsp/file/... into the closure because the job may
        // outlive this loop iteration; 'sfr' is captured by reference so the
        // worker can record its result in place (each job owns its own slot).
        files_thread_pool->SubmitJob([=, &sfr]() {
            sfr.success =
                split_file(lpsc, tsp, file, tmp_split_replicas_dir, pegasus_data_version, sfr);
        });
    }
    // Block until all split jobs finish before building the destination DBs.
    files_thread_pool->WaitForJobsAndJoinAllThreads();
    files_thread_pool.reset();

    // 6. Create new rocksdb instances for the new partitions.
    // TODO(yingchun): poolize the following operations if necessary.
    // NOTE(review): the per-file success flags are only inspected after this
    // loop (at the bottom of the function), so destination DBs are still
    // built even if some split jobs failed — confirm this is intentional.
    for (int i = 0; i < lpsc.split_count; i++) {
        // The new replica is placed in 'dst_replicas_dir'.
        const auto new_replica_dir =
            construct_split_directory(dst_replicas_dir, tsp, lpsc.dst_app_id, i);
        const auto new_rdb_dir =
            dsn::utils::filesystem::path_combine(new_replica_dir, kRdbDirPostfix);

        // i. Create the directory for the split rocksdb.
        // TODO(yingchun): make sure it's not exist!
        RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(new_rdb_dir),
                            "create directory '{}' failed",
                            new_rdb_dir);

        // ii. Open new rocksdb.
        rocksdb::DBOptions new_db_opts;
        new_db_opts.create_if_missing = true;
        // Create the 'pegasus_meta_cf' column family.
        new_db_opts.create_missing_column_families = true;
        RETURN_FALSE_IF_NOT(open_rocksdb(new_db_opts, new_rdb_dir, false, cf_dscs, &cf_hdls, &db),
                            "");
        // Register this replica in the result map with a sentinel count of -1
        // ("not counted"); the iterator is kept so the optional counting step
        // below can fill in the real value. CHECK_TRUE asserts the replica
        // dir was not already present.
        const auto count_of_new_replica =
            psr.key_count_by_dst_replica_dirs.insert({new_replica_dir, -1});
        CHECK_TRUE(count_of_new_replica.second);

        // iii. Ingest the split sst files to the new rocksdb.
        // do { ... } while (false) is used as a breakable scope: 'break'
        // skips ingestion when there is nothing to ingest for this partition.
        do {
            // Skip non-exist directory.
            const auto dst_tmp_rdb_dir =
                construct_split_directory(tmp_split_replicas_dir, tsp, lpsc.dst_app_id, i);
            if (!dsn::utils::filesystem::directory_exists(dst_tmp_rdb_dir)) {
                break;
            }

            // Gather all files. They are ingested into the data column family.
            rocksdb::IngestExternalFileArg arg;
            arg.column_family = cf_hdls[0];
            RETURN_FALSE_IF_NOT(
                dsn::utils::filesystem::get_subfiles(dst_tmp_rdb_dir, arg.external_files, false),
                "get sub-files from '{}' failed",
                dst_tmp_rdb_dir);

            // Skip empty directory.
            if (arg.external_files.empty()) {
                break;
            }

            // Ingest files.
            RETURN_FALSE_IF_NON_RDB_OK(db->IngestExternalFiles({arg}),
                                       "ingest files from '{}' to '{}' failed",
                                       dst_tmp_rdb_dir,
                                       new_rdb_dir);

            // Optional full compaction.
            if (lpsc.post_full_compact) {
                RETURN_FALSE_IF_NON_RDB_OK(
                    db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr),
                    "full compact rocksdb in '{}' failed",
                    new_rdb_dir);
            }

            // Optional data counting: iterate the whole (default) column
            // family and record the row count via the iterator saved above.
            if (lpsc.post_count) {
                std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator({}));
                int new_total_count = 0;
                for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
                    new_total_count++;
                }
                count_of_new_replica.first->second = new_total_count;
            }
        } while (false);

        // iv. Set metadata to rocksdb.
        // - In Pegasus versions lower than 2.0, the metadata is only stored in the MANIFEST
        //   file.
        // - In Pegasus 2.0, the metadata is stored both in the metadata column family and
        //   MANIFEST file.
        // - Since Pegasus 2.1, the metadata is only stored in the metadata column family.

        // TODO(yingchun): these metadata are only written to the metadata column family,
        //  not the manifest file. So this tool is not supporting Pegasus versions lower
        //  than 2.0.
        //  For Pegasus 2.0, it's needed to set [pegasus.server]get_meta_store_type =
        //  "metacf" when restart replica servers after using this tool.
        auto new_ms =
            std::make_unique<pegasus::server::meta_store>(new_rdb_dir.c_str(), db, cf_hdls[1]);
        new_ms->set_data_version(pegasus_data_version);
        new_ms->set_last_flushed_decree(last_committed_decree);
        new_ms->set_last_manual_compact_finish_time(last_manual_compact_finish_time);
        // wait=true blocks until the flush completes, so the metadata written
        // above is persisted before the DB is closed.
        rocksdb::FlushOptions options;
        options.wait = true;
        RETURN_FALSE_IF_NON_RDB_OK(
            db->Flush(options, cf_hdls), "flush rocksdb in '{}' failed", new_rdb_dir);

        // v. Close rocksdb.
        release_db(&cf_hdls, &db);

        // vi. Generate new ".app-info", based on the source app info but
        // re-targeted at the destination app id/name/partition count.
        dsn::app_info new_ai(tsp.ai);
        new_ai.app_name = lpsc.dst_app_name;
        new_ai.app_id = static_cast<int32_t>(lpsc.dst_app_id);
        new_ai.partition_count = static_cast<int32_t>(lpsc.dst_partition_count);
        // Note that the online partition split used 'init_partition_count' field will be
        // reset.
        new_ai.init_partition_count = -1;
        dsn::replication::replica_app_info rai(&new_ai);
        const auto rai_path = dsn::utils::filesystem::path_combine(
            new_replica_dir, dsn::replication::replica_app_info::kAppInfo);
        RETURN_FALSE_IF_NON_OK(rai.store(rai_path), "write replica_app_info '{}' failed", rai_path);

        // vii. Generate new ".init-info". Log offsets are zeroed for the
        // fresh replica.
        dsn::replication::replica_init_info new_rii(tsp.rii);
        new_rii.init_offset_in_shared_log = 0;
        new_rii.init_offset_in_private_log = 0;
        const auto rii_path =
            dsn::utils::filesystem::path_combine(new_replica_dir, replica_init_info::kInitInfo);
        RETURN_FALSE_IF_NON_OK(dsn::utils::dump_rjobj_to_file(new_rii, rii_path),
                               "write replica_init_info '{}' failed",
                               rii_path);
    }
    // The overall result is successful only if every per-file split job
    // succeeded.
    if (std::any_of(psr.fsrs.begin(), psr.fsrs.end(), [](const FileSplitResult &fsr) {
            return !fsr.success;
        })) {
        return false;
    }
    return true;
}