openr/ctrl-server/OpenrCtrlHandler.cpp (1,079 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <openr/ctrl-server/OpenrCtrlHandler.h>
#if __has_include("filesystem")
#include <filesystem>
namespace fs = std::filesystem;
#else
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
#endif
#include <folly/ExceptionString.h>
#include <folly/logging/xlog.h>
#include <openr/common/Constants.h>
#include <openr/common/LsdbUtil.h>
#include <openr/common/Util.h>
#include <openr/monitor/LogSample.h>
namespace fb303 = facebook::fb303;
namespace openr {
OpenrCtrlHandler::OpenrCtrlHandler(
const std::string& nodeName,
const std::unordered_set<std::string>& acceptablePeerCommonNames,
OpenrEventBase* ctrlEvb,
Decision* decision,
Fib* fib,
KvStore<thrift::OpenrCtrlCppAsyncClient>* kvStore,
LinkMonitor* linkMonitor,
Monitor* monitor,
PersistentStore* configStore,
PrefixManager* prefixManager,
Spark* spark,
std::shared_ptr<const Config> config)
: fb303::BaseService("openr"),
nodeName_(nodeName),
acceptablePeerCommonNames_(acceptablePeerCommonNames),
ctrlEvb_(ctrlEvb),
decision_(decision),
fib_(fib),
kvStore_(kvStore),
linkMonitor_(linkMonitor),
monitor_(monitor),
configStore_(configStore),
prefixManager_(prefixManager),
spark_(spark),
config_(config) {
// We expect ctrl-evb not be running otherwise adding fiber task is not
// thread safe.
CHECK_NOTNULL(ctrlEvb);
CHECK(not ctrlEvb->isRunning());
// Add fiber task to receive publication from KvStore
if (kvStore_) {
auto taskFutureKvStore = ctrlEvb->addFiberTaskFuture(
[q = std::move(kvStore_->getKvStoreUpdatesReader()),
this]() mutable noexcept {
XLOG(INFO) << "Starting KvStore updates processing fiber";
while (true) {
auto maybePub = q.get(); // perform read
XLOG(DBG2) << "Received publication from KvStore";
if (maybePub.hasError()) {
XLOG(INFO) << "Terminating KvStore publications processing fiber";
break;
}
folly::variant_match(
std::move(maybePub).value(),
[this](thrift::Publication&& pub) {
processPublication(std::move(pub));
},
[](thrift::InitializationEvent&&) {
// skip the processing of initialization event
});
}
XLOG(INFO) << "KvStore updates processing fiber stopped";
});
workers_.push_back(std::move(taskFutureKvStore));
}
// Add fiber task to receive fib streaming
if (fib_) {
auto taskFutureFib = ctrlEvb->addFiberTaskFuture(
[q = std::move(fib_->getFibUpdatesReader()), this]() mutable noexcept {
XLOG(INFO) << "Starting Fib updates processing fiber";
while (true) {
auto maybeUpdate = q.get(); // perform read
XLOG(DBG2) << "Received DecisionRouteUpdate from Fib";
if (maybeUpdate.hasError()) {
XLOG(INFO)
<< "Terminating Fib RouteDatabaseDelta processing fiber";
break;
}
// Publish the update to all active streams
fibPublishers_.withWLock([&maybeUpdate](auto& fibPublishers) {
if (fibPublishers.size()) {
const auto fibUpdate = maybeUpdate.value().toThrift();
for (auto& fibPublisher : fibPublishers) {
fibPublisher.second.next(fibUpdate);
}
}
});
// Publish the detailed update to all active streams
fibDetailSubscribers_.withWLock(
[&maybeUpdate](auto& fibSubscribers) {
if (fibSubscribers.size()) {
const auto fibUpdateDetail =
maybeUpdate.value().toThriftDetail();
for (auto& fibSubscriber : fibSubscribers) {
fibSubscriber.second.total_messages++;
fibSubscriber.second.last_message_time =
std::chrono::system_clock::now();
fibSubscriber.second.publisher->next(fibUpdateDetail);
}
}
});
}
XLOG(INFO) << "Fib updates processing fiber stopped";
});
workers_.push_back(std::move(taskFutureFib));
}
}
OpenrCtrlHandler::~OpenrCtrlHandler() {
closeKvStorePublishers();
closeFibPublishers();
XLOG(INFO) << "Cleanup all pending request(s).";
longPollReqs_.withWLock([&](auto& longPollReqs) { longPollReqs.clear(); });
XLOG(INFO)
<< "Waiting for termination of kvStoreUpdatesQueue, FibUpdatesQueue";
folly::collectAll(workers_.begin(), workers_.end()).get();
XLOG(INFO) << "Collected all workers";
}
// NOTE: We're intentionally creating list of publishers to and then invoke
// `complete()` on them.
// Reason => `complete()` returns only when callback `onComplete` associated
// with publisher returns. Since we acquire lock within `onComplete` callback,
// we will run into the deadlock if `complete()` is invoked within
// SYNCHRONIZED block
void
OpenrCtrlHandler::closeKvStorePublishers() {
std::vector<std::unique_ptr<KvStorePublisher>> publishers;
kvStorePublishers_.withWLock([&](auto& kvStorePublishers_) {
for (auto& [_, publisher] : kvStorePublishers_) {
publishers.emplace_back(std::move(publisher));
}
});
XLOG(INFO) << "Terminating " << publishers.size()
<< " active KvStore snoop stream(s).";
for (auto& publisher : publishers) {
// We have to send an exception as part of the completion, otherwise
// thrift doesn't seem to notify the peer of the shutdown
publisher->complete(folly::make_exception_wrapper<std::runtime_error>(
"publisher terminated"));
}
}
// Refer to note on top of closeKvStorePublishers
void
OpenrCtrlHandler::closeFibPublishers() {
std::vector<apache::thrift::ServerStreamPublisher<thrift::RouteDatabaseDelta>>
fibPublishers_close;
fibPublishers_.withWLock([&fibPublishers_close](auto& fibPublishers) {
for (auto& [_, fibPublisher] : fibPublishers) {
fibPublishers_close.emplace_back(std::move(fibPublisher));
}
});
XLOG(INFO) << "Terminating " << fibPublishers_close.size()
<< " active Fib snoop stream(s).";
for (auto& fibPublisher : fibPublishers_close) {
std::move(fibPublisher).complete();
}
}
void
OpenrCtrlHandler::processPublication(thrift::Publication&& pub) {
// publish via KvStorePublisher
kvStorePublishers_.withWLock([&](auto& kvStorePublishers_) {
for (auto& [_, publisher] : kvStorePublishers_) {
publisher->last_message_time_ = std::chrono::system_clock::now();
publisher->total_messages_++;
publisher->publish(pub);
}
});
// check if any of KeyVal has 'adj' update
bool isAdjChanged = false;
for (auto& [key, val] : *pub.keyVals_ref()) {
// check if we have any value update.
// Ttl refreshing won't update any value.
if (!val.value_ref().has_value()) {
continue;
}
// "adj:*" key has changed. Update local collection
if (key.find(Constants::kAdjDbMarker.toString()) == 0) {
XLOG(DBG3) << "Adj key: " << key << " change received";
isAdjChanged = true;
break;
}
}
if (isAdjChanged) {
// thrift::Publication contains "adj:*" key change.
// Clean ALL pending promises
longPollReqs_.withWLock([&](auto& longPollReqs) {
for (auto& [_, req] : longPollReqs[*pub.area_ref()]) {
auto& p = req.first; // get the promise
p.setValue(true);
}
longPollReqs.clear();
});
} else {
longPollReqs_.withWLock([&](auto& longPollReqs) {
auto now = getUnixTimeStampMs();
std::vector<int64_t> reqsToClean;
for (auto& [clientId, req] : longPollReqs[*pub.area_ref()]) {
auto& p = req.first;
auto& timeStamp = req.second;
if (now - timeStamp >= Constants::kLongPollReqHoldTime.count()) {
XLOG(INFO) << "Elapsed time: " << now - timeStamp
<< " is over hold limit: "
<< Constants::kLongPollReqHoldTime.count();
reqsToClean.emplace_back(clientId);
p.setValue(false);
}
}
// cleanup expired requests since no ADJ change observed
for (auto& clientId : reqsToClean) {
longPollReqs[*pub.area_ref()].erase(clientId);
}
});
}
}
void
OpenrCtrlHandler::authorizeConnection() {
auto connContext = getConnectionContext()->getConnectionContext();
auto peerCommonName = connContext->getPeerCommonName();
auto peerAddr = connContext->getPeerAddress();
// We legitely accepts all connections (secure/non-secure) from localhost
if (peerAddr->isLoopbackAddress()) {
return;
}
if (peerCommonName.empty() || acceptablePeerCommonNames_.empty()) {
// for now, we will allow non-secure connections, but lets log the event so
// we know how often this is happening.
LogSample sample{};
sample.addString(
"event",
peerCommonName.empty() ? "UNENCRYPTED_CTRL_CONNECTION"
: "UNRESTRICTED_AUTHORIZATION");
sample.addString("node_name", nodeName_);
sample.addString(
"peer_address", connContext->getPeerAddress()->getAddressStr());
sample.addString("peer_common_name", peerCommonName);
XLOG(INFO) << "Authorizing request with issues: " << sample.toJson();
return;
}
if (!acceptablePeerCommonNames_.count(peerCommonName)) {
throw thrift::OpenrError(
fmt::format("Peer name {} is unacceptable", peerCommonName));
}
}
fb303::cpp2::fb303_status
OpenrCtrlHandler::getStatus() {
return fb303::cpp2::fb303_status::ALIVE;
}
void
OpenrCtrlHandler::getEventLogs(std::vector<::std::string>& _return) {
auto recentEventLogs = monitor_->getRecentEventLogs();
for (auto const& log : recentEventLogs) {
_return.emplace_back(log);
}
}
void
OpenrCtrlHandler::getCounters(std::map<std::string, int64_t>& _return) {
BaseService::getCounters(_return);
}
void
OpenrCtrlHandler::getRegexCounters(
std::map<std::string, int64_t>& _return,
std::unique_ptr<std::string> regex) {
// Compile regex
re2::RE2 compiledRegex(*regex);
if (not compiledRegex.ok()) {
return;
}
// Get all counters
std::map<std::string, int64_t> counters;
getCounters(counters);
// Filter counters
for (auto const& kv : counters) {
if (RE2::PartialMatch(kv.first, compiledRegex)) {
_return.emplace(kv);
}
}
}
void
OpenrCtrlHandler::getSelectedCounters(
std::map<std::string, int64_t>& _return,
std::unique_ptr<std::vector<std::string>> keys) {
// Get all counters
std::map<std::string, int64_t> counters;
getCounters(counters);
// Filter counters
for (auto const& key : *keys) {
auto it = counters.find(key);
if (it != counters.end()) {
_return.emplace(*it);
}
}
}
int64_t
OpenrCtrlHandler::getCounter(std::unique_ptr<std::string> key) {
return BaseService::getCounter(std::move(key));
}
void
OpenrCtrlHandler::getMyNodeName(std::string& _return) {
_return = std::string(nodeName_);
}
void
OpenrCtrlHandler::getOpenrVersion(thrift::OpenrVersions& _openrVersion) {
_openrVersion.version_ref() = Constants::kOpenrVersion;
_openrVersion.lowestSupportedVersion_ref() =
Constants::kOpenrSupportedVersion;
}
void
OpenrCtrlHandler::getBuildInfo(thrift::BuildInfo& _buildInfo) {
_buildInfo = getBuildInfoThrift();
}
// validate config
void
OpenrCtrlHandler::dryrunConfig(
std::string& _return, std::unique_ptr<std::string> file) {
if (not file) {
throw thrift::OpenrError("Dereference nullptr for config file");
}
// check if the config file exists and readable
const auto& fileName = *file;
if (not fs::exists(fileName)) {
throw thrift::OpenrError(
fmt::format("Config file doesn't exist: {}", fileName));
}
try {
// Create policy manager using new config file
auto config = Config(fileName);
_return = config.getRunningConfig();
} catch (const std::exception& ex) {
throw thrift::OpenrError(ex.what());
}
}
void
OpenrCtrlHandler::getRunningConfig(std::string& _return) {
_return = config_->getRunningConfig();
}
void
OpenrCtrlHandler::getRunningConfigThrift(thrift::OpenrConfig& _config) {
_config = config_->getConfig();
}
void
OpenrCtrlHandler::getInitializationEvents(
std::map<thrift::InitializationEvent, int64_t>& _return) {
for (int eventInt = int(thrift::InitializationEvent::INITIALIZING);
eventInt <= int(thrift::InitializationEvent::INITIALIZED);
++eventInt) {
auto event = static_cast<thrift::InitializationEvent>(eventInt);
// The fb303 counter is set in function logInitializationEvent().
auto counterKey = fmt::format(
Constants::kInitEventCounterFormat,
apache::thrift::util::enumNameSafe(event));
if (fb303::fbData->hasCounter(counterKey)) {
_return[event] = fb303::fbData->getCounter(counterKey);
}
}
}
bool
OpenrCtrlHandler::initializationConverged() {
// The fb303 counter is set in function logInitializationEvent().
auto convergenceKey = fmt::format(
Constants::kInitEventCounterFormat,
apache::thrift::util::enumNameSafe(
thrift::InitializationEvent::INITIALIZED));
return fb303::fbData->hasCounter(convergenceKey);
}
int64_t
OpenrCtrlHandler::getInitializationDurationMs() {
// The fb303 counter is set in function logInitializationEvent().
auto convergenceKey = fmt::format(
Constants::kInitEventCounterFormat,
apache::thrift::util::enumNameSafe(
thrift::InitializationEvent::INITIALIZED));
// Throw std::invalid_argument exception if convergenceKey does not exist.
return fb303::fbData->getCounter(convergenceKey);
}
std::unique_ptr<std::string>
OpenrCtrlHandler::getSingleAreaOrThrow(std::string const& caller) {
fb303::fbData->addStatValue(
fmt::format("ctrl.get_single_area.{}", caller), 1, fb303::COUNT);
auto const& areas = config_->getAreas();
if (1 != areas.size()) {
throw thrift::OpenrError(
"Iterface requires node to be confgiured with exactly one area");
}
return std::make_unique<std::string>(areas.begin()->first);
}
//
// PrefixManager APIs
//
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_advertisePrefixes(
std::unique_ptr<std::vector<thrift::PrefixEntry>> prefixes) {
CHECK(prefixManager_);
return prefixManager_->advertisePrefixes(std::move(*prefixes))
.defer([](folly::Try<bool>&&) { return folly::Unit(); });
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_withdrawPrefixes(
std::unique_ptr<std::vector<thrift::PrefixEntry>> prefixes) {
CHECK(prefixManager_);
return prefixManager_->withdrawPrefixes(std::move(*prefixes))
.defer([](folly::Try<bool>&&) { return folly::Unit(); });
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_withdrawPrefixesByType(
thrift::PrefixType prefixType) {
CHECK(prefixManager_);
return prefixManager_->withdrawPrefixesByType(prefixType)
.defer([](folly::Try<bool>&&) { return folly::Unit(); });
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_syncPrefixesByType(
thrift::PrefixType prefixType,
std::unique_ptr<std::vector<thrift::PrefixEntry>> prefixes) {
CHECK(prefixManager_);
return prefixManager_->syncPrefixesByType(prefixType, std::move(*prefixes))
.defer([](folly::Try<bool>&&) { return folly::Unit(); });
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::PrefixEntry>>>
OpenrCtrlHandler::semifuture_getPrefixes() {
CHECK(prefixManager_);
return prefixManager_->getPrefixes();
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::PrefixEntry>>>
OpenrCtrlHandler::semifuture_getPrefixesByType(thrift::PrefixType prefixType) {
CHECK(prefixManager_);
return prefixManager_->getPrefixesByType(prefixType);
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::AdvertisedRouteDetail>>>
OpenrCtrlHandler::semifuture_getAdvertisedRoutes() {
auto filter = std::make_unique<thrift::AdvertisedRouteFilter>();
return semifuture_getAdvertisedRoutesFiltered(std::move(filter));
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::AdvertisedRouteDetail>>>
OpenrCtrlHandler::semifuture_getAdvertisedRoutesFiltered(
std::unique_ptr<thrift::AdvertisedRouteFilter> filter) {
CHECK(prefixManager_);
return prefixManager_->getAdvertisedRoutesFiltered(std::move(*filter));
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::OriginatedPrefixEntry>>>
OpenrCtrlHandler::semifuture_getOriginatedPrefixes() {
CHECK(prefixManager_);
return prefixManager_->getOriginatedPrefixes();
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::AdvertisedRoute>>>
OpenrCtrlHandler::semifuture_getAreaAdvertisedRoutes(
std::unique_ptr<std::string> areaName,
thrift::RouteFilterType routeFilterType) {
auto filter = std::make_unique<thrift::AdvertisedRouteFilter>();
return semifuture_getAreaAdvertisedRoutesFiltered(
std::move(areaName), std::move(routeFilterType), std::move(filter));
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::AdvertisedRoute>>>
OpenrCtrlHandler::semifuture_getAreaAdvertisedRoutesFiltered(
std::unique_ptr<std::string> areaName,
thrift::RouteFilterType routeFilterType,
std::unique_ptr<thrift::AdvertisedRouteFilter> filter) {
CHECK(prefixManager_);
return prefixManager_->getAreaAdvertisedRoutes(
std::move(*areaName), std::move(routeFilterType), std::move(*filter));
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::AdvertisedRoute>>>
OpenrCtrlHandler::semifuture_getAdvertisedRoutesWithOriginationPolicy(
thrift::RouteFilterType routeFilterType,
std::unique_ptr<thrift::AdvertisedRouteFilter> filter) {
CHECK(prefixManager_);
return prefixManager_->getAdvertisedRoutesWithOriginationPolicy(
std::move(routeFilterType), std::move(*filter));
}
//
// Fib APIs
//
folly::SemiFuture<std::unique_ptr<thrift::RouteDatabase>>
OpenrCtrlHandler::semifuture_getRouteDb() {
CHECK(fib_);
return fib_->getRouteDb();
}
folly::SemiFuture<std::unique_ptr<thrift::RouteDatabaseDetail>>
OpenrCtrlHandler::semifuture_getRouteDetailDb() {
CHECK(fib_);
return fib_->getRouteDetailDb();
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::UnicastRoute>>>
OpenrCtrlHandler::semifuture_getUnicastRoutesFiltered(
std::unique_ptr<std::vector<std::string>> prefixes) {
CHECK(fib_);
return fib_->getUnicastRoutes(std::move(*prefixes));
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::UnicastRoute>>>
OpenrCtrlHandler::semifuture_getUnicastRoutes() {
folly::Promise<std::unique_ptr<std::vector<thrift::UnicastRoute>>> p;
CHECK(fib_);
return fib_->getUnicastRoutes({});
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::MplsRoute>>>
OpenrCtrlHandler::semifuture_getMplsRoutes() {
CHECK(fib_);
return fib_->getMplsRoutes({});
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::MplsRoute>>>
OpenrCtrlHandler::semifuture_getMplsRoutesFiltered(
std::unique_ptr<std::vector<int32_t>> labels) {
CHECK(fib_);
return fib_->getMplsRoutes(std::move(*labels));
}
//
// Spark APIs
//
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_floodRestartingMsg() {
CHECK(spark_);
return spark_->floodRestartingMsg();
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::SparkNeighbor>>>
OpenrCtrlHandler::semifuture_getNeighbors() {
CHECK(spark_);
return spark_->getNeighbors();
}
//
// Performance stats APIs
//
folly::SemiFuture<std::unique_ptr<thrift::PerfDatabase>>
OpenrCtrlHandler::semifuture_getPerfDb() {
CHECK(fib_);
return fib_->getPerfDb();
}
//
// Decision APIs
//
folly::SemiFuture<std::unique_ptr<std::vector<thrift::ReceivedRouteDetail>>>
OpenrCtrlHandler::semifuture_getReceivedRoutes() {
auto filter = std::make_unique<thrift::ReceivedRouteFilter>();
return semifuture_getReceivedRoutesFiltered(std::move(filter));
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::ReceivedRouteDetail>>>
OpenrCtrlHandler::semifuture_getReceivedRoutesFiltered(
std::unique_ptr<thrift::ReceivedRouteFilter> filter) {
CHECK(decision_);
return decision_->getReceivedRoutesFiltered(std::move(*filter));
}
folly::SemiFuture<std::unique_ptr<thrift::RouteDatabase>>
OpenrCtrlHandler::semifuture_getRouteDbComputed(
std::unique_ptr<std::string> nodeName) {
CHECK(decision_);
return decision_->getDecisionRouteDb(*nodeName);
}
folly::SemiFuture<std::unique_ptr<thrift::AdjDbs>>
OpenrCtrlHandler::semifuture_getDecisionAdjacencyDbs() {
auto filter = std::make_unique<thrift::AdjacenciesFilter>();
filter->selectAreas_ref() = {
*getSingleAreaOrThrow("getDecisionAdjacencyDbs")};
return semifuture_getDecisionAdjacenciesFiltered(std::move(filter))
.deferValue([](std::unique_ptr<std::vector<thrift::AdjacencyDatabase>>&&
adjDbs) mutable {
auto res = std::make_unique<thrift::AdjDbs>();
for (auto& db : *adjDbs) {
auto name = *db.thisNodeName_ref();
res->emplace(std::move(name), std::move(db));
}
return res;
});
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::AdjacencyDatabase>>>
OpenrCtrlHandler::semifuture_getDecisionAdjacenciesFiltered(
std::unique_ptr<thrift::AdjacenciesFilter> filter) {
CHECK(decision_);
return decision_->getDecisionAdjacenciesFiltered(std::move(*filter));
}
folly::SemiFuture<std::unique_ptr<
std::map<std::string, std::vector<::openr::thrift::AdjacencyDatabase>>>>
OpenrCtrlHandler::semifuture_getDecisionAreaAdjacenciesFiltered(
std::unique_ptr<thrift::AdjacenciesFilter> filter) {
CHECK(decision_);
return decision_->getDecisionAreaAdjacenciesFiltered(std::move(*filter));
}
folly::SemiFuture<std::unique_ptr<thrift::PrefixDbs>>
OpenrCtrlHandler::semifuture_getDecisionPrefixDbs() {
auto filter = std::make_unique<thrift::ReceivedRouteFilter>();
filter->areaName_ref() = *getSingleAreaOrThrow("getDecisionPrefixDbs");
return semifuture_getReceivedRoutesFiltered(std::move(filter))
.deferValue([](std::unique_ptr<std::vector<thrift::ReceivedRouteDetail>>&&
routes) mutable {
auto res = std::make_unique<thrift::PrefixDbs>();
for (auto const& routeDetail : *routes) {
for (auto const& route : *routeDetail.routes_ref()) {
(*res)[*route.key_ref()->node_ref()].prefixEntries_ref()->push_back(
*route.route_ref());
}
}
for (auto& [name, db] : *res) {
db.thisNodeName_ref() = name;
}
return res;
});
}
//
// KvStore APIs
//
folly::SemiFuture<std::unique_ptr<thrift::Publication>>
OpenrCtrlHandler::semifuture_getKvStoreKeyVals(
std::unique_ptr<std::vector<std::string>> filterKeys) {
return semifuture_getKvStoreKeyValsArea(
std::move(filterKeys), getSingleAreaOrThrow("getKvStoreKeyVals"));
}
folly::SemiFuture<std::unique_ptr<thrift::Publication>>
OpenrCtrlHandler::semifuture_getKvStoreKeyValsArea(
std::unique_ptr<std::vector<std::string>> filterKeys,
std::unique_ptr<std::string> area) {
thrift::KeyGetParams params;
*params.keys_ref() = std::move(*filterKeys);
CHECK(kvStore_);
return kvStore_->semifuture_getKvStoreKeyVals(
std::move(*area), std::move(params));
}
folly::SemiFuture<std::unique_ptr<thrift::Publication>>
OpenrCtrlHandler::semifuture_getKvStoreKeyValsFiltered(
std::unique_ptr<thrift::KeyDumpParams> filter) {
return semifuture_getKvStoreKeyValsFilteredArea(
std::move(filter), getSingleAreaOrThrow("getKvStoreKeyValsFiltered"));
}
folly::SemiFuture<std::unique_ptr<thrift::Publication>>
OpenrCtrlHandler::semifuture_getKvStoreKeyValsFilteredArea(
std::unique_ptr<thrift::KeyDumpParams> filter,
std::unique_ptr<std::string> area) {
CHECK(kvStore_);
return kvStore_->semifuture_dumpKvStoreKeys(std::move(*filter), {*area})
.deferValue(
[](std::unique_ptr<std::vector<thrift::Publication>>&& pubs) mutable {
thrift::Publication pub = pubs->empty() ? thrift::Publication{}
: std::move(*pubs->begin());
return std::make_unique<thrift::Publication>(std::move(pub));
});
}
folly::SemiFuture<std::unique_ptr<thrift::Publication>>
OpenrCtrlHandler::semifuture_getKvStoreHashFiltered(
std::unique_ptr<thrift::KeyDumpParams> filter) {
return semifuture_getKvStoreHashFilteredArea(
std::move(filter), getSingleAreaOrThrow("getKvStoreHashFiltered"));
}
folly::SemiFuture<std::unique_ptr<thrift::Publication>>
OpenrCtrlHandler::semifuture_getKvStoreHashFilteredArea(
std::unique_ptr<thrift::KeyDumpParams> filter,
std::unique_ptr<std::string> area) {
CHECK(kvStore_);
return kvStore_->semifuture_dumpKvStoreHashes(
std::move(*area), std::move(*filter));
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_setKvStoreKeyVals(
std::unique_ptr<thrift::KeySetParams> setParams,
std::unique_ptr<std::string> area) {
CHECK(kvStore_);
return kvStore_->semifuture_setKvStoreKeyVals(
std::move(*area), std::move(*setParams));
}
folly::SemiFuture<bool>
OpenrCtrlHandler::semifuture_longPollKvStoreAdj(
std::unique_ptr<thrift::KeyVals> snapshot) {
return semifuture_longPollKvStoreAdjArea(
getSingleAreaOrThrow("longPollKvStoreAdj"), std::move(snapshot));
}
folly::SemiFuture<bool>
OpenrCtrlHandler::semifuture_longPollKvStoreAdjArea(
std::unique_ptr<std::string> area,
std::unique_ptr<thrift::KeyVals> snapshot) {
folly::Promise<bool> p;
auto sf = p.getSemiFuture();
auto timeStamp = getUnixTimeStampMs();
auto requestId = pendingRequestId_++;
thrift::KeyDumpParams params;
// build thrift::KeyVals with "adj:" key ONLY
// to ensure KvStore ONLY compare "adj:" key
thrift::KeyVals adjKeyVals;
for (auto& kv : *snapshot) {
if (kv.first.find(Constants::kAdjDbMarker.toString()) == 0) {
adjKeyVals.emplace(kv.first, kv.second);
}
}
// Only care about "adj:" key
*params.prefix_ref() = Constants::kAdjDbMarker;
params.keys_ref() = {Constants::kAdjDbMarker.toString()};
// Only dump difference between KvStore and client snapshot
params.keyValHashes_ref() = std::move(adjKeyVals);
// Explicitly do SYNC call to KvStore
std::unique_ptr<thrift::Publication> thriftPub{nullptr};
try {
thriftPub = semifuture_getKvStoreKeyValsFilteredArea(
std::make_unique<thrift::KeyDumpParams>(params),
std::make_unique<std::string>(*area))
.get();
} catch (std::exception const& ex) {
p.setException(thrift::OpenrError(ex.what()));
return sf;
}
if (thriftPub->keyVals_ref()->size() > 0) {
XLOG(DBG3) << "AdjKey has been added/modified. Notify immediately";
p.setValue(true);
} else if (
thriftPub->tobeUpdatedKeys_ref().has_value() &&
thriftPub->tobeUpdatedKeys_ref().value().size() > 0) {
XLOG(DBG3) << "AdjKey has been deleted/expired. Notify immediately";
p.setValue(true);
} else {
// Client provided data is consistent with KvStore.
// Store req for future processing when there is publication
// from KvStore.
XLOG(DBG3) << "No adj change detected. Store req as pending request";
longPollReqs_.withWLock([&](auto& longPollReq) {
longPollReq[*area].emplace(
requestId, std::make_pair(std::move(p), timeStamp));
});
}
return sf;
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_processKvStoreDualMessage(
std::unique_ptr<thrift::DualMessages> messages,
std::unique_ptr<std::string> area) {
CHECK(kvStore_);
return kvStore_->semifuture_processKvStoreDualMessage(
std::move(*area), std::move(*messages));
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_updateFloodTopologyChild(
std::unique_ptr<thrift::FloodTopoSetParams> params,
std::unique_ptr<std::string> area) {
CHECK(kvStore_);
return kvStore_->semifuture_updateFloodTopologyChild(
std::move(*area), std::move(*params));
}
folly::SemiFuture<std::unique_ptr<thrift::SptInfos>>
OpenrCtrlHandler::semifuture_getSpanningTreeInfos(
std::unique_ptr<std::string> area) {
CHECK(kvStore_);
return kvStore_->semifuture_getSpanningTreeInfos(std::move(*area));
}
folly::SemiFuture<std::unique_ptr<thrift::PeersMap>>
OpenrCtrlHandler::semifuture_getKvStorePeers() {
return semifuture_getKvStorePeersArea(
getSingleAreaOrThrow("getKvStorePeers"));
}
folly::SemiFuture<std::unique_ptr<thrift::PeersMap>>
OpenrCtrlHandler::semifuture_getKvStorePeersArea(
std::unique_ptr<std::string> area) {
CHECK(kvStore_);
return kvStore_->semifuture_getKvStorePeers(std::move(*area));
}
folly::SemiFuture<std::unique_ptr<::std::vector<thrift::KvStoreAreaSummary>>>
OpenrCtrlHandler::semifuture_getKvStoreAreaSummary(
std::unique_ptr<std::set<std::string>> selectAreas) {
CHECK(kvStore_);
return kvStore_->semifuture_getKvStoreAreaSummaryInternal(
std::move(*selectAreas));
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::StreamSubscriberInfo>>>
OpenrCtrlHandler::semifuture_getSubscriberInfo(int64_t type) {
folly::Promise<std::unique_ptr<std::vector<thrift::StreamSubscriberInfo>>>
stream_sessions;
auto sf = stream_sessions.getSemiFuture();
ctrlEvb_->runInEventBaseThread([stream_sessions = std::move(stream_sessions),
type = std::move(type),
this]() mutable {
std::vector<thrift::StreamSubscriberInfo> subscribers;
if (type == 0) {
kvStorePublishers_.withWLock([&](auto& kvStorePublishers_) {
for (auto& [id, publisher] : kvStorePublishers_) {
thrift::StreamSubscriberInfo subscriber;
subscriber.subscriber_id_ref() = id;
auto currentTime = std::chrono::steady_clock::now();
auto duration_time =
std::chrono::duration_cast<std::chrono::milliseconds>(
currentTime - publisher->subscription_time_);
subscriber.uptime_ref() = duration_time.count();
subscriber.last_msg_sent_time_ref() =
std::chrono::time_point_cast<std::chrono::milliseconds>(
publisher->last_message_time_)
.time_since_epoch()
.count();
subscriber.total_streamed_msgs_ref() = publisher->total_messages_;
subscribers.emplace_back(subscriber);
}
});
} else if (type == 1) {
fibDetailSubscribers_.withWLock([&](auto& fibDetailSubscribers_) {
for (auto& [id, publisher] : fibDetailSubscribers_) {
thrift::StreamSubscriberInfo subscriber;
subscriber.subscriber_id_ref() = id;
auto currentTime = std::chrono::steady_clock::now();
auto duration_time =
std::chrono::duration_cast<std::chrono::milliseconds>(
currentTime - publisher.upSince);
subscriber.uptime_ref() = duration_time.count();
subscriber.last_msg_sent_time_ref() =
std::chrono::time_point_cast<std::chrono::milliseconds>(
publisher.last_message_time)
.time_since_epoch()
.count();
subscriber.total_streamed_msgs_ref() = publisher.total_messages;
subscribers.emplace_back(subscriber);
}
});
} else {
XLOG(INFO) << "Invalid type input. Specify type as KVstore or FIB";
}
stream_sessions.setValue(
std::make_unique<std::vector<thrift::StreamSubscriberInfo>>(
std::move(subscribers)));
});
return sf;
}
apache::thrift::ServerStream<thrift::Publication>
OpenrCtrlHandler::subscribeKvStoreFilter(
std::unique_ptr<thrift::KeyDumpParams> filter,
std::unique_ptr<std::set<std::string>> selectAreas) {
// Get new client-ID (monotonically increasing)
auto clientToken = publisherToken_++;
auto streamAndPublisher =
apache::thrift::ServerStream<thrift::Publication>::createPublisher(
[this, clientToken]() {
kvStorePublishers_.withWLock([&](auto& kvStorePublishers_) {
if (kvStorePublishers_.erase(clientToken)) {
XLOG(INFO) << "KvStore snoop stream-" << clientToken
<< " ended.";
} else {
XLOG(ERR) << "Can't remove unknown KvStore snoop stream-"
<< clientToken;
}
fb303::fbData->setCounter(
"subscribers.kvstore", kvStorePublishers_.size());
});
});
kvStorePublishers_.withWLock([&](auto& kvStorePublishers_) {
assert(kvStorePublishers_.count(clientToken) == 0);
XLOG(INFO) << "KvStore snoop stream-" << clientToken
<< " started for areas: " << folly::join(", ", *selectAreas);
auto kvStorePublisher = std::make_unique<KvStorePublisher>(
*selectAreas,
std::move(*filter),
std::move(streamAndPublisher.second),
std::chrono::steady_clock::now(),
0);
kvStorePublishers_.emplace(clientToken, std::move(kvStorePublisher));
fb303::fbData->setCounter("subscribers.kvstore", kvStorePublishers_.size());
});
return std::move(streamAndPublisher.first);
}
folly::SemiFuture<apache::thrift::ResponseAndServerStream<
thrift::Publication,
thrift::Publication>>
OpenrCtrlHandler::semifuture_subscribeAndGetKvStore() {
return semifuture_subscribeAndGetKvStoreFiltered(
std::make_unique<thrift::KeyDumpParams>());
}
folly::SemiFuture<apache::thrift::ResponseAndServerStream<
thrift::Publication,
thrift::Publication>>
OpenrCtrlHandler::semifuture_subscribeAndGetKvStoreFiltered(
std::unique_ptr<thrift::KeyDumpParams> dumpParams) {
auto selectAreas = std::make_unique<std::set<std::string>>();
selectAreas->insert(*getSingleAreaOrThrow("subscribeAndGetKvStoreFiltered"));
return semifuture_subscribeAndGetAreaKvStores(
std::move(dumpParams), std::move(selectAreas))
.deferValue([](apache::thrift::ResponseAndServerStream<
std::vector<thrift::Publication>,
thrift::Publication>&& responses) mutable {
thrift::Publication pub = responses.response.empty()
? thrift::Publication{}
: std::move(*responses.response.begin());
return apache::thrift::
ResponseAndServerStream<thrift::Publication, thrift::Publication>{
std::move(pub), std::move(responses.stream)};
});
}
folly::SemiFuture<apache::thrift::ResponseAndServerStream<
std::vector<thrift::Publication>,
thrift::Publication>>
OpenrCtrlHandler::semifuture_subscribeAndGetAreaKvStores(
std::unique_ptr<thrift::KeyDumpParams> dumpParams,
std::unique_ptr<std::set<std::string>> selectAreas) {
auto dumpParamsCopy = std::make_unique<thrift::KeyDumpParams>(*dumpParams);
auto selectAreasCopy = std::make_unique<std::set<std::string>>(*selectAreas);
return kvStore_
->semifuture_dumpKvStoreKeys(
std::move(*dumpParams), std::move(*selectAreas))
.defer([stream = subscribeKvStoreFilter(
std::move(dumpParamsCopy), std::move(selectAreasCopy))](
folly::Try<std::unique_ptr<std::vector<thrift::Publication>>>&&
pubs) mutable {
pubs.throwUnlessValue();
for (auto& pub : *pubs.value()) {
// Set the publication timestamp
pub.timestamp_ms_ref() = getUnixTimeStampMs();
}
return apache::thrift::ResponseAndServerStream<
std::vector<thrift::Publication>,
thrift::Publication>{std::move(*pubs.value()), std::move(stream)};
});
}
apache::thrift::ServerStream<thrift::RouteDatabaseDelta>
OpenrCtrlHandler::subscribeFib() {
// Get new client-ID (monotonically increasing)
auto clientToken = publisherToken_++;
auto streamAndPublisher =
apache::thrift::ServerStream<thrift::RouteDatabaseDelta>::createPublisher(
[this, clientToken]() {
fibPublishers_.withWLock([&clientToken](auto& fibPublishers) {
if (fibPublishers.erase(clientToken)) {
XLOG(INFO) << "Fib snoop stream-" << clientToken << " ended.";
} else {
XLOG(ERR) << "Can't remove unknown Fib snoop stream-"
<< clientToken;
}
fb303::fbData->setCounter(
"subscribers.fib", fibPublishers.size());
});
});
fibPublishers_.withWLock([&clientToken,
&streamAndPublisher](auto& fibPublishers) {
assert(fibPublishers.count(clientToken) == 0);
XLOG(INFO) << "Fib snoop stream-" << clientToken << " started.";
fibPublishers.emplace(clientToken, std::move(streamAndPublisher.second));
fb303::fbData->setCounter("subscribers.fib", fibPublishers.size());
});
return std::move(streamAndPublisher.first);
}
folly::SemiFuture<apache::thrift::ResponseAndServerStream<
thrift::RouteDatabase,
thrift::RouteDatabaseDelta>>
OpenrCtrlHandler::semifuture_subscribeAndGetFib() {
auto stream = subscribeFib();
return semifuture_getRouteDb().defer(
[stream = std::move(stream)](
folly::Try<std::unique_ptr<thrift::RouteDatabase>>&& db) mutable {
db.throwUnlessValue();
return apache::thrift::ResponseAndServerStream<
thrift::RouteDatabase,
thrift::RouteDatabaseDelta>{
std::move(*db.value()), std::move(stream)};
});
}
apache::thrift::ServerStream<thrift::RouteDatabaseDeltaDetail>
OpenrCtrlHandler::subscribeFibDetail() {
// Get new client-ID (monotonically increasing)
auto clientToken = publisherToken_++;
auto streamAndPublisher = apache::thrift::ServerStream<
thrift::RouteDatabaseDeltaDetail>::createPublisher([this, clientToken]() {
fibDetailSubscribers_.withWLock([&clientToken](auto& fibDetailSubscribers) {
if (fibDetailSubscribers.erase(clientToken)) {
XLOG(INFO) << "Fib detail snoop stream-" << clientToken << " ended.";
} else {
XLOG(ERR) << "Can't remove unknown Fib detail snoop stream-"
<< clientToken;
}
fb303::fbData->setCounter(
"subscribers.fibDetail", fibDetailSubscribers.size());
});
});
fibDetailSubscribers_.withWLock(
[&clientToken, &streamAndPublisher](auto& fibDetailSubscribers) {
assert(fibDetailSubscribers.count(clientToken) == 0);
XLOG(INFO) << "Fib detail snoop stream-" << clientToken << " started.";
auto publisher = std::make_unique<apache::thrift::ServerStreamPublisher<
thrift::RouteDatabaseDeltaDetail>>(
std::move(streamAndPublisher.second));
FibStreamSubscriber fib_subscriber(
std::chrono::steady_clock::now(), std::move(publisher));
fibDetailSubscribers.emplace(clientToken, std::move(fib_subscriber));
fb303::fbData->setCounter(
"subscribers.fibDetail", fibDetailSubscribers.size());
});
return std::move(streamAndPublisher.first);
}
folly::SemiFuture<apache::thrift::ResponseAndServerStream<
thrift::RouteDatabaseDetail,
thrift::RouteDatabaseDeltaDetail>>
OpenrCtrlHandler::semifuture_subscribeAndGetFibDetail() {
auto stream = subscribeFibDetail();
return semifuture_getRouteDetailDb().defer(
[stream = std::move(stream)](
folly::Try<std::unique_ptr<thrift::RouteDatabaseDetail>>&&
db) mutable {
db.throwIfFailed();
return apache::thrift::ResponseAndServerStream<
thrift::RouteDatabaseDetail,
thrift::RouteDatabaseDeltaDetail>{
std::move(*db.value()), std::move(stream)};
});
}
//
// LinkMonitor APIs
//
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_setNodeOverload() {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_setNodeOverload(true);
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_unsetNodeOverload() {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_setNodeOverload(false);
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_setInterfaceOverload(
std::unique_ptr<std::string> interfaceName) {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_setInterfaceOverload(
std::move(*interfaceName), true);
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_unsetInterfaceOverload(
std::unique_ptr<std::string> interfaceName) {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_setInterfaceOverload(
std::move(*interfaceName), false);
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_setInterfaceMetric(
std::unique_ptr<std::string> interfaceName, int32_t overrideMetric) {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_setLinkMetric(
std::move(*interfaceName), overrideMetric);
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_unsetInterfaceMetric(
std::unique_ptr<std::string> interfaceName) {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_setLinkMetric(
std::move(*interfaceName), std::nullopt);
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_setAdjacencyMetric(
std::unique_ptr<std::string> interfaceName,
std::unique_ptr<std::string> adjNodeName,
int32_t overrideMetric) {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_setAdjacencyMetric(
std::move(*interfaceName), std::move(*adjNodeName), overrideMetric);
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_unsetAdjacencyMetric(
std::unique_ptr<std::string> interfaceName,
std::unique_ptr<std::string> adjNodeName) {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_setAdjacencyMetric(
std::move(*interfaceName), std::move(*adjNodeName), std::nullopt);
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_setNodeInterfaceMetricIncrement(
int32_t metricIncrementVal) {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_setNodeInterfaceMetricIncrement(
metricIncrementVal);
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_unsetNodeInterfaceMetricIncrement() {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_unsetNodeInterfaceMetricIncrement();
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_setInterfaceMetricIncrement(
std::unique_ptr<std::string> interfaceName, int32_t metricIncrementVal) {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_setInterfaceMetricIncrement(
std::move(*interfaceName), metricIncrementVal);
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_unsetInterfaceMetricIncrement(
std::unique_ptr<std::string> interfaceName) {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_unsetInterfaceMetricIncrement(
std::move(*interfaceName));
}
folly::SemiFuture<std::unique_ptr<thrift::DumpLinksReply>>
OpenrCtrlHandler::semifuture_getInterfaces() {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_getInterfaces();
}
folly::SemiFuture<std::unique_ptr<thrift::AdjacencyDatabase>>
OpenrCtrlHandler::semifuture_getLinkMonitorAdjacencies() {
CHECK(linkMonitor_);
auto filter = std::make_unique<thrift::AdjacenciesFilter>();
filter->selectAreas_ref() = {
*getSingleAreaOrThrow("getLinkMonitorAdjacencies")};
return semifuture_getLinkMonitorAdjacenciesFiltered(std::move(filter))
.deferValue([](std::unique_ptr<std::vector<thrift::AdjacencyDatabase>>&&
dbs) mutable {
thrift::AdjacencyDatabase db;
if (!dbs->empty()) {
db = std::move(*dbs->begin());
}
return std::make_unique<thrift::AdjacencyDatabase>(std::move(db));
});
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::AdjacencyDatabase>>>
OpenrCtrlHandler::semifuture_getLinkMonitorAdjacenciesFiltered(
std::unique_ptr<thrift::AdjacenciesFilter> filter) {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_getAdjacencies(std::move(*filter));
}
folly::SemiFuture<std::unique_ptr<
std::map<std::string, std::vector<thrift::AdjacencyDatabase>>>>
OpenrCtrlHandler::semifuture_getLinkMonitorAreaAdjacenciesFiltered(
std::unique_ptr<thrift::AdjacenciesFilter> filter) {
CHECK(linkMonitor_);
return linkMonitor_->semifuture_getAreaAdjacencies(std::move(*filter));
}
//
// ConfigStore API
//
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_setConfigKey(
std::unique_ptr<std::string> key, std::unique_ptr<std::string> value) {
CHECK(configStore_);
return configStore_->store(std::move(*key), std::move(*value));
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_eraseConfigKey(std::unique_ptr<std::string> key) {
CHECK(configStore_);
auto sf = configStore_->erase(std::move(*key));
return std::move(sf).defer([](folly::Try<bool>&&) { return folly::Unit(); });
}
folly::SemiFuture<std::unique_ptr<std::string>>
OpenrCtrlHandler::semifuture_getConfigKey(std::unique_ptr<std::string> key) {
CHECK(configStore_);
auto sf = configStore_->load(std::move(*key));
return std::move(sf).defer(
[](folly::Try<std::optional<std::string>>&& val) mutable {
if (val.hasException() or not val->has_value()) {
throw thrift::OpenrError("key doesn't exists");
}
return std::make_unique<std::string>(std::move(val).value().value());
});
}
//
// RibPolicy APIs
//
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_setRibPolicy(
std::unique_ptr<thrift::RibPolicy> policy) {
return decision_->setRibPolicy(*policy);
}
folly::SemiFuture<std::unique_ptr<thrift::RibPolicy>>
OpenrCtrlHandler::semifuture_getRibPolicy() {
return decision_->getRibPolicy().deferValue([](thrift::RibPolicy&& policy) {
return std::make_unique<thrift::RibPolicy>(policy);
});
}
folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_clearRibPolicy() {
return decision_->clearRibPolicy();
}
} // namespace openr