src/shell/commands/table_management.cpp (970 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// IWYU pragma: no_include <bits/getopt_core.h>
// IWYU pragma: no_include <bits/std_abs.h>
#include <fmt/core.h>
#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
#include <cstdint>
#include <fstream>
#include <initializer_list>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "client/replication_ddl_client.h"
#include "common/gpid.h"
#include "dsn.layer2_types.h"
#include "meta_admin_types.h"
#include "pegasus_utils.h"
#include "rpc/rpc_host_port.h"
#include "shell/argh.h"
#include "shell/command_executor.h"
#include "shell/command_helper.h"
#include "shell/command_utils.h"
#include "shell/commands.h"
#include "shell/sds/sds.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/flags.h"
#include "utils/metrics.h"
#include "utils/output_utils.h"
#include "utils/ports.h"
#include "utils/string_conv.h"
#include "utils_types.h"
DSN_DEFINE_uint32(shell, tables_sample_interval_ms, 1000, "The interval between sampling metrics.");
DSN_DEFINE_validator(tables_sample_interval_ms, [](uint32_t value) -> bool { return value > 0; });
double convert_to_ratio(double hit, double total)
{
return std::abs(total) < 1e-6 ? 0 : hit / total;
}
bool ls_apps(command_executor *e, shell_context *sc, arguments args)
{
// ls [-a|--all] [-d|--detailed] [-j|--json] [-o|--output file_name]
// [-s|--status all|available|creating|dropping|dropped]
// [-p|--app_name_pattern str] [-m|--match_type all|exact|anywhere|prefix|postfix]
// All valid parameters and flags are given as follows.
static const std::set<std::string> params = {
"o", "output", "s", "status", "p", "app_name_pattern", "m", "match_type"};
static const std::set<std::string> flags = {"a", "all", "d", "detailed", "j", "json"};
argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
// Check if input parameters and flags are valid.
const auto &check = validate_cmd(cmd, params, flags, 0);
if (!check) {
SHELL_PRINTLN_ERROR("{}", check.description());
return false;
}
const bool show_all = cmd[{"-a", "--all"}];
const bool detailed = cmd[{"-d", "--detailed"}];
const bool json = cmd[{"-j", "--json"}];
const std::string output_file(cmd({"-o", "--output"}, "").str());
const std::string status_str(cmd({"-s", "--status"}, "").str());
auto status = dsn::app_status::AS_INVALID;
if (status_str.empty()) {
// `show_all` functions only when target `status` is not specified.
if (!show_all) {
// That `show_all` is not given means just showing available tables.
status = dsn::app_status::AS_AVAILABLE;
}
} else if (status_str != "all") {
status = type_from_string(dsn::_app_status_VALUES_TO_NAMES,
fmt::format("as_{}", status_str),
dsn::app_status::AS_INVALID);
SHELL_PRINT_AND_RETURN_FALSE_IF_NOT(status != dsn::app_status::AS_INVALID,
"parse {} as app_status::type failed",
status_str);
}
// Read the parttern of table name with empty string as default.
const std::string app_name_pattern(cmd({"-p", "--app_name_pattern"}, "").str());
// Read the match type of the pattern for table name with "matching all" as default,
// typically requesting all tables owned by this cluster.
auto match_type = dsn::utils::pattern_match_type::PMT_MATCH_ALL;
PARSE_OPT_ENUM(match_type, dsn::utils::pattern_match_type::PMT_INVALID, {"-m", "--match_type"});
const auto &result = sc->ddl_client->list_apps(
detailed, json, output_file, status, app_name_pattern, match_type);
if (!result) {
fmt::println("list apps failed, error={}", result);
}
return true;
}
bool query_app(command_executor *e, shell_context *sc, arguments args)
{
if (args.argc <= 1)
return false;
static struct option long_options[] = {{"detailed", no_argument, 0, 'd'},
{"resolve_ip", no_argument, 0, 'r'},
{"output", required_argument, 0, 'o'},
{"json", no_argument, 0, 'j'},
{0, 0, 0, 0}};
std::string app_name = args.argv[1];
std::string out_file;
bool detailed = false;
bool resolve_ip = false;
bool json = false;
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "dro:j", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'd':
detailed = true;
break;
case 'r':
resolve_ip = true;
break;
case 'j':
json = true;
break;
case 'o':
out_file = optarg;
break;
default:
return false;
}
}
if (app_name.empty()) {
std::cout << "ERROR: null app name" << std::endl;
return false;
}
::dsn::error_code err =
sc->ddl_client->list_app(app_name, detailed, json, out_file, resolve_ip);
if (err != ::dsn::ERR_OK) {
std::cout << "query app " << app_name << " failed, error=" << err << std::endl;
}
return true;
}
namespace {
dsn::metric_filters sst_stat_filters(int32_t table_id)
{
dsn::metric_filters filters;
filters.with_metric_fields = {dsn::kMetricNameField, dsn::kMetricSingleValueField};
filters.entity_types = {"replica"};
filters.entity_attrs = {"table_id", std::to_string(table_id)};
filters.entity_metrics = {"rdb_total_sst_files", "rdb_total_sst_size_mb"};
return filters;
}
dsn::error_s parse_sst_stat(const std::string &json_string,
std::map<int32_t, double> &count_map,
std::map<int32_t, double> &disk_map)
{
DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, query_snapshot);
for (const auto &entity : query_snapshot.entities) {
if (dsn_unlikely(entity.type != "replica")) {
return FMT_ERR(dsn::ERR_INVALID_DATA,
"non-replica entity should not be included: {}",
entity.type);
}
int32_t partition_id;
RETURN_NOT_OK(dsn::parse_metric_partition_id(entity.attributes, partition_id));
for (const auto &m : entity.metrics) {
if (m.name == "rdb_total_sst_files") {
count_map[partition_id] = m.value;
} else if (m.name == "rdb_total_sst_size_mb") {
disk_map[partition_id] = m.value;
}
}
}
return dsn::error_s::ok();
}
} // anonymous namespace
bool app_disk(command_executor *e, shell_context *sc, arguments args)
{
if (args.argc <= 1)
return false;
static struct option long_options[] = {{"resolve_ip", no_argument, 0, 'r'},
{"detailed", no_argument, 0, 'd'},
{"json", no_argument, 0, 'j'},
{"output", required_argument, 0, 'o'},
{0, 0, 0, 0}};
std::string app_name = args.argv[1];
std::string out_file;
bool detailed = false;
bool json = false;
bool resolve_ip = false;
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "drjo:", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'd':
detailed = true;
break;
case 'r':
resolve_ip = true;
break;
case 'j':
json = true;
break;
case 'o':
out_file = optarg;
break;
default:
return false;
}
}
if (app_name.empty()) {
std::cout << "ERROR: null app name" << std::endl;
return false;
}
std::streambuf *buf;
std::ofstream of;
if (!out_file.empty()) {
of.open(out_file);
buf = of.rdbuf();
} else {
buf = std::cout.rdbuf();
}
std::ostream out(buf);
dsn::utils::multi_table_printer mtp;
dsn::utils::table_printer tp_params("parameters");
if (!(app_name.empty() && out_file.empty())) {
if (!app_name.empty())
tp_params.add_row_name_and_data("app_name", app_name);
if (!out_file.empty())
tp_params.add_row_name_and_data("out_file", out_file);
}
tp_params.add_row_name_and_data("detailed", detailed);
mtp.add(std::move(tp_params));
int32_t app_id = 0;
int32_t partition_count = 0;
int32_t max_replica_count = 0;
std::vector<dsn::partition_configuration> pcs;
dsn::error_code err = sc->ddl_client->list_app(app_name, app_id, partition_count, pcs);
if (err != ::dsn::ERR_OK) {
std::cout << "ERROR: list app " << app_name << " failed, error=" << err << std::endl;
return true;
}
if (!pcs.empty()) {
max_replica_count = pcs[0].max_replica_count;
}
std::vector<node_desc> nodes;
if (!fill_nodes(sc, "replica-server", nodes)) {
std::cout << "ERROR: get replica server node list failed" << std::endl;
return true;
}
const auto &results = get_metrics(nodes, sst_stat_filters(app_id).to_query_string());
std::map<dsn::host_port, std::map<int32_t, double>> disk_map;
std::map<dsn::host_port, std::map<int32_t, double>> count_map;
for (size_t i = 0; i < nodes.size(); ++i) {
RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "sst");
RETURN_SHELL_IF_PARSE_METRICS_FAILED(
parse_sst_stat(results[i].body(), count_map[nodes[i].hp], disk_map[nodes[i].hp]),
nodes[i],
"parse sst stats");
}
::dsn::utils::table_printer tp_general("result");
tp_general.add_row_name_and_data("app_name", app_name);
tp_general.add_row_name_and_data("app_id", app_id);
tp_general.add_row_name_and_data("partition_count", partition_count);
tp_general.add_row_name_and_data("max_replica_count", max_replica_count);
::dsn::utils::table_printer tp_details("details");
if (detailed) {
tp_details.add_title("pidx");
tp_details.add_column("ballot");
tp_details.add_column("replica_count");
tp_details.add_column("primary");
tp_details.add_column("secondaries");
}
double disk_used_for_primary_replicas = 0;
int primary_replicas_count = 0;
double disk_used_for_all_replicas = 0;
int all_replicas_count = 0;
for (const auto &pc : pcs) {
std::string primary_str("-");
if (pc.hp_primary) {
bool disk_found = false;
double disk_value = 0;
auto f1 = disk_map.find(pc.hp_primary);
if (f1 != disk_map.end()) {
auto &sub_map = f1->second;
auto f2 = sub_map.find(pc.pid.get_partition_index());
if (f2 != sub_map.end()) {
disk_found = true;
disk_value = f2->second;
disk_used_for_primary_replicas += disk_value;
primary_replicas_count++;
disk_used_for_all_replicas += disk_value;
all_replicas_count++;
}
}
bool count_found = false;
double count_value = 0;
auto f3 = count_map.find(pc.hp_primary);
if (f3 != count_map.end()) {
auto &sub_map = f3->second;
auto f4 = sub_map.find(pc.pid.get_partition_index());
if (f4 != sub_map.end()) {
count_found = true;
count_value = f4->second;
}
}
std::stringstream oss;
oss << replication_ddl_client::node_name(pc.hp_primary, resolve_ip) << "(";
if (disk_found)
oss << disk_value;
else
oss << "-";
oss << ",";
if (count_found)
oss << "#" << count_value;
else
oss << "-";
oss << ")";
primary_str = oss.str();
}
std::string secondary_str;
{
std::stringstream oss;
oss << "[";
for (int j = 0; j < pc.hp_secondaries.size(); j++) {
if (j != 0)
oss << ",";
bool found = false;
double value = 0;
auto f1 = disk_map.find(pc.hp_secondaries[j]);
if (f1 != disk_map.end()) {
auto &sub_map = f1->second;
auto f2 = sub_map.find(pc.pid.get_partition_index());
if (f2 != sub_map.end()) {
found = true;
value = f2->second;
disk_used_for_all_replicas += value;
all_replicas_count++;
}
}
bool count_found = false;
double count_value = 0;
auto f3 = count_map.find(pc.hp_secondaries[j]);
if (f3 != count_map.end()) {
auto &sub_map = f3->second;
auto f3 = sub_map.find(pc.pid.get_partition_index());
if (f3 != sub_map.end()) {
count_found = true;
count_value = f3->second;
}
}
oss << replication_ddl_client::node_name(pc.hp_secondaries[j], resolve_ip) << "(";
if (found)
oss << value;
else
oss << "-";
oss << ",";
if (count_found)
oss << "#" << count_value;
else
oss << "-";
oss << ")";
}
oss << "]";
secondary_str = oss.str();
}
if (detailed) {
tp_details.add_row(std::to_string(pc.pid.get_partition_index()));
tp_details.append_data(pc.ballot);
tp_details.append_data(fmt::format(
"{}/{}", pc.hp_secondaries.size() + (pc.hp_primary ? 1 : 0), pc.max_replica_count));
tp_details.append_data(primary_str);
tp_details.append_data(secondary_str);
}
}
tp_general.add_row_name_and_data("disk_used_for_primary_replicas(MB)",
disk_used_for_primary_replicas);
tp_general.add_row_name_and_data("disk_used_for_all_replicas(MB)", disk_used_for_all_replicas);
tp_general.add_row_name_and_data("partitions not counted",
std::to_string(partition_count - primary_replicas_count) +
"/" + std::to_string(partition_count));
tp_general.add_row_name_and_data(
"replicas not counted",
std::to_string(partition_count * max_replica_count - all_replicas_count) + "/" +
std::to_string(partition_count * max_replica_count));
mtp.add(std::move(tp_general));
if (detailed) {
mtp.add(std::move(tp_details));
}
mtp.output(out, json ? tp_output_format::kJsonPretty : tp_output_format::kTabular);
return true;
}
bool app_stat(command_executor *, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"app_name", required_argument, nullptr, 'a'},
{"only_qps", no_argument, nullptr, 'q'},
{"only_usage", no_argument, nullptr, 'u'},
{"json", no_argument, nullptr, 'j'},
{"output", required_argument, nullptr, 'o'},
{"sample_interval_ms", required_argument, nullptr, 'i'},
{nullptr, 0, nullptr, 0}};
std::string app_name;
std::string out_file;
bool only_qps = false;
bool only_usage = false;
bool json = false;
uint32_t sample_interval_ms = FLAGS_tables_sample_interval_ms;
optind = 0;
while (true) {
int option_index = 0;
int c = getopt_long(args.argc, args.argv, "a:qujo:i:", long_options, &option_index);
if (c == -1) {
// -1 means all command-line options have been parsed.
break;
}
switch (c) {
case 'a':
app_name = optarg;
break;
case 'q':
only_qps = true;
break;
case 'u':
only_usage = true;
break;
case 'j':
json = true;
break;
case 'o':
out_file = optarg;
break;
case 'i':
RETURN_FALSE_IF_SAMPLE_INTERVAL_MS_INVALID();
break;
default:
return false;
}
}
if (only_qps && only_usage) {
std::cout << "ERROR: only_qps and only_usage should not be set at the same time"
<< std::endl;
return true;
}
std::vector<row_data> rows;
if (!get_app_stat(sc, app_name, sample_interval_ms, rows)) {
std::cout << "ERROR: query app stat from server failed" << std::endl;
return true;
}
rows.emplace_back(fmt::format("(total:{})", rows.size() - 1));
auto &sum = rows.back();
for (size_t i = 0; i < rows.size() - 1; ++i) {
row_data &row = rows[i];
sum.partition_count += row.partition_count;
sum.get_qps += row.get_qps;
sum.multi_get_qps += row.multi_get_qps;
sum.batch_get_qps += row.batch_get_qps;
sum.put_qps += row.put_qps;
sum.multi_put_qps += row.multi_put_qps;
sum.remove_qps += row.remove_qps;
sum.multi_remove_qps += row.multi_remove_qps;
sum.incr_qps += row.incr_qps;
sum.check_and_set_qps += row.check_and_set_qps;
sum.check_and_mutate_qps += row.check_and_mutate_qps;
sum.scan_qps += row.scan_qps;
sum.recent_read_cu += row.recent_read_cu;
sum.recent_write_cu += row.recent_write_cu;
sum.recent_expire_count += row.recent_expire_count;
sum.recent_filter_count += row.recent_filter_count;
sum.recent_abnormal_count += row.recent_abnormal_count;
sum.recent_write_throttling_delay_count += row.recent_write_throttling_delay_count;
sum.recent_write_throttling_reject_count += row.recent_write_throttling_reject_count;
sum.recent_read_throttling_delay_count += row.recent_read_throttling_delay_count;
sum.recent_read_throttling_reject_count += row.recent_read_throttling_reject_count;
sum.recent_backup_request_throttling_delay_count +=
row.recent_backup_request_throttling_delay_count;
sum.recent_backup_request_throttling_reject_count +=
row.recent_backup_request_throttling_reject_count;
sum.recent_write_splitting_reject_count += row.recent_write_splitting_reject_count;
sum.recent_read_splitting_reject_count += row.recent_read_splitting_reject_count;
sum.recent_write_bulk_load_ingestion_reject_count +=
row.recent_write_bulk_load_ingestion_reject_count;
sum.storage_mb += row.storage_mb;
sum.storage_count += row.storage_count;
sum.rdb_block_cache_hit_count += row.rdb_block_cache_hit_count;
sum.rdb_block_cache_total_count += row.rdb_block_cache_total_count;
sum.rdb_index_and_filter_blocks_mem_usage += row.rdb_index_and_filter_blocks_mem_usage;
sum.rdb_memtable_mem_usage += row.rdb_memtable_mem_usage;
sum.rdb_bf_seek_negatives += row.rdb_bf_seek_negatives;
sum.rdb_bf_seek_total += row.rdb_bf_seek_total;
sum.rdb_bf_point_positive_true += row.rdb_bf_point_positive_true;
sum.rdb_bf_point_positive_total += row.rdb_bf_point_positive_total;
sum.rdb_bf_point_negatives += row.rdb_bf_point_negatives;
}
std::streambuf *buf;
std::ofstream of;
if (!out_file.empty()) {
of.open(out_file);
buf = of.rdbuf();
} else {
buf = std::cout.rdbuf();
}
std::ostream out(buf);
::dsn::utils::table_printer tp("app_stat", 2 /* tabular_width */, 3 /* precision */);
tp.add_title(app_name.empty() ? "app_name" : "pidx");
if (app_name.empty()) {
tp.add_column("app_id", tp_alignment::kRight);
tp.add_column("pcount", tp_alignment::kRight);
}
if (!only_usage) {
tp.add_column("GET", tp_alignment::kRight);
tp.add_column("MGET", tp_alignment::kRight);
tp.add_column("BGET", tp_alignment::kRight);
tp.add_column("PUT", tp_alignment::kRight);
tp.add_column("MPUT", tp_alignment::kRight);
tp.add_column("DEL", tp_alignment::kRight);
tp.add_column("MDEL", tp_alignment::kRight);
tp.add_column("INCR", tp_alignment::kRight);
tp.add_column("CAS", tp_alignment::kRight);
tp.add_column("CAM", tp_alignment::kRight);
tp.add_column("SCAN", tp_alignment::kRight);
tp.add_column("RCU", tp_alignment::kRight);
tp.add_column("WCU", tp_alignment::kRight);
tp.add_column("expire", tp_alignment::kRight);
tp.add_column("filter", tp_alignment::kRight);
tp.add_column("abnormal", tp_alignment::kRight);
tp.add_column("delay", tp_alignment::kRight);
tp.add_column("reject", tp_alignment::kRight);
}
if (!only_qps) {
tp.add_column("file_mb", tp_alignment::kRight);
tp.add_column("file_num", tp_alignment::kRight);
tp.add_column("mem_tbl_mb", tp_alignment::kRight);
tp.add_column("mem_idx_mb", tp_alignment::kRight);
}
tp.add_column("hit_rate", tp_alignment::kRight);
tp.add_column("seek_n_rate", tp_alignment::kRight);
tp.add_column("point_n_rate", tp_alignment::kRight);
tp.add_column("point_fp_rate", tp_alignment::kRight);
for (row_data &row : rows) {
tp.add_row(row.row_name);
if (app_name.empty()) {
tp.append_data(row.app_id);
tp.append_data(row.partition_count);
}
if (!only_usage) {
tp.append_data(row.get_qps);
tp.append_data(row.multi_get_qps);
tp.append_data(row.batch_get_qps);
tp.append_data(row.put_qps);
tp.append_data(row.multi_put_qps);
tp.append_data(row.remove_qps);
tp.append_data(row.multi_remove_qps);
tp.append_data(row.incr_qps);
tp.append_data(row.check_and_set_qps);
tp.append_data(row.check_and_mutate_qps);
tp.append_data(row.scan_qps);
tp.append_data(row.recent_read_cu);
tp.append_data(row.recent_write_cu);
tp.append_data(row.recent_expire_count);
tp.append_data(row.recent_filter_count);
tp.append_data(row.recent_abnormal_count);
tp.append_data(row.recent_write_throttling_delay_count);
tp.append_data(row.recent_write_throttling_reject_count);
}
if (!only_qps) {
tp.append_data(row.storage_mb);
tp.append_data((uint64_t)row.storage_count);
tp.append_data(row.rdb_memtable_mem_usage / (1 << 20U));
tp.append_data(row.rdb_index_and_filter_blocks_mem_usage / (1 << 20U));
}
tp.append_data(
convert_to_ratio(row.rdb_block_cache_hit_count, row.rdb_block_cache_total_count));
tp.append_data(convert_to_ratio(row.rdb_bf_seek_negatives, row.rdb_bf_seek_total));
tp.append_data(
convert_to_ratio(row.rdb_bf_point_negatives,
row.rdb_bf_point_negatives + row.rdb_bf_point_positive_total));
tp.append_data(
convert_to_ratio(row.rdb_bf_point_positive_total - row.rdb_bf_point_positive_true,
(row.rdb_bf_point_positive_total - row.rdb_bf_point_positive_true) +
row.rdb_bf_point_negatives));
}
// TODO(wangdan): use dsn::utils::output() in output_utils.h instead.
tp.output(out, json ? tp_output_format::kJsonPretty : tp_output_format::kTabular);
return true;
}
bool create_app(command_executor *e, shell_context *sc, arguments args)
{
// create <app_name> [-p|--partition_count num] [-r|--replica_count num] [-f|--fail_if_exist]
// [-i|--atomic_idempotent] [-e|--envs k1=v1,k2=v2...]
// All valid parameters and flags are given as follows.
static const std::set<std::string> params = {
"p", "partition_count", "r", "replica_count", "e", "envs"};
static const std::set<std::string> flags = {"f", "fail_if_exist", "i", "atomic_idempotent"};
argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
// Check if the input parameters and flags are valid, and there is exact one positional
// argument (i.e. app_name).
const auto &check = validate_cmd(cmd, params, flags, 1);
if (!check) {
SHELL_PRINTLN_ERROR("{}", check.description());
return false;
}
// Get the only positional argument as app_name.
const std::string app_name(cmd(1).str());
int32_t partition_count = 0;
PARSE_OPT_INT(partition_count, 4, {"-p", "--partition_count"});
int32_t replica_count = 0;
PARSE_OPT_INT(replica_count, 3, {"-r", "--replica_count"});
// To get `success_if_exist`, just apply logical NOT to the flag `fail_if_exist`.
const bool success_if_exist = !cmd[{"-f", "--fail_if_exist"}];
const bool atomic_idempotent = cmd[{"-i", "--atomic_idempotent"}];
// Parse each env name/value pair with specified delimiters.
std::map<std::string, std::string> envs;
PARSE_OPT_KV_MAP(envs, ',', '=', {"-e", "--envs"});
const dsn::error_code err = sc->ddl_client->create_app(app_name,
"pegasus",
partition_count,
replica_count,
envs,
false,
success_if_exist,
atomic_idempotent);
if (err == ::dsn::ERR_OK) {
std::cout << "create app \"" << pegasus::utils::c_escape_string(app_name) << "\" succeed"
<< std::endl;
} else {
std::cout << "create app \"" << pegasus::utils::c_escape_string(app_name)
<< "\" failed, error = " << err << std::endl;
}
return true;
}
bool drop_app(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"reserve_seconds", required_argument, 0, 'r'},
{0, 0, 0, 0}};
if (args.argc < 2)
return false;
std::string app_name = args.argv[1];
int reserve_seconds = 0;
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "r:", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'r':
if (!dsn::buf2int32(optarg, reserve_seconds)) {
fprintf(stderr, "parse %s as reserve_seconds failed\n", optarg);
return false;
}
break;
default:
return false;
}
}
std::cout << "reserve_seconds = " << reserve_seconds << std::endl;
::dsn::error_code err = sc->ddl_client->drop_app(app_name, reserve_seconds);
if (err == ::dsn::ERR_OK)
std::cout << "drop app " << app_name << " succeed" << std::endl;
else
std::cout << "drop app " << app_name << " failed, error=" << err << std::endl;
return true;
}
bool rename_app(command_executor *e, shell_context *sc, arguments args)
{
if (args.argc <= 2) {
return false;
}
const std::string old_app_name = args.argv[1];
const std::string new_app_name = args.argv[2];
auto err_resp = sc->ddl_client->rename_app(old_app_name, new_app_name);
auto err = err_resp.get_error();
const auto &resp = err_resp.get_value();
if (dsn_likely(err.is_ok())) {
err = dsn::error_s::make(resp.err);
}
if (err.is_ok()) {
fmt::print(stdout,
"rename app ok, old_app_name({}), new_app_name({})\n",
old_app_name,
new_app_name);
} else {
std::string error_message(resp.err.to_string());
if (!resp.hint_message.empty()) {
error_message += ", ";
error_message += resp.hint_message;
}
fmt::print(stderr,
"rename app failed, old_app_name({}), new_app_name({}), failed: {}\n",
old_app_name,
new_app_name,
error_message);
}
return true;
}
bool recall_app(command_executor *e, shell_context *sc, arguments args)
{
if (args.argc <= 1)
return false;
int id;
std::string new_name = "";
if (!dsn::buf2int32(args.argv[1], id)) {
fprintf(stderr, "ERROR: parse %s as id failed\n", args.argv[1]);
return false;
}
if (args.argc >= 3) {
new_name = args.argv[2];
}
::dsn::error_code err = sc->ddl_client->recall_app(id, new_name);
if (dsn::ERR_OK == err)
std::cout << "recall app " << id << " succeed" << std::endl;
else
std::cout << "recall app " << id << " failed, error=" << err << std::endl;
return true;
}
bool get_app_envs(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"json", no_argument, 0, 'j'}, {0, 0, 0, 0}};
bool json = false;
optind = 0;
while (true) {
int option_index = 0;
int c = getopt_long(args.argc, args.argv, "j", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'j':
json = true;
break;
default:
return false;
}
}
if (sc->current_app_name.empty()) {
fprintf(stderr, "No app is using now\nUSAGE: use [app_name]\n");
return true;
}
std::map<std::string, std::string> envs;
::dsn::error_code ret = sc->ddl_client->get_app_envs(sc->current_app_name, envs);
if (ret != ::dsn::ERR_OK) {
fprintf(stderr, "get app env failed with err = %s\n", ret.to_string());
return true;
}
::dsn::utils::table_printer tp("app_envs");
for (auto &kv : envs) {
tp.add_row_name_and_data(kv.first, kv.second);
}
tp.output(std::cout, json ? tp_output_format::kJsonPretty : tp_output_format::kTabular);
return true;
}
bool set_app_envs(command_executor *e, shell_context *sc, arguments args)
{
if (sc->current_app_name.empty()) {
fprintf(stderr, "No app is using now\nUSAGE: use [app_name]\n");
return true;
}
if (args.argc < 3) {
return false;
}
if (((args.argc - 1) & 0x01) == 1) {
// key & value count must equal 2*n(n >= 1)
fprintf(stderr, "need speficy the value for key = %s\n", args.argv[args.argc - 1]);
return true;
}
std::vector<std::string> keys;
std::vector<std::string> values;
int idx = 1;
while (idx < args.argc) {
keys.emplace_back(args.argv[idx++]);
values.emplace_back(args.argv[idx++]);
}
auto err_resp = sc->ddl_client->set_app_envs(sc->current_app_name, keys, values);
dsn::error_s err = err_resp.get_error();
std::string hint_msg;
if (err.is_ok()) {
err = dsn::error_s::make(err_resp.get_value().err);
hint_msg = err_resp.get_value().hint_message;
}
if (!err.is_ok()) {
fmt::print(stderr, "set app envs failed with error {} [hint:\"{}\"]!\n", err, hint_msg);
} else {
fmt::print(stdout, "set app envs succeed\n");
}
return true;
}
bool del_app_envs(command_executor *e, shell_context *sc, arguments args)
{
if (sc->current_app_name.empty()) {
fprintf(stderr, "No app is using now\nUSAGE: use [app_name]\n");
return true;
}
if (args.argc <= 1) {
return false;
}
std::vector<std::string> keys;
for (int idx = 1; idx < args.argc; idx++) {
keys.emplace_back(args.argv[idx]);
}
::dsn::error_code ret = sc->ddl_client->del_app_envs(sc->current_app_name, keys);
if (ret != ::dsn::ERR_OK) {
fprintf(stderr, "del app env failed with err = %s\n", ret.to_string());
}
return true;
}
bool clear_app_envs(command_executor *e, shell_context *sc, arguments args)
{
if (sc->current_app_name.empty()) {
fprintf(stderr, "No app is using now\nUSAGE: use [app_name]\n");
return true;
}
static struct option long_options[] = {
{"all", no_argument, 0, 'a'}, {"prefix", required_argument, 0, 'p'}, {0, 0, 0, 0}};
bool clear_all = false;
std::string prefix;
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "ap:", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'a':
clear_all = true;
break;
case 'p':
prefix = optarg;
break;
default:
return false;
}
}
if (!clear_all && prefix.empty()) {
fprintf(stderr, "must specify one of --all and --prefix options\n");
return false;
}
::dsn::error_code ret = sc->ddl_client->clear_app_envs(sc->current_app_name, clear_all, prefix);
if (ret != dsn::ERR_OK) {
fprintf(stderr, "clear app envs failed with err = %s\n", ret.to_string());
}
return true;
}
bool get_max_replica_count(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"json", no_argument, 0, 'j'}, {0, 0, 0, 0}};
if (args.argc < 2) {
return false;
}
std::string app_name(args.argv[1]);
bool json = false;
optind = 0;
while (true) {
int option_index = 0;
int c = getopt_long(args.argc, args.argv, "j", long_options, &option_index);
if (c == -1) {
break;
}
switch (c) {
case 'j':
json = true;
break;
default:
return false;
}
}
auto err_resp = sc->ddl_client->get_max_replica_count(app_name);
auto err = err_resp.get_error();
const auto &resp = err_resp.get_value();
if (err.is_ok()) {
err = dsn::error_s::make(resp.err);
}
std::string escaped_app_name(pegasus::utils::c_escape_string(app_name));
if (!err.is_ok()) {
fmt::print(stderr, "get replica count of app({}) failed: {}\n", escaped_app_name, err);
return true;
}
dsn::utils::table_printer tp("max_replica_count");
tp.add_row_name_and_data("max_replica_count", resp.max_replica_count);
tp.output(std::cout, json ? tp_output_format::kJsonPretty : tp_output_format::kTabular);
return true;
}
bool set_max_replica_count(command_executor *e, shell_context *sc, arguments args)
{
if (args.argc < 3) {
return false;
}
int new_max_replica_count;
if (!dsn::buf2int32(args.argv[2], new_max_replica_count)) {
fmt::print(stderr, "parse '{}' as replica count failed\n", args.argv[2]);
return false;
}
if (new_max_replica_count < 1) {
fmt::print(stderr, "replica count should be >= 1\n");
return false;
}
std::string app_name(args.argv[1]);
std::string escaped_app_name(pegasus::utils::c_escape_string(app_name));
std::string action(fmt::format(
"set the replica count of app({}) to {}", escaped_app_name, new_max_replica_count));
if (!confirm_unsafe_command(action)) {
return true;
}
auto err_resp = sc->ddl_client->set_max_replica_count(app_name, new_max_replica_count);
auto err = err_resp.get_error();
const auto &resp = err_resp.get_value();
if (dsn_likely(err.is_ok())) {
err = dsn::error_s::make(resp.err);
}
if (err.is_ok()) {
fmt::print(stdout,
"set replica count of app({}) from {} to {}: {}\n",
escaped_app_name,
resp.old_max_replica_count,
new_max_replica_count,
resp.hint_message.empty() ? "success" : resp.hint_message);
} else {
std::string error_message(resp.err.to_string());
if (!resp.hint_message.empty()) {
error_message += ", ";
error_message += resp.hint_message;
}
fmt::print(
stderr, "set replica count of app({}) failed: {}\n", escaped_app_name, error_message);
}
return true;
}
bool get_atomic_idempotent(command_executor *e, shell_context *sc, arguments args)
{
// get_atomic_idempotent <app_name> [-j|--json]
// All valid flags are given as follows.
static const std::set<std::string> flags = {"j", "json"};
argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
// Check if the input flags are valid, and there is exact one positional argument
// (i.e. app_name).
const auto &check = validate_cmd(cmd, {}, flags, 1);
if (!check) {
SHELL_PRINTLN_ERROR("{}", check.description());
return false;
}
// Get the only positional argument as app_name.
const std::string app_name(cmd(1).str());
const bool json = cmd[{"-j", "--json"}];
const auto &result = sc->ddl_client->get_atomic_idempotent(app_name);
auto status = result.get_error();
if (status) {
status = FMT_ERR(result.get_value().err, result.get_value().hint_message);
}
if (!status) {
SHELL_PRINTLN_ERROR("get_atomic_idempotent failed, error={}", status);
return true;
}
dsn::utils::table_printer printer("atomic_idempotent");
printer.add_row_name_and_data("atomic_idempotent", result.get_value().atomic_idempotent);
dsn::utils::output(json, printer);
return true;
}
namespace {
bool set_atomic_idempotent(command_executor *e,
shell_context *sc,
arguments args,
bool atomic_idempotent)
{
// <enable|disable>_atomic_idempotent <app_name>
argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
// Check if there is exact one positional argument (i.e. app_name).
const auto &check = validate_cmd(cmd, {}, {}, 1);
if (!check) {
SHELL_PRINTLN_ERROR("{}", check.description());
return false;
}
// Get the only positional argument as app_name.
const std::string app_name(cmd(1).str());
const auto &result = sc->ddl_client->set_atomic_idempotent(app_name, atomic_idempotent);
auto status = result.get_error();
if (status) {
status = FMT_ERR(result.get_value().err, result.get_value().hint_message);
}
if (!status) {
SHELL_PRINTLN_ERROR("set_atomic_idempotent failed, error={}", status);
return true;
}
const auto &resp = result.get_value();
SHELL_PRINTLN_OK("set_atomic_idempotent from {} to {}: {}\n",
resp.old_atomic_idempotent,
atomic_idempotent,
resp.hint_message.empty() ? "succeed" : resp.hint_message);
return true;
}
} // anonymous namespace
bool enable_atomic_idempotent(command_executor *e, shell_context *sc, arguments args)
{
// enable_atomic_idempotent <app_name>
return set_atomic_idempotent(e, sc, args, true);
}
bool disable_atomic_idempotent(command_executor *e, shell_context *sc, arguments args)
{
// disable_atomic_idempotent <app_name>
return set_atomic_idempotent(e, sc, args, false);
}