src/geo/lib/geo_client.cpp (891 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "geo_client.h" #include <fmt/core.h> #include <math.h> #include <pegasus/error.h> #include <s2/s1angle.h> #include <s2/s2cap.h> #include <s2/s2cell.h> #include <s2/s2cell_id.h> #include <s2/s2cell_union.h> #include <s2/s2earth.h> #include <s2/s2latlng.h> #include <s2/s2region_coverer.h> #include <s2/util/units/length-units.h> #include <stddef.h> #include <atomic> #include <cstdint> #include <limits> #include <mutex> #include <type_traits> #include <vector> #include "base/pegasus_key_schema.h" #include "base/pegasus_utils.h" #include "geo/lib/latlng_codec.h" #include "pegasus/client.h" #include "utils/blob.h" #include "utils/error_code.h" #include "utils/errors.h" #include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/synchronize.h" DSN_DEFINE_int32(geo_client.lib, min_level, 12, "Min cell level for a scan. Cell id at this level is the hash-key in Pegasus. " "min_level is immutable after geo_client data has been inserted into DB. " "Edge length at level 12 is about 2 km"); DSN_DEFINE_int32(geo_client.lib, max_level, 16, "Max cell level for a scan. Cell id at this level is the prefix of sort-key " "in Pegasus, and it's convenient for scan operation. max_level is mutable " "at any time, and geo_client-lib users can change it to a appropriate " "value to improve performance in their scenario. Edge length at level 16 " "is about 150 m"); DSN_DEFINE_group_validator(min_max_level, [](std::string &message) -> bool { if (FLAGS_min_level >= FLAGS_max_level) { message = fmt::format("[geo_client.lib].min_level({}) should be < " "[geo_client.lib].max_level({})", FLAGS_min_level, FLAGS_max_level); return false; } return true; }); DSN_DEFINE_uint32(geo_client.lib, latitude_index, 5, "latitude index in value"); DSN_DEFINE_uint32(geo_client.lib, longitude_index, 4, "longitude index in value"); namespace pegasus { namespace geo { struct SearchResultNearer { inline bool operator()(const SearchResult &l, const SearchResult &r) { return l.distance < r.distance; } }; struct SearchResultFarther { inline bool operator()(const SearchResult &l, const SearchResult &r) { return l.distance > r.distance; } }; geo_client::geo_client(const char *config_file, const char *cluster_name, const char *common_app_name, const char *geo_app_name) { bool ok = pegasus_client_factory::initialize(config_file); CHECK(ok, "init pegasus client factory failed"); _common_data_client = pegasus_client_factory::get_client(cluster_name, common_app_name); CHECK_NOTNULL(_common_data_client, "init pegasus _common_data_client failed"); _geo_data_client = pegasus_client_factory::get_client(cluster_name, geo_app_name); CHECK_NOTNULL(_geo_data_client, "init pegasus _geo_data_client failed"); dsn::error_s s = _codec.set_latlng_indices(FLAGS_latitude_index, FLAGS_longitude_index); CHECK_OK(s, "set_latlng_indices({}, {}) failed", FLAGS_latitude_index, FLAGS_longitude_index); } dsn::error_s geo_client::set_max_level(int level) { if (level <= FLAGS_min_level) { return FMT_ERR(dsn::ERR_INVALID_PARAMETERS, "level({}) must be larger than FLAGS_min_level({})", level, FLAGS_min_level); } FLAGS_max_level = level; return dsn::error_s::ok(); } int geo_client::set(const std::string &hash_key, const std::string &sort_key, const std::string &value, int timeout_ms, int ttl_seconds, pegasus_client::internal_info *info) { int ret = PERR_OK; dsn::utils::notify_event set_completed; auto async_set_callback = [&](int ec_, pegasus_client::internal_info &&info_) { if (ec_ != PERR_OK) { LOG_ERROR("set data failed. hash_key={}, sort_key={}, error={}", utils::redact_sensitive_string(hash_key), utils::redact_sensitive_string(sort_key), get_error_string(ec_)); ret = ec_; } if (info != nullptr) { *info = std::move(info_); } set_completed.notify(); }; async_set(hash_key, sort_key, value, async_set_callback, timeout_ms, ttl_seconds); set_completed.wait(); return ret; } void geo_client::async_set(const std::string &hash_key, const std::string &sort_key, const std::string &value, pegasus_client::async_set_callback_t &&callback, int timeout_ms, int ttl_seconds) { async_del( hash_key, sort_key, true, [this, hash_key, sort_key, value, timeout_ms, ttl_seconds, cb = std::move(callback)]( int ec_, pegasus_client::internal_info &&info_) { if (ec_ != PERR_OK) { cb(ec_, std::move(info_)); return; } std::shared_ptr<int> ret = std::make_shared<int>(PERR_OK); std::shared_ptr<std::atomic<int32_t>> set_count = std::make_shared<std::atomic<int32_t>>(2); std::shared_ptr<pegasus_client::internal_info> info = std::make_shared<pegasus_client::internal_info>(); auto async_set_callback = [=](int ec_, pegasus_client::internal_info &&info_, DataType data_type_) { if (data_type_ == DataType::common) { *info = std::move(info_); } if (ec_ != PERR_OK) { LOG_ERROR("set {} data failed. hash_key={}, sort_key={}, error={}", data_type_ == DataType::common ? "common" : "geo", utils::redact_sensitive_string(hash_key), utils::redact_sensitive_string(sort_key), get_error_string(ec_)); *ret = ec_; } if (set_count->fetch_sub(1) == 1) { if (cb != nullptr) { cb(*ret, std::move(*info)); } } }; async_set_common_data( hash_key, sort_key, value, async_set_callback, timeout_ms, ttl_seconds); async_set_geo_data( hash_key, sort_key, value, async_set_callback, timeout_ms, ttl_seconds); }, timeout_ms); } void geo_client::async_set(const std::string &hash_key, const std::string &sort_key, double lat_degrees, double lng_degrees, pegasus_client::async_set_callback_t &&callback, int timeout_ms, int ttl_seconds) { std::string value; if (!_codec.encode_to_value(lat_degrees, lng_degrees, value)) { callback(PERR_GEO_INVALID_LATLNG_ERROR, {}); return; } async_set(hash_key, sort_key, value, std::move(callback), timeout_ms, ttl_seconds); } int geo_client::get(const std::string &hash_key, const std::string &sort_key, double &lat_degrees, double &lng_degrees, int timeout_ms) { int ret = PERR_OK; dsn::utils::notify_event get_completed; auto get_latlng_callback = [&](int ec_, int id_, double lat_degrees_, double lng_degrees_) { if (ec_ == PERR_OK) { lat_degrees = lat_degrees_; lng_degrees = lng_degrees_; } else { LOG_WARNING("get data failed. hash_key={}, sort_key={}, error={}", utils::redact_sensitive_string(hash_key), utils::redact_sensitive_string(sort_key), get_error_string(ec_)); } ret = ec_; get_completed.notify(); }; async_get(hash_key, sort_key, 0, get_latlng_callback, timeout_ms); get_completed.wait(); return ret; } void geo_client::async_get(const std::string &hash_key, const std::string &sort_key, int id, get_latlng_callback_t &&callback, int timeout_ms) { _common_data_client->async_get( hash_key, sort_key, [this, &hash_key, &sort_key, id, cb = std::move(callback)]( int ec_, std::string &&value_, pegasus_client::internal_info &&info_) { if (ec_ != PERR_OK) { cb(ec_, id, 0, 0); return; } S2LatLng latlng; if (!_codec.decode_from_value(value_, latlng)) { LOG_ERROR("decode_from_value failed. hash_key={}, sort_key={}, value={}", utils::redact_sensitive_string(hash_key), utils::redact_sensitive_string(sort_key), value_); cb(PERR_GEO_DECODE_VALUE_ERROR, id, 0, 0); return; } cb(ec_, id, latlng.lat().degrees(), latlng.lng().degrees()); }, timeout_ms); } int geo_client::del(const std::string &hash_key, const std::string &sort_key, int timeout_ms, pegasus_client::internal_info *info) { int ret = PERR_OK; dsn::utils::notify_event del_completed; auto async_del_callback = [&](int ec_, pegasus_client::internal_info &&info_) { if (ec_ != PERR_OK) { LOG_ERROR("del data failed. hash_key={}, sort_key={}, error={}", utils::redact_sensitive_string(hash_key), utils::redact_sensitive_string(sort_key), get_error_string(ec_)); ret = ec_; } if (info != nullptr) { *info = std::move(info_); } del_completed.notify(); }; async_del(hash_key, sort_key, false, async_del_callback, timeout_ms); del_completed.wait(); return ret; } void geo_client::async_del(const std::string &hash_key, const std::string &sort_key, bool keep_common_data, pegasus_client::async_del_callback_t &&callback, int timeout_ms) { _common_data_client->async_get( hash_key, sort_key, [this, hash_key, sort_key, keep_common_data, timeout_ms, cb = std::move(callback)]( int ec_, std::string &&value_, pegasus_client::internal_info &&info_) { if (ec_ == PERR_NOT_FOUND) { if (cb != nullptr) { cb(PERR_OK, std::move(info_)); } return; } if (ec_ != PERR_OK) { if (cb != nullptr) { cb(ec_, std::move(info_)); } return; } bool keep_geo_data = false; std::string geo_hash_key; std::string geo_sort_key; if (!generate_geo_keys(hash_key, sort_key, value_, geo_hash_key, geo_sort_key)) { keep_geo_data = true; LOG_WARNING("generate_geo_keys failed"); } std::shared_ptr<int> ret = std::make_shared<int>(PERR_OK); std::shared_ptr<std::atomic<int32_t>> del_count = std::make_shared<std::atomic<int32_t>>(2); if (keep_common_data) { del_count->fetch_sub(1); } if (keep_geo_data) { del_count->fetch_sub(1); } if (del_count->load() == 0) { cb(PERR_OK, pegasus_client::internal_info()); return; } auto async_del_callback = [=](int ec__, pegasus_client::internal_info &&, DataType data_type_) mutable { if (ec__ != PERR_OK) { LOG_ERROR("del {} data failed. hash_key={}, sort_key={}, error={}", data_type_ == DataType::common ? "common" : "geo", utils::redact_sensitive_string(hash_key), utils::redact_sensitive_string(sort_key), get_error_string(ec_)); *ret = ec__; } if (del_count->fetch_sub(1) == 1) { cb(*ret, std::move(info_)); } }; if (!keep_common_data) { async_del_common_data(hash_key, sort_key, async_del_callback, timeout_ms); } if (!keep_geo_data) { async_del_geo_data(geo_hash_key, geo_sort_key, async_del_callback, timeout_ms); } }, timeout_ms); } int geo_client::set_geo_data(const std::string &hash_key, const std::string &sort_key, const std::string &value, int timeout_ms, int ttl_seconds) { int ret = PERR_OK; dsn::utils::notify_event set_completed; auto async_set_callback = [&](int ec_, pegasus_client::internal_info &&info_) { if (ec_ != PERR_OK) { ret = ec_; LOG_ERROR("set geo data failed. hash_key={}, sort_key={}, error={}", utils::redact_sensitive_string(hash_key), utils::redact_sensitive_string(sort_key), get_error_string(ec_)); } set_completed.notify(); }; async_set_geo_data(hash_key, sort_key, value, async_set_callback, timeout_ms, ttl_seconds); set_completed.wait(); return ret; } void geo_client::async_set_geo_data(const std::string &hash_key, const std::string &sort_key, const std::string &value, pegasus_client::async_set_callback_t &&callback, int timeout_ms, int ttl_seconds) { async_set_geo_data( hash_key, sort_key, value, [cb = std::move(callback)](int ec_, pegasus_client::internal_info &&info_, DataType) { if (cb != nullptr) { cb(ec_, std::move(info_)); } }, timeout_ms, ttl_seconds); } int geo_client::search_radial(double lat_degrees, double lng_degrees, double radius_m, int count, SortType sort_type, int timeout_ms, std::list<SearchResult> &result) { int ret = PERR_OK; S2LatLng latlng = S2LatLng::FromDegrees(lat_degrees, lng_degrees); if (!latlng.is_valid()) { LOG_ERROR("latlng is invalid. lat_degrees={}, lng_degrees={}", lat_degrees, lng_degrees); return PERR_GEO_INVALID_LATLNG_ERROR; } dsn::utils::notify_event search_completed; async_search_radial(latlng, radius_m, count, sort_type, timeout_ms, [&](int ec_, std::list<SearchResult> &&result_) { if (PERR_OK == ec_) { result = std::move(result_); } ret = ec_; search_completed.notify(); }); search_completed.wait(); return ret; } void geo_client::async_search_radial(double lat_degrees, double lng_degrees, double radius_m, int count, SortType sort_type, int timeout_ms, geo_search_callback_t &&callback) { S2LatLng latlng = S2LatLng::FromDegrees(lat_degrees, lng_degrees); if (!latlng.is_valid()) { LOG_ERROR("latlng is invalid. lat_degrees={}, lng_degrees={}", lat_degrees, lng_degrees); callback(PERR_GEO_INVALID_LATLNG_ERROR, {}); } async_search_radial(latlng, radius_m, count, sort_type, timeout_ms, std::move(callback)); } int geo_client::search_radial(const std::string &hash_key, const std::string &sort_key, double radius_m, int count, SortType sort_type, int timeout_ms, std::list<SearchResult> &result) { int ret = PERR_OK; dsn::utils::notify_event search_completed; async_search_radial(hash_key, sort_key, radius_m, count, sort_type, timeout_ms, [&](int ec_, std::list<SearchResult> &&result_) { if (ec_ != PERR_OK) { ret = ec_; } result = std::move(result_); search_completed.notify(); }); search_completed.wait(); return ret; } void geo_client::async_search_radial(const std::string &hash_key, const std::string &sort_key, double radius_m, int count, SortType sort_type, int timeout_ms, geo_search_callback_t &&callback) { _common_data_client->async_get( hash_key, sort_key, [this, hash_key, sort_key, radius_m, count, sort_type, timeout_ms, cb = std::move(callback)]( int ec_, std::string &&value_, pegasus_client::internal_info &&) mutable { if (ec_ != PERR_OK) { LOG_ERROR("get failed. hash_key={}, sort_key={}, error={}", utils::redact_sensitive_string(hash_key), utils::redact_sensitive_string(sort_key), get_error_string(ec_)); cb(ec_, {}); return; } S2LatLng latlng; if (!_codec.decode_from_value(value_, latlng)) { LOG_ERROR("decode_from_value failed. hash_key={}, sort_key={}, value={}", utils::redact_sensitive_string(hash_key), utils::redact_sensitive_string(sort_key), value_); cb(ec_, {}); return; } async_search_radial( latlng, radius_m, count, sort_type, (int)ceil(timeout_ms * 0.8), std::move(cb)); }, (int)ceil(timeout_ms * 0.2)); } void geo_client::async_search_radial(const S2LatLng &latlng, double radius_m, int count, SortType sort_type, int timeout_ms, geo_search_callback_t &&callback) { // generate a cap std::shared_ptr<S2Cap> cap_ptr = std::make_shared<S2Cap>(); gen_search_cap(latlng, radius_m, *cap_ptr); // generate cell ids S2CellUnion cids; gen_cells_covered_by_cap(*cap_ptr, cids); // search data in the cell ids async_get_result_from_cells(cids, cap_ptr, count, sort_type, timeout_ms, [this, count, sort_type, cb = std::move(callback)]( std::list<std::list<SearchResult>> &&results_) { std::list<SearchResult> result; normalize_result(std::move(results_), count, sort_type, result); cb(PERR_OK, std::move(result)); }); } void geo_client::gen_search_cap(const S2LatLng &latlng, double radius_m, S2Cap &cap) { util::units::Meters radius((float)radius_m); cap = S2Cap(latlng.ToPoint(), S2Earth::ToAngle(radius)); } void geo_client::gen_cells_covered_by_cap(const S2Cap &cap, S2CellUnion &cids) { S2RegionCoverer rc; rc.mutable_options()->set_fixed_level(FLAGS_min_level); cids = rc.GetCovering(cap); } void geo_client::async_get_result_from_cells(const S2CellUnion &cids, std::shared_ptr<S2Cap> cap_ptr, int count, SortType sort_type, int timeout_ms, scan_all_area_callback_t &&callback) { int single_scan_count = count; if (sort_type == SortType::asc || sort_type == SortType::desc) { single_scan_count = -1; // scan all data to make full sort } // scan all cell ids std::shared_ptr<std::list<std::list<SearchResult>>> results = std::make_shared<std::list<std::list<SearchResult>>>(); std::shared_ptr<std::atomic<bool>> send_finish = std::make_shared<std::atomic<bool>>(false); std::shared_ptr<std::atomic<int>> scan_count = std::make_shared<std::atomic<int>>(0); auto single_scan_finish_callback = [send_finish, scan_count, results, cb = std::move(callback)]() { // NOTE: make sure fetch_sub is at first of the if expression to make it always execute if (scan_count->fetch_sub(1) == 1 && send_finish->load()) { cb(std::move(*results.get())); } }; for (const auto &cid : cids) { if (cap_ptr->Contains(S2Cell(cid))) { // for the full contained cell, scan all data in this cell(which is at the // FLAGS_min_level) results->emplace_back(std::list<SearchResult>()); scan_count->fetch_add(1); start_scan(cid.ToString(), "", "", cap_ptr, single_scan_count, timeout_ms, single_scan_finish_callback, results->back()); } else { // for the partial contained cell, scan cells covered by the cap at the FLAGS_max_level // which is more accurate than the ones at FLAGS_min_level, but it will cost more time // on calculating here. std::string hash_key = cid.parent(FLAGS_min_level).ToString(); std::pair<std::string, std::string> start_stop_sort_keys; S2CellId pre; // traverse all sub cell ids of `cid` on FLAGS_max_level along the Hilbert curve, to // find the needed ones. for (S2CellId cur = cid.child_begin(FLAGS_max_level); cur != cid.child_end(FLAGS_max_level); cur = cur.next()) { if (cap_ptr->MayIntersect(S2Cell(cur))) { // only cells whose any vertex is contained by the cap is needed if (!pre.is_valid()) { // `cur` is the very first cell in Hilbert curve contained by the cap pre = cur; start_stop_sort_keys.first = gen_start_sort_key(pre, hash_key); } else { if (pre.next() != cur) { // `pre` is the last cell in Hilbert curve contained by the cap // `cur` is a new start cell in Hilbert curve contained by the cap start_stop_sort_keys.second = gen_stop_sort_key(pre, hash_key); results->emplace_back(std::list<SearchResult>()); scan_count->fetch_add(1); start_scan(hash_key, std::move(start_stop_sort_keys.first), std::move(start_stop_sort_keys.second), cap_ptr, single_scan_count, timeout_ms, single_scan_finish_callback, results->back()); start_stop_sort_keys.first = gen_start_sort_key(cur, hash_key); start_stop_sort_keys.second.clear(); } pre = cur; } } } CHECK(!start_stop_sort_keys.first.empty(), ""); // the last sub slice of current `cid` on FLAGS_max_level in Hilbert curve covered by // `cap` if (start_stop_sort_keys.second.empty()) { start_stop_sort_keys.second = gen_stop_sort_key(pre, hash_key); results->emplace_back(std::list<SearchResult>()); scan_count->fetch_add(1); start_scan(hash_key, std::move(start_stop_sort_keys.first), std::move(start_stop_sort_keys.second), cap_ptr, single_scan_count, timeout_ms, single_scan_finish_callback, results->back()); } } } // when all scan rpc have received before send_finish is set to true, the callback will never be // called, so we add 2 lines tricky code as follows scan_count->fetch_add(1); send_finish->store(true); single_scan_finish_callback(); } void geo_client::normalize_result(std::list<std::list<SearchResult>> &&results, int count, SortType sort_type, std::list<SearchResult> &result) { result.clear(); for (auto &r : results) { result.splice(result.end(), r); if (sort_type == SortType::random && count > 0 && result.size() >= count) { break; } } if (sort_type == SortType::asc) { result = utils::top_n<SearchResult, SearchResultNearer>(result, count).to(); } else if (sort_type == SortType::desc) { result = utils::top_n<SearchResult, SearchResultFarther>(result, count).to(); } else if (count > 0 && result.size() > count) { result.resize((size_t)count); } } bool geo_client::generate_geo_keys(const std::string &hash_key, const std::string &sort_key, const std::string &value, std::string &geo_hash_key, std::string &geo_sort_key) { // extract latitude and longitude from value S2LatLng latlng; if (!_codec.decode_from_value(value, latlng)) { LOG_ERROR("decode_from_value failed. hash_key={}, sort_key={}, value={}", utils::redact_sensitive_string(hash_key), utils::redact_sensitive_string(sort_key), value); return false; } // generate hash key S2CellId leaf_cell_id = S2Cell(latlng).id(); S2CellId parent_cell_id = leaf_cell_id.parent(FLAGS_min_level); geo_hash_key = parent_cell_id.ToString(); // [0,5]{1}/[0,3]{FLAGS_min_level} // generate sort key dsn::blob sort_key_postfix; pegasus_generate_key(sort_key_postfix, hash_key, sort_key); geo_sort_key = leaf_cell_id.ToString().substr(geo_hash_key.length()) + ":" + sort_key_postfix.to_string(); // [0,3]{30-FLAGS_min_level}:combine_keys return true; } bool geo_client::restore_origin_keys(const std::string &geo_sort_key, std::string &origin_hash_key, std::string &origin_sort_key) { // geo_sort_key: [0,3]{30-FLAGS_min_level}:combine_keys int cid_prefix_len = 30 - FLAGS_min_level + 1; // '1' is for ':' in geo_sort_key if (geo_sort_key.length() <= cid_prefix_len) { return false; } auto origin_keys_len = static_cast<unsigned int>(geo_sort_key.length() - cid_prefix_len); pegasus_restore_key(dsn::blob(geo_sort_key.c_str(), cid_prefix_len, origin_keys_len), origin_hash_key, origin_sort_key); return true; } std::string geo_client::gen_sort_key(const S2CellId &max_level_cid, const std::string &hash_key) { return max_level_cid.ToString().substr(hash_key.length()); } std::string geo_client::gen_start_sort_key(const S2CellId &max_level_cid, const std::string &hash_key) { return gen_sort_key(max_level_cid, hash_key); } std::string geo_client::gen_stop_sort_key(const S2CellId &max_level_cid, const std::string &hash_key) { return gen_sort_key(max_level_cid, hash_key) + "z"; } void geo_client::async_set_common_data(const std::string &hash_key, const std::string &sort_key, const std::string &value, update_callback_t &&callback, int timeout_ms, int ttl_seconds) { _common_data_client->async_set( hash_key, sort_key, value, [cb = std::move(callback)](int error_code, pegasus_client::internal_info &&info) { cb(error_code, std::move(info), DataType::common); }, timeout_ms, ttl_seconds); } void geo_client::async_set_geo_data(const std::string &hash_key, const std::string &sort_key, const std::string &value, update_callback_t &&callback, int timeout_ms, int ttl_seconds) { std::string geo_hash_key; std::string geo_sort_key; if (!generate_geo_keys(hash_key, sort_key, value, geo_hash_key, geo_sort_key)) { callback(PERR_GEO_DECODE_VALUE_ERROR, pegasus_client::internal_info(), DataType::geo); return; } _geo_data_client->async_set( geo_hash_key, geo_sort_key, value, [cb = std::move(callback)](int error_code, pegasus_client::internal_info &&info) { cb(error_code, std::move(info), DataType::geo); }, timeout_ms, ttl_seconds); } void geo_client::async_del_common_data(const std::string &hash_key, const std::string &sort_key, update_callback_t &&callback, int timeout_ms) { _common_data_client->async_del( hash_key, sort_key, [cb = std::move(callback)](int error_code, pegasus_client::internal_info &&info) { cb(error_code, std::move(info), DataType::common); }, timeout_ms); } void geo_client::async_del_geo_data(const std::string &geo_hash_key, const std::string &geo_sort_key, update_callback_t &&callback, int timeout_ms) { _geo_data_client->async_del( geo_hash_key, geo_sort_key, [cb = std::move(callback)](int error_code, pegasus_client::internal_info &&info) { cb(error_code, std::move(info), DataType::geo); }, timeout_ms); } void geo_client::start_scan(const std::string &hash_key, std::string &&start_sort_key, std::string &&stop_sort_key, std::shared_ptr<S2Cap> cap_ptr, int count, int timeout_ms, scan_one_area_callback_t &&callback, std::list<SearchResult> &result) { pegasus_client::scan_options options; options.start_inclusive = true; options.stop_inclusive = true; options.batch_size = 1000; options.timeout_ms = timeout_ms; _geo_data_client->async_get_scanner( hash_key, start_sort_key, stop_sort_key, options, [this, cap_ptr, count, cb = std::move(callback), &result]( int error_code, pegasus_client::pegasus_scanner *hash_scanner) mutable { if (error_code == PERR_OK) { do_scan(hash_scanner->get_smart_wrapper(), cap_ptr, count, std::move(cb), result); } else { cb(); } }); } void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper, std::shared_ptr<S2Cap> cap_ptr, int count, scan_one_area_callback_t &&callback, std::list<SearchResult> &result) { scanner_wrapper->async_next( [this, cap_ptr, count, scanner_wrapper, cb = std::move(callback), &result]( int ret, std::string &&geo_hash_key, std::string &&geo_sort_key, std::string &&value, pegasus_client::internal_info &&info, uint32_t expire_ts_seconds, int32_t kv_count) mutable { if (ret == PERR_SCAN_COMPLETE) { cb(); return; } if (ret != PERR_OK) { LOG_ERROR("async_next failed. error={}", get_error_string(ret)); cb(); return; } S2LatLng latlng; if (!_codec.decode_from_value(value, latlng)) { LOG_ERROR("decode_from_value failed. value={}", value); cb(); return; } double distance = S2Earth::GetDistanceMeters(S2LatLng(cap_ptr->center()), latlng); if (distance <= S2Earth::ToMeters(cap_ptr->radius())) { std::string origin_hash_key, origin_sort_key; if (!restore_origin_keys(geo_sort_key, origin_hash_key, origin_sort_key)) { LOG_ERROR("restore_origin_keys failed. geo_sort_key={}", utils::redact_sensitive_string(geo_sort_key)); cb(); return; } result.emplace_back(SearchResult(latlng.lat().degrees(), latlng.lng().degrees(), distance, std::move(origin_hash_key), std::move(origin_sort_key), std::move(value))); } if (count != -1 && result.size() >= count) { cb(); return; } do_scan(scanner_wrapper, cap_ptr, count, std::move(cb), result); }); } int geo_client::distance(const std::string &hash_key1, const std::string &sort_key1, const std::string &hash_key2, const std::string &sort_key2, int timeout_ms, double &distance) { int ret = PERR_OK; dsn::utils::notify_event get_completed; auto async_calculate_callback = [&](int ec_, double &&distance_) { if (ec_ != PERR_OK) { LOG_ERROR( "get distance failed. hash_key1={}, sort_key1={}, hash_key2={}, sort_key2={}, " "error={}", utils::redact_sensitive_string(hash_key1), utils::redact_sensitive_string(sort_key1), utils::redact_sensitive_string(hash_key2), utils::redact_sensitive_string(sort_key2), get_error_string(ec_)); ret = ec_; } distance = distance_; get_completed.notify(); }; async_distance( hash_key1, sort_key1, hash_key2, sort_key2, timeout_ms, async_calculate_callback); get_completed.wait(); return ret; } void geo_client::async_distance(const std::string &hash_key1, const std::string &sort_key1, const std::string &hash_key2, const std::string &sort_key2, int timeout_ms, distance_callback_t &&callback) { std::shared_ptr<int> ret = std::make_shared<int>(PERR_OK); std::shared_ptr<std::mutex> mutex = std::make_shared<std::mutex>(); std::shared_ptr<std::vector<S2LatLng>> get_result = std::make_shared<std::vector<S2LatLng>>(); auto async_get_callback = [=, cb = std::move(callback)]( int ec_, std::string &&value_, pegasus_client::internal_info &&) { if (ec_ != PERR_OK) { LOG_ERROR("get data failed. hash_key1={}, sort_key1={}, hash_key2={}, sort_key2={}, " "error={}", utils::redact_sensitive_string(hash_key1), utils::redact_sensitive_string(sort_key1), utils::redact_sensitive_string(hash_key2), utils::redact_sensitive_string(sort_key2), get_error_string(ec_)); *ret = ec_; } S2LatLng latlng; if (!_codec.decode_from_value(value_, latlng)) { LOG_ERROR("decode_from_value failed. value={}", value_); *ret = PERR_GEO_DECODE_VALUE_ERROR; } std::lock_guard<std::mutex> lock(*mutex); get_result->push_back(latlng); if (get_result->size() == 2) { if (*ret == PERR_OK) { double distance = S2Earth::GetDistanceMeters((*get_result)[0], (*get_result)[1]); cb(*ret, distance); } else { cb(*ret, std::numeric_limits<double>::max()); } } }; _common_data_client->async_get(hash_key1, sort_key1, async_get_callback, timeout_ms); _common_data_client->async_get(hash_key2, sort_key2, async_get_callback, timeout_ms); } } // namespace geo } // namespace pegasus