void pegasus_server_impl::on_multi_get()

in src/server/pegasus_server_impl.cpp [410:831]


// Handles a multi_get RPC for a single hash key.
//
// The read mode is selected by whether `request.sort_keys` is empty:
//  * Range mode (sort_keys empty): scans the sort-key range [start_sortkey, stop_sortkey] of
//    `request.hash_key` (honoring the inclusive flags), forward or - when `request.reverse` is
//    set - backward. An FT_MATCH_PREFIX sort-key filter additionally narrows the scanned range.
//    The scan is bounded by a range_read_limiter (iteration count / size / time threshold) and
//    by the returned-kv count cap. Returned kvs are in ascending key order in both directions.
//  * Point mode (sort_keys non-empty): fetches every listed sort key with a single rocksdb
//    MultiGet. Found, non-expired kvs are returned in the order of `request.sort_keys`.
//
// Response `error` (a rocksdb::Status code):
//  * kOk              - the request was served completely (including an empty range);
//  * kIncomplete      - a count/size/iteration/time limit stopped the read early;
//  * kInvalidArgument - `request.sort_key_filter_type` is not supported;
//  * other codes      - the rocksdb read failed; `resp.kvs` is cleared in that case.
// If the read-size throttling controller rejects the request, `rpc.error()` is set to
// ERR_BUSY and the function returns without recording latency or CU.
void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
{
    CHECK(_is_open, "");
    _pfc_multi_get_qps->increment();
    uint64_t start_time = dsn_now_ns();

    const auto &request = rpc.request();
    dsn::message_ex *req = rpc.dsn_request();
    auto &resp = rpc.response();
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    // Reject before touching rocksdb when the read-size throttle is exhausted.
    if (!_read_size_throttling_controller->available()) {
        rpc.error() = dsn::ERR_BUSY;
        _counter_recent_read_throttling_reject_count->increment();
        return;
    }

    // Validate the filter type up front. NOTE(review): the
    // `_filter_type_VALUES_TO_NAMES.find(...)->second` lookups used for logging below
    // presumably rely on this check guaranteeing a known enum value - confirm.
    if (!is_filter_type_supported(request.sort_key_filter_type)) {
        LOG_ERROR_PREFIX(
            "invalid argument for multi_get from {}: sort key filter type {} not supported",
            rpc.remote_address(),
            request.sort_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
        _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
        return;
    }

    // Count limits. The server-side `multi_get_max_iteration_count` caps both the number of
    // returned kvs and the number of visited records. A positive request.max_kv_count can only
    // tighten the returned-kv cap.
    uint32_t max_kv_count = _rng_rd_opts.multi_get_max_iteration_count;
    uint32_t max_iteration_count = _rng_rd_opts.multi_get_max_iteration_count;
    if (request.max_kv_count > 0 && request.max_kv_count < max_kv_count) {
        max_kv_count = request.max_kv_count;
    }

    // Size limits. Non-positive values mean "unlimited" (INT_MAX). Range reads use the smaller
    // of the request limit and the server-side limit.
    int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX;
    int32_t max_iteration_size_config = _rng_rd_opts.multi_get_max_iteration_size > 0
                                            ? _rng_rd_opts.multi_get_max_iteration_size
                                            : INT_MAX;
    int32_t max_iteration_size = std::min(max_kv_size, max_iteration_size_config);

    uint32_t epoch_now = ::pegasus::utils::epoch_now();
    // Statistics filled in by either read mode and reported after the read.
    int32_t count = 0;           // number of kvs returned
    int64_t size = 0;            // total key + value bytes returned
    int32_t iteration_count = 0; // records visited by the range limiter (range mode only)
    int32_t expire_count = 0;    // records skipped because their TTL has passed
    int32_t filter_count = 0;    // records skipped by the sort-key filter (range mode only)

    if (request.sort_keys.empty()) {
        // ---- Range mode: scan a contiguous sort-key range of `request.hash_key`. ----
        ::dsn::blob range_start_key, range_stop_key;
        pegasus_generate_key(range_start_key, request.hash_key, request.start_sortkey);
        bool start_inclusive = request.start_inclusive;
        bool stop_inclusive;
        if (request.stop_sortkey.length() == 0) {
            // An empty stop sort key means "to the end of this hash key". The next-blob of the
            // hash key is used as an exclusive upper bound. This assumes
            // pegasus_generate_next_blob yields a key past every sort key of the hash key.
            pegasus_generate_next_blob(range_stop_key, request.hash_key);
            stop_inclusive = false;
        } else {
            pegasus_generate_key(range_stop_key, request.hash_key, request.stop_sortkey);
            stop_inclusive = request.stop_inclusive;
        }

        rocksdb::Slice start(range_start_key.data(), range_start_key.length());
        rocksdb::Slice stop(range_stop_key.data(), range_stop_key.length());

        // Limit the key range by the prefix filter. These blobs are declared outside the `if`
        // because `start` / `stop` may be re-pointed into them and must stay valid for the scan.
        ::dsn::blob prefix_start_key, prefix_stop_key;
        if (request.sort_key_filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX &&
            request.sort_key_filter_pattern.length() > 0) {
            pegasus_generate_key(
                prefix_start_key, request.hash_key, request.sort_key_filter_pattern);
            pegasus_generate_next_blob(
                prefix_stop_key, request.hash_key, request.sort_key_filter_pattern);

            rocksdb::Slice prefix_start(prefix_start_key.data(), prefix_start_key.length());
            if (prefix_start.compare(start) > 0) {
                start = prefix_start;
                start_inclusive = true;
            }

            rocksdb::Slice prefix_stop(prefix_stop_key.data(), prefix_stop_key.length());
            if (prefix_stop.compare(stop) <= 0) {
                stop = prefix_stop;
                stop_inclusive = false;
            }
        }

        // check if range is empty
        int range_cmp = start.compare(stop);
        if (range_cmp > 0 || (range_cmp == 0 && (!start_inclusive || !stop_inclusive))) {
            // empty sort key range
            if (FLAGS_rocksdb_verbose_log) {
                LOG_WARNING_PREFIX(
                    "empty sort key range for multi_get from {}: hash_key = \"{}\", start_sort_key "
                    "= \"{}\" ({}), stop_sort_key = \"{}\" ({}), sort_key_filter_type = {}, "
                    "sort_key_filter_pattern = \"{}\", final_start = \"{}\" ({}), final_stop = "
                    "\"{}\" ({})",
                    rpc.remote_address(),
                    ::pegasus::utils::c_escape_string(request.hash_key),
                    ::pegasus::utils::c_escape_string(request.start_sortkey),
                    request.start_inclusive ? "inclusive" : "exclusive",
                    ::pegasus::utils::c_escape_string(request.stop_sortkey),
                    request.stop_inclusive ? "inclusive" : "exclusive",
                    ::dsn::apps::_filter_type_VALUES_TO_NAMES.find(request.sort_key_filter_type)
                        ->second,
                    ::pegasus::utils::c_escape_string(request.sort_key_filter_pattern),
                    ::pegasus::utils::c_escape_string(start),
                    start_inclusive ? "inclusive" : "exclusive",
                    ::pegasus::utils::c_escape_string(stop),
                    stop_inclusive ? "inclusive" : "exclusive");
            }
            resp.error = rocksdb::Status::kOk;
            _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
            _pfc_multi_get_latency->set(dsn_now_ns() - start_time);

            return;
        }

        std::unique_ptr<rocksdb::Iterator> it;
        // Set once the scan has provably reached the end of the requested range.
        bool complete = false;

        // Scope-local limiter; no heap allocation is needed for its lifetime.
        range_read_limiter limiter(max_iteration_count,
                                   max_iteration_size,
                                   _rng_rd_opts.rocksdb_iteration_threshold_time_ms);

        if (!request.reverse) {
            it.reset(_db->NewIterator(_data_cf_rd_opts, _data_cf));
            it->Seek(start);
            bool first_exclusive = !start_inclusive;
            while (count < max_kv_count && limiter.valid() && it->Valid()) {
                // check stop sort key
                const int stop_cmp = it->key().compare(stop);
                if (stop_cmp > 0 || (stop_cmp == 0 && !stop_inclusive)) {
                    // out of range
                    complete = true;
                    break;
                }

                // Seek() lands on the first key >= start, so only the first record can equal an
                // exclusive start bound.
                if (first_exclusive) {
                    first_exclusive = false;
                    if (it->key().compare(start) == 0) {
                        // discard start_sortkey
                        it->Next();
                        continue;
                    }
                }

                limiter.add_count();

                // extract value
                auto state = append_key_value_for_multi_get(resp.kvs,
                                                            it->key(),
                                                            it->value(),
                                                            request.sort_key_filter_type,
                                                            request.sort_key_filter_pattern,
                                                            epoch_now,
                                                            request.no_value);

                switch (state) {
                case range_iteration_state::kNormal: {
                    count++;
                    auto &kv = resp.kvs.back();
                    uint64_t kv_size = kv.key.length() + kv.value.length();
                    size += kv_size;
                    limiter.add_size(kv_size);
                } break;
                case range_iteration_state::kExpired:
                    expire_count++;
                    break;
                case range_iteration_state::kFiltered:
                    filter_count++;
                    break;
                default:
                    break;
                }

                if (stop_cmp == 0) {
                    // Reached the inclusive stop key: nothing after it can be in range.
                    complete = true;
                    break;
                }

                it->Next();
            }
        } else { // reverse
            rocksdb::ReadOptions rd_opts(_data_cf_rd_opts);
            if (_data_cf_opts.prefix_extractor) {
                // NOTE: Prefix bloom filter is not supported in reverse seek mode (see
                // https://github.com/facebook/rocksdb/wiki/Prefix-Seek-API-Changes#limitation for
                // more details), and we have to do total order seek on rocksdb which might be worse
                // performance. However we consider that reverse scan is a rare use case, and if
                // your workload has many reverse scans, you'd better use 'common' bloom filter (by
                // set [pegasus.server]rocksdb_filter_type to 'common').
                rd_opts.total_order_seek = true;
                rd_opts.prefix_same_as_start = false;
            }
            it.reset(_db->NewIterator(rd_opts, _data_cf));
            it->SeekForPrev(stop);
            bool first_exclusive = !stop_inclusive;
            // Collected in descending key order; reversed into resp.kvs after the scan.
            std::vector<::dsn::apps::key_value> reverse_kvs;
            while (count < max_kv_count && limiter.valid() && it->Valid()) {
                // check start sort key
                const int start_cmp = it->key().compare(start);
                if (start_cmp < 0 || (start_cmp == 0 && !start_inclusive)) {
                    // out of range
                    complete = true;
                    break;
                }

                // SeekForPrev() lands on the last key <= stop, so only the first record can
                // equal an exclusive stop bound.
                if (first_exclusive) {
                    first_exclusive = false;
                    if (it->key().compare(stop) == 0) {
                        // discard stop_sortkey
                        it->Prev();
                        continue;
                    }
                }

                limiter.add_count();

                // extract value
                auto state = append_key_value_for_multi_get(reverse_kvs,
                                                            it->key(),
                                                            it->value(),
                                                            request.sort_key_filter_type,
                                                            request.sort_key_filter_pattern,
                                                            epoch_now,
                                                            request.no_value);
                switch (state) {
                case range_iteration_state::kNormal: {
                    count++;
                    auto &kv = reverse_kvs.back();
                    uint64_t kv_size = kv.key.length() + kv.value.length();
                    size += kv_size;
                    limiter.add_size(kv_size);
                } break;
                case range_iteration_state::kExpired:
                    expire_count++;
                    break;
                case range_iteration_state::kFiltered:
                    filter_count++;
                    break;
                default:
                    break;
                }

                if (start_cmp == 0) {
                    // Reached the inclusive start key: nothing before it can be in range.
                    complete = true;
                    break;
                }

                it->Prev();
            }

            if (it->status().ok() && !reverse_kvs.empty()) {
                // Restore ascending sort-key order. resp.kvs is still empty on this path, so
                // handing over the reversed buffer is equivalent to appending element by element.
                std::reverse(reverse_kvs.begin(), reverse_kvs.end());
                resp.kvs = std::move(reverse_kvs);
            }
        }

        iteration_count = limiter.get_iteration_count();
        resp.error = it->status().code();
        if (!it->status().ok()) {
            // error occur
            if (FLAGS_rocksdb_verbose_log) {
                LOG_ERROR_PREFIX("rocksdb scan failed for multi_get from {}: hash_key = \"{}\", "
                                 "reverse = {}, error = {}",
                                 rpc.remote_address(),
                                 ::pegasus::utils::c_escape_string(request.hash_key),
                                 request.reverse ? "true" : "false",
                                 it->status().ToString());
            } else {
                LOG_ERROR_PREFIX(
                    "rocksdb scan failed for multi_get from {}: reverse = {}, error = {}",
                    rpc.remote_address(),
                    request.reverse ? "true" : "false",
                    it->status().ToString());
            }
            resp.kvs.clear();
        } else if (it->Valid() && !complete) {
            // A count/size/iteration/time limit stopped the loop while the iterator still had
            // records, so the range may not be exhausted.
            resp.error = rocksdb::Status::kIncomplete;
            if (limiter.exceed_limit()) {
                LOG_WARNING_PREFIX(
                    "rocksdb abnormal scan from {}: time_used({}ns) VS time_threshold({}ns)",
                    rpc.remote_address(),
                    limiter.duration_time(),
                    limiter.max_duration_time());
            }
        }
    } else { // condition: !request.sort_keys.empty()
        // ---- Point mode: fetch every sort key in request.sort_keys with one MultiGet. ----
        // NOTE(review): this path does not apply the sort-key filter. It also caps size by the
        // request's max_kv_size only, not by the server-side multi_get_max_iteration_size as
        // range mode does. Confirm this is intended.
        bool error_occurred = false;
        rocksdb::Status final_status;
        bool exceed_limit = false;
        // `keys` holds Slices into the blobs owned by `keys_holder`, which must outlive MultiGet.
        std::vector<::dsn::blob> keys_holder;
        std::vector<rocksdb::Slice> keys;
        std::vector<std::string> values;
        keys_holder.reserve(request.sort_keys.size());
        keys.reserve(request.sort_keys.size());
        for (const auto &sort_key : request.sort_keys) {
            ::dsn::blob raw_key;
            pegasus_generate_key(raw_key, request.hash_key, sort_key);
            keys.emplace_back(raw_key.data(), raw_key.length());
            keys_holder.emplace_back(std::move(raw_key));
        }

        std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts, keys, &values);
        for (size_t i = 0; i < keys.size(); i++) {
            rocksdb::Status &status = statuses[i];
            std::string &value = values[i];
            // print log
            if (!status.ok()) {
                if (FLAGS_rocksdb_verbose_log) {
                    LOG_ERROR_PREFIX("rocksdb get failed for multi_get from {}: hash_key = \"{}\", "
                                     "sort_key = \"{}\", error = {}",
                                     rpc.remote_address(),
                                     ::pegasus::utils::c_escape_string(request.hash_key),
                                     ::pegasus::utils::c_escape_string(request.sort_keys[i]),
                                     status.ToString());
                } else if (!status.IsNotFound()) {
                    LOG_ERROR_PREFIX("rocksdb get failed for multi_get from {}: error = {}",
                                     rpc.remote_address(),
                                     status.ToString());
                }
            }
            // Treat TTL-expired records as not found.
            if (status.ok()) {
                uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value);
                if (expire_ts > 0 && expire_ts <= epoch_now) {
                    expire_count++;
                    if (FLAGS_rocksdb_verbose_log) {
                        LOG_ERROR_PREFIX("rocksdb data expired for multi_get from {}",
                                         rpc.remote_address());
                    }
                    status = rocksdb::Status::NotFound();
                }
            }
            // extract value
            if (status.ok()) {
                // A found record past the count/size limit means the response is incomplete.
                if (count >= max_kv_count || size >= max_kv_size) {
                    exceed_limit = true;
                    break;
                }
                ::dsn::apps::key_value kv;
                kv.key = request.sort_keys[i];
                if (!request.no_value) {
                    pegasus_extract_user_data(_pegasus_data_version, std::move(value), kv.value);
                }
                count++;
                size += kv.key.length() + kv.value.length();
                resp.kvs.emplace_back(std::move(kv));
            }
            // Any status other than OK / NotFound aborts the whole request.
            if (!status.ok() && !status.IsNotFound()) {
                error_occurred = true;
                final_status = status;
                break;
            }
        }

        if (error_occurred) {
            resp.error = final_status.code();
            resp.kvs.clear();
        } else if (exceed_limit) {
            resp.error = rocksdb::Status::kIncomplete;
        } else {
            resp.error = rocksdb::Status::kOk;
        }
    }

#ifdef PEGASUS_UNIT_TEST
    // sleep 10ms for unit test
    usleep(10 * 1000);
#endif

    uint64_t time_used = dsn_now_ns() - start_time;
    if (is_multi_get_abnormal(time_used, size, iteration_count)) {
        LOG_WARNING_PREFIX(
            "rocksdb abnormal multi_get from {}: hash_key = {}, "
            "start_sort_key = {} ({}), stop_sort_key = {} ({}), "
            "sort_key_filter_type = {}, sort_key_filter_pattern = {}, "
            "max_kv_count = {}, max_kv_size = {}, reverse = {}, "
            "result_count = {}, result_size = {}, iteration_count = {}, "
            "expire_count = {}, filter_count = {}, time_used = {} ns",
            rpc.remote_address(),
            ::pegasus::utils::c_escape_string(request.hash_key),
            ::pegasus::utils::c_escape_string(request.start_sortkey),
            request.start_inclusive ? "inclusive" : "exclusive",
            ::pegasus::utils::c_escape_string(request.stop_sortkey),
            request.stop_inclusive ? "inclusive" : "exclusive",
            ::dsn::apps::_filter_type_VALUES_TO_NAMES.find(request.sort_key_filter_type)->second,
            ::pegasus::utils::c_escape_string(request.sort_key_filter_pattern),
            request.max_kv_count,
            request.max_kv_size,
            request.reverse ? "true" : "false",
            count,
            size,
            iteration_count,
            expire_count,
            filter_count,
            time_used);
        _pfc_recent_abnormal_count->increment();
    }

    if (expire_count > 0) {
        _pfc_recent_expire_count->add(expire_count);
    }
    if (filter_count > 0) {
        _pfc_recent_filter_count->add(filter_count);
    }

    _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
    _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}