in src/replica/replica_stub.cpp [575:846]
void replica_stub::initialize(const replication_options &opts, bool clear /* = false*/)
{
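    // Initialization proceeds in phases: apply options and flags, optionally
    // clear data dirs, set up the fs manager and the shared log (slog), load
    // replicas from disk, replay the slog, then start background tasks and
    // finally begin serving via initialize_start().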
_primary_address = dsn_primary_address();
strcpy(_primary_address_str, _primary_address.to_string());
LOG_INFO("primary_address = {}", _primary_address_str);
set_options(opts);
std::ostringstream oss;
    for (size_t i = 0; i < _options.meta_servers.size(); ++i) {
        if (i != 0) {
            oss << ",";
        }
        oss << _options.meta_servers[i].to_string();
    }
LOG_INFO("meta_servers = {}", oss.str());
_deny_client = FLAGS_deny_client_on_start;
_verbose_client_log = FLAGS_verbose_client_log_on_start;
_verbose_commit_log = FLAGS_verbose_commit_log_on_start;
_release_tcmalloc_memory = FLAGS_mem_release_enabled;
_mem_release_max_reserved_mem_percentage = FLAGS_mem_release_max_reserved_mem_percentage;
_max_concurrent_bulk_load_downloading_count =
_options.max_concurrent_bulk_load_downloading_count;
    // Clear the slog and data directories if requested.
if (clear) {
CHECK(dsn::utils::filesystem::remove_path(_options.slog_dir),
"Fail to remove {}.",
_options.slog_dir);
for (auto &dir : _options.data_dirs) {
CHECK(dsn::utils::filesystem::remove_path(dir), "Fail to remove {}.", dir);
}
}
// Initialize the file system manager.
_fs_manager.initialize(_options.data_dirs, _options.data_dir_tags);
// TODO(yingchun): remove the slog related code.
// Create slog directory if it does not exist.
std::string cdir;
std::string err_msg;
CHECK(utils::filesystem::create_directory(_options.slog_dir, cdir, err_msg), err_msg);
_options.slog_dir = cdir;
// Initialize slog.
_log = new mutation_log_shared(_options.slog_dir,
FLAGS_log_shared_file_size_mb,
FLAGS_log_shared_force_flush,
&_counter_shared_log_recent_write_size);
LOG_INFO("slog_dir = {}", _options.slog_dir);
// Start to load replicas in available data directories.
LOG_INFO("start to load replicas");
std::map<dir_node *, std::vector<std::string>> dirs_by_dn;
for (const auto &dn : _fs_manager.get_dir_nodes()) {
// Skip IO error dir_node.
if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
continue;
}
std::vector<std::string> sub_directories;
CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, sub_directories, false),
"fail to get sub_directories in {}",
dn->full_dir);
dirs_by_dn.emplace(dn.get(), sub_directories);
}
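    // Load replicas concurrently: one async task per candidate replica
    // directory, with results collected into `rps` under `rps_lock`.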
replicas rps;
utils::ex_lock rps_lock;
std::deque<task_ptr> load_tasks;
uint64_t start_time = dsn_now_ms();
for (const auto &dn_dirs : dirs_by_dn) {
const auto dn = dn_dirs.first;
for (const auto &dir : dn_dirs.second) {
if (dsn::replication::is_data_dir_invalid(dir)) {
LOG_WARNING("ignore dir {}", dir);
continue;
}
load_tasks.push_back(
tasking::create_task(LPC_REPLICATION_INIT_LOAD,
&_tracker,
[this, dn, dir, &rps, &rps_lock] {
LOG_INFO("process dir {}", dir);
auto r = load_replica(dn, dir.c_str());
if (r == nullptr) {
return;
}
LOG_INFO("{}@{}: load replica '{}' success, <durable, "
"commit> = <{}, {}>, last_prepared_decree = {}",
r->get_gpid(),
dsn_primary_address(),
dir,
r->last_durable_decree(),
r->last_committed_decree(),
r->last_prepared_decree());
utils::auto_lock<utils::ex_lock> l(rps_lock);
CHECK(rps.find(r->get_gpid()) == rps.end(),
"conflict replica dir: {} <--> {}",
r->dir(),
rps[r->get_gpid()]->dir());
rps[r->get_gpid()] = r;
},
load_tasks.size()));
load_tasks.back()->enqueue();
}
}
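    // Block until every replica directory has been processed.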
for (auto &tsk : load_tasks) {
tsk->wait();
}
uint64_t finish_time = dsn_now_ms();
dirs_by_dn.clear();
load_tasks.clear();
LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms",
rps.size(),
finish_time - start_time);
    // Replay the shared prepare log (slog) to recover in-memory state.
LOG_INFO("start to replay shared log");
std::map<gpid, decree> replay_condition;
for (auto it = rps.begin(); it != rps.end(); ++it) {
replay_condition[it->first] = it->second->last_committed_decree();
}
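    // Record each replica's last committed decree so the slog replay can skip
    // mutations that the replicas have already committed.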
start_time = dsn_now_ms();
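    // Open the slog, replaying each mutation into its owning replica; the
    // error callback handles shared-log write failures afterwards.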
error_code err = _log->open(
[&rps](int log_length, mutation_ptr &mu) {
auto it = rps.find(mu->data.header.pid);
if (it != rps.end()) {
return it->second->replay_mutation(mu, false);
} else {
return false;
}
},
[this](error_code err) { this->handle_log_failure(err); },
replay_condition);
finish_time = dsn_now_ms();
if (err == ERR_OK) {
LOG_INFO("replay shared log succeed, time_used = {} ms", finish_time - start_time);
} else {
if (FLAGS_crash_on_slog_error) {
LOG_FATAL("replay shared log failed, err = {}, please check the error details", err);
}
LOG_ERROR("replay shared log failed, err = {}, time_used = {} ms, clear all logs ...",
err,
finish_time - start_time);
        // Before fixing the logs, we must delete the replicas or report their
        // error state to the meta server; otherwise, the next process restart
        // may consider the replicas' state complete.
        // Here we delete all replicas.
        // TODO: checkpoint the latest state and update it on the meta server
        // so learning is cheaper.
for (auto it = rps.begin(); it != rps.end(); ++it) {
it->second->close();
move_to_err_path(it->second->dir(), "initialize replica");
_counter_replicas_recent_replica_move_error_count->increment();
}
rps.clear();
// restart log service
_log->close();
_log = nullptr;
CHECK(utils::filesystem::remove_path(_options.slog_dir),
"remove directory {} failed",
_options.slog_dir);
_log = new mutation_log_shared(_options.slog_dir,
FLAGS_log_shared_file_size_mb,
FLAGS_log_shared_force_flush,
&_counter_shared_log_recent_write_size);
CHECK_EQ_MSG(_log->open(nullptr, [this](error_code err) { this->handle_log_failure(err); }),
ERR_OK,
"restart log service failed");
}
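    // NOTE: nothing below ever sets is_log_complete to false, so the
    // inactive-marking branch after the loop is effectively dead code today.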
bool is_log_complete = true;
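    // For each loaded replica, kick off a checkpoint sync, rebuild its prepare
    // list from the replayed mutations, and log its recovered state.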
for (auto it = rps.begin(); it != rps.end(); ++it) {
CHECK_EQ_MSG(it->second->background_sync_checkpoint(), ERR_OK, "sync checkpoint failed");
it->second->reset_prepare_list_after_replay();
decree pmax = invalid_decree;
decree pmax_commit = invalid_decree;
if (it->second->private_log()) {
pmax = it->second->private_log()->max_decree(it->first);
pmax_commit = it->second->private_log()->max_commit_on_disk();
}
LOG_INFO(
"{}: load replica done, err = {}, durable = {}, committed = {}, "
"prepared = {}, ballot = {}, "
"valid_offset_in_plog = {}, max_decree_in_plog = {}, max_commit_on_disk_in_plog = {}, "
"valid_offset_in_slog = {}",
it->second->name(),
err.to_string(),
it->second->last_durable_decree(),
it->second->last_committed_decree(),
it->second->max_prepared_decree(),
it->second->get_ballot(),
it->second->get_app()->init_info().init_offset_in_private_log,
pmax,
pmax_commit,
it->second->get_app()->init_info().init_offset_in_shared_log);
}
    // Mark all replicas as inactive (non-transient) unless all logs are complete.
if (!is_log_complete) {
LOG_ERROR("logs are not complete for some replicas, which means that shared log is "
"truncated, mark all replicas as inactive");
for (auto it = rps.begin(); it != rps.end(); ++it) {
it->second->set_inactive_state_transient(false);
}
}
    // Schedule periodic garbage collection of logs and replicas; the first run
    // is delayed by a random offset within one interval to stagger the work.
if (!FLAGS_gc_disabled) {
_gc_timer_task = tasking::enqueue_timer(
LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
&_tracker,
[this] { on_gc(); },
std::chrono::milliseconds(FLAGS_gc_interval_ms),
0,
std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms)));
}
    // Schedule the periodic disk statistics task.
if (!FLAGS_disk_stat_disabled) {
_disk_stat_timer_task =
::dsn::tasking::enqueue_timer(LPC_DISK_STAT,
&_tracker,
[this]() { on_disk_stat(); },
std::chrono::seconds(FLAGS_disk_stat_interval_seconds),
0,
std::chrono::seconds(FLAGS_disk_stat_interval_seconds));
}
    // Attach the loaded replicas to the stub.
_replicas = std::move(rps);
_counter_replicas_count->add((uint64_t)_replicas.size());
for (const auto &kv : _replicas) {
_fs_manager.add_replica(kv.first, kv.second->dir());
}
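    // Start the NFS service, which is used to copy files between nodes (e.g.
    // during learning).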
_nfs = dsn::nfs_node::create();
_nfs->start();
dist::cmd::register_remote_command_rpc();
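    // Optionally postpone startup until the failure detector's grace period
    // has elapsed since process start, so that leases held by a previous
    // incarnation of this node have expired before it rejoins.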
if (FLAGS_delay_for_fd_timeout_on_start) {
uint64_t now_time_ms = dsn_now_ms();
        uint64_t delay_time_ms =
            (FLAGS_fd_grace_seconds + 3) * 1000; // 3 seconds longer than the grace period
if (now_time_ms < dsn::utils::process_start_millis() + delay_time_ms) {
uint64_t delay = dsn::utils::process_start_millis() + delay_time_ms - now_time_ms;
LOG_INFO("delay for {} ms to make failure detector timeout", delay);
tasking::enqueue(LPC_REPLICA_SERVER_DELAY_START,
&_tracker,
[this]() { this->initialize_start(); },
0,
std::chrono::milliseconds(delay));
} else {
initialize_start();
}
} else {
initialize_start();
}
}