in src/shell/commands/data_operations.cpp [1964:2215]
// Shell command handler for clearing data of the table currently opened in `sc->pg_client`.
//
// Parses the command-line style `args`, opens one unordered scanner per partition with the
// requested hash-key / sort-key / value filters, and schedules a SCAN_CLEAR task for each
// scanner via `scan_data_next`. Progress is reported on stderr once per second until every
// split reports no outstanding requests.
// NOTE(review): the actual row removal presumably happens inside `scan_data_next` for
// SCAN_CLEAR contexts, which is not visible in this block -- confirm.
//
// Options:
//   -p/--partition <idx>            only process the scanner at this index (default: all)
//   -b/--max_batch_count <n>        must be greater than 1 (default: 500)
//   -t/--timeout_ms <ms>            must be greater than 0 (default: sc->timeout_ms)
//   -h/--hash_key_filter_type, -x/--hash_key_filter_pattern
//   -s/--sort_key_filter_type, -y/--sort_key_filter_pattern
//   -v/--value_filter_type,    -z/--value_filter_pattern
//   -f/--force                      required; without it nothing is scanned
//
// Returns false when an argument is invalid or `--force` is missing; returns true in every
// other case, including scanner-open failures and errors reported by the scan tasks.
// NOTE(review): `e` is unused here; presumably kept for the common command-handler signature.
bool clear_data(command_executor *e, shell_context *sc, arguments args)
{
    static struct option long_options[] = {{"partition", required_argument, 0, 'p'},
                                           {"max_batch_count", required_argument, 0, 'b'},
                                           {"timeout_ms", required_argument, 0, 't'},
                                           {"hash_key_filter_type", required_argument, 0, 'h'},
                                           {"hash_key_filter_pattern", required_argument, 0, 'x'},
                                           {"sort_key_filter_type", required_argument, 0, 's'},
                                           {"sort_key_filter_pattern", required_argument, 0, 'y'},
                                           {"value_filter_type", required_argument, 0, 'v'},
                                           {"value_filter_pattern", required_argument, 0, 'z'},
                                           {"force", no_argument, 0, 'f'},
                                           {0, 0, 0, 0}};

    // -1 means "all partitions".
    int32_t partition = -1;
    int max_batch_count = 500;
    int timeout_ms = sc->timeout_ms;
    std::string hash_key_filter_type_name("no_filter");
    std::string sort_key_filter_type_name("no_filter");
    pegasus::pegasus_client::filter_type sort_key_filter_type =
        pegasus::pegasus_client::FT_NO_FILTER;
    std::string sort_key_filter_pattern;
    std::string value_filter_type_name("no_filter");
    pegasus::pegasus_client::filter_type value_filter_type = pegasus::pegasus_client::FT_NO_FILTER;
    std::string value_filter_pattern;
    bool force = false;
    pegasus::pegasus_client::scan_options options;

    // Reset getopt state so the parser starts from the beginning of `args`.
    optind = 0;
    while (true) {
        int option_index = 0;
        int c;
        c = getopt_long(args.argc, args.argv, "p:b:t:h:x:s:y:v:z:f", long_options, &option_index);
        if (c == -1)
            break;
        switch (c) {
        case 'p':
            if (!dsn::buf2int32(optarg, partition)) {
                fprintf(stderr, "ERROR: parse %s as partition failed\n", optarg);
                return false;
            }
            if (partition < 0) {
                // Partition 0 is valid; only negative indexes are rejected.
                fprintf(stderr, "ERROR: partition should be greater than or equal to 0\n");
                return false;
            }
            break;
        case 'b':
            if (!dsn::buf2int32(optarg, max_batch_count)) {
                fprintf(stderr, "ERROR: parse %s as max_batch_count failed\n", optarg);
                return false;
            }
            break;
        case 't':
            if (!dsn::buf2int32(optarg, timeout_ms)) {
                fprintf(stderr, "ERROR: parse %s as timeout_ms failed\n", optarg);
                return false;
            }
            break;
        case 'h':
            options.hash_key_filter_type = parse_filter_type(optarg, false);
            if (options.hash_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid hash_key_filter_type param\n");
                return false;
            }
            hash_key_filter_type_name = optarg;
            break;
        case 'x':
            options.hash_key_filter_pattern = unescape_str(optarg);
            break;
        case 's':
            sort_key_filter_type = parse_filter_type(optarg, true);
            if (sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid sort_key_filter_type param\n");
                return false;
            }
            sort_key_filter_type_name = optarg;
            break;
        case 'y':
            sort_key_filter_pattern = unescape_str(optarg);
            break;
        case 'v':
            value_filter_type = parse_filter_type(optarg, true);
            if (value_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid value_filter_type param\n");
                return false;
            }
            value_filter_type_name = optarg;
            break;
        case 'z':
            value_filter_pattern = unescape_str(optarg);
            break;
        case 'f':
            force = true;
            break;
        default:
            return false;
        }
    }

    if (max_batch_count <= 1) {
        fprintf(stderr, "ERROR: max_batch_count should be greater than 1\n");
        return false;
    }
    if (timeout_ms <= 0) {
        fprintf(stderr, "ERROR: timeout_ms should be greater than 0\n");
        return false;
    }

    // Echo the effective parameters before doing anything.
    fprintf(stderr, "INFO: cluster_name = %s\n", sc->pg_client->get_cluster_name());
    fprintf(stderr, "INFO: app_name = %s\n", sc->pg_client->get_app_name());
    fprintf(stderr,
            "INFO: partition = %s\n",
            partition >= 0 ? boost::lexical_cast<std::string>(partition).c_str() : "all");
    fprintf(stderr, "INFO: max_batch_count = %d\n", max_batch_count);
    fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms);
    fprintf(stderr, "INFO: hash_key_filter_type = %s\n", hash_key_filter_type_name.c_str());
    if (options.hash_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: hash_key_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(options.hash_key_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: sort_key_filter_type = %s\n", sort_key_filter_type_name.c_str());
    if (sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: sort_key_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(sort_key_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: value_filter_type = %s\n", value_filter_type_name.c_str());
    if (value_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: value_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(value_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: force = %s\n", force ? "true" : "false");

    // Safety latch: nothing is scanned unless the caller explicitly passed --force.
    if (!force) {
        fprintf(stderr,
                "ERROR: be careful to clear data!!! Please specify --force if you are "
                "determined to do.\n");
        return false;
    }

    std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
    options.timeout_ms = timeout_ms;
    if (sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        // An exact-match sort key filter is sent to the scanner as a prefix filter; the
        // original filter type is still handed to each scan context below.
        if (sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT)
            options.sort_key_filter_type = pegasus::pegasus_client::FT_MATCH_PREFIX;
        else
            options.sort_key_filter_type = sort_key_filter_type;
        options.sort_key_filter_pattern = sort_key_filter_pattern;
    }
    // Values are only requested from the scanner when a value filter has to inspect them.
    options.no_value = (value_filter_type == pegasus::pegasus_client::FT_NO_FILTER);

    int ret = sc->pg_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
    if (ret != pegasus::PERR_OK) {
        fprintf(
            stderr, "ERROR: open app scanner failed: %s\n", sc->pg_client->get_error_string(ret));
        return true;
    }
    fprintf(stderr,
            "INFO: open app scanner succeed, partition_count = %d\n",
            static_cast<int>(raw_scanners.size()));

    // Wrap every raw scanner so that it is managed by its smart wrapper from here on.
    std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
    for (auto p : raw_scanners)
        scanners.push_back(p->get_smart_wrapper());
    raw_scanners.clear();

    if (partition != -1) {
        // `partition` was validated as non-negative during option parsing, so the cast is safe.
        if (static_cast<size_t>(partition) >= scanners.size()) {
            fprintf(stderr, "ERROR: invalid partition param: %d\n", partition);
            return true;
        }
        // Keep only the scanner at the requested index.
        pegasus::pegasus_client::pegasus_scanner_wrapper s = std::move(scanners[partition]);
        scanners.clear();
        scanners.push_back(std::move(s));
    }
    int split_count = scanners.size();
    fprintf(stderr, "INFO: prepare scanners succeed, split_count = %d\n", split_count);

    // Shared flag passed to every context; checked below to report errors.
    std::atomic_bool error_occurred(false);
    std::vector<std::unique_ptr<scan_data_context>> contexts;
    contexts.reserve(split_count);
    for (int i = 0; i < split_count; i++) {
        // Own the context through a unique_ptr from the moment it is created so that it
        // cannot leak if any of the following calls throws.
        std::unique_ptr<scan_data_context> context(new scan_data_context(SCAN_CLEAR,
                                                                         i,
                                                                         max_batch_count,
                                                                         timeout_ms,
                                                                         scanners[i],
                                                                         sc->pg_client,
                                                                         nullptr,
                                                                         &error_occurred));
        context->set_sort_key_filter(sort_key_filter_type, sort_key_filter_pattern);
        context->set_value_filter(value_filter_type, value_filter_pattern);
        scan_data_context *raw_context = context.get();
        contexts.push_back(std::move(context));
        // The task only borrows the context; `contexts` keeps it alive until this function
        // returns, which happens after every split has been observed as completed.
        dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, raw_context));
    }

    // Poll once per second until every split has no outstanding request.
    int sleep_seconds = 0;
    long last_total_rows = 0;
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        sleep_seconds++;
        int completed_split_count = 0;
        long cur_total_rows = 0;
        for (int i = 0; i < split_count; i++) {
            cur_total_rows += contexts[i]->split_rows.load();
            if (contexts[i]->split_request_count.load() == 0)
                completed_split_count++;
        }
        fprintf(stderr,
                "INFO: processed for %d seconds, (%d/%d) splits, total %ld rows, last second "
                "%ld rows%s\n",
                sleep_seconds,
                completed_split_count,
                split_count,
                cur_total_rows,
                cur_total_rows - last_total_rows,
                error_occurred.load() ? ", error occurred, terminating..." : "");
        if (completed_split_count == split_count)
            break;
        last_total_rows = cur_total_rows;
    }

    if (error_occurred.load()) {
        fprintf(stderr, "ERROR: error occurred, terminate processing\n");
    }

    // Final per-split and overall summary.
    long total_rows = 0;
    for (int i = 0; i < split_count; i++) {
        fprintf(stderr, "INFO: split[%d]: %ld rows\n", i, contexts[i]->split_rows.load());
        total_rows += contexts[i]->split_rows.load();
    }
    fprintf(stderr,
            "\nClear %s, total %ld rows.\n",
            error_occurred.load() ? "terminated" : "done",
            total_rows);
    return true;
}