in src/replica/replica_stub.cpp [755:934]
// Bring the replica server up from the given options.
//
// Steps, in order:
//   1. Record the primary host:port, apply `opts`, and seed runtime switches from flags.
//   2. If `clear` is true, remove the slog directory and every data directory.
//   3. Validate / set up data-at-rest encryption using the kms-info file that lives in the
//      first data directory.
//   4. Initialize the file system manager and persist the kms-info file if it was just
//      generated.
//   5. Refuse to start if legacy shared-log files are still present.
//   6. Load all replicas, sync their checkpoints, and log their state.
//   7. Start the periodic replicas/disk stat timers (unless disabled by flags).
//   8. Publish the loaded replicas, start NFS, register remote commands, and finally call
//      initialize_start(), optionally delayed so that it runs no earlier than
//      (FLAGS_fd_grace_seconds + 3) seconds after process start.
//
// Parameters:
//   opts  - replication options to apply via set_options().
//   clear - when true, wipe `_options.slog_dir` and all `_options.data_dirs` first.
//
// Any failed CHECK* / LOG_FATAL below aborts the start-up.
void replica_stub::initialize(const replication_options &opts, bool clear /* = false*/)
{
    _primary_host_port = dsn_primary_host_port();
    _primary_host_port_cache = _primary_host_port.to_string();
    LOG_INFO("primary_host_port = {}", _primary_host_port_cache);

    set_options(opts);
    LOG_INFO("meta_servers = {}", fmt::join(_options.meta_servers, ", "));

    // Seed the runtime-tunable switches from their start-up flags.
    _deny_client = FLAGS_deny_client_on_start;
    _verbose_client_log = FLAGS_verbose_client_log_on_start;
    _verbose_commit_log = FLAGS_verbose_commit_log_on_start;
    _release_tcmalloc_memory = FLAGS_mem_release_enabled;
    _mem_release_max_reserved_mem_percentage = FLAGS_mem_release_max_reserved_mem_percentage;

    // Clear the directories if requested.
    if (clear) {
        CHECK(dsn::utils::filesystem::remove_path(_options.slog_dir),
              "Fail to remove {}.",
              _options.slog_dir);
        for (const auto &dir : _options.data_dirs) {
            CHECK(dsn::utils::filesystem::remove_path(dir), "Fail to remove {}.", dir);
        }
    }

    // The kms-info file is kept in the first data directory, so at least one must exist;
    // indexing an empty vector would be undefined behaviour.
    CHECK(!_options.data_dirs.empty(), "no data directory is configured");
    const auto kms_path =
        utils::filesystem::path_combine(_options.data_dirs[0], kms_info::kKmsInfo);

    // FLAGS_data_dirs may be empty when the configuration is loaded, so this is checked here
    // with LOG_FATAL instead of a group validator.
    // An existing kms-info file means encryption was enabled before; it must not be turned off.
    if (!FLAGS_encrypt_data_at_rest && utils::filesystem::path_exists(kms_path)) {
        LOG_FATAL("The kms_info file exists at ({}), but [pegasus.server] "
                  "encrypt_data_at_rest is disabled. "
                  "Encryption in Pegasus is irreversible after its initial activation.",
                  kms_path);
    }

    dsn::replication::kms_info kms_info;
    if (FLAGS_encrypt_data_at_rest && !utils::is_empty(FLAGS_hadoop_kms_url)) {
        _key_provider.reset(new dsn::security::kms_key_provider(
            ::absl::StrSplit(FLAGS_hadoop_kms_url, ",", ::absl::SkipEmpty()),
            FLAGS_encryption_cluster_key_name));

        const auto ec = dsn::utils::load_rjobj_from_file(
            kms_path, dsn::utils::FileDataType::kNonSensitive, &kms_info);
        // A missing file is tolerated (first launch); any other failure is fatal.
        if (ec != dsn::ERR_PATH_NOT_FOUND && ec != dsn::ERR_OK) {
            CHECK_EQ_MSG(dsn::ERR_OK, ec, "Can't load kms key from kms-info file");
        }

        // Upon the first launch, the encryption key should be empty. The process will then
        // retrieve EEK, IV, and KV from KMS.
        // After the first launch, the encryption key, obtained from the kms-info file, should
        // not be empty. The process will then acquire the DEK from KMS.
        if (ec == dsn::ERR_PATH_NOT_FOUND) {
            LOG_WARNING("It's normal to encounter a temporary inability to open the kms-info file "
                        "during the first process launch.");
            CHECK_OK(_key_provider->GenerateEncryptionKey(&kms_info),
                     "Generate encryption key from kms failed");
        }
        CHECK_OK(_key_provider->DecryptEncryptionKey(kms_info, &_server_key),
                 "Get decryption key failed from {}",
                 kms_path);
        FLAGS_server_key = _server_key.c_str();
    }

    // Initialize the file system manager.
    _fs_manager.initialize(_options.data_dirs, _options.data_dir_tags);

    // Persist the kms-info only when a key provider is in use and the file is not there yet.
    // This happens after the file system manager has been initialized.
    if (_key_provider && !utils::filesystem::path_exists(kms_path)) {
        const auto err = dsn::utils::dump_rjobj_to_file(
            kms_info, dsn::utils::FileDataType::kNonSensitive, kms_path);
        CHECK_EQ_MSG(dsn::ERR_OK, err, "Can't store kms key to kms-info file");
    }

    // Ensure that no shared-log (slog) files are left over in the slog directory.
    const auto full_slog_path = fmt::format("{}/replica/slog/", _options.slog_dir);
    if (utils::filesystem::directory_exists(full_slog_path)) {
        std::vector<std::string> slog_files;
        CHECK(utils::filesystem::get_subfiles(full_slog_path, slog_files, false),
              "check slog files failed");
        CHECK(slog_files.empty(),
              "slog({}) files are not empty. Make sure you are upgrading from 2.5.0",
              full_slog_path);
    }

    // Start to load replicas in available data directories.
    LOG_INFO("start to load replicas");
    replica_map_by_gpid reps;
    load_replicas(reps);
    LOG_INFO("load replicas succeed, replica_count = {}", reps.size());

    // NOTE: the former `is_log_complete` flag was never set to false, so the branch that
    // marked all replicas inactive was unreachable; it has been removed.
    for (auto &[pid, rep] : reps) {
        CHECK_EQ_MSG(rep->background_sync_checkpoint(), ERR_OK, "sync checkpoint failed");
        rep->reset_prepare_list_after_replay();

        // Private-log statistics stay invalid when the replica has no private log.
        decree pmax = invalid_decree;
        decree pmax_commit = invalid_decree;
        if (rep->private_log()) {
            pmax = rep->private_log()->max_decree(pid);
            pmax_commit = rep->private_log()->max_commit_on_disk();
        }

        LOG_INFO(
            "{}: load replica done, durable = {}, committed = {}, "
            "prepared = {}, ballot = {}, "
            "valid_offset_in_plog = {}, max_decree_in_plog = {}, max_commit_on_disk_in_plog = {}",
            rep->name(),
            rep->last_durable_decree(),
            rep->last_committed_decree(),
            rep->max_prepared_decree(),
            rep->get_ballot(),
            rep->get_app()->init_info().init_offset_in_private_log,
            pmax,
            pmax_commit);
    }

    // Periodic replicas stat; the last argument is a random duration within one interval.
    if (!FLAGS_replicas_stat_disabled) {
        _replicas_stat_timer_task = tasking::enqueue_timer(
            LPC_REPLICAS_STAT,
            &_tracker,
            [this] { on_replicas_stat(); },
            std::chrono::milliseconds(FLAGS_replicas_stat_interval_ms),
            0,
            std::chrono::milliseconds(rand::next_u32(0, FLAGS_replicas_stat_interval_ms)));
    }

    // Periodic disk stat.
    if (!FLAGS_disk_stat_disabled) {
        _disk_stat_timer_task = ::dsn::tasking::enqueue_timer(
            LPC_DISK_STAT,
            &_tracker,
            [this]() { on_disk_stat(); },
            std::chrono::seconds(FLAGS_disk_stat_interval_seconds),
            0,
            std::chrono::seconds(FLAGS_disk_stat_interval_seconds));
    }

    // Attach `reps`.
    _replicas = std::move(reps);
    METRIC_VAR_INCREMENT_BY(total_replicas, _replicas.size());
    for (const auto &[pid, rep] : _replicas) {
        _fs_manager.add_replica(pid, rep->dir());
    }

    _nfs = dsn::nfs_node::create();
    _nfs->start();

    dist::cmd::register_remote_command_rpc();

    if (FLAGS_delay_for_fd_timeout_on_start) {
        const uint64_t now_time_ms = dsn_now_ms();
        // Wait 3 seconds longer than the failure detector grace period.
        const uint64_t delay_time_ms = (FLAGS_fd_grace_seconds + 3) * 1000;
        const uint64_t earliest_start_ms = dsn::utils::process_start_millis() + delay_time_ms;
        if (now_time_ms < earliest_start_ms) {
            const uint64_t delay = earliest_start_ms - now_time_ms;
            LOG_INFO("delay for {} ms to make failure detector timeout", delay);
            tasking::enqueue(
                LPC_REPLICA_SERVER_DELAY_START,
                &_tracker,
                [this]() { this->initialize_start(); },
                0,
                std::chrono::milliseconds(delay));
            return;
        }
    }

    // Either no delay was requested or the grace period has already elapsed.
    initialize_start();
}