in src/server/pegasus_server_impl.cpp [1353:1505]
// Handles a scan RPC that continues iterating with a scan context previously
// stored in `_context_cache` under `request.context_id`.
//
// Outcome written into `rpc.response()`:
//   - `error`: the iterator's status code; overridden with kIncomplete when the
//     read limiter reports that its limit was exceeded; kNotFound when no
//     context is cached for the requested id.
//   - `kvs`: the records accepted by validate_key_value_for_scan() (left empty
//     when the context asks for a count only, and cleared on iterator error).
//   - `kv_count`: set only when the context asks for a count only.
//   - `context_id`: a fresh cache handle when the scan has not reached the stop
//     key yet, or SCAN_CONTEXT_ID_COMPLETED when it has finished.
//
// If the read-size throttling controller is not available, `rpc.error()` is set
// to ERR_BUSY and the function returns without touching the scan context.
void pegasus_server_impl::on_scan(scan_rpc rpc)
{
    CHECK(_is_open, "");
    _pfc_scan_qps->increment();
    uint64_t start_time = dsn_now_ns();
    const auto &request = rpc.request();
    dsn::message_ex *req = rpc.dsn_request();
    auto &resp = rpc.response();
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    if (!_read_size_throttling_controller->available()) {
        rpc.error() = dsn::ERR_BUSY;
        _counter_recent_read_throttling_reject_count->increment();
        return;
    }

    // fetch() hands the context back as a unique_ptr, so from here on this
    // function owns it until it is explicitly put back into the cache.
    std::unique_ptr<pegasus_scan_context> context = _context_cache.fetch(request.context_id);
    if (context) {
        rocksdb::Iterator *it = context->iterator.get();
        const rocksdb::Slice &stop = context->stop;
        bool stop_inclusive = context->stop_inclusive;
        ::dsn::apps::filter_type::type hash_key_filter_type = context->hash_key_filter_type;
        const ::dsn::blob &hash_key_filter_pattern = context->hash_key_filter_pattern;
        ::dsn::apps::filter_type::type sort_key_filter_type = context->sort_key_filter_type;
        const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern;
        bool no_value = context->no_value;
        bool validate_hash = context->validate_partition_hash;
        bool return_expire_ts = context->return_expire_ts;

        bool complete = false;
        uint32_t epoch_now = ::pegasus::utils::epoch_now();
        uint64_t expire_count = 0;
        uint64_t filter_count = 0;
        // Unsigned so that it compares against `batch_count` without an implicit
        // signed/unsigned conversion.
        uint32_t count = 0;

        // The per-request batch size may only lower the configured maximum.
        uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count;
        if (context->batch_size > 0 && context->batch_size < batch_count) {
            batch_count = context->batch_size;
        }

        // Purely local helper: no need for a heap allocation.
        range_read_limiter limiter(
            batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);

        while (count < batch_count && limiter.valid() && it->Valid()) {
            int c = it->key().compare(stop);
            if (c > 0 || (c == 0 && !stop_inclusive)) {
                // out of range
                complete = true;
                break;
            }

            // Every visited record is charged to the limiter, including the
            // ones that turn out to be expired or filtered.
            limiter.add_count();

            auto state = validate_key_value_for_scan(it->key(),
                                                     it->value(),
                                                     hash_key_filter_type,
                                                     hash_key_filter_pattern,
                                                     sort_key_filter_type,
                                                     sort_key_filter_pattern,
                                                     epoch_now,
                                                     validate_hash);
            switch (state) {
            case range_iteration_state::kNormal:
                count++;
                if (!context->only_return_count) {
                    append_key_value(resp.kvs, it->key(), it->value(), no_value, return_expire_ts);
                }
                break;
            case range_iteration_state::kExpired:
                expire_count++;
                break;
            case range_iteration_state::kFiltered:
                filter_count++;
                break;
            default:
                break;
            }

            if (c == 0) {
                // seek to the last position
                complete = true;
                break;
            }

            it->Next();
        }

        if (context->only_return_count) {
            resp.__set_kv_count(static_cast<int32_t>(count));
        }

        // check iteration time whether exceed limit
        if (!complete) {
            limiter.time_check_after_incomplete_scan();
        }

        // No iterator operation happens below, so the status is captured once
        // instead of being re-queried for every use.
        const rocksdb::Status status = it->status();
        resp.error = status.code();
        if (!status.ok()) {
            // error occur
            if (FLAGS_rocksdb_verbose_log) {
                LOG_ERROR_PREFIX("rocksdb scan failed for scan from {}: context_id= {}, stop_key = "
                                 "\"{}\" ({}), batch_size = {}, read_count = {}, error = {}",
                                 rpc.remote_address(),
                                 request.context_id,
                                 ::pegasus::utils::c_escape_string(stop),
                                 stop_inclusive ? "inclusive" : "exclusive",
                                 batch_count,
                                 count,
                                 status.ToString());
            } else {
                LOG_ERROR_PREFIX("rocksdb scan failed for scan from {}: error = {}",
                                 rpc.remote_address(),
                                 status.ToString());
            }
            // Do not hand back a partial batch together with an error.
            resp.kvs.clear();
        } else if (limiter.exceed_limit()) {
            // scan exceed limit time
            resp.error = rocksdb::Status::kIncomplete;
            LOG_WARNING_PREFIX("rocksdb abnormal scan from {}: batch_count={}, time_used({}ns) VS "
                               "time_threshold({}ns)",
                               rpc.remote_address(),
                               batch_count,
                               limiter.duration_time(),
                               limiter.max_duration_time());
        } else if (it->Valid() && !complete) {
            // scan not completed: park the context again and return its new handle.
            // NOTE: `stop` and the filter pattern references alias members of
            // `*context` and must not be used after this move.
            int64_t handle = _context_cache.put(std::move(context));
            resp.context_id = handle;
            // Delayed cleanup: after 5 minutes fetch the context by this handle
            // and discard the result, which destroys the context if it is still
            // cached under that handle.
            ::dsn::tasking::enqueue(LPC_PEGASUS_SERVER_DELAY,
                                    &_tracker,
                                    [this, handle]() { _context_cache.fetch(handle); },
                                    0,
                                    std::chrono::minutes(5));
        } else {
            // scan completed
            resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
        }

        if (expire_count > 0) {
            _pfc_recent_expire_count->add(expire_count);
        }
        if (filter_count > 0) {
            _pfc_recent_filter_count->add(filter_count);
        }
    } else {
        // No context is cached under the requested id.
        resp.error = rocksdb::Status::Code::kNotFound;
    }

    _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
    _pfc_scan_latency->set(dsn_now_ns() - start_time);
}