src/replica/replica_stub.cpp (2,671 lines of code) (raw):

/* * The MIT License (MIT) * * Copyright (c) 2015 Microsoft Corporation * * -=- Robust Distributed System Nucleus (rDSN) -=- * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. 
*/ #include <absl/strings/str_split.h> #include <boost/algorithm/string/replace.hpp> // IWYU pragma: no_include <ext/alloc_traits.h> #include <fmt/core.h> #include <fmt/format.h> #include <nlohmann/json.hpp> #include <rapidjson/ostreamwrapper.h> #include <algorithm> #include <chrono> #include <cstdint> #include <cstdlib> #include <iterator> #include <mutex> #include <queue> #include <set> #include <sstream> #include <string_view> #include <type_traits> #include <vector> #include "backup/replica_backup_server.h" #include "bulk_load/replica_bulk_loader.h" #include "common/backup_common.h" #include "common/duplication_common.h" #include "common/json_helper.h" #include "common/replication.codes.h" #include "common/replication_enums.h" #include "disk_cleaner.h" #include "duplication/duplication_sync_timer.h" #include "meta_admin_types.h" #include "mutation_log.h" #include "nfs/nfs_node.h" #include "nfs_types.h" #include "ranger/access_type.h" #include "replica.h" #include "replica/duplication/replica_follower.h" #include "replica/kms_key_provider.h" #include "replica/replica_context.h" #include "replica/replica_stub.h" #include "replica/replication_app_base.h" #include "replica_disk_migrator.h" #include "rpc/rpc_message.h" #include "rpc/serialization.h" #include "runtime/api_layer1.h" #include "security/access_controller.h" #include "split/replica_split_manager.h" #include "task/async_calls.h" #include "task/task.h" #include "task/task_engine.h" #include "task/task_worker.h" #include "utils/api_utilities.h" #include "utils/command_manager.h" #include "utils/env.h" #include "utils/errors.h" #include "utils/filesystem.h" #include "utils/fmt_logging.h" #include "utils/load_dump_object.h" #include "utils/ports.h" #include "utils/process_utils.h" #include "utils/rand.h" #include "utils/string_conv.h" #include "utils/strings.h" #include "utils/synchronize.h" #include "utils/threadpool_spec.h" #include "utils/timer.h" #ifdef DSN_ENABLE_GPERF #include 
<gperftools/malloc_extension.h> #elif defined(DSN_USE_JEMALLOC) #include "utils/je_ctl.h" #endif #include "nfs/nfs_code_definition.h" #include "remote_cmd/remote_command.h" #include "utils/fail_point.h" namespace { const char *kMaxReplicasOnLoadForEachDiskDesc = "The max number of replicas that are allowed to be loaded simultaneously for each disk dir."; const char *kLoadReplicaMaxWaitTimeMsDesc = "The max waiting time for replica loading to complete."; const char *kMaxConcurrentBulkLoadDownloadingCountDesc = "The maximum concurrent bulk load downloading replica count."; } // anonymous namespace METRIC_DEFINE_gauge_int64(server, total_replicas, dsn::metric_unit::kReplicas, "The total number of replicas"); METRIC_DEFINE_gauge_int64(server, opening_replicas, dsn::metric_unit::kReplicas, "The number of opening replicas"); METRIC_DEFINE_gauge_int64(server, closing_replicas, dsn::metric_unit::kReplicas, "The number of closing replicas"); METRIC_DEFINE_gauge_int64(server, inactive_replicas, dsn::metric_unit::kReplicas, "The number of inactive replicas"); METRIC_DEFINE_gauge_int64(server, error_replicas, dsn::metric_unit::kReplicas, "The number of replicas with errors"); METRIC_DEFINE_gauge_int64(server, primary_replicas, dsn::metric_unit::kReplicas, "The number of primary replicas"); METRIC_DEFINE_gauge_int64(server, secondary_replicas, dsn::metric_unit::kReplicas, "The number of secondary replicas"); METRIC_DEFINE_gauge_int64(server, learning_replicas, dsn::metric_unit::kReplicas, "The number of learning replicas"); METRIC_DEFINE_gauge_int64(server, learning_replicas_max_duration_ms, dsn::metric_unit::kMilliSeconds, "The max duration among all learning replicas"); METRIC_DEFINE_gauge_int64( server, learning_replicas_max_copy_file_bytes, dsn::metric_unit::kBytes, "The max size of files that are copied from learnee among all learning replicas"); METRIC_DEFINE_counter(server, moved_error_replicas, dsn::metric_unit::kReplicas, "The number of replicas whose dirs are moved as 
error"); METRIC_DEFINE_counter(server, moved_garbage_replicas, dsn::metric_unit::kReplicas, "The number of replicas whose dirs are moved as garbage"); METRIC_DEFINE_counter(server, replica_removed_dirs, dsn::metric_unit::kDirs, "The number of removed replica dirs"); METRIC_DEFINE_gauge_int64(server, replica_error_dirs, dsn::metric_unit::kDirs, "The number of error replica dirs (*.err)"); METRIC_DEFINE_gauge_int64(server, replica_garbage_dirs, dsn::metric_unit::kDirs, "The number of garbage replica dirs (*.gar)"); METRIC_DEFINE_gauge_int64(server, replica_tmp_dirs, dsn::metric_unit::kDirs, "The number of tmp replica dirs (*.tmp) for disk migration"); METRIC_DEFINE_gauge_int64(server, replica_origin_dirs, dsn::metric_unit::kDirs, "The number of origin replica dirs (*.ori) for disk migration"); #ifdef DSN_ENABLE_GPERF METRIC_DEFINE_counter(server, tcmalloc_released_bytes, dsn::metric_unit::kBytes, "The memory bytes that are released accumulatively by tcmalloc"); #endif METRIC_DEFINE_counter(server, read_failed_requests, dsn::metric_unit::kRequests, "The number of failed read requests"); METRIC_DEFINE_counter(server, write_failed_requests, dsn::metric_unit::kRequests, "The number of failed write requests"); METRIC_DEFINE_counter(server, read_busy_requests, dsn::metric_unit::kRequests, "The number of busy read requests"); METRIC_DEFINE_counter(server, write_busy_requests, dsn::metric_unit::kRequests, "The number of busy write requests"); METRIC_DEFINE_gauge_int64(server, bulk_load_running_count, dsn::metric_unit::kBulkLoads, "The number of current running bulk loads"); METRIC_DEFINE_gauge_int64(server, bulk_load_ingestion_max_duration_ms, dsn::metric_unit::kMilliSeconds, "The max duration of ingestions for bulk loads"); METRIC_DEFINE_gauge_int64(server, bulk_load_max_duration_ms, dsn::metric_unit::kMilliSeconds, "The max duration of bulk loads"); METRIC_DEFINE_gauge_int64(server, splitting_replicas, dsn::metric_unit::kReplicas, "The number of current splitting 
replicas"); METRIC_DEFINE_gauge_int64(server, splitting_replicas_max_duration_ms, dsn::metric_unit::kMilliSeconds, "The max duration among all splitting replicas"); METRIC_DEFINE_gauge_int64(server, splitting_replicas_async_learn_max_duration_ms, dsn::metric_unit::kMilliSeconds, "The max duration among all splitting replicas for async learns"); METRIC_DEFINE_gauge_int64(server, splitting_replicas_max_copy_file_bytes, dsn::metric_unit::kBytes, "The max size of copied files among all splitting replicas"); DSN_DECLARE_bool(duplication_enabled); DSN_DECLARE_bool(empty_write_disabled); DSN_DECLARE_bool(enable_acl); DSN_DECLARE_bool(encrypt_data_at_rest); DSN_DECLARE_int32(fd_beacon_interval_seconds); DSN_DECLARE_int32(fd_check_interval_seconds); DSN_DECLARE_int32(fd_grace_seconds); DSN_DECLARE_int32(fd_lease_seconds); DSN_DECLARE_string(data_dirs); DSN_DECLARE_string(encryption_cluster_key_name); DSN_DECLARE_string(server_key); DSN_DEFINE_uint64(replication, max_replicas_on_load_for_each_disk, 256, kMaxReplicasOnLoadForEachDiskDesc); DSN_TAG_VARIABLE(max_replicas_on_load_for_each_disk, FT_MUTABLE); DSN_DEFINE_validator(max_replicas_on_load_for_each_disk, [](uint64_t value) -> bool { return value > 0; }); DSN_DEFINE_uint64(replication, load_replica_max_wait_time_ms, 10, kLoadReplicaMaxWaitTimeMsDesc); DSN_TAG_VARIABLE(load_replica_max_wait_time_ms, FT_MUTABLE); DSN_DEFINE_validator(load_replica_max_wait_time_ms, [](uint64_t value) -> bool { return value > 0; }); DSN_DEFINE_int32(replication, max_concurrent_bulk_load_downloading_count, 5, kMaxConcurrentBulkLoadDownloadingCountDesc); DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count, [](int32_t value) -> bool { return value >= 0; }); DSN_DEFINE_bool(replication, deny_client_on_start, false, "Whether to deny client read and write " "requests. 
The 'on_start' in the name is " "meaningless, this config takes effect " "all the time"); DSN_DEFINE_bool(replication, verbose_client_log_on_start, false, "whether to print verbose error log when reply to client read and write requests " "when starting the server"); DSN_DEFINE_bool(replication, mem_release_enabled, true, "whether to enable periodic memory release"); DSN_DEFINE_bool(replication, disk_stat_disabled, false, "whether to disable disk stat"); DSN_DEFINE_bool( replication, delay_for_fd_timeout_on_start, false, "Whether to delay for a period of time to make failure detector timeout when " "starting the server. The delayed time is depends on [replication]fd_grace_seconds"); DSN_DEFINE_bool(replication, config_sync_disabled, false, "Whether to disable replica server to send replica config-sync " "requests to meta server periodically"); DSN_DEFINE_bool(replication, fd_disabled, false, "Whether to disable failure detection"); DSN_DEFINE_bool(replication, verbose_commit_log_on_start, false, "whether to print verbose log when commit mutation when starting the server"); DSN_DEFINE_uint32(replication, max_concurrent_manual_emergency_checkpointing_count, 10, "max concurrent manual emergency checkpoint running count"); DSN_TAG_VARIABLE(max_concurrent_manual_emergency_checkpointing_count, FT_MUTABLE); DSN_DEFINE_uint32(replication, config_sync_interval_ms, 30000, "The interval milliseconds of " "replica server to send replica " "config-sync requests to meta " "server"); DSN_TAG_VARIABLE(config_sync_interval_ms, FT_MUTABLE); DSN_DEFINE_validator(config_sync_interval_ms, [](uint32_t value) -> bool { return value > 0; }); DSN_DEFINE_int32(replication, disk_stat_interval_seconds, 600, "every what period (ms) we do disk stat"); DSN_DEFINE_int32(replication, gc_memory_replica_interval_ms, 10 * 60 * 1000, "The milliseconds of a replica remain in memory for quick recover aim after it's " "closed in healthy state (due to LB)"); DSN_DEFINE_int32( replication, 
mem_release_check_interval_ms, 3600000, "the replica check if should release memory to the system every this period of time(ms)"); DSN_DEFINE_int32( replication, mem_release_max_reserved_mem_percentage, 10, "if tcmalloc reserved but not-used memory exceed this percentage of application allocated " "memory, replica server will release the exceeding memory back to operating system"); bool check_mem_release_max_reserved_mem_percentage(int32_t value) { return value > 0 && value <= 100; } DSN_DEFINE_validator(mem_release_max_reserved_mem_percentage, &check_mem_release_max_reserved_mem_percentage); DSN_DEFINE_bool(replication, replicas_stat_disabled, false, "whether to disable replicas stat"); DSN_DEFINE_uint32(replication, replicas_stat_interval_ms, 30000, "period in milliseconds that stats for replicas are calculated"); DSN_TAG_VARIABLE(replicas_stat_interval_ms, FT_MUTABLE); DSN_DEFINE_validator(replicas_stat_interval_ms, [](uint32_t value) -> bool { return value > 0; }); DSN_DEFINE_string( pegasus.server, hadoop_kms_url, "", "Provide the comma-separated list of URLs from which to retrieve the " "file system's server key. 
Example format: 'hostname1:1234/kms,hostname2:1234/kms'.");

// Encryption at rest is only meaningful when ACL is enabled as well.
DSN_DEFINE_group_validator(encrypt_data_at_rest_pre_check, [](std::string &message) -> bool {
    if (!FLAGS_enable_acl && FLAGS_encrypt_data_at_rest) {
        message = fmt::format("[pegasus.server] encrypt_data_at_rest should be enabled only if "
                              "[security] enable_acl is enabled.");
        return false;
    }
    return true;
});

// Encryption at rest needs a KMS URL to fetch keys from; mock tests are exempted.
DSN_DEFINE_group_validator(encrypt_data_at_rest_with_kms_url, [](std::string &message) -> bool {
#ifndef MOCK_TEST
    if (FLAGS_encrypt_data_at_rest && dsn::utils::is_empty(FLAGS_hadoop_kms_url)) {
        message = fmt::format("[security] hadoop_kms_url should not be empty when [pegasus.server] "
                              "encrypt_data_at_rest is enabled.");
        return false;
    }
#endif
    return true;
});

namespace dsn {
namespace replication {
bool replica_stub::s_not_exit_on_log_failure = false;

namespace {

// Register commands that get/set flag configurations.
void register_flags_ctrl_command()
{
    // For the reason why using std::call_once please see comments in
    // replica_stub::register_ctrl_command() for details.
static std::once_flag flag; std::call_once(flag, []() mutable { dsn::command_manager::instance().add_global_cmd( dsn::command_manager::instance().register_int_command( FLAGS_max_replicas_on_load_for_each_disk, FLAGS_max_replicas_on_load_for_each_disk, "replica.max-replicas-on-load-for-each-disk", kMaxReplicasOnLoadForEachDiskDesc)); dsn::command_manager::instance().add_global_cmd( dsn::command_manager::instance().register_int_command( FLAGS_load_replica_max_wait_time_ms, FLAGS_load_replica_max_wait_time_ms, "replica.load-replica-max-wait-time-ms", kLoadReplicaMaxWaitTimeMsDesc)); dsn::command_manager::instance().add_global_cmd( dsn::command_manager::instance().register_bool_command( FLAGS_empty_write_disabled, "replica.disable-empty-write", "whether to disable empty writes")); dsn::command_manager::instance().add_global_cmd( dsn::command_manager::instance().register_int_command( FLAGS_max_concurrent_bulk_load_downloading_count, FLAGS_max_concurrent_bulk_load_downloading_count, "replica.max-concurrent-bulk-load-downloading-count", kMaxConcurrentBulkLoadDownloadingCountDesc)); }); } } // anonymous namespace replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, bool is_long_subscriber /* = true*/) : serverlet("replica_stub"), _state(NS_Disconnected), _replica_state_subscriber(std::move(subscriber)), _is_long_subscriber(is_long_subscriber), _deny_client(false), _verbose_client_log(false), _verbose_commit_log(false), _release_tcmalloc_memory(false), _mem_release_max_reserved_mem_percentage(10), _learn_app_concurrent_count(0), _bulk_load_downloading_count(0), _manual_emergency_checkpointing_count(0), _is_running(false), #ifdef DSN_ENABLE_GPERF _is_releasing_memory(false), #endif METRIC_VAR_INIT_server(total_replicas), METRIC_VAR_INIT_server(opening_replicas), METRIC_VAR_INIT_server(closing_replicas), METRIC_VAR_INIT_server(inactive_replicas), METRIC_VAR_INIT_server(error_replicas), METRIC_VAR_INIT_server(primary_replicas), 
      METRIC_VAR_INIT_server(secondary_replicas),
      METRIC_VAR_INIT_server(learning_replicas),
      METRIC_VAR_INIT_server(learning_replicas_max_duration_ms),
      METRIC_VAR_INIT_server(learning_replicas_max_copy_file_bytes),
      METRIC_VAR_INIT_server(moved_error_replicas),
      METRIC_VAR_INIT_server(moved_garbage_replicas),
      METRIC_VAR_INIT_server(replica_removed_dirs),
      METRIC_VAR_INIT_server(replica_error_dirs),
      METRIC_VAR_INIT_server(replica_garbage_dirs),
      METRIC_VAR_INIT_server(replica_tmp_dirs),
      METRIC_VAR_INIT_server(replica_origin_dirs),
#ifdef DSN_ENABLE_GPERF
      METRIC_VAR_INIT_server(tcmalloc_released_bytes),
#endif
      METRIC_VAR_INIT_server(read_failed_requests),
      METRIC_VAR_INIT_server(write_failed_requests),
      METRIC_VAR_INIT_server(read_busy_requests),
      METRIC_VAR_INIT_server(write_busy_requests),
      METRIC_VAR_INIT_server(bulk_load_running_count),
      METRIC_VAR_INIT_server(bulk_load_ingestion_max_duration_ms),
      METRIC_VAR_INIT_server(bulk_load_max_duration_ms),
      METRIC_VAR_INIT_server(splitting_replicas),
      METRIC_VAR_INIT_server(splitting_replicas_max_duration_ms),
      METRIC_VAR_INIT_server(splitting_replicas_async_learn_max_duration_ms),
      METRIC_VAR_INIT_server(splitting_replicas_max_copy_file_bytes)
{
    // Some flags might need to be tuned on the stage of loading replicas (during
    // replica_stub::initialize()), thus register their control command just in the
    // constructor.
    register_flags_ctrl_command();
}

// Delegate all shutdown/cleanup work to close().
replica_stub::~replica_stub() { close(); }

// Convenience overload: build replication_options from the configuration, run the
// main initialize(opts, clear), then create the access controller.
void replica_stub::initialize(bool clear /* = false*/)
{
    replication_options opts;
    opts.initialize();
    initialize(opts, clear);
    _access_controller = std::make_unique<dsn::security::access_controller>();
}

// Collect the sub-directories (candidate replica dirs) of every disk dir node,
// skipping disks that are currently in IO_ERROR status.
std::vector<replica_stub::disk_replicas_info> replica_stub::get_all_disk_dirs() const
{
    std::vector<disk_replicas_info> disks;
    for (const auto &disk_node : _fs_manager.get_dir_nodes()) {
        if (dsn_unlikely(disk_node->status == disk_status::IO_ERROR)) {
            // Skip disks with IO errors.
continue; } std::vector<std::string> sub_dirs; CHECK(utils::filesystem::get_subdirectories(disk_node->full_dir, sub_dirs, false), "failed to get sub_directories in {}", disk_node->full_dir); disks.push_back(disk_replicas_info{disk_node.get(), std::move(sub_dirs)}); } return disks; } // TaskCode: LPC_REPLICATION_INIT_LOAD // ThreadPool: THREAD_POOL_LOCAL_APP void replica_stub::load_replica(dir_node *disk_node, const std::string &replica_dir, size_t total_dir_count, utils::ex_lock &reps_lock, replica_map_by_gpid &reps, std::atomic<size_t> &finished_dir_count) { // Measure execution time for loading a replica dir. // // TODO(wangdan): support decimal milliseconds or microseconds, since loading a small // replica tends to spend less than 1 milliseconds and show "0ms" in logging. SCOPED_LOG_TIMING(INFO, "on loading replica dir {}:{}", disk_node->tag, replica_dir); LOG_INFO("loading replica: replica_dir={}:{}", disk_node->tag, replica_dir); const auto *const worker = task::get_current_worker2(); if (worker != nullptr) { CHECK(!(worker->pool()->spec().partitioned), "The thread pool THREAD_POOL_LOCAL_APP(task code: LPC_REPLICATION_INIT_LOAD) " "for loading replicas must not be partitioned since load balancing is required " "among multiple threads"); } auto rep = load_replica(disk_node, replica_dir); if (rep == nullptr) { LOG_INFO("load replica failed: replica_dir={}:{}, progress={}/{}", disk_node->tag, replica_dir, ++finished_dir_count, total_dir_count); return; } LOG_INFO("{}@{}: load replica successfully, replica_dir={}:{}, progress={}/{}, " "last_durable_decree={}, last_committed_decree={}, last_prepared_decree={}", rep->get_gpid(), dsn_primary_host_port(), disk_node->tag, replica_dir, ++finished_dir_count, total_dir_count, rep->last_durable_decree(), rep->last_committed_decree(), rep->last_prepared_decree()); utils::auto_lock<utils::ex_lock> l(reps_lock); const auto rep_iter = reps.find(rep->get_gpid()); CHECK(rep_iter == reps.end(), "{}@{}: newly loaded dir {} 
conflicts with existing {} while loading replica", rep->get_gpid(), dsn_primary_host_port(), rep->dir(), rep_iter->second->dir()); reps.emplace(rep->get_gpid(), rep); } void replica_stub::load_replicas(replica_map_by_gpid &reps) { // Measure execution time for loading all replicas from all healthy disks without IO errors. // // TODO(wangdan): show both the size of output replicas and execution time on just one // logging line. SCOPED_LOG_TIMING(INFO, "on loading replicas"); const auto &disks = get_all_disk_dirs(); // The max index of dirs that are currently being loaded for each disk, which means the dirs // with higher indexes have not begun to be loaded (namely pushed into the queue). std::vector<size_t> replica_dir_indexes(disks.size(), 0); // Each loader is for a replica dir, including its path and loading task. struct replica_dir_loader { size_t replica_dir_index; std::string replica_dir_path; task_ptr load_replica_task; }; // Each queue would cache the tasks that loading dirs for each disk. Once the task is // found finished (namely a dir has been loaded successfully), it would be popped from // the queue. std::vector<std::queue<replica_dir_loader>> load_disk_queues(disks.size()); // The number of loading replica dirs that have been finished for each disk, used to show // current progress. // // TODO(wangdan): calculate the number of successful or failed loading of replica dirs, // and the number for each reason if failed. std::vector<std::atomic<size_t>> finished_replica_dirs(disks.size()); for (auto &count : finished_replica_dirs) { count.store(0); } // The lock for operations on the loaded replicas as output. utils::ex_lock reps_lock; while (true) { size_t finished_disks = 0; // For each round, start loading one replica for each disk in case there are too many // replicas in a disk, except that all of the replicas of this disk are being loaded. 
for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) { // TODO(wangdan): Structured bindings can be captured by closures in g++, while // not supported well by clang. Thus we do not use following statement to bind // both variables until clang has been upgraded to version 16 which could support // that well: // // const auto &[disk_node, replica_dirs] = disks[disk_index]; // // For the docs of clang 16 please see: // // https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support. const auto &replica_dirs = disks[disk_index].replica_dirs; auto &replica_dir_index = replica_dir_indexes[disk_index]; if (replica_dir_index >= replica_dirs.size()) { // All of the replicas for the disk `disks[disk_index]` have begun to be loaded, // thus just skip to next disk. ++finished_disks; continue; } const auto &disk_node = disks[disk_index].disk_node; auto &load_disk_queue = load_disk_queues[disk_index]; if (load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) { // Loading replicas should be throttled in case that disk IO is saturated. if (!load_disk_queue.front().load_replica_task->wait( static_cast<int>(FLAGS_load_replica_max_wait_time_ms))) { // There might be too many replicas that are being loaded which lead to // slow disk IO, thus turn to load replicas of next disk, and try to load // dir `replica_dir_index` of this disk in the next round. 
LOG_WARNING("after {} ms, loading dir({}, {}/{}) is still not finished, " "there are {} replicas being loaded for disk({}:{}, {}/{}), " "now turn to next disk, and will begin to load dir({}, {}/{}) " "soon", FLAGS_load_replica_max_wait_time_ms, load_disk_queue.front().replica_dir_path, load_disk_queue.front().replica_dir_index, replica_dirs.size(), load_disk_queue.size(), disk_node->tag, disk_node->full_dir, disk_index, disks.size(), replica_dirs[replica_dir_index], replica_dir_index, replica_dirs.size()); continue; } // Now the queue size is within the limit again, continue to load a new replica dir. load_disk_queue.pop(); } if (dsn::replication::is_data_dir_invalid(replica_dirs[replica_dir_index])) { LOG_WARNING("ignore dir({}, {}/{}) for disk({}:{}, {}/{})", replica_dirs[replica_dir_index], replica_dir_index, replica_dirs.size(), disk_node->tag, disk_node->full_dir, disk_index, disks.size()); ++replica_dir_index; continue; } LOG_DEBUG("ready to load dir({}, {}/{}) for disk({}:{}, {}/{})", replica_dirs[replica_dir_index], replica_dir_index, replica_dirs.size(), disk_node->tag, disk_node->full_dir, disk_index, disks.size()); load_disk_queue.push(replica_dir_loader{ replica_dir_index, replica_dirs[replica_dir_index], tasking::create_task( // Ensure that the thread pool is non-partitioned. LPC_REPLICATION_INIT_LOAD, &_tracker, std::bind(static_cast<void (replica_stub::*)(dir_node *, const std::string &, size_t, utils::ex_lock &, replica_map_by_gpid &, std::atomic<size_t> &)>( &replica_stub::load_replica), this, disk_node, replica_dirs[replica_dir_index], replica_dirs.size(), std::ref(reps_lock), std::ref(reps), std::ref(finished_replica_dirs[disk_index])))}); load_disk_queue.back().load_replica_task->enqueue(); ++replica_dir_index; } if (finished_disks >= disks.size()) { // All replicas of all disks have begun to be loaded. break; } } // All loading tasks have been in the queue. Just wait all tasks to be finished. 
    // Tail of the replica-loading routine (definition starts before this chunk):
    // drain every per-disk load queue, blocking on each queued load task and
    // asserting that it finished successfully before dropping it.
    for (auto &load_disk_queue : load_disk_queues) {
        while (!load_disk_queue.empty()) {
            CHECK_TRUE(load_disk_queue.front().load_replica_task->wait());
            load_disk_queue.pop();
        }
    }
}

// One-time initialization of the replica server stub: resolves the local
// address, applies options/flags, optionally wipes data dirs, sets up
// at-rest encryption (KMS), verifies the legacy shared-log dir is empty,
// loads all replicas from disk, starts stat timers, and finally kicks off
// initialize_start() (possibly delayed to let the failure detector time out).
// `clear`: when true, remove the slog dir and all data dirs before starting.
void replica_stub::initialize(const replication_options &opts, bool clear /* = false*/)
{
    _primary_host_port = dsn_primary_host_port();
    _primary_host_port_cache = _primary_host_port.to_string();
    LOG_INFO("primary_host_port = {}", _primary_host_port_cache);

    set_options(opts);
    LOG_INFO("meta_servers = {}", fmt::join(_options.meta_servers, ", "));

    // Snapshot startup-time behavior flags into member state.
    _deny_client = FLAGS_deny_client_on_start;
    _verbose_client_log = FLAGS_verbose_client_log_on_start;
    _verbose_commit_log = FLAGS_verbose_commit_log_on_start;
    _release_tcmalloc_memory = FLAGS_mem_release_enabled;
    _mem_release_max_reserved_mem_percentage = FLAGS_mem_release_max_reserved_mem_percentage;

    // clear dirs if need
    if (clear) {
        CHECK(dsn::utils::filesystem::remove_path(_options.slog_dir),
              "Fail to remove {}.",
              _options.slog_dir);
        for (auto &dir : _options.data_dirs) {
            CHECK(dsn::utils::filesystem::remove_path(dir), "Fail to remove {}.", dir);
        }
    }

    // The kms-info file (persisted encryption metadata) lives in the first data dir.
    const auto &kms_path =
        utils::filesystem::path_combine(_options.data_dirs[0], kms_info::kKmsInfo);
    // FLAGS_data_dirs may be empty when load configuration, use LOG_FATAL instead of group
    // validator.
    // Refuse to start unencrypted once encryption has ever been enabled on this node.
    // NOTE(review): "enbale" below is a typo ("enabled") in the fatal message text.
    if (!FLAGS_encrypt_data_at_rest && utils::filesystem::path_exists(kms_path)) {
        LOG_FATAL("The kms_info file exists at ({}), but [pegasus.server] "
                  "encrypt_data_at_rest is enbale."
                  "Encryption in Pegasus is irreversible after its initial activation.",
                  kms_path);
    }

    dsn::replication::kms_info kms_info;
    if (FLAGS_encrypt_data_at_rest && !utils::is_empty(FLAGS_hadoop_kms_url)) {
        // FLAGS_hadoop_kms_url may list several KMS endpoints, comma-separated.
        _key_provider.reset(new dsn::security::kms_key_provider(
            ::absl::StrSplit(FLAGS_hadoop_kms_url, ",", ::absl::SkipEmpty()),
            FLAGS_encryption_cluster_key_name));
        const auto &ec = dsn::utils::load_rjobj_from_file(
            kms_path, dsn::utils::FileDataType::kNonSensitive, &kms_info);
        // Any error other than "file not found" (first launch) is fatal.
        if (ec != dsn::ERR_PATH_NOT_FOUND && ec != dsn::ERR_OK) {
            CHECK_EQ_MSG(dsn::ERR_OK, ec, "Can't load kms key from kms-info file");
        }
        // Upon the first launch, the encryption key should be empty. The process will then retrieve
        // EEK, IV, and KV from KMS.
        // After the first launch, the encryption key, obtained from the kms-info file, should not
        // be empty. The process will then acquire the DEK from KMS.
        if (ec == dsn::ERR_PATH_NOT_FOUND) {
            LOG_WARNING("It's normal to encounter a temporary inability to open the kms-info file "
                        "during the first process launch.");
            CHECK_OK(_key_provider->GenerateEncryptionKey(&kms_info),
                     "Generate encryption key from kms failed");
        }
        CHECK_OK(_key_provider->DecryptEncryptionKey(kms_info, &_server_key),
                 "Get decryption key failed from {}",
                 kms_path);
        FLAGS_server_key = _server_key.c_str();
    }

    // Initialize the file system manager.
    _fs_manager.initialize(_options.data_dirs, _options.data_dir_tags);

    // Persist the freshly generated kms-info on first launch only (file absent).
    if (_key_provider && !utils::filesystem::path_exists(kms_path)) {
        const auto &err = dsn::utils::dump_rjobj_to_file(
            kms_info, dsn::utils::FileDataType::kNonSensitive, kms_path);
        CHECK_EQ_MSG(dsn::ERR_OK, err, "Can't store kms key to kms-info file");
    }

    // Check slog is not exist.
    // The shared log was removed after 2.5.0; a non-empty slog dir means an
    // unsupported upgrade path, so refuse to start.
    auto full_slog_path = fmt::format("{}/replica/slog/", _options.slog_dir);
    if (utils::filesystem::directory_exists(full_slog_path)) {
        std::vector<std::string> slog_files;
        CHECK(utils::filesystem::get_subfiles(full_slog_path, slog_files, false),
              "check slog files failed");
        CHECK(slog_files.empty(),
              "slog({}) files are not empty. Make sure you are upgrading from 2.5.0",
              full_slog_path);
    }

    // Start to load replicas in available data directories.
    LOG_INFO("start to load replicas");
    replica_map_by_gpid reps;
    load_replicas(reps);
    LOG_INFO("load replicas succeed, replica_count = {}", reps.size());

    // NOTE(review): is_log_complete is never set to false anywhere below, so the
    // "!is_log_complete" branch further down is currently dead code (likely a
    // leftover from the removed shared-log era).
    bool is_log_complete = true;
    for (auto it = reps.begin(); it != reps.end(); ++it) {
        CHECK_EQ_MSG(it->second->background_sync_checkpoint(), ERR_OK, "sync checkpoint failed");

        it->second->reset_prepare_list_after_replay();

        decree pmax = invalid_decree;
        decree pmax_commit = invalid_decree;
        if (it->second->private_log()) {
            pmax = it->second->private_log()->max_decree(it->first);
            pmax_commit = it->second->private_log()->max_commit_on_disk();
        }

        LOG_INFO(
            "{}: load replica done, durable = {}, committed = {}, "
            "prepared = {}, ballot = {}, "
            "valid_offset_in_plog = {}, max_decree_in_plog = {}, max_commit_on_disk_in_plog = {}",
            it->second->name(),
            it->second->last_durable_decree(),
            it->second->last_committed_decree(),
            it->second->max_prepared_decree(),
            it->second->get_ballot(),
            it->second->get_app()->init_info().init_offset_in_private_log,
            pmax,
            pmax_commit);
    }

    // we will mark all replicas inactive not transient unless all logs are complete
    if (!is_log_complete) {
        LOG_ERROR("logs are not complete for some replicas, which means that shared log is "
                  "truncated, mark all replicas as inactive");
        for (auto &[_, rep] : reps) {
            rep->set_inactive_state_transient(false);
        }
    }

    // replicas stat
    // Periodic replica statistics; first run is jittered randomly within one interval.
    if (!FLAGS_replicas_stat_disabled) {
        _replicas_stat_timer_task = tasking::enqueue_timer(
            LPC_REPLICAS_STAT,
            &_tracker,
            [this] { on_replicas_stat(); },
            std::chrono::milliseconds(FLAGS_replicas_stat_interval_ms),
            0,
            std::chrono::milliseconds(rand::next_u32(0, FLAGS_replicas_stat_interval_ms)));
    }

    // disk stat
    if (!FLAGS_disk_stat_disabled) {
        _disk_stat_timer_task =
            ::dsn::tasking::enqueue_timer(LPC_DISK_STAT,
                                          &_tracker,
                                          [this]() { on_disk_stat(); },
                                          std::chrono::seconds(FLAGS_disk_stat_interval_seconds),
                                          0,
                                          std::chrono::seconds(FLAGS_disk_stat_interval_seconds));
    }

    // Attach `reps`.
    _replicas = std::move(reps);
    METRIC_VAR_INCREMENT_BY(total_replicas, _replicas.size());
    for (const auto &[pid, rep] : _replicas) {
        _fs_manager.add_replica(pid, rep->dir());
    }

    _nfs = dsn::nfs_node::create();
    _nfs->start();

    dist::cmd::register_remote_command_rpc();

    if (FLAGS_delay_for_fd_timeout_on_start) {
        // Postpone serving until other nodes' failure detectors have surely
        // declared this node dead, to avoid split-brain-style confusion after
        // a fast restart.
        uint64_t now_time_ms = dsn_now_ms();
        uint64_t delay_time_ms =
            (FLAGS_fd_grace_seconds + 3) * 1000; // for more 3 seconds than grace seconds
        if (now_time_ms < dsn::utils::process_start_millis() + delay_time_ms) {
            uint64_t delay = dsn::utils::process_start_millis() + delay_time_ms - now_time_ms;
            LOG_INFO("delay for {} ms to make failure detector timeout", delay);
            tasking::enqueue(LPC_REPLICA_SERVER_DELAY_START,
                             &_tracker,
                             [this]() { this->initialize_start(); },
                             0,
                             std::chrono::milliseconds(delay));
        } else {
            initialize_start();
        }
    } else {
        initialize_start();
    }
}

// Second-phase startup: begins the meta-server config-sync timer, the
// tcmalloc release timer (gperf builds), duplication sync, the backup
// server, and the failure detector. Idempotent via _is_running.
void replica_stub::initialize_start()
{
    if (_is_running) {
        return;
    }

    // start timer for configuration sync
    if (!FLAGS_config_sync_disabled) {
        _config_sync_timer_task = tasking::enqueue_timer(
            LPC_QUERY_CONFIGURATION_ALL,
            &_tracker,
            [this]() {
                // query_configuration_by_node() requires _state_lock held.
                zauto_lock l(_state_lock);
                this->query_configuration_by_node();
            },
            std::chrono::milliseconds(FLAGS_config_sync_interval_ms),
            0,
            std::chrono::milliseconds(FLAGS_config_sync_interval_ms));
    }

#ifdef DSN_ENABLE_GPERF
    _mem_release_timer_task =
        tasking::enqueue_timer(LPC_MEM_RELEASE,
                               &_tracker,
                               std::bind(&replica_stub::gc_tcmalloc_memory, this, false),
                               std::chrono::milliseconds(FLAGS_mem_release_check_interval_ms),
                               0,
                               std::chrono::milliseconds(FLAGS_mem_release_check_interval_ms));
#endif

    if (FLAGS_duplication_enabled) {
        _duplication_sync_timer = std::make_unique<duplication_sync_timer>(this);
        _duplication_sync_timer->start();
    }

    _backup_server = std::make_unique<replica_backup_server>(this);

    // init liveness monitor
    CHECK_EQ(NS_Disconnected, _state);
    if (!FLAGS_fd_disabled) {
        _failure_detector = std::make_shared<dsn::dist::slave_failure_detector_with_multimaster>(
            _options.meta_servers,
            [this]() { this->on_meta_server_disconnected(); },
            [this]() { this->on_meta_server_connected(); });

        CHECK_GT_MSG(FLAGS_fd_grace_seconds, FLAGS_fd_lease_seconds, "");
        CHECK_EQ_MSG(_failure_detector->start(FLAGS_fd_check_interval_seconds,
                                              FLAGS_fd_beacon_interval_seconds,
                                              FLAGS_fd_lease_seconds,
                                              FLAGS_fd_grace_seconds),
                     ERR_OK,
                     "FD start failed");

        _failure_detector->register_master(_failure_detector->current_server_contact());
    } else {
        // Without a failure detector, consider ourselves connected immediately.
        _state = NS_Connected;
    }

    _is_running = true;
}

// Injects ERR_INJECTED into replicas for testing/chaos purposes.
// id == (-1, *): kill every replica; id == (app, -1): kill all replicas of
// that app; otherwise kill exactly one partition.
// Returns ERR_OK on success, ERR_OBJECT_NOT_FOUND if the single target
// replica does not exist.
dsn::error_code replica_stub::on_kill_replica(gpid id)
{
    LOG_INFO("kill replica: gpid = {}", id);
    if (id.get_app_id() == -1 || id.get_partition_index() == -1) {
        // Copy the map under the read lock, then inject outside of it.
        replica_map_by_gpid rs;
        {
            zauto_read_lock l(_replicas_lock);
            rs = _replicas;
        }
        for (auto it = rs.begin(); it != rs.end(); ++it) {
            replica_ptr &r = it->second;
            if (id.get_app_id() == -1 || id.get_app_id() == r->get_gpid().get_app_id())
                r->inject_error(ERR_INJECTED);
        }
        return ERR_OK;
    } else {
        error_code err = ERR_INVALID_PARAMETERS;
        replica_ptr r = get_replica(id);
        if (r == nullptr) {
            err = ERR_OBJECT_NOT_FOUND;
        } else {
            r->inject_error(ERR_INJECTED);
            err = ERR_OK;
        }
        return err;
    }
}

// Returns a snapshot of all currently-serving replicas (ref-counted, so the
// result stays valid after the lock is released).
std::vector<replica_ptr> replica_stub::get_all_replicas() const
{
    std::vector<replica_ptr> result;
    {
        zauto_read_lock l(_replicas_lock);
        std::transform(_replicas.begin(),
                       _replicas.end(),
                       std::back_inserter(result),
                       [](const std::pair<gpid, replica_ptr> &r) { return r.second; });
    }
    return result;
}

// Returns a snapshot of the replicas currently in PS_PRIMARY status.
std::vector<replica_ptr> replica_stub::get_all_primaries() const
{
    std::vector<replica_ptr> result;
    {
        zauto_read_lock l(_replicas_lock);
        for (const auto &[_, r] : _replicas) {
            if (r->status() != partition_status::PS_PRIMARY) {
                continue;
            }
            result.push_back(r);
        }
    }
    return result;
}

// Looks up a serving replica by gpid; nullptr when absent.
replica_ptr replica_stub::get_replica(gpid id) const
{
    zauto_read_lock l(_replicas_lock);
    auto it = _replicas.find(id);
    if (it != _replicas.end())
        return it->second;
    else
        return nullptr;
}

// Classifies where a gpid currently sits in the replica lifecycle:
// opening -> serving -> closing -> closed; RL_invalid if unknown anywhere.
replica_stub::replica_life_cycle replica_stub::get_replica_life_cycle(gpid id)
{
    zauto_read_lock l(_replicas_lock);
    if (_opening_replicas.find(id) != _opening_replicas.end())
        return replica_stub::RL_creating;
    if (_replicas.find(id) != _replicas.end())
        return replica_stub::RL_serving;
    if (_closing_replicas.find(id) != _closing_replicas.end())
        return replica_stub::RL_closing;
    if (_closed_replicas.find(id) != _closed_replicas.end())
        return replica_stub::RL_closed;
    return replica_stub::RL_invalid;
}

// Entry point for client write requests: routes to the owning replica, or
// replies ERR_OBJECT_NOT_FOUND. Silently drops requests while _deny_client.
void replica_stub::on_client_write(gpid id, dsn::message_ex *request)
{
    if (_deny_client) {
        // ignore and do not reply
        return;
    }
    if (_verbose_client_log && request) {
        LOG_INFO("{}@{}: client = {}, code = {}, timeout = {}",
                 id,
                 _primary_host_port_cache,
                 request->header->from_address,
                 request->header->rpc_name,
                 request->header->client.timeout_ms);
    }
    replica_ptr rep = get_replica(id);
    if (rep != nullptr) {
        rep->on_client_write(request, false);
    } else {
        response_client(id, false, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND);
    }
}

// Entry point for client read requests; mirrors on_client_write but with
// is_read == true in the error reply.
void replica_stub::on_client_read(gpid id, dsn::message_ex *request)
{
    if (_deny_client) {
        // ignore and do not reply
        return;
    }
    if (_verbose_client_log && request) {
        LOG_INFO("{}@{}: client = {}, code = {}, timeout = {}",
                 id,
                 _primary_host_port_cache,
                 request->header->from_address,
                 request->header->rpc_name,
                 request->header->client.timeout_ms);
    }
    replica_ptr rep = get_replica(id);
    if (rep != nullptr) {
        rep->on_client_read(request, false);
    } else {
        response_client(id, true, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND);
    }
}

void
// Handles a configuration proposal from the meta server. If the target
// replica exists, forwards to it; otherwise an ASSIGN_PRIMARY proposal
// triggers opening a new replica, and an UPGRADE_TO_PRIMARY proposal for a
// missing replica asks the meta server to remove it. Ignored while not
// connected to the meta server.
replica_stub::on_config_proposal(const configuration_update_request &proposal)
{
    if (!is_connected()) {
        LOG_WARNING("{}@{}: received config proposal {} for {}: not connected, ignore",
                    proposal.config.pid,
                    _primary_host_port_cache,
                    enum_to_string(proposal.type),
                    FMT_HOST_PORT_AND_IP(proposal, node));
        return;
    }

    LOG_INFO("{}@{}: received config proposal {} for {}",
             proposal.config.pid,
             _primary_host_port_cache,
             enum_to_string(proposal.type),
             FMT_HOST_PORT_AND_IP(proposal, node));

    replica_ptr rep = get_replica(proposal.config.pid);
    if (rep == nullptr) {
        if (proposal.type == config_type::CT_ASSIGN_PRIMARY) {
            std::shared_ptr<configuration_update_request> req2(new configuration_update_request);
            *req2 = proposal;
            begin_open_replica(proposal.info, proposal.config.pid, nullptr, req2);
        } else if (proposal.type == config_type::CT_UPGRADE_TO_PRIMARY) {
            remove_replica_on_meta_server(proposal.info, proposal.config);
        }
    }

    if (rep != nullptr) {
        // NOTE(review): C-style cast strips const to satisfy the callee's
        // non-const signature; prefer const_cast if this is ever touched.
        rep->on_config_proposal((configuration_update_request &)proposal);
    }
}

// Answers a query for the replica's last decree. Potential secondaries
// report 0 since their state is still being learned.
void replica_stub::on_query_decree(query_replica_decree_rpc rpc)
{
    const query_replica_decree_request &req = rpc.request();
    query_replica_decree_response &resp = rpc.response();
    replica_ptr rep = get_replica(req.pid);
    if (rep != nullptr) {
        resp.err = ERR_OK;
        if (partition_status::PS_POTENTIAL_SECONDARY == rep->status()) {
            resp.last_decree = 0;
        } else {
            resp.last_decree = rep->last_committed_decree();
            // TODO: use the following to alleviate data lost
            // resp.last_decree = rep->last_prepared_decree();
        }
    } else {
        resp.err = ERR_OBJECT_NOT_FOUND;
        resp.last_decree = 0;
    }
}

// Reports replica_info for every replica known to this node: serving,
// closing and closed. `visited_replicas` de-duplicates gpids that appear
// in more than one of those collections.
void replica_stub::on_query_replica_info(query_replica_info_rpc rpc)
{
    query_replica_info_response &resp = rpc.response();
    std::set<gpid> visited_replicas;
    {
        zauto_read_lock l(_replicas_lock);
        for (auto it = _replicas.begin(); it != _replicas.end(); ++it) {
            replica_ptr &r = it->second;
            replica_info info;
            get_replica_info(info, r);
            if (visited_replicas.find(info.pid) == visited_replicas.end()) {
                visited_replicas.insert(info.pid);
                resp.replicas.push_back(std::move(info));
            }
        }
        for (auto it = _closing_replicas.begin(); it != _closing_replicas.end(); ++it) {
            // Tuple element 3 of a closing entry is its cached replica_info.
            const replica_info &info = std::get<3>(it->second);
            if (visited_replicas.find(info.pid) == visited_replicas.end()) {
                visited_replicas.insert(info.pid);
                resp.replicas.push_back(info);
            }
        }
        for (auto it = _closed_replicas.begin(); it != _closed_replicas.end(); ++it) {
            const replica_info &info = it->second.second;
            if (visited_replicas.find(info.pid) == visited_replicas.end()) {
                visited_replicas.insert(info.pid);
                resp.replicas.push_back(info);
            }
        }
    }
    resp.err = ERR_OK;
}

// Forwards a last-checkpoint query to the owning replica, or replies
// ERR_OBJECT_NOT_FOUND.
void replica_stub::on_query_last_checkpoint(query_last_checkpoint_info_rpc rpc)
{
    const learn_request &request = rpc.request();
    learn_response &response = rpc.response();

    replica_ptr rep = get_replica(request.pid);
    if (rep != nullptr) {
        rep->on_query_last_checkpoint(response);
    } else {
        response.err = ERR_OBJECT_NOT_FOUND;
    }
}

// ThreadPool: THREAD_POOL_DEFAULT
// Reports per-disk info, optionally filtered to one app (resolved by name
// from the serving replicas); app_id == 0 means "all apps".
void replica_stub::on_query_disk_info(query_disk_info_rpc rpc)
{
    const query_disk_info_request &req = rpc.request();
    query_disk_info_response &resp = rpc.response();
    int app_id = 0;
    if (!req.app_name.empty()) {
        zauto_read_lock l(_replicas_lock);
        app_id = get_app_id_from_replicas(req.app_name);
        if (app_id == 0) {
            // No serving replica belongs to an app with this name.
            resp.err = ERR_OBJECT_NOT_FOUND;
            return;
        }
    }

    resp.disk_infos = _fs_manager.get_disk_infos(app_id);

    // Get the statistics from fs_manager's metrics, they are thread-safe.
    resp.total_capacity_mb = _fs_manager._total_capacity_mb.load(std::memory_order_relaxed);
    resp.total_available_mb = _fs_manager._total_available_mb.load(std::memory_order_relaxed);
    resp.err = ERR_OK;
}

// Routes a replica-disk-migration request to the target replica's migrator,
// or replies ERR_OBJECT_NOT_FOUND.
void replica_stub::on_disk_migrate(replica_disk_migrate_rpc rpc)
{
    const replica_disk_migrate_request &request = rpc.request();
    replica_disk_migrate_response &response = rpc.response();

    replica_ptr rep = get_replica(request.pid);
    if (rep != nullptr) {
        rep->disk_migrator()->on_migrate_replica(rpc); // THREAD_POOL_DEFAULT
    } else {
        response.err = ERR_OBJECT_NOT_FOUND;
    }
}

// Reports app_info for every app that has any replica on this node
// (serving, closing or closed), de-duplicated by app_id.
void replica_stub::on_query_app_info(query_app_info_rpc rpc)
{
    const query_app_info_request &req = rpc.request();
    query_app_info_response &resp = rpc.response();

    LOG_INFO("got query app info request from ({})", FMT_HOST_PORT_AND_IP(req, meta_server));
    resp.err = dsn::ERR_OK;
    std::set<app_id> visited_apps;
    {
        zauto_read_lock l(_replicas_lock);
        for (auto it = _replicas.begin(); it != _replicas.end(); ++it) {
            replica_ptr &r = it->second;
            const app_info &info = *r->get_app_info();
            if (visited_apps.find(info.app_id) == visited_apps.end()) {
                resp.apps.push_back(info);
                visited_apps.insert(info.app_id);
            }
        }
        for (auto it = _closing_replicas.begin(); it != _closing_replicas.end(); ++it) {
            // Tuple element 2 of a closing entry is its cached app_info.
            const app_info &info = std::get<2>(it->second);
            if (visited_apps.find(info.app_id) == visited_apps.end()) {
                resp.apps.push_back(info);
                visited_apps.insert(info.app_id);
            }
        }
        for (auto it = _closed_replicas.begin(); it != _closed_replicas.end(); ++it) {
            const app_info &info = it->second.first;
            if (visited_apps.find(info.app_id) == visited_apps.end()) {
                resp.apps.push_back(info);
                visited_apps.insert(info.app_id);
            }
        }
    }
}

// ThreadPool: THREAD_POOL_DEFAULT
// Hot-adds one or more data disks at runtime. Parses `disk_str`
// ("tag:dir,..."), then for each dir: rejects duplicates and non-empty
// directories, creates + RW-checks it, and registers it with fs_manager.
// Fails fast on the first invalid entry.
void replica_stub::on_add_new_disk(add_new_disk_rpc rpc)
{
    const auto &disk_str = rpc.request().disk_str;
    auto &resp = rpc.response();
    resp.err = ERR_OK;

    std::vector<std::string> data_dirs;
    std::vector<std::string> data_dir_tags;
    std::string err_msg;
    if (disk_str.empty() ||
        !replication_options::get_data_dir_and_tag(disk_str,
                                                   "",
                                                   replication_options::kReplicaAppType,
                                                   data_dirs,
                                                   data_dir_tags,
                                                   err_msg)) {
        resp.err = ERR_INVALID_PARAMETERS;
        resp.__set_err_hint(fmt::format("invalid str({}), err_msg: {}", disk_str, err_msg));
        return;
    }

    // NOTE(review): `auto i = 0` deduces int while size() is unsigned —
    // harmless at realistic disk counts but a signed/unsigned comparison.
    for (auto i = 0; i < data_dir_tags.size(); ++i) {
        // TODO(yingchun): move the following code to fs_manager.
        auto dir = data_dirs[i];
        if (_fs_manager.is_dir_node_exist(dir, data_dir_tags[i])) {
            resp.err = ERR_NODE_ALREADY_EXIST;
            resp.__set_err_hint(
                fmt::format("data_dir({}) tag({}) already exist", dir, data_dir_tags[i]));
            return;
        }

        if (dsn_unlikely(utils::filesystem::directory_exists(dir) &&
                         !utils::filesystem::is_directory_empty(dir).second)) {
            resp.err = ERR_DIR_NOT_EMPTY;
            resp.__set_err_hint(fmt::format("Disk({}) directory is not empty", dir));
            return;
        }

        std::string cdir;
        if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) ||
                         !utils::filesystem::check_dir_rw(dir, err_msg))) {
            resp.err = ERR_FILE_OPERATION_FAILED;
            resp.__set_err_hint(err_msg);
            return;
        }

        LOG_INFO("Add a new disk in fs_manager, data_dir={}, tag={}", cdir, data_dir_tags[i]);
        // TODO(yingchun): there is a gap between _fs_manager.is_dir_node_exist() and
        // _fs_manager.add_new_dir_node() which is not atomic.
        _fs_manager.add_new_dir_node(cdir, data_dir_tags[i]);
    }
}

// NFS file-copy RPC: authz-checked (write access), then delegated to the
// nfs node.
void replica_stub::on_nfs_copy(const ::dsn::service::copy_request &request,
                               ::dsn::rpc_replier<::dsn::service::copy_response> &reply)
{
    if (check_status_and_authz_with_reply(request, reply, ranger::access_type::kWrite)) {
        _nfs->on_copy(request, reply);
    }
}

// NFS file-size RPC: authz-checked (write access), then delegated to the
// nfs node.
void replica_stub::on_nfs_get_file_size(
    const ::dsn::service::get_file_size_request &request,
    ::dsn::rpc_replier<::dsn::service::get_file_size_response> &reply)
{
    if (check_status_and_authz_with_reply(request, reply, ranger::access_type::kWrite)) {
        _nfs->on_get_file_size(request, reply);
    }
}

// 2PC prepare from the primary: routed to the target replica, or NACKed
// with ERR_OBJECT_NOT_FOUND.
void replica_stub::on_prepare(dsn::message_ex *request)
{
    gpid id;
    dsn::unmarshall(request, id);
    replica_ptr rep = get_replica(id);
    if (rep != nullptr) {
        rep->on_prepare(request);
    } else {
        prepare_ack resp;
        resp.pid = id;
        resp.err = ERR_OBJECT_NOT_FOUND;
        reply(request, resp);
    }
}

// Group check from the primary. A check addressed to a missing replica in
// POTENTIAL_SECONDARY status starts opening it as a learner; otherwise a
// missing replica is reported as not found. Ignored while disconnected
// from the meta server.
void replica_stub::on_group_check(group_check_rpc rpc)
{
    const group_check_request &request = rpc.request();
    group_check_response &response = rpc.response();
    if (!is_connected()) {
        LOG_WARNING("{}@{}: received group check: not connected, ignore",
                    request.config.pid,
                    _primary_host_port_cache);
        return;
    }

    LOG_INFO("{}@{}: received group check, primary = {}, ballot = {}, status = {}, "
             "last_committed_decree = {}",
             request.config.pid,
             _primary_host_port_cache,
             FMT_HOST_PORT_AND_IP(request.config, primary),
             request.config.ballot,
             enum_to_string(request.config.status),
             request.last_committed_decree);

    replica_ptr rep = get_replica(request.config.pid);
    if (rep != nullptr) {
        rep->on_group_check(request, response);
    } else {
        if (request.config.status == partition_status::PS_POTENTIAL_SECONDARY) {
            std::shared_ptr<group_check_request> req(new group_check_request);
            *req = request;
            begin_open_replica(request.app, request.config.pid, req, nullptr);
            response.err = ERR_OK;
            response.learner_signature = invalid_signature;
        } else {
            response.err = ERR_OBJECT_NOT_FOUND;
        }
    }
}

// Learn request from a learner: ACL-checked against the target replica,
// then forwarded; ERR_ACL_DENY / ERR_OBJECT_NOT_FOUND otherwise.
void
replica_stub::on_learn(dsn::message_ex *msg)
{
    learn_response response;
    learn_request request;
    ::dsn::unmarshall(msg, request);

    replica_ptr rep = get_replica(request.pid);
    if (rep != nullptr) {
        if (!rep->access_controller_allowed(msg, ranger::access_type::kWrite)) {
            response.err = ERR_ACL_DENY;
            reply(msg, response);
            return;
        }
        rep->on_learn(msg, request);
    } else {
        response.err = ERR_OBJECT_NOT_FOUND;
        reply(msg, response);
    }
}

// Learner tells the primary that learning completed; forwarded to the
// replica, echoing pid/signature in the response.
void replica_stub::on_learn_completion_notification(learn_completion_notification_rpc rpc)
{
    const group_check_response &report = rpc.request();
    learn_notify_response &response = rpc.response();
    response.pid = report.pid;
    response.signature = report.learner_signature;
    replica_ptr rep = get_replica(report.pid);
    if (rep != nullptr) {
        rep->on_learn_completion_notification(report, response);
    } else {
        response.err = ERR_OBJECT_NOT_FOUND;
    }
}

// Primary asks this node to become a learner for a partition; opens the
// replica if it does not exist yet. Ignored while disconnected.
void replica_stub::on_add_learner(const group_check_request &request)
{
    if (!is_connected()) {
        LOG_WARNING("{}@{}: received add learner, primary = {}, not connected, ignore",
                    request.config.pid,
                    _primary_host_port_cache,
                    FMT_HOST_PORT_AND_IP(request.config, primary));
        return;
    }

    LOG_INFO("{}@{}: received add learner, primary = {}, ballot = {}, status = {}, "
             "last_committed_decree = {}",
             request.config.pid,
             _primary_host_port_cache,
             FMT_HOST_PORT_AND_IP(request.config, primary),
             request.config.ballot,
             enum_to_string(request.config.status),
             request.last_committed_decree);

    replica_ptr rep = get_replica(request.config.pid);
    if (rep != nullptr) {
        rep->on_add_learner(request);
    } else {
        std::shared_ptr<group_check_request> req(new group_check_request);
        *req = request;
        begin_open_replica(request.app, request.config.pid, req, nullptr);
    }
}

// Remove notification for a replica; silently ignored when the replica is
// already gone.
void replica_stub::on_remove(const replica_configuration &request)
{
    replica_ptr rep = get_replica(request.pid);
    if (rep != nullptr) {
        rep->on_remove(request);
    }
}

// Fills a replica_info snapshot (identity, status, decrees, disk tag,
// manual-compact status) from a live replica.
void replica_stub::get_replica_info(replica_info &info, replica_ptr r)
{
    info.pid = r->get_gpid();
    info.ballot =
        r->get_ballot();
    info.status = r->status();
    info.app_type = r->get_app_info()->app_type;
    info.last_committed_decree = r->last_committed_decree();
    info.last_prepared_decree = r->last_prepared_decree();
    info.last_durable_decree = r->last_durable_decree();
    info.disk_tag = r->get_dir_node()->tag;
    info.__set_manual_compact_status(r->get_manual_compact_status());
}

// Collects replica_info for all replicas on this node (serving except
// splitting children, plus closing and closed) for config-sync reporting.
void replica_stub::get_local_replicas(std::vector<replica_info> &replicas)
{
    zauto_read_lock l(_replicas_lock);
    // local_replicas = replicas + closing_replicas + closed_replicas
    int total_replicas = _replicas.size() + _closing_replicas.size() + _closed_replicas.size();
    replicas.reserve(total_replicas);

    for (auto &pairs : _replicas) {
        replica_ptr &rep = pairs.second;
        // child partition should not sync config from meta server
        // because it is not ready in meta view
        if (rep->status() == partition_status::PS_PARTITION_SPLIT) {
            continue;
        }
        replica_info info;
        get_replica_info(info, rep);
        replicas.push_back(std::move(info));
    }

    for (auto &pairs : _closing_replicas) {
        replicas.push_back(std::get<3>(pairs.second));
    }

    for (auto &pairs : _closed_replicas) {
        replicas.push_back(pairs.second.second);
    }
}

// run in THREAD_POOL_META_SERVER
// assert(_state_lock.locked())
// Sends a CONFIG_SYNC request to the meta server carrying the full local
// replica list. No-op while disconnected or while a previous query is
// still in flight (_config_query_task acts as the in-flight guard).
void replica_stub::query_configuration_by_node()
{
    if (_state == NS_Disconnected) {
        return;
    }

    if (_config_query_task != nullptr) {
        return;
    }

    dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_CONFIG_SYNC);

    configuration_query_by_node_request req;
    SET_IP_AND_HOST_PORT(req, node, primary_address(), _primary_host_port);

    // TODO: send stored replicas may cost network, we shouldn't config the frequency
    get_local_replicas(req.stored_replicas);
    req.__isset.stored_replicas = true;
    ::dsn::marshall(msg, req);

    LOG_INFO("send query node partitions request to meta server, stored_replicas_count = {}",
             req.stored_replicas.size());

    const auto &target =
        dsn::dns_resolver::instance().resolve_address(_failure_detector->get_servers());
    _config_query_task =
        rpc::call(target,
                  msg,
                  &_tracker,
                  [this](error_code err, dsn::message_ex *request, dsn::message_ex *resp) {
                      on_node_query_reply(err, request, resp);
                  });
}

// Failure-detector callback: transition Disconnected -> Connecting and
// schedule an immediate config sync.
void replica_stub::on_meta_server_connected()
{
    LOG_INFO("meta server connected");

    zauto_lock l(_state_lock);
    if (_state == NS_Disconnected) {
        _state = NS_Connecting;
        tasking::enqueue(LPC_QUERY_CONFIGURATION_ALL, &_tracker, [this]() {
            zauto_lock l(_state_lock);
            this->query_configuration_by_node();
        });
    }
}

// run in THREAD_POOL_META_SERVER
// Handles the meta server's reply to a config-sync query: retries on RPC
// error or ERR_BUSY, and on success fans out one scatter task per reported
// partition, one scatter2 task per local replica unknown to the meta
// server, and gc tasks for replicas the meta server wants removed.
void replica_stub::on_node_query_reply(error_code err,
                                       dsn::message_ex *request,
                                       dsn::message_ex *response)
{
    LOG_INFO("query node partitions replied, err = {}", err);

    zauto_lock sl(_state_lock);
    _config_query_task = nullptr;
    if (err != ERR_OK) {
        // RPC failed: retry immediately, but only while still handshaking.
        if (_state == NS_Connecting) {
            query_configuration_by_node();
        }
    } else {
        if (_state == NS_Connecting) {
            _state = NS_Connected;
        }

        // DO NOT UPDATE STATE WHEN DISCONNECTED
        if (_state != NS_Connected)
            return;

        configuration_query_by_node_response resp;
        ::dsn::unmarshall(response, resp);

        if (resp.err == ERR_BUSY) {
            int delay_ms = 500;
            LOG_INFO("resend query node partitions request after {} ms for resp.err = ERR_BUSY",
                     delay_ms);
            _config_query_task = tasking::enqueue(
                LPC_QUERY_CONFIGURATION_ALL,
                &_tracker,
                [this]() {
                    zauto_lock l(_state_lock);
                    _config_query_task = nullptr;
                    this->query_configuration_by_node();
                },
                0,
                std::chrono::milliseconds(delay_ms));
            return;
        }
        if (resp.err != ERR_OK) {
            LOG_INFO("ignore query node partitions response for resp.err = {}", resp.err);
            return;
        }

        LOG_INFO("process query node partitions response for resp.err = ERR_OK, "
                 "partitions_count({}), gc_replicas_count({})",
                 resp.partitions.size(),
                 resp.gc_replicas.size());

        // Snapshot the serving replicas; entries matched by the response are
        // erased, leaving only replicas unknown to the meta server.
        replica_map_by_gpid reps;
        {
            zauto_read_lock rl(_replicas_lock);
            reps = _replicas;
        }

        for (const auto &config_update : resp.partitions) {
            reps.erase(config_update.config.pid);
            tasking::enqueue(
                LPC_QUERY_NODE_CONFIGURATION_SCATTER,
                &_tracker,
                std::bind(&replica_stub::on_node_query_reply_scatter,
                          this,
                          this,
                          config_update),
                config_update.config.pid.thread_hash());
        }

        // For the replicas that do not exist on meta_servers.
        for (const auto &[pid, _] : reps) {
            tasking::enqueue(
                LPC_QUERY_NODE_CONFIGURATION_SCATTER2,
                &_tracker,
                std::bind(&replica_stub::on_node_query_reply_scatter2, this, this, pid),
                pid.thread_hash());
        }

        // handle the replicas which need to be gc
        if (resp.__isset.gc_replicas) {
            for (replica_info &rep : resp.gc_replicas) {
                replica_stub::replica_life_cycle lc = get_replica_life_cycle(rep.pid);
                if (lc == replica_stub::RL_closed) {
                    tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
                                     &_tracker,
                                     std::bind(&replica_stub::on_gc_replica, this, this, rep.pid),
                                     0);
                }
            }
        }
    }
}

// Test-only hook: force the connected state and replay a canned config-sync
// response through the scatter path.
void replica_stub::set_meta_server_connected_for_test(
    const configuration_query_by_node_response &resp)
{
    zauto_lock l(_state_lock);
    CHECK_NE(_state, NS_Connected);
    _state = NS_Connected;

    for (auto it = resp.partitions.begin(); it != resp.partitions.end(); ++it) {
        tasking::enqueue(LPC_QUERY_NODE_CONFIGURATION_SCATTER,
                         &_tracker,
                         std::bind(&replica_stub::on_node_query_reply_scatter, this, this, *it),
                         it->config.pid.thread_hash());
    }
}

// Test-only hook: install a subscriber notified on replica state changes.
void replica_stub::set_replica_state_subscriber_for_test(replica_state_subscriber subscriber,
                                                         bool is_long_subscriber)
{
    _replica_state_subscriber = subscriber;
    _is_long_subscriber = is_long_subscriber;
}

// this_ is used to hold a ref to replica_stub so we don't need to cancel the task on
// replica_stub::close
// ThreadPool: THREAD_POOL_REPLICATION
// Applies one partition's config from the meta server to the local replica;
// if the replica is missing and we are listed as its primary, ask the meta
// server to remove us from the config.
void replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_,
                                               const configuration_update_request &req)
{
    replica_ptr replica = get_replica(req.config.pid);
    if (replica != nullptr) {
        replica->on_config_sync(req.info,
                                req.config,
                                req.__isset.meta_split_status ?
                                    req.meta_split_status :
                                    split_status::NOT_SPLIT);
    } else {
        if (req.config.hp_primary == _primary_host_port) {
            LOG_INFO("{}@{}: replica not exists on replica server, which is primary, remove it "
                     "from meta server",
                     req.config.pid,
                     _primary_host_port_cache);
            remove_replica_on_meta_server(req.info, req.config);
        } else {
            LOG_INFO(
                "{}@{}: replica not exists on replica server, which is not primary, just ignore",
                req.config.pid,
                _primary_host_port_cache);
        }
    }
}

// ThreadPool: THREAD_POOL_REPLICATION
// Handles a local replica that the meta server does not know about: leave
// learners/splitting replicas alone, give recently-inactive replicas a
// grace period, and mark everything else PS_ERROR so it gets closed.
void replica_stub::on_node_query_reply_scatter2(replica_stub_ptr this_, gpid id)
{
    replica_ptr replica = get_replica(id);
    if (replica != nullptr && replica->status() != partition_status::PS_POTENTIAL_SECONDARY &&
        replica->status() != partition_status::PS_PARTITION_SPLIT) {
        if (replica->status() == partition_status::PS_INACTIVE &&
            dsn_now_ms() - replica->create_time_milliseconds() <
                FLAGS_gc_memory_replica_interval_ms) {
            LOG_INFO("{}: replica not exists on meta server, wait to close", replica->name());
            return;
        }

        LOG_INFO("{}: replica not exists on meta server, remove", replica->name());

        // TODO: set PS_INACTIVE instead for further state reuse
        replica->update_local_configuration_with_no_ballot_change(partition_status::PS_ERROR);
    }
}

// Asks the meta server (via CT_DOWNGRADE_TO_INACTIVE with a bumped ballot)
// to drop this node from a partition's configuration — clearing the primary
// slot if we were primary, or removing us from the secondaries; no-op if we
// appear in neither role, or when the failure detector is disabled.
// Fire-and-forget: the RPC reply is intentionally ignored.
void replica_stub::remove_replica_on_meta_server(const app_info &info,
                                                 const partition_configuration &pc)
{
    if (FLAGS_fd_disabled) {
        return;
    }

    dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_UPDATE_PARTITION_CONFIGURATION);

    std::shared_ptr<configuration_update_request> request(new configuration_update_request);
    request->info = info;
    request->config = pc;
    request->config.ballot++;
    SET_IP_AND_HOST_PORT(*request, node, primary_address(), _primary_host_port);
    request->type = config_type::CT_DOWNGRADE_TO_INACTIVE;

    if (_primary_host_port == pc.hp_primary) {
        RESET_IP_AND_HOST_PORT(request->config, primary);
    } else if (REMOVE_IP_AND_HOST_PORT(
                   primary_address(), _primary_host_port, request->config, secondaries)) {
        // Removed ourselves from the secondary list; fall through to send.
    } else {
        // We hold no role in this partition's configuration — nothing to do.
        return;
    }

    ::dsn::marshall(msg, *request);

    const auto &target =
        dsn::dns_resolver::instance().resolve_address(_failure_detector->get_servers());
    rpc::call(target, msg, nullptr, [](error_code err, dsn::message_ex *, dsn::message_ex *) {});
}

// Failure-detector callback: transition to Disconnected and fan out a
// per-replica notification task.
void replica_stub::on_meta_server_disconnected()
{
    LOG_INFO("meta server disconnected");

    zauto_lock sl(_state_lock);
    if (NS_Disconnected == _state)
        return;

    _state = NS_Disconnected;

    replica_map_by_gpid reps;
    {
        zauto_read_lock rl(_replicas_lock);
        reps = _replicas;
    }

    for (const auto &[pid, _] : reps) {
        tasking::enqueue(
            LPC_CM_DISCONNECTED_SCATTER,
            &_tracker,
            std::bind(&replica_stub::on_meta_server_disconnected_scatter, this, this, pid),
            pid.thread_hash());
    }
}

// this_ is used to hold a ref to replica_stub so we don't need to cancel the task on
// replica_stub::close
// Per-replica disconnect notification; skipped if we reconnected in the
// meantime or the replica is gone.
void replica_stub::on_meta_server_disconnected_scatter(replica_stub_ptr this_, gpid id)
{
    {
        zauto_lock l(_state_lock);
        if (_state != NS_Disconnected)
            return;
    }

    replica_ptr replica = get_replica(id);
    if (replica != nullptr) {
        replica->on_meta_server_disconnected();
    }
}

// Sends the final reply for a client request, bumping busy/failed metrics
// for non-OK results. `request` may be null, in which case only metrics and
// logging happen.
void replica_stub::response_client(gpid id,
                                   bool is_read,
                                   dsn::message_ex *request,
                                   partition_status::type status,
                                   error_code error)
{
    if (error == ERR_BUSY) {
        if (is_read) {
            METRIC_VAR_INCREMENT(read_busy_requests);
        } else {
            METRIC_VAR_INCREMENT(write_busy_requests);
        }
    } else if (error != ERR_OK) {
        if (is_read) {
            METRIC_VAR_INCREMENT(read_failed_requests);
        } else {
            METRIC_VAR_INCREMENT(write_failed_requests);
        }
        LOG_ERROR("{}@{}: {} fail: client = {}, code = {}, timeout = {}, status = {}, error = {}",
                  id,
                  _primary_host_port_cache,
                  is_read ? "read" : "write",
                  request == nullptr ? "null" : request->header->from_address.to_string(),
                  request == nullptr ? "null" : request->header->rpc_name,
                  request == nullptr ?
                      0 : request->header->client.timeout_ms,
                  enum_to_string(status),
                  error);
    }

    if (request != nullptr) {
        dsn_rpc_reply(request->create_response(), error);
    }
}

// Garbage-collects a closed replica: removes its bookkeeping entry and
// renames its on-disk directory to "<dir>.<us><gar-suffix>". On rename
// failure the bookkeeping entry is restored so gc can be retried later.
void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
{
    std::pair<app_info, replica_info> closed_info;
    {
        zauto_write_lock l(_replicas_lock);
        auto iter = _closed_replicas.find(id);
        if (iter == _closed_replicas.end())
            return;
        closed_info = iter->second;
        _closed_replicas.erase(iter);
    }

    _fs_manager.remove_replica(id);

    const auto *const dn = _fs_manager.find_replica_dir(closed_info.first.app_type, id);
    if (dn == nullptr) {
        LOG_WARNING(
            "gc closed replica({}.{}) failed, no exist data", id, closed_info.first.app_type);
        return;
    }

    const auto replica_path = dn->replica_dir(closed_info.first.app_type, id);
    CHECK(
        dsn::utils::filesystem::directory_exists(replica_path), "dir({}) not exist", replica_path);
    LOG_INFO("start to move replica({}) as garbage, path: {}", id, replica_path);
    const auto rename_path = fmt::format("{}.{}{}", replica_path, dsn_now_us(), kFolderSuffixGar);
    if (!dsn::utils::filesystem::rename_path(replica_path, rename_path)) {
        LOG_WARNING("gc_replica: failed to move directory '{}' to '{}'", replica_path, rename_path);

        // if gc the replica failed, add it back
        {
            zauto_write_lock l(_replicas_lock);
            _closed_replicas.emplace(id, closed_info);
        }
        _fs_manager.add_replica(id, replica_path);
    } else {
        LOG_WARNING("gc_replica: replica_dir_op succeed to move directory '{}' to '{}'",
                    replica_path,
                    rename_path);
        METRIC_VAR_INCREMENT(moved_garbage_replicas);
    }
}

// Periodic timer (LPC_REPLICAS_STAT): snapshots per-replica state under the
// read lock, then aggregates status counts plus learning / bulk-load /
// split progress maxima into metrics outside the lock.
void replica_stub::on_replicas_stat()
{
    uint64_t start = dsn_now_ns();

    replica_stat_info_by_gpid rep_stat_info_by_gpid;
    {
        zauto_read_lock l(_replicas_lock);
        // A replica was removed from _replicas before it would be closed by replica::close().
        // Thus it's safe to use the replica after fetching its ref pointer from _replicas.
        for (const auto &replica : _replicas) {
            const auto &rep = replica.second;
            auto &rep_stat_info = rep_stat_info_by_gpid[replica.first];
            rep_stat_info.rep = rep;
            rep_stat_info.status = rep->status();
            rep_stat_info.plog = rep->private_log();
            rep_stat_info.last_durable_decree = rep->last_durable_decree();
        }
    }

    LOG_INFO("start replicas statistics, replica_count = {}", rep_stat_info_by_gpid.size());

    // statistic learning info
    // Running maxima/counters aggregated across all replicas in one pass.
    uint64_t learning_max_duration_time_ms = 0;
    uint64_t learning_max_copy_file_size = 0;
    uint64_t bulk_load_running_count = 0;
    uint64_t bulk_load_max_ingestion_time_ms = 0;
    uint64_t bulk_load_max_duration_time_ms = 0;
    uint64_t splitting_max_duration_time_ms = 0;
    uint64_t splitting_max_async_learn_time_ms = 0;
    uint64_t splitting_max_copy_file_size = 0;
    std::map<partition_status::type, size_t> status_counts;
    for (const auto &[_, rep_stat_info] : rep_stat_info_by_gpid) {
        const auto &rep = rep_stat_info.rep;
        ++status_counts[rep->status()];
        // Learners: track the slowest learn and the largest copied file set.
        if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) {
            learning_max_duration_time_ms = std::max(
                learning_max_duration_time_ms, rep->_potential_secondary_states.duration_ms());
            learning_max_copy_file_size =
                std::max(learning_max_copy_file_size,
                         rep->_potential_secondary_states.learning_copy_file_size);
            continue;
        }
        // Primaries/secondaries: bulk-load progress, if a bulk load is active.
        if (rep->status() == partition_status::PS_PRIMARY ||
            rep->status() == partition_status::PS_SECONDARY) {
            if (rep->get_bulk_loader()->get_bulk_load_status() != bulk_load_status::BLS_INVALID) {
                bulk_load_running_count++;
                bulk_load_max_ingestion_time_ms =
                    std::max(bulk_load_max_ingestion_time_ms, rep->ingestion_duration_ms());
                bulk_load_max_duration_time_ms =
                    std::max(bulk_load_max_duration_time_ms, rep->get_bulk_loader()->duration_ms());
            }
            continue;
        }
        // splitting_max_copy_file_size, rep->_split_states.copy_file_size
        if (rep->status() == partition_status::PS_PARTITION_SPLIT) {
            splitting_max_duration_time_ms =
                std::max(splitting_max_duration_time_ms, rep->_split_states.total_ms());
            splitting_max_async_learn_time_ms =
                std::max(splitting_max_async_learn_time_ms, rep->_split_states.async_learn_ms());
            splitting_max_copy_file_size =
                std::max(splitting_max_copy_file_size, rep->_split_states.splitting_copy_file_size);
            continue;
        }
    }

    // Publish the aggregates as gauges.
    METRIC_VAR_SET(inactive_replicas, status_counts[partition_status::PS_INACTIVE]);
    METRIC_VAR_SET(error_replicas, status_counts[partition_status::PS_ERROR]);
    METRIC_VAR_SET(primary_replicas, status_counts[partition_status::PS_PRIMARY]);
    METRIC_VAR_SET(secondary_replicas, status_counts[partition_status::PS_SECONDARY]);
    METRIC_VAR_SET(learning_replicas, status_counts[partition_status::PS_POTENTIAL_SECONDARY]);
    METRIC_VAR_SET(learning_replicas_max_duration_ms, learning_max_duration_time_ms);
    METRIC_VAR_SET(learning_replicas_max_copy_file_bytes, learning_max_copy_file_size);
    METRIC_VAR_SET(bulk_load_running_count, bulk_load_running_count);
    METRIC_VAR_SET(bulk_load_ingestion_max_duration_ms, bulk_load_max_ingestion_time_ms);
    METRIC_VAR_SET(bulk_load_max_duration_ms, bulk_load_max_duration_time_ms);
    METRIC_VAR_SET(splitting_replicas, status_counts[partition_status::PS_PARTITION_SPLIT]);
    METRIC_VAR_SET(splitting_replicas_max_duration_ms, splitting_max_duration_time_ms);
    METRIC_VAR_SET(splitting_replicas_async_learn_max_duration_ms,
                   splitting_max_async_learn_time_ms);
    METRIC_VAR_SET(splitting_replicas_max_copy_file_bytes, splitting_max_copy_file_size);

    LOG_INFO("finish replicas statistics, time used {}ns", dsn_now_ns() - start);
}

// Periodic timer (LPC_DISK_STAT): removes useless replica directories,
// refreshes fs_manager disk stats, and publishes dir-cleanup metrics.
void replica_stub::on_disk_stat()
{
    LOG_INFO("start to update disk stat");
    uint64_t start = dsn_now_ns();
    disk_cleaning_report report{};

    dsn::replication::disk_remove_useless_dirs(_fs_manager.get_dir_nodes(), report);
    _fs_manager.update_disk_stat();
    update_disk_holding_replicas();

    METRIC_VAR_SET(replica_error_dirs, report.error_replica_count);
    METRIC_VAR_SET(replica_garbage_dirs, report.garbage_replica_count);
    METRIC_VAR_SET(replica_tmp_dirs, report.disk_migrate_tmp_count);
METRIC_VAR_SET(replica_origin_dirs, report.disk_migrate_origin_count); METRIC_VAR_INCREMENT_BY(replica_removed_dirs, report.remove_dir_count); LOG_INFO("finish to update disk stat, time_used_ns = {}", dsn_now_ns() - start); } task_ptr replica_stub::begin_open_replica( const app_info &app, gpid id, const std::shared_ptr<group_check_request> &group_check, const std::shared_ptr<configuration_update_request> &configuration_update) { _replicas_lock.lock_write(); if (_replicas.find(id) != _replicas.end()) { _replicas_lock.unlock_write(); LOG_INFO("open replica '{}.{}' failed coz replica is already opened", app.app_type, id); return nullptr; } if (_opening_replicas.find(id) != _opening_replicas.end()) { _replicas_lock.unlock_write(); LOG_INFO("open replica '{}.{}' failed coz replica is under opening", app.app_type, id); return nullptr; } auto it = _closing_replicas.find(id); if (it != _closing_replicas.end()) { task_ptr tsk = std::get<0>(it->second); replica_ptr rep = std::get<1>(it->second); if (rep->status() == partition_status::PS_INACTIVE && tsk->cancel(false)) { // reopen it _closing_replicas.erase(it); METRIC_VAR_DECREMENT(closing_replicas); _replicas.emplace(id, rep); METRIC_VAR_INCREMENT(total_replicas); _closed_replicas.erase(id); // unlock here to avoid dead lock _replicas_lock.unlock_write(); LOG_INFO("open replica '{}.{}' which is to be closed, reopen it", app.app_type, id); // open by add learner if (group_check != nullptr) { on_add_learner(*group_check); } } else { _replicas_lock.unlock_write(); LOG_INFO("open replica '{}.{}' failed coz replica is under closing", app.app_type, id); } return nullptr; } task_ptr task = tasking::enqueue( LPC_OPEN_REPLICA, &_tracker, std::bind(&replica_stub::open_replica, this, app, id, group_check, configuration_update)); _opening_replicas[id] = task; METRIC_VAR_INCREMENT(opening_replicas); _closed_replicas.erase(id); _replicas_lock.unlock_write(); return task; } void replica_stub::open_replica( const app_info &app, gpid id, 
    const std::shared_ptr<group_check_request> &group_check,
    const std::shared_ptr<configuration_update_request> &configuration_update)
{
    replica_ptr rep;
    std::string dir;
    auto dn = _fs_manager.find_replica_dir(app.app_type, id);
    if (dn != nullptr) {
        dir = dn->replica_dir(app.app_type, id);
        CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
        // NOTICE: if partition is DDD, and meta select one replica as primary, it will execute the
        // load-process because of a.b.pegasus is exist, so it will never execute the restore
        // process below
        LOG_INFO("{}@{}: start to load replica {} group check, dir = {}",
                 id,
                 _primary_host_port_cache,
                 group_check ? "with" : "without",
                 dir);
        rep = load_replica(dn, dir);

        // if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk
        // migration
        if (rep == nullptr) {
            const auto origin_dir_type =
                fmt::format("{}{}", app.app_type, replica_disk_migrator::kReplicaDirOriginSuffix);
            const auto origin_dn = _fs_manager.find_replica_dir(origin_dir_type, id);
            if (origin_dn != nullptr) {
                const auto origin_tmp_dir = origin_dn->replica_dir(origin_dir_type, id);
                CHECK(dsn::utils::filesystem::directory_exists(origin_tmp_dir),
                      "dir({}) not exist",
                      origin_tmp_dir);
                LOG_INFO("mark the dir {} as garbage, start revert and load disk migration origin "
                         "replica data({})",
                         dir,
                         origin_tmp_dir);
                // The broken dir is kept (renamed with the garbage suffix) rather than deleted.
                dsn::utils::filesystem::rename_path(dir,
                                                    fmt::format("{}{}", dir, kFolderSuffixGar));

                std::string origin_dir = origin_tmp_dir;
                // revert the origin replica dir
                boost::replace_first(
                    origin_dir, replica_disk_migrator::kReplicaDirOriginSuffix, "");
                dsn::utils::filesystem::rename_path(origin_tmp_dir, origin_dir);
                rep = load_replica(origin_dn, origin_dir);

                FAIL_POINT_INJECT_F("mock_replica_load", [&](std::string_view) -> void {});
            }
        }
    }

    if (rep == nullptr) {
        // NOTICE: if dir a.b.pegasus does not exist, or .app-info does not exist, but the ballot >
        // 0, or the last_committed_decree > 0, start replica will fail
        if ((configuration_update != nullptr) && (configuration_update->info.is_stateful)) {
            CHECK(configuration_update->config.ballot == 0 &&
                      configuration_update->config.last_committed_decree == 0,
                  "{}@{}: cannot load replica({}.{}), ballot = {}, "
                  "last_committed_decree = {}, but it does not existed!",
                  id,
                  _primary_host_port_cache,
                  id,
                  app.app_type.c_str(),
                  configuration_update->config.ballot,
                  configuration_update->config.last_committed_decree);
        }

        // NOTICE: only new_replica_group's assign_primary will execute this; if server restart when
        // download restore-data from cold backup media, the a.b.pegasus will move to
        // a.b.pegasus.timestamp.err when replica-server load all the replicas, so restore-flow will
        // do it again
        bool restore_if_necessary =
            ((configuration_update != nullptr) &&
             (configuration_update->type == config_type::CT_ASSIGN_PRIMARY) &&
             (app.envs.find(backup_restore_constant::POLICY_NAME) != app.envs.end()));

        // A duplication follower is bootstrapped only on assign-primary when both master
        // cluster and master metas are present in the app envs.
        bool is_duplication_follower =
            ((configuration_update != nullptr) &&
             (configuration_update->type == config_type::CT_ASSIGN_PRIMARY) &&
             (app.envs.find(duplication_constants::kEnvMasterClusterKey) != app.envs.end()) &&
             (app.envs.find(duplication_constants::kEnvMasterMetasKey) != app.envs.end()));

        // NOTICE: when we don't need execute restore-process, we should remove a.b.pegasus
        // directory because it don't contain the valid data dir and also we need create a new
        // replica(if contain valid data, it will execute load-process)
        if (!restore_if_necessary && ::dsn::utils::filesystem::directory_exists(dir)) {
            CHECK(::dsn::utils::filesystem::remove_path(dir),
                  "remove useless directory({}) failed",
                  dir);
        }
        rep = new_replica(id, app, restore_if_necessary, is_duplication_follower);
    }

    if (rep == nullptr) {
        LOG_WARNING("{}@{}: open replica failed, erase from opening replicas",
                    id,
                    _primary_host_port_cache);
        zauto_write_lock l(_replicas_lock);
        CHECK_GT_MSG(_opening_replicas.erase(id), 0, "replica {} is not in _opening_replicas", id);
        METRIC_VAR_DECREMENT(opening_replicas);
        return;
    }

    {
        // Move the replica from the "opening" set into the active replica map.
        zauto_write_lock l(_replicas_lock);
        CHECK_GT_MSG(_opening_replicas.erase(id), 0, "replica {} is not in _opening_replicas", id);
        METRIC_VAR_DECREMENT(opening_replicas);
        CHECK(_replicas.find(id) == _replicas.end(), "replica {} is already in _replicas", id);
        _replicas.insert(replica_map_by_gpid::value_type(rep->get_gpid(), rep));
        METRIC_VAR_INCREMENT(total_replicas);
        _closed_replicas.erase(id);
    }

    // Forward the pending request to ourselves now that the replica is registered.
    if (nullptr != group_check) {
        rpc::call_one_way_typed(primary_address(),
                                RPC_LEARN_ADD_LEARNER,
                                *group_check,
                                group_check->config.pid.thread_hash());
    } else if (nullptr != configuration_update) {
        rpc::call_one_way_typed(primary_address(),
                                RPC_CONFIG_PROPOSAL,
                                *configuration_update,
                                configuration_update->config.pid.thread_hash());
    }
}

// Create a brand-new replica instance (optionally restoring from a backup checkpoint or
// duplicating a checkpoint from a master cluster). An empty `parent_dir` allocates a new
// replica dir; a non-empty one creates a child replica dir under it (partition split).
// Returns nullptr on any failure, after cleaning up via clear_on_failure().
replica *replica_stub::new_replica(gpid gpid,
                                   const app_info &app,
                                   bool restore_if_necessary,
                                   bool is_duplication_follower,
                                   const std::string &parent_dir)
{
    dir_node *dn = nullptr;
    if (parent_dir.empty()) {
        dn = _fs_manager.create_replica_dir_if_necessary(app.app_type, gpid);
    } else {
        dn = _fs_manager.create_child_replica_dir(app.app_type, gpid, parent_dir);
    }
    if (dn == nullptr) {
        LOG_ERROR("could not allocate a new directory for replica {}", gpid);
        return nullptr;
    }
    const auto &dir = dn->replica_dir(app.app_type, gpid);
    CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
    auto *rep = new replica(this, gpid, app, dn, restore_if_necessary, is_duplication_follower);
    error_code err;
    if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) {
        LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err);
        clear_on_failure(rep);
        return nullptr;
    }

    if (is_duplication_follower &&
        (err = rep->get_replica_follower()->duplicate_checkpoint()) != dsn::ERR_OK) {
        LOG_ERROR("{}: try to duplicate replica checkpoint failed, error({}) and please check "
                  "previous detail error log",
                  rep->name(),
                  err);
        clear_on_failure(rep);
        return nullptr;
    }

    err =
        rep->initialize_on_new();
    if (err != ERR_OK) {
        LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err);
        clear_on_failure(rep);
        return nullptr;
    }

    LOG_DEBUG("{}: new replica succeed", rep->name());
    return rep;
}

// Convenience overload: create a new replica with an auto-allocated replica dir
// (i.e. no parent dir, not a partition-split child).
replica *replica_stub::new_replica(gpid gpid,
                                   const app_info &app,
                                   bool restore_if_necessary,
                                   bool is_duplication_follower)
{
    return new_replica(gpid, app, restore_if_necessary, is_duplication_follower, {});
}

// Return the last path component of `dir`, accepting both '/' and '\\' as separators.
/*static*/ std::string replica_stub::get_replica_dir_name(const std::string &dir)
{
    static const char splitters[] = {'\\', '/', 0};
    return utils::get_last_component(dir, splitters);
}

// Parse a replica dir name of the form "<app_id>.<partition_index>.<app_type>" into
// `pid` and `app_type`. Returns false if the name does not match this format.
/* static */ bool replica_stub::parse_replica_dir_name(const std::string &dir_name,
                                                       gpid &pid,
                                                       std::string &app_type)
{
    // The two leading dot-separated fields are app_id and partition_index.
    std::vector<uint32_t> ids(2, 0);
    size_t begin = 0;
    for (auto &id : ids) {
        size_t end = dir_name.find('.', begin);
        if (end == std::string::npos) {
            return false;
        }

        if (!buf2uint32(std::string_view(dir_name.data() + begin, end - begin), id)) {
            return false;
        }

        begin = end + 1;
    }

    // There must be a non-empty app_type after the second dot.
    if (begin >= dir_name.size()) {
        return false;
    }

    pid.set_app_id(static_cast<int32_t>(ids[0]));
    pid.set_partition_index(static_cast<int32_t>(ids[1]));
    // TODO(wangdan): the 3rd parameter `count` does not support default argument for CentOS 7
    // (gcc 7.3.1). After CentOS 7 is deprecated, consider dropping std::string::npos.
    app_type.assign(dir_name, begin, std::string::npos);
    return true;
}

// Validate that `dir` is a loadable replica dir: it exists, its name parses, its
// .app-info file loads, the app type matches, and the partition index is within the
// current partition count. On failure, `hint_message` explains why.
bool replica_stub::validate_replica_dir(const std::string &dir,
                                        app_info &ai,
                                        gpid &pid,
                                        std::string &hint_message)
{
    if (!utils::filesystem::directory_exists(dir)) {
        hint_message = fmt::format("replica dir '{}' not exist", dir);
        return false;
    }

    const auto &dir_name = get_replica_dir_name(dir);
    if (dir_name.empty()) {
        hint_message = fmt::format("invalid replica dir '{}'", dir);
        return false;
    }

    std::string app_type;
    if (!parse_replica_dir_name(dir_name, pid, app_type)) {
        hint_message = fmt::format("invalid replica dir '{}'", dir);
        return false;
    }

    replica_app_info rai(&ai);
    const auto ai_path = utils::filesystem::path_combine(dir, replica_app_info::kAppInfo);
    const auto err = rai.load(ai_path);
    if (ERR_OK != err) {
        hint_message = fmt::format("load app-info from '{}' failed, err = {}", ai_path, err);
        return false;
    }

    if (ai.app_type != app_type) {
        hint_message = fmt::format("unmatched app type '{}' for '{}'", ai.app_type, ai_path);
        return false;
    }

    if (pid.get_partition_index() >= ai.partition_count) {
        // Once the online partition split aborted, the partitions within the range of
        // [ai.partition_count, 2 * ai.partition_count) would become garbage.
        hint_message = fmt::format(
            "partition[{}], count={}, this replica may be partition split garbage partition, "
            "ignore it",
            pid,
            ai.partition_count);
        return false;
    }

    return true;
}

// Load an existing replica from `replica_dir` on disk node `disk_node`. Returns nullptr
// (and moves the broken dir to the error path) if validation or initialization fails.
replica_ptr replica_stub::load_replica(dir_node *disk_node, const std::string &replica_dir)
{
    FAIL_POINT_INJECT_F("mock_replica_load",
                        [&](std::string_view) -> replica * { return nullptr; });

    app_info ai;
    gpid pid;
    std::string hint_message;
    if (!validate_replica_dir(replica_dir, ai, pid, hint_message)) {
        LOG_ERROR("invalid replica dir '{}', hint={}", replica_dir, hint_message);
        return nullptr;
    }

    // The replica's directory must exist when creating a replica.
    CHECK_EQ(disk_node->replica_dir(ai.app_type, pid), replica_dir);
    auto *rep = new replica(this, pid, ai, disk_node, false);
    const auto err = rep->initialize_on_load();
    if (err != ERR_OK) {
        LOG_ERROR("{}: load replica failed, tag={}, replica_dir={}, err={}",
                  rep->name(),
                  disk_node->tag,
                  replica_dir,
                  err);
        delete rep;
        rep = nullptr;

        // clear work on failure: quarantine the broken dir instead of deleting it.
        if (dsn::utils::filesystem::directory_exists(replica_dir)) {
            move_to_err_path(replica_dir, "load replica");
            METRIC_VAR_INCREMENT(moved_error_replicas);
            _fs_manager.remove_replica(pid);
        }

        return nullptr;
    }

    LOG_INFO("{}: load replica succeed, tag={}, replica_dir={}",
             rep->name(),
             disk_node->tag,
             replica_dir);
    return rep;
}

// Tear down a half-constructed replica after a failed new_replica(): close and delete
// the object, remove its directory, and unregister it from the filesystem manager.
void replica_stub::clear_on_failure(replica *rep)
{
    // Capture dir/gpid before deleting the object that owns them.
    const auto rep_dir = rep->dir();
    const auto pid = rep->get_gpid();

    rep->close();
    delete rep;
    rep = nullptr;

    // clear work on failure
    utils::filesystem::remove_path(rep_dir);
    _fs_manager.remove_replica(pid);
}

// Schedule the asynchronous close of replica `r`. Returns the close task, or nullptr if
// the replica is no longer registered. PS_INACTIVE replicas are kept in memory for
// FLAGS_gc_memory_replica_interval_ms before actually closing, so they can be reopened
// cheaply by begin_open_replica().
task_ptr replica_stub::begin_close_replica(replica_ptr r)
{
    CHECK(r->status() == partition_status::PS_ERROR ||
              r->status() == partition_status::PS_INACTIVE ||
              r->disk_migrator()->status() >= disk_migration_status::MOVED,
          "invalid state(partition_status={}, migration_status={}) when calling "
          "replica({}) close",
          enum_to_string(r->status()),
          enum_to_string(r->disk_migrator()->status()),
          r->name());

    gpid id = r->get_gpid();

    zauto_write_lock l(_replicas_lock);

    if (_replicas.erase(id) == 0) {
        return nullptr;
    }

    METRIC_VAR_DECREMENT(total_replicas);

    int delay_ms = 0;
    if (r->status() == partition_status::PS_INACTIVE) {
        delay_ms = FLAGS_gc_memory_replica_interval_ms;
        LOG_INFO("{}: delay {} milliseconds to close replica, status = PS_INACTIVE",
                 r->name(),
                 delay_ms);
    }

    // Snapshot app/replica info so queries can still be answered while closing.
    app_info a_info = *(r->get_app_info());
    replica_info r_info;
    get_replica_info(r_info, r);
    task_ptr task = tasking::enqueue(
        LPC_CLOSE_REPLICA,
        &_tracker,
        [=]() { close_replica(r); },
        0,
        std::chrono::milliseconds(delay_ms));
    _closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info));
    METRIC_VAR_INCREMENT(closing_replicas);
    return task;
}

// Body of the LPC_CLOSE_REPLICA task: actually close the replica, move its bookkeeping
// from _closing_replicas to _closed_replicas, and quarantine corrupted data dirs.
void replica_stub::close_replica(replica_ptr r)
{
    LOG_INFO("{}: start to close replica", r->name());

    gpid id = r->get_gpid();
    std::string name = r->name();

    r->close();

    {
        zauto_write_lock l(_replicas_lock);
        auto find = _closing_replicas.find(id);
        CHECK(find != _closing_replicas.end(), "replica {} is not in _closing_replicas", name);
        _closed_replicas.emplace(
            id, std::make_pair(std::get<2>(find->second), std::get<3>(find->second)));
        _closing_replicas.erase(find);
        METRIC_VAR_DECREMENT(closing_replicas);
    }

    _fs_manager.remove_replica(id);
    if (r->is_data_corrupted()) {
        move_to_err_path(r->dir(), "trash replica");
        METRIC_VAR_INCREMENT(moved_error_replicas);
    }

    LOG_INFO("{}: finish to close replica", name);
}

// Notify the registered subscriber (if any) about a replica state change, either
// inline or via an async task depending on _is_long_subscriber.
void replica_stub::notify_replica_state_update(const replica_configuration &config, bool is_closing)
{
    if (nullptr != _replica_state_subscriber) {
        if (_is_long_subscriber) {
            tasking::enqueue(
                LPC_REPLICA_STATE_CHANGE_NOTIFICATION,
                &_tracker,
                std::bind(_replica_state_subscriber, _primary_host_port, config, is_closing));
        } else {
            _replica_state_subscriber(_primary_host_port, config, is_closing);
        }
    }
}

// Ask replica `r` to start a checkpoint, optionally as an emergency one.
void replica_stub::trigger_checkpoint(replica_ptr r, bool is_emergency)
{
    r->init_checkpoint(is_emergency);
}

// Log failures are fatal unless s_not_exit_on_log_failure is set (test-only escape hatch).
void replica_stub::handle_log_failure(error_code err)
{
    LOG_ERROR("handle log failure: {}", err);
    CHECK(s_not_exit_on_log_failure, "");
}

// Register all RPC handlers served by this replica stub, plus the control commands.
void replica_stub::open_service()
{
    register_rpc_handler(RPC_CONFIG_PROPOSAL, "ProposeConfig", &replica_stub::on_config_proposal);
    register_rpc_handler(RPC_PREPARE, "prepare", &replica_stub::on_prepare);
    register_rpc_handler(RPC_LEARN, "Learn", &replica_stub::on_learn);
    register_rpc_handler_with_rpc_holder(RPC_LEARN_COMPLETION_NOTIFY,
                                         "LearnNotify",
                                         &replica_stub::on_learn_completion_notification);
    register_rpc_handler(RPC_LEARN_ADD_LEARNER, "LearnAdd", &replica_stub::on_add_learner);
    register_rpc_handler(RPC_REMOVE_REPLICA, "remove",
                         &replica_stub::on_remove);
    register_rpc_handler_with_rpc_holder(
        RPC_GROUP_CHECK, "GroupCheck", &replica_stub::on_group_check);
    register_rpc_handler_with_rpc_holder(
        RPC_QUERY_PN_DECREE, "query_decree", &replica_stub::on_query_decree);
    register_rpc_handler_with_rpc_holder(
        RPC_QUERY_REPLICA_INFO, "query_replica_info", &replica_stub::on_query_replica_info);
    register_rpc_handler_with_rpc_holder(RPC_QUERY_LAST_CHECKPOINT_INFO,
                                         "query_last_checkpoint_info",
                                         &replica_stub::on_query_last_checkpoint);
    register_rpc_handler_with_rpc_holder(
        RPC_QUERY_DISK_INFO, "query_disk_info", &replica_stub::on_query_disk_info);
    register_rpc_handler_with_rpc_holder(
        RPC_REPLICA_DISK_MIGRATE, "disk_migrate_replica", &replica_stub::on_disk_migrate);
    register_rpc_handler_with_rpc_holder(
        RPC_QUERY_APP_INFO, "query_app_info", &replica_stub::on_query_app_info);
    register_rpc_handler_with_rpc_holder(RPC_SPLIT_UPDATE_CHILD_PARTITION_COUNT,
                                         "update_child_group_partition_count",
                                         &replica_stub::on_update_child_group_partition_count);
    register_rpc_handler_with_rpc_holder(RPC_SPLIT_NOTIFY_CATCH_UP,
                                         "child_notify_catch_up",
                                         &replica_stub::on_notify_primary_split_catch_up);
    register_rpc_handler_with_rpc_holder(RPC_BULK_LOAD, "bulk_load", &replica_stub::on_bulk_load);
    register_rpc_handler_with_rpc_holder(
        RPC_GROUP_BULK_LOAD, "group_bulk_load", &replica_stub::on_group_bulk_load);
    register_rpc_handler_with_rpc_holder(
        RPC_DETECT_HOTKEY, "detect_hotkey", &replica_stub::on_detect_hotkey);
    register_rpc_handler_with_rpc_holder(
        RPC_ADD_NEW_DISK, "add_new_disk", &replica_stub::on_add_new_disk);

    // nfs
    register_async_rpc_handler(dsn::service::RPC_NFS_COPY, "copy", &replica_stub::on_nfs_copy);
    register_async_rpc_handler(
        dsn::service::RPC_NFS_GET_FILE_SIZE, "get_file_size", &replica_stub::on_nfs_get_file_size);

    register_ctrl_command();
}

#if !defined(DSN_ENABLE_GPERF) && defined(DSN_USE_JEMALLOC)
// Register the jemalloc stats-dump control command (only built when jemalloc is the
// allocator and gperftools is disabled).
void replica_stub::register_jemalloc_ctrl_command()
{
    _cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
        "replica.dump-jemalloc-stats",
        "Dump stats of jemalloc",
        fmt::format("<{}> [buffer size]", kAllJeStatsTypesStr),
        [](const std::vector<std::string> &args) {
            if (args.empty()) {
                return std::string("invalid arguments");
            }

            auto type = enum_from_string(args[0].c_str(), je_stats_type::INVALID);
            if (type == je_stats_type::INVALID) {
                return std::string("invalid stats type");
            }

            std::string stats("\n");
            // With no explicit buffer size, dump with the default buffer.
            if (args.size() == 1) {
                dsn::je_dump_stats(type, stats);
                return stats;
            }

            uint64_t buf_sz;
            if (!dsn::buf2uint64(args[1], buf_sz)) {
                return std::string("invalid buffer size");
            }

            dsn::je_dump_stats(type, static_cast<size_t>(buf_sz), stats);
            return stats;
        }));
}
#endif

// Register all "replica.*" control commands exactly once per process.
void replica_stub::register_ctrl_command()
{
    /// In simple_kv test, three replica apps are created, which means that three replica_stubs
    /// are initialized in simple_kv test. If we don't use std::call_once, these commands would
    /// be registered three times. And in command_manager, one same command is not allowed to be
    /// registered more than once. That is why we use std::call_once here. Same situation in
    /// failure_detector::register_ctrl_commands and nfs_client_impl::register_cli_commands.
    static std::once_flag flag;
    std::call_once(flag, [&]() {
        // Kill all partitions, all partitions of one app, or a single partition.
        _cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
            "replica.kill_partition",
            "Kill partitions by (all, one app, one partition)",
            "[app_id [partition_index]]",
            [this](const std::vector<std::string> &args) {
                dsn::gpid pid;
                if (args.size() == 0) {
                    // No args: kill everything (-1 acts as a wildcard).
                    pid.set_app_id(-1);
                    pid.set_partition_index(-1);
                } else if (args.size() == 1) {
                    pid.set_app_id(atoi(args[0].c_str()));
                    pid.set_partition_index(-1);
                } else if (args.size() == 2) {
                    pid.set_app_id(atoi(args[0].c_str()));
                    pid.set_partition_index(atoi(args[1].c_str()));
                } else {
                    return std::string(ERR_INVALID_PARAMETERS.to_string());
                }
                dsn::error_code e = this->on_kill_replica(pid);
                return std::string(e.to_string());
            }));

        // Toggle-style flags exposed as bool commands.
        _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
            _deny_client, "replica.deny-client", "control if deny client read & write request"));
        _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
            _verbose_client_log,
            "replica.verbose-client-log",
            "control if print verbose error log when reply read & write request"));
        _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
            _verbose_commit_log,
            "replica.verbose-commit-log",
            "control if print verbose log when commit mutation"));

        _cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
            "replica.trigger-checkpoint",
            "Trigger replicas to do checkpoint by app_id or app_id.partition_id",
            "[id1,id2,...]",
            [this](const std::vector<std::string> &args) {
                return exec_command_on_replica(args, true, [this](const replica_ptr &rep) {
                    // Checkpointing is scheduled on the replica's own tracker, not run inline.
                    tasking::enqueue(LPC_PER_REPLICA_CHECKPOINT_TIMER,
                                     rep->tracker(),
                                     std::bind(&replica_stub::trigger_checkpoint, this, rep, true),
                                     rep->get_gpid().thread_hash());
                    return std::string("triggered");
                });
            }));

        _cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
            "replica.query-compact",
            "Query full compact status on the underlying storage engine by app_id or "
            "app_id.partition_id",
            "[id1,id2,...]",
            [this](const std::vector<std::string> &args) {
                return exec_command_on_replica(args, true, [](const replica_ptr &rep) {
                    return rep->query_manual_compact_state();
                });
            }));

        _cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
            "replica.query-app-envs",
            "Query app envs on the underlying storage engine by app_id or app_id.partition_id",
            "[id1,id2,...]",
            [this](const std::vector<std::string> &args) {
                return exec_command_on_replica(args, true, [](const replica_ptr &rep) {
                    std::map<std::string, std::string> kv_map;
                    rep->query_app_envs(kv_map);
                    return dsn::utils::kv_map_to_string(kv_map, ',', '=');
                });
            }));

        _cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
            "replica.enable-plog-gc",
            "Enable plog garbage collection for replicas specified by comma-separated list "
            "of 'app_id' or 'app_id.partition_id', or all replicas for empty",
            "[id1,id2,...]",
            [this](const std::vector<std::string> &args) {
                return exec_command_on_replica(args, true, [](const replica_ptr &rep) {
                    rep->update_plog_gc_enabled(true);
                    return rep->get_plog_gc_enabled_message();
                });
            }));

        _cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
            "replica.disable-plog-gc",
            "Disable plog garbage collection for replicas specified by comma-separated list "
            "of 'app_id' or 'app_id.partition_id', or all replicas for empty",
            "[id1,id2,...]",
            [this](const std::vector<std::string> &args) {
                return exec_command_on_replica(args, true, [](const replica_ptr &rep) {
                    rep->update_plog_gc_enabled(false);
                    return rep->get_plog_gc_enabled_message();
                });
            }));

        _cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
            "replica.query-plog-gc-enabled-status",
            "Query if plog garbage collection is enabled or disabled for replicas specified by "
            "comma-separated list of 'app_id' or 'app_id.partition_id', or all replicas for empty",
            "[id1,id2,...]",
            [this](const std::vector<std::string> &args) {
                return exec_command_on_replica(args, true, [](const replica_ptr &rep) {
                    return rep->get_plog_gc_enabled_message();
                });
            }));

        _cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
            "replica.query-progress",
            "Query the progress of decrees, including both local writes and duplications for "
            "replicas specified by comma-separated list of 'app_id' or 'app_id.partition_id', "
            "or all replicas for empty",
            "[id1,id2,...]",
            [this](const std::vector<std::string> &args) {
                return exec_command_on_replica(args, true, [](const replica_ptr &rep) {
                    std::ostringstream out;
                    rapidjson::OStreamWrapper wrapper(out);
                    dsn::json::PrettyJsonWriter writer(wrapper);
                    rep->encode_progress(writer);
                    return out.str();
                });
            }));

#ifdef DSN_ENABLE_GPERF
        // tcmalloc-specific commands (only with gperftools).
        _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
            _release_tcmalloc_memory,
            "replica.release-tcmalloc-memory",
            "control if try to release tcmalloc memory"));
        _cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
            "replica.get-tcmalloc-status",
            "Get the status of tcmalloc",
            "",
            [](const std::vector<std::string> &args) {
                char buf[4096];
                MallocExtension::instance()->GetStats(buf, 4096);
                return std::string(buf);
            }));

        _cmds.emplace_back(::dsn::command_manager::instance().register_int_command(
            _mem_release_max_reserved_mem_percentage,
            FLAGS_mem_release_max_reserved_mem_percentage,
            "replica.mem-release-max-reserved-percentage",
            "control tcmalloc max reserved but not-used memory percentage",
            &check_mem_release_max_reserved_mem_percentage));

        _cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
            "replica.release-all-reserved-memory",
            "Release tcmalloc all reserved-not-used memory back to operating system",
            "",
            [this](const std::vector<std::string> &args) {
                auto release_bytes =
                    gc_tcmalloc_memory(true);
                return "OK, release_bytes=" + std::to_string(release_bytes);
            }));
#elif defined(DSN_USE_JEMALLOC)
        register_jemalloc_ctrl_command();
#endif
    });
}

// Run `func` on every selected replica (on that replica's own thread) and collect the
// results into a human-readable report. `arg_str_list` selects replicas by 'app_id' or
// 'app_id.partition_id' (comma-separated); an empty list selects all replicas when
// `allow_empty_args` is true. Only PS_PRIMARY/PS_SECONDARY replicas execute `func`;
// explicitly requested ids that produced no result are reported as "not found".
std::string replica_stub::exec_command_on_replica(const std::vector<std::string> &arg_str_list,
                                                  bool allow_empty_args,
                                                  std::function<std::string(const replica_ptr &)> func)
{
    static const std::string kInvalidArguments("invalid arguments");
    if (arg_str_list.empty() && !allow_empty_args) {
        return kInvalidArguments;
    }

    // Snapshot the replica map so we can iterate without holding the lock.
    replica_map_by_gpid rs;
    {
        zauto_read_lock l(_replicas_lock);
        rs = _replicas;
    }

    std::set<gpid> required_ids;
    replica_map_by_gpid choosed_rs;
    if (!arg_str_list.empty()) {
        for (const auto &arg_str : arg_str_list) {
            std::vector<std::string> args;
            utils::split_args(arg_str.c_str(), args, ',');
            if (args.empty()) {
                return kInvalidArguments;
            }

            for (const std::string &arg : args) {
                if (arg.empty()) {
                    continue;
                }

                gpid id;
                if (id.parse_from(arg.c_str())) {
                    // Format: app_id.partition_index
                    required_ids.insert(id);

                    auto find = rs.find(id);
                    if (find != rs.end()) {
                        choosed_rs[id] = find->second;
                    }

                    continue;
                }

                // Must be app_id.
                int32_t app_id = 0;
                if (!buf2int32(arg, app_id)) {
                    return kInvalidArguments;
                }

                // Select every partition of this app.
                for (const auto &[_, rep] : rs) {
                    id = rep->get_gpid();
                    if (id.get_app_id() == app_id) {
                        choosed_rs[id] = rep;
                    }
                }
            }
        }
    } else {
        // all replicas
        choosed_rs = rs;
    }

    // Fan out one task per replica and wait for all of them.
    std::vector<task_ptr> tasks;
    ::dsn::zlock results_lock;
    std::map<gpid, std::pair<partition_status::type, std::string>> results; // id => status,result
    for (auto &kv : choosed_rs) {
        replica_ptr rep = kv.second;
        task_ptr tsk = tasking::enqueue(
            LPC_EXEC_COMMAND_ON_REPLICA,
            rep->tracker(),
            [rep, &func, &results_lock, &results]() {
                partition_status::type status = rep->status();
                if (status != partition_status::PS_PRIMARY &&
                    status != partition_status::PS_SECONDARY) {
                    return;
                }
                std::string result = func(rep);

                ::dsn::zauto_lock l(results_lock);
                auto &value = results[rep->get_gpid()];
                value.first = status;
                value.second = result;
            },
            rep->get_gpid().thread_hash());
        tasks.emplace_back(std::move(tsk));
    }

    for (auto &tsk : tasks) {
        tsk->wait();
    }

    // Explicitly requested partitions that yielded no result are marked "not found".
    int processed = results.size();
    int not_found = 0;
    for (auto &id : required_ids) {
        if (results.find(id) == results.end()) {
            auto &value = results[id];
            value.first = partition_status::PS_INVALID;
            value.second = "not found";
            not_found++;
        }
    }

    std::stringstream query_state;
    query_state << processed << " processed, " << not_found << " not found";
    for (auto &kv : results) {
        query_state << "\n " << kv.first << "@" << _primary_host_port_cache;
        if (kv.second.first != partition_status::PS_INVALID)
            query_state << "@" << (kv.second.first == partition_status::PS_PRIMARY ? "P" : "S");
        query_state << " : " << kv.second.second;
    }

    return query_state.str();
}

// Shut down the replica stub: cancel outstanding tasks and timers, wait for closing
// replicas, cancel pending opens, and close all live replicas.
void replica_stub::close()
{
    if (!_is_running) {
        return;
    }

    _tracker.cancel_outstanding_tasks();

    // this replica may not be opened
    // or is already closed by calling tool_app::stop_all_apps()
    // in this case, just return
    if (_cmds.empty()) {
        return;
    }
    _cmds.clear();

    if (_config_sync_timer_task != nullptr) {
        _config_sync_timer_task->cancel(true);
        _config_sync_timer_task = nullptr;
    }

    if (_duplication_sync_timer != nullptr) {
        _duplication_sync_timer->close();
        _duplication_sync_timer = nullptr;
    }

    if (_config_query_task != nullptr) {
        _config_query_task->cancel(true);
        _config_query_task = nullptr;
    }
    _state = NS_Disconnected;

    if (_disk_stat_timer_task != nullptr) {
        _disk_stat_timer_task->cancel(true);
        _disk_stat_timer_task = nullptr;
    }

    if (_replicas_stat_timer_task != nullptr) {
        _replicas_stat_timer_task->cancel(true);
        _replicas_stat_timer_task = nullptr;
    }

    if (_mem_release_timer_task != nullptr) {
        _mem_release_timer_task->cancel(true);
        _mem_release_timer_task = nullptr;
    }

    wait_closing_replicas_finished();

    {
        zauto_write_lock l(_replicas_lock);
        while (!_opening_replicas.empty()) {
            task_ptr task = _opening_replicas.begin()->second;
            // The lock is released while cancelling so the open task itself can take it.
            _replicas_lock.unlock_write();

            task->cancel(true);

            METRIC_VAR_DECREMENT(opening_replicas);
            _replicas_lock.lock_write();
            _opening_replicas.erase(_opening_replicas.begin());
        }

        while (!_replicas.empty()) {
            _replicas.begin()->second->close();

            METRIC_VAR_DECREMENT(total_replicas);
            _replicas.erase(_replicas.begin());
        }
    }
    _is_running = false;
}

#ifdef DSN_ENABLE_GPERF
// Get tcmalloc numeric property (name is "prop") value.
// Return -1 if get property failed (property we used will be greater than zero) // Properties can be found in 'gperftools/malloc_extension.h' static int64_t get_tcmalloc_numeric_property(const char *prop) { size_t value; if (!::MallocExtension::instance()->GetNumericProperty(prop, &value)) { LOG_ERROR("Failed to get tcmalloc property {}", prop); return -1; } return value; } uint64_t replica_stub::gc_tcmalloc_memory(bool release_all) { if (!_release_tcmalloc_memory) { _is_releasing_memory.store(false); return 0; } if (_is_releasing_memory.load()) { LOG_WARNING("This node is releasing memory..."); return 0; } _is_releasing_memory.store(true); int64_t total_allocated_bytes = get_tcmalloc_numeric_property("generic.current_allocated_bytes"); int64_t reserved_bytes = get_tcmalloc_numeric_property("tcmalloc.pageheap_free_bytes"); if (total_allocated_bytes == -1 || reserved_bytes == -1) { return 0; } int64_t max_reserved_bytes = release_all ? 0 : (total_allocated_bytes * _mem_release_max_reserved_mem_percentage / 100.0); if (reserved_bytes <= max_reserved_bytes) { return 0; } const int64_t expected_released_bytes = reserved_bytes - max_reserved_bytes; LOG_INFO("Memory release started, almost {} bytes will be released", expected_released_bytes); int64_t unreleased_bytes = expected_released_bytes; while (unreleased_bytes > 0) { // tcmalloc releasing memory will lock page heap, release 1MB at a time to avoid locking // page heap for long time static const int64_t kReleasedBytesEachTime = 1024 * 1024; ::MallocExtension::instance()->ReleaseToSystem(kReleasedBytesEachTime); unreleased_bytes -= kReleasedBytesEachTime; } METRIC_VAR_INCREMENT_BY(tcmalloc_released_bytes, expected_released_bytes); _is_releasing_memory.store(false); return expected_released_bytes; } #endif // // partition split // void replica_stub::create_child_replica(const host_port &primary_address, app_info app, ballot init_ballot, gpid child_gpid, gpid parent_gpid, const std::string &parent_dir) { replica_ptr 
child_replica = create_child_replica_if_not_found(child_gpid, &app, parent_dir);
    if (child_replica != nullptr) {
        LOG_INFO("app({}), create child replica ({}) succeed", app.app_name, child_gpid);
        // Initialize the child on its own thread hash in the split pool.
        tasking::enqueue(LPC_PARTITION_SPLIT,
                         child_replica->tracker(),
                         std::bind(&replica_split_manager::child_init_replica,
                                   child_replica->get_split_manager(),
                                   parent_gpid,
                                   primary_address,
                                   init_ballot),
                         child_gpid.thread_hash());
    } else {
        LOG_WARNING("failed to create child replica ({}), ignore it and wait next run",
                    child_gpid);
        // Ask the parent to drop its split context; a later round will retry.
        split_replica_error_handler(
            parent_gpid,
            std::bind(&replica_split_manager::parent_cleanup_split_context,
                      std::placeholders::_1));
    }
}

// Return the existing replica for 'child_pid', or create a new one in a child
// dir next to 'parent_dir'. Returns nullptr if the child is currently being
// opened or closed, or if new_replica() fails.
replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid,
                                                            app_info *app,
                                                            const std::string &parent_dir)
{
    // Test-only fail point: fabricate an INACTIVE child replica directly.
    FAIL_POINT_INJECT_F(
        "replica_stub_create_child_replica_if_not_found", [=](std::string_view) -> replica_ptr {
            const auto dn =
                _fs_manager.create_child_replica_dir(app->app_type, child_pid, parent_dir);
            CHECK_NOTNULL(dn, "");
            auto *rep = new replica(this, child_pid, *app, dn, false);
            rep->_config.status = partition_status::PS_INACTIVE;
            _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep));
            LOG_INFO("mock create_child_replica_if_not_found succeed");
            return rep;
        });

    zauto_write_lock l(_replicas_lock);
    // Already opened: just reuse it.
    const auto it = _replicas.find(child_pid);
    if (it != _replicas.end()) {
        return it->second;
    }
    // Refuse to create while the same gpid is being opened or closed.
    if (_opening_replicas.find(child_pid) != _opening_replicas.end()) {
        LOG_WARNING("failed create child replica({}) because it is under open", child_pid);
        return nullptr;
    }
    if (_closing_replicas.find(child_pid) != _closing_replicas.end()) {
        LOG_WARNING("failed create child replica({}) because it is under close", child_pid);
        return nullptr;
    }
    replica *rep = new_replica(child_pid, *app, false, false, parent_dir);
    if (rep == nullptr) {
        return nullptr;
    }
    const auto pr = _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep));
    CHECK(pr.second, "child replica {} has been existed", rep->name());
    METRIC_VAR_INCREMENT(total_replicas);
    _closed_replicas.erase(child_pid);
    return rep;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_stub::split_replica_error_handler(gpid pid, local_execution handler)
{
    split_replica_exec(LPC_PARTITION_SPLIT_ERROR, pid, handler);
}

// ThreadPool: THREAD_POOL_REPLICATION
// Run 'handler' against the split manager of replica 'pid' under task 'code'.
// Returns ERR_OBJECT_NOT_FOUND when the replica doesn't exist (or pid/handler
// is invalid), ERR_OK once the task is enqueued.
dsn::error_code
replica_stub::split_replica_exec(dsn::task_code code, gpid pid, local_execution handler)
{
    FAIL_POINT_INJECT_F("replica_stub_split_replica_exec",
                        [](std::string_view) { return ERR_OK; });
    replica_ptr replica = pid.get_app_id() == 0 ? nullptr : get_replica(pid);
    if (replica && handler) {
        tasking::enqueue(
            code,
            replica.get()->tracker(),
            [handler, replica]() { handler(replica->get_split_manager()); },
            pid.thread_hash());
        return ERR_OK;
    }
    LOG_WARNING("replica({}) is invalid", pid);
    return ERR_OBJECT_NOT_FOUND;
}

// ThreadPool: THREAD_POOL_REPLICATION
// RPC: the child has caught up; forward to the parent's split manager.
void replica_stub::on_notify_primary_split_catch_up(notify_catch_up_rpc rpc)
{
    const notify_catch_up_request &request = rpc.request();
    notify_cacth_up_response &response = rpc.response();
    replica_ptr replica = get_replica(request.parent_gpid);
    if (replica != nullptr) {
        replica->get_split_manager()->parent_handle_child_catch_up(request, response);
    } else {
        response.err = ERR_OBJECT_NOT_FOUND;
    }
}

// ThreadPool: THREAD_POOL_REPLICATION
// RPC: propagate the new partition count to the child replica's split manager.
void replica_stub::on_update_child_group_partition_count(
    update_child_group_partition_count_rpc rpc)
{
    const auto &request = rpc.request();
    auto &response = rpc.response();
    replica_ptr replica = get_replica(request.child_pid);
    if (replica != nullptr) {
        replica->get_split_manager()->on_update_child_group_partition_count(request, response);
    } else {
        response.err = ERR_OBJECT_NOT_FOUND;
    }
}

// Rebuild each dir node's primary/secondary holding sets from the replicas it
// currently holds, classifying by the replica's present partition status.
void replica_stub::update_disk_holding_replicas()
{
    for (const auto &dn : _fs_manager.get_dir_nodes()) {
        // Reset before reclassifying; replicas no longer found are dropped.
        dn->holding_primary_replicas.clear();
        dn->holding_secondary_replicas.clear();
        for (const auto &holding_replicas : dn->holding_replicas) {
            const auto &pids = holding_replicas.second;
for (const auto &pid : pids) {
                const auto rep = get_replica(pid);
                if (rep == nullptr) {
                    continue;
                }
                // Only PRIMARY/SECONDARY replicas are counted; other statuses
                // are intentionally left out of both sets.
                if (rep->status() == partition_status::PS_PRIMARY) {
                    dn->holding_primary_replicas[holding_replicas.first].emplace(pid);
                } else if (rep->status() == partition_status::PS_SECONDARY) {
                    dn->holding_secondary_replicas[holding_replicas.first].emplace(pid);
                }
            }
        }
    }
}

// RPC: dispatch a bulk-load request to the target replica's bulk loader.
// Responds ERR_OBJECT_NOT_FOUND when the replica is absent on this node.
void replica_stub::on_bulk_load(bulk_load_rpc rpc)
{
    const bulk_load_request &request = rpc.request();
    bulk_load_response &response = rpc.response();
    LOG_INFO("[{}@{}]: receive bulk load request", request.pid, _primary_host_port_cache);
    replica_ptr rep = get_replica(request.pid);
    if (rep != nullptr) {
        rep->get_bulk_loader()->on_bulk_load(request, response);
    } else {
        LOG_ERROR("replica({}) is not existed", request.pid);
        response.err = ERR_OBJECT_NOT_FOUND;
    }
}

// RPC: dispatch a group bulk-load request (sent primary -> secondaries) to the
// target replica's bulk loader. Responds ERR_OBJECT_NOT_FOUND when absent.
void replica_stub::on_group_bulk_load(group_bulk_load_rpc rpc)
{
    const group_bulk_load_request &request = rpc.request();
    group_bulk_load_response &response = rpc.response();
    LOG_INFO("[{}@{}]: received group bulk load request, primary = {}, ballot = {}, "
             "meta_bulk_load_status = {}",
             request.config.pid,
             _primary_host_port_cache,
             FMT_HOST_PORT_AND_IP(request.config, primary),
             request.config.ballot,
             enum_to_string(request.meta_bulk_load_status));
    replica_ptr rep = get_replica(request.config.pid);
    if (rep != nullptr) {
        rep->get_bulk_loader()->on_group_bulk_load(request, response);
    } else {
        LOG_ERROR("replica({}) is not existed", request.config.pid);
        response.err = ERR_OBJECT_NOT_FOUND;
    }
}

// RPC: forward a hotkey-detection control request (start/stop/query) to the
// target replica; fills an error hint when the replica is not found.
void replica_stub::on_detect_hotkey(detect_hotkey_rpc rpc)
{
    const auto &request = rpc.request();
    auto &response = rpc.response();
    LOG_INFO("[{}@{}]: received detect hotkey request, hotkey_type = {}, detect_action = {}",
             request.pid,
             _primary_host_port_cache,
             enum_to_string(request.type),
             enum_to_string(request.action));
    replica_ptr rep = get_replica(request.pid);
    if (rep != nullptr) {
        rep->on_detect_hotkey(request, response);
    } else {
        response.err = ERR_OBJECT_NOT_FOUND;
response.err_hint = fmt::format("not find the replica {} \n", request.pid);
    }
}

// Collect the data version of every local replica of 'app_id' into
// 'version_map' (partition index => data version).
void replica_stub::query_app_data_version(
    int32_t app_id, /*pidx => data_version*/ std::unordered_map<int32_t, uint32_t> &version_map)
{
    zauto_read_lock l(_replicas_lock);
    for (const auto &kv : _replicas) {
        if (kv.first.get_app_id() == app_id) {
            replica_ptr rep = kv.second;
            if (rep != nullptr) {
                uint32_t data_version = rep->query_data_version();
                version_map[kv.first.get_partition_index()] = data_version;
            }
        }
    }
}

// Collect the manual-compaction status of every local replica of 'app_id'
// into 'status' (gpid => status).
void replica_stub::query_app_manual_compact_status(
    int32_t app_id, std::unordered_map<gpid, manual_compaction_status::type> &status)
{
    zauto_read_lock l(_replicas_lock);
    for (auto it = _replicas.begin(); it != _replicas.end(); ++it) {
        if (it->first.get_app_id() == app_id) {
            status[it->first] = it->second->get_manual_compact_status();
        }
    }
}

// Hot-reload hook for the config flag named by 'name'.
void replica_stub::update_config(const std::string &name)
{
    // The new value has been validated and FLAGS_* has been updated, it's safety to use it
    // directly.
    UPDATE_CONFIG(_config_sync_timer_task->update_interval, config_sync_interval_ms, name);
}

// Block until '_closing_replicas' drains. Each close task removes its own
// entry from the map, so we wait on the current head and re-check.
void replica_stub::wait_closing_replicas_finished()
{
    zauto_write_lock l(_replicas_lock);
    while (!_closing_replicas.empty()) {
        auto task = std::get<0>(_closing_replicas.begin()->second);
        auto first_gpid = _closing_replicas.begin()->first;

        // TODO(yingchun): improve the code
        // Release the write lock while waiting: the close task needs it to
        // erase its entry, so waiting while holding it would deadlock.
        _replicas_lock.unlock_write();
        task->wait();
        _replicas_lock.lock_write();

        // task will automatically remove this replica from '_closing_replicas'
        if (!_closing_replicas.empty()) {
            CHECK_NE_MSG(first_gpid,
                         _closing_replicas.begin()->first,
                         "this replica '{}' should has been removed",
                         first_gpid);
        }
    }
}

} // namespace replication
} // namespace dsn