bool count_data()

in src/shell/commands/data_operations.cpp [2305:2704]


/// Shell command `count_data`: counts the records of the table currently opened in `sc`.
///
/// Two modes are supported:
///  - Estimate mode (no option given): prints the per-partition `rdb_estimate_num_keys`
///    obtained through get_rdb_estimated_keys_stats() plus a summary row, and returns
///    immediately without scanning.
///  - Precise mode (`-c|--precise`): opens unordered scanners over the table (optionally a
///    single partition), schedules one scan task per scanner and reports progress once per
///    second until every split has finished, an error occurred, or `run_seconds` elapsed.
///
/// Passing any option other than `-c` without `-c` is rejected, because those options are
/// only meaningful when scanning.
///
/// @param e    command executor; not used by this function.
/// @param sc   shell context providing the pegasus client, the default timeout and the
///             escaping preference used when printing keys.
/// @param args command line of the shell command, parsed with getopt_long().
/// @return false when the command line is invalid (option parsing or validation failed);
///         true otherwise, including when a runtime failure was reported on stderr.
///         NOTE(review): presumably `false` makes the caller print the command usage --
///         confirm against the command executor.
bool count_data(command_executor *e, shell_context *sc, arguments args)
{
    static struct option long_options[] = {{"precise", no_argument, nullptr, 'c'},
                                           {"partition", required_argument, nullptr, 'p'},
                                           {"max_batch_count", required_argument, nullptr, 'b'},
                                           {"timeout_ms", required_argument, nullptr, 't'},
                                           {"hash_key_filter_type", required_argument, nullptr, 'h'},
                                           {"hash_key_filter_pattern", required_argument, nullptr, 'x'},
                                           {"sort_key_filter_type", required_argument, nullptr, 's'},
                                           {"sort_key_filter_pattern", required_argument, nullptr, 'y'},
                                           {"value_filter_type", required_argument, nullptr, 'v'},
                                           {"value_filter_pattern", required_argument, nullptr, 'z'},
                                           {"diff_hash_key", no_argument, nullptr, 'd'},
                                           {"stat_size", no_argument, nullptr, 'a'},
                                           {"top_count", required_argument, nullptr, 'n'},
                                           {"run_seconds", required_argument, nullptr, 'r'},
                                           {nullptr, 0, nullptr, 0}};

    // A precise count usually needs to scan all online records, which may affect cluster
    // availability. Hence `precise` is false by default and an estimated count is returned
    // immediately unless the user explicitly asks for a scan.
    bool precise = false;
    bool need_scan = false;
    int32_t partition = -1;
    int max_batch_count = 500;
    int timeout_ms = sc->timeout_ms;
    std::string hash_key_filter_type_name("no_filter");
    std::string sort_key_filter_type_name("no_filter");
    pegasus::pegasus_client::filter_type sort_key_filter_type =
        pegasus::pegasus_client::FT_NO_FILTER;
    std::string sort_key_filter_pattern;
    std::string value_filter_type_name("no_filter");
    pegasus::pegasus_client::filter_type value_filter_type = pegasus::pegasus_client::FT_NO_FILTER;
    std::string value_filter_pattern;
    bool diff_hash_key = false;
    bool stat_size = false;
    int top_count = 0;
    int run_seconds = 0;
    pegasus::pegasus_client::scan_options options;

    // Reset getopt state, since the shell parses many commands within one process.
    optind = 0;
    while (true) {
        int option_index = 0;
        const int c = getopt_long(
            args.argc, args.argv, "cp:b:t:h:x:s:y:v:z:dan:r:", long_options, &option_index);
        if (c == -1) {
            break;
        }
        // Any recognized option means the user wants a precise count obtained by scanning.
        need_scan = true;
        switch (c) {
        case 'c':
            precise = true;
            break;
        case 'p':
            if (!dsn::buf2int32(optarg, partition)) {
                fprintf(stderr, "ERROR: parse %s as partition failed\n", optarg);
                return false;
            }
            // Partition index 0 is valid, only negative values are rejected.
            if (partition < 0) {
                fprintf(stderr, "ERROR: partition should be no less than 0\n");
                return false;
            }
            break;
        case 'b':
            if (!dsn::buf2int32(optarg, max_batch_count)) {
                fprintf(stderr, "ERROR: parse %s as max_batch_count failed\n", optarg);
                return false;
            }
            break;
        case 't':
            if (!dsn::buf2int32(optarg, timeout_ms)) {
                fprintf(stderr, "ERROR: parse %s as timeout_ms failed\n", optarg);
                return false;
            }
            break;
        case 'h':
            options.hash_key_filter_type = parse_filter_type(optarg, false);
            if (options.hash_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid hash_key_filter_type param\n");
                return false;
            }
            hash_key_filter_type_name = optarg;
            break;
        case 'x':
            options.hash_key_filter_pattern = unescape_str(optarg);
            break;
        case 's':
            sort_key_filter_type = parse_filter_type(optarg, true);
            if (sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid sort_key_filter_type param\n");
                return false;
            }
            sort_key_filter_type_name = optarg;
            break;
        case 'y':
            sort_key_filter_pattern = unescape_str(optarg);
            break;
        case 'v':
            value_filter_type = parse_filter_type(optarg, true);
            if (value_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
                fprintf(stderr, "ERROR: invalid value_filter_type param\n");
                return false;
            }
            value_filter_type_name = optarg;
            break;
        case 'z':
            value_filter_pattern = unescape_str(optarg);
            break;
        case 'd':
            diff_hash_key = true;
            break;
        case 'a':
            stat_size = true;
            break;
        case 'n':
            if (!dsn::buf2int32(optarg, top_count)) {
                fprintf(stderr, "ERROR: parse %s as top_count failed\n", optarg);
                return false;
            }
            break;
        case 'r':
            if (!dsn::buf2int32(optarg, run_seconds)) {
                fprintf(stderr, "ERROR: parse %s as run_seconds failed\n", optarg);
                return false;
            }
            break;
        default:
            return false;
        }
    }

    // ---- Estimate mode: no scan, just print the estimated key count of each partition. ----
    if (!precise) {
        if (need_scan) {
            fprintf(stderr,
                    "ERROR: you must input [-c|--precise] flag when you expect to get precise "
                    "result by scaning all record online\n");
            return false;
        }

        std::vector<row_data> rows;
        const std::string table_name(sc->pg_client->get_app_name());
        CHECK(!table_name.empty(), "table_name must be non-empty, see data_operations()");

        if (!get_rdb_estimated_keys_stats(sc, table_name, rows)) {
            fprintf(stderr, "ERROR: get rdb_estimated_keys stats failed\n");
            return true;
        }

        // Capture the number of data rows BEFORE appending the summary row: the label must
        // show how many rows were summed up, and computing `size() - 1` ahead of the append
        // would be off by one (and would underflow for an empty result).
        const size_t data_row_count = rows.size();
        rows.emplace_back(fmt::format("(total:{})", data_row_count));
        auto &sum = rows.back();
        for (size_t i = 0; i < data_row_count; ++i) {
            sum.rdb_estimate_num_keys += rows[i].rdb_estimate_num_keys;
        }

        ::dsn::utils::table_printer tp("count_data");
        tp.add_title("pidx");
        tp.add_column("estimate_count");
        for (const row_data &row : rows) {
            tp.add_row(row.row_name);
            tp.append_data(row.rdb_estimate_num_keys);
        }

        tp.output(std::cout, tp_output_format::kTabular);
        return true;
    }

    // ---- Precise mode: validate the scan parameters. ----
    if (max_batch_count <= 1) {
        fprintf(stderr, "ERROR: max_batch_count should be greater than 1\n");
        return false;
    }

    if (timeout_ms <= 0) {
        fprintf(stderr, "ERROR: timeout_ms should be greater than 0\n");
        return false;
    }

    if (top_count < 0) {
        fprintf(stderr, "ERROR: top_count should be no less than 0\n");
        return false;
    }

    if (run_seconds < 0) {
        fprintf(stderr, "ERROR: run_seconds should be no less than 0\n");
        return false;
    }

    // Echo the effective parameters so that the user can double check what is being run.
    fprintf(stderr, "INFO: cluster_name = %s\n", sc->pg_client->get_cluster_name());
    fprintf(stderr, "INFO: app_name = %s\n", sc->pg_client->get_app_name());
    fprintf(stderr,
            "INFO: partition = %s\n",
            partition >= 0 ? boost::lexical_cast<std::string>(partition).c_str() : "all");
    fprintf(stderr, "INFO: max_batch_count = %d\n", max_batch_count);
    fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms);
    fprintf(stderr, "INFO: hash_key_filter_type = %s\n", hash_key_filter_type_name.c_str());
    if (options.hash_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: hash_key_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(options.hash_key_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: sort_key_filter_type = %s\n", sort_key_filter_type_name.c_str());
    if (sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: sort_key_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(sort_key_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: value_filter_type = %s\n", value_filter_type_name.c_str());
    if (value_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        fprintf(stderr,
                "INFO: value_filter_pattern = \"%s\"\n",
                pegasus::utils::c_escape_string(value_filter_pattern).c_str());
    }
    fprintf(stderr, "INFO: diff_hash_key = %s\n", diff_hash_key ? "true" : "false");
    fprintf(stderr, "INFO: stat_size = %s\n", stat_size ? "true" : "false");
    fprintf(stderr, "INFO: top_count = %d\n", top_count);
    fprintf(stderr, "INFO: run_seconds = %d\n", run_seconds);

    // ---- Build the scan options sent along with the scanners. ----
    options.timeout_ms = timeout_ms;
    if (sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
        // An exact-match filter is sent as a prefix filter; the original filter type is
        // still handed to each scan context below (set_sort_key_filter).
        // NOTE(review): presumably the exact comparison is then done on the client side --
        // confirm in scan_data_context.
        options.sort_key_filter_type =
            sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT
                ? pegasus::pegasus_client::FT_MATCH_PREFIX
                : sort_key_filter_type;
        options.sort_key_filter_pattern = sort_key_filter_pattern;
    }

    // Values are only needed for size statistics or for value filtering.
    const bool has_value_filter = value_filter_type != pegasus::pegasus_client::FT_NO_FILTER;
    options.no_value = !(stat_size || has_value_filter);

    // Decide whether real data should be returned to the client. Once it is decided that
    // real data is not needed on the client side, option `only_return_count` is used.
    const bool need_real_data = diff_hash_key || stat_size || has_value_filter ||
                                sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT;
    options.only_return_count = !need_real_data;
    if (options.only_return_count) {
        fprintf(stderr, "INFO: scanner only return kv count, not return value\n");
    }

    // ---- Open the scanners. ----
    std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
    const int ret = sc->pg_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
    if (ret != pegasus::PERR_OK) {
        fprintf(
            stderr, "ERROR: open app scanner failed: %s\n", sc->pg_client->get_error_string(ret));
        return true;
    }
    fprintf(stderr,
            "INFO: open app scanner succeed, partition_count = %d\n",
            static_cast<int>(raw_scanners.size()));

    // Wrap the raw scanners right away, so that they are managed by the wrappers on every
    // following exit path.
    std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
    scanners.reserve(raw_scanners.size());
    for (auto *raw_scanner : raw_scanners) {
        scanners.push_back(raw_scanner->get_smart_wrapper());
    }
    raw_scanners.clear();

    if (partition != -1) {
        // `partition` is known to be non-negative here (validated while parsing), thus the
        // conversion to an unsigned type is safe.
        if (static_cast<size_t>(partition) >= scanners.size()) {
            fprintf(stderr, "ERROR: invalid partition param: %d\n", partition);
            return true;
        }
        // Keep only the scanner of the requested partition.
        pegasus::pegasus_client::pegasus_scanner_wrapper s = std::move(scanners[partition]);
        scanners.clear();
        scanners.push_back(std::move(s));
    }
    const int split_count = static_cast<int>(scanners.size());
    fprintf(stderr, "INFO: prepare scanners succeed, split_count = %d\n", split_count);

    // ---- Start one scan task per split. ----
    std::atomic_bool error_occurred(false);
    std::vector<std::unique_ptr<scan_data_context>> contexts;
    contexts.reserve(scanners.size());
    std::shared_ptr<rocksdb::Statistics> statistics = rocksdb::CreateDBStatistics();
    for (int i = 0; i < split_count; i++) {
        // Own the context from the moment it is created, so that it cannot leak.
        std::unique_ptr<scan_data_context> context(new scan_data_context(SCAN_COUNT,
                                                                         i,
                                                                         max_batch_count,
                                                                         timeout_ms,
                                                                         scanners[i],
                                                                         sc->pg_client,
                                                                         nullptr,
                                                                         &error_occurred,
                                                                         0,
                                                                         stat_size,
                                                                         statistics,
                                                                         top_count,
                                                                         diff_hash_key));
        context->set_sort_key_filter(sort_key_filter_type, sort_key_filter_pattern);
        context->set_value_filter(value_filter_type, value_filter_pattern);
        // The task only borrows the context; `contexts` keeps it alive until this function
        // returns, which happens only after every split reported completion.
        scan_data_context *borrowed_context = context.get();
        contexts.push_back(std::move(context));
        dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, borrowed_context));
    }

    // ---- Report the progress once per second until all splits are completed. ----
    int sleep_seconds = 0;
    long last_total_rows = 0;
    bool stopped_by_wait_seconds = false;
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        sleep_seconds++;
        if (run_seconds > 0 && !stopped_by_wait_seconds && sleep_seconds >= run_seconds) {
            // The time budget is used up. `error_occurred` doubles as the stop signal, so
            // use compare-and-swap to tell both reasons apart:
            // - if error_occurred was already set to true by a scanner because of a real
            //   error, stopped_by_wait_seconds stays false;
            // - otherwise error_occurred is set to true here and stopped_by_wait_seconds
            //   becomes true.
            bool expected = false;
            stopped_by_wait_seconds = error_occurred.compare_exchange_strong(expected, true);
        }

        int completed_split_count = 0;
        long cur_total_rows = 0;
        long cur_total_hash_key_count = 0;
        for (const auto &context : contexts) {
            cur_total_rows += context->split_rows.load();
            if (diff_hash_key) {
                cur_total_hash_key_count += context->split_hash_key_count.load();
            }
            // A split without any outstanding request is counted as completed.
            if (context->split_request_count.load() == 0) {
                completed_split_count++;
            }
        }

        char hash_key_count_str[100];
        hash_key_count_str[0] = '\0';
        if (diff_hash_key) {
            snprintf(hash_key_count_str,
                     sizeof(hash_key_count_str),
                     " (%ld hash keys)",
                     cur_total_hash_key_count);
        }

        // Only a real error (not the expiration of run_seconds) is reported as an error.
        const char *error_suffix = (!stopped_by_wait_seconds && error_occurred.load())
                                       ? ", error occurred, terminating..."
                                       : "";
        fprintf(stderr,
                "INFO: processed for %d seconds, (%d/%d) splits, total %ld rows%s, last second "
                "%ld rows%s\n",
                sleep_seconds,
                completed_split_count,
                split_count,
                cur_total_rows,
                hash_key_count_str,
                cur_total_rows - last_total_rows,
                error_suffix);

        if (completed_split_count == split_count) {
            break;
        }
        last_total_rows = cur_total_rows;
        // Print the intermediate size statistics every 10 seconds.
        if (stat_size && sleep_seconds % 10 == 0) {
            print_current_scan_state(contexts, "partially", stat_size, statistics, diff_hash_key);
        }
    }

    // ---- Summarize why the processing stopped and print the final state. ----
    std::string stop_desc("done");
    if (error_occurred.load()) {
        if (stopped_by_wait_seconds) {
            fprintf(stderr, "INFO: reached run seconds, terminate processing\n");
            stop_desc = "terminated as run time used out";
        } else {
            fprintf(stderr, "ERROR: error occurred, terminate processing\n");
            stop_desc = "terminated as error occurred";
        }
    }

    print_current_scan_state(contexts, stop_desc, stat_size, statistics, diff_hash_key);

    if (stat_size && top_count > 0) {
        // Merge the per-split heaps into a single one, then print at most `top_count` items
        // in the order in which the merged heap yields them.
        top_container::top_heap heap;
        for (const auto &context : contexts) {
            top_container::top_heap &h = context->top_rows.all();
            while (!h.empty()) {
                heap.push(h.top());
                h.pop();
            }
        }
        for (int i = 1; i <= top_count && !heap.empty(); i++) {
            const top_container::top_heap_item &item = heap.top();
            fprintf(stderr,
                    "[top][%d].hash_key = \"%s\"\n",
                    i,
                    pegasus::utils::c_escape_string(item.hash_key, sc->escape_all).c_str());
            fprintf(stderr,
                    "[top][%d].sort_key = \"%s\"\n",
                    i,
                    pegasus::utils::c_escape_string(item.sort_key, sc->escape_all).c_str());
            fprintf(stderr, "[top][%d].row_size = %ld\n", i, item.row_size);
            heap.pop();
        }
    }

    return true;
}