void pegasus_server_impl::on_get_scanner()

in src/server/pegasus_server_impl.cpp [1148:1394]


// Handles a get_scanner RPC: starts a range scan over [start_key, stop_key] on this partition
// and fills the response with the first batch of results.
//
// Response fields written here:
//   - app_id / partition_index / server: always set before any other processing.
//   - error:
//       * rocksdb::Status::kInvalidArgument if the hash key or sort key filter type is not
//         supported;
//       * rocksdb::Status::kOk if the (possibly prefix-narrowed) key range is empty;
//       * otherwise the iterator's status code, overridden with rocksdb::Status::kIncomplete
//         when the iterator is healthy but the range read limiter reports exceed_limit().
//   - kvs: the accepted records, unless 'only_return_count' is requested, in which case only
//     kv_count is set. Cleared if the iterator reports an error.
//   - context_id: a handle into '_context_cache' when the scan stopped before reaching the end
//     of the range, or pegasus_scan_context::SCAN_CONTEXT_ID_COMPLETED when it finished. It is
//     not assigned on the error / kIncomplete paths.
//
// Every exit path visible in this function reports the scan to '_cu_calculator' via
// add_scan_cu().
void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
{
    CHECK_TRUE(_is_open);

    METRIC_VAR_INCREMENT(scan_requests);

    auto &resp = rpc.response();
    resp.app_id = _gpid.get_app_id();
    resp.partition_index = _gpid.get_partition_index();
    resp.server = _primary_host_port;

    // NOTE(review): presumably bails out of this handler when read throttling applies; the
    // macro body is not visible here -- confirm before reordering anything around it.
    CHECK_READ_THROTTLING();

    // NOTE(review): presumably measures latency until the end of the enclosing scope -- confirm
    // against the macro definition.
    METRIC_VAR_AUTO_LATENCY(scan_latency_ns);

    const auto &request = rpc.request();
    dsn::message_ex *req = rpc.dsn_request();
    if (!is_filter_type_supported(request.hash_key_filter_type)) {
        LOG_ERROR_PREFIX(
            "invalid argument for get_scanner from {}: hash key filter type {} not supported",
            rpc.remote_address(),
            request.hash_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
        return;
    }

    if (!is_filter_type_supported(request.sort_key_filter_type)) {
        LOG_ERROR_PREFIX(
            "invalid argument for get_scanner from {}: sort key filter type {} not supported",
            rpc.remote_address(),
            request.sort_key_filter_type);
        resp.error = rocksdb::Status::kInvalidArgument;
        _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
        return;
    }

    // Start from the column family's default read options and relax the prefix-seek related
    // flags when the scan is not bound to a single hash key.
    rocksdb::ReadOptions rd_opts(_data_cf_rd_opts);
    if (_data_cf_opts.prefix_extractor) {
        ::dsn::blob start_hash_key, tmp;
        pegasus_restore_key(request.start_key, start_hash_key, tmp);
        if (start_hash_key.size() == 0 || request.full_scan) {
            // hash_key is not passed, only happened when do full scan (scanners got by
            // get_unordered_scanners) on a partition, we have to do total order seek on rocksDB.
            rd_opts.total_order_seek = true;
            rd_opts.prefix_same_as_start = false;
        }
    }

    // Effective range bounds. They start as the requested ones; the start bound may be moved
    // forward by the hash key prefix filter below, which is why the request's own
    // 'start_inclusive' must not be used once the range has been narrowed.
    bool start_inclusive = request.start_inclusive;
    bool stop_inclusive = request.stop_inclusive;
    rocksdb::Slice start(request.start_key.data(), request.start_key.length());
    rocksdb::Slice stop(request.stop_key.data(), request.stop_key.length());

    // limit key range by prefix filter
    // because data is not ordered by hash key (hash key "aa" is greater than "b"),
    // so we can only limit the start range by hash key filter.
    //
    // 'prefix_start_key' is declared at function scope because 'start' may end up pointing into
    // its buffer for the rest of the function.
    ::dsn::blob prefix_start_key;
    if (request.hash_key_filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX &&
        request.hash_key_filter_pattern.length() > 0) {
        pegasus_generate_key(prefix_start_key, request.hash_key_filter_pattern, ::dsn::blob());
        rocksdb::Slice prefix_start(prefix_start_key.data(), prefix_start_key.length());
        if (prefix_start.compare(start) > 0) {
            start = prefix_start;
            start_inclusive = true;
            // Now 'start' is generated by 'request.hash_key_filter_pattern', it may be not a real
            // hashkey, we should not seek this prefix by prefix bloom filter. However, it only
            // happen when do full scan (scanners got by get_unordered_scanners), in which case the
            // following flags has been updated.
            CHECK(!_data_cf_opts.prefix_extractor || rd_opts.total_order_seek, "Invalid option");
            CHECK(!_data_cf_opts.prefix_extractor || !rd_opts.prefix_same_as_start,
                  "Invalid option");
        }
    }

    // check if range is empty: start > stop, or start == stop with at least one exclusive bound.
    const int range_cmp = start.compare(stop);
    if (range_cmp > 0 || (range_cmp == 0 && (!start_inclusive || !stop_inclusive))) {
        // empty key range
        if (FLAGS_rocksdb_verbose_log) {
            LOG_WARNING_PREFIX("empty key range for get_scanner from {}: start_key = \"{}\" ({}), "
                               "stop_key = \"{}\" ({})",
                               rpc.remote_address(),
                               ::pegasus::utils::c_escape_sensitive_string(request.start_key),
                               request.start_inclusive ? "inclusive" : "exclusive",
                               ::pegasus::utils::c_escape_sensitive_string(request.stop_key),
                               request.stop_inclusive ? "inclusive" : "exclusive");
        }
        resp.error = rocksdb::Status::kOk;
        _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
        return;
    }

    std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(rd_opts, _data_cf));
    it->Seek(start);
    bool complete = false;
    // True until the first in-range record has been examined; used to drop a record that is
    // exactly equal to an exclusive start bound.
    bool first_exclusive = !start_inclusive;
    uint32_t epoch_now = ::pegasus::utils::epoch_now();
    uint64_t expire_count = 0;
    uint64_t filter_count = 0;
    int32_t count = 0;

    // Optional request flags: an unset flag falls back to its default.
    const bool return_expire_ts =
        request.__isset.return_expire_ts ? request.return_expire_ts : false;
    const bool only_return_count =
        request.__isset.only_return_count ? request.only_return_count : false;
    // Computed once: it is loop-invariant and is also handed over to the scan context.
    const bool validate_hash =
        request.__isset.validate_partition_hash ? request.validate_partition_hash : true;

    // The batch is capped by the configured max iteration count; a positive, smaller
    // 'batch_size' from the request lowers it further.
    uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count;
    if (request.batch_size > 0 && request.batch_size < batch_count) {
        batch_count = request.batch_size;
    }
    // In count-only mode no record is ever appended to 'resp.kvs', so skip the allocation.
    if (!only_return_count) {
        resp.kvs.reserve(batch_count);
    }

    // The limiter does not outlive this function, so it lives on the stack.
    range_read_limiter limiter(_rng_rd_opts.rocksdb_max_iteration_count,
                               0,
                               _rng_rd_opts.rocksdb_iteration_threshold_time_ms);

    while (count < batch_count && limiter.valid() && it->Valid()) {
        const int stop_cmp = it->key().compare(stop);
        if (stop_cmp > 0 || (stop_cmp == 0 && !stop_inclusive)) {
            // out of range
            complete = true;
            break;
        }

        if (first_exclusive) {
            first_exclusive = false;
            if (it->key().compare(start) == 0) {
                // discard start_sortkey
                it->Next();
                continue;
            }
        }

        limiter.add_count();

        auto state = validate_key_value_for_scan(it->key(),
                                                 it->value(),
                                                 request.hash_key_filter_type,
                                                 request.hash_key_filter_pattern,
                                                 request.sort_key_filter_type,
                                                 request.sort_key_filter_pattern,
                                                 epoch_now,
                                                 validate_hash);

        switch (state) {
        case range_iteration_state::kNormal:
            count++;
            if (!only_return_count) {
                append_key_value(
                    resp.kvs, it->key(), it->value(), request.no_value, return_expire_ts);
            }
            break;
        case range_iteration_state::kExpired:
            expire_count++;
            break;
        case range_iteration_state::kFiltered:
            filter_count++;
            break;
        default:
            break;
        }

        if (stop_cmp == 0) {
            // seek to the last position: the current key is the (inclusive) stop key, so
            // nothing after it can be in range.
            complete = true;
            break;
        }

        it->Next();
    }
    if (only_return_count) {
        resp.__set_kv_count(count);
    }

    // check iteration time whether exceed limit
    if (!complete) {
        limiter.time_check_after_incomplete_scan();
    }

    resp.error = it->status().code();
    if (!it->status().ok()) {
        // error occur
        if (FLAGS_rocksdb_verbose_log) {
            // 'start' / 'stop' are the effective bounds, so report the effective inclusive
            // flags alongside them rather than the ones from the request.
            LOG_ERROR_PREFIX("rocksdb scan failed for get_scanner from {}: start_key = \"{}\" "
                             "({}), stop_key = \"{}\" ({}), batch_size = {}, read_count = {}, "
                             "error = {}",
                             rpc.remote_address(),
                             ::pegasus::utils::c_escape_sensitive_string(start),
                             start_inclusive ? "inclusive" : "exclusive",
                             ::pegasus::utils::c_escape_sensitive_string(stop),
                             stop_inclusive ? "inclusive" : "exclusive",
                             batch_count,
                             count,
                             it->status().ToString());
        } else {
            LOG_ERROR_PREFIX("rocksdb scan failed for get_scanner from {}: error = {}",
                             rpc.remote_address(),
                             it->status().ToString());
        }
        resp.kvs.clear();
    } else if (limiter.exceed_limit()) {
        // scan exceed limit time
        resp.error = rocksdb::Status::kIncomplete;
        LOG_WARNING_PREFIX("rocksdb abnormal scan from {}: batch_count={}, time_used_ns({}) VS "
                           "time_threshold_ns({})",
                           rpc.remote_address(),
                           batch_count,
                           limiter.duration_time(),
                           limiter.max_duration_time());
    } else if (it->Valid() && !complete) {
        // scan not completed: hand the iterator and the scan parameters over to a cached
        // context so that a follow-up request can continue from the current position.
        std::unique_ptr<pegasus_scan_context> context(new pegasus_scan_context(
            std::move(it),
            std::string(stop.data(), stop.size()),
            request.stop_inclusive,
            request.hash_key_filter_type,
            std::string(request.hash_key_filter_pattern.data(),
                        request.hash_key_filter_pattern.length()),
            request.sort_key_filter_type,
            std::string(request.sort_key_filter_pattern.data(),
                        request.sort_key_filter_pattern.length()),
            batch_count,
            request.no_value,
            validate_hash,
            return_expire_ts,
            only_return_count));
        int64_t handle = _context_cache.put(std::move(context));
        resp.context_id = handle;
        // if the context is used, it will be fetched and re-put into cache,
        // which will change the handle,
        // then the delayed task will fetch null context by old handle, and do nothing.
        // NOTE(review): otherwise the fetched context is presumably released when the value
        // returned by fetch() is discarded -- confirm against the cache implementation.
        ::dsn::tasking::enqueue(
            LPC_PEGASUS_SERVER_DELAY,
            &_tracker,
            [this, handle]() { _context_cache.fetch(handle); },
            0,
            std::chrono::minutes(5));
    } else {
        // scan completed
        resp.context_id = pegasus_scan_context::SCAN_CONTEXT_ID_COMPLETED;
    }

    METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
    METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);

    _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
}