in src/server/pegasus_server_impl.cpp [1509:1806]
// Opens (or creates / restores) this replica's RocksDB instance and starts the
// per-replica background tasks (rocksdb statistics timers, hotkey analysis).
//
// argc/argv carry flattened key/value environment pairs compounded by
// replication_app_base::open(): argv[0] is ignored here, followed by
// (argc - 1) / 2 key/value pairs.
//
// Returns:
//   dsn::ERR_OK                     on success
//   dsn::ERR_INVALID_PARAMETERS     malformed argc/argv env pairs
//   dsn::ERR_FILE_OPERATION_FAILED  duplication/restore rename failures
//   dsn::ERR_LOCAL_APP_FAILURE      rocksdb option-load / open failures
dsn::error_code pegasus_server_impl::start(int argc, char **argv)
{
    CHECK_PREFIX_MSG(!_is_open, "replica is already opened");
    LOG_INFO_PREFIX("start to open app {}", data_dir());

    // Parse envs from argv into a key/value map.
    // The envs are compounded in the replication_app_base::open() function.
    std::map<std::string, std::string> envs;
    if (argc > 0) {
        // argv[0] is skipped, so the remaining entries must form whole
        // key/value pairs.
        if ((argc - 1) % 2 != 0) {
            LOG_ERROR_PREFIX("parse envs failed, invalid argc = {}", argc);
            return dsn::ERR_INVALID_PARAMETERS;
        }
        if (argv == nullptr) {
            LOG_ERROR_PREFIX("parse envs failed, invalid argv = nullptr");
            return dsn::ERR_INVALID_PARAMETERS;
        }
        int idx = 1;
        while (idx < argc) {
            const char *key = argv[idx++];
            const char *value = argv[idx++];
            envs.emplace(key, value);
        }
    }
    // Update all envs before opening db, ensure all envs are effective for the newly opened db.
    update_app_envs_before_open_db(envs);

    // TODO(yingchun): refactor the following code
    //
    // Here we must distinguish four cases:
    //   case 1: open a db that already exists
    //   case 2: load a duplication checkpoint fetched from the master cluster
    //   case 3: open a brand new db
    //   case 4: restore the db from old (backup) data
    //
    // Restoring from old data (case 4) happens only when ALL of the restore
    // preconditions are satisfied:
    //   1. the rdb directory does not exist yet
    //   2. restore info can be parsed from the app envs (carried in argv)
    //   3. the restore_dir exists
    bool db_exist = true;
    auto rdb_path = dsn::utils::filesystem::path_combine(data_dir(), "rdb");
    auto duplication_path = duplication_dir();
    if (dsn::utils::filesystem::path_exists(rdb_path)) {
        // Only case 1: the db already exists on disk.
        LOG_INFO_PREFIX("rdb is already exist, path = {}", rdb_path);
    } else {
        // Case 2: adopt the duplication checkpoint by renaming it into place.
        if (dsn::utils::filesystem::path_exists(duplication_path) && is_duplication_follower()) {
            if (!dsn::utils::filesystem::rename_path(duplication_path, rdb_path)) {
                LOG_ERROR_PREFIX(
                    "load duplication data from {} to {} failed", duplication_path, rdb_path);
                return dsn::ERR_FILE_OPERATION_FAILED;
            }
        } else {
            // restore_info: {restore_dir, force_restore} parsed from envs.
            std::pair<std::string, bool> restore_info = get_restore_dir_from_env(envs);
            const std::string &restore_dir = restore_info.first;
            bool force_restore = restore_info.second;
            if (restore_dir.empty()) {
                // Case 3: no restore requested -> open a brand new db,
                // unless a forced restore lacked a usable restore_dir.
                if (force_restore) {
                    LOG_ERROR_PREFIX("try to restore, but we can't combine restore_dir from envs");
                    return dsn::ERR_FILE_OPERATION_FAILED;
                } else {
                    db_exist = false;
                    LOG_DEBUG_PREFIX("open a new db, path = {}", rdb_path);
                }
            } else {
                // Case 4: restore from backup data.
                LOG_INFO_PREFIX("try to restore from restore_dir = {}", restore_dir);
                if (dsn::utils::filesystem::directory_exists(restore_dir)) {
                    // Here we just rename restore_dir to rdb, then continue the normal process.
                    if (dsn::utils::filesystem::rename_path(restore_dir, rdb_path)) {
                        LOG_INFO_PREFIX(
                            "rename restore_dir({}) to rdb({}) succeed", restore_dir, rdb_path);
                    } else {
                        LOG_ERROR_PREFIX(
                            "rename restore_dir({}) to rdb({}) failed", restore_dir, rdb_path);
                        return dsn::ERR_FILE_OPERATION_FAILED;
                    }
                } else {
                    // restore_dir is missing: fail only if the restore was
                    // forced; otherwise fall back to opening a new db.
                    if (force_restore) {
                        LOG_ERROR_PREFIX(
                            "try to restore, but restore_dir isn't exist, restore_dir = {}",
                            restore_dir);
                        return dsn::ERR_FILE_OPERATION_FAILED;
                    } else {
                        db_exist = false;
                        LOG_WARNING_PREFIX(
                            "try to restore and restore_dir({}) isn't exist, but we don't force "
                            "it, the role of this replica must not primary, so we open a new db on "
                            "the "
                            "path({})",
                            restore_dir,
                            rdb_path);
                    }
                }
            }
        }
    }

    LOG_INFO_PREFIX("start to open rocksDB's rdb({})", rdb_path);

    // Here we create a `_table_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
    // will be used elsewhere.
    _table_data_cf_opts = _data_cf_opts;
    _table_data_cf_opts_recalculated = false;
    bool has_incompatible_db_options = false;
    if (db_exist) {
        // When DB exists, meta CF and data CF must be present.
        bool missing_meta_cf = true;
        bool missing_data_cf = true;
        auto ec = check_column_families(rdb_path, &missing_meta_cf, &missing_data_cf);
        if (ec != dsn::ERR_OK) {
            LOG_ERROR_PREFIX("check column families failed");
            return ec;
        }
        CHECK_PREFIX_MSG(!missing_meta_cf, "You must upgrade Pegasus server from 2.0");
        CHECK_PREFIX_MSG(!missing_data_cf, "Missing data column family");

        // Load latest options from option file stored in the db directory.
        rocksdb::DBOptions loaded_db_opt;
        std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs;
        rocksdb::ColumnFamilyOptions loaded_data_cf_opts;
        // Set `ignore_unknown_options` true for forward compatibility.
        auto status = rocksdb::LoadLatestOptions(rdb_path,
                                                 rocksdb::Env::Default(),
                                                 &loaded_db_opt,
                                                 &loaded_cf_descs,
                                                 /*ignore_unknown_options=*/true);
        if (!status.ok()) {
            // Here we ignore an invalid argument error related to `pegasus_data_version` and
            // `pegasus_data` options, which were used in old version rocksdbs (before 2.1.0).
            // Any other load failure is fatal for this open attempt.
            if (status.code() != rocksdb::Status::kInvalidArgument ||
                status.ToString().find("pegasus_data") == std::string::npos) {
                LOG_ERROR_PREFIX("load latest option file failed: {}.", status.ToString());
                return dsn::ERR_LOCAL_APP_FAILURE;
            }
            has_incompatible_db_options = true;
            LOG_WARNING_PREFIX(
                "The latest option file has incompatible db options: {}, use default "
                "options to open db.",
                status.ToString());
        }

        if (!has_incompatible_db_options) {
            // Pick out the data CF's persisted options from the loaded descriptors.
            for (int i = 0; i < loaded_cf_descs.size(); ++i) {
                if (loaded_cf_descs[i].name == DATA_COLUMN_FAMILY_NAME) {
                    loaded_data_cf_opts = loaded_cf_descs[i].options;
                }
            }
            // Reset usage scenario related options according to loaded_data_cf_opts.
            // We don't use `loaded_data_cf_opts` directly because pointer-typed options will
            // only be initialized with default values when calling 'LoadLatestOptions', see
            // 'rocksdb/utilities/options_util.h'.
            reset_usage_scenario_options(loaded_data_cf_opts, &_table_data_cf_opts);
            _db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
        }
    } else {
        // When create new DB, we have to create a new column family to store meta data (meta column
        // family).
        _db_opts.create_missing_column_families = true;
        _db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
    }

    std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
        {{DATA_COLUMN_FAMILY_NAME, _table_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
    // Sanity-check that the options we are about to open with are compatible
    // with what is persisted on disk. NotFound (no option file yet) is fine,
    // and incompatibility is tolerated when we already decided to fall back to
    // default options above.
    auto s = rocksdb::CheckOptionsCompatibility(rdb_path,
                                                rocksdb::Env::Default(),
                                                _db_opts,
                                                column_families,
                                                /*ignore_unknown_options=*/true);
    if (!s.ok() && !s.IsNotFound() && !has_incompatible_db_options) {
        LOG_ERROR_PREFIX("rocksdb::CheckOptionsCompatibility failed, error = {}", s.ToString());
        return dsn::ERR_LOCAL_APP_FAILURE;
    }

    std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
    auto status = rocksdb::DB::Open(_db_opts, rdb_path, column_families, &handles_opened, &_db);
    if (!status.ok()) {
        LOG_ERROR_PREFIX("rocksdb::DB::Open failed, error = {}", status.ToString());
        return dsn::ERR_LOCAL_APP_FAILURE;
    }
    // Handles come back in the same order as `column_families`: data CF first,
    // then meta CF.
    CHECK_EQ_PREFIX(2, handles_opened.size());
    CHECK_EQ_PREFIX(handles_opened[0]->GetName(), DATA_COLUMN_FAMILY_NAME);
    CHECK_EQ_PREFIX(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
    _data_cf = handles_opened[0];
    _meta_cf = handles_opened[1];

    // Create _meta_store which provide Pegasus meta data read and write.
    _meta_store = std::make_unique<meta_store>(this, _db, _meta_cf);

    if (db_exist) {
        // Any failure while reading persisted meta data must close the db we
        // just opened; `cleanup` is cancelled only after everything succeeds.
        auto cleanup = dsn::defer([this]() { release_db(); });
        uint64_t decree = 0;
        LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
                              _meta_store->get_last_flushed_decree(&decree),
                              "get_last_flushed_decree failed");
        _last_committed_decree.store(static_cast<int64_t>(decree));
        LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
                              _meta_store->get_data_version(&_pegasus_data_version),
                              "get_data_version failed");
        _usage_scenario = _meta_store->get_usage_scenario();
        uint64_t last_manual_compact_finish_time = 0;
        LOG_AND_RETURN_NOT_OK(
            ERROR_PREFIX,
            _meta_store->get_last_manual_compact_finish_time(&last_manual_compact_finish_time),
            "get_last_manual_compact_finish_time failed");
        LOG_AND_RETURN_NOT_TRUE(ERROR_PREFIX,
                                _pegasus_data_version <= PEGASUS_DATA_VERSION_MAX,
                                dsn::ERR_LOCAL_APP_FAILURE,
                                "open app failed, unsupported data version {}",
                                _pegasus_data_version);
        // update last manual compact finish timestamp
        _manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time);
        cleanup.cancel();
    } else {
        // Write initial meta data to meta CF and flush when create new DB.
        _meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX);
        _meta_store->set_last_flushed_decree(0);
        _meta_store->set_last_manual_compact_finish_time(0);
        flush_all_family_columns(true);
    }

    // Only enable the TTL compaction filter after the correct
    // pegasus_data_version has been set.
    _key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version);
    _key_ttl_compaction_filter_factory->SetPartitionIndex(_gpid.get_partition_index());
    // NOTE(review): the initial partition version is seeded from
    // partition_index - 1 — the reason is not evident from this function;
    // confirm against key_ttl_compaction_filter's version-check semantics.
    _key_ttl_compaction_filter_factory->SetPartitionVersion(_gpid.get_partition_index() - 1);
    _key_ttl_compaction_filter_factory->EnableFilter();

    parse_checkpoints();

    // Checkpoint if necessary to make last_durable_decree() fresh.
    // Only need async checkpoint because we are sure that memtable is empty now.
    int64_t last_flushed = static_cast<int64_t>(_last_committed_decree);
    if (last_flushed != last_durable_decree()) {
        LOG_INFO_PREFIX(
            "start to do async checkpoint, last_durable_decree = {}, last_flushed_decree = {}",
            last_durable_decree(),
            last_flushed);
        auto err = async_checkpoint(false);
        if (err != dsn::ERR_OK) {
            LOG_ERROR_PREFIX("create checkpoint failed, error = {}", err.to_string());
            release_db();
            return err;
        }
        CHECK_EQ_PREFIX(last_flushed, last_durable_decree());
    }

    LOG_INFO_PREFIX("open app succeed, pegasus_data_version = {}, last_durable_decree = {}",
                    _pegasus_data_version,
                    last_durable_decree());

    _is_open = true;

    if (!db_exist) {
        // When create a new db, update usage scenario according to app envs.
        update_usage_scenario(envs);
    }

    LOG_DEBUG_PREFIX("start the update replica-level rocksdb statistics timer task");
    _update_replica_rdb_stat =
        dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON,
                                    &_tracker,
                                    [this]() { this->update_replica_rocksdb_statistics(); },
                                    std::chrono::seconds(FLAGS_update_rdb_stat_interval));

    // These counters are singletons on this server shared by all replicas, their metrics update
    // task should be scheduled once an interval on the server view.
    static std::once_flag flag;
    std::call_once(flag, [&]() {
        // The timer task will always running even though there is no replicas
        CHECK_NE(kServerStatUpdateTimeSec.count(), 0);
        _update_server_rdb_stat = dsn::tasking::enqueue_timer(
            LPC_REPLICATION_LONG_COMMON,
            nullptr, // TODO: the tracker is nullptr, we will fix it later
            []() { update_server_rocksdb_statistics(); },
            kServerStatUpdateTimeSec);
    });

    // Initialize cu calculator and write service after server being initialized.
    _cu_calculator = std::make_unique<capacity_unit_calculator>(
        this, _read_hotkey_collector, _write_hotkey_collector, _read_size_throttling_controller);
    _server_write = std::make_unique<pegasus_server_write>(this);

    // Periodic hotkey analysis for both read and write collectors.
    dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
                                &_tracker,
                                [this]() { _read_hotkey_collector->analyse_data(); },
                                std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));

    dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
                                &_tracker,
                                [this]() { _write_hotkey_collector->analyse_data(); },
                                std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));

    return dsn::ERR_OK;
}