openr/kvstore/tools/KvStoreSnooper.cpp (74 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. */ #include <iostream> #include <folly/init/Init.h> #include <folly/logging/xlog.h> #include <openr/common/OpenrClient.h> #include <openr/kvstore/KvStore.h> #include <openr/kvstore/KvStoreUtil.h> DEFINE_string(host, "::1", "Host to connect to"); DEFINE_int32(port, openr::Constants::kOpenrCtrlPort, "OpenrCtrl server port"); DEFINE_int32(connect_timeout_ms, 1000, "Connect timeout for client"); DEFINE_int32(processing_timeout_ms, 5000, "Processing timeout for client"); int main(int argc, char** argv) { // Initialize all params folly::init(&argc, &argv); // Define and start event base folly::EventBase evb; std::thread evbThread([&evb]() { evb.loopForever(); }); // Create Open/R client auto client = openr::getOpenrCtrlPlainTextClient< openr::thrift::OpenrCtrlCppAsyncClient, apache::thrift::RocketClientChannel>( evb, folly::IPAddress(FLAGS_host), FLAGS_port, std::chrono::milliseconds(FLAGS_connect_timeout_ms), std::chrono::milliseconds(FLAGS_processing_timeout_ms)); auto response = client->semifuture_subscribeAndGetAreaKvStores({}, {}).get(); std::unordered_map< std::string /* area */, std::unordered_map<std::string /* key */, openr::thrift::Value>> areaKeyVals; XLOG(INFO) << "Stream is connected, updates will follow"; for (auto const& pub : response.response) { XLOG(INFO) << "Received " << pub.keyVals_ref()->size() << " entries in initial dump for area: " << *pub.area_ref(); areaKeyVals[*pub.area_ref()] = *pub.keyVals_ref(); } XLOG(INFO) << ""; auto subscription = std::move(response.stream) .subscribeExTry( folly::Executor::getKeepAliveToken(&evb), [areaKeyVals = std::move(areaKeyVals)]( folly::Try<openr::thrift::Publication>&& maybePub) mutable { if (maybePub.hasException()) { XLOG(ERR) << maybePub.exception().what(); return; } auto& pub = maybePub.value(); // Print expired key-vals for (const auto& key : *pub.expiredKeys_ref()) { std::cout << "Expired Key: " << key << std::endl; std::cout << "" << std::endl; } // Print updates auto updatedKeyVals = openr::mergeKeyValues( areaKeyVals.at(pub.get_area()), *pub.keyVals_ref()) .first; for (auto& [key, val] : updatedKeyVals) { std::cout << (val.value_ref().has_value() ? "Updated" : "Refreshed") << " KeyVal: " << key << std::endl; std::cout << " version: " << *val.version_ref() << std::endl; std::cout << " originatorId: " << *val.originatorId_ref() << std::endl; std::cout << " ttl: " << *val.ttl_ref() << std::endl; std::cout << " ttlVersion: " << *val.ttlVersion_ref() << std::endl; std::cout << " hash: " << val.hash_ref().value() << std::endl << std::endl; // intended } }); evbThread.join(); subscription.cancel(); std::move(subscription).detach(); client.reset(); return 0; }