src/client/replication_ddl_client.cpp (1,539 lines of code) (raw):

/* * The MIT License (MIT) * * Copyright (c) 2015 Microsoft Corporation * * -=- Robust Distributed System Nucleus (rDSN) -=- * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ #include "replication_ddl_client.h" // IWYU pragma: no_include <ext/alloc_traits.h> #include <string.h> #include <algorithm> #include <cctype> #include <cstdint> #include <fstream> #include <iomanip> #include <iostream> #include <set> #include "backup_types.h" #include "common//duplication_common.h" #include "common/bulk_load_common.h" #include "common/gpid.h" #include "common/manual_compact.h" #include "common/partition_split_common.h" #include "common/replication.codes.h" #include "common/replication_common.h" #include "common/replication_enums.h" #include "fmt/ostream.h" #include "meta/meta_rpc_types.h" #include "runtime/api_layer1.h" #include "runtime/rpc/group_address.h" #include "utils/error_code.h" #include "utils/fmt_logging.h" #include "utils/output_utils.h" #include "utils/time_utils.h" #include "utils/utils.h" DSN_DEFINE_uint32(ddl_client, ddl_client_max_attempt_count, 3, "The max count that attempt for failed requests to meta server."); DSN_DEFINE_validator(ddl_client_max_attempt_count, [](uint32_t value) -> bool { return value > 0; }); DSN_DEFINE_uint32(ddl_client, ddl_client_retry_interval_ms, 10 * 1000, "The retry interval after receiving error from meta server."); namespace dsn { namespace replication { #define VALIDATE_TABLE_NAME(app_name) \ do { \ if (app_name.empty() || \ !std::all_of(app_name.cbegin(), \ app_name.cend(), \ (bool (*)(int))replication_ddl_client::valid_app_char)) \ return FMT_ERR(ERR_INVALID_PARAMETERS, "Invalid name. Only 0-9a-zA-Z.:_ are valid!"); \ } while (false) using tp_output_format = ::dsn::utils::table_printer::output_format; replication_ddl_client::replication_ddl_client(const std::vector<dsn::rpc_address> &meta_servers) { _meta_server.assign_group("meta-servers"); for (const auto &m : meta_servers) { if (!_meta_server.group_address()->add(m)) { LOG_WARNING("duplicate adress {}", m); } } } replication_ddl_client::~replication_ddl_client() { _tracker.cancel_outstanding_tasks(); } dsn::error_code replication_ddl_client::wait_app_ready(const std::string &app_name, int partition_count, int max_replica_count) { do { uint32_t one_step_wait_sec = std::min(_max_wait_secs, 2U); std::this_thread::sleep_for(std::chrono::seconds(one_step_wait_sec)); _max_wait_secs -= one_step_wait_sec; auto query_req = std::make_shared<query_cfg_request>(); query_req->app_name = app_name; auto query_task = request_meta(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX, query_req); query_task->wait(); if (query_task->error() == ERR_INVALID_STATE) { std::cout << app_name << " not ready yet, still waiting..." << std::endl; continue; } if (query_task->error() != dsn::ERR_OK) { std::cout << "create app " << app_name << " failed: [query] call server error: " << query_task->error().to_string() << std::endl; return query_task->error(); } dsn::query_cfg_response query_resp; ::dsn::unmarshall(query_task->get_response(), query_resp); if (query_resp.err != dsn::ERR_OK) { std::cout << "create app " << app_name << " failed: [query] received server error: " << query_resp.err.to_string() << std::endl; return query_resp.err; } CHECK_EQ(partition_count, query_resp.partition_count); int ready_count = 0; for (int i = 0; i < partition_count; i++) { const partition_configuration &pc = query_resp.partitions[i]; if (!pc.primary.is_invalid() && (pc.secondaries.size() + 1 >= max_replica_count)) { ready_count++; } } if (ready_count == partition_count) { std::cout << app_name << " is ready now: (" << ready_count << "/" << partition_count << ")" << std::endl; break; } std::cout << app_name << " not ready yet, still waiting... (" << ready_count << "/" << partition_count << ")" << std::endl; } while (_max_wait_secs > 0); return dsn::ERR_OK; } dsn::error_code replication_ddl_client::create_app(const std::string &app_name, const std::string &app_type, int partition_count, int replica_count, const std::map<std::string, std::string> &envs, bool is_stateless, bool success_if_exist) { if (partition_count < 1) { std::cout << "create app " << app_name << " failed: partition_count should >= 1" << std::endl; return ERR_INVALID_PARAMETERS; } if (replica_count < 1) { std::cout << "create app " << app_name << " failed: replica_count should >= 1" << std::endl; return ERR_INVALID_PARAMETERS; } if (app_name.empty() || !std::all_of(app_name.cbegin(), app_name.cend(), (bool (*)(int))replication_ddl_client::valid_app_char)) { std::cout << "create app " << app_name << " failed: invalid app_name" << std::endl; return ERR_INVALID_PARAMETERS; } if (app_type.empty() || !std::all_of(app_type.cbegin(), app_type.cend(), (bool (*)(int))replication_ddl_client::valid_app_char)) { std::cout << "create app " << app_name << " failed: invalid app_type" << std::endl; return ERR_INVALID_PARAMETERS; } auto req = std::make_shared<configuration_create_app_request>(); req->app_name = app_name; req->options.partition_count = partition_count; req->options.replica_count = replica_count; req->options.success_if_exist = success_if_exist; req->options.app_type = app_type; req->options.envs = envs; req->options.is_stateful = !is_stateless; dsn::replication::configuration_create_app_response resp; auto resp_task = request_meta_and_wait_response(RPC_CM_CREATE_APP, req, resp); if (resp_task->error() != dsn::ERR_OK) { std::cout << "create app " << app_name << " failed: [create] call server error: " << resp_task->error().to_string() << std::endl; return resp_task->error(); } if (resp.err != dsn::ERR_OK) { std::cout << "create app " << app_name << " failed: [create] received server error: " << resp.err.to_string() << std::endl; return resp.err; } std::cout << "create app " << app_name << " succeed, waiting for app ready" << std::endl; dsn::error_code error = wait_app_ready(app_name, partition_count, replica_count); if (error == dsn::ERR_OK) std::cout << app_name << " is ready now!" << std::endl; return error; } dsn::error_code replication_ddl_client::drop_app(const std::string &app_name, int reserve_seconds) { if (app_name.empty() || !std::all_of(app_name.cbegin(), app_name.cend(), (bool (*)(int))replication_ddl_client::valid_app_char)) return ERR_INVALID_PARAMETERS; auto req = std::make_shared<configuration_drop_app_request>(); req->app_name = app_name; req->options.success_if_not_exist = true; req->options.__set_reserve_seconds(reserve_seconds); dsn::replication::configuration_drop_app_response resp; auto resp_task = request_meta_and_wait_response(RPC_CM_DROP_APP, req, resp); if (resp_task->error() != dsn::ERR_OK) { return resp_task->error(); } if (resp.err != dsn::ERR_OK) { return resp.err; } return dsn::ERR_OK; } dsn::error_code replication_ddl_client::recall_app(int32_t app_id, const std::string &new_app_name) { if (!std::all_of(new_app_name.cbegin(), new_app_name.cend(), (bool (*)(int))replication_ddl_client::valid_app_char)) return ERR_INVALID_PARAMETERS; auto req = std::make_shared<configuration_recall_app_request>(); req->app_id = app_id; req->new_app_name = new_app_name; dsn::replication::configuration_recall_app_response resp; auto resp_task = request_meta_and_wait_response(RPC_CM_RECALL_APP, req, resp); if (resp_task->error() != dsn::ERR_OK) { return resp_task->error(); } if (resp.err != dsn::ERR_OK) { return resp.err; } std::cout << "recall app ok, id(" << resp.info.app_id << "), " << "name(" << resp.info.app_name << "), " << "partition_count(" << resp.info.partition_count << "), wait it ready" << std::endl; return wait_app_ready( resp.info.app_name, resp.info.partition_count, resp.info.max_replica_count); } dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type status, std::vector<::dsn::app_info> &apps) { auto req = std::make_shared<configuration_list_apps_request>(); req->status = status; auto resp_task = request_meta(RPC_CM_LIST_APPS, req); resp_task->wait(); if (resp_task->error() != dsn::ERR_OK) { return resp_task->error(); } dsn::replication::configuration_list_apps_response resp; ::dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != dsn::ERR_OK) { return resp.err; } apps = resp.infos; return dsn::ERR_OK; } dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type status, bool show_all, bool detailed, bool json, const std::string &file_name) { std::vector<::dsn::app_info> apps; auto r = list_apps(status, apps); if (r != dsn::ERR_OK) { return r; } // print configuration_list_apps_response std::streambuf *buf; std::ofstream of; if (!file_name.empty()) { of.open(file_name); buf = of.rdbuf(); } else { buf = std::cout.rdbuf(); } std::ostream out(buf); size_t max_app_name_size = 20; for (int i = 0; i < apps.size(); i++) { dsn::app_info info = apps[i]; if (!show_all && info.status != app_status::AS_AVAILABLE) { continue; } max_app_name_size = std::max(max_app_name_size, info.app_name.size() + 2); } dsn::utils::multi_table_printer mtp; dsn::utils::table_printer tp_general("general_info"); tp_general.add_title("app_id"); tp_general.add_column("status"); tp_general.add_column("app_name"); tp_general.add_column("app_type"); tp_general.add_column("partition_count"); tp_general.add_column("replica_count"); tp_general.add_column("is_stateful"); tp_general.add_column("create_time"); tp_general.add_column("drop_time"); tp_general.add_column("drop_expire"); tp_general.add_column("envs_count"); int available_app_count = 0; for (int i = 0; i < apps.size(); i++) { dsn::app_info info = apps[i]; if (!show_all && info.status != app_status::AS_AVAILABLE) { continue; } std::string status_str = enum_to_string(info.status); status_str = status_str.substr(status_str.find("AS_") + 3); std::string create_time = "-"; if (info.create_second > 0) { char buf[20]; dsn::utils::time_ms_to_date_time((uint64_t)info.create_second * 1000, buf, 20); buf[10] = '_'; create_time = buf; } std::string drop_time = "-"; std::string drop_expire_time = "-"; if (info.status == app_status::AS_AVAILABLE) { available_app_count++; } else if (info.status == app_status::AS_DROPPED && info.expire_second > 0) { if (info.drop_second > 0) { char buf[20]; dsn::utils::time_ms_to_date_time((uint64_t)info.drop_second * 1000, buf, 20); buf[10] = '_'; drop_time = buf; } if (info.expire_second > 0) { char buf[20]; dsn::utils::time_ms_to_date_time((uint64_t)info.expire_second * 1000, buf, 20); buf[10] = '_'; drop_expire_time = buf; } } tp_general.add_row(info.app_id); tp_general.append_data(status_str); tp_general.append_data(info.app_name); tp_general.append_data(info.app_type); tp_general.append_data(info.partition_count); tp_general.append_data(info.max_replica_count); tp_general.append_data(info.is_stateful); tp_general.append_data(create_time); tp_general.append_data(drop_time); tp_general.append_data(drop_expire_time); tp_general.append_data(info.envs.size()); } mtp.add(std::move(tp_general)); int total_fully_healthy_app_count = 0; int total_unhealthy_app_count = 0; int total_write_unhealthy_app_count = 0; int total_read_unhealthy_app_count = 0; if (detailed && available_app_count > 0) { dsn::utils::table_printer tp_health("healthy_info"); tp_health.add_title("app_id"); tp_health.add_column("app_name"); tp_health.add_column("partition_count"); tp_health.add_column("fully_healthy"); tp_health.add_column("unhealthy"); tp_health.add_column("write_unhealthy"); tp_health.add_column("read_unhealthy"); for (auto &info : apps) { if (info.status != app_status::AS_AVAILABLE) { continue; } int32_t app_id; int32_t partition_count; std::vector<partition_configuration> partitions; r = list_app(info.app_name, app_id, partition_count, partitions); if (r != dsn::ERR_OK) { LOG_ERROR("list app({}) failed, err = {}", info.app_name, r); return r; } CHECK_EQ(info.app_id, app_id); CHECK_EQ(info.partition_count, partition_count); int fully_healthy = 0; int write_unhealthy = 0; int read_unhealthy = 0; for (int i = 0; i < partitions.size(); i++) { const dsn::partition_configuration &p = partitions[i]; int replica_count = 0; if (!p.primary.is_invalid()) { replica_count++; } replica_count += p.secondaries.size(); if (!p.primary.is_invalid()) { if (replica_count >= p.max_replica_count) fully_healthy++; else if (replica_count < 2) write_unhealthy++; } else { write_unhealthy++; read_unhealthy++; } } tp_health.add_row(info.app_id); tp_health.append_data(info.app_name); tp_health.append_data(info.partition_count); tp_health.append_data(fully_healthy); tp_health.append_data(info.partition_count - fully_healthy); tp_health.append_data(write_unhealthy); tp_health.append_data(read_unhealthy); if (fully_healthy == info.partition_count) total_fully_healthy_app_count++; else total_unhealthy_app_count++; if (write_unhealthy > 0) total_write_unhealthy_app_count++; if (read_unhealthy > 0) total_read_unhealthy_app_count++; } mtp.add(std::move(tp_health)); } dsn::utils::table_printer tp_count("summary"); tp_count.add_row_name_and_data("total_app_count", available_app_count); if (detailed && available_app_count > 0) { tp_count.add_row_name_and_data("fully_healthy_app_count", total_fully_healthy_app_count); tp_count.add_row_name_and_data("unhealthy_app_count", total_unhealthy_app_count); tp_count.add_row_name_and_data("write_unhealthy_app_count", total_write_unhealthy_app_count); tp_count.add_row_name_and_data("read_unhealthy_app_count", total_read_unhealthy_app_count); } mtp.add(std::move(tp_count)); mtp.output(out, json ? tp_output_format::kJsonPretty : tp_output_format::kTabular); return dsn::ERR_OK; } dsn::error_code replication_ddl_client::list_nodes( const dsn::replication::node_status::type status, std::map<dsn::rpc_address, dsn::replication::node_status::type> &nodes) { auto req = std::make_shared<configuration_list_nodes_request>(); req->status = status; auto resp_task = request_meta(RPC_CM_LIST_NODES, req); resp_task->wait(); if (resp_task->error() != dsn::ERR_OK) { return resp_task->error(); } dsn::replication::configuration_list_nodes_response resp; ::dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != dsn::ERR_OK) { return resp.err; } for (const dsn::replication::node_info &n : resp.infos) { nodes[n.address] = n.status; } return dsn::ERR_OK; } struct list_nodes_helper { std::string node_name; std::string node_status; int primary_count; int secondary_count; list_nodes_helper(const std::string &n, const std::string &s) : node_name(n), node_status(s), primary_count(0), secondary_count(0) { } }; std::string host_name_resolve(bool resolve_ip, std::string value) { if (resolve_ip) { std::string temp; if (dsn::utils::hostname_from_ip_port(value.c_str(), &temp)) return temp; } return value; } dsn::error_code replication_ddl_client::list_nodes(const dsn::replication::node_status::type status, bool detailed, const std::string &file_name, bool resolve_ip) { std::map<dsn::rpc_address, dsn::replication::node_status::type> nodes; auto r = list_nodes(status, nodes); if (r != dsn::ERR_OK) { return r; } std::map<dsn::rpc_address, list_nodes_helper> tmp_map; int alive_node_count = 0; for (auto &kv : nodes) { if (kv.second == dsn::replication::node_status::NS_ALIVE) alive_node_count++; std::string status_str = enum_to_string(kv.second); status_str = status_str.substr(status_str.find("NS_") + 3); tmp_map.emplace( kv.first, list_nodes_helper(host_name_resolve(resolve_ip, kv.first.to_std_string()), status_str)); } if (detailed) { std::vector<::dsn::app_info> apps; r = list_apps(dsn::app_status::AS_AVAILABLE, apps); if (r != dsn::ERR_OK) { return r; } for (auto &app : apps) { int32_t app_id; int32_t partition_count; std::vector<partition_configuration> partitions; r = list_app(app.app_name, app_id, partition_count, partitions); if (r != dsn::ERR_OK) { return r; } for (int i = 0; i < partitions.size(); i++) { const dsn::partition_configuration &p = partitions[i]; if (!p.primary.is_invalid()) { auto find = tmp_map.find(p.primary); if (find != tmp_map.end()) { find->second.primary_count++; } } for (int j = 0; j < p.secondaries.size(); j++) { auto find = tmp_map.find(p.secondaries[j]); if (find != tmp_map.end()) { find->second.secondary_count++; } } } } } // print configuration_list_nodes_response std::streambuf *buf; std::ofstream of; if (!file_name.empty()) { of.open(file_name); buf = of.rdbuf(); } else { buf = std::cout.rdbuf(); } std::ostream out(buf); dsn::utils::table_printer tp; tp.add_title("address"); tp.add_column("status"); if (detailed) { tp.add_column("replica_count"); tp.add_column("primary_count"); tp.add_column("secondary_count"); } for (auto &kv : tmp_map) { tp.add_row(kv.second.node_name); tp.append_data(kv.second.node_status); if (detailed) { tp.append_data(kv.second.primary_count + kv.second.secondary_count); tp.append_data(kv.second.primary_count); tp.append_data(kv.second.secondary_count); } } tp.output(out); out << std::endl; dsn::utils::table_printer tp_count; tp_count.add_row_name_and_data("total_node_count", nodes.size()); tp_count.add_row_name_and_data("alive_node_count", alive_node_count); tp_count.add_row_name_and_data("unalive_node_count", nodes.size() - alive_node_count); tp_count.output(out); out << std::endl; return dsn::ERR_OK; #undef RESOLVE } dsn::error_code replication_ddl_client::cluster_name(int64_t timeout_ms, std::string &cluster_name) { auto req = std::make_shared<configuration_cluster_info_request>(); auto resp_task = request_meta(RPC_CM_CLUSTER_INFO, req, timeout_ms); resp_task->wait(); if (resp_task->error() != dsn::ERR_OK) { return resp_task->error(); } configuration_cluster_info_response resp; ::dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != dsn::ERR_OK) { return resp.err; } cluster_name.clear(); for (int i = 0; i < resp.keys.size(); ++i) { if (resp.keys[i] == "cluster_name") { cluster_name = resp.values[i]; } } return cluster_name.empty() ? dsn::ERR_UNKNOWN : dsn::ERR_OK; } dsn::error_code replication_ddl_client::cluster_info(const std::string &file_name, bool resolve_ip, bool json) { auto req = std::make_shared<configuration_cluster_info_request>(); auto resp_task = request_meta(RPC_CM_CLUSTER_INFO, req); resp_task->wait(); if (resp_task->error() != dsn::ERR_OK) { return resp_task->error(); } configuration_cluster_info_response resp; ::dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != dsn::ERR_OK) { return resp.err; } // print configuration_cluster_info_response std::streambuf *buf; std::ofstream of; if (!file_name.empty()) { of.open(file_name); buf = of.rdbuf(); } else { buf = std::cout.rdbuf(); } std::ostream out(buf); if (resolve_ip) { for (int i = 0; i < resp.keys.size(); ++i) { if (resp.keys[i] == "meta_servers") { dsn::utils::list_hostname_from_ip_port(resp.values[i].c_str(), &resp.values[i]); } else if (resp.keys[i] == "primary_meta_server") { dsn::utils::hostname_from_ip_port(resp.values[i].c_str(), &resp.values[i]); } } } dsn::utils::table_printer tp("cluster_info"); for (int i = 0; i < resp.keys.size(); i++) { tp.add_row_name_and_data(resp.keys[i], resp.values[i]); } tp.output(out, json ? tp_output_format::kJsonPretty : tp_output_format::kTabular); return dsn::ERR_OK; } dsn::error_code replication_ddl_client::list_app(const std::string &app_name, bool detailed, bool json, const std::string &file_name, bool resolve_ip) { dsn::utils::multi_table_printer mtp; dsn::utils::table_printer tp_params("parameters"); if (!(app_name.empty() && file_name.empty())) { if (!app_name.empty()) tp_params.add_row_name_and_data("app_name", app_name); if (!file_name.empty()) tp_params.add_row_name_and_data("out_file", file_name); } tp_params.add_row_name_and_data("detailed", detailed); mtp.add(std::move(tp_params)); int32_t app_id = 0; int32_t partition_count = 0; int32_t max_replica_count = 0; std::vector<partition_configuration> partitions; dsn::error_code err = list_app(app_name, app_id, partition_count, partitions); if (err != dsn::ERR_OK) { return err; } if (!partitions.empty()) { max_replica_count = partitions[0].max_replica_count; } // print query_cfg_response std::streambuf *buf; std::ofstream of; if (!file_name.empty()) { of.open(file_name); buf = of.rdbuf(); } else { buf = std::cout.rdbuf(); } std::ostream out(buf); dsn::utils::table_printer tp_general("general"); tp_general.add_row_name_and_data("app_name", app_name); tp_general.add_row_name_and_data("app_id", app_id); tp_general.add_row_name_and_data("partition_count", partition_count); tp_general.add_row_name_and_data("max_replica_count", max_replica_count); mtp.add(std::move(tp_general)); if (detailed) { dsn::utils::table_printer tp_details("replicas"); tp_details.add_title("pidx"); tp_details.add_column("ballot"); tp_details.add_column("replica_count"); tp_details.add_column("primary"); tp_details.add_column("secondaries"); std::map<rpc_address, std::pair<int, int>> node_stat; int total_prim_count = 0; int total_sec_count = 0; int fully_healthy = 0; int write_unhealthy = 0; int read_unhealthy = 0; for (const auto &p : partitions) { int replica_count = 0; if (!p.primary.is_invalid()) { replica_count++; node_stat[p.primary].first++; total_prim_count++; } replica_count += p.secondaries.size(); total_sec_count += p.secondaries.size(); if (!p.primary.is_invalid()) { if (replica_count >= p.max_replica_count) fully_healthy++; else if (replica_count < 2) write_unhealthy++; } else { write_unhealthy++; read_unhealthy++; } tp_details.add_row(p.pid.get_partition_index()); tp_details.append_data(p.ballot); std::stringstream oss; oss << replica_count << "/" << p.max_replica_count; tp_details.append_data(oss.str()); tp_details.append_data( (p.primary.is_invalid() ? "-" : host_name_resolve(resolve_ip, p.primary.to_std_string()))); oss.str(""); oss << "["; // TODO (yingchun) join for (int j = 0; j < p.secondaries.size(); j++) { if (j != 0) oss << ","; oss << host_name_resolve(resolve_ip, p.secondaries[j].to_std_string()); node_stat[p.secondaries[j]].second++; } oss << "]"; tp_details.append_data(oss.str()); } mtp.add(std::move(tp_details)); // 'node' section. dsn::utils::table_printer tp_nodes("nodes"); tp_nodes.add_title("node"); tp_nodes.add_column("primary"); tp_nodes.add_column("secondary"); tp_nodes.add_column("total"); for (auto &kv : node_stat) { tp_nodes.add_row(host_name_resolve(resolve_ip, kv.first.to_std_string())); tp_nodes.append_data(kv.second.first); tp_nodes.append_data(kv.second.second); tp_nodes.append_data(kv.second.first + kv.second.second); } tp_nodes.add_row(""); tp_nodes.append_data(total_prim_count); tp_nodes.append_data(total_sec_count); tp_nodes.append_data(total_prim_count + total_sec_count); mtp.add(std::move(tp_nodes)); // healthy partition count section. dsn::utils::table_printer tp_hpc("healthy"); tp_hpc.add_row_name_and_data("fully_healthy_partition_count", fully_healthy); tp_hpc.add_row_name_and_data("unhealthy_partition_count", partition_count - fully_healthy); tp_hpc.add_row_name_and_data("write_unhealthy_partition_count", write_unhealthy); tp_hpc.add_row_name_and_data("read_unhealthy_partition_count", read_unhealthy); mtp.add(std::move(tp_hpc)); } mtp.output(out, json ? tp_output_format::kJsonPretty : tp_output_format::kTabular); return dsn::ERR_OK; #undef RESOLVE } dsn::error_code replication_ddl_client::list_app(const std::string &app_name, int32_t &app_id, int32_t &partition_count, std::vector<partition_configuration> &partitions) { if (app_name.empty() || !std::all_of(app_name.cbegin(), app_name.cend(), (bool (*)(int))replication_ddl_client::valid_app_char)) return ERR_INVALID_PARAMETERS; auto req = std::make_shared<query_cfg_request>(); req->app_name = app_name; auto resp_task = request_meta(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX, req); resp_task->wait(); if (resp_task->error() != dsn::ERR_OK) { return resp_task->error(); } dsn::query_cfg_response resp; dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != dsn::ERR_OK) { return resp.err; } app_id = resp.app_id; partition_count = resp.partition_count; partitions = resp.partitions; return dsn::ERR_OK; } dsn::replication::configuration_meta_control_response replication_ddl_client::control_meta_function_level(meta_function_level::type level) { auto req = std::make_shared<configuration_meta_control_request>(); req->level = level; auto response_task = request_meta(RPC_CM_CONTROL_META, req); response_task->wait(); configuration_meta_control_response resp; if (response_task->error() != dsn::ERR_OK) { resp.err = response_task->error(); } else { dsn::unmarshall(response_task->get_response(), resp); } return resp; } dsn::error_code replication_ddl_client::send_balancer_proposal(const configuration_balancer_request &request) { auto req = std::make_shared<configuration_balancer_request>(request); auto response_task = request_meta(RPC_CM_PROPOSE_BALANCER, req); response_task->wait(); if (response_task->error() != dsn::ERR_OK) return response_task->error(); dsn::replication::configuration_balancer_response resp; dsn::unmarshall(response_task->get_response(), resp); return resp.err; } dsn::error_code replication_ddl_client::do_recovery(const std::vector<rpc_address> &replica_nodes, int wait_seconds, bool skip_bad_nodes, bool skip_lost_partitions, const std::string &outfile) { std::streambuf *buf; std::ofstream of; if (!outfile.empty()) { of.open(outfile); buf = of.rdbuf(); } else { buf = std::cout.rdbuf(); } std::ostream out(buf); auto req = std::make_shared<configuration_recovery_request>(); req->recovery_set.clear(); for (const dsn::rpc_address &node : replica_nodes) { if (std::find(req->recovery_set.begin(), req->recovery_set.end(), node) != req->recovery_set.end()) { out << "duplicate replica node " << node.to_string() << ", just ingore it" << std::endl; } else { req->recovery_set.push_back(node); } } if (req->recovery_set.empty()) { out << "node set for recovery it empty" << std::endl; return ERR_INVALID_PARAMETERS; } req->skip_bad_nodes = skip_bad_nodes; req->skip_lost_partitions = skip_lost_partitions; out << "Wait seconds: " << wait_seconds << std::endl; out << "Skip bad nodes: " << (skip_bad_nodes ? "true" : "false") << std::endl; out << "Skip lost partitions: " << (skip_lost_partitions ? "true" : "false") << std::endl; out << "Node list:" << std::endl; out << "=============================" << std::endl; for (auto &node : req->recovery_set) { out << node.to_string() << std::endl; } out << "=============================" << std::endl; auto response_task = request_meta(RPC_CM_START_RECOVERY, req, wait_seconds * 1000); bool wait_done = false; for (int i = 0; i < wait_seconds; ++i) { wait_done = response_task->wait(1000); if (wait_done) break; else out << "Wait recovery for " << i << " seconds" << std::endl; } if (!wait_done || response_task->get_response() == NULL) { out << "Wait recovery failed, administrator should check the meta for progress" << std::endl; return dsn::ERR_TIMEOUT; } else { configuration_recovery_response resp; dsn::unmarshall(response_task->get_response(), resp); out << "Recover result: " << resp.err.to_string() << std::endl; if (!resp.hint_message.empty()) { out << "=============================" << std::endl; out << resp.hint_message; out << "=============================" << std::endl; } return resp.err; } } dsn::error_code replication_ddl_client::do_restore(const std::string &backup_provider_name, const std::string &cluster_name, const std::string &policy_name, int64_t timestamp, const std::string &old_app_name, int32_t old_app_id, const std::string &new_app_name, bool skip_bad_partition, const std::string &restore_path) { if (old_app_name.empty() || !std::all_of(old_app_name.cbegin(), old_app_name.cend(), (bool (*)(int))replication_ddl_client::valid_app_char)) { std::cout << "restore app " << old_app_name << " failed: invalid old_app_name" << std::endl; return ERR_INVALID_PARAMETERS; } if (new_app_name.empty() || !std::all_of(new_app_name.cbegin(), new_app_name.cend(), (bool (*)(int))replication_ddl_client::valid_app_char)) { std::cout << "restore app " << new_app_name << " failed: invalid new_app_name" << std::endl; return ERR_INVALID_PARAMETERS; } auto req = std::make_shared<configuration_restore_request>(); req->cluster_name = cluster_name; req->policy_name = policy_name; req->app_name = old_app_name; req->app_id = old_app_id; req->new_app_name = new_app_name; req->backup_provider_name = backup_provider_name; req->time_stamp = timestamp; req->skip_bad_partition = skip_bad_partition; if (!restore_path.empty()) { req->__set_restore_path(restore_path); std::cout << "restore app from the specified path : " << restore_path << std::endl; } auto resp_task = request_meta(RPC_CM_START_RESTORE, req); bool finish = false; while (!finish) { std::cout << "sleep 1 second to wait complete..." << std::endl; finish = resp_task->wait(1000); } if (resp_task->error() != ERR_OK) { return resp_task->error(); } else { configuration_create_app_response resp; dsn::unmarshall(resp_task->get_response(), resp); if (resp.err == ERR_OBJECT_NOT_FOUND) { std::cout << "restore app failed: couldn't find valid app metadata" << std::endl; } else if (resp.err == ERR_OK) { std::cout << "\t" << "new app_id = " << resp.appid << std::endl; } return resp.err; } } dsn::error_code replication_ddl_client::add_backup_policy(const std::string &policy_name, const std::string &backup_provider_type, const std::vector<int32_t> &app_ids, int64_t backup_interval_seconds, int32_t backup_history_cnt, const std::string &start_time) { auto req = std::make_shared<configuration_add_backup_policy_request>(); req->policy_name = policy_name; req->backup_provider_type = backup_provider_type; req->app_ids = app_ids; req->backup_interval_seconds = backup_interval_seconds; req->backup_history_count_to_keep = backup_history_cnt; req->start_time = start_time; auto resp_task = request_meta(RPC_CM_ADD_BACKUP_POLICY, req); resp_task->wait(); if (resp_task->error() != ERR_OK) { return resp_task->error(); } configuration_add_backup_policy_response resp; ::dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != ERR_OK) { return resp.err; } else { std::cout << "add backup policy succeed, policy_name = " << policy_name << std::endl; } return ERR_OK; } error_with<start_backup_app_response> replication_ddl_client::backup_app( int32_t app_id, const std::string &backup_provider_type, const std::string &backup_path) { auto req = std::make_unique<start_backup_app_request>(); req->app_id = app_id; req->backup_provider_type = backup_provider_type; if (!backup_path.empty()) { req->__set_backup_path(backup_path); } return call_rpc_sync(start_backup_app_rpc(std::move(req), RPC_CM_START_BACKUP_APP)); } error_with<query_backup_status_response> replication_ddl_client::query_backup(int32_t app_id, int64_t backup_id) { auto req = std::make_unique<query_backup_status_request>(); req->app_id = app_id; if (backup_id > 0) { req->__set_backup_id(backup_id); } return call_rpc_sync(query_backup_status_rpc(std::move(req), RPC_CM_QUERY_BACKUP_STATUS)); } dsn::error_code replication_ddl_client::disable_backup_policy(const std::string &policy_name) { auto req = std::make_shared<configuration_modify_backup_policy_request>(); req->policy_name = policy_name; req->__set_is_disable(true); auto resp_task = request_meta(RPC_CM_MODIFY_BACKUP_POLICY, req); resp_task->wait(); if (resp_task->error() != ERR_OK) { return resp_task->error(); } configuration_modify_backup_policy_response resp; ::dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != ERR_OK) { return resp.err; } else { std::cout << "disable policy result: " << resp.err.to_string() << std::endl; if (!resp.hint_message.empty()) { std::cout << "=============================" << std::endl; std::cout << resp.hint_message << std::endl; std::cout << "=============================" << std::endl; } return resp.err; } } dsn::error_code replication_ddl_client::enable_backup_policy(const std::string &policy_name) { auto req = std::make_shared<configuration_modify_backup_policy_request>(); req->policy_name = policy_name; req->__set_is_disable(false); auto resp_task = request_meta(RPC_CM_MODIFY_BACKUP_POLICY, req); resp_task->wait(); if (resp_task->error() != ERR_OK) { return resp_task->error(); } configuration_modify_backup_policy_response resp; ::dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != ERR_OK) { return resp.err; } else if (resp.err == ERR_BUSY) { std::cout << "policy is under backup, please try disable later" << std::endl; return ERR_OK; } else { std::cout << "enable policy result: " << resp.err.to_string() << std::endl; if (!resp.hint_message.empty()) { std::cout << "=============================" << std::endl; std::cout << resp.hint_message << std::endl; std::cout << "=============================" << std::endl; } return resp.err; } } // help functions // TODO (yingchun) use join template <typename T> // make sure T support cout << T; std::string print_set(const std::set<T> &set) { std::stringstream ss; ss << "{"; auto begin = set.begin(); auto end = set.end(); for (auto it = begin; it != end; it++) { if (it != begin) { ss << ", "; } ss << *it; } ss << "}"; return ss.str(); } static void print_policy_entry(const policy_entry &entry) { dsn::utils::table_printer tp; tp.add_row_name_and_data(" name", entry.policy_name); tp.add_row_name_and_data(" backup_provider_type", entry.backup_provider_type); tp.add_row_name_and_data(" backup_interval", entry.backup_interval_seconds + "s"); tp.add_row_name_and_data(" app_ids", print_set(entry.app_ids)); tp.add_row_name_and_data(" start_time", entry.start_time); tp.add_row_name_and_data(" status", entry.is_disable ? "disabled" : "enabled"); tp.add_row_name_and_data(" backup_history_count", entry.backup_history_count_to_keep); tp.output(std::cout); } static void print_backup_entry(const backup_entry &bentry) { char start_time[30] = {'\0'}; char end_time[30] = {'\0'}; ::dsn::utils::time_ms_to_date_time(bentry.start_time_ms, start_time, 30); if (bentry.end_time_ms == 0) { end_time[0] = '-'; end_time[1] = '\0'; } else { ::dsn::utils::time_ms_to_date_time(bentry.end_time_ms, end_time, 30); } dsn::utils::table_printer tp; tp.add_row_name_and_data(" id", bentry.backup_id); tp.add_row_name_and_data(" start_time", start_time); tp.add_row_name_and_data(" end_time", end_time); tp.add_row_name_and_data(" app_ids", print_set(bentry.app_ids)); tp.output(std::cout); } dsn::error_code replication_ddl_client::ls_backup_policy() { auto req = std::make_shared<configuration_query_backup_policy_request>(); req->policy_names.clear(); req->backup_info_count = 0; auto resp_task = request_meta(RPC_CM_QUERY_BACKUP_POLICY, req); resp_task->wait(); if (resp_task->error() != ERR_OK) { return resp_task->error(); } configuration_query_backup_policy_response resp; ::dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != ERR_OK) { return resp.err; } else { for (int32_t idx = 0; idx < resp.policys.size(); idx++) { std::cout << "[" << idx + 1 << "]" << std::endl; print_policy_entry(resp.policys[idx]); std::cout << std::endl; } } return ERR_OK; } dsn::error_code replication_ddl_client::query_backup_policy(const std::vector<std::string> &policy_names, int backup_info_cnt) { auto req = std::make_shared<configuration_query_backup_policy_request>(); req->policy_names = policy_names; req->backup_info_count = backup_info_cnt; auto resp_task = request_meta(RPC_CM_QUERY_BACKUP_POLICY, req); resp_task->wait(); if (resp_task->error() != ERR_OK) { return resp_task->error(); } configuration_query_backup_policy_response resp; ::dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != ERR_OK) { return resp.err; } else { for (int32_t idx = 0; idx < resp.policys.size(); idx++) { if (idx != 0) { std::cout << "************************" << std::endl; } const policy_entry &pentry = resp.policys[idx]; std::cout << "policy_info:" << std::endl; print_policy_entry(pentry); std::cout << std::endl << "backup_infos:" << std::endl; const std::vector<backup_entry> &backup_infos = resp.backup_infos[idx]; for (int idx = 0; idx < backup_infos.size(); idx++) { std::cout << "[" << (idx + 1) << "]" << std::endl; print_backup_entry(backup_infos[idx]); } } } return ERR_OK; } dsn::error_code replication_ddl_client::update_backup_policy(const std::string &policy_name, const std::vector<int32_t> &add_appids, const std::vector<int32_t> &removal_appids, int64_t new_backup_interval_sec, int32_t backup_history_count_to_keep, const std::string &start_time) { auto req = std::make_shared<configuration_modify_backup_policy_request>(); req->policy_name = policy_name; if (!add_appids.empty()) { req->__set_add_appids(add_appids); } if (!removal_appids.empty()) { req->__set_removal_appids(removal_appids); } if (new_backup_interval_sec > 0) { req->__set_new_backup_interval_sec(new_backup_interval_sec); } if (backup_history_count_to_keep > 0) { req->__set_backup_history_count_to_keep(backup_history_count_to_keep); } if (!start_time.empty()) { req->__set_start_time(start_time); } auto resp_task = request_meta(RPC_CM_MODIFY_BACKUP_POLICY, req); resp_task->wait(); if (resp_task->error() != ERR_OK) { return resp_task->error(); } configuration_modify_backup_policy_response resp; ::dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != ERR_OK) { return resp.err; } else { std::cout << "Modify policy result: " << resp.err.to_string() << std::endl; if (!resp.hint_message.empty()) { std::cout << "=============================" << std::endl; std::cout << resp.hint_message << std::endl; std::cout << "=============================" << std::endl; } return resp.err; } } dsn::error_code replication_ddl_client::query_restore(int32_t restore_app_id, bool detailed) { if (restore_app_id <= 0) { return ERR_INVALID_PARAMETERS; } auto req = std::make_shared<configuration_query_restore_request>(); req->restore_app_id = restore_app_id; auto resp_task = request_meta(RPC_CM_QUERY_RESTORE_STATUS, req); resp_task->wait(); if (resp_task->error() != ERR_OK) { return resp_task->error(); } configuration_query_restore_response response; ::dsn::unmarshall(resp_task->get_response(), response); if (response.err == ERR_OK) { int overall_progress = 0; for (const auto &p : response.restore_progress) { overall_progress += p; } overall_progress = overall_progress / response.restore_progress.size(); overall_progress = overall_progress / 10; if (detailed) { int width = strlen("restore_status"); std::cout << std::setw(width) << std::left << "pid" << std::setw(width) << std::left << "progress(%)" << std::setw(width) << std::left << "restore_status" << std::endl; for (int idx = 0; idx < response.restore_status.size(); idx++) { std::string restore_status = std::string("unknown"); if (response.restore_status[idx] == ::dsn::ERR_OK) { restore_status = (response.restore_progress[idx] == 1000) ? "ok" : "ongoing"; } else if (response.restore_status[idx] == ERR_IGNORE_BAD_DATA) { restore_status = "skip"; } int progress = response.restore_progress[idx] / 10; std::cout << std::setw(width) << std::left << idx << std::setw(width) << std::left << progress << std::setw(width) << std::left << restore_status << std::endl; } std::cout << std::endl << "the overall progress of restore is " << overall_progress << "%" << std::endl; std::cout << std::endl << "annotations:" << std::endl; std::cout << " ok : mean restore complete" << std::endl; std::cout << " ongoing : mean restore is under going" << std::endl; std::cout << " skip : data on cold backup media is damaged, but skip the damaged partition" << std::endl; std::cout << " unknown : invalid, should login server and check it" << std::endl; } else { std::cout << "the overall progress of restore is " << overall_progress << "%" << std::endl; } } else if (response.err == ERR_APP_NOT_EXIST) { std::cout << "invalid restore_app_id(" << restore_app_id << ")" << std::endl; } else if (response.err == ERR_APP_DROPPED) { std::cout << "restore failed, because some partition's data is damaged on cold backup media" << std::endl; } return ERR_OK; } error_with<duplication_add_response> replication_ddl_client::add_dup( std::string app_name, std::string remote_cluster_name, bool is_duplicating_checkpoint) { auto req = std::make_unique<duplication_add_request>(); req->app_name = std::move(app_name); req->remote_cluster_name = std::move(remote_cluster_name); req->is_duplicating_checkpoint = is_duplicating_checkpoint; return call_rpc_sync(duplication_add_rpc(std::move(req), RPC_CM_ADD_DUPLICATION)); } error_with<duplication_modify_response> replication_ddl_client::change_dup_status( std::string app_name, int dupid, duplication_status::type status) { auto req = std::make_unique<duplication_modify_request>(); req->app_name = std::move(app_name); req->dupid = dupid; req->__set_status(status); return call_rpc_sync(duplication_modify_rpc(std::move(req), RPC_CM_MODIFY_DUPLICATION)); } error_with<duplication_modify_response> replication_ddl_client::update_dup_fail_mode( std::string app_name, int dupid, duplication_fail_mode::type fmode) { if (_duplication_fail_mode_VALUES_TO_NAMES.find(fmode) == _duplication_fail_mode_VALUES_TO_NAMES.end()) { return FMT_ERR(ERR_INVALID_PARAMETERS, "unexpected duplication_fail_mode {}", fmode); } auto req = std::make_unique<duplication_modify_request>(); req->app_name = std::move(app_name); req->dupid = dupid; req->__set_fail_mode(fmode); return call_rpc_sync(duplication_modify_rpc(std::move(req), RPC_CM_MODIFY_DUPLICATION)); } error_with<duplication_query_response> replication_ddl_client::query_dup(std::string app_name) { auto req = std::make_unique<duplication_query_request>(); req->app_name = std::move(app_name); return call_rpc_sync(duplication_query_rpc(std::move(req), RPC_CM_QUERY_DUPLICATION)); } bool replication_ddl_client::valid_app_char(int c) { return (bool)std::isalnum(c) || c == '_' || c == '.' || c == ':'; } namespace { bool need_retry(uint32_t attempt_count, const dsn::error_code &err) { // For successful request, no need to retry. if (err == dsn::ERR_OK) { return false; } // As long as the max attempt count has not been reached, just do retry; // otherwise, do not attempt again. return attempt_count < FLAGS_ddl_client_max_attempt_count; } } // anonymous namespace void replication_ddl_client::end_meta_request(const rpc_response_task_ptr &callback, uint32_t attempt_count, const error_code &err, dsn::message_ex *request, dsn::message_ex *resp) { LOG_INFO( "send request to meta server: rpc_code={}, err={}, attempt_count={}, max_attempt_count={}", request->local_rpc_code, err, attempt_count, FLAGS_ddl_client_max_attempt_count); if (!need_retry(attempt_count, err)) { callback->enqueue(err, (message_ex *)resp); return; } rpc::call(_meta_server, request, &_tracker, [this, attempt_count, callback]( error_code err, dsn::message_ex *request, dsn::message_ex *response) mutable { FAIL_POINT_INJECT_NOT_RETURN_F( "ddl_client_request_meta", [&err, this](dsn::string_view str) { err = pop_mock_error(); }); end_meta_request(callback, attempt_count + 1, err, request, response); }); } dsn::error_code replication_ddl_client::get_app_envs(const std::string &app_name, std::map<std::string, std::string> &envs) { std::vector<::dsn::app_info> apps; auto r = list_apps(dsn::app_status::AS_AVAILABLE, apps); if (r != dsn::ERR_OK) { return r; } for (auto &app : apps) { if (app.app_name == app_name) { envs = app.envs; return dsn::ERR_OK; } } return dsn::ERR_OBJECT_NOT_FOUND; } error_with<configuration_update_app_env_response> replication_ddl_client::set_app_envs(const std::string &app_name, const std::vector<std::string> &keys, const std::vector<std::string> &values) { auto req = std::make_unique<configuration_update_app_env_request>(); req->__set_app_name(app_name); req->__set_keys(keys); req->__set_values(values); req->__set_op(app_env_operation::type::APP_ENV_OP_SET); return call_rpc_sync(update_app_env_rpc(std::move(req), RPC_CM_UPDATE_APP_ENV)); } ::dsn::error_code replication_ddl_client::del_app_envs(const std::string &app_name, const std::vector<std::string> &keys) { auto req = std::make_shared<configuration_update_app_env_request>(); req->__set_app_name(app_name); req->__set_op(app_env_operation::type::APP_ENV_OP_DEL); req->__set_keys(keys); auto resp_task = request_meta(RPC_CM_UPDATE_APP_ENV, req); resp_task->wait(); if (resp_task->error() != ERR_OK) { return resp_task->error(); } configuration_update_app_env_response response; ::dsn::unmarshall(resp_task->get_response(), response); if (response.err != ERR_OK) { return response.err; } else { std::cout << "del app envs succeed" << std::endl; if (!response.hint_message.empty()) { std::cout << "=============================" << std::endl; std::cout << response.hint_message << std::endl; std::cout << "=============================" << std::endl; } } return ERR_OK; } ::dsn::error_code replication_ddl_client::clear_app_envs(const std::string &app_name, bool clear_all, const std::string &prefix) { auto req = std::make_shared<configuration_update_app_env_request>(); req->__set_app_name(app_name); req->__set_op(app_env_operation::type::APP_ENV_OP_CLEAR); if (clear_all) { req->__set_clear_prefix(""); } else { CHECK(!prefix.empty(), "prefix can not be empty"); req->__set_clear_prefix(prefix); } auto resp_task = request_meta(RPC_CM_UPDATE_APP_ENV, req); resp_task->wait(); if (resp_task->error() != ERR_OK) { return resp_task->error(); } configuration_update_app_env_response response; ::dsn::unmarshall(resp_task->get_response(), response); if (response.err != ERR_OK) { return response.err; } else { std::cout << "clear app envs succeed" << std::endl; if (!response.hint_message.empty()) { std::cout << "=============================" << std::endl; std::cout << response.hint_message << std::endl; std::cout << "=============================" << std::endl; } } return ERR_OK; } dsn::error_code replication_ddl_client::ddd_diagnose(gpid pid, std::vector<ddd_partition_info> &ddd_partitions) { auto req = std::make_shared<ddd_diagnose_request>(); req->pid = pid; auto resp_task = request_meta(RPC_CM_DDD_DIAGNOSE, req); resp_task->wait(); if (resp_task->error() != dsn::ERR_OK) { return resp_task->error(); } ddd_diagnose_response resp; dsn::unmarshall(resp_task->get_response(), resp); if (resp.err != dsn::ERR_OK) { return resp.err; } ddd_partitions = std::move(resp.partitions); return dsn::ERR_OK; } void replication_ddl_client::query_disk_info( const std::vector<dsn::rpc_address> &targets, const std::string &app_name, /*out*/ std::map<dsn::rpc_address, error_with<query_disk_info_response>> &resps) { std::map<dsn::rpc_address, query_disk_info_rpc> query_disk_info_rpcs; for (const auto &target : targets) { auto request = std::make_unique<query_disk_info_request>(); request->node = target; request->app_name = app_name; query_disk_info_rpcs.emplace(target, query_disk_info_rpc(std::move(request), RPC_QUERY_DISK_INFO)); } call_rpcs_sync(query_disk_info_rpcs, resps); } error_with<start_bulk_load_response> replication_ddl_client::start_bulk_load(const std::string &app_name, const std::string &cluster_name, const std::string &file_provider_type, const std::string &remote_root_path, const bool ingest_behind) { auto req = std::make_unique<start_bulk_load_request>(); req->app_name = app_name; req->cluster_name = cluster_name; req->file_provider_type = file_provider_type; req->remote_root_path = remote_root_path; req->ingest_behind = ingest_behind; return call_rpc_sync(start_bulk_load_rpc(std::move(req), RPC_CM_START_BULK_LOAD)); } error_with<control_bulk_load_response> replication_ddl_client::control_bulk_load(const std::string &app_name, const bulk_load_control_type::type control_type) { auto req = std::make_unique<control_bulk_load_request>(); req->app_name = app_name; req->type = control_type; return call_rpc_sync(control_bulk_load_rpc(std::move(req), RPC_CM_CONTROL_BULK_LOAD)); } error_with<query_bulk_load_response> replication_ddl_client::query_bulk_load(const std::string &app_name) { auto req = std::make_unique<query_bulk_load_request>(); req->app_name = app_name; return call_rpc_sync(query_bulk_load_rpc(std::move(req), RPC_CM_QUERY_BULK_LOAD_STATUS)); } error_with<clear_bulk_load_state_response> replication_ddl_client::clear_bulk_load(const std::string &app_name) { auto req = std::make_unique<clear_bulk_load_state_request>(); req->app_name = app_name; return call_rpc_sync(clear_bulk_load_rpc(std::move(req), RPC_CM_CLEAR_BULK_LOAD)); } error_code replication_ddl_client::detect_hotkey(const dsn::rpc_address &target, detect_hotkey_request &req, detect_hotkey_response &resp) { std::map<dsn::rpc_address, detect_hotkey_rpc> detect_hotkey_rpcs; auto request = std::make_unique<detect_hotkey_request>(req); detect_hotkey_rpcs.emplace(target, detect_hotkey_rpc(std::move(request), RPC_DETECT_HOTKEY)); std::map<dsn::rpc_address, error_with<detect_hotkey_response>> resps; call_rpcs_sync(detect_hotkey_rpcs, resps); resp = resps.begin()->second.get_value(); return resps.begin()->second.get_error().code(); } error_with<start_partition_split_response> replication_ddl_client::start_partition_split(const std::string &app_name, int new_partition_count) { auto req = std::make_unique<start_partition_split_request>(); req->__set_app_name(app_name); req->__set_new_partition_count(new_partition_count); return call_rpc_sync(start_split_rpc(std::move(req), RPC_CM_START_PARTITION_SPLIT)); } error_with<control_split_response> replication_ddl_client::pause_partition_split(const std::string &app_name, const int32_t parent_pidx) { return control_partition_split(app_name, split_control_type::PAUSE, parent_pidx, 0); } error_with<control_split_response> replication_ddl_client::restart_partition_split(const std::string &app_name, const int32_t parent_pidx) { return control_partition_split(app_name, split_control_type::RESTART, parent_pidx, 0); } error_with<control_split_response> replication_ddl_client::cancel_partition_split(const std::string &app_name, const int32_t old_partition_count) { return control_partition_split(app_name, split_control_type::CANCEL, -1, old_partition_count); } error_with<control_split_response> replication_ddl_client::control_partition_split(const std::string &app_name, split_control_type::type control_type, const int32_t parent_pidx, const int32_t old_partition_count) { auto req = std::make_unique<control_split_request>(); req->__set_app_name(app_name); req->__set_control_type(control_type); req->__set_parent_pidx(parent_pidx); req->__set_old_partition_count(old_partition_count); return call_rpc_sync(control_split_rpc(std::move(req), RPC_CM_CONTROL_PARTITION_SPLIT)); } error_with<query_split_response> replication_ddl_client::query_partition_split(const std::string &app_name) { auto req = std::make_unique<query_split_request>(); req->__set_app_name(app_name); return call_rpc_sync(query_split_rpc(std::move(req), RPC_CM_QUERY_PARTITION_SPLIT)); } error_with<add_new_disk_response> replication_ddl_client::add_new_disk(const rpc_address &target_node, const std::string &disk_str) { auto req = std::make_unique<add_new_disk_request>(); req->disk_str = disk_str; std::map<rpc_address, add_new_disk_rpc> add_new_disk_rpcs; add_new_disk_rpcs.emplace(target_node, add_new_disk_rpc(std::move(req), RPC_ADD_NEW_DISK)); std::map<rpc_address, error_with<add_new_disk_response>> resps; call_rpcs_sync(add_new_disk_rpcs, resps); return resps.begin()->second.get_value(); } error_with<start_app_manual_compact_response> replication_ddl_client::start_app_manual_compact( const std::string &app_name, bool bottommost, const int32_t level, const int32_t max_count) { auto req = std::make_unique<start_app_manual_compact_request>(); req->app_name = app_name; req->__set_trigger_time(dsn_now_s()); req->__set_target_level(level); req->__set_bottommost(bottommost); if (max_count > 0) { req->__set_max_running_count(max_count); } return call_rpc_sync(start_manual_compact_rpc(std::move(req), RPC_CM_START_MANUAL_COMPACT)); } error_with<query_app_manual_compact_response> replication_ddl_client::query_app_manual_compact(const std::string &app_name) { auto req = std::make_unique<query_app_manual_compact_request>(); req->app_name = app_name; return call_rpc_sync( query_manual_compact_rpc(std::move(req), RPC_CM_QUERY_MANUAL_COMPACT_STATUS)); } error_with<configuration_get_max_replica_count_response> replication_ddl_client::get_max_replica_count(const std::string &app_name) { auto req = std::make_unique<configuration_get_max_replica_count_request>(); req->__set_app_name(app_name); return call_rpc_sync( configuration_get_max_replica_count_rpc(std::move(req), RPC_CM_GET_MAX_REPLICA_COUNT)); } error_with<configuration_set_max_replica_count_response> replication_ddl_client::set_max_replica_count(const std::string &app_name, int32_t max_replica_count) { auto req = std::make_unique<configuration_set_max_replica_count_request>(); req->__set_app_name(app_name); req->__set_max_replica_count(max_replica_count); return call_rpc_sync( configuration_set_max_replica_count_rpc(std::move(req), RPC_CM_SET_MAX_REPLICA_COUNT)); } error_with<configuration_rename_app_response> replication_ddl_client::rename_app(const std::string &old_app_name, const std::string &new_app_name) { VALIDATE_TABLE_NAME(old_app_name); VALIDATE_TABLE_NAME(new_app_name); auto req = std::make_unique<configuration_rename_app_request>(); req->__set_old_app_name(old_app_name); req->__set_new_app_name(new_app_name); return call_rpc_sync(configuration_rename_app_rpc(std::move(req), RPC_CM_RENAME_APP)); } } // namespace replication } // namespace dsn