in src/shell/commands/data_operations.cpp [2305:2704]
// Shell command handler for `count_data`.
//
// Without any option it prints the per-partition estimated key counts obtained through
// get_rdb_estimated_keys_stats() and returns immediately. With `-c|--precise` it opens
// unordered scanners on the current table, schedules one scan task per split and reports
// progress once per second until every split has finished, an error occurred, or
// `--run_seconds` elapsed.
//
// Parameters:
//   e    - command executor; not used by this function.
//   sc   - shell context providing the pegasus client, default timeout and escape settings.
//   args - argc/argv of the command line, parsed with getopt_long().
//
// Returns false when the arguments are invalid (unparsable or out-of-range values, or any
// option given without `--precise`); returns true otherwise, including when a runtime
// failure has been reported on stderr.
bool count_data(command_executor *e, shell_context *sc, arguments args)
{
    static struct option long_options[] = {{"precise", no_argument, 0, 'c'},
                                           {"partition", required_argument, 0, 'p'},
                                           {"max_batch_count", required_argument, 0, 'b'},
                                           {"timeout_ms", required_argument, 0, 't'},
                                           {"hash_key_filter_type", required_argument, 0, 'h'},
                                           {"hash_key_filter_pattern", required_argument, 0, 'x'},
                                           {"sort_key_filter_type", required_argument, 0, 's'},
                                           {"sort_key_filter_pattern", required_argument, 0, 'y'},
                                           {"value_filter_type", required_argument, 0, 'v'},
                                           {"value_filter_pattern", required_argument, 0, 'z'},
                                           {"diff_hash_key", no_argument, 0, 'd'},
                                           {"stat_size", no_argument, 0, 'a'},
                                           {"top_count", required_argument, 0, 'n'},
                                           {"run_seconds", required_argument, 0, 'r'},
                                           {0, 0, 0, 0}};

    // "count_data" usually need scan all online records to get precise result, which may affect
    // cluster availability, so here define precise = false defaultly and it will return estimate
    // count immediately.
    bool precise = false;
    bool need_scan = false;
    int32_t partition = -1;
    int max_batch_count = 500;
    int timeout_ms = sc->timeout_ms;
    std::string hash_key_filter_type_name("no_filter");
    std::string sort_key_filter_type_name("no_filter");
    pegasus::pegasus_client::filter_type sort_key_filter_type =
        pegasus::pegasus_client::FT_NO_FILTER;
    std::string sort_key_filter_pattern;
    std::string value_filter_type_name("no_filter");
    pegasus::pegasus_client::filter_type value_filter_type = pegasus::pegasus_client::FT_NO_FILTER;
    std::string value_filter_pattern;
    bool diff_hash_key = false;
    bool stat_size = false;
    int top_count = 0;
    int run_seconds = 0;
    pegasus::pegasus_client::scan_options options;

    // Reset getopt's global state so that the command can be parsed on every invocation.
    optind = 0;
    while (true) {
        int option_index = 0;
        const int c = getopt_long(
            args.argc, args.argv, "cp:b:t:h:x:s:y:v:z:dan:r:", long_options, &option_index);
        if (c == -1)
            break;
        // input any valid parameter means you want to get precise count by scanning.
        need_scan = true;
        switch (c) {
        case 'c':
            precise = true;
            break;
        case 'p':
            if (!dsn::buf2int32(optarg, partition)) {
                fprintf(stderr, "ERROR: parse %s as partition failed\n", optarg);
                return false;
            }
            if (partition < 0) {
                // Partition index 0 is valid, so the message must not claim "greater than 0".
                fprintf(stderr, "ERROR: partition should be no less than 0\n");
                return false;
            }
            break;
        case 'b':
            if (!dsn::buf2int32(optarg, max_batch_count)) {
                fprintf(stderr, "ERROR: parse %s as max_batch_count failed\n", optarg);
                return false;
            }
            break;
        case 't':
            if (!dsn::buf2int32(optarg, timeout_ms)) {
                fprintf(stderr, "ERROR: parse %s as timeout_ms failed\n", optarg);
                return false;
            }
            break;
        case 'h':
            options.hash_key_filter_type = parse_filter_type(optarg, false);
            if (options.hash_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid hash_key_filter_type param\n");
                return false;
            }
            hash_key_filter_type_name = optarg;
            break;
        case 'x':
            options.hash_key_filter_pattern = unescape_str(optarg);
            break;
        case 's':
            sort_key_filter_type = parse_filter_type(optarg, true);
            if (sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid sort_key_filter_type param\n");
                return false;
            }
            sort_key_filter_type_name = optarg;
            break;
        case 'y':
            sort_key_filter_pattern = unescape_str(optarg);
            break;
        case 'v':
            value_filter_type = parse_filter_type(optarg, true);
            if (value_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid value_filter_type param\n");
                return false;
            }
            value_filter_type_name = optarg;
            break;
        case 'z':
            value_filter_pattern = unescape_str(optarg);
            break;
        case 'd':
            diff_hash_key = true;
            break;
        case 'a':
            stat_size = true;
            break;
        case 'n':
            if (!dsn::buf2int32(optarg, top_count)) {
                fprintf(stderr, "parse %s as top_count failed\n", optarg);
                return false;
            }
            break;
        case 'r':
            if (!dsn::buf2int32(optarg, run_seconds)) {
                fprintf(stderr, "parse %s as run_seconds failed\n", optarg);
                return false;
            }
            break;
        default:
            return false;
        }
    }

    // Estimate mode: no scan is performed, only the estimated key statistics are printed.
    if (!precise) {
        if (need_scan) {
            fprintf(stderr,
                    "ERROR: you must input [-c|--precise] flag when you expect to get precise "
                    "result by scaning all record online\n");
            return false;
        }

        std::vector<row_data> rows;
        const std::string table_name(sc->pg_client->get_app_name());
        CHECK(!table_name.empty(), "table_name must be non-empty, see data_operations()");

        if (!get_rdb_estimated_keys_stats(sc, table_name, rows)) {
            fprintf(stderr, "ERROR: get rdb_estimated_keys stats failed\n");
            return true;
        }

        // Append a summary row that accumulates the estimate of every preceding row.
        // NOTE(review): the label is formatted from rows.size() *before* the summary row is
        // appended, so it shows one less than the number of rows returned above (and wraps
        // around if no row was returned) - confirm whether that is intended.
        rows.emplace_back(fmt::format("(total:{})", rows.size() - 1));
        auto &sum = rows.back();
        for (size_t i = 0; i < rows.size() - 1; ++i) {
            const row_data &row = rows[i];
            sum.rdb_estimate_num_keys += row.rdb_estimate_num_keys;
        }

        ::dsn::utils::table_printer tp("count_data");
        tp.add_title("pidx");
        tp.add_column("estimate_count");
        for (const row_data &row : rows) {
            tp.add_row(row.row_name);
            tp.append_data(row.rdb_estimate_num_keys);
        }
        tp.output(std::cout, tp_output_format::kTabular);
        return true;
    }

    // Precise mode: validate numeric options before touching the cluster.
    if (max_batch_count <= 1) {
        fprintf(stderr, "ERROR: max_batch_count should be greater than 1\n");
        return false;
    }
    if (timeout_ms <= 0) {
        fprintf(stderr, "ERROR: timeout_ms should be greater than 0\n");
        return false;
    }
    if (top_count < 0) {
        fprintf(stderr, "ERROR: top_count should be no less than 0\n");
        return false;
    }
    if (run_seconds < 0) {
        fprintf(stderr, "ERROR: run_seconds should be no less than 0\n");
        return false;
    }

    // Echo the effective settings.
    fprintf(stderr, "INFO: cluster_name = %s\n", sc->pg_client->get_cluster_name());
    fprintf(stderr, "INFO: app_name = %s\n", sc->pg_client->get_app_name());
    fprintf(stderr,
            "INFO: partition = %s\n",
            partition >= 0 ? boost::lexical_cast<std::string>(partition).c_str() : "all");
    fprintf(stderr, "INFO: max_batch_count = %d\n", max_batch_count);
    fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms);
    fprintf(stderr, "INFO: hash_key_filter_type = %s\n", hash_key_filter_type_name.c_str());
    if (options.hash_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: hash_key_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(options.hash_key_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: sort_key_filter_type = %s\n", sort_key_filter_type_name.c_str());
    if (sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: sort_key_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(sort_key_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: value_filter_type = %s\n", value_filter_type_name.c_str());
    if (value_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: value_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(value_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: diff_hash_key = %s\n", diff_hash_key ? "true" : "false");
    fprintf(stderr, "INFO: stat_size = %s\n", stat_size ? "true" : "false");
    fprintf(stderr, "INFO: top_count = %d\n", top_count);
    fprintf(stderr, "INFO: run_seconds = %d\n", run_seconds);

    std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
    options.timeout_ms = timeout_ms;
    if (sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        // An exact-match sort key filter is sent to the scanner as a prefix filter; the
        // original filter type is still handed to each scan context below.
        if (sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT)
            options.sort_key_filter_type = pegasus::pegasus_client::FT_MATCH_PREFIX;
        else
            options.sort_key_filter_type = sort_key_filter_type;
        options.sort_key_filter_pattern = sort_key_filter_pattern;
    }

    // Values are only requested when they are needed for size statistics or value filtering.
    options.no_value =
        !(stat_size || value_filter_type != pegasus::pegasus_client::FT_NO_FILTER);

    // Decide whether real data should be returned to client. Once the real data is
    // decided not to be returned to client side: option `only_return_count` will be
    // used.
    if (diff_hash_key || stat_size || value_filter_type != pegasus::pegasus_client::FT_NO_FILTER ||
        sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT) {
        options.only_return_count = false;
    } else {
        options.only_return_count = true;
        fprintf(stderr, "INFO: scanner only return kv count, not return value\n");
    }

    int ret = sc->pg_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
    if (ret != pegasus::PERR_OK) {
        fprintf(
            stderr, "ERROR: open app scanner failed: %s\n", sc->pg_client->get_error_string(ret));
        return true;
    }
    fprintf(stderr,
            "INFO: open app scanner succeed, partition_count = %d\n",
            static_cast<int>(raw_scanners.size()));

    std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
    for (auto p : raw_scanners)
        scanners.push_back(p->get_smart_wrapper());
    raw_scanners.clear();

    // When a single partition is requested keep only its scanner.
    if (partition != -1) {
        // `partition` is non-negative here (validated while parsing), so the cast is safe and
        // avoids a signed/unsigned comparison.
        if (static_cast<size_t>(partition) >= scanners.size()) {
            fprintf(stderr, "ERROR: invalid partition param: %d\n", partition);
            return true;
        }
        pegasus::pegasus_client::pegasus_scanner_wrapper s = std::move(scanners[partition]);
        scanners.clear();
        scanners.push_back(std::move(s));
    }
    const int split_count = static_cast<int>(scanners.size());
    fprintf(stderr, "INFO: prepare scanners succeed, split_count = %d\n", split_count);

    std::atomic_bool error_occurred(false);
    std::vector<std::unique_ptr<scan_data_context>> contexts;
    std::shared_ptr<rocksdb::Statistics> statistics = rocksdb::CreateDBStatistics();
    for (int i = 0; i < split_count; i++) {
        // Take ownership immediately so the context is not leaked if growing `contexts` throws.
        std::unique_ptr<scan_data_context> owned(new scan_data_context(SCAN_COUNT,
                                                                       i,
                                                                       max_batch_count,
                                                                       timeout_ms,
                                                                       scanners[i],
                                                                       sc->pg_client,
                                                                       nullptr,
                                                                       &error_occurred,
                                                                       0,
                                                                       stat_size,
                                                                       statistics,
                                                                       top_count,
                                                                       diff_hash_key));
        scan_data_context *context = owned.get();
        context->set_sort_key_filter(sort_key_filter_type, sort_key_filter_pattern);
        context->set_value_filter(value_filter_type, value_filter_pattern);
        contexts.push_back(std::move(owned));
        dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context));
    }

    // Poll the scan contexts once per second until every split reports no pending request.
    int sleep_seconds = 0;
    long last_total_rows = 0;
    bool stopped_by_wait_seconds = false;
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        sleep_seconds++;
        if (run_seconds > 0 && !stopped_by_wait_seconds && sleep_seconds >= run_seconds) {
            // here use compare-and-swap primitive:
            // - if error_occurred is already set true by scanners as error occurred, then
            //   stopped_by_wait_seconds will be assigned as false.
            // - else, error_occurred will be set true, and stopped_by_wait_seconds will be
            //   assigned as true.
            bool expected = false;
            stopped_by_wait_seconds = error_occurred.compare_exchange_strong(expected, true);
        }

        int completed_split_count = 0;
        long cur_total_rows = 0;
        long cur_total_hash_key_count = 0;
        for (int i = 0; i < split_count; i++) {
            cur_total_rows += contexts[i]->split_rows.load();
            if (diff_hash_key)
                cur_total_hash_key_count += contexts[i]->split_hash_key_count.load();
            if (contexts[i]->split_request_count.load() == 0)
                completed_split_count++;
        }

        char hash_key_count_str[100];
        hash_key_count_str[0] = '\0';
        if (diff_hash_key) {
            // Bounded formatting: never write past the fixed-size buffer.
            snprintf(hash_key_count_str,
                     sizeof(hash_key_count_str),
                     " (%ld hash keys)",
                     cur_total_hash_key_count);
        }

        if (!stopped_by_wait_seconds && error_occurred.load()) {
            fprintf(stderr,
                    "INFO: processed for %d seconds, (%d/%d) splits, total %ld rows%s, last second "
                    "%ld rows, error occurred, terminating...\n",
                    sleep_seconds,
                    completed_split_count,
                    split_count,
                    cur_total_rows,
                    hash_key_count_str,
                    cur_total_rows - last_total_rows);
        } else {
            fprintf(stderr,
                    "INFO: processed for %d seconds, (%d/%d) splits, total %ld rows%s, last second "
                    "%ld rows\n",
                    sleep_seconds,
                    completed_split_count,
                    split_count,
                    cur_total_rows,
                    hash_key_count_str,
                    cur_total_rows - last_total_rows);
        }

        if (completed_split_count == split_count)
            break;
        last_total_rows = cur_total_rows;

        // With size statistics enabled, dump an intermediate state every 10 seconds.
        if (stat_size && sleep_seconds % 10 == 0) {
            print_current_scan_state(contexts, "partially", stat_size, statistics, diff_hash_key);
        }
    }

    // `error_occurred` is also set when the run time limit was reached, so tell both apart.
    const bool terminated = error_occurred.load();
    if (terminated) {
        if (stopped_by_wait_seconds) {
            fprintf(stderr, "INFO: reached run seconds, terminate processing\n");
        } else {
            fprintf(stderr, "ERROR: error occurred, terminate processing\n");
        }
    }

    std::string stop_desc;
    if (terminated) {
        if (stopped_by_wait_seconds) {
            stop_desc = "terminated as run time used out";
        } else {
            stop_desc = "terminated as error occurred";
        }
    } else {
        stop_desc = "done";
    }
    print_current_scan_state(contexts, stop_desc, stat_size, statistics, diff_hash_key);

    // Merge the per-split top rows into one heap and print up to `top_count` of them.
    if (stat_size && top_count > 0) {
        top_container::top_heap heap;
        for (int i = 0; i < split_count; i++) {
            top_container::top_heap &h = contexts[i]->top_rows.all();
            while (!h.empty()) {
                heap.push(h.top());
                h.pop();
            }
        }
        for (int i = 1; i <= top_count && !heap.empty(); i++) {
            const top_container::top_heap_item &item = heap.top();
            fprintf(stderr,
                    "[top][%d].hash_key = \"%s\"\n",
                    i,
                    pegasus::utils::c_escape_string(item.hash_key, sc->escape_all).c_str());
            fprintf(stderr,
                    "[top][%d].sort_key = \"%s\"\n",
                    i,
                    pegasus::utils::c_escape_string(item.sort_key, sc->escape_all).c_str());
            fprintf(stderr, "[top][%d].row_size = %ld\n", i, item.row_size);
            heap.pop();
        }
    }
    return true;
}