dsn::error_code pegasus_server_impl::start()

in src/server/pegasus_server_impl.cpp [1548:1857]


dsn::error_code pegasus_server_impl::start(int argc, char **argv)
{
    CHECK_PREFIX_MSG(!_is_open, "replica is already opened");
    LOG_INFO_PREFIX("start to open app {}", data_dir());

    // Parse envs from the command-line parameters.
    // The envs are assembled in the replication_app_base::open() function.
    std::map<std::string, std::string> envs;
    if (argc > 0) {
        if ((argc - 1) % 2 != 0) {
            LOG_ERROR_PREFIX("parse envs failed, invalid argc = {}", argc);
            return dsn::ERR_INVALID_PARAMETERS;
        }
        if (argv == nullptr) {
            LOG_ERROR_PREFIX("parse envs failed, invalid argv = nullptr");
            return dsn::ERR_INVALID_PARAMETERS;
        }
        int idx = 1;
        while (idx < argc) {
            const char *key = argv[idx++];
            const char *value = argv[idx++];
            envs.emplace(key, value);
        }
    }
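    // For example, argc = 5 yields envs = { argv[1]: argv[2], argv[3]: argv[4] };
    // keys and values alternate after argv[0], which is why (argc - 1) must be even.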
    // Update all envs before opening the db, to ensure that all envs take effect for the
    // newly opened db.
    update_app_envs_before_open_db(envs);

    // TODO(yingchun): refactor the following code
    //
    // Here, we must distinguish four cases:
    //  case 1: we open a db that already exists
    //  case 2: we load a duplication checkpoint fetched from the master
    //  case 3: we open a new db
    //  case 4: we restore the db based on old data
    //
    // We restore the db based on old data only if all of the restore preconditions
    // are satisfied:
    //      restore preconditions:
    //          1. rdb doesn't exist
    //          2. we can parse restore info from the app envs, which are stored in argv
    //          3. restore_dir exists
    //
    bool db_exist = true;
    auto rdb_path = dsn::utils::filesystem::path_combine(data_dir(), replication_app_base::kRdbDir);
    auto duplication_path = duplication_dir();
    if (dsn::utils::filesystem::path_exists(rdb_path)) {
        // only case 1
        LOG_INFO_PREFIX("rdb is already exist, path = {}", rdb_path);
    } else {
        // case 2
        if (dsn::utils::filesystem::path_exists(duplication_path) && is_duplication_follower()) {
            if (!dsn::utils::filesystem::rename_path(duplication_path, rdb_path)) {
                LOG_ERROR_PREFIX(
                    "load duplication data from {} to {} failed", duplication_path, rdb_path);
                return dsn::ERR_FILE_OPERATION_FAILED;
            }
        } else {
            std::pair<std::string, bool> restore_info = get_restore_dir_from_env(envs);
            const std::string &restore_dir = restore_info.first;
            bool force_restore = restore_info.second;
            if (restore_dir.empty()) {
                // case 3
                if (force_restore) {
                    LOG_ERROR_PREFIX("try to restore, but we can't combine restore_dir from envs");
                    return dsn::ERR_FILE_OPERATION_FAILED;
                } else {
                    db_exist = false;
                    LOG_DEBUG_PREFIX("open a new db, path = {}", rdb_path);
                }
            } else {
                // case 4
                LOG_INFO_PREFIX("try to restore from restore_dir = {}", restore_dir);
                if (dsn::utils::filesystem::directory_exists(restore_dir)) {
                    // here, we just rename restore_dir to rdb, then continue the normal process
                    if (dsn::utils::filesystem::rename_path(restore_dir, rdb_path)) {
                        LOG_INFO_PREFIX(
                            "rename restore_dir({}) to rdb({}) succeed", restore_dir, rdb_path);
                    } else {
                        LOG_ERROR_PREFIX(
                            "rename restore_dir({}) to rdb({}) failed", restore_dir, rdb_path);
                        return dsn::ERR_FILE_OPERATION_FAILED;
                    }
                } else {
                    if (force_restore) {
                        LOG_ERROR_PREFIX(
                            "try to restore, but restore_dir doesn't exist, restore_dir = {}",
                            restore_dir);
                        return dsn::ERR_FILE_OPERATION_FAILED;
                    } else {
                        db_exist = false;
                        LOG_WARNING_PREFIX(
                            "try to restore, but restore_dir({}) doesn't exist; since we don't "
                            "force it and this replica must not be primary, we open a new db at "
                            "path({})",
                            restore_dir,
                            rdb_path);
                    }
                }
            }
        }
    }
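    // At this point, db_exist == false only if we are going to create a brand-new db
    // (case 3, or a non-forced restore whose restore_dir is missing); in every other
    // case an rdb directory is already in place at rdb_path.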

    LOG_INFO_PREFIX("start to open rocksDB's rdb({})", rdb_path);

    // Here we create a `_table_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
    // will be used elsewhere.
    _table_data_cf_opts = _data_cf_opts;
    _table_data_cf_opts_recalculated = false;
    bool has_incompatible_db_options = false;
    if (db_exist) {
        // When DB exists, meta CF and data CF must be present.
        bool missing_meta_cf = true;
        bool missing_data_cf = true;
        auto ec = check_column_families(rdb_path, &missing_meta_cf, &missing_data_cf);
        if (ec != dsn::ERR_OK) {
            LOG_ERROR_PREFIX("check column families failed");
            return ec;
        }
        CHECK_PREFIX_MSG(!missing_meta_cf, "You must upgrade Pegasus server from 2.0");
        CHECK_PREFIX_MSG(!missing_data_cf, "Missing data column family");

        // Load latest options from option file stored in the db directory.
        rocksdb::DBOptions loaded_db_opt;
        std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs;
        rocksdb::ColumnFamilyOptions loaded_data_cf_opts;
        rocksdb::ConfigOptions config_options;
        // Set `ignore_unknown_options` true for forward compatibility.
        config_options.ignore_unknown_options = true;
        config_options.env = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive);
        auto status =
            rocksdb::LoadLatestOptions(config_options, rdb_path, &loaded_db_opt, &loaded_cf_descs);
        if (!status.ok()) {
            // Here we ignore an invalid argument error related to the `pegasus_data_version`
            // and `pegasus_data` options, which were used by old-version RocksDBs (before 2.1.0).
            if (status.code() != rocksdb::Status::kInvalidArgument ||
                status.ToString().find("pegasus_data") == std::string::npos) {
                LOG_ERROR_PREFIX("load latest option file failed: {}.", status.ToString());
                return dsn::ERR_LOCAL_APP_FAILURE;
            }
            has_incompatible_db_options = true;
            LOG_WARNING_PREFIX(
                "The latest option file has incompatible db options: {}, use default "
                "options to open db.",
                status.ToString());
        }

        if (!has_incompatible_db_options) {
            for (const auto &loaded_cf_desc : loaded_cf_descs) {
                if (loaded_cf_desc.name == meta_store::DATA_COLUMN_FAMILY_NAME) {
                    loaded_data_cf_opts = loaded_cf_desc.options;
                }
            }
            // Reset usage scenario related options according to loaded_data_cf_opts.
            // We don't use `loaded_data_cf_opts` directly because pointer-typed options will
            // only be initialized with default values when calling 'LoadLatestOptions', see
            // 'rocksdb/utilities/options_util.h'.
            reset_rocksdb_options(
                loaded_data_cf_opts, loaded_db_opt, envs, &_table_data_cf_opts, &_db_opts);
        }
    } else {
        // When creating a new DB, we have to create a new column family (the meta column
        // family) to store metadata.
        _db_opts.create_missing_column_families = true;
        _db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
    }
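    // Note: `create_missing_column_families` lets the rocksdb::DB::Open() below create
    // the meta CF alongside the data CF for a brand-new db.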

    std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
        {{meta_store::DATA_COLUMN_FAMILY_NAME, _table_data_cf_opts},
         {meta_store::META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
    rocksdb::ConfigOptions config_options;
    config_options.ignore_unknown_options = true;
    config_options.ignore_unsupported_options = true;
    config_options.sanity_level =
        rocksdb::ConfigOptions::SanityLevel::kSanityLevelLooselyCompatible;
    config_options.env = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive);
    auto s =
        rocksdb::CheckOptionsCompatibility(config_options, rdb_path, _db_opts, column_families);
    if (!s.ok() && !s.IsNotFound() && !has_incompatible_db_options) {
        LOG_ERROR_PREFIX("rocksdb::CheckOptionsCompatibility failed, error = {}", s.ToString());
        return dsn::ERR_LOCAL_APP_FAILURE;
    }
    std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
    auto status = rocksdb::DB::Open(_db_opts, rdb_path, column_families, &handles_opened, &_db);
    if (!status.ok()) {
        LOG_ERROR_PREFIX("rocksdb::DB::Open failed, error = {}", status.ToString());
        return dsn::ERR_LOCAL_APP_FAILURE;
    }
    CHECK_EQ_PREFIX(2, handles_opened.size());
    CHECK_EQ_PREFIX(handles_opened[0]->GetName(), meta_store::DATA_COLUMN_FAMILY_NAME);
    CHECK_EQ_PREFIX(handles_opened[1]->GetName(), meta_store::META_COLUMN_FAMILY_NAME);
    _data_cf = handles_opened[0];
    _meta_cf = handles_opened[1];
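    // rocksdb::DB::Open() returns the handles in the same order as the descriptors in
    // `column_families`, so index 0 is the data CF and index 1 is the meta CF, as the
    // CHECK_EQ_PREFIX assertions above verify.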

    // Create _meta_store, which provides reads and writes of Pegasus metadata.
    _meta_store = std::make_unique<meta_store>(replica_name(), _db, _meta_cf);

    if (db_exist) {
        auto cleanup = dsn::defer([this]() { release_db(); });
        uint64_t decree = 0;
        LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
                              _meta_store->get_last_flushed_decree(&decree),
                              "get_last_flushed_decree failed");
        _last_committed_decree.store(static_cast<int64_t>(decree));
        LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
                              _meta_store->get_data_version(&_pegasus_data_version),
                              "get_data_version failed");
        _usage_scenario = _meta_store->get_usage_scenario();
        uint64_t last_manual_compact_finish_time = 0;
        LOG_AND_RETURN_NOT_OK(
            ERROR_PREFIX,
            _meta_store->get_last_manual_compact_finish_time(&last_manual_compact_finish_time),
            "get_last_manual_compact_finish_time failed");
        LOG_AND_RETURN_NOT_TRUE(ERROR_PREFIX,
                                _pegasus_data_version <= PEGASUS_DATA_VERSION_MAX,
                                dsn::ERR_LOCAL_APP_FAILURE,
                                "open app failed, unsupported data version {}",
                                _pegasus_data_version);
        // update last manual compact finish timestamp
        _manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time);
        cleanup.cancel();
    } else {
        // When creating a new DB, write the initial metadata to the meta CF and flush.
        _meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX);
        _meta_store->set_last_flushed_decree(0);
        _meta_store->set_last_manual_compact_finish_time(0);
        flush_all_family_columns(true);
    }

    // Only enable the filter after the correct pegasus_data_version has been set.
    _key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version);
    _key_ttl_compaction_filter_factory->SetPartitionIndex(_gpid.get_partition_index());
    _key_ttl_compaction_filter_factory->SetPartitionVersion(_gpid.get_partition_index() - 1);
    _key_ttl_compaction_filter_factory->EnableFilter();

    parse_checkpoints();

    // Checkpoint if necessary to make last_durable_decree() fresh.
    // An async checkpoint suffices because we are sure the memtable is empty now.
    int64_t last_flushed = static_cast<int64_t>(_last_committed_decree);
    if (last_flushed != last_durable_decree()) {
        LOG_INFO_PREFIX(
            "start to do async checkpoint, last_durable_decree = {}, last_flushed_decree = {}",
            last_durable_decree(),
            last_flushed);
        auto err = async_checkpoint(false);
        if (err != dsn::ERR_OK) {
            LOG_ERROR_PREFIX("create checkpoint failed, error = {}", err);
            release_db();
            return err;
        }
        CHECK_EQ_PREFIX(last_flushed, last_durable_decree());
    }
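    // A successful checkpoint advances last_durable_decree() to the last flushed
    // decree, which the CHECK_EQ_PREFIX above asserts.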

    LOG_INFO_PREFIX("open app succeed, pegasus_data_version = {}, last_durable_decree = {}",
                    _pegasus_data_version,
                    last_durable_decree());

    _is_open = true;

    if (!db_exist) {
        // When creating a new db, update the usage scenario according to the app envs.
        update_usage_scenario(envs);
    }

    LOG_DEBUG_PREFIX("start the update replica-level rocksdb statistics timer task");
    _update_replica_rdb_stat = dsn::tasking::enqueue_timer(
        LPC_REPLICATION_LONG_COMMON,
        &_tracker,
        [this]() { this->update_replica_rocksdb_statistics(); },
        std::chrono::seconds(FLAGS_update_rdb_stat_interval));

    // These counters are singletons on this server shared by all replicas, so their
    // metrics-update task should be scheduled once per interval at the server level.
    static std::once_flag flag;
    std::call_once(flag, [&]() {
        // The timer task will keep running even if there are no replicas.
        CHECK_NE(kServerStatUpdateTimeSec.count(), 0);

        // TODO(wangdan): _update_server_rdb_stat is server-level, thus it could not be simply
        // cancelled in the destructor of pegasus_server_impl which is replica-level.
        //
        // We should refactor to make _update_server_rdb_stat exit gracefully by
        // `_update_server_rdb_stat->cancel(true)`.
        _update_server_rdb_stat = dsn::tasking::enqueue_timer(
            LPC_REPLICATION_LONG_COMMON,
            nullptr, // TODO: the tracker is nullptr, we will fix it later
            []() { update_server_rocksdb_statistics(); },
            kServerStatUpdateTimeSec);
    });

    // Initialize the cu calculator and write service after the server has been initialized.
    _cu_calculator = std::make_unique<capacity_unit_calculator>(
        this, _read_hotkey_collector, _write_hotkey_collector, _read_size_throttling_controller);
    _server_write = std::make_unique<pegasus_server_write>(this);

    dsn::tasking::enqueue_timer(
        LPC_ANALYZE_HOTKEY,
        &_tracker,
        [this]() { _read_hotkey_collector->analyse_data(); },
        std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));

    dsn::tasking::enqueue_timer(
        LPC_ANALYZE_HOTKEY,
        &_tracker,
        [this]() { _write_hotkey_collector->analyse_data(); },
        std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));

    return dsn::ERR_OK;
}
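
The RocksDB sequence above (load the persisted option file, check compatibility, then open with an explicit column-family list) is a standard reopen pattern. Below is a minimal, self-contained sketch of that flow; the path handling, the "meta" CF name, and the default options are illustrative assumptions, not Pegasus's actual configuration (Pegasus layers an encrypted Env, tuned options, and its own meta-CF bookkeeping on top).

#include <string>
#include <vector>

#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/utilities/options_util.h>

// Sketch: open (or create) a db holding a data CF and a meta CF, mirroring the
// flow in start(). `db_exist` plays the same role as above; "meta" is a
// placeholder CF name, not the one Pegasus uses.
rocksdb::Status open_with_two_cfs(const std::string &rdb_path,
                                  bool db_exist,
                                  std::vector<rocksdb::ColumnFamilyHandle *> *handles,
                                  rocksdb::DB **db)
{
    rocksdb::DBOptions db_opts;
    std::vector<rocksdb::ColumnFamilyDescriptor> cf_descs = {
        {rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions()},
        {"meta", rocksdb::ColumnFamilyOptions()}};

    rocksdb::ConfigOptions cfg;
    cfg.ignore_unknown_options = true; // tolerate option files written by newer versions

    if (db_exist) {
        // Load the options persisted in the db directory, then verify that the
        // options we are about to open with can safely open that data.
        rocksdb::DBOptions loaded_db_opts;
        std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs;
        rocksdb::Status s =
            rocksdb::LoadLatestOptions(cfg, rdb_path, &loaded_db_opts, &loaded_cf_descs);
        if (!s.ok()) {
            return s;
        }
        s = rocksdb::CheckOptionsCompatibility(cfg, rdb_path, db_opts, cf_descs);
        if (!s.ok()) {
            return s;
        }
    } else {
        // Brand-new db: create it together with the non-default meta CF.
        db_opts.create_if_missing = true;
        db_opts.create_missing_column_families = true;
    }

    // Handles come back in descriptor order: (*handles)[0] is the data (default)
    // CF and (*handles)[1] is the meta CF. The caller owns *db and the handles.
    return rocksdb::DB::Open(db_opts, rdb_path, cf_descs, handles, db);
}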