// Excerpt from src/server/pegasus_server_impl.cpp (original lines 1148-1394).
// Handles a get_scanner RPC: opens a RocksDB iterator over [start_key, stop_key],
// returns up to one batch of matching key-values in the response, and — when the
// scan is not finished — caches the iterator in a scan context so the client can
// continue with subsequent on_scan calls via the returned context_id.
//
// Every exit path sets resp.error and charges capacity units through
// _cu_calculator->add_scan_cu().
void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
{
    CHECK_TRUE(_is_open);
    METRIC_VAR_INCREMENT(scan_requests);
    auto &resp = rpc.response();
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_host_port;
    // May reject the request early under read throttling (macro can return).
    CHECK_READ_THROTTLING();
    // RAII latency measurement: records scan latency when this scope exits.
    METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
    const auto &request = rpc.request();
    dsn::message_ex *req = rpc.dsn_request();
    // Reject filter types this server build does not support; still charge CU.
    if (!is_filter_type_supported(request.hash_key_filter_type)) {
        LOG_ERROR_PREFIX(
            "invalid argument for get_scanner from {}: hash key filter type {} not supported",
            rpc.remote_address(),
            request.hash_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
        return;
    }
    if (!is_filter_type_supported(request.sort_key_filter_type)) {
        LOG_ERROR_PREFIX(
            "invalid argument for get_scanner from {}: sort key filter type {} not supported",
            rpc.remote_address(),
            request.sort_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
        return;
    }
    // Copy the column-family read options so we can toggle seek mode per request.
    rocksdb::ReadOptions rd_opts(_data_cf_rd_opts);
    if (_data_cf_opts.prefix_extractor) {
        ::dsn::blob start_hash_key, tmp;
        pegasus_restore_key(request.start_key, start_hash_key, tmp);
        if (start_hash_key.size() == 0 || request.full_scan) {
            // hash_key is not passed, only happened when do full scan (scanners got by
            // get_unordered_scanners) on a partition, we have to do total order seek on rocksDB.
            rd_opts.total_order_seek = true;
            rd_opts.prefix_same_as_start = false;
        }
    }
    bool start_inclusive = request.start_inclusive;
    bool stop_inclusive = request.stop_inclusive;
    rocksdb::Slice start(request.start_key.data(), request.start_key.length());
    rocksdb::Slice stop(request.stop_key.data(), request.stop_key.length());
    // limit key range by prefix filter
    // because data is not ordered by hash key (hash key "aa" is greater than "b"),
    // so we can only limit the start range by hash key filter.
    ::dsn::blob prefix_start_key;
    if (request.hash_key_filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX &&
        request.hash_key_filter_pattern.length() > 0) {
        // Tighten the scan's lower bound to the first key that could match the
        // hash-key prefix filter, skipping everything before it.
        pegasus_generate_key(prefix_start_key, request.hash_key_filter_pattern, ::dsn::blob());
        rocksdb::Slice prefix_start(prefix_start_key.data(), prefix_start_key.length());
        if (prefix_start.compare(start) > 0) {
            start = prefix_start;
            start_inclusive = true;
            // Now 'start' is generated by 'request.hash_key_filter_pattern', it may be not a real
            // hashkey, we should not seek this prefix by prefix bloom filter. However, it only
            // happen when do full scan (scanners got by get_unordered_scanners), in which case the
            // following flags has been updated.
            CHECK(!_data_cf_opts.prefix_extractor || rd_opts.total_order_seek, "Invalid option");
            CHECK(!_data_cf_opts.prefix_extractor || !rd_opts.prefix_same_as_start,
                  "Invalid option");
        }
    }
    // check if range is empty
    int c = start.compare(stop);
    if (c > 0 || (c == 0 && (!start_inclusive || !stop_inclusive))) {
        // empty key range
        if (FLAGS_rocksdb_verbose_log) {
            LOG_WARNING_PREFIX("empty key range for get_scanner from {}: start_key = \"{}\" ({}), "
                               "stop_key = \"{}\" ({})",
                               rpc.remote_address(),
                               ::pegasus::utils::c_escape_sensitive_string(request.start_key),
                               request.start_inclusive ? "inclusive" : "exclusive",
                               ::pegasus::utils::c_escape_sensitive_string(request.stop_key),
                               request.stop_inclusive ? "inclusive" : "exclusive");
        }
        // An empty range is not an error: return OK with no kvs.
        resp.error = rocksdb::Status::kOk;
        _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
        return;
    }
    std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(rd_opts, _data_cf));
    it->Seek(start);
    bool complete = false;               // true once the stop bound is reached
    bool first_exclusive = !start_inclusive;
    uint32_t epoch_now = ::pegasus::utils::epoch_now();
    uint64_t expire_count = 0;           // records skipped because of TTL expiry
    uint64_t filter_count = 0;           // records skipped by hash/sort key filters
    int32_t count = 0;                   // records accepted into this batch
    // Batch size is bounded by the server-wide max iteration count; the client
    // may request a smaller batch.
    uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count;
    if (request.batch_size > 0 && request.batch_size < batch_count) {
        batch_count = request.batch_size;
    }
    resp.kvs.reserve(batch_count);
    // Optional thrift fields: default to false when the client did not set them.
    bool return_expire_ts = request.__isset.return_expire_ts ? request.return_expire_ts : false;
    bool only_return_count = request.__isset.only_return_count ? request.only_return_count : false;
    // Limits both the number of iterated (not just returned) records and the
    // wall-clock time spent in this scan pass.
    std::unique_ptr<range_read_limiter> limiter =
        std::make_unique<range_read_limiter>(_rng_rd_opts.rocksdb_max_iteration_count,
                                             0,
                                             _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
    while (count < batch_count && limiter->valid() && it->Valid()) {
        // NOTE: this 'c' shadows the outer range-emptiness 'c' above; here it
        // compares the current key against the stop bound.
        int c = it->key().compare(stop);
        if (c > 0 || (c == 0 && !stop_inclusive)) {
            // out of range
            complete = true;
            break;
        }
        if (first_exclusive) {
            first_exclusive = false;
            if (it->key().compare(start) == 0) {
                // discard start_sortkey
                it->Next();
                continue;
            }
        }
        limiter->add_count();
        // Classify the record: normal (returned), expired (TTL), or filtered out.
        auto state = validate_key_value_for_scan(
            it->key(),
            it->value(),
            request.hash_key_filter_type,
            request.hash_key_filter_pattern,
            request.sort_key_filter_type,
            request.sort_key_filter_pattern,
            epoch_now,
            request.__isset.validate_partition_hash ? request.validate_partition_hash : true);
        switch (state) {
        case range_iteration_state::kNormal:
            count++;
            if (!only_return_count) {
                append_key_value(
                    resp.kvs, it->key(), it->value(), request.no_value, return_expire_ts);
            }
            break;
        case range_iteration_state::kExpired:
            expire_count++;
            break;
        case range_iteration_state::kFiltered:
            filter_count++;
            break;
        default:
            break;
        }
        if (c == 0) {
            // seek to the last position
            complete = true;
            break;
        }
        it->Next();
    }
    if (only_return_count) {
        resp.__set_kv_count(count);
    }
    // check iteration time whether exceed limit
    if (!complete) {
        limiter->time_check_after_incomplete_scan();
    }
    resp.error = it->status().code();
    if (!it->status().ok()) {
        // error occur
        if (FLAGS_rocksdb_verbose_log) {
            LOG_ERROR_PREFIX("rocksdb scan failed for get_scanner from {}: start_key = \"{}\" "
                             "({}), stop_key = \"{}\" ({}), batch_size = {}, read_count = {}, "
                             "error = {}",
                             rpc.remote_address(),
                             ::pegasus::utils::c_escape_sensitive_string(start),
                             request.start_inclusive ? "inclusive" : "exclusive",
                             ::pegasus::utils::c_escape_sensitive_string(stop),
                             request.stop_inclusive ? "inclusive" : "exclusive",
                             batch_count,
                             count,
                             it->status().ToString());
        } else {
            LOG_ERROR_PREFIX("rocksdb scan failed for get_scanner from {}: error = {}",
                             rpc.remote_address(),
                             it->status().ToString());
        }
        // Do not return a partial batch on iterator error.
        resp.kvs.clear();
    } else if (limiter->exceed_limit()) {
        // scan exceed limit time
        // kIncomplete tells the client it may retry/continue; no context is cached.
        resp.error = rocksdb::Status::kIncomplete;
        LOG_WARNING_PREFIX("rocksdb abnormal scan from {}: batch_count={}, time_used_ns({}) VS "
                           "time_threshold_ns({})",
                           rpc.remote_address(),
                           batch_count,
                           limiter->duration_time(),
                           limiter->max_duration_time());
    } else if (it->Valid() && !complete) {
        // scan not completed
        // Cache the live iterator plus all scan parameters so follow-up on_scan
        // RPCs can resume exactly where this pass stopped.
        std::unique_ptr<pegasus_scan_context> context(new pegasus_scan_context(
            std::move(it),
            std::string(stop.data(), stop.size()),
            request.stop_inclusive,
            request.hash_key_filter_type,
            std::string(request.hash_key_filter_pattern.data(),
                        request.hash_key_filter_pattern.length()),
            request.sort_key_filter_type,
            std::string(request.sort_key_filter_pattern.data(),
                        request.sort_key_filter_pattern.length()),
            batch_count,
            request.no_value,
            request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
            return_expire_ts,
            only_return_count));
        int64_t handle = _context_cache.put(std::move(context));
        resp.context_id = handle;
        // if the context is used, it will be fetched and re-put into cache,
        // which will change the handle,
        // then the delayed task will fetch null context by old handle, and do nothing.
        // Delayed eviction: after 5 minutes an abandoned context is dropped from
        // the cache (fetch removes it; the unique_ptr result is discarded).
        ::dsn::tasking::enqueue(
            LPC_PEGASUS_SERVER_DELAY,
            &_tracker,
            [this, handle]() { _context_cache.fetch(handle); },
            0,
            std::chrono::minutes(5));
    } else {
        // scan completed
        resp.context_id = pegasus_scan_context::SCAN_CONTEXT_ID_COMPLETED;
    }
    METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
    METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
    _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
}