in src/shell/command_helper.h [442:649]
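// scan_data_next() drives one scan split: it keeps up to max_batch_count
// async_next() requests in flight, and each completion callback re-enters the
// function to refill the pipeline. Depending on context->op, each record is
// copied (SCAN_COPY), deleted (SCAN_CLEAR), counted (SCAN_COUNT), or re-written
// through the geo client (SCAN_GEN_GEO). split_request_count tracks in-flight
// requests; split_completed and error_occurred stop the loop.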
inline void scan_data_next(scan_data_context *context)
{
while (!context->split_completed.load() && !context->error_occurred->load() &&
context->split_request_count.load() < context->max_batch_count) {
context->split_request_count++;
context->scanner->async_next([context](int ret,
std::string &&hash_key,
std::string &&sort_key,
std::string &&value,
pegasus::pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds,
int32_t kv_count) {
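            // kv_count is -1 for a normal per-record callback; a non-negative
            // kv_count means async_next delivered an aggregated count instead
            // of a single record (consumed by SCAN_COUNT below), so the
            // per-record filter is skipped.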
if (ret == pegasus::PERR_OK) {
if (kv_count != -1 || validate_filter(context, sort_key, value)) {
bool ts_expired = false;
int ttl_seconds = 0;
switch (context->op) {
case SCAN_COPY:
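                        // Reserve a slot for the nested write request; the
                        // matching decrement runs at the end of its callback.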
context->split_request_count++;
ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
                        if (ts_expired) {
                            // The record has already expired: nothing to copy, so
                            // release the slot reserved above and keep scanning.
                            context->split_request_count--;
                            scan_data_next(context);
} else if (context->no_overwrite) {
auto callback =
[context](int err,
pegasus::pegasus_client::check_and_set_results &&results,
pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(
stderr,
"ERROR: split[%d] async check and set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
if (results.set_succeed) {
context->split_rows++;
}
scan_data_next(context);
}
                                // Keep "split_request_count--" as the last statement
                                // in this scope, so the counter never transiently
                                // drops to 0 while this request is still in flight.
context->split_request_count--;
};
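                            // no_overwrite: only write if the sort key does not
                            // already exist, checked and set atomically via
                            // CT_VALUE_NOT_EXIST on the same sort key.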
pegasus::pegasus_client::check_and_set_options options;
options.set_value_ttl_seconds = ttl_seconds;
context->client->async_check_and_set(
hash_key,
sort_key,
pegasus::pegasus_client::cas_check_type::CT_VALUE_NOT_EXIST,
"",
sort_key,
value,
options,
std::move(callback),
context->timeout_ms);
} else {
auto callback =
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
                                // Keep "split_request_count--" as the last statement
                                // in this scope, so the counter never transiently
                                // drops to 0 while this request is still in flight.
context->split_request_count--;
};
context->client->async_set(hash_key,
sort_key,
value,
std::move(callback),
context->timeout_ms,
ttl_seconds);
}
break;
case SCAN_CLEAR:
context->split_request_count++;
context->client->async_del(
hash_key,
sort_key,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async del failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
                    // Keep "split_request_count--" as the last statement in this
                    // scope, so the counter never transiently drops to 0 while
                    // this request is still in flight.
context->split_request_count--;
},
context->timeout_ms);
break;
case SCAN_COUNT:
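                        // Two callback shapes arrive here: an aggregated count
                        // from a count-only scan (kv_count != -1), or a single
                        // record to be counted and optionally measured.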
if (kv_count != -1) {
context->split_rows += kv_count;
scan_data_next(context);
break;
}
context->split_rows++;
if (context->stat_size && context->statistics) {
long hash_key_size = hash_key.size();
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::HASH_KEY_SIZE),
hash_key_size);
long sort_key_size = sort_key.size();
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::SORT_KEY_SIZE),
sort_key_size);
long value_size = value.size();
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::VALUE_SIZE), value_size);
long row_size = hash_key_size + sort_key_size + value_size;
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::ROW_SIZE), row_size);
                            if (context->top_count > 0) {
                                // Copy hash_key rather than moving it: it may still
                                // be read below by the count_hash_key bookkeeping.
                                context->top_rows.push(
                                    std::string(hash_key), std::move(sort_key), row_size);
                            }
}
if (context->count_hash_key) {
if (hash_key != context->last_hash_key) {
context->split_hash_key_count++;
context->last_hash_key = std::move(hash_key);
}
}
scan_data_next(context);
break;
case SCAN_GEN_GEO:
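                        // Regenerate geo data: re-write the record through the
                        // geo client, preserving its remaining TTL.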
context->split_request_count++;
ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
                        if (ts_expired) {
                            // Expired record: skip it, release the reserved slot,
                            // and keep scanning.
                            context->split_request_count--;
                            scan_data_next(context);
} else {
context->geoclient->async_set(
hash_key,
sort_key,
value,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
                            // Keep "split_request_count--" as the last statement
                            // in this scope, so the counter never transiently
                            // drops to 0 while this request is still in flight.
context->split_request_count--;
},
context->timeout_ms,
ttl_seconds);
}
break;
default:
LOG_FATAL("op = {}", context->op);
break;
}
} else {
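                    // Record rejected by the sort-key/value filter: skip it and
                    // keep scanning.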
scan_data_next(context);
}
} else if (ret == pegasus::PERR_SCAN_COMPLETE) {
context->split_completed.store(true);
} else {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] scan next failed: %s\n",
context->split_id,
context->client->get_error_string(ret));
context->error_occurred->store(true);
}
}
            // Keep "split_request_count--" as the last statement in this scope,
            // so the counter never transiently drops to 0 while this request is
            // still in flight.
context->split_request_count--;
});
if (context->count_hash_key) {
            // Counting distinct hash keys compares each hash key against
            // last_hash_key, which is only correct when records arrive in
            // order with a single request in flight, so parallel scan is
            // disabled here.
break;
}
}
}
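
// A minimal usage sketch (illustrative, NOT part of command_helper.h): one way
// a caller might pump a single split to completion, assuming a fully
// initialized scan_data_context. It only touches fields visible above
// (split_completed, error_occurred, split_request_count); the helper name and
// the polling loop are assumptions, not the shell's actual driver.
#include <chrono>
#include <thread>

inline bool drain_split(scan_data_context *context) // hypothetical helper
{
    scan_data_next(context); // issue the first batch of async_next() calls
    // The split is done only when it is marked complete AND every in-flight
    // request has executed its trailing "split_request_count--".
    while (!context->split_completed.load() ||
           context->split_request_count.load() > 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return !context->error_occurred->load();
}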