in src/shell/commands/data_operations.cpp [1580:1963]
bool copy_data(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"target_cluster_name", required_argument, 0, 'c'},
{"target_app_name", required_argument, 0, 'a'},
{"partition", required_argument, 0, 'p'},
{"max_batch_count", required_argument, 0, 'b'},
{"timeout_ms", required_argument, 0, 't'},
{"hash_key_filter_type", required_argument, 0, 'h'},
{"hash_key_filter_pattern", required_argument, 0, 'x'},
{"sort_key_filter_type", required_argument, 0, 's'},
{"sort_key_filter_pattern", required_argument, 0, 'y'},
{"value_filter_type", required_argument, 0, 'v'},
{"value_filter_pattern", required_argument, 0, 'z'},
{"max_multi_set_concurrency", required_argument, 0, 'm'},
{"no_overwrite", no_argument, 0, 'n'},
{"no_value", no_argument, 0, 'i'},
{"geo_data", no_argument, 0, 'g'},
{"no_ttl", no_argument, 0, 'e'},
{0, 0, 0, 0}};
std::string target_cluster_name;
std::string target_app_name;
std::string target_geo_app_name;
int32_t partition = -1;
int max_batch_count = 500;
int max_multi_set_concurrency = 20;
int timeout_ms = sc->timeout_ms;
bool is_geo_data = false;
bool no_overwrite = false;
bool use_multi_set = false;
std::string hash_key_filter_type_name("no_filter");
std::string sort_key_filter_type_name("no_filter");
pegasus::pegasus_client::filter_type sort_key_filter_type =
pegasus::pegasus_client::FT_NO_FILTER;
std::string sort_key_filter_pattern;
std::string value_filter_type_name("no_filter");
pegasus::pegasus_client::filter_type value_filter_type = pegasus::pegasus_client::FT_NO_FILTER;
std::string value_filter_pattern;
pegasus::pegasus_client::scan_options options;
options.return_expire_ts = true;
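// Reset getopt's parsing state so this command can be invoked repeatedly in the same shell process.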
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(
args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:m:o:nigeu", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'c':
target_cluster_name = optarg;
break;
case 'a':
target_app_name = optarg;
target_geo_app_name = target_app_name + "_geo";
break;
case 'p':
if (!dsn::buf2int32(optarg, partition)) {
fprintf(stderr, "ERROR: parse %s as partition failed\n", optarg);
return false;
}
if (partition < 0) {
fprintf(stderr, "ERROR: partition should be non-negative\n");
return false;
}
break;
case 'b':
if (!dsn::buf2int32(optarg, max_batch_count)) {
fprintf(stderr, "ERROR: parse %s as max_batch_count failed\n", optarg);
return false;
}
break;
case 't':
if (!dsn::buf2int32(optarg, timeout_ms)) {
fprintf(stderr, "ERROR: parse %s as timeout_ms failed\n", optarg);
return false;
}
break;
case 'h':
options.hash_key_filter_type = parse_filter_type(optarg, false);
if (options.hash_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr, "ERROR: invalid hash_key_filter_type param\n");
return false;
}
hash_key_filter_type_name = optarg;
break;
case 'x':
options.hash_key_filter_pattern = unescape_str(optarg);
break;
case 's':
sort_key_filter_type = parse_filter_type(optarg, true);
if (sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr, "ERROR: invalid sort_key_filter_type param\n");
return false;
}
sort_key_filter_type_name = optarg;
break;
case 'y':
sort_key_filter_pattern = unescape_str(optarg);
break;
case 'v':
value_filter_type = parse_filter_type(optarg, true);
if (value_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr, "ERROR: invalid value_filter_type param\n");
return false;
}
value_filter_type_name = optarg;
break;
case 'z':
value_filter_pattern = unescape_str(optarg);
break;
case 'm':
if (!dsn::buf2int32(optarg, max_multi_set_concurrency)) {
fprintf(stderr, "ERROR: parse %s as max_multi_set_concurrency failed\n", optarg);
return false;
}
break;
case 'o':
if (!dsn::buf2int32(optarg, options.batch_size)) {
fprintf(stderr, "ERROR: parse %s as scan_option_batch_size failed\n", optarg);
return false;
}
break;
case 'n':
no_overwrite = true;
break;
case 'i':
options.no_value = true;
break;
case 'g':
is_geo_data = true;
break;
case 'e':
options.return_expire_ts = false;
break;
case 'u':
use_multi_set = true;
break;
default:
return false;
}
}
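// Validate required arguments and sanity-check the numeric options before doing any work.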
if (target_cluster_name.empty()) {
fprintf(stderr, "ERROR: target_cluster_name not specified\n");
return false;
}
if (target_app_name.empty()) {
fprintf(stderr, "ERROR: target_app_name not specified\n");
return false;
}
if (max_batch_count <= 1) {
fprintf(stderr, "ERROR: max_batch_count should be greater than 1\n");
return false;
}
if (timeout_ms <= 0) {
fprintf(stderr, "ERROR: timeout_ms should be greater than 0\n");
return false;
}
if (value_filter_type != pegasus::pegasus_client::FT_NO_FILTER && options.no_value) {
fprintf(stderr, "ERROR: no_value should not be set when value_filter_type is set\n");
return false;
}
if (max_multi_set_concurrency <= 0) {
fprintf(stderr, "ERROR: max_multi_set_concurrency should be greater than 0\n");
return false;
}
if (use_multi_set && no_overwrite) {
fprintf(stderr, "ERROR: copy with multi_set not support no_overwrite!\n");
return false;
}
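// Echo the effective copy parameters so the user can confirm them.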
fprintf(stderr, "INFO: source_cluster_name = %s\n", sc->pg_client->get_cluster_name());
fprintf(stderr, "INFO: source_app_name = %s\n", sc->pg_client->get_app_name());
fprintf(stderr, "INFO: target_cluster_name = %s\n", target_cluster_name.c_str());
fprintf(stderr, "INFO: target_app_name = %s\n", target_app_name.c_str());
if (is_geo_data) {
fprintf(stderr, "INFO: target_geo_app_name = %s\n", target_geo_app_name.c_str());
}
if (use_multi_set) {
fprintf(stderr,
"INFO: copy use asyncer_multi_set, max_multi_set_concurrency = %d\n",
max_multi_set_concurrency);
}
fprintf(stderr,
"INFO: partition = %s\n",
partition >= 0 ? boost::lexical_cast<std::string>(partition).c_str() : "all");
fprintf(stderr, "INFO: max_batch_count = %d\n", max_batch_count);
fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms);
fprintf(stderr, "INFO: hash_key_filter_type = %s\n", hash_key_filter_type_name.c_str());
if (options.hash_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr,
"INFO: hash_key_filter_pattern = \"%s\"\n",
pegasus::utils::c_escape_string(options.hash_key_filter_pattern).c_str());
}
fprintf(stderr, "INFO: sort_key_filter_type = %s\n", sort_key_filter_type_name.c_str());
if (sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr,
"INFO: sort_key_filter_pattern = \"%s\"\n",
pegasus::utils::c_escape_string(sort_key_filter_pattern).c_str());
}
fprintf(stderr, "INFO: value_filter_type = %s\n", value_filter_type_name.c_str());
if (value_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr,
"INFO: value_filter_pattern = \"%s\"\n",
pegasus::utils::c_escape_string(value_filter_pattern).c_str());
}
fprintf(stderr, "INFO: no_overwrite = %s\n", no_overwrite ? "true" : "false");
fprintf(stderr, "INFO: no_value = %s\n", options.no_value ? "true" : "false");
if (target_cluster_name == sc->pg_client->get_cluster_name() &&
target_app_name == sc->pg_client->get_app_name()) {
fprintf(stderr, "ERROR: source app and target app is the same\n");
return true;
}
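// Create a client for the target table and probe it with a dummy key ("a", "b") to verify
// the target app is reachable before starting the copy.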
pegasus::pegasus_client *target_client = pegasus::pegasus_client_factory::get_client(
target_cluster_name.c_str(), target_app_name.c_str());
if (target_client == nullptr) {
fprintf(stderr, "ERROR: get target app client failed\n");
return true;
}
int ret = target_client->exist("a", "b");
if (ret != pegasus::PERR_OK && ret != pegasus::PERR_NOT_FOUND) {
fprintf(
stderr, "ERROR: test target app failed: %s\n", target_client->get_error_string(ret));
return true;
}
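// For geo data, writes go through a geo_client so the target's geo index table
// (target_app_name + "_geo") is populated along with the main table.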
std::unique_ptr<pegasus::geo::geo_client> target_geo_client;
if (is_geo_data) {
target_geo_client.reset(new pegasus::geo::geo_client("config.ini",
target_cluster_name.c_str(),
target_app_name.c_str(),
target_geo_app_name.c_str()));
}
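// Open unordered scanners on the source table, one per partition.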
std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
options.timeout_ms = timeout_ms;
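// An exact sort-key filter is widened to a prefix filter for the scan; the exact match is
// then enforced per record in scan_data_context via set_sort_key_filter below.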
if (sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
if (sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT)
options.sort_key_filter_type = pegasus::pegasus_client::FT_MATCH_PREFIX;
else
options.sort_key_filter_type = sort_key_filter_type;
options.sort_key_filter_pattern = sort_key_filter_pattern;
}
ret = sc->pg_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
if (ret != pegasus::PERR_OK) {
fprintf(stderr,
"ERROR: open source app scanner failed: %s\n",
sc->pg_client->get_error_string(ret));
return true;
}
fprintf(stderr,
"INFO: open source app scanner succeeded, partition_count = %d\n",
(int)raw_scanners.size());
std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
for (auto p : raw_scanners)
scanners.push_back(p->get_smart_wrapper());
raw_scanners.clear();
if (partition != -1) {
if (partition >= static_cast<int32_t>(scanners.size())) {
fprintf(stderr, "ERROR: invalid partition param: %d\n", partition);
return true;
}
pegasus::pegasus_client::pegasus_scanner_wrapper s = std::move(scanners[partition]);
scanners.clear();
scanners.push_back(std::move(s));
}
int split_count = scanners.size();
fprintf(stderr, "INFO: prepare scanners succeed, split_count = %d\n", split_count);
std::atomic_bool error_occurred(false);
std::vector<std::unique_ptr<scan_data_context>> contexts;
scan_data_operator op = SCAN_COPY;
if (is_geo_data) {
op = SCAN_GEN_GEO;
} else if (use_multi_set) {
fprintf(stderr,
"WARN: copy with multi_set loses the accurate TTL of each value; "
"every value in a batch is assigned the max TTL found in that batch.\n");
op = SCAN_AND_MULTI_SET;
fprintf(stderr, "INFO: THREAD_POOL_DEFAULT worker_count = %d\n", FLAGS_worker_count);
// The thread pool worker_count must be greater than the source app scanner count.
if (FLAGS_worker_count <= split_count) {
fprintf(stderr,
"ERROR: THREAD_POOL_DEFAULT worker_count should be greater than source app "
"scanner count %d\n",
split_count);
return true;
}
}
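// Create one scan_data_context per split and enqueue a task on the default thread pool to drive it.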
for (int i = 0; i < split_count; i++) {
scan_data_context *context = new scan_data_context(op,
i,
max_batch_count,
timeout_ms,
scanners[i],
target_client,
target_geo_client.get(),
&error_occurred,
max_multi_set_concurrency);
context->set_sort_key_filter(sort_key_filter_type, sort_key_filter_pattern);
context->set_value_filter(value_filter_type, value_filter_pattern);
if (no_overwrite)
context->set_no_overwrite();
contexts.emplace_back(context);
if (op == SCAN_AND_MULTI_SET) {
dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_multi_data_next, context));
} else {
dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context));
}
}
// Wait for all scan tasks to complete, printing progress once per second.
int sleep_seconds = 0;
long last_total_rows = 0;
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
sleep_seconds++;
int completed_split_count = 0;
long cur_total_rows = 0;
for (int i = 0; i < split_count; i++) {
cur_total_rows += contexts[i]->split_rows.load();
if (op != SCAN_AND_MULTI_SET && contexts[i]->split_request_count.load() == 0) {
completed_split_count++;
} else if (contexts[i]->split_completed.load()) {
completed_split_count++;
}
}
if (error_occurred.load()) {
fprintf(stderr,
"INFO: processed for %d seconds, (%d/%d) splits, total %ld rows, last second "
"%ld rows, error occurred, terminating...\n",
sleep_seconds,
completed_split_count,
split_count,
cur_total_rows,
cur_total_rows - last_total_rows);
} else {
fprintf(stderr,
"INFO: processed for %d seconds, (%d/%d) splits, total %ld rows, last second "
"%ld rows\n",
sleep_seconds,
completed_split_count,
split_count,
cur_total_rows,
cur_total_rows - last_total_rows);
}
if (completed_split_count == split_count)
break;
last_total_rows = cur_total_rows;
}
if (error_occurred.load()) {
fprintf(stderr, "ERROR: error occurred, processing terminated\n");
}
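// Summarize per-split and total row counts.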
long total_rows = 0;
for (int i = 0; i < split_count; i++) {
fprintf(stderr, "INFO: split[%d]: %ld rows\n", i, contexts[i]->split_rows.load());
total_rows += contexts[i]->split_rows.load();
}
fprintf(stderr,
"\nCopy %s, total %ld rows.\n",
error_occurred.load() ? "terminated" : "done",
total_rows);
return true;
}