in src/server/pegasus_server_impl.cpp [410:831]
// Serves a multi_get RPC for one hash key on this partition.
//
// Two modes are selected by whether `request.sort_keys` is empty:
//  * range read (sort_keys empty): iterate the sort keys of `request.hash_key` between
//    `start_sortkey` and `stop_sortkey` (inclusive/exclusive as requested, an empty
//    stop_sortkey meaning "to the end of the hash key"), forward or in reverse, applying the
//    sort key filter and skipping expired records. Iteration is bounded by a
//    range_read_limiter (iteration count, iteration size and a time threshold) and by the
//    effective max_kv_count.
//  * point read (sort_keys non-empty): fetch exactly the listed sort keys with a single
//    rocksdb MultiGet, treating expired records as not found.
//
// `resp.error` holds a rocksdb::Status code:
//  * kOk on success (including an empty range);
//  * kIncomplete when a limit stopped the read before the whole range / key list was served;
//  * kInvalidArgument when the sort key filter type is not supported;
//  * otherwise the failing rocksdb status code, in which case `resp.kvs` is cleared.
// When the read-size throttling controller is not available, the RPC error is set to
// dsn::ERR_BUSY and the function returns without recording latency or capacity units.
void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
{
    CHECK(_is_open, "");
    _pfc_multi_get_qps->increment();
    uint64_t start_time = dsn_now_ns();
    const auto &request = rpc.request();
    dsn::message_ex *req = rpc.dsn_request();
    auto &resp = rpc.response();
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    if (!_read_size_throttling_controller->available()) {
        rpc.error() = dsn::ERR_BUSY;
        _counter_recent_read_throttling_reject_count->increment();
        return;
    }

    if (!is_filter_type_supported(request.sort_key_filter_type)) {
        LOG_ERROR_PREFIX(
            "invalid argument for multi_get from {}: sort key filter type {} not supported",
            rpc.remote_address(),
            request.sort_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
        _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
        return;
    }

    // The client may only lower the server-side cap; a non-positive request value keeps it.
    uint32_t max_kv_count = _rng_rd_opts.multi_get_max_iteration_count;
    uint32_t max_iteration_count = _rng_rd_opts.multi_get_max_iteration_count;
    if (request.max_kv_count > 0 && request.max_kv_count < max_kv_count) {
        max_kv_count = request.max_kv_count;
    }

    // Non-positive sizes (from the request or the config) mean "no limit" (INT_MAX).
    int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX;
    int32_t max_iteration_size_config = _rng_rd_opts.multi_get_max_iteration_size > 0
                                            ? _rng_rd_opts.multi_get_max_iteration_size
                                            : INT_MAX;
    int32_t max_iteration_size = std::min(max_kv_size, max_iteration_size_config);

    uint32_t epoch_now = ::pegasus::utils::epoch_now();
    int32_t count = 0;
    int64_t size = 0;
    int32_t iteration_count = 0;
    int32_t expire_count = 0;
    int32_t filter_count = 0;

    if (request.sort_keys.empty()) {
        ::dsn::blob range_start_key, range_stop_key;
        pegasus_generate_key(range_start_key, request.hash_key, request.start_sortkey);
        bool start_inclusive = request.start_inclusive;
        bool stop_inclusive;
        if (request.stop_sortkey.length() == 0) {
            // An empty stop sort key scans up to (excluding) the next hash key.
            pegasus_generate_next_blob(range_stop_key, request.hash_key);
            stop_inclusive = false;
        } else {
            pegasus_generate_key(range_stop_key, request.hash_key, request.stop_sortkey);
            stop_inclusive = request.stop_inclusive;
        }
        rocksdb::Slice start(range_start_key.data(), range_start_key.length());
        rocksdb::Slice stop(range_stop_key.data(), range_stop_key.length());

        // limit key range by prefix filter
        // NOTE: the prefix blobs are declared outside the `if` because `start`/`stop` may end
        // up pointing into them and must stay valid for the whole scan.
        ::dsn::blob prefix_start_key, prefix_stop_key;
        if (request.sort_key_filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX &&
            request.sort_key_filter_pattern.length() > 0) {
            pegasus_generate_key(
                prefix_start_key, request.hash_key, request.sort_key_filter_pattern);
            pegasus_generate_next_blob(
                prefix_stop_key, request.hash_key, request.sort_key_filter_pattern);

            rocksdb::Slice prefix_start(prefix_start_key.data(), prefix_start_key.length());
            if (prefix_start.compare(start) > 0) {
                start = prefix_start;
                start_inclusive = true;
            }

            rocksdb::Slice prefix_stop(prefix_stop_key.data(), prefix_stop_key.length());
            if (prefix_stop.compare(stop) <= 0) {
                stop = prefix_stop;
                stop_inclusive = false;
            }
        }

        // check if range is empty
        int c = start.compare(stop);
        if (c > 0 || (c == 0 && (!start_inclusive || !stop_inclusive))) {
            // empty sort key range
            if (FLAGS_rocksdb_verbose_log) {
                LOG_WARNING_PREFIX(
                    "empty sort key range for multi_get from {}: hash_key = \"{}\", start_sort_key "
                    "= \"{}\" ({}), stop_sort_key = \"{}\" ({}), sort_key_filter_type = {}, "
                    "sort_key_filter_pattern = \"{}\", final_start = \"{}\" ({}), final_stop = "
                    "\"{}\" ({})",
                    rpc.remote_address(),
                    ::pegasus::utils::c_escape_string(request.hash_key),
                    ::pegasus::utils::c_escape_string(request.start_sortkey),
                    request.start_inclusive ? "inclusive" : "exclusive",
                    ::pegasus::utils::c_escape_string(request.stop_sortkey),
                    request.stop_inclusive ? "inclusive" : "exclusive",
                    ::dsn::apps::_filter_type_VALUES_TO_NAMES.find(request.sort_key_filter_type)
                        ->second,
                    ::pegasus::utils::c_escape_string(request.sort_key_filter_pattern),
                    ::pegasus::utils::c_escape_string(start),
                    start_inclusive ? "inclusive" : "exclusive",
                    ::pegasus::utils::c_escape_string(stop),
                    stop_inclusive ? "inclusive" : "exclusive");
            }
            resp.error = rocksdb::Status::kOk;
            _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
            _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
            return;
        }

        std::unique_ptr<rocksdb::Iterator> it;
        // Set when the scan reached the end of the requested range (as opposed to being cut
        // short by a count/size/time limit).
        bool complete = false;

        std::unique_ptr<range_read_limiter> limiter =
            std::make_unique<range_read_limiter>(max_iteration_count,
                                                 max_iteration_size,
                                                 _rng_rd_opts.rocksdb_iteration_threshold_time_ms);

        if (!request.reverse) {
            it.reset(_db->NewIterator(_data_cf_rd_opts, _data_cf));
            it->Seek(start);
            // Seek() lands on the first key >= start; it must be skipped once if the start
            // bound is exclusive.
            bool first_exclusive = !start_inclusive;
            while (count < max_kv_count && limiter->valid() && it->Valid()) {
                // check stop sort key
                int c = it->key().compare(stop);
                if (c > 0 || (c == 0 && !stop_inclusive)) {
                    // out of range
                    complete = true;
                    break;
                }

                // check start sort key
                if (first_exclusive) {
                    first_exclusive = false;
                    if (it->key().compare(start) == 0) {
                        // discard start_sortkey
                        it->Next();
                        continue;
                    }
                }

                limiter->add_count();

                // extract value
                auto state = append_key_value_for_multi_get(resp.kvs,
                                                            it->key(),
                                                            it->value(),
                                                            request.sort_key_filter_type,
                                                            request.sort_key_filter_pattern,
                                                            epoch_now,
                                                            request.no_value);
                switch (state) {
                case range_iteration_state::kNormal: {
                    count++;
                    auto &kv = resp.kvs.back();
                    uint64_t kv_size = kv.key.length() + kv.value.length();
                    size += kv_size;
                    limiter->add_size(kv_size);
                } break;
                case range_iteration_state::kExpired:
                    expire_count++;
                    break;
                case range_iteration_state::kFiltered:
                    filter_count++;
                    break;
                default:
                    break;
                }

                if (c == 0) {
                    // if arrived to the last position
                    complete = true;
                    break;
                }

                it->Next();
            }
        } else { // reverse
            rocksdb::ReadOptions rd_opts(_data_cf_rd_opts);
            if (_data_cf_opts.prefix_extractor) {
                // NOTE: Prefix bloom filter is not supported in reverse seek mode (see
                // https://github.com/facebook/rocksdb/wiki/Prefix-Seek-API-Changes#limitation for
                // more details), and we have to do total order seek on rocksdb which might be worse
                // performance. However we consider that reverse scan is a rare use case, and if
                // your workload has many reverse scans, you'd better use 'common' bloom filter (by
                // set [pegasus.server]rocksdb_filter_type to 'common').
                rd_opts.total_order_seek = true;
                rd_opts.prefix_same_as_start = false;
            }
            it.reset(_db->NewIterator(rd_opts, _data_cf));
            it->SeekForPrev(stop);
            // SeekForPrev() lands on the last key <= stop; it must be skipped once if the stop
            // bound is exclusive.
            bool first_exclusive = !stop_inclusive;
            // Collected in descending key order; reversed into resp.kvs after the scan.
            std::vector<::dsn::apps::key_value> reverse_kvs;
            while (count < max_kv_count && limiter->valid() && it->Valid()) {
                // check start sort key
                int c = it->key().compare(start);
                if (c < 0 || (c == 0 && !start_inclusive)) {
                    // out of range
                    complete = true;
                    break;
                }

                // check stop sort key
                if (first_exclusive) {
                    first_exclusive = false;
                    if (it->key().compare(stop) == 0) {
                        // discard stop_sortkey
                        it->Prev();
                        continue;
                    }
                }

                limiter->add_count();

                // extract value
                auto state = append_key_value_for_multi_get(reverse_kvs,
                                                            it->key(),
                                                            it->value(),
                                                            request.sort_key_filter_type,
                                                            request.sort_key_filter_pattern,
                                                            epoch_now,
                                                            request.no_value);
                switch (state) {
                case range_iteration_state::kNormal: {
                    count++;
                    auto &kv = reverse_kvs.back();
                    uint64_t kv_size = kv.key.length() + kv.value.length();
                    size += kv_size;
                    limiter->add_size(kv_size);
                } break;
                case range_iteration_state::kExpired:
                    expire_count++;
                    break;
                case range_iteration_state::kFiltered:
                    filter_count++;
                    break;
                default:
                    break;
                }

                if (c == 0) {
                    // if arrived to the last position
                    complete = true;
                    break;
                }

                it->Prev();
            }

            if (it->status().ok() && !reverse_kvs.empty()) {
                // revert order to make resp.kvs ordered in sort_key
                // (reverse iterators avoid the size_t -> int narrowing of an index loop)
                resp.kvs.reserve(reverse_kvs.size());
                for (auto rit = reverse_kvs.rbegin(); rit != reverse_kvs.rend(); ++rit) {
                    resp.kvs.emplace_back(std::move(*rit));
                }
            }
        }

        iteration_count = limiter->get_iteration_count();
        resp.error = it->status().code();
        if (!it->status().ok()) {
            // error occur
            if (FLAGS_rocksdb_verbose_log) {
                LOG_ERROR_PREFIX("rocksdb scan failed for multi_get from {}: hash_key = \"{}\", "
                                 "reverse = {}, error = {}",
                                 rpc.remote_address(),
                                 ::pegasus::utils::c_escape_string(request.hash_key),
                                 request.reverse ? "true" : "false",
                                 it->status().ToString());
            } else {
                LOG_ERROR_PREFIX(
                    "rocksdb scan failed for multi_get from {}: reverse = {}, error = {}",
                    rpc.remote_address(),
                    request.reverse ? "true" : "false",
                    it->status().ToString());
            }
            resp.kvs.clear();
        } else if (it->Valid() && !complete) {
            // scan not completed: a count/size/time limit stopped the loop while the iterator
            // still points at a readable key, so more data may remain in the range.
            resp.error = rocksdb::Status::kIncomplete;
            if (limiter->exceed_limit()) {
                LOG_WARNING_PREFIX(
                    "rocksdb abnormal scan from {}: time_used({}ns) VS time_threshold({}ns)",
                    rpc.remote_address(),
                    limiter->duration_time(),
                    limiter->max_duration_time());
            }
        }
    } else { // condition: !request.sort_keys.empty()
        bool error_occurred = false;
        rocksdb::Status final_status;
        bool exceed_limit = false;
        // keys_holder owns the encoded key buffers that the Slices in `keys` point into.
        std::vector<::dsn::blob> keys_holder;
        std::vector<rocksdb::Slice> keys;
        std::vector<std::string> values;
        keys_holder.reserve(request.sort_keys.size());
        keys.reserve(request.sort_keys.size());
        for (auto &sort_key : request.sort_keys) {
            ::dsn::blob raw_key;
            pegasus_generate_key(raw_key, request.hash_key, sort_key);
            keys.emplace_back(raw_key.data(), raw_key.length());
            keys_holder.emplace_back(std::move(raw_key));
        }

        std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts, keys, &values);
        for (size_t i = 0; i < keys.size(); ++i) {
            rocksdb::Status &status = statuses[i];
            std::string &value = values[i];
            // print log
            if (!status.ok()) {
                if (FLAGS_rocksdb_verbose_log) {
                    LOG_ERROR_PREFIX("rocksdb get failed for multi_get from {}: hash_key = \"{}\", "
                                     "sort_key = \"{}\", error = {}",
                                     rpc.remote_address(),
                                     ::pegasus::utils::c_escape_string(request.hash_key),
                                     ::pegasus::utils::c_escape_string(request.sort_keys[i]),
                                     status.ToString());
                } else if (!status.IsNotFound()) {
                    LOG_ERROR_PREFIX("rocksdb get failed for multi_get from {}: error = {}",
                                     rpc.remote_address(),
                                     status.ToString());
                }
            }

            // check ttl: an expired record is reported to the client as not found
            if (status.ok()) {
                uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value);
                if (expire_ts > 0 && expire_ts <= epoch_now) {
                    expire_count++;
                    if (FLAGS_rocksdb_verbose_log) {
                        LOG_ERROR_PREFIX("rocksdb data expired for multi_get from {}",
                                         rpc.remote_address());
                    }
                    status = rocksdb::Status::NotFound();
                }
            }

            // extract value
            if (status.ok()) {
                // check if exceed limit (checked before appending, on the totals so far)
                if (count >= max_kv_count || size >= max_kv_size) {
                    exceed_limit = true;
                    break;
                }

                ::dsn::apps::key_value kv;
                kv.key = request.sort_keys[i];
                if (!request.no_value) {
                    pegasus_extract_user_data(_pegasus_data_version, std::move(value), kv.value);
                }
                count++;
                size += kv.key.length() + kv.value.length();
                resp.kvs.emplace_back(std::move(kv));
            }

            // if error occurred (NotFound is not an error for multi_get)
            if (!status.ok() && !status.IsNotFound()) {
                error_occurred = true;
                final_status = status;
                break;
            }
        }

        if (error_occurred) {
            resp.error = final_status.code();
            resp.kvs.clear();
        } else if (exceed_limit) {
            resp.error = rocksdb::Status::kIncomplete;
        } else {
            resp.error = rocksdb::Status::kOk;
        }
    }

#ifdef PEGASUS_UNIT_TEST
    // sleep 10ms for unit test
    usleep(10 * 1000);
#endif

    uint64_t time_used = dsn_now_ns() - start_time;
    if (is_multi_get_abnormal(time_used, size, iteration_count)) {
        LOG_WARNING_PREFIX(
            "rocksdb abnormal multi_get from {}: hash_key = {}, "
            "start_sort_key = {} ({}), stop_sort_key = {} ({}), "
            "sort_key_filter_type = {}, sort_key_filter_pattern = {}, "
            "max_kv_count = {}, max_kv_size = {}, reverse = {}, "
            "result_count = {}, result_size = {}, iteration_count = {}, "
            "expire_count = {}, filter_count = {}, time_used = {} ns",
            rpc.remote_address(),
            ::pegasus::utils::c_escape_string(request.hash_key),
            ::pegasus::utils::c_escape_string(request.start_sortkey),
            request.start_inclusive ? "inclusive" : "exclusive",
            ::pegasus::utils::c_escape_string(request.stop_sortkey),
            request.stop_inclusive ? "inclusive" : "exclusive",
            ::dsn::apps::_filter_type_VALUES_TO_NAMES.find(request.sort_key_filter_type)->second,
            ::pegasus::utils::c_escape_string(request.sort_key_filter_pattern),
            request.max_kv_count,
            request.max_kv_size,
            request.reverse ? "true" : "false",
            count,
            size,
            iteration_count,
            expire_count,
            filter_count,
            time_used);
        _pfc_recent_abnormal_count->increment();
    }

    if (expire_count > 0) {
        _pfc_recent_expire_count->add(expire_count);
    }
    if (filter_count > 0) {
        _pfc_recent_filter_count->add(filter_count);
    }

    _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
    _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}