inline void scan_data_next()

in src/shell/command_helper.h [442:649]


// Drives the scan of one split: issues `async_next` requests on `context->scanner` and, for
// every row handed to the callback, performs the operation selected by `context->op`:
//   - SCAN_COPY:    writes the row through `context->client` (`async_check_and_set` when
//                   `no_overwrite` is set, `async_set` otherwise), keeping the remaining TTL.
//   - SCAN_CLEAR:   deletes the row through `context->client`.
//   - SCAN_COUNT:   counts rows and optionally records size statistics / distinct hash keys.
//   - SCAN_GEN_GEO: writes the row through `context->geoclient`, keeping the remaining TTL.
//
// `context->split_request_count` counts the asynchronous requests (scan or write) issued here
// whose callbacks have not finished yet. New scan requests are only issued while the split is
// not completed, no error has occurred and that counter is below `context->max_batch_count`.
// Every callback re-enters this function to keep the pipeline going and decrements the counter
// as its very last statement, so the counter does not drop to 0 while follow-up work is still
// being scheduled (presumably observers treat 0 as "nothing in flight" - confirm at the
// call site).
//
// On the first failure of any request the split is marked completed, the error is printed to
// stderr and `*context->error_occurred` is set; PERR_SCAN_COMPLETE marks the split completed.
inline void scan_data_next(scan_data_context *context)
{
    while (!context->split_completed.load() && !context->error_occurred->load() &&
           context->split_request_count.load() < context->max_batch_count) {
        // Account for the scan request issued below; its callback does the matching decrement.
        context->split_request_count++;
        context->scanner->async_next([context](int ret,
                                               std::string &&hash_key,
                                               std::string &&sort_key,
                                               std::string &&value,
                                               pegasus::pegasus_client::internal_info &&info,
                                               uint32_t expire_ts_seconds,
                                               int32_t kv_count) {
            if (ret == pegasus::PERR_OK) {
                // The filter is only evaluated when kv_count == -1; any other kv_count is
                // passed straight to the operation (SCAN_COUNT adds it to the row count).
                if (kv_count != -1 || validate_filter(context, sort_key, value)) {
                    bool ts_expired = false;
                    int ttl_seconds = 0;
                    switch (context->op) {
                    case SCAN_COPY:
                        ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
                        if (ts_expired) {
                            // No write is issued for an expired row, so there is no request
                            // to account for; just keep scanning.
                            scan_data_next(context);
                            break;
                        }
                        // Account for the write issued below only once we know it will be
                        // issued; incrementing before the expiry check would leave the counter
                        // permanently inflated, because no callback would ever decrement it.
                        context->split_request_count++;
                        if (context->no_overwrite) {
                            auto callback =
                                [context](int err,
                                          pegasus::pegasus_client::check_and_set_results &&results,
                                          pegasus::pegasus_client::internal_info &&info) {
                                    if (err != pegasus::PERR_OK) {
                                        // Only the first failure of the split is reported.
                                        if (!context->split_completed.exchange(true)) {
                                            fprintf(
                                                stderr,
                                                "ERROR: split[%d] async check and set failed: %s\n",
                                                context->split_id,
                                                context->client->get_error_string(err));
                                            context->error_occurred->store(true);
                                        }
                                    } else {
                                        // Count the row only if the value was actually set.
                                        if (results.set_succeed) {
                                            context->split_rows++;
                                        }
                                        scan_data_next(context);
                                    }
                                    // should put "split_request_count--" at end of the scope,
                                    // to prevent that split_request_count becomes 0 in the middle.
                                    context->split_request_count--;
                                };
                            pegasus::pegasus_client::check_and_set_options options;
                            options.set_value_ttl_seconds = ttl_seconds;
                            // Set the value only when sort_key does not exist yet.
                            context->client->async_check_and_set(
                                hash_key,
                                sort_key,
                                pegasus::pegasus_client::cas_check_type::CT_VALUE_NOT_EXIST,
                                "",
                                sort_key,
                                value,
                                options,
                                std::move(callback),
                                context->timeout_ms);
                        } else {
                            auto callback =
                                [context](int err, pegasus::pegasus_client::internal_info &&info) {
                                    if (err != pegasus::PERR_OK) {
                                        // Only the first failure of the split is reported.
                                        if (!context->split_completed.exchange(true)) {
                                            fprintf(stderr,
                                                    "ERROR: split[%d] async set failed: %s\n",
                                                    context->split_id,
                                                    context->client->get_error_string(err));
                                            context->error_occurred->store(true);
                                        }
                                    } else {
                                        context->split_rows++;
                                        scan_data_next(context);
                                    }
                                    // should put "split_request_count--" at end of the scope,
                                    // to prevent that split_request_count becomes 0 in the middle.
                                    context->split_request_count--;
                                };
                            context->client->async_set(hash_key,
                                                       sort_key,
                                                       value,
                                                       std::move(callback),
                                                       context->timeout_ms,
                                                       ttl_seconds);
                        }
                        break;
                    case SCAN_CLEAR:
                        // Account for the delete issued below; its callback decrements.
                        context->split_request_count++;
                        context->client->async_del(
                            hash_key,
                            sort_key,
                            [context](int err, pegasus::pegasus_client::internal_info &&info) {
                                if (err != pegasus::PERR_OK) {
                                    // Only the first failure of the split is reported.
                                    if (!context->split_completed.exchange(true)) {
                                        fprintf(stderr,
                                                "ERROR: split[%d] async del failed: %s\n",
                                                context->split_id,
                                                context->client->get_error_string(err));
                                        context->error_occurred->store(true);
                                    }
                                } else {
                                    context->split_rows++;
                                    scan_data_next(context);
                                }
                                // should put "split_request_count--" at end of the scope,
                                // to prevent that split_request_count becomes 0 in the middle.
                                context->split_request_count--;
                            },
                            context->timeout_ms);
                        break;
                    case SCAN_COUNT:
                        if (kv_count != -1) {
                            // The callback carries a row count instead of a single row.
                            context->split_rows += kv_count;
                            scan_data_next(context);
                            break;
                        }
                        context->split_rows++;
                        // Track distinct hash keys BEFORE hash_key may be moved into top_rows
                        // below; comparing a moved-from string would corrupt the count. The
                        // key is copied (not moved) so it stays intact for the statistics.
                        if (context->count_hash_key) {
                            if (hash_key != context->last_hash_key) {
                                context->split_hash_key_count++;
                                context->last_hash_key = hash_key;
                            }
                        }
                        if (context->stat_size && context->statistics) {
                            long hash_key_size = hash_key.size();
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::HASH_KEY_SIZE),
                                hash_key_size);

                            long sort_key_size = sort_key.size();
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::SORT_KEY_SIZE),
                                sort_key_size);

                            long value_size = value.size();
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::VALUE_SIZE), value_size);

                            long row_size = hash_key_size + sort_key_size + value_size;
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::ROW_SIZE), row_size);

                            if (context->top_count > 0) {
                                // Last use of hash_key / sort_key in this callback, so they
                                // can be given away.
                                context->top_rows.push(
                                    std::move(hash_key), std::move(sort_key), row_size);
                            }
                        }
                        scan_data_next(context);
                        break;
                    case SCAN_GEN_GEO:
                        ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
                        if (ts_expired) {
                            // No write is issued for an expired row, so there is no request
                            // to account for; just keep scanning.
                            scan_data_next(context);
                            break;
                        }
                        // Account for the write issued below; its callback decrements.
                        context->split_request_count++;
                        context->geoclient->async_set(
                            hash_key,
                            sort_key,
                            value,
                            [context](int err, pegasus::pegasus_client::internal_info &&info) {
                                if (err != pegasus::PERR_OK) {
                                    // Only the first failure of the split is reported.
                                    if (!context->split_completed.exchange(true)) {
                                        fprintf(stderr,
                                                "ERROR: split[%d] async set failed: %s\n",
                                                context->split_id,
                                                context->client->get_error_string(err));
                                        context->error_occurred->store(true);
                                    }
                                } else {
                                    context->split_rows++;
                                    scan_data_next(context);
                                }
                                // should put "split_request_count--" at end of the scope,
                                // to prevent that split_request_count becomes 0 in the middle.
                                context->split_request_count--;
                            },
                            context->timeout_ms,
                            ttl_seconds);
                        break;
                    default:
                        LOG_FATAL("op = {}", context->op);
                        break;
                    }
                } else {
                    // Row rejected by the filter: skip it and keep scanning.
                    scan_data_next(context);
                }
            } else if (ret == pegasus::PERR_SCAN_COMPLETE) {
                context->split_completed.store(true);
            } else {
                // Only the first failure of the split is reported.
                if (!context->split_completed.exchange(true)) {
                    fprintf(stderr,
                            "ERROR: split[%d] scan next failed: %s\n",
                            context->split_id,
                            context->client->get_error_string(ret));
                    context->error_occurred->store(true);
                }
            }
            // should put "split_request_count--" at end of the scope,
            // to prevent that split_request_count becomes 0 in the middle.
            context->split_request_count--;
        });

        if (context->count_hash_key) {
            // disable parallel scan if count_hash_key == true
            break;
        }
    }
}