openr/fib/Fib.cpp (870 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <fb303/ServiceData.h>
#include <folly/IPAddress.h>
#include <folly/logging/xlog.h>
#include <thrift/lib/cpp2/async/HeaderClientChannel.h>
#include <openr/common/Constants.h>
#include <openr/common/LsdbUtil.h>
#include <openr/common/NetworkUtil.h>
#include <openr/fib/Fib.h>
namespace fb303 = facebook::fb303;
namespace openr {
namespace { // anonymous for local function definitions
void
logFibUpdateError(thrift::PlatformFibUpdateError const& error) {
fb303::fbData->addStatValue(
"fib.thrift.failure.fib_update_error", 1, fb303::COUNT);
XLOG(ERR) << "Partially failed to update/delete following in FIB.";
for (auto& [_, prefixes] : *error.vrf2failedAddUpdatePrefixes_ref()) {
for (auto& prefix : prefixes) {
XLOG(ERR) << " > " << toString(prefix) << " add/update";
}
}
for (auto& [_, prefixes] : *error.vrf2failedDeletePrefixes_ref()) {
for (auto& prefix : prefixes) {
XLOG(ERR) << " > " << toString(prefix) << " delete";
}
}
for (auto& label : *error.failedAddUpdateMplsLabels_ref()) {
XLOG(ERR) << " > " << label << " add/update";
}
for (auto& label : *error.failedDeleteMplsLabels_ref()) {
XLOG(ERR) << " > " << label << " delete";
}
}
} // namespace
Fib::Fib(
std::shared_ptr<const Config> config,
messaging::RQueue<DecisionRouteUpdate> routeUpdatesQueue,
messaging::ReplicateQueue<DecisionRouteUpdate>& fibRouteUpdatesQueue,
messaging::ReplicateQueue<LogSample>& logSampleQueue)
: myNodeName_(*config->getConfig().node_name_ref()),
thriftPort_(*config->getConfig().fib_port_ref()),
dryrun_(config->getConfig().dryrun_ref().value_or(false)),
enableSegmentRouting_(
config->getConfig().enable_segment_routing_ref().value_or(false)),
routeDeleteDelay_(*config->getConfig().route_delete_delay_ms_ref()),
retryRoutesExpBackoff_(
Constants::kFibInitialBackoff, Constants::kFibMaxBackoff, false),
fibRouteUpdatesQueue_(fibRouteUpdatesQueue),
logSampleQueue_(logSampleQueue) {
CHECK_GE(routeDeleteDelay_.count(), 0)
<< "Route delete duration must be >= 0ms";
// On startup we do require routedb_sync so explicitly set the counter to 0
fb303::fbData->setCounter("fib.synced", 0);
//
// Start RetryRoute fiber with stop signal.
//
addFiberTask(
[this]() mutable noexcept { retryRoutesTask(retryRoutesStopSignal_); });
//
// Create KeepAlive task with stop signal. Signalling part consists of two
// - Promise retained in state variable of Fib module. Fiber awaits on it. The
// promise is fulfilled in Fib::stop()
// - SemiFuture is passed to fiber for awaiting
//
addFiberTask(
[this]() mutable noexcept { keepAliveTask(keepAliveStopSignal_); });
// Fiber to process route updates from Decision
addFiberTask([q = std::move(routeUpdatesQueue), this]() mutable noexcept {
while (true) {
auto maybeThriftObj = q.get(); // perform read
if (maybeThriftObj.hasError()) {
XLOG(DBG1) << "Terminating route delta processing fiber";
break;
}
fb303::fbData->addStatValue("fib.process_route_db", 1, fb303::COUNT);
processDecisionRouteUpdate(std::move(maybeThriftObj).value());
}
});
// Initialize stats keys
fb303::fbData->addStatExportType("fib.convergence_time_ms", fb303::AVG);
fb303::fbData->addStatExportType(
"fib.local_route_program_time_ms", fb303::AVG);
fb303::fbData->addStatExportType("fib.num_of_route_updates", fb303::SUM);
fb303::fbData->addStatExportType("fib.process_route_db", fb303::COUNT);
fb303::fbData->addStatExportType("fib.sync_fib_calls", fb303::COUNT);
fb303::fbData->addStatExportType(
"fib.thrift.failure.add_del_route", fb303::COUNT);
fb303::fbData->addStatExportType(
"fib.thrift.failure.keepalive", fb303::COUNT);
fb303::fbData->addStatExportType("fib.thrift.failure.sync_fib", fb303::COUNT);
fb303::fbData->addStatExportType("fib.route_programming.time_ms", fb303::AVG);
}
void
Fib::stop() {
// Send stop signal to internal fibers
keepAliveStopSignal_.post();
retryRoutesStopSignal_.post();
retryRoutesSignal_.signal();
// Invoke stop method of super class
OpenrEventBase::stop();
XLOG(DBG1) << "Stopped FIB event base";
}
std::optional<folly::CIDRNetwork>
Fib::longestPrefixMatch(
const folly::CIDRNetwork& inputPrefix,
const std::unordered_map<folly::CIDRNetwork, RibUnicastEntry>&
unicastRoutes) {
std::optional<folly::CIDRNetwork> matchedPrefix;
int maxMask = -1;
const auto& inputIP = inputPrefix.first;
const auto& inputMask = inputPrefix.second;
// longest prefix matching
for (const auto& route : unicastRoutes) {
const auto& dbIP = route.first.first;
const auto& dbMask = route.first.second;
if (maxMask < dbMask && inputMask >= dbMask &&
inputIP.mask(dbMask) == dbIP) {
maxMask = dbMask;
matchedPrefix = route.first;
}
}
return matchedPrefix;
}
folly::SemiFuture<std::unique_ptr<thrift::RouteDatabase>>
Fib::getRouteDb() {
folly::Promise<std::unique_ptr<thrift::RouteDatabase>> p;
auto sf = p.getSemiFuture();
runInEventBaseThread([p = std::move(p), this]() mutable {
thrift::RouteDatabase routeDb;
routeDb.thisNodeName_ref() = myNodeName_;
for (const auto& route : routeState_.unicastRoutes) {
routeDb.unicastRoutes_ref()->emplace_back(route.second.toThrift());
}
for (const auto& route : routeState_.mplsRoutes) {
routeDb.mplsRoutes_ref()->emplace_back(route.second.toThrift());
}
p.setValue(std::make_unique<thrift::RouteDatabase>(std::move(routeDb)));
});
return sf;
}
folly::SemiFuture<std::unique_ptr<thrift::RouteDatabaseDetail>>
Fib::getRouteDetailDb() {
folly::Promise<std::unique_ptr<thrift::RouteDatabaseDetail>> p;
auto sf = p.getSemiFuture();
runInEventBaseThread([p = std::move(p), this]() mutable {
thrift::RouteDatabaseDetail routeDetailDb;
routeDetailDb.thisNodeName_ref() = myNodeName_;
for (const auto& route : routeState_.unicastRoutes) {
routeDetailDb.unicastRoutes_ref()->emplace_back(
route.second.toThriftDetail());
}
for (const auto& route : routeState_.mplsRoutes) {
routeDetailDb.mplsRoutes_ref()->emplace_back(
route.second.toThriftDetail());
}
p.setValue(std::make_unique<thrift::RouteDatabaseDetail>(
std::move(routeDetailDb)));
});
return sf;
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::UnicastRoute>>>
Fib::getUnicastRoutes(std::vector<std::string> prefixes) {
folly::Promise<std::unique_ptr<std::vector<thrift::UnicastRoute>>> p;
auto sf = p.getSemiFuture();
runInEventBaseThread(
[p = std::move(p), prefixes = std::move(prefixes), this]() mutable {
p.setValue(std::make_unique<std::vector<thrift::UnicastRoute>>(
getUnicastRoutesFiltered(std::move(prefixes))));
});
return sf;
}
folly::SemiFuture<std::unique_ptr<std::vector<thrift::MplsRoute>>>
Fib::getMplsRoutes(std::vector<int32_t> labels) {
folly::Promise<std::unique_ptr<std::vector<thrift::MplsRoute>>> p;
auto sf = p.getSemiFuture();
runInEventBaseThread(
[p = std::move(p), labels = std::move(labels), this]() mutable {
p.setValue(std::make_unique<std::vector<thrift::MplsRoute>>(
getMplsRoutesFiltered(std::move(labels))));
});
return sf;
}
folly::SemiFuture<std::unique_ptr<thrift::PerfDatabase>>
Fib::getPerfDb() {
folly::Promise<std::unique_ptr<thrift::PerfDatabase>> p;
auto sf = p.getSemiFuture();
runInEventBaseThread([p = std::move(p), this]() mutable {
p.setValue(std::make_unique<thrift::PerfDatabase>(dumpPerfDb()));
});
return sf;
}
std::vector<thrift::UnicastRoute>
Fib::getUnicastRoutesFiltered(std::vector<std::string> prefixes) {
// return and send the vector<thrift::UnicastRoute>
std::vector<thrift::UnicastRoute> retRouteVec;
// the matched prefix after longest prefix matching and avoid duplicates
std::set<folly::CIDRNetwork> matchPrefixSet;
// if the params is empty, return all routes
if (prefixes.empty()) {
for (const auto& routes : routeState_.unicastRoutes) {
retRouteVec.emplace_back(routes.second.toThrift());
}
return retRouteVec;
}
// longest prefix matching for each input string
for (const auto& prefixStr : prefixes) {
// try to convert the string prefix into CIDRNetwork
const auto maybePrefix =
folly::IPAddress::tryCreateNetwork(prefixStr, -1, true);
if (maybePrefix.hasError()) {
XLOG(ERR) << "Invalid IP address as prefix: " << prefixStr;
return retRouteVec;
}
const auto inputPrefix = maybePrefix.value();
// do longest prefix match, add the matched prefix to the result set
const auto& matchedPrefix =
Fib::longestPrefixMatch(inputPrefix, routeState_.unicastRoutes);
if (matchedPrefix.has_value()) {
matchPrefixSet.insert(matchedPrefix.value());
}
}
// get the routes from the prefix set
for (const auto& prefix : matchPrefixSet) {
retRouteVec.emplace_back(routeState_.unicastRoutes.at(prefix).toThrift());
}
return retRouteVec;
}
std::vector<thrift::MplsRoute>
Fib::getMplsRoutesFiltered(std::vector<int32_t> labels) {
// return and send the vector<thrift::MplsRoute>
std::vector<thrift::MplsRoute> retRouteVec;
// if the params is empty, return all MPLS routes
if (labels.empty()) {
for (const auto& routes : routeState_.mplsRoutes) {
retRouteVec.emplace_back(routes.second.toThrift());
}
return retRouteVec;
}
// get the params: list of MPLS label filters -> set of MPLS label filters
std::set<int32_t> labelFilterSet;
for (const auto& label : labels) {
labelFilterSet.insert(label);
}
// get the filtered MPLS routes and avoid duplicates
for (const auto& routes : routeState_.mplsRoutes) {
if (labelFilterSet.find(routes.first) != labelFilterSet.end()) {
retRouteVec.emplace_back(routes.second.toThrift());
}
}
return retRouteVec;
}
messaging::RQueue<DecisionRouteUpdate>
Fib::getFibUpdatesReader() {
return fibRouteUpdatesQueue_.getReader();
}
void
Fib::RouteState::update(const DecisionRouteUpdate& routeUpdate) {
// Add/Update unicast routes to update
for (const auto& [prefix, route] : routeUpdate.unicastRoutesToUpdate) {
unicastRoutes.insert_or_assign(prefix, route);
}
// Add mpls routes to update
for (const auto& [label, route] : routeUpdate.mplsRoutesToUpdate) {
mplsRoutes.insert_or_assign(label, route);
}
// Delete unicast routes
for (const auto& dest : routeUpdate.unicastRoutesToDelete) {
unicastRoutes.erase(dest);
}
// Delete mpls routes
for (const auto& topLabel : routeUpdate.mplsRoutesToDelete) {
mplsRoutes.erase(topLabel);
}
}
DecisionRouteUpdate
Fib::RouteState::createUpdate() {
DecisionRouteUpdate update;
//
// Case - First Sync
// Return all updates
//
if (state == SYNCING and not isInitialSynced) {
update.type = DecisionRouteUpdate::FULL_SYNC;
update.unicastRoutesToUpdate = unicastRoutes;
update.mplsRoutesToUpdate = mplsRoutes;
return update;
}
//
// Case - Subsequent Sync or re-programming of failed routes
// Return updates based on dirty state
//
update.type = DecisionRouteUpdate::INCREMENTAL;
auto const currentTime = std::chrono::steady_clock::now();
// Populate unicast routes to add, update, or delete
for (auto itrPrefixes = dirtyPrefixes.begin();
itrPrefixes != dirtyPrefixes.end();) {
if (currentTime < itrPrefixes->second) {
++itrPrefixes;
continue; // Route is not yet ready for retry
}
auto iter = unicastRoutes.find(itrPrefixes->first);
if (iter == unicastRoutes.end()) { // Delete
update.unicastRoutesToDelete.emplace_back(itrPrefixes->first);
} else { // Add or Update
update.unicastRoutesToUpdate.emplace(itrPrefixes->first, iter->second);
}
// remove as we are creating a new update to program
itrPrefixes = dirtyPrefixes.erase(itrPrefixes);
}
// Populate mpls routes to add, update, or delete
for (auto itrLabel = dirtyLabels.begin(); itrLabel != dirtyLabels.end();) {
if (currentTime < itrLabel->second) {
++itrLabel;
continue; // Route is not yet ready for retry
}
auto it = mplsRoutes.find(itrLabel->first);
if (it == mplsRoutes.end()) { // Delete
update.mplsRoutesToDelete.emplace_back(itrLabel->first);
} else { // Add or Update
update.mplsRoutesToUpdate.emplace(itrLabel->first, it->second);
}
// remove as we are creating a new update to program
itrLabel = dirtyLabels.erase(itrLabel);
}
return update;
}
// Computes the minimum timestamp among unicast and mpls routes w.r.t
// current timestamp. In case there is a delete event which expired in the
// past, retry timer is scheduled immediately.
std::chrono::milliseconds
Fib::nextRetryDuration() const {
// Schedule retry timer immediately if this is initial Fib sync, or delayed
// deletion is not enabled, or if there is no pending (dirty) routes for
// processing.
if ((routeState_.state == RouteState::SYNCING) or
(routeState_.dirtyPrefixes.empty() and routeState_.dirtyLabels.empty())) {
// Return backoff duration if any
return retryRoutesExpBackoff_.getTimeRemainingUntilRetry();
}
auto const currTime = std::chrono::steady_clock::now();
auto nextRetryTime =
std::chrono::time_point<std::chrono::steady_clock>::max();
for (auto& [prefix, addDeleteTime] : routeState_.dirtyPrefixes) {
nextRetryTime = std::min(addDeleteTime, nextRetryTime);
}
for (auto& [label, addDeleteTime] : routeState_.dirtyLabels) {
nextRetryTime = std::min(addDeleteTime, nextRetryTime);
}
return std::chrono::ceil<std::chrono::milliseconds>(
std::max(nextRetryTime, currTime) - currTime);
}
void
Fib::RouteState::processFibUpdateError(
thrift::PlatformFibUpdateError const& fibError,
std::chrono::time_point<std::chrono::steady_clock> retryAt) {
// Mark prefixes as dirty. All newly failed unicast routes are added into
// dirtyPrefixes map. We can distinguish between add/update and delete updates
// in createUpdate().
for (auto& [_, prefixes] : *fibError.vrf2failedAddUpdatePrefixes_ref()) {
for (auto& prefix : prefixes) {
dirtyPrefixes.insert_or_assign(toIPNetwork(prefix), retryAt);
}
}
for (auto& [_, prefixes] : *fibError.vrf2failedDeletePrefixes_ref()) {
for (auto& prefix : prefixes) {
dirtyPrefixes.insert_or_assign(toIPNetwork(prefix), retryAt);
}
}
// Mark labels as dirty. All newly failed unicast routes are added into
// dirtyPrefixes map. We can distinguish between add/update and delete updates
// in createUpdate().
for (auto& label : *fibError.failedAddUpdateMplsLabels_ref()) {
dirtyLabels.insert_or_assign(label, retryAt);
}
for (auto& label : *fibError.failedDeleteMplsLabels_ref()) {
dirtyLabels.insert_or_assign(label, retryAt);
}
}
// Process new route updates received from Decision module.
void
Fib::processDecisionRouteUpdate(DecisionRouteUpdate&& routeUpdate) {
// Process state transition event
transitionRouteState(RouteState::RIB_UPDATE);
// Update perfEvents_ .. We replace existing perf events with new one as
// convergence is going to be based on new data, not the old.
if (routeUpdate.perfEvents.has_value()) {
addPerfEvent(
routeUpdate.perfEvents.value(), myNodeName_, "FIB_ROUTE_DB_RECVD");
}
// Before anything, get rid of doNotInstall routes
auto iter = routeUpdate.unicastRoutesToUpdate.cbegin();
while (iter != routeUpdate.unicastRoutesToUpdate.cend()) {
if (iter->second.doNotInstall) {
XLOG(INFO) << "Not installing route for prefix "
<< folly::IPAddress::networkToString(iter->first);
iter = routeUpdate.unicastRoutesToUpdate.erase(iter);
} else {
++iter;
}
}
// Filter MPLS next-hops to unique action
for (auto& [_, mplsRoute] : routeUpdate.mplsRoutesToUpdate) {
mplsRoute.filterNexthopsToUniqueAction();
}
updateRoutes(std::move(routeUpdate));
if (routeState_.needsRetry()) {
// Trigger initial Fib sync, or schedule retry routes timer if needed.
retryRoutesSignal_.signal();
}
}
thrift::PerfDatabase
Fib::dumpPerfDb() const {
thrift::PerfDatabase perfDb;
*perfDb.thisNodeName_ref() = myNodeName_;
for (auto const& perf : perfDb_) {
perfDb.eventInfo_ref()->emplace_back(perf);
}
return perfDb;
}
void
Fib::printUnicastRoutesAddUpdate(
const std::vector<thrift::UnicastRoute>& unicastRoutesToUpdate) {
if (not unicastRoutesToUpdate.size()) {
return;
}
for (auto const& route : unicastRoutesToUpdate) {
XLOG(DBG1) << "> " << toString(*route.dest_ref())
<< ", NextHopsCount = " << route.nextHops_ref()->size();
for (auto const& nh : *route.nextHops_ref()) {
XLOG(DBG1) << " " << toString(nh);
}
}
}
void
Fib::printMplsRoutesAddUpdate(
const std::vector<thrift::MplsRoute>& mplsRoutesToUpdate) {
if (not mplsRoutesToUpdate.size()) {
return;
}
for (auto const& route : mplsRoutesToUpdate) {
XLOG(DBG1) << "> " << std::to_string(*route.topLabel_ref()) << ", "
<< " NextHopsCount = " << route.nextHops_ref()->size();
for (auto const& nh : *route.nextHops_ref()) {
XLOG(DBG1) << " " << toString(nh);
}
}
}
bool
Fib::updateUnicastRoutes(
const bool useDeleteDelay,
const std::chrono::time_point<std::chrono::steady_clock>& currentTime,
const std::chrono::time_point<std::chrono::steady_clock>& retryAt,
DecisionRouteUpdate& routeUpdate,
thrift::RouteDatabaseDelta& routeDbDelta) {
bool success{true};
//
// Delete Unicast routes
//
auto& unicastRoutesToDelete = *routeDbDelta.unicastRoutesToDelete_ref();
if (delayedDeletionEnabled() and useDeleteDelay) {
// Clear the routes to delete
unicastRoutesToDelete.clear();
// Mark dirty state here & set
for (auto& prefix : routeUpdate.unicastRoutesToDelete) {
const auto [itr, _] = routeState_.dirtyPrefixes.insert_or_assign(
prefix, currentTime + routeDeleteDelay_);
XLOG(INFO) << "Will delete unicast route "
<< folly::IPAddress::networkToString(prefix) << " after "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
itr->second - currentTime)
.count()
<< "ms";
}
}
if (unicastRoutesToDelete.size()) {
XLOG(INFO) << "Deleting " << unicastRoutesToDelete.size()
<< " unicast routes in FIB";
for (auto const& prefix : unicastRoutesToDelete) {
XLOG(DBG1) << "> " << toString(prefix);
}
if (dryrun_) {
XLOG(INFO) << "Skipping deletion of unicast routes in dryrun ... ";
} else {
try {
createFibClient(*getEvb(), socket_, client_, thriftPort_);
client_->sync_deleteUnicastRoutes(kFibId_, unicastRoutesToDelete);
} catch (std::exception& e) {
success = false;
client_.reset();
fb303::fbData->addStatValue(
"fib.thrift.failure.add_del_route", 1, fb303::COUNT);
XLOG(ERR) << "Failed to delete unicast routes from FIB. Error: "
<< folly::exceptionStr(e);
// Marked all routes to be deleted as dirty. So we try to remove them
// again from FIB.
for (const auto& prefix : routeUpdate.unicastRoutesToDelete) {
routeState_.dirtyPrefixes.insert_or_assign(prefix, retryAt);
}
// NOTE: We still want to advertise these prefixes as deleted
}
}
}
//
// Update Unicast routes
//
auto const& unicastRoutesToUpdate = *routeDbDelta.unicastRoutesToUpdate_ref();
if (unicastRoutesToUpdate.size()) {
XLOG(INFO) << "Adding/Updating " << unicastRoutesToUpdate.size()
<< " unicast routes in FIB";
printUnicastRoutesAddUpdate(unicastRoutesToUpdate);
if (dryrun_) {
XLOG(INFO) << "Skipping add/update of unicast routes in dryrun ... ";
} else {
try {
createFibClient(*getEvb(), socket_, client_, thriftPort_);
client_->sync_addUnicastRoutes(kFibId_, unicastRoutesToUpdate);
} catch (thrift::PlatformFibUpdateError const& fibUpdateError) {
success = false;
logFibUpdateError(fibUpdateError);
// Remove failed routes from fibRouteUpdates
routeUpdate.processFibUpdateError(fibUpdateError);
// Mark failed routes as dirty in route state
routeState_.processFibUpdateError(fibUpdateError, retryAt);
} catch (std::exception const& e) {
success = false;
client_.reset();
fb303::fbData->addStatValue(
"fib.thrift.failure.add_del_route", 1, fb303::COUNT);
XLOG(ERR) << "Failed to add/update unicast routes in FIB. Error: "
<< folly::exceptionStr(e);
// Mark routes we failed to update as dirty for retry. Also declare
// these routes as deleted to client, because we failed to update them
// Next retry should restore, but meanwhile clients can take appropriate
// action because FIB state is unclear e.g. withdraw route from KvStore
for (auto& [prefix, _] : routeUpdate.unicastRoutesToUpdate) {
routeState_.dirtyPrefixes.insert_or_assign(prefix, retryAt);
routeUpdate.unicastRoutesToDelete.emplace_back(prefix);
}
// We don't want to advertise failed route add/updates
routeUpdate.unicastRoutesToUpdate.clear();
}
}
}
return success;
}
bool
Fib::updateMplsRoutes(
const bool useDeleteDelay,
const std::chrono::time_point<std::chrono::steady_clock>& currentTime,
const std::chrono::time_point<std::chrono::steady_clock>& retryAt,
DecisionRouteUpdate& routeUpdate,
thrift::RouteDatabaseDelta& routeDbDelta) {
bool success{true};
//
// Delete Mpls routes
//
auto& mplsRoutesToDelete = *routeDbDelta.mplsRoutesToDelete_ref();
if (useDeleteDelay and delayedDeletionEnabled()) {
// Clear the routes to delete
mplsRoutesToDelete.clear();
// Mark dirty state here & set
for (auto& mplsRoute : routeUpdate.mplsRoutesToDelete) {
const auto [itr, _] = routeState_.dirtyLabels.insert_or_assign(
mplsRoute, currentTime + routeDeleteDelay_);
XLOG(INFO) << "Will delete mpls route " << mplsRoute << " after "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
itr->second - currentTime)
.count()
<< "ms";
}
}
if (mplsRoutesToDelete.size()) {
XLOG(INFO) << "Deleting " << mplsRoutesToDelete.size()
<< " mpls routes in FIB";
for (auto const& topLabel : mplsRoutesToDelete) {
XLOG(DBG1) << "> " << std::to_string(topLabel);
}
if (dryrun_) {
XLOG(INFO) << "Skipping deletion of mpls routes in dryrun ... ";
} else {
try {
createFibClient(*getEvb(), socket_, client_, thriftPort_);
client_->sync_deleteMplsRoutes(kFibId_, mplsRoutesToDelete);
} catch (std::exception const& e) {
success = false;
client_.reset();
fb303::fbData->addStatValue(
"fib.thrift.failure.add_del_route", 1, fb303::COUNT);
XLOG(ERR) << "Failed to delete mpls routes from FIB. Error: "
<< folly::exceptionStr(e);
// Marked all routes to be deleted as dirty. So we try to remove them
// again from FIB.
for (const auto& label : routeUpdate.mplsRoutesToDelete) {
routeState_.dirtyLabels.insert_or_assign(label, retryAt);
}
// NOTE: We still want to advertise these labels as deleted
}
}
}
//
// Update Mpls routes
//
auto const& mplsRoutesToUpdate = *routeDbDelta.mplsRoutesToUpdate_ref();
if (mplsRoutesToUpdate.size()) {
XLOG(INFO) << "Adding/Updating " << mplsRoutesToUpdate.size()
<< " mpls routes in FIB";
printMplsRoutesAddUpdate(mplsRoutesToUpdate);
if (dryrun_) {
XLOG(INFO) << "Skipping add/update of mpls routes in dryrun ... ";
} else {
try {
createFibClient(*getEvb(), socket_, client_, thriftPort_);
client_->sync_addMplsRoutes(kFibId_, mplsRoutesToUpdate);
} catch (thrift::PlatformFibUpdateError const& fibUpdateError) {
success = false;
logFibUpdateError(fibUpdateError);
// Remove failed routes from fibRouteUpdates
routeUpdate.processFibUpdateError(fibUpdateError);
// Mark failed routes as dirty in route state
routeState_.processFibUpdateError(fibUpdateError, retryAt);
} catch (std::exception const& e) {
success = false;
client_.reset();
fb303::fbData->addStatValue(
"fib.thrift.failure.add_del_route", 1, fb303::COUNT);
XLOG(ERR) << "Failed to add/update mpls routes in FIB. Error: "
<< folly::exceptionStr(e);
// Mark routes we failed to update as dirty for retry. Also declare
// these routes as deleted to client, because we failed to update them
// Next retry should restore, but meanwhile clients can take
// appropriate action because FIB state is unclear e.g. withdraw route
// from KvStore
for (auto& [label, _] : routeUpdate.mplsRoutesToUpdate) {
routeState_.dirtyLabels.insert_or_assign(label, retryAt);
routeUpdate.mplsRoutesToDelete.emplace_back(label);
}
// We don't want to advertise failed route add/updates
routeUpdate.mplsRoutesToUpdate.clear();
}
}
}
return success;
}
bool
Fib::updateRoutes(DecisionRouteUpdate&& routeUpdate, bool useDeleteDelay) {
SCOPE_EXIT {
updateRoutesSemaphore_.signal(); // Release when this function returns
};
updateRoutesSemaphore_.wait();
// Return if empty
if (routeUpdate.empty()) {
XLOG(INFO) << "No entries in route update";
return true;
}
// Backup routes in routeState_. In case update routes failed, routes will be
// programmed in later scheduled FIB sync.
routeState_.update(routeUpdate);
// Update flat counters here as they depend on routeState_ and its change
updateGlobalCounters();
// Skip route programming if we're in the SYNCING state. We only perform
// incremental route programming in AWAITING or SYNCED state. In SYNCING
// state we let `syncRoutes` do the work instead.
if (routeState_.state == RouteState::SYNCING) {
XLOG(INFO) << "Skip route programming in SYNCING state";
return true;
}
XLOG(INFO) << "Updating routes in FIB";
auto const currentTime = std::chrono::steady_clock::now();
auto const retryAt =
currentTime + retryRoutesExpBackoff_.getTimeRemainingUntilRetry();
bool success{true};
// Convert DecisionRouteUpdate to RouteDatabaseDelta to use UnicastRoute
// and MplsRoute with the FibService client APIs
auto routeDbDelta = routeUpdate.toThrift();
success &= updateUnicastRoutes(
useDeleteDelay, currentTime, retryAt, routeUpdate, routeDbDelta);
if (enableSegmentRouting_) {
success &= updateMplsRoutes(
useDeleteDelay, currentTime, retryAt, routeUpdate, routeDbDelta);
}
// Log statistics
const auto elapsedTime = std::chrono::ceil<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - currentTime);
XLOG(INFO) << fmt::format(
"It took {} ms to update routes in FIB", elapsedTime.count());
fb303::fbData->addStatValue(
"fib.route_programming.time_ms", elapsedTime.count(), fb303::AVG);
fb303::fbData->addStatValue(
"fib.num_of_route_updates", routeUpdate.size(), fb303::SUM);
// Publish the route update. Clear MPLS routes if segment routing is disabled
routeUpdate.type = DecisionRouteUpdate::INCREMENTAL;
if (not enableSegmentRouting_) {
routeUpdate.mplsRoutesToUpdate.clear();
routeUpdate.mplsRoutesToDelete.clear();
}
fibRouteUpdatesQueue_.push(std::move(routeUpdate));
return success;
}
bool
Fib::syncRoutes() {
SCOPE_EXIT {
updateRoutesSemaphore_.signal(); // Release when this function returns
};
updateRoutesSemaphore_.wait();
// Create set of routes to sync in thrift format
const auto& unicastRoutes =
createUnicastRoutesFromMap(routeState_.unicastRoutes);
const auto& mplsRoutes = createMplsRoutesFromMap(routeState_.mplsRoutes);
const auto currentTime = std::chrono::steady_clock::now();
const auto retryAt =
currentTime + retryRoutesExpBackoff_.getTimeRemainingUntilRetry();
fb303::fbData->addStatValue("fib.sync_fib_calls", 1, fb303::COUNT);
// Create DecisionRouteUpdate that'll be published after successful sync. On
// partial failures we remove routes from this update.
auto fibRouteUpdates = routeState_.createUpdate();
// update flat counters here as they depend on routeState_ and its change
updateGlobalCounters();
//
// Sync Unicast routes
//
XLOG(INFO) << "Syncing " << unicastRoutes.size() << " unicast routes in FIB";
printUnicastRoutesAddUpdate(unicastRoutes);
if (dryrun_) {
XLOG(INFO) << "Skipping programming of unicast routes in dryrun ... ";
} else {
try {
createFibClient(*getEvb(), socket_, client_, thriftPort_);
client_->sync_syncFib(kFibId_, unicastRoutes);
} catch (thrift::PlatformFibUpdateError const& fibUpdateError) {
logFibUpdateError(fibUpdateError);
// Remove failed routes from fibRouteUpdates
fibRouteUpdates.processFibUpdateError(fibUpdateError);
// Mark failed routes as dirty in route state
routeState_.processFibUpdateError(fibUpdateError, retryAt);
} catch (std::exception const& e) {
client_.reset();
fb303::fbData->addStatValue(
"fib.thrift.failure.sync_fib", 1, fb303::COUNT);
XLOG(ERR) << "Failed to sync unicast routes in FIB. Error: "
<< folly::exceptionStr(e);
return false;
}
}
//
// Sync Mpls routes
//
if (enableSegmentRouting_) {
XLOG(INFO) << "Syncing " << mplsRoutes.size() << " mpls routes in FIB";
printMplsRoutesAddUpdate(mplsRoutes);
if (dryrun_) {
XLOG(INFO) << "Skipping programming of mpls routes in dryrun ...";
} else {
try {
createFibClient(*getEvb(), socket_, client_, thriftPort_);
client_->sync_syncMplsFib(kFibId_, mplsRoutes);
} catch (thrift::PlatformFibUpdateError const& fibUpdateError) {
logFibUpdateError(fibUpdateError);
// Remove failed routes from fibRouteUpdates
fibRouteUpdates.processFibUpdateError(fibUpdateError);
// Mark failed routes as dirty in route state
routeState_.processFibUpdateError(fibUpdateError, retryAt);
} catch (std::exception const& e) {
client_.reset();
fb303::fbData->addStatValue(
"fib.thrift.failure.sync_fib", 1, fb303::COUNT);
XLOG(ERR) << "Failed to sync unicast routes in FIB. Error: "
<< folly::exceptionStr(e);
return false;
}
} // else
} // if enableSegmentRouting_
// Some statistics
// NOTE: We set counter for sync time as it is one time event. We report the
// value of last sync duration
const auto elapsedTime = std::chrono::ceil<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - currentTime);
XLOG(INFO) << "It took " << elapsedTime.count() << "ms to sync routes in FIB";
fb303::fbData->setCounter("fib.route_sync.time_ms", elapsedTime.count());
// Publish route update. We'll do so only if sync is successful for both MPLS
// and Unicast routes.
// NOTE: even empty Fib sync will be published to fibRouteUpdatesQueue_.
if (not enableSegmentRouting_) {
fibRouteUpdates.mplsRoutesToUpdate.clear();
fibRouteUpdates.mplsRoutesToDelete.clear();
}
fibRouteUpdatesQueue_.push(std::move(fibRouteUpdates));
// Transition state on successful sync. Also record our first sync
transitionRouteState(RouteState::FIB_SYNCED);
if (not routeState_.isInitialSynced) {
routeState_.isInitialSynced = true;
logInitializationEvent("Fib", thrift::InitializationEvent::FIB_SYNCED);
}
return true;
}
void
Fib::retryRoutesTask(folly::fibers::Baton& stopSignal) noexcept {
XLOG(INFO) << "Starting RetryRoutes fiber task";
auto timeout = folly::AsyncTimeout::make(
*getEvb(), [this]() noexcept { retryRoutesSignal_.signal(); });
// Repeat in loop
while (not stopSignal.ready()) {
// Wait for signal & retry routes
retryRoutesSignal_.wait();
retryRoutes();
// Add async sleep signal for next invocation. Add only if non zero wait
if (routeState_.needsRetry()) {
auto retryDuration = nextRetryDuration();
XLOG(INFO) << "Scheduling timer after " << retryDuration.count() << "ms";
timeout->scheduleTimeout(retryDuration);
}
} // while
XLOG(INFO) << "RetryRoutes fiber task got stopped";
}
void
Fib::retryRoutes() noexcept {
bool success{false};
retryRoutesExpBackoff_.reportError(); // We increase backoff on every retry
XLOG(INFO) << "Increasing backoff "
<< retryRoutesExpBackoff_.getCurrentBackoff().count() << "ms";
if (routeState_.state == RouteState::SYNCING) {
// SYNC routes if we've RIB snapshot from Decision
success |= syncRoutes();
} else {
// We retry incremental update of routes based on dirty state in AWAITING
// & SYNCED states
auto routeUpdate = routeState_.createUpdate();
if (routeUpdate.empty()) {
XLOG(INFO) << "Returning because of empty updates";
return; // Do not process further as this incurred no change
}
XLOG(INFO) << "Retry programming of dirty route entries";
success |= updateRoutes(std::move(routeUpdate), false /* useDeleteDelay */);
}
// Clear backoff if programming is successful
if (success) {
XLOG(INFO) << "Clearing backoff";
retryRoutesExpBackoff_.reportSuccess();
}
// Set sync state
fb303::fbData->setCounter(
"fib.synced", routeState_.state == RouteState::SYNCED);
}
void
Fib::keepAliveTask(folly::fibers::Baton& stopSignal) noexcept {
XLOG(INFO) << "Starting KeepAlive fiber task";
while (true) { // Break when stop signal is ready
keepAlive();
// Wait for a second. Will terminate if wait completes or signal is ready
if (stopSignal.try_wait_for(Constants::kKeepAliveCheckInterval)) {
break; // Baton was posted
} else {
stopSignal.reset(); // Baton experienced timeout
}
} // while
XLOG(INFO) << "KeepAlive fiber task got stopped";
}
void
Fib::keepAlive() noexcept {
int64_t aliveSince{0};
if (not dryrun_) {
try {
createFibClient(*getEvb(), socket_, client_, thriftPort_);
aliveSince = client_->sync_aliveSince();
} catch (const std::exception& e) {
fb303::fbData->addStatValue(
"fib.thrift.failure.keepalive", 1, fb303::COUNT);
client_.reset();
XLOG(ERR) << "Failed to make thrift call to Switch Agent. Error: "
<< folly::exceptionStr(e);
}
}
// Check if switch agent has restarted or not. Applicable only if we have
// initialized alive-since
if (latestAliveSince_ != 0 && aliveSince != latestAliveSince_) {
XLOG(WARNING) << "FibAgent seems to have restarted. "
<< "Performing full route DB sync ...";
// FibAgent has restarted. Enforce full sync
transitionRouteState(RouteState::FIB_CONNECTED);
retryRoutesExpBackoff_.reportSuccess();
retryRoutesSignal_.signal();
}
latestAliveSince_ = aliveSince;
}
void
Fib::createFibClient(
folly::EventBase& evb,
folly::AsyncSocket*& socket,
std::unique_ptr<thrift::FibServiceAsyncClient>& client,
int32_t port) {
if (!client) {
socket = nullptr;
}
// Reset client if channel is not good
if (socket && (!socket->good() || socket->hangup())) {
client.reset();
socket = nullptr;
}
// Do not create new client if one exists already
if (client) {
return;
}
// Create socket to thrift server and set some connection parameters
auto newSocket = folly::AsyncSocket::newSocket(
&evb,
Constants::kPlatformHost.toString(),
port,
Constants::kPlatformConnTimeout.count());
socket = newSocket.get();
// Create channel and set timeout
auto channel = apache::thrift::HeaderClientChannel::newChannel(
std::move(newSocket),
apache::thrift::HeaderClientChannel::Options()
.setClientType(THRIFT_FRAMED_DEPRECATED)
.setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL));
channel->setTimeout(Constants::kPlatformRoutesProcTimeout.count());
// Reset client_
client = std::make_unique<thrift::FibServiceAsyncClient>(std::move(channel));
}
void
Fib::updateGlobalCounters() {
if (routeState_.state == RouteState::AWAITING) {
return;
}
// Set some flat counters
fb303::fbData->setCounter(
"fib.num_routes",
routeState_.unicastRoutes.size() + routeState_.mplsRoutes.size());
fb303::fbData->setCounter(
"fib.num_unicast_routes", routeState_.unicastRoutes.size());
fb303::fbData->setCounter(
"fib.num_mpls_routes", routeState_.mplsRoutes.size());
}
void
Fib::logPerfEvents(std::optional<thrift::PerfEvents>& perfEvents) {
if (not perfEvents.has_value() or not perfEvents->events_ref()->size()) {
return;
}
// Ignore bad perf event sample if creation time of first event is
// less than creation time of our recently logged perf events.
if (recentPerfEventCreateTs_ >=
*perfEvents->events_ref()->at(0).unixTs_ref()) {
XLOG(WARNING) << "Ignoring perf event with old create timestamp "
<< *perfEvents->events_ref()[0].unixTs_ref()
<< ", expected > " << recentPerfEventCreateTs_;
return;
} else {
recentPerfEventCreateTs_ = *perfEvents->events_ref()->at(0).unixTs_ref();
}
// Add latest event information (this function is meant to be called after
// routeDb has synced)
addPerfEvent(*perfEvents, myNodeName_, "OPENR_FIB_ROUTES_PROGRAMMED");
// Ignore perf events with very off total duration
auto totalDuration = getTotalPerfEventsDuration(*perfEvents);
if (totalDuration.count() < 0 or
totalDuration > Constants::kConvergenceMaxDuration) {
XLOG(WARNING) << "Ignoring perf event with bad total duration "
<< totalDuration.count() << "ms.";
return;
}
// Log event
auto eventStrs = sprintPerfEvents(*perfEvents);
XLOG(INFO) << "OpenR convergence performance. "
<< "Duration=" << totalDuration.count();
for (auto& str : eventStrs) {
XLOG(DBG2) << " " << str;
}
// Add new entry to perf DB and purge extra entries
perfDb_.push_back(std::move(perfEvents).value());
while (perfDb_.size() >= Constants::kPerfBufferSize) {
perfDb_.pop_front();
}
// Export convergence duration counter
fb303::fbData->addStatValue(
"fib.convergence_time_ms", totalDuration.count(), fb303::AVG);
// Add event logs
LogSample sample{};
sample.addString("event", "ROUTE_CONVERGENCE");
sample.addStringVector("perf_events", eventStrs);
sample.addInt("duration_ms", totalDuration.count());
logSampleQueue_.push(sample);
}
std::string
Fib::RouteState::toStr(RouteState::State state) {
switch (state) {
case State::AWAITING:
return "AWAITING";
case State::SYNCING:
return "SYNCING";
case State::SYNCED:
return "SYNCED";
default:
return "UNKNOWN";
}
}
void
Fib::transitionRouteState(const RouteState::Event event) {
// Static matrix representing state transition. Here we handle all events
// across all states. First index represent the current state, second level
// index represents the event. Value represents the new state.
static const std::array<std::array<std::optional<RouteState::State>, 4>, 3>
stateMap = {
{{
/**
* Index-0, State=AWAITING
*/
RouteState::SYNCING, // on Event=RIB_UPDATE
RouteState::AWAITING, // on Event=FIB_CONNECTED,
std::nullopt // on Event=FIB_SYNCED
},
{
/**
* Index-1, State=SYNCING
*/
RouteState::SYNCING, // on Event=RIB_UPDATE
RouteState::SYNCING, // on Event=FIB_CONNECTED,
RouteState::SYNCED // on Event=FIB_SYNCED
},
{
/**
* Index-2, State=SYNCED
*/
RouteState::SYNCED, // on Event=RIB_UPDATE
RouteState::SYNCING, // on Event=FIB_CONNECTED,
std::nullopt // on Event=FIB_SYNCED
}}};
const auto prevState = routeState_.state;
const auto nextState = stateMap.at(prevState).at(event);
CHECK(nextState.has_value()) << "Next state is 'UNDEFINED'";
if (prevState != nextState) {
XLOG(INFO) << "Route state transitions from "
<< routeState_.toStr(prevState) << " to "
<< routeState_.toStr(nextState.value());
}
// Update current state
routeState_.state = nextState.value();
// NOTE: Special processing
// Clear all existing routes if we transition from AWAITING -> SYNCING
// First RIB update is a SYNC and should be treated as source of truth. Any
// previously installed static route should be ignored.
if (prevState == RouteState::AWAITING && nextState == RouteState::SYNCING) {
routeState_.unicastRoutes.clear();
routeState_.mplsRoutes.clear();
}
}
} // namespace openr