void replica_stub::initialize()

in src/replica/replica_stub.cpp [575:846]


// Brings this replica stub up from on-disk state.
//
// Steps, in order:
//   1. record the primary address and apply `opts` via set_options();
//   2. optionally wipe the slog directory and every data directory (`clear`);
//   3. initialize the file system manager and create/open the shared log (slog);
//   4. load every replica found under the usable data directories, one task per directory,
//      and wait for all of them;
//   5. replay the shared log into the loaded replicas; if replay fails, the loaded replicas
//      are closed and moved to the error path and the slog is recreated from scratch;
//   6. sync checkpoints and reset prepare lists for the surviving replicas;
//   7. start the gc and disk-stat timers (unless disabled by flags), publish the replicas,
//      start nfs and register the remote command rpc;
//   8. call initialize_start(), either immediately or after a delay.
//
// Parameters:
//   opts  - replication options, applied through set_options().
//   clear - when true, the slog directory and all data directories are removed before
//           anything is loaded.
//
// Failures of the filesystem operations, of the slog restart and of checkpoint syncing are
// guarded by CHECK*/LOG_FATAL (presumably fatal to the process - see their definitions).
void replica_stub::initialize(const replication_options &opts, bool clear /* = false*/)
{
    _primary_address = dsn_primary_address();
    // NOTE(review): strcpy() is unbounded; this relies on _primary_address_str being large
    // enough for whatever to_string() returns - confirm against its declaration.
    strcpy(_primary_address_str, _primary_address.to_string());
    LOG_INFO("primary_address = {}", _primary_address_str);

    set_options(opts);

    // Log the configured meta servers as a comma separated list.
    std::ostringstream oss;
    for (size_t i = 0; i < _options.meta_servers.size(); ++i) {
        if (i != 0) {
            oss << ",";
        }
        oss << _options.meta_servers[i].to_string();
    }
    LOG_INFO("meta_servers = {}", oss.str());

    // Seed the runtime switches from their start-up flags/options.
    _deny_client = FLAGS_deny_client_on_start;
    _verbose_client_log = FLAGS_verbose_client_log_on_start;
    _verbose_commit_log = FLAGS_verbose_commit_log_on_start;
    _release_tcmalloc_memory = FLAGS_mem_release_enabled;
    _mem_release_max_reserved_mem_percentage = FLAGS_mem_release_max_reserved_mem_percentage;
    _max_concurrent_bulk_load_downloading_count =
        _options.max_concurrent_bulk_load_downloading_count;

    // clear dirs if need
    if (clear) {
        CHECK(dsn::utils::filesystem::remove_path(_options.slog_dir),
              "Fail to remove {}.",
              _options.slog_dir);
        for (auto &dir : _options.data_dirs) {
            CHECK(dsn::utils::filesystem::remove_path(dir), "Fail to remove {}.", dir);
        }
    }

    // Initialize the file system manager.
    _fs_manager.initialize(_options.data_dirs, _options.data_dir_tags);

    // TODO(yingchun): remove the slog related code.
    // Create slog directory if it does not exist, and keep the path reported back by
    // create_directory() as the slog directory from now on.
    std::string cdir;
    std::string err_msg;
    CHECK(utils::filesystem::create_directory(_options.slog_dir, cdir, err_msg), err_msg);
    _options.slog_dir = cdir;

    // Initialize slog.
    _log = new mutation_log_shared(_options.slog_dir,
                                   FLAGS_log_shared_file_size_mb,
                                   FLAGS_log_shared_force_flush,
                                   &_counter_shared_log_recent_write_size);
    LOG_INFO("slog_dir = {}", _options.slog_dir);

    // Start to load replicas in available data directories.
    LOG_INFO("start to load replicas");
    std::map<dir_node *, std::vector<std::string>> dirs_by_dn;
    for (const auto &dn : _fs_manager.get_dir_nodes()) {
        // Skip IO error dir_node.
        if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
            continue;
        }
        std::vector<std::string> sub_directories;
        CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, sub_directories, false),
              "fail to get sub_directories in {}",
              dn->full_dir);
        // The local list is not used again, so hand it over instead of copying it.
        dirs_by_dn.emplace(dn.get(), std::move(sub_directories));
    }

    // Replicas loaded so far; filled concurrently by the load tasks under rps_lock.
    replicas rps;
    utils::ex_lock rps_lock;
    std::deque<task_ptr> load_tasks;
    uint64_t start_time = dsn_now_ms();
    for (const auto &dn_dirs : dirs_by_dn) {
        const auto dn = dn_dirs.first;
        for (const auto &dir : dn_dirs.second) {
            if (dsn::replication::is_data_dir_invalid(dir)) {
                LOG_WARNING("ignore dir {}", dir);
                continue;
            }

            // rps and rps_lock are captured by reference: every task is waited for below,
            // before these locals go out of scope. The current task count is passed as the
            // last argument of create_task().
            load_tasks.push_back(
                tasking::create_task(LPC_REPLICATION_INIT_LOAD,
                                     &_tracker,
                                     [this, dn, dir, &rps, &rps_lock] {
                                         LOG_INFO("process dir {}", dir);

                                         auto r = load_replica(dn, dir.c_str());
                                         if (r == nullptr) {
                                             return;
                                         }
                                         LOG_INFO("{}@{}: load replica '{}' success, <durable, "
                                                  "commit> = <{}, {}>, last_prepared_decree = {}",
                                                  r->get_gpid(),
                                                  dsn_primary_address(),
                                                  dir,
                                                  r->last_durable_decree(),
                                                  r->last_committed_decree(),
                                                  r->last_prepared_decree());

                                         utils::auto_lock<utils::ex_lock> l(rps_lock);
                                         // Two directories must never yield the same gpid.
                                         // Reuse the lookup result for the message rather
                                         // than going through the inserting operator[].
                                         const auto existing = rps.find(r->get_gpid());
                                         CHECK(existing == rps.end(),
                                               "conflict replica dir: {} <--> {}",
                                               r->dir(),
                                               existing->second->dir());

                                         rps[r->get_gpid()] = r;
                                     },
                                     load_tasks.size()));
            load_tasks.back()->enqueue();
        }
    }
    for (auto &tsk : load_tasks) {
        tsk->wait();
    }
    uint64_t finish_time = dsn_now_ms();

    dirs_by_dn.clear();
    load_tasks.clear();
    LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms",
             rps.size(),
             finish_time - start_time);

    // init shared prepare log
    LOG_INFO("start to replay shared log");

    // Per replica, the last committed decree handed to the slog as its replay condition.
    std::map<gpid, decree> replay_condition;
    for (auto &kv : rps) {
        replay_condition[kv.first] = kv.second->last_committed_decree();
    }

    start_time = dsn_now_ms();
    error_code err = _log->open(
        // Replay callback: mutations of replicas that were not loaded are rejected.
        [&rps](int log_length, mutation_ptr &mu) {
            auto it = rps.find(mu->data.header.pid);
            if (it == rps.end()) {
                return false;
            }
            return it->second->replay_mutation(mu, false);
        },
        [this](error_code err) { this->handle_log_failure(err); },
        replay_condition);
    finish_time = dsn_now_ms();

    if (err == ERR_OK) {
        LOG_INFO("replay shared log succeed, time_used = {} ms", finish_time - start_time);
    } else {
        if (FLAGS_crash_on_slog_error) {
            LOG_FATAL("replay shared log failed, err = {}, please check the error details", err);
        }
        LOG_ERROR("replay shared log failed, err = {}, time_used = {} ms, clear all logs ...",
                  err,
                  finish_time - start_time);

        // we must delete or update meta server the error for all replicas
        // before we fix the logs
        // otherwise, the next process restart may consider the replicas'
        // state complete

        // delete all replicas
        // TODO: checkpoint latest state and update on meta server so learning is cheaper
        for (auto &kv : rps) {
            kv.second->close();
            move_to_err_path(kv.second->dir(), "initialize replica");
            _counter_replicas_recent_replica_move_error_count->increment();
        }
        rps.clear();

        // restart log service: drop the broken slog, wipe its directory and open a new one
        _log->close();
        _log = nullptr;
        CHECK(utils::filesystem::remove_path(_options.slog_dir),
              "remove directory {} failed",
              _options.slog_dir);
        _log = new mutation_log_shared(_options.slog_dir,
                                       FLAGS_log_shared_file_size_mb,
                                       FLAGS_log_shared_force_flush,
                                       &_counter_shared_log_recent_write_size);
        CHECK_EQ_MSG(_log->open(nullptr, [this](error_code err) { this->handle_log_failure(err); }),
                     ERR_OK,
                     "restart log service failed");
    }

    // Finish every surviving replica: sync its checkpoint, reset its prepare list and log a
    // summary. `err` in the summary is the result of the slog replay above.
    for (auto &kv : rps) {
        auto &rep = kv.second;
        CHECK_EQ_MSG(rep->background_sync_checkpoint(), ERR_OK, "sync checkpoint failed");

        rep->reset_prepare_list_after_replay();

        // Private-log statistics stay invalid_decree when the replica has no private log.
        decree pmax = invalid_decree;
        decree pmax_commit = invalid_decree;
        if (rep->private_log()) {
            pmax = rep->private_log()->max_decree(kv.first);
            pmax_commit = rep->private_log()->max_commit_on_disk();
        }

        LOG_INFO(
            "{}: load replica done, err = {}, durable = {}, committed = {}, "
            "prepared = {}, ballot = {}, "
            "valid_offset_in_plog = {}, max_decree_in_plog = {}, max_commit_on_disk_in_plog = {}, "
            "valid_offset_in_slog = {}",
            rep->name(),
            err.to_string(),
            rep->last_durable_decree(),
            rep->last_committed_decree(),
            rep->max_prepared_decree(),
            rep->get_ballot(),
            rep->get_app()->init_info().init_offset_in_private_log,
            pmax,
            pmax_commit,
            rep->get_app()->init_info().init_offset_in_shared_log);
    }

    // gc
    if (!FLAGS_gc_disabled) {
        // The last argument is a random value in [0, FLAGS_gc_interval_ms].
        _gc_timer_task = tasking::enqueue_timer(
            LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
            &_tracker,
            [this] { on_gc(); },
            std::chrono::milliseconds(FLAGS_gc_interval_ms),
            0,
            std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms)));
    }

    // disk stat
    if (!FLAGS_disk_stat_disabled) {
        _disk_stat_timer_task =
            ::dsn::tasking::enqueue_timer(LPC_DISK_STAT,
                                          &_tracker,
                                          [this]() { on_disk_stat(); },
                                          std::chrono::seconds(FLAGS_disk_stat_interval_seconds),
                                          0,
                                          std::chrono::seconds(FLAGS_disk_stat_interval_seconds));
    }

    // attach rps
    _replicas = std::move(rps);
    _counter_replicas_count->add(static_cast<uint64_t>(_replicas.size()));
    for (const auto &kv : _replicas) {
        _fs_manager.add_replica(kv.first, kv.second->dir());
    }

    _nfs = dsn::nfs_node::create();
    _nfs->start();

    dist::cmd::register_remote_command_rpc();

    if (FLAGS_delay_for_fd_timeout_on_start) {
        // Hold back initialize_start() until the process has been up for 3 seconds longer
        // than FLAGS_fd_grace_seconds.
        const uint64_t now_time_ms = dsn_now_ms();
        const uint64_t delay_time_ms = (FLAGS_fd_grace_seconds + 3) * 1000;
        const uint64_t earliest_start_ms = dsn::utils::process_start_millis() + delay_time_ms;
        if (now_time_ms < earliest_start_ms) {
            const uint64_t delay = earliest_start_ms - now_time_ms;
            LOG_INFO("delay for {} ms to make failure detector timeout", delay);
            tasking::enqueue(LPC_REPLICA_SERVER_DELAY_START,
                             &_tracker,
                             [this]() { this->initialize_start(); },
                             0,
                             std::chrono::milliseconds(delay));
            return;
        }
    }

    // No delay requested, or the process has already been up long enough.
    initialize_start();
}