void replica_stub::initialize()

in src/replica/replica_stub.cpp [755:934]


// Initializes the replica stub: applies `opts`, optionally wipes the on-disk directories,
// prepares the encryption key when data-at-rest encryption is enabled, initializes the file
// system manager, loads all replicas, starts the periodic stat timers and the NFS node, and
// finally calls initialize_start() (possibly after a delay).
//
// Parameters:
// - opts: replication options, installed through set_options().
// - clear: when true, `_options.slog_dir` and every directory in `_options.data_dirs` are
//   removed before anything else is loaded. Defaults to false.
//
// Any failure in the steps guarded by CHECK* macros is treated as fatal.
void replica_stub::initialize(const replication_options &opts, bool clear /* = false*/)
{
    _primary_host_port = dsn_primary_host_port();
    _primary_host_port_cache = _primary_host_port.to_string();
    LOG_INFO("primary_host_port = {}", _primary_host_port_cache);

    set_options(opts);
    LOG_INFO("meta_servers = {}", fmt::join(_options.meta_servers, ", "));

    // Seed the runtime switches from their "on start" flags.
    _deny_client = FLAGS_deny_client_on_start;
    _verbose_client_log = FLAGS_verbose_client_log_on_start;
    _verbose_commit_log = FLAGS_verbose_commit_log_on_start;
    _release_tcmalloc_memory = FLAGS_mem_release_enabled;
    _mem_release_max_reserved_mem_percentage = FLAGS_mem_release_max_reserved_mem_percentage;

    // Clear the directories if requested.
    if (clear) {
        CHECK(dsn::utils::filesystem::remove_path(_options.slog_dir),
              "Fail to remove {}.",
              _options.slog_dir);
        for (const auto &dir : _options.data_dirs) {
            CHECK(dsn::utils::filesystem::remove_path(dir), "Fail to remove {}.", dir);
        }
    }

    // The kms-info file lives in the first data directory, so there must be at least one;
    // indexing an empty vector would be undefined behaviour.
    CHECK(!_options.data_dirs.empty(), "data_dirs should not be empty");
    const auto kms_path =
        utils::filesystem::path_combine(_options.data_dirs[0], kms_info::kKmsInfo);

    // FLAGS_data_dirs may still be empty while the configuration is being loaded, so this is
    // checked here with LOG_FATAL rather than with a group validator.
    if (!FLAGS_encrypt_data_at_rest && utils::filesystem::path_exists(kms_path)) {
        LOG_FATAL("The kms_info file exists at ({}), but [pegasus.server] "
                  "encrypt_data_at_rest is disabled. "
                  "Encryption in Pegasus is irreversible after its initial activation.",
                  kms_path);
    }

    dsn::replication::kms_info kms_info;
    if (FLAGS_encrypt_data_at_rest && !utils::is_empty(FLAGS_hadoop_kms_url)) {
        _key_provider.reset(new dsn::security::kms_key_provider(
            ::absl::StrSplit(FLAGS_hadoop_kms_url, ",", ::absl::SkipEmpty()),
            FLAGS_encryption_cluster_key_name));
        const auto &ec = dsn::utils::load_rjobj_from_file(
            kms_path, dsn::utils::FileDataType::kNonSensitive, &kms_info);
        // A missing kms-info file is tolerated (first launch); any other error is fatal.
        if (ec != dsn::ERR_PATH_NOT_FOUND && ec != dsn::ERR_OK) {
            CHECK_EQ_MSG(dsn::ERR_OK, ec, "Can't load kms key from kms-info file");
        }
        // Upon the first launch, the encryption key should be empty. The process will then retrieve
        // EEK, IV, and KV from KMS.
        // After the first launch, the encryption key, obtained from the kms-info file, should not
        // be empty. The process will then acquire the DEK from KMS.
        if (ec == dsn::ERR_PATH_NOT_FOUND) {
            LOG_WARNING("It's normal to encounter a temporary inability to open the kms-info file "
                        "during the first process launch.");
            CHECK_OK(_key_provider->GenerateEncryptionKey(&kms_info),
                     "Generate encryption key from kms failed");
        }
        CHECK_OK(_key_provider->DecryptEncryptionKey(kms_info, &_server_key),
                 "Get decryption key failed from {}",
                 kms_path);
        FLAGS_server_key = _server_key.c_str();
    }

    // Initialize the file system manager.
    _fs_manager.initialize(_options.data_dirs, _options.data_dir_tags);

    // Persist the kms-info only after the file system manager has been initialized, and only
    // if the file does not exist yet.
    if (_key_provider && !utils::filesystem::path_exists(kms_path)) {
        const auto &err = dsn::utils::dump_rjobj_to_file(
            kms_info, dsn::utils::FileDataType::kNonSensitive, kms_path);
        CHECK_EQ_MSG(dsn::ERR_OK, err, "Can't store kms key to kms-info file");
    }

    // Refuse to start if the slog directory still contains files.
    const auto full_slog_path = fmt::format("{}/replica/slog/", _options.slog_dir);
    if (utils::filesystem::directory_exists(full_slog_path)) {
        std::vector<std::string> slog_files;
        CHECK(utils::filesystem::get_subfiles(full_slog_path, slog_files, false),
              "check slog files failed");
        CHECK(slog_files.empty(),
              "slog({}) files are not empty. Make sure you are upgrading from 2.5.0",
              full_slog_path);
    }

    // Start to load replicas in available data directories.
    LOG_INFO("start to load replicas");

    replica_map_by_gpid reps;
    load_replicas(reps);

    LOG_INFO("load replicas succeed, replica_count = {}", reps.size());

    for (auto &[pid, rep] : reps) {
        CHECK_EQ_MSG(rep->background_sync_checkpoint(), ERR_OK, "sync checkpoint failed");

        rep->reset_prepare_list_after_replay();

        // Both stay invalid_decree when the replica has no private log.
        decree pmax = invalid_decree;
        decree pmax_commit = invalid_decree;
        if (rep->private_log()) {
            pmax = rep->private_log()->max_decree(pid);
            pmax_commit = rep->private_log()->max_commit_on_disk();
        }

        LOG_INFO(
            "{}: load replica done, durable = {}, committed = {}, "
            "prepared = {}, ballot = {}, "
            "valid_offset_in_plog = {}, max_decree_in_plog = {}, max_commit_on_disk_in_plog = {}",
            rep->name(),
            rep->last_durable_decree(),
            rep->last_committed_decree(),
            rep->max_prepared_decree(),
            rep->get_ballot(),
            rep->get_app()->init_info().init_offset_in_private_log,
            pmax,
            pmax_commit);
    }

    // Periodic replicas stat. The last argument is a random value in
    // [0, FLAGS_replicas_stat_interval_ms] (presumably the initial delay of the timer --
    // confirm against tasking::enqueue_timer).
    if (!FLAGS_replicas_stat_disabled) {
        _replicas_stat_timer_task = tasking::enqueue_timer(
            LPC_REPLICAS_STAT,
            &_tracker,
            [this] { on_replicas_stat(); },
            std::chrono::milliseconds(FLAGS_replicas_stat_interval_ms),
            0,
            std::chrono::milliseconds(rand::next_u32(0, FLAGS_replicas_stat_interval_ms)));
    }

    // Periodic disk stat.
    if (!FLAGS_disk_stat_disabled) {
        _disk_stat_timer_task = ::dsn::tasking::enqueue_timer(
            LPC_DISK_STAT,
            &_tracker,
            [this]() { on_disk_stat(); },
            std::chrono::seconds(FLAGS_disk_stat_interval_seconds),
            0,
            std::chrono::seconds(FLAGS_disk_stat_interval_seconds));
    }

    // Attach `reps`.
    _replicas = std::move(reps);
    METRIC_VAR_INCREMENT_BY(total_replicas, _replicas.size());
    for (const auto &[pid, rep] : _replicas) {
        _fs_manager.add_replica(pid, rep->dir());
    }

    _nfs = dsn::nfs_node::create();
    _nfs->start();

    dist::cmd::register_remote_command_rpc();

    if (FLAGS_delay_for_fd_timeout_on_start) {
        const uint64_t now_time_ms = dsn_now_ms();
        // Wait 3 seconds longer than the failure detector grace period, counted from the
        // process start time.
        const uint64_t delay_time_ms = (FLAGS_fd_grace_seconds + 3) * 1000;
        const uint64_t start_deadline_ms = dsn::utils::process_start_millis() + delay_time_ms;
        if (now_time_ms < start_deadline_ms) {
            const uint64_t delay = start_deadline_ms - now_time_ms;
            LOG_INFO("delay for {} ms to make failure detector timeout", delay);
            tasking::enqueue(
                LPC_REPLICA_SERVER_DELAY_START,
                &_tracker,
                [this]() { this->initialize_start(); },
                0,
                std::chrono::milliseconds(delay));
            return;
        }
    }

    // Either no delay was requested or the deadline has already passed.
    initialize_start();
}