src/meta/duplication/meta_duplication_service.cpp (520 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include <fmt/core.h> #include <fmt/ostream.h> #include <algorithm> #include <cstdint> #include <iosfwd> #include <queue> #include <type_traits> #include "common//duplication_common.h" #include "common/common.h" #include "common/gpid.h" #include "common/replication.codes.h" #include "common/replication_other_types.h" #include "dsn.layer2_types.h" #include "duplication_types.h" #include "meta/meta_service.h" #include "meta/meta_state_service_utils.h" #include "meta_admin_types.h" #include "meta_duplication_service.h" #include "metadata_types.h" #include "runtime/api_layer1.h" #include "runtime/rpc/group_address.h" #include "runtime/rpc/rpc_address.h" #include "runtime/rpc/rpc_message.h" #include "runtime/rpc/serialization.h" #include "runtime/task/async_calls.h" #include "utils/blob.h" #include "utils/chrono_literals.h" #include "utils/error_code.h" #include "utils/errors.h" #include "utils/fail_point.h" #include "utils/fmt_logging.h" #include "utils/ports.h" #include "utils/string_conv.h" #include "utils/string_view.h" #include "utils/zlocks.h" namespace dsn { namespace replication { using namespace literals::chrono_literals; // ThreadPool(READ): THREAD_POOL_META_SERVER void meta_duplication_service::query_duplication_info(const duplication_query_request &request, duplication_query_response &response) { LOG_INFO("query duplication info for app: {}", request.app_name); response.err = ERR_OK; { zauto_read_lock l(app_lock()); std::shared_ptr<app_state> app = _state->get_app(request.app_name); if (!app || app->status != app_status::AS_AVAILABLE) { response.err = ERR_APP_NOT_EXIST; } else { response.appid = app->app_id; for (auto &dup_id_to_info : app->duplications) { const duplication_info_s_ptr &dup = dup_id_to_info.second; dup->append_if_valid_for_query(*app, response.entry_list); } } } } // ThreadPool(WRITE): THREAD_POOL_META_STATE void meta_duplication_service::modify_duplication(duplication_modify_rpc rpc) { const auto &request = rpc.request(); auto &response = rpc.response(); LOG_INFO("modify duplication({}) to [status={},fail_mode={}] for app({})", request.dupid, request.__isset.status ? duplication_status_to_string(request.status) : "nil", request.__isset.fail_mode ? duplication_fail_mode_to_string(request.fail_mode) : "nil", request.app_name); dupid_t dupid = request.dupid; std::shared_ptr<app_state> app = _state->get_app(request.app_name); if (!app || app->status != app_status::AS_AVAILABLE) { response.err = ERR_APP_NOT_EXIST; return; } auto it = app->duplications.find(dupid); if (it == app->duplications.end()) { response.err = ERR_OBJECT_NOT_FOUND; return; } duplication_info_s_ptr dup = it->second; auto to_status = request.__isset.status ? request.status : dup->status(); auto to_fail_mode = request.__isset.fail_mode ? request.fail_mode : dup->fail_mode(); response.err = dup->alter_status(to_status, to_fail_mode); if (response.err != ERR_OK) { return; } if (!dup->is_altering()) { return; } // validation passed do_modify_duplication(app, dup, rpc); } // ThreadPool(WRITE): THREAD_POOL_META_STATE void meta_duplication_service::do_modify_duplication(std::shared_ptr<app_state> &app, duplication_info_s_ptr &dup, duplication_modify_rpc &rpc) { if (rpc.request().status == duplication_status::DS_REMOVED) { _meta_svc->get_meta_storage()->delete_node_recursively( std::string(dup->store_path), [rpc, this, app, dup]() { dup->persist_status(); rpc.response().err = ERR_OK; rpc.response().appid = app->app_id; if (rpc.request().status == duplication_status::DS_REMOVED) { zauto_write_lock l(app_lock()); app->duplications.erase(dup->id); refresh_duplicating_no_lock(app); } }); return; } // store the duplication in requested status. blob value = dup->to_json_blob(); _meta_svc->get_meta_storage()->set_data( std::string(dup->store_path), std::move(value), [rpc, app, dup]() { dup->persist_status(); rpc.response().err = ERR_OK; rpc.response().appid = app->app_id; }); } // This call will not recreate if the duplication // with the same app name and remote end point already exists. // ThreadPool(WRITE): THREAD_POOL_META_STATE void meta_duplication_service::add_duplication(duplication_add_rpc rpc) { const auto &request = rpc.request(); auto &response = rpc.response(); LOG_INFO("add duplication for app({}), remote cluster name is {}", request.app_name, request.remote_cluster_name); response.err = ERR_OK; if (request.remote_cluster_name == get_current_cluster_name()) { response.err = ERR_INVALID_PARAMETERS; response.__set_hint("illegal operation: adding duplication to itself"); return; } auto remote_cluster_id = get_duplication_cluster_id(request.remote_cluster_name); if (!remote_cluster_id.is_ok()) { response.err = ERR_INVALID_PARAMETERS; response.__set_hint(fmt::format("get_duplication_cluster_id({}) failed, error: {}", request.remote_cluster_name, remote_cluster_id.get_error())); return; } std::vector<rpc_address> meta_list; if (!dsn::replication::replica_helper::load_meta_servers( meta_list, duplication_constants::kClustersSectionName.c_str(), request.remote_cluster_name.c_str())) { response.err = ERR_INVALID_PARAMETERS; response.__set_hint(fmt::format("failed to find cluster[{}] address in config [{}]", request.remote_cluster_name, duplication_constants::kClustersSectionName)); return; } auto app = _state->get_app(request.app_name); if (!app || app->status != app_status::AS_AVAILABLE) { response.err = ERR_APP_NOT_EXIST; return; } duplication_info_s_ptr dup; for (const auto &ent : app->duplications) { auto it = ent.second; if (it->follower_cluster_name == request.remote_cluster_name) { dup = ent.second; break; } } if (!dup) { dup = new_dup_from_init(request.remote_cluster_name, std::move(meta_list), app); } do_add_duplication(app, dup, rpc); } // ThreadPool(WRITE): THREAD_POOL_META_STATE void meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &app, duplication_info_s_ptr &dup, duplication_add_rpc &rpc) { const auto err = dup->start(rpc.request().is_duplicating_checkpoint); if (dsn_unlikely(err != ERR_OK)) { LOG_ERROR("start dup[{}({})] failed: err = {}", app->app_name, dup->id, err.to_string()); return; } blob value = dup->to_json_blob(); std::queue<std::string> nodes({get_duplication_path(*app), std::to_string(dup->id)}); _meta_svc->get_meta_storage()->create_node_recursively( std::move(nodes), std::move(value), [app, this, dup, rpc]() mutable { LOG_INFO("[{}] add duplication successfully [app_name: {}, follower: {}]", dup->log_prefix(), app->app_name, dup->follower_cluster_name); // The duplication starts only after it's been persisted. dup->persist_status(); auto &resp = rpc.response(); resp.err = ERR_OK; resp.appid = app->app_id; resp.dupid = dup->id; zauto_write_lock l(app_lock()); refresh_duplicating_no_lock(app); }); } /// get all available apps on node `ns` void meta_duplication_service::get_all_available_app( const node_state &ns, std::map<int32_t, std::shared_ptr<app_state>> &app_map) const { ns.for_each_partition([this, &ns, &app_map](const gpid &pid) -> bool { if (ns.served_as(pid) != partition_status::PS_PRIMARY) { return true; } std::shared_ptr<app_state> app = _state->get_app(pid.get_app_id()); if (!app || app->status != app_status::AS_AVAILABLE) { return true; } // must have duplication if (app->duplications.empty()) { return true; } if (app_map.find(app->app_id) == app_map.end()) { app_map.emplace(std::make_pair(pid.get_app_id(), std::move(app))); } return true; }); } // ThreadPool(WRITE): THREAD_POOL_META_STATE void meta_duplication_service::duplication_sync(duplication_sync_rpc rpc) { auto &request = rpc.request(); auto &response = rpc.response(); response.err = ERR_OK; node_state *ns = get_node_state(_state->_nodes, request.node, false); if (ns == nullptr) { LOG_WARNING("node({}) is not found in meta server", request.node.to_string()); response.err = ERR_OBJECT_NOT_FOUND; return; } std::map<int32_t, std::shared_ptr<app_state>> app_map; get_all_available_app(*ns, app_map); for (const auto &kv : app_map) { int32_t app_id = kv.first; const auto &app = kv.second; for (const auto &kv2 : app->duplications) { dupid_t dup_id = kv2.first; const auto &dup = kv2.second; if (dup->is_invalid_status()) { continue; } if (dup->status() < duplication_status::DS_LOG && dup->all_checkpoint_has_prepared()) { if (dup->status() == duplication_status::DS_PREPARE) { create_follower_app_for_duplication(dup, app); } else if (dup->status() == duplication_status::DS_APP) { check_follower_app_if_create_completed(dup); } } response.dup_map[app_id][dup_id] = dup->to_duplication_entry(); // report progress periodically for each duplications dup->report_progress_if_time_up(); } } /// update progress for (const auto &kv : request.confirm_list) { gpid gpid = kv.first; auto it = app_map.find(gpid.get_app_id()); if (it == app_map.end()) { // app is unsynced // Since duplication-sync separates with config-sync, it's not guaranteed to have the // latest state. duplication-sync has a loose consistency requirement. continue; } std::shared_ptr<app_state> &app = it->second; for (const duplication_confirm_entry &confirm : kv.second) { auto it2 = app->duplications.find(confirm.dupid); if (it2 == app->duplications.end()) { // dup is unsynced continue; } duplication_info_s_ptr &dup = it2->second; if (dup->is_invalid_status()) { continue; } do_update_partition_confirmed(dup, rpc, gpid.get_partition_index(), confirm); } } } void meta_duplication_service::create_follower_app_for_duplication( const std::shared_ptr<duplication_info> &dup, const std::shared_ptr<app_state> &app) { configuration_create_app_request request; request.app_name = app->app_name; request.options.app_type = app->app_type; request.options.partition_count = app->partition_count; request.options.replica_count = app->max_replica_count; request.options.success_if_exist = false; request.options.envs = app->envs; request.options.is_stateful = app->is_stateful; // add envs for follower table, which will use it know itself is `follower` and load master info // - env map: // `kDuplicationEnvMasterClusterKey=>{master_cluster_name}` // `kDuplicationEnvMasterMetasKey=>{master_meta_list}` request.options.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey, get_current_cluster_name()); request.options.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey, _meta_svc->get_meta_list_string()); rpc_address meta_servers; meta_servers.assign_group(dup->follower_cluster_name.c_str()); meta_servers.group_address()->add_list(dup->follower_cluster_metas); dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_CREATE_APP); dsn::marshall(msg, request); rpc::call( meta_servers, msg, _meta_svc->tracker(), [=](error_code err, configuration_create_app_response &&resp) mutable { FAIL_POINT_INJECT_NOT_RETURN_F("update_app_request_ok", [&](string_view s) -> void { err = ERR_OK; }); error_code create_err = err == ERR_OK ? resp.err : err; error_code update_err = ERR_NO_NEED_OPERATE; FAIL_POINT_INJECT_NOT_RETURN_F("persist_dup_status_failed", [&](string_view s) -> void { create_err = ERR_OK; }); if (create_err == ERR_OK) { update_err = dup->alter_status(duplication_status::DS_APP); } FAIL_POINT_INJECT_F("persist_dup_status_failed", [&](string_view s) -> void { return; }); if (update_err == ERR_OK) { blob value = dup->to_json_blob(); // Note: this function is `async`, it may not be persisted completed // after executing, now using `_is_altering` to judge whether `updating` or // `completed`, if `_is_altering`, dup->alter_status() will return `ERR_BUSY` _meta_svc->get_meta_storage()->set_data(std::string(dup->store_path), std::move(value), [=]() { dup->persist_status(); }); } else { LOG_ERROR("created follower app[{}.{}] to trigger duplicate checkpoint failed: " "duplication_status = {}, create_err = {}, update_err = {}", dup->follower_cluster_name, dup->app_name, duplication_status_to_string(dup->status()), create_err.to_string(), update_err.to_string()); } }); } void meta_duplication_service::check_follower_app_if_create_completed( const std::shared_ptr<duplication_info> &dup) { rpc_address meta_servers; meta_servers.assign_group(dup->follower_cluster_name.c_str()); meta_servers.group_address()->add_list(dup->follower_cluster_metas); query_cfg_request meta_config_request; meta_config_request.app_name = dup->app_name; dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX); dsn::marshall(msg, meta_config_request); rpc::call(meta_servers, msg, _meta_svc->tracker(), [=](error_code err, query_cfg_response &&resp) mutable { FAIL_POINT_INJECT_NOT_RETURN_F("create_app_ok", [&](string_view s) -> void { err = ERR_OK; int count = dup->partition_count; while (count-- > 0) { partition_configuration p; p.primary = rpc_address("127.0.0.1", 34801); p.secondaries.emplace_back(rpc_address("127.0.0.2", 34801)); p.secondaries.emplace_back(rpc_address("127.0.0.3", 34801)); resp.partitions.emplace_back(p); } }); // - ERR_INCONSISTENT_STATE: partition count of response isn't equal with local // - ERR_INACTIVE_STATE: the follower table hasn't been healthy error_code query_err = err == ERR_OK ? resp.err : err; if (query_err == ERR_OK) { if (resp.partitions.size() != dup->partition_count) { query_err = ERR_INCONSISTENT_STATE; } else { for (const auto &partition : resp.partitions) { if (partition.primary.is_invalid()) { query_err = ERR_INACTIVE_STATE; break; } if (partition.secondaries.empty()) { query_err = ERR_NOT_ENOUGH_MEMBER; break; } for (const auto &secondary : partition.secondaries) { if (secondary.is_invalid()) { query_err = ERR_INACTIVE_STATE; break; } } } } } error_code update_err = ERR_NO_NEED_OPERATE; if (query_err == ERR_OK) { update_err = dup->alter_status(duplication_status::DS_LOG); } FAIL_POINT_INJECT_F("persist_dup_status_failed", [&](string_view s) -> void { return; }); if (update_err == ERR_OK) { blob value = dup->to_json_blob(); // Note: this function is `async`, it may not be persisted completed // after executing, now using `_is_altering` to judge whether `updating` or // `completed`, if `_is_altering`, dup->alter_status() will return `ERR_BUSY` _meta_svc->get_meta_storage()->set_data(std::string(dup->store_path), std::move(value), [dup]() { dup->persist_status(); }); } else { LOG_ERROR( "query follower app[{}.{}] replica configuration completed, result: " "duplication_status = {}, query_err = {}, update_err = {}", dup->follower_cluster_name, dup->app_name, duplication_status_to_string(dup->status()), query_err.to_string(), update_err); } }); } void meta_duplication_service::do_update_partition_confirmed( duplication_info_s_ptr &dup, duplication_sync_rpc &rpc, int32_t partition_idx, const duplication_confirm_entry &confirm_entry) { if (dup->alter_progress(partition_idx, confirm_entry)) { std::string path = get_partition_path(dup, std::to_string(partition_idx)); blob value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree)); _meta_svc->get_meta_storage()->get_data(std::string(path), [=](const blob &data) mutable { if (data.length() == 0) { _meta_svc->get_meta_storage()->create_node( std::string(path), std::move(value), [=]() mutable { dup->persist_progress(partition_idx); rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = confirm_entry.confirmed_decree; }); } else { _meta_svc->get_meta_storage()->set_data( std::string(path), std::move(value), [=]() mutable { dup->persist_progress(partition_idx); rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = confirm_entry.confirmed_decree; }); } // duplication_sync_rpc will finally be replied when confirmed points // of all partitions are stored. }); } } std::shared_ptr<duplication_info> meta_duplication_service::new_dup_from_init(const std::string &follower_cluster_name, std::vector<rpc_address> &&follower_cluster_metas, std::shared_ptr<app_state> &app) const { duplication_info_s_ptr dup; // use current time to identify this duplication. auto dupid = static_cast<dupid_t>(dsn_now_ms() / 1000); { zauto_write_lock l(app_lock()); // hold write lock here to ensure that dupid is unique while (app->duplications.find(dupid) != app->duplications.end()) dupid++; std::string dup_path = get_duplication_path(*app, std::to_string(dupid)); dup = std::make_shared<duplication_info>(dupid, app->app_id, app->app_name, app->partition_count, dsn_now_ms(), follower_cluster_name, std::move(follower_cluster_metas), std::move(dup_path)); for (int32_t i = 0; i < app->partition_count; i++) { dup->init_progress(i, invalid_decree); } app->duplications.emplace(dup->id, dup); } return dup; } // ThreadPool(WRITE): THREAD_POOL_META_STATE void meta_duplication_service::recover_from_meta_state() { LOG_INFO("recovering duplication states from meta storage"); // /<app>/duplication/<dupid>/<partition_idx> // | |-> confirmed_decree // | // |-> json of dup info for (const auto &kv : _state->_exist_apps) { std::shared_ptr<app_state> app = kv.second; if (app->status != app_status::AS_AVAILABLE) { continue; } _meta_svc->get_meta_storage()->get_children( get_duplication_path(*app), [this, app](bool node_exists, const std::vector<std::string> &dup_id_list) { if (!node_exists) { // if there's no duplication return; } for (const std::string &raw_dup_id : dup_id_list) { dupid_t dup_id; if (!buf2int32(raw_dup_id, dup_id)) { // unlikely LOG_ERROR("invalid duplication path: {}", get_duplication_path(*app, raw_dup_id)); return; } do_restore_duplication(dup_id, app); } }); } } // ThreadPool(WRITE): THREAD_POOL_META_STATE void meta_duplication_service::do_restore_duplication_progress( const duplication_info_s_ptr &dup, const std::shared_ptr<app_state> &app) { for (int partition_idx = 0; partition_idx < app->partition_count; partition_idx++) { std::string str_pidx = std::to_string(partition_idx); // <app_path>/duplication/<dup_id>/<partition_index> std::string partition_path = get_partition_path(dup, str_pidx); _meta_svc->get_meta_storage()->get_data( std::move(partition_path), [dup, partition_idx](const blob &value) { // value is confirmed_decree encoded in string. if (value.size() == 0) { // not found dup->init_progress(partition_idx, invalid_decree); return; } int64_t confirmed_decree = invalid_decree; if (!buf2int64(value, confirmed_decree)) { LOG_ERROR("[{}] invalid confirmed_decree {} on partition_idx {}", dup->log_prefix(), value.to_string(), partition_idx); return; // fail fast } dup->init_progress(partition_idx, confirmed_decree); LOG_INFO( "[{}] initialize progress from metastore [partition_idx: {}, confirmed: {}]", dup->log_prefix(), partition_idx, confirmed_decree); }); } } // ThreadPool(WRITE): THREAD_POOL_META_STATE void meta_duplication_service::do_restore_duplication(dupid_t dup_id, std::shared_ptr<app_state> app) { std::string store_path = get_duplication_path(*app, std::to_string(dup_id)); // restore duplication info from json _meta_svc->get_meta_storage()->get_data( std::string(store_path), [ dup_id, this, app = std::move(app), store_path ](const blob &json) { zauto_write_lock l(app_lock()); auto dup = duplication_info::decode_from_blob( dup_id, app->app_id, app->app_name, app->partition_count, store_path, json); if (nullptr == dup) { LOG_ERROR("failed to decode json \"{}\" on path {}", json.to_string(), store_path); return; // fail fast } if (!dup->is_invalid_status()) { app->duplications[dup->id] = dup; refresh_duplicating_no_lock(app); // restore progress do_restore_duplication_progress(dup, app); } }); } } // namespace replication } // namespace dsn