src/client_lib/pegasus_scanner_impl.cpp (347 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include <stdint.h> #include <chrono> #include <functional> #include <list> #include <memory> #include <string> #include <type_traits> #include <utility> #include <vector> #include "common/gpid.h" #include "pegasus/client.h" #include "pegasus/error.h" #include "pegasus_client_impl.h" #include "pegasus_key_schema.h" #include "rrdb/rrdb.client.h" #include "rrdb/rrdb_types.h" #include "rpc/serialization.h" #include "utils/blob.h" #include "utils/error_code.h" #include "utils/fmt_logging.h" #include "utils/synchronize.h" #include "utils/zlocks.h" namespace dsn { class message_ex; } // namespace dsn using namespace ::dsn; using namespace pegasus; namespace pegasus { namespace client { // TODO(yingchun): There are duplicate variables in src/server/pegasus_scan_context.h, // because this is in client library, it's better to avoid including too many headers. // We can move it to thrift which would be included by both server and client. static const int SCAN_CONTEXT_ID_VALID_MIN = 0; static const int SCAN_CONTEXT_ID_COMPLETED = -1; static const int SCAN_CONTEXT_ID_NOT_EXIST = -2; pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrdb_client *client, std::vector<uint64_t> &&hash, const scan_options &options, bool validate_partition_hash, bool full_scan) : pegasus_scanner_impl( client, std::move(hash), options, _min, _max, validate_partition_hash, full_scan) { _options.start_inclusive = true; _options.stop_inclusive = false; } pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrdb_client *client, std::vector<uint64_t> &&hash, const scan_options &options, const ::dsn::blob &start_key, const ::dsn::blob &stop_key, bool validate_partition_hash, bool full_scan) : _client(client), _start_key(start_key), _stop_key(stop_key), _options(options), _splits_hash(std::move(hash)), _p(-1), _kv_count(-1), _context(SCAN_CONTEXT_ID_COMPLETED), _rpc_started(false), _validate_partition_hash(validate_partition_hash), _full_scan(full_scan), _type(async_scan_type::NORMAL) { } int pegasus_client_impl::pegasus_scanner_impl::next(int32_t &count, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int err, std::string &&hash, std::string &&sort, std::string &&val, internal_info &&ii, uint32_t expire_ts_seconds, int32_t kv_count) { ret = err; if (info != nullptr) { *info = std::move(ii); } count = kv_count; op_completed.notify(); }; async_next(std::move(callback)); op_completed.wait(); return ret; } int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey, std::string &sortkey, std::string &value, internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; auto callback = [&](int err, std::string &&hash, std::string &&sort, std::string &&val, internal_info &&ii, uint32_t expire_ts_seconds, int32_t kv_count) { ret = err; hashkey = std::move(hash); sortkey = std::move(sort); value = std::move(val); if (info) { (*info) = std::move(ii); } op_completed.notify(); }; async_next(std::move(callback)); op_completed.wait(); return ret; } void pegasus_client_impl::pegasus_scanner_impl::async_next(async_scan_next_callback_t &&callback) { _lock.lock(); if (_queue.empty()) { _queue.emplace_back(std::move(callback)); _async_next_internal(); // do not unlock() to ensure that other callbacks won't be executed in the this caller's // thread } else { // rpc in-progress; callback will be executed when rpc finished _queue.emplace_back(std::move(callback)); _lock.unlock(); } } bool pegasus_client_impl::pegasus_scanner_impl::safe_destructible() const { ::dsn::zauto_lock l(_lock); return _queue.empty(); } pegasus_client::pegasus_scanner_wrapper pegasus_client_impl::pegasus_scanner_impl::get_smart_wrapper() { return std::make_shared<pegasus_scanner_impl_wrapper>(this); } // rpc won't be executed concurrently void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal() { // _lock will be locked out of the while block CHECK(!_queue.empty(), "queue should not be empty when _async_next_internal start"); std::list<async_scan_next_callback_t> temp; while (true) { // count_only means should calculate kv counts once while (++_p >= _kvs.size() && _type != async_scan_type::COUNT_ONLY) { if (_context == SCAN_CONTEXT_ID_COMPLETED) { // reach the end of one partition if (_splits_hash.empty()) { // all completed swap(_queue, temp); _lock.unlock(); // ATTENTION: after unlock, member variables can not be used anymore for (auto &callback : temp) { if (callback) { internal_info info; info.app_id = -1; info.partition_index = -1; info.decree = -1; callback(PERR_SCAN_COMPLETE, std::string(), std::string(), std::string(), std::move(info), 0, -1); } } return; } else { _hash = _splits_hash.back(); _splits_hash.pop_back(); _split_reset(); } } else if (_context == SCAN_CONTEXT_ID_NOT_EXIST) { // no valid context_id found _lock.unlock(); _start_scan(); return; } else { // valid context_id _lock.unlock(); _next_batch(); return; } } // valid data got std::string hash_key, sort_key, value; uint32_t expire_ts_seconds = 0; if (!_options.only_return_count) { pegasus_restore_key(_kvs[_p].key, hash_key, sort_key); value = std::string(_kvs[_p].value.data(), _kvs[_p].value.length()); if (_kvs[_p].__isset.expire_ts_seconds) { expire_ts_seconds = static_cast<uint32_t>(_kvs[_p].expire_ts_seconds); } } auto &callback = _queue.front(); if (callback) { internal_info info(_info); _lock.unlock(); callback(PERR_OK, std::move(hash_key), std::move(sort_key), std::move(value), std::move(info), expire_ts_seconds, _kv_count); if (_options.only_return_count) { _type = async_scan_type::COUNT_ONLY_FINISHED; } _lock.lock(); if (_queue.size() == 1) { // keep the last callback until exit this function std::swap(temp, _queue); _lock.unlock(); return; } else { _queue.pop_front(); } } } } void pegasus_client_impl::pegasus_scanner_impl::_next_batch() { ::dsn::apps::scan_request req; req.context_id = _context; CHECK(!_rpc_started, ""); _rpc_started = true; _client->scan( req, [this](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) mutable { _on_scan_response(err, req, resp); }, std::chrono::milliseconds(_options.timeout_ms), _hash); } void pegasus_client_impl::pegasus_scanner_impl::_start_scan() { ::dsn::apps::get_scanner_request req; if (_kvs.empty()) { req.start_key = _start_key; req.start_inclusive = _options.start_inclusive; } else { req.start_key = _kvs.back().key; req.start_inclusive = false; } req.stop_key = _stop_key; req.stop_inclusive = _options.stop_inclusive; req.batch_size = _options.batch_size; req.hash_key_filter_type = (dsn::apps::filter_type::type)_options.hash_key_filter_type; req.hash_key_filter_pattern = ::dsn::blob( _options.hash_key_filter_pattern.data(), 0, _options.hash_key_filter_pattern.size()); req.sort_key_filter_type = (dsn::apps::filter_type::type)_options.sort_key_filter_type; req.sort_key_filter_pattern = ::dsn::blob( _options.sort_key_filter_pattern.data(), 0, _options.sort_key_filter_pattern.size()); req.no_value = _options.no_value; req.__set_validate_partition_hash(_validate_partition_hash); req.__set_return_expire_ts(_options.return_expire_ts); req.__set_full_scan(_full_scan); req.__set_only_return_count(_options.only_return_count); CHECK(!_rpc_started, ""); _rpc_started = true; _client->get_scanner( req, [this](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) mutable { _on_scan_response(err, req, resp); }, std::chrono::milliseconds(_options.timeout_ms), _hash); } void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) { CHECK(_rpc_started, ""); _rpc_started = false; ::dsn::apps::scan_response response; if (err == ERR_OK) { ::dsn::unmarshall(resp, response); _info.app_id = response.app_id; _info.partition_index = response.partition_index; _info.decree = -1; _info.server = response.server; if (response.error == 0) { _lock.lock(); _kvs = std::move(response.kvs); _p = -1; _context = response.context_id; // If `kv_count` exists in response, then: // 1) server side supports only counting size, and // 2) `kvs` in response must be empty if (response.__isset.kv_count) { _type = async_scan_type::COUNT_ONLY; _kv_count = response.kv_count; } _async_next_internal(); return; } else if (get_rocksdb_server_error(response.error) == PERR_NOT_FOUND) { _lock.lock(); _context = SCAN_CONTEXT_ID_NOT_EXIST; _async_next_internal(); return; } } else { _info.app_id = -1; _info.partition_index = -1; _info.decree = -1; _info.server = ""; } // error occured auto ret = get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err)); internal_info info = _info; std::list<async_scan_next_callback_t> temp; _lock.lock(); std::swap(_queue, temp); _lock.unlock(); // ATTENTION: after unlock with empty queue, memebers variables can not be used anymore for (auto &callback : temp) { if (callback) { callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0, -1); } } } void pegasus_client_impl::pegasus_scanner_impl::_split_reset() { _kvs.clear(); _p = -1; _context = SCAN_CONTEXT_ID_NOT_EXIST; } pegasus_client_impl::pegasus_scanner_impl::~pegasus_scanner_impl() { dsn::zauto_lock l(_lock); CHECK(!_rpc_started, "all scan-rpc should be completed here"); CHECK(_queue.empty(), "queue should be empty"); if (_client) { if (_context >= SCAN_CONTEXT_ID_VALID_MIN) _client->clear_scanner(_context, _hash); _client = nullptr; } } void pegasus_client_impl::pegasus_scanner_impl_wrapper::async_next( async_scan_next_callback_t &&callback) { // wrap shared_ptr _p with callback _p->async_next([__p = _p, user_callback = std::move(callback)](int error_code, std::string &&hash_key, std::string &&sort_key, std::string &&value, internal_info &&info, uint32_t expire_ts_seconds, int32_t kv_count) { user_callback(error_code, std::move(hash_key), std::move(sort_key), std::move(value), std::move(info), expire_ts_seconds, kv_count); }); } const char pegasus_client_impl::pegasus_scanner_impl::_holder[] = {'\x00', '\x00', '\xFF', '\xFF'}; const ::dsn::blob pegasus_client_impl::pegasus_scanner_impl::_min = ::dsn::blob(_holder, 0, 2); const ::dsn::blob pegasus_client_impl::pegasus_scanner_impl::_max = ::dsn::blob(_holder, 2, 2); } // namespace client } // namespace pegasus