bool copy_data()

in src/shell/commands/data_operations.cpp [1580:1963]


// Shell command: copy all data of the current (source) app into a target
// cluster/app by scanning every partition and re-writing each row.
// Optionally generates GEO index data (-g) or batches writes with
// multi_set (-u).
//
// Returns false only on a command-line usage error (so the shell framework
// can print usage help); returns true once the command has run, even if the
// copy itself failed — failures are reported on stderr.
bool copy_data(command_executor *e, shell_context *sc, arguments args)
{
    // Every short option in the getopt string below has a matching long
    // option here ('o'/--scan_option_batch_size and 'u'/--use_multi_set
    // were previously missing from this table).
    static struct option long_options[] = {{"target_cluster_name", required_argument, 0, 'c'},
                                           {"target_app_name", required_argument, 0, 'a'},
                                           {"partition", required_argument, 0, 'p'},
                                           {"max_batch_count", required_argument, 0, 'b'},
                                           {"timeout_ms", required_argument, 0, 't'},
                                           {"hash_key_filter_type", required_argument, 0, 'h'},
                                           {"hash_key_filter_pattern", required_argument, 0, 'x'},
                                           {"sort_key_filter_type", required_argument, 0, 's'},
                                           {"sort_key_filter_pattern", required_argument, 0, 'y'},
                                           {"value_filter_type", required_argument, 0, 'v'},
                                           {"value_filter_pattern", required_argument, 0, 'z'},
                                           {"scan_option_batch_size", required_argument, 0, 'o'},
                                           {"max_multi_set_concurrency", required_argument, 0, 'm'},
                                           {"no_overwrite", no_argument, 0, 'n'},
                                           {"no_value", no_argument, 0, 'i'},
                                           {"geo_data", no_argument, 0, 'g'},
                                           {"no_ttl", no_argument, 0, 'e'},
                                           {"use_multi_set", no_argument, 0, 'u'},
                                           {0, 0, 0, 0}};

    std::string target_cluster_name;
    std::string target_app_name;
    std::string target_geo_app_name;
    int32_t partition = -1;        // -1 means copy all partitions
    int max_batch_count = 500;
    int max_multi_set_concurrency = 20;
    int timeout_ms = sc->timeout_ms;
    bool is_geo_data = false;
    bool no_overwrite = false;
    bool use_multi_set = false;
    std::string hash_key_filter_type_name("no_filter");
    std::string sort_key_filter_type_name("no_filter");
    pegasus::pegasus_client::filter_type sort_key_filter_type =
        pegasus::pegasus_client::FT_NO_FILTER;
    std::string sort_key_filter_pattern;
    std::string value_filter_type_name("no_filter");
    pegasus::pegasus_client::filter_type value_filter_type = pegasus::pegasus_client::FT_NO_FILTER;
    std::string value_filter_pattern;
    pegasus::pegasus_client::scan_options options;
    // Carry each value's expire timestamp through the scan so TTLs can be
    // preserved on the target (disabled by -e/--no_ttl).
    options.return_expire_ts = true;

    // Reset getopt state: the shell may parse several commands per process.
    optind = 0;
    while (true) {
        int option_index = 0;
        int c;
        c = getopt_long(
            args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:m:o:nigeu", long_options, &option_index);
        if (c == -1)
            break;
        switch (c) {
        case 'c':
            target_cluster_name = optarg;
            break;
        case 'a':
            target_app_name = optarg;
            // The GEO index table name is derived from the target app name.
            target_geo_app_name = target_app_name + "_geo";
            break;
        case 'p':
            if (!dsn::buf2int32(optarg, partition)) {
                fprintf(stderr, "ERROR: parse %s as partition failed\n", optarg);
                return false;
            }
            if (partition < 0) {
                fprintf(stderr, "ERROR: partition should be non-negative\n");
                return false;
            }
            break;
        case 'b':
            if (!dsn::buf2int32(optarg, max_batch_count)) {
                fprintf(stderr, "ERROR: parse %s as max_batch_count failed\n", optarg);
                return false;
            }
            break;
        case 't':
            if (!dsn::buf2int32(optarg, timeout_ms)) {
                fprintf(stderr, "ERROR: parse %s as timeout_ms failed\n", optarg);
                return false;
            }
            break;
        case 'h':
            options.hash_key_filter_type = parse_filter_type(optarg, false);
            if (options.hash_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid hash_key_filter_type param\n");
                return false;
            }
            hash_key_filter_type_name = optarg;
            break;
        case 'x':
            options.hash_key_filter_pattern = unescape_str(optarg);
            break;
        case 's':
            sort_key_filter_type = parse_filter_type(optarg, true);
            if (sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid sort_key_filter_type param\n");
                return false;
            }
            sort_key_filter_type_name = optarg;
            break;
        case 'y':
            sort_key_filter_pattern = unescape_str(optarg);
            break;
        case 'v':
            value_filter_type = parse_filter_type(optarg, true);
            if (value_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid value_filter_type param\n");
                return false;
            }
            value_filter_type_name = optarg;
            break;
        case 'z':
            value_filter_pattern = unescape_str(optarg);
            break;
        case 'm':
            if (!dsn::buf2int32(optarg, max_multi_set_concurrency)) {
                fprintf(stderr, "ERROR: parse %s as max_multi_set_concurrency failed\n", optarg);
                return false;
            }
            break;
        case 'o':
            if (!dsn::buf2int32(optarg, options.batch_size)) {
                fprintf(stderr, "ERROR: parse %s as scan_option_batch_size failed\n", optarg);
                return false;
            }
            break;
        case 'n':
            no_overwrite = true;
            break;
        case 'i':
            options.no_value = true;
            break;
        case 'g':
            is_geo_data = true;
            break;
        case 'e':
            options.return_expire_ts = false;
            break;
        case 'u':
            use_multi_set = true;
            break;
        default:
            return false;
        }
    }

    // ---- validate parsed options ----
    if (target_cluster_name.empty()) {
        fprintf(stderr, "ERROR: target_cluster_name not specified\n");
        return false;
    }

    if (target_app_name.empty()) {
        fprintf(stderr, "ERROR: target_app_name not specified\n");
        return false;
    }

    if (max_batch_count <= 1) {
        fprintf(stderr, "ERROR: max_batch_count should be greater than 1\n");
        return false;
    }

    if (timeout_ms <= 0) {
        fprintf(stderr, "ERROR: timeout_ms should be greater than 0\n");
        return false;
    }

    // A value filter needs the values themselves; -i would strip them.
    if (value_filter_type != pegasus::pegasus_client::FT_NO_FILTER && options.no_value) {
        fprintf(stderr, "ERROR: no_value should not be set when value_filter_type is set\n");
        return false;
    }

    if (max_multi_set_concurrency <= 0) {
        fprintf(stderr, "ERROR: max_multi_set_concurrency should be greater than 0\n");
        return false;
    }

    // multi_set writes whole batches unconditionally, so it cannot honor
    // "only write if absent".
    if (use_multi_set && no_overwrite) {
        fprintf(stderr, "ERROR: copy with multi_set not support no_overwrite!\n");
        return false;
    }

    // ---- echo the effective configuration ----
    fprintf(stderr, "INFO: source_cluster_name = %s\n", sc->pg_client->get_cluster_name());
    fprintf(stderr, "INFO: source_app_name = %s\n", sc->pg_client->get_app_name());
    fprintf(stderr, "INFO: target_cluster_name = %s\n", target_cluster_name.c_str());
    fprintf(stderr, "INFO: target_app_name = %s\n", target_app_name.c_str());
    if (is_geo_data) {
        fprintf(stderr, "INFO: target_geo_app_name = %s\n", target_geo_app_name.c_str());
    }
    if (use_multi_set) {
        fprintf(stderr,
                "INFO: copy use asyncer_multi_set, max_multi_set_concurrency = %d\n",
                max_multi_set_concurrency);
    }
    fprintf(stderr,
            "INFO: partition = %s\n",
            partition >= 0 ? boost::lexical_cast<std::string>(partition).c_str() : "all");
    fprintf(stderr, "INFO: max_batch_count = %d\n", max_batch_count);
    fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms);
    fprintf(stderr, "INFO: hash_key_filter_type = %s\n", hash_key_filter_type_name.c_str());
    if (options.hash_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: hash_key_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(options.hash_key_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: sort_key_filter_type = %s\n", sort_key_filter_type_name.c_str());
    if (sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: sort_key_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(sort_key_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: value_filter_type = %s\n", value_filter_type_name.c_str());
    if (value_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: value_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(value_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: no_overwrite = %s\n", no_overwrite ? "true" : "false");
    fprintf(stderr, "INFO: no_value = %s\n", options.no_value ? "true" : "false");

    // Copying a table onto itself would be a destructive no-op; refuse it.
    // (return true: the command ran, it just declined to do anything.)
    if (target_cluster_name == sc->pg_client->get_cluster_name() &&
        target_app_name == sc->pg_client->get_app_name()) {
        fprintf(stderr, "ERROR: source app and target app is the same\n");
        return true;
    }

    pegasus::pegasus_client *target_client = pegasus::pegasus_client_factory::get_client(
        target_cluster_name.c_str(), target_app_name.c_str());
    if (target_client == nullptr) {
        fprintf(stderr, "ERROR: get target app client failed\n");
        return true;
    }

    // Probe the target with a harmless read to fail fast on connectivity
    // or table problems before starting the scan.
    int ret = target_client->exist("a", "b");
    if (ret != pegasus::PERR_OK && ret != pegasus::PERR_NOT_FOUND) {
        fprintf(
            stderr, "ERROR: test target app failed: %s\n", target_client->get_error_string(ret));
        return true;
    }

    std::unique_ptr<pegasus::geo::geo_client> target_geo_client;
    if (is_geo_data) {
        target_geo_client.reset(new pegasus::geo::geo_client("config.ini",
                                                             target_cluster_name.c_str(),
                                                             target_app_name.c_str(),
                                                             target_geo_app_name.c_str()));
    }

    std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
    options.timeout_ms = timeout_ms;
    if (sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        // FT_MATCH_EXACT is scanned as a prefix match server-side; the
        // exact-match check is applied per-row via set_sort_key_filter below.
        if (sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT)
            options.sort_key_filter_type = pegasus::pegasus_client::FT_MATCH_PREFIX;
        else
            options.sort_key_filter_type = sort_key_filter_type;
        options.sort_key_filter_pattern = sort_key_filter_pattern;
    }
    // INT_MAX asks for one scanner per partition.
    ret = sc->pg_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
    if (ret != pegasus::PERR_OK) {
        fprintf(stderr,
                "ERROR: open source app scanner failed: %s\n",
                sc->pg_client->get_error_string(ret));
        return true;
    }
    fprintf(stderr,
            "INFO: open source app scanner succeed, partition_count = %d\n",
            (int)raw_scanners.size());

    std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
    for (auto p : raw_scanners)
        scanners.push_back(p->get_smart_wrapper());
    raw_scanners.clear();

    // If a single partition was requested, keep only its scanner.
    if (partition != -1) {
        if (partition >= static_cast<int32_t>(scanners.size())) {
            fprintf(stderr, "ERROR: invalid partition param: %d\n", partition);
            return true;
        }
        pegasus::pegasus_client::pegasus_scanner_wrapper s = std::move(scanners[partition]);
        scanners.clear();
        scanners.push_back(std::move(s));
    }
    int split_count = scanners.size();
    fprintf(stderr, "INFO: prepare scanners succeed, split_count = %d\n", split_count);

    std::atomic_bool error_occurred(false);
    std::vector<std::unique_ptr<scan_data_context>> contexts;

    scan_data_operator op = SCAN_COPY;
    if (is_geo_data) {
        op = SCAN_GEN_GEO;
    } else if (use_multi_set) {
        fprintf(stderr,
                "WARN: used multi_set will lose accurate ttl time per value! "
                "ttl time will be assign the max value of this batch data.\n");
        op = SCAN_AND_MULTI_SET;

        fprintf(stderr, "INFO: THREAD_POOL_DEFAULT worker_count = %d\n", FLAGS_worker_count);
        // Each scanner occupies a worker, so at least one extra worker is
        // needed to make progress on the multi_set side.
        if (FLAGS_worker_count <= split_count) {
            fprintf(stderr,
                    "ERROR: THREAD_POOL_DEFAULT worker_count should greater than source app "
                    "scanner count %d\n",
                    split_count);
            return true;
        }
    }

    // Launch one async task per split; each context tracks its own progress
    // and flips error_occurred on failure.
    for (int i = 0; i < split_count; i++) {
        scan_data_context *context = new scan_data_context(op,
                                                           i,
                                                           max_batch_count,
                                                           timeout_ms,
                                                           scanners[i],
                                                           target_client,
                                                           target_geo_client.get(),
                                                           &error_occurred,
                                                           max_multi_set_concurrency);
        context->set_sort_key_filter(sort_key_filter_type, sort_key_filter_pattern);
        context->set_value_filter(value_filter_type, value_filter_pattern);
        if (no_overwrite)
            context->set_no_overwrite();
        contexts.emplace_back(context);
        if (op == SCAN_AND_MULTI_SET) {
            dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_multi_data_next, context));
        } else {
            dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context));
        }
    }

    // Poll once per second, reporting progress until every split finishes.
    int sleep_seconds = 0;
    long last_total_rows = 0;
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        sleep_seconds++;
        int completed_split_count = 0;
        long cur_total_rows = 0;
        for (int i = 0; i < split_count; i++) {
            cur_total_rows += contexts[i]->split_rows.load();
            // Non-multi_set splits signal completion by draining their
            // in-flight request count; multi_set splits use an explicit flag.
            if (op != SCAN_AND_MULTI_SET && contexts[i]->split_request_count.load() == 0) {
                completed_split_count++;
            } else if (contexts[i]->split_completed.load()) {
                completed_split_count++;
            }
        }
        if (error_occurred.load()) {
            fprintf(stderr,
                    "INFO: processed for %d seconds, (%d/%d) splits, total %ld rows, last second "
                    "%ld rows, error occurred, terminating...\n",
                    sleep_seconds,
                    completed_split_count,
                    split_count,
                    cur_total_rows,
                    cur_total_rows - last_total_rows);
        } else {
            fprintf(stderr,
                    "INFO: processed for %d seconds, (%d/%d) splits, total %ld rows, last second "
                    "%ld rows\n",
                    sleep_seconds,
                    completed_split_count,
                    split_count,
                    cur_total_rows,
                    cur_total_rows - last_total_rows);
        }
        if (completed_split_count == split_count)
            break;
        last_total_rows = cur_total_rows;
    }

    if (error_occurred.load()) {
        fprintf(stderr, "ERROR: error occurred, processing terminated\n");
    }

    // Final per-split and aggregate row counts.
    long total_rows = 0;
    for (int i = 0; i < split_count; i++) {
        fprintf(stderr, "INFO: split[%d]: %ld rows\n", i, contexts[i]->split_rows.load());
        total_rows += contexts[i]->split_rows.load();
    }

    fprintf(stderr,
            "\nCopy %s, total %ld rows.\n",
            error_occurred.load() ? "terminated" : "done",
            total_rows);

    return true;
}