dsn::error_code pegasus_server_impl::start(int argc, char **argv)

defined in src/server/pegasus_server_impl.cpp, lines 1509-1806


dsn::error_code pegasus_server_impl::start(int argc, char **argv)
{
    CHECK_PREFIX_MSG(!_is_open, "replica is already opened");
    LOG_INFO_PREFIX("start to open app {}", data_dir());

    // parse envs for parameters
    // envs is compounded in replication_app_base::open() function
    std::map<std::string, std::string> envs;
    if (argc > 0) {
        if ((argc - 1) % 2 != 0) {
            LOG_ERROR_PREFIX("parse envs failed, invalid argc = {}", argc);
            return dsn::ERR_INVALID_PARAMETERS;
        }
        if (argv == nullptr) {
            LOG_ERROR_PREFIX("parse envs failed, invalid argv = nullptr");
            return dsn::ERR_INVALID_PARAMETERS;
        }
        int idx = 1;
        while (idx < argc) {
            const char *key = argv[idx++];
            const char *value = argv[idx++];
            envs.emplace(key, value);
        }
    }
    // Update all envs before opening db, ensure all envs are effective for the newly opened db.
    update_app_envs_before_open_db(envs);

    // TODO(yingchun): refactor the following code
    //
    // here, we must distinguish three cases, such as:
    //  case 1: we open the db that already exist
    //  case 2: we load duplication data base checkpoint from master
    //  case 3: we open a new db
    //  case 4: we restore the db base on old data
    //
    // if we want to restore the db base on old data, only all of the restore preconditions are
    // satisfied
    //      restore preconditions:
    //          1, rdb isn't exist
    //          2, we can parse restore info from app env, which is stored in argv
    //          3, restore_dir is exist
    //
    bool db_exist = true;
    auto rdb_path = dsn::utils::filesystem::path_combine(data_dir(), "rdb");
    auto duplication_path = duplication_dir();
    if (dsn::utils::filesystem::path_exists(rdb_path)) {
        // only case 1
        LOG_INFO_PREFIX("rdb is already exist, path = {}", rdb_path);
    } else {
        // case 2
        if (dsn::utils::filesystem::path_exists(duplication_path) && is_duplication_follower()) {
            if (!dsn::utils::filesystem::rename_path(duplication_path, rdb_path)) {
                LOG_ERROR_PREFIX(
                    "load duplication data from {} to {} failed", duplication_path, rdb_path);
                return dsn::ERR_FILE_OPERATION_FAILED;
            }
        } else {
            std::pair<std::string, bool> restore_info = get_restore_dir_from_env(envs);
            const std::string &restore_dir = restore_info.first;
            bool force_restore = restore_info.second;
            if (restore_dir.empty()) {
                // case 3
                if (force_restore) {
                    LOG_ERROR_PREFIX("try to restore, but we can't combine restore_dir from envs");
                    return dsn::ERR_FILE_OPERATION_FAILED;
                } else {
                    db_exist = false;
                    LOG_DEBUG_PREFIX("open a new db, path = {}", rdb_path);
                }
            } else {
                // case 4
                LOG_INFO_PREFIX("try to restore from restore_dir = {}", restore_dir);
                if (dsn::utils::filesystem::directory_exists(restore_dir)) {
                    // here, we just rename restore_dir to rdb, then continue the normal process
                    if (dsn::utils::filesystem::rename_path(restore_dir, rdb_path)) {
                        LOG_INFO_PREFIX(
                            "rename restore_dir({}) to rdb({}) succeed", restore_dir, rdb_path);
                    } else {
                        LOG_ERROR_PREFIX(
                            "rename restore_dir({}) to rdb({}) failed", restore_dir, rdb_path);
                        return dsn::ERR_FILE_OPERATION_FAILED;
                    }
                } else {
                    if (force_restore) {
                        LOG_ERROR_PREFIX(
                            "try to restore, but restore_dir isn't exist, restore_dir = {}",
                            restore_dir);
                        return dsn::ERR_FILE_OPERATION_FAILED;
                    } else {
                        db_exist = false;
                        LOG_WARNING_PREFIX(
                            "try to restore and restore_dir({}) isn't exist, but we don't force "
                            "it, the role of this replica must not primary, so we open a new db on "
                            "the "
                            "path({})",
                            restore_dir,
                            rdb_path);
                    }
                }
            }
        }
    }

    LOG_INFO_PREFIX("start to open rocksDB's rdb({})", rdb_path);

    // Here we create a `_table_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
    // will be used elsewhere.
    _table_data_cf_opts = _data_cf_opts;
    _table_data_cf_opts_recalculated = false;
    bool has_incompatible_db_options = false;
    if (db_exist) {
        // When DB exists, meta CF and data CF must be present.
        bool missing_meta_cf = true;
        bool missing_data_cf = true;
        auto ec = check_column_families(rdb_path, &missing_meta_cf, &missing_data_cf);
        if (ec != dsn::ERR_OK) {
            LOG_ERROR_PREFIX("check column families failed");
            return ec;
        }
        CHECK_PREFIX_MSG(!missing_meta_cf, "You must upgrade Pegasus server from 2.0");
        CHECK_PREFIX_MSG(!missing_data_cf, "Missing data column family");

        // Load latest options from option file stored in the db directory.
        rocksdb::DBOptions loaded_db_opt;
        std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs;
        rocksdb::ColumnFamilyOptions loaded_data_cf_opts;
        // Set `ignore_unknown_options` true for forward compatibility.
        auto status = rocksdb::LoadLatestOptions(rdb_path,
                                                 rocksdb::Env::Default(),
                                                 &loaded_db_opt,
                                                 &loaded_cf_descs,
                                                 /*ignore_unknown_options=*/true);
        if (!status.ok()) {
            // Here we ignore an invalid argument error related to `pegasus_data_version` and
            // `pegasus_data` options, which were used in old version rocksdbs (before 2.1.0).
            if (status.code() != rocksdb::Status::kInvalidArgument ||
                status.ToString().find("pegasus_data") == std::string::npos) {
                LOG_ERROR_PREFIX("load latest option file failed: {}.", status.ToString());
                return dsn::ERR_LOCAL_APP_FAILURE;
            }
            has_incompatible_db_options = true;
            LOG_WARNING_PREFIX(
                "The latest option file has incompatible db options: {}, use default "
                "options to open db.",
                status.ToString());
        }

        if (!has_incompatible_db_options) {
            for (int i = 0; i < loaded_cf_descs.size(); ++i) {
                if (loaded_cf_descs[i].name == DATA_COLUMN_FAMILY_NAME) {
                    loaded_data_cf_opts = loaded_cf_descs[i].options;
                }
            }
            // Reset usage scenario related options according to loaded_data_cf_opts.
            // We don't use `loaded_data_cf_opts` directly because pointer-typed options will
            // only be initialized with default values when calling 'LoadLatestOptions', see
            // 'rocksdb/utilities/options_util.h'.
            reset_usage_scenario_options(loaded_data_cf_opts, &_table_data_cf_opts);
            _db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
        }
    } else {
        // When create new DB, we have to create a new column family to store meta data (meta column
        // family).
        _db_opts.create_missing_column_families = true;
        _db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
    }

    std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
        {{DATA_COLUMN_FAMILY_NAME, _table_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
    auto s = rocksdb::CheckOptionsCompatibility(rdb_path,
                                                rocksdb::Env::Default(),
                                                _db_opts,
                                                column_families,
                                                /*ignore_unknown_options=*/true);
    if (!s.ok() && !s.IsNotFound() && !has_incompatible_db_options) {
        LOG_ERROR_PREFIX("rocksdb::CheckOptionsCompatibility failed, error = {}", s.ToString());
        return dsn::ERR_LOCAL_APP_FAILURE;
    }
    std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
    auto status = rocksdb::DB::Open(_db_opts, rdb_path, column_families, &handles_opened, &_db);
    if (!status.ok()) {
        LOG_ERROR_PREFIX("rocksdb::DB::Open failed, error = {}", status.ToString());
        return dsn::ERR_LOCAL_APP_FAILURE;
    }
    CHECK_EQ_PREFIX(2, handles_opened.size());
    CHECK_EQ_PREFIX(handles_opened[0]->GetName(), DATA_COLUMN_FAMILY_NAME);
    CHECK_EQ_PREFIX(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
    _data_cf = handles_opened[0];
    _meta_cf = handles_opened[1];

    // Create _meta_store which provide Pegasus meta data read and write.
    _meta_store = std::make_unique<meta_store>(this, _db, _meta_cf);

    if (db_exist) {
        auto cleanup = dsn::defer([this]() { release_db(); });
        uint64_t decree = 0;
        LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
                              _meta_store->get_last_flushed_decree(&decree),
                              "get_last_flushed_decree failed");
        _last_committed_decree.store(static_cast<int64_t>(decree));
        LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
                              _meta_store->get_data_version(&_pegasus_data_version),
                              "get_data_version failed");
        _usage_scenario = _meta_store->get_usage_scenario();
        uint64_t last_manual_compact_finish_time = 0;
        LOG_AND_RETURN_NOT_OK(
            ERROR_PREFIX,
            _meta_store->get_last_manual_compact_finish_time(&last_manual_compact_finish_time),
            "get_last_manual_compact_finish_time failed");
        LOG_AND_RETURN_NOT_TRUE(ERROR_PREFIX,
                                _pegasus_data_version <= PEGASUS_DATA_VERSION_MAX,
                                dsn::ERR_LOCAL_APP_FAILURE,
                                "open app failed, unsupported data version {}",
                                _pegasus_data_version);
        // update last manual compact finish timestamp
        _manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time);
        cleanup.cancel();
    } else {
        // Write initial meta data to meta CF and flush when create new DB.
        _meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX);
        _meta_store->set_last_flushed_decree(0);
        _meta_store->set_last_manual_compact_finish_time(0);
        flush_all_family_columns(true);
    }

    // only enable filter after correct pegasus_data_version set
    _key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version);
    _key_ttl_compaction_filter_factory->SetPartitionIndex(_gpid.get_partition_index());
    _key_ttl_compaction_filter_factory->SetPartitionVersion(_gpid.get_partition_index() - 1);
    _key_ttl_compaction_filter_factory->EnableFilter();

    parse_checkpoints();

    // checkpoint if necessary to make last_durable_decree() fresh.
    // only need async checkpoint because we sure that memtable is empty now.
    int64_t last_flushed = static_cast<int64_t>(_last_committed_decree);
    if (last_flushed != last_durable_decree()) {
        LOG_INFO_PREFIX(
            "start to do async checkpoint, last_durable_decree = {}, last_flushed_decree = {}",
            last_durable_decree(),
            last_flushed);
        auto err = async_checkpoint(false);
        if (err != dsn::ERR_OK) {
            LOG_ERROR_PREFIX("create checkpoint failed, error = {}", err.to_string());
            release_db();
            return err;
        }
        CHECK_EQ_PREFIX(last_flushed, last_durable_decree());
    }

    LOG_INFO_PREFIX("open app succeed, pegasus_data_version = {}, last_durable_decree = {}",
                    _pegasus_data_version,
                    last_durable_decree());

    _is_open = true;

    if (!db_exist) {
        // When create a new db, update usage scenario according to app envs.
        update_usage_scenario(envs);
    }

    LOG_DEBUG_PREFIX("start the update replica-level rocksdb statistics timer task");
    _update_replica_rdb_stat =
        dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON,
                                    &_tracker,
                                    [this]() { this->update_replica_rocksdb_statistics(); },
                                    std::chrono::seconds(FLAGS_update_rdb_stat_interval));

    // These counters are singletons on this server shared by all replicas, their metrics update
    // task should be scheduled once an interval on the server view.
    static std::once_flag flag;
    std::call_once(flag, [&]() {
        // The timer task will always running even though there is no replicas
        CHECK_NE(kServerStatUpdateTimeSec.count(), 0);
        _update_server_rdb_stat = dsn::tasking::enqueue_timer(
            LPC_REPLICATION_LONG_COMMON,
            nullptr, // TODO: the tracker is nullptr, we will fix it later
            []() { update_server_rocksdb_statistics(); },
            kServerStatUpdateTimeSec);
    });

    // initialize cu calculator and write service after server being initialized.
    _cu_calculator = std::make_unique<capacity_unit_calculator>(
        this, _read_hotkey_collector, _write_hotkey_collector, _read_size_throttling_controller);
    _server_write = std::make_unique<pegasus_server_write>(this);

    dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
                                &_tracker,
                                [this]() { _read_hotkey_collector->analyse_data(); },
                                std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));

    dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
                                &_tracker,
                                [this]() { _write_hotkey_collector->analyse_data(); },
                                std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));

    return dsn::ERR_OK;
}