bool multi_del_range()

in src/shell/commands/data_operations.cpp [494:689]


bool multi_del_range(command_executor *e, shell_context *sc, arguments args)
{
    if (args.argc < 4)
        return false;

    std::string hash_key = sds_to_string(args.argv[1]);
    std::string start_sort_key = sds_to_string(args.argv[2]);
    std::string stop_sort_key = sds_to_string(args.argv[3]);
    pegasus::pegasus_client::scan_options options;
    options.no_value = true;
    options.timeout_ms = sc->timeout_ms;
    std::string sort_key_filter_type_name("no_filter");
    bool silent = false;
    FILE *file = stderr;
    int batch_del_count = 100;

    static struct option long_options[] = {{"start_inclusive", required_argument, 0, 'a'},
                                           {"stop_inclusive", required_argument, 0, 'b'},
                                           {"sort_key_filter_type", required_argument, 0, 's'},
                                           {"sort_key_filter_pattern", required_argument, 0, 'y'},
                                           {"output", required_argument, 0, 'o'},
                                           {"silent", no_argument, 0, 'i'},
                                           {0, 0, 0, 0}};

    escape_sds_argv(args.argc, args.argv);
    optind = 0;
    while (true) {
        int option_index = 0;
        int c;
        c = getopt_long(args.argc, args.argv, "a:b:s:y:o:i", long_options, &option_index);
        if (c == -1)
            break;
        switch (c) {
        case 'a':
            if (!dsn::buf2bool(optarg, options.start_inclusive)) {
                fprintf(stderr, "invalid start_inclusive param\n");
                return false;
            }
            break;
        case 'b':
            if (!dsn::buf2bool(optarg, options.stop_inclusive)) {
                fprintf(stderr, "invalid stop_inclusive param\n");
                return false;
            }
            break;
        case 's':
            options.sort_key_filter_type = (pegasus::pegasus_client::filter_type)type_from_string(
                ::dsn::apps::_filter_type_VALUES_TO_NAMES,
                std::string("ft_match_") + optarg,
                ::dsn::apps::filter_type::FT_NO_FILTER);
            if (options.sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "invalid sort_key_filter_type param\n");
                return false;
            }
            sort_key_filter_type_name = optarg;
            break;
        case 'y':
            options.sort_key_filter_pattern = unescape_str(optarg);
            break;
        case 'o':
            file = fopen(optarg, "w");
            if (!file) {
                fprintf(stderr, "open filename %s failed\n", optarg);
                return false;
            }
            break;
        case 'i':
            silent = true;
            break;
        default:
            return false;
        }
    }

    fprintf(stderr, "hash_key: \"%s\"\n", pegasus::utils::c_escape_string(hash_key).c_str());
    fprintf(stderr,
            "start_sort_key: \"%s\"\n",
            pegasus::utils::c_escape_string(start_sort_key).c_str());
    fprintf(stderr, "start_inclusive: %s\n", options.start_inclusive ? "true" : "false");
    fprintf(
        stderr, "stop_sort_key: \"%s\"\n", pegasus::utils::c_escape_string(stop_sort_key).c_str());
    fprintf(stderr, "stop_inclusive: %s\n", options.stop_inclusive ? "true" : "false");
    fprintf(stderr, "sort_key_filter_type: %s\n", sort_key_filter_type_name.c_str());
    if (options.sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "sort_key_filter_pattern: \"%s\"\n",
                pegasus::utils::c_escape_string(options.sort_key_filter_pattern).c_str());
    }
    fprintf(stderr, "silent: %s\n", silent ? "true" : "false");
    fprintf(stderr, "\n");

    int count = 0;
    bool error_occured = false;
    pegasus::pegasus_client::pegasus_scanner *scanner = nullptr;
    int ret = sc->pg_client->get_scanner(hash_key, start_sort_key, stop_sort_key, options, scanner);
    if (ret != pegasus::PERR_OK) {
        fprintf(file, "ERROR: get scanner failed: %s\n", sc->pg_client->get_error_string(ret));
        if (file != stderr) {
            fprintf(
                stderr, "ERROR: get scanner failed: %s\n", sc->pg_client->get_error_string(ret));
        }
        error_occured = true;
    } else {
        std::string tmp_hash_key;
        std::string sort_key;
        std::string value;
        pegasus::pegasus_client::internal_info info;
        std::set<std::string> sort_keys;
        while (true) {
            int scan_ret = scanner->next(tmp_hash_key, sort_key, value, &info);
            if (scan_ret != pegasus::PERR_SCAN_COMPLETE && scan_ret != pegasus::PERR_OK) {
                fprintf(file,
                        "ERROR: scan data failed: %s {app_id=%d, partition_index=%d, server=%s}\n",
                        sc->pg_client->get_error_string(scan_ret),
                        info.app_id,
                        info.partition_index,
                        info.server.c_str());
                if (file != stderr) {
                    fprintf(
                        stderr,
                        "ERROR: scan data failed: %s {app_id=%d, partition_index=%d, server=%s}\n",
                        sc->pg_client->get_error_string(scan_ret),
                        info.app_id,
                        info.partition_index,
                        info.server.c_str());
                }
                error_occured = true;
                break;
            }

            if (scan_ret == pegasus::PERR_OK) {
                sort_keys.emplace(std::move(sort_key));
            }

            if (sort_keys.size() > 0 &&
                (sort_keys.size() >= batch_del_count || scan_ret == pegasus::PERR_SCAN_COMPLETE)) {
                int64_t del_count;
                pegasus::pegasus_client::internal_info del_info;
                int del_ret = sc->pg_client->multi_del(
                    hash_key, sort_keys, del_count, sc->timeout_ms, &del_info);
                if (del_ret != pegasus::PERR_OK) {
                    fprintf(file,
                            "ERROR: delete data failed: %s {app_id=%d, partition_index=%d, "
                            "server=%s}\n",
                            sc->pg_client->get_error_string(del_ret),
                            del_info.app_id,
                            del_info.partition_index,
                            del_info.server.c_str());
                    if (file != stderr) {
                        fprintf(stderr,
                                "ERROR: delete data failed: %s {app_id=%d, partition_index=%d, "
                                "server=%s}\n",
                                sc->pg_client->get_error_string(del_ret),
                                del_info.app_id,
                                del_info.partition_index,
                                del_info.server.c_str());
                    }
                    error_occured = true;
                    break;
                } else {
                    count += del_count;
                    if (!silent) {
                        for (auto &k : sort_keys) {
                            fprintf(file,
                                    "Deleted: \"%s\"\n",
                                    pegasus::utils::c_escape_string(k, sc->escape_all).c_str());
                        }
                    }
                    sort_keys.clear();
                }
            }

            if (scan_ret == pegasus::PERR_SCAN_COMPLETE) {
                break;
            }
        }
    }

    if (scanner) {
        delete scanner;
    }

    if (file != stderr) {
        fclose(file);
    }

    if (error_occured) {
        fprintf(stderr, "\nTerminated for error, %d sort keys deleted.\n", count);
    } else {
        if (file == stderr && !silent && count > 0) {
            fprintf(stderr, "\n");
        }
        fprintf(stderr, "OK, %d sort keys deleted.\n", count);
    }
    return true;
}