void pegasus_server_impl::on_scan()

in src/server/pegasus_server_impl.cpp [1353:1505]


// Serves one batch of a previously opened scan.
//
// The scan state (iterator, stop key, filters and flags) is looked up in
// `_context_cache` by `request.context_id`. Fetching hands ownership of the
// context to this call, so unless it is explicitly put back below it is
// destroyed when the function returns.
//
// Outcomes written to the response:
//   - context not found          -> resp.error = kNotFound
//   - iterator reports an error  -> resp.error = that status code, resp.kvs cleared
//   - limiter reports time limit -> resp.error = kIncomplete (context is dropped)
//   - more data may remain       -> context is re-cached, resp.context_id = new handle
//   - otherwise                  -> resp.context_id = SCAN_CONTEXT_ID_COMPLETED
//
// NOTE: when the read-size throttling controller rejects the request, the
// function returns early with ERR_BUSY and neither the capacity-unit
// accounting nor the latency counter at the end of the function is updated.
void pegasus_server_impl::on_scan(scan_rpc rpc)
{
    CHECK(_is_open, "");
    _pfc_scan_qps->increment();
    const uint64_t start_time = dsn_now_ns();
    const auto &request = rpc.request();
    dsn::message_ex *req = rpc.dsn_request();
    auto &resp = rpc.response();
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_address;

    if (!_read_size_throttling_controller->available()) {
        rpc.error() = dsn::ERR_BUSY;
        _counter_recent_read_throttling_reject_count->increment();
        return;
    }

    std::unique_ptr<pegasus_scan_context> context = _context_cache.fetch(request.context_id);
    if (context) {
        // Snapshot the scan parameters. The references below point into
        // `*context` and must not be used after the context is moved back
        // into the cache.
        rocksdb::Iterator *it = context->iterator.get();
        const rocksdb::Slice &stop = context->stop;
        const bool stop_inclusive = context->stop_inclusive;
        const ::dsn::apps::filter_type::type hash_key_filter_type = context->hash_key_filter_type;
        const ::dsn::blob &hash_key_filter_pattern = context->hash_key_filter_pattern;
        const ::dsn::apps::filter_type::type sort_key_filter_type = context->sort_key_filter_type;
        const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern;
        const bool no_value = context->no_value;
        const bool validate_hash = context->validate_partition_hash;
        const bool return_expire_ts = context->return_expire_ts;
        const bool only_return_count = context->only_return_count;
        bool complete = false;
        const uint32_t epoch_now = ::pegasus::utils::epoch_now();
        uint64_t expire_count = 0;
        uint64_t filter_count = 0;
        // Number of records accepted (state kNormal) in this batch. Kept as
        // int32_t because it is handed to resp.__set_kv_count() below.
        int32_t count = 0;

        // The batch is capped by the server-side iteration limit; a positive,
        // smaller client-requested batch size lowers the cap further.
        uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count;
        if (context->batch_size > 0 && context->batch_size < batch_count) {
            batch_count = context->batch_size;
        }

        // The limiter never outlives this scope, so it lives on the stack
        // instead of being heap-allocated.
        range_read_limiter limiter(
            batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);

        // `count` only grows from zero, so the cast to unsigned is lossless.
        while (static_cast<uint32_t>(count) < batch_count && limiter.valid() && it->Valid()) {
            const int c = it->key().compare(stop);
            if (c > 0 || (c == 0 && !stop_inclusive)) {
                // out of range
                complete = true;
                break;
            }

            // Every visited record is reported to the limiter, whether or not
            // it ends up being returned.
            limiter.add_count();

            const auto state = validate_key_value_for_scan(it->key(),
                                                           it->value(),
                                                           hash_key_filter_type,
                                                           hash_key_filter_pattern,
                                                           sort_key_filter_type,
                                                           sort_key_filter_pattern,
                                                           epoch_now,
                                                           validate_hash);

            switch (state) {
            case range_iteration_state::kNormal:
                count++;
                if (!only_return_count) {
                    append_key_value(resp.kvs, it->key(), it->value(), no_value, return_expire_ts);
                }
                break;
            case range_iteration_state::kExpired:
                expire_count++;
                break;
            case range_iteration_state::kFiltered:
                filter_count++;
                break;
            default:
                break;
            }

            if (c == 0) {
                // The record just processed is the (inclusive) stop key, so
                // nothing further lies inside the requested range.
                complete = true;
                break;
            }

            it->Next();
        }

        if (only_return_count) {
            resp.__set_kv_count(count);
        }

        // check iteration time whether exceed limit
        if (!complete) {
            limiter.time_check_after_incomplete_scan();
        }

        // The iterator is not advanced past this point, so its status is
        // read once and reused rather than copied on every query.
        const rocksdb::Status iter_status = it->status();
        resp.error = iter_status.code();
        if (!iter_status.ok()) {
            // error occur
            if (FLAGS_rocksdb_verbose_log) {
                LOG_ERROR_PREFIX("rocksdb scan failed for scan from {}: context_id= {}, stop_key = "
                                 "\"{}\" ({}), batch_size = {}, read_count = {}, error = {}",
                                 rpc.remote_address(),
                                 request.context_id,
                                 ::pegasus::utils::c_escape_string(stop),
                                 stop_inclusive ? "inclusive" : "exclusive",
                                 batch_count,
                                 count,
                                 iter_status.ToString());
            } else {
                LOG_ERROR_PREFIX("rocksdb scan failed for scan from {}: error = {}",
                                 rpc.remote_address(),
                                 iter_status.ToString());
            }
            // Do not hand back a partial batch together with an error.
            resp.kvs.clear();
        } else if (limiter.exceed_limit()) {
            // scan exceed limit time
            resp.error = rocksdb::Status::kIncomplete;
            LOG_WARNING_PREFIX("rocksdb abnormal scan from {}: batch_count={}, time_used({}ns) VS "
                               "time_threshold({}ns)",
                               rpc.remote_address(),
                               batch_count,
                               limiter.duration_time(),
                               limiter.max_duration_time());
        } else if (it->Valid() && !complete) {
            // scan not completed: put the context back under a fresh handle
            // so that the client can continue from the current position.
            const int64_t handle = _context_cache.put(std::move(context));
            resp.context_id = handle;
            // Schedule a delayed task (5 minutes) that fetches the context
            // again and discards it; presumably this evicts contexts that the
            // client never comes back for -- TODO confirm against the
            // context cache implementation.
            ::dsn::tasking::enqueue(LPC_PEGASUS_SERVER_DELAY,
                                    &_tracker,
                                    [this, handle]() { _context_cache.fetch(handle); },
                                    0,
                                    std::chrono::minutes(5));
        } else {
            // scan completed
            resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
        }

        if (expire_count > 0) {
            _pfc_recent_expire_count->add(expire_count);
        }
        if (filter_count > 0) {
            _pfc_recent_filter_count->add(filter_count);
        }
    } else {
        // Unknown context id: nothing to continue from.
        resp.error = rocksdb::Status::Code::kNotFound;
    }

    _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
    _pfc_scan_latency->set(dsn_now_ns() - start_time);
}