fboss/agent/LacpMachines.cpp (738 lines of code) (raw):

/* * Copyright (c) 2004-present, Facebook, Inc. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. An additional grant * of patent rights can be found in the PATENTS file in the same directory. * */ #include "fboss/agent/LacpMachines.h" #include "fboss/agent/LacpController.h" #include "fboss/agent/LinkAggregationManager.h" #include "fboss/lib/AlertLogger.h" #include <folly/Conv.h> #include <folly/ExceptionString.h> #include <folly/io/async/EventBase.h> #include <folly/logging/xlog.h> #include <algorithm> #include <exception> namespace facebook::fboss { using folly::ByteRange; void toAppend(ReceiveMachine::ReceiveState state, std::string* result) { std::string stateAsString; switch (state) { case ReceiveMachine::ReceiveState::CURRENT: stateAsString = "current"; break; case ReceiveMachine::ReceiveState::EXPIRED: stateAsString = "expired"; break; case ReceiveMachine::ReceiveState::INITIALIZED: stateAsString = "initialized"; break; case ReceiveMachine::ReceiveState::DEFAULTED: stateAsString = "defaulted"; break; case ReceiveMachine::ReceiveState::DISABLED: stateAsString = "disabled"; break; } folly::toAppend(stateAsString, result); } void toAppend( PeriodicTransmissionMachine::PeriodicState state, std::string* result) { std::string stateAsString; switch (state) { case PeriodicTransmissionMachine::PeriodicState::NONE: stateAsString = "none"; break; case PeriodicTransmissionMachine::PeriodicState::SLOW: stateAsString = "slow"; break; case PeriodicTransmissionMachine::PeriodicState::FAST: stateAsString = "fast"; break; case PeriodicTransmissionMachine::PeriodicState::TX: stateAsString = "tx"; break; } folly::toAppend(stateAsString, result); } void toAppend(MuxMachine::MuxState state, std::string* result) { std::string stateAsString; switch (state) { case MuxMachine::MuxState::DETACHED: stateAsString = "detached"; break; case MuxMachine::MuxState::WAITING: stateAsString = "waiting"; break; case MuxMachine::MuxState::ATTACHED: stateAsString = "attached"; break; case MuxMachine::MuxState::COLLECTING_DISTRIBUTING: stateAsString = "collecting&distributing"; break; } folly::toAppend(stateAsString, result); } // Needed for CHECK_* macros to work with ReceiveMachine::ReceiveState std::ostream& operator<<(std::ostream& out, ReceiveMachine::ReceiveState s) { std::string stateAsString; switch (s) { case ReceiveMachine::ReceiveState::CURRENT: stateAsString = "current"; break; case ReceiveMachine::ReceiveState::EXPIRED: stateAsString = "expired"; break; case ReceiveMachine::ReceiveState::INITIALIZED: stateAsString = "initialized"; break; case ReceiveMachine::ReceiveState::DEFAULTED: stateAsString = "defaulted"; break; case ReceiveMachine::ReceiveState::DISABLED: stateAsString = "disabled"; break; } return (out << stateAsString); } // Needed for CHECK_* macros to work with MuxMachine::MuxState std::ostream& operator<<(std::ostream& out, MuxMachine::MuxState s) { std::string stateAsString; switch (s) { case MuxMachine::MuxState::DETACHED: stateAsString = "detached"; break; case MuxMachine::MuxState::WAITING: stateAsString = "waiting"; break; case MuxMachine::MuxState::ATTACHED: stateAsString = "attached"; break; case MuxMachine::MuxState::COLLECTING_DISTRIBUTING: stateAsString = "collecting&distributing"; break; } return (out << stateAsString); } ReceiveMachine::ReceiveMachine( LacpController& controller, folly::EventBase* evb, LacpServicerIf* servicer, uint16_t holdTimerMultiplier) : folly::AsyncTimeout(evb), controller_(controller), servicer_(servicer), slowEpochSeconds_(std::chrono::seconds(30 * holdTimerMultiplier)), fastEpochSeconds_(std::chrono::seconds(1 * holdTimerMultiplier)) {} ReceiveMachine::~ReceiveMachine() {} void ReceiveMachine::start() { initialize(); } void ReceiveMachine::stop() { cancelTimeout(); } void ReceiveMachine::rx(LACPDU lacpdu) { CHECK(controller_.evb()->inRunningEventBaseThread()); XLOG(DBG4) << "ReceiveMachine[" << controller_.portID() << "]: " << "RX(" << lacpdu.describe() << ")"; if (state_ == ReceiveState::DISABLED) { XLOG(DBG4) << "ReceiveMachine[" << controller_.portID() << "]: " << "Ignoring frame reception in DISABLED state"; return; } current(lacpdu); } void ReceiveMachine::portUp() { CHECK(controller_.evb()->inRunningEventBaseThread()); if (state_ != ReceiveState::DISABLED) { XLOG(DBG2) << "ReceiveMachine[" << controller_.portID() << "]: Ignoring UP in state " << state_; return; } XLOG(DBG2) << "ReceiveMachine[" << controller_.portID() << "]: UP"; expired(); } void ReceiveMachine::portDown() { CHECK(controller_.evb()->inRunningEventBaseThread()); XLOG(DBG4) << "ReceiveMachine[" << controller_.portID() << "]: DOWN"; disabled(); } void ReceiveMachine::initialize() { // This state is entered on either initialization or port movement. Since // port movement is not yet implemented, it can currently only be entered // during initialization, when state_ == ReceiveState::INITIALIZED. // TODO(samank): remove CHECK once port movement functionality has been added CHECK_EQ(state_, ReceiveState::INITIALIZED); controller_.unselected(); recordDefault(); controller_.setActorState(controller_.actorState() & ~LacpState::EXPIRED); disabled(); // UCT } void ReceiveMachine::defaulted() { updateState(ReceiveState::DEFAULTED); updateDefaultSelected(); recordDefault(); controller_.setActorState(controller_.actorState() & ~LacpState::EXPIRED); } void ReceiveMachine::expired() { updateState(ReceiveState::EXPIRED); partnerInfo_.state |= LacpState::SHORT_TIMEOUT; partnerInfo_.state &= ~LacpState::IN_SYNC; controller_.notMatched(); controller_.setActorState(controller_.actorState() | LacpState::EXPIRED); startNextEpoch(fastEpochSeconds_); } void ReceiveMachine::disabled() { updateState(ReceiveState::DISABLED); endThisEpoch(); partnerInfo_.state &= ~LacpState::IN_SYNC; controller_.notMatched(); } void ReceiveMachine::updateSelected(LACPDU& lacpdu) { if (lacpdu.actorInfo.systemID == partnerInfo_.systemID && lacpdu.actorInfo.systemPriority == partnerInfo_.systemPriority && lacpdu.actorInfo.key == partnerInfo_.key && lacpdu.actorInfo.portPriority == partnerInfo_.portPriority && lacpdu.actorInfo.port == partnerInfo_.port && (lacpdu.actorInfo.state & LacpState::AGGREGATABLE) == (partnerInfo_.state & LacpState::AGGREGATABLE)) { return; } XLOG(DBG4) << "ReceiveMachine[" << controller_.portID() << "]: " << "Partner claimed " << lacpdu.actorInfo.describe() << " but I have " << partnerInfo_.describe(); controller_.unselected(); } void ReceiveMachine::updateDefaultSelected() { auto adminDefault = ParticipantInfo(); if (adminDefault != controller_.partnerInfo()) { controller_.unselected(); } } bool ReceiveMachine::updateNTT(LACPDU& lacpdu) { // TODO(samank): skip over DEFAULTED, EXPIRED, COLLECTING, DISTRIBUTING? return lacpdu.partnerInfo != controller_.actorInfo(); } void ReceiveMachine::current(LACPDU lacpdu) { updateState(ReceiveState::CURRENT); updateSelected(lacpdu); bool ntt = updateNTT(lacpdu); bool activityChanged = (lacpdu.actorInfo.state & LacpState::ACTIVE) != (partnerInfo_.state & LacpState::ACTIVE); recordPDU(lacpdu); controller_.setActorState(controller_.actorState() & ~LacpState::EXPIRED); controller_.select(); if (ntt) { controller_.ntt(); } if (activityChanged) { controller_.startPeriodicTransmissionMachine(); } startNextEpoch(epochDuration()); } std::chrono::seconds ReceiveMachine::epochDuration() { return controller_.actorInfo().state & LacpState::SHORT_TIMEOUT ? fastEpochSeconds_ : slowEpochSeconds_; } void ReceiveMachine::recordPDU(LACPDU& lacpdu) { bool partnerChanged = partnerInfo_ != lacpdu.actorInfo; // TODO(samank): needs to be moved after controller_.setActorState(controller_.actorState() & ~LacpState::DEFAULTED); bool matched = lacpdu.partnerInfo.systemPriority == controller_.actorInfo().systemPriority && lacpdu.partnerInfo.systemID == controller_.actorInfo().systemID && lacpdu.partnerInfo.key == controller_.actorInfo().key && lacpdu.partnerInfo.portPriority == controller_.actorInfo().portPriority && lacpdu.partnerInfo.port == controller_.actorInfo().port; if ((matched || ((lacpdu.actorInfo.state & LacpState::AGGREGATABLE) == 0)) && (lacpdu.actorInfo.state & LacpState::IN_SYNC)) { partnerInfo_ = lacpdu.actorInfo; partnerInfo_.state |= LacpState::IN_SYNC; controller_.matched(partnerChanged); } else { // if partner transitions out of sync, LACP will disable fwding on the port if (partnerInfo_.state & LacpState::IN_SYNC) { XLOG(WARNING) << PortAlert() << "ReceiveMachine[" << controller_.portID() << "]: " << "Partner not in sync. Got LACP PDU (" << lacpdu.describe() << ") Previous State (" << partnerInfo_.describe() << ")"; servicer_->recordLacpMismatchPduTeardown(); } partnerInfo_ = lacpdu.actorInfo; partnerInfo_.state &= ~LacpState::IN_SYNC; controller_.notMatched(); } } void ReceiveMachine::startNextEpoch(std::chrono::seconds duration) { scheduleTimeout(duration); } void ReceiveMachine::endThisEpoch() { cancelTimeout(); } void ReceiveMachine::timeoutExpired() noexcept { try { switch (state_) { case ReceiveState::CURRENT: XLOG(WARNING) << PortAlert() << "ReceiveMachine[" << controller_.portID() << "]: RX timer expired in CURRENT state"; expired(); servicer_->recordLacpTimeout(); break; case ReceiveState::EXPIRED: defaulted(); break; case ReceiveState::DISABLED: case ReceiveState::INITIALIZED: case ReceiveState::DEFAULTED: throw LACPError("invalid transition to ", state_); break; } } catch (...) { std::exception_ptr e = std::current_exception(); CHECK(e); XLOG(FATAL) << "ReceiveMachine::timeoutExpired(): " << folly::exceptionStr(e); } } void ReceiveMachine::recordDefault() { partnerInfo_ = ParticipantInfo(); controller_.setActorState(controller_.actorState() | LacpState::DEFAULTED); } // TODO(samank): compiler not seeing overlaod of ostream& operator<< in // in LOG macros but does see it for CHECK macros... void ReceiveMachine::updateState(ReceiveState nextState) { prevState_ = state_; state_ = nextState; XLOG(DBG4) << "ReceiveMachine[" << controller_.portID() << "]: " << prevState_ << "-->" << state_; } ParticipantInfo ReceiveMachine::partnerInfo() const { return partnerInfo_; } void ReceiveMachine::restoreState(AggregatePort::PartnerState partnerState) { updateState(ReceiveState::CURRENT); partnerInfo_ = partnerState; controller_.setActorState( controller_.actorState() | LacpState::DISTRIBUTING | LacpState::COLLECTING | LacpState::IN_SYNC); XLOG(DBG3) << "ReceiveMachine[" << controller_.portID() << "]: restored actor state " << controller_.actorState(); startNextEpoch(epochDuration()); } const std::chrono::seconds PeriodicTransmissionMachine::SHORT_PERIOD(1); const std::chrono::seconds PeriodicTransmissionMachine::LONG_PERIOD(30); PeriodicTransmissionMachine::PeriodicTransmissionMachine( LacpController& controller, folly::EventBase* evb) : folly::AsyncTimeout(evb), controller_(controller) {} PeriodicTransmissionMachine::~PeriodicTransmissionMachine() {} void PeriodicTransmissionMachine::start() { state_ = determineTransmissionRate(); beginNextPeriod(); } void PeriodicTransmissionMachine::stop() { cancelTimeout(); } void PeriodicTransmissionMachine::portUp() { // TODO(samank): start vs portUp CHECK(controller_.evb()->inRunningEventBaseThread()); state_ = determineTransmissionRate(); beginNextPeriod(); } void PeriodicTransmissionMachine::portDown() { CHECK(controller_.evb()->inRunningEventBaseThread()); cancelTimeout(); } void PeriodicTransmissionMachine::beginNextPeriod() { switch (state_) { case PeriodicState::SLOW: XLOG(DBG4) << "PeriodicTransmissionMachine[" << controller_.portID() << "]: scheduling timeout for long period"; scheduleTimeout(LONG_PERIOD); break; case PeriodicState::FAST: XLOG(DBG4) << "PeriodicTransmissionMachine[" << controller_.portID() << "]: scheduling timeout for short period"; scheduleTimeout(SHORT_PERIOD); break; case PeriodicState::NONE: XLOG(DBG4) << "PeriodicTransmissionMachine[" << controller_.portID() << "]: not scheduling a timeout"; break; case PeriodicState::TX: throw LACPError("invalid transition to ", state_); break; } } void PeriodicTransmissionMachine::timeoutExpired() noexcept { try { XLOG(DBG4) << "PeriodicTransmissionMachine[" << controller_.portID() << "]: end of period"; state_ = PeriodicState::TX; controller_.ntt(); state_ = determineTransmissionRate(); beginNextPeriod(); } catch (...) { std::exception_ptr e = std::current_exception(); CHECK(e); XLOG(FATAL) << "PeriodicTranmissionMachine::timeoutExpired(): " << folly::exceptionStr(e); } } PeriodicTransmissionMachine::PeriodicState PeriodicTransmissionMachine::determineTransmissionRate() { auto actorInfo = controller_.actorInfo(); auto partnerInfo = controller_.partnerInfo(); if ((actorInfo.state & LacpState::ACTIVE) == 0 && (partnerInfo.state & LacpState::ACTIVE) == 0) { return PeriodicState::NONE; } return controller_.partnerInfo().state & LacpState::SHORT_TIMEOUT ? PeriodicState::FAST : PeriodicState::SLOW; } const std::chrono::seconds TransmitMachine::TX_REPLENISH_RATE(1); const int TransmitMachine::MAX_TRANSMISSIONS_IN_SHORT_PERIOD = 3; TransmitMachine::TransmitMachine( LacpController& controller, folly::EventBase* evb, LacpServicerIf* servicer) : folly::AsyncTimeout(evb), controller_(controller), servicer_(servicer) {} TransmitMachine::~TransmitMachine() {} void TransmitMachine::start() { scheduleTimeout(TransmitMachine::TX_REPLENISH_RATE); } void TransmitMachine::stop() { cancelTimeout(); } void TransmitMachine::timeoutExpired() noexcept { replenishTranmissionsLeft(); } void TransmitMachine::replenishTranmissionsLeft() noexcept { transmissionsLeft_ = std::min( transmissionsLeft_ + 1, TransmitMachine::MAX_TRANSMISSIONS_IN_SHORT_PERIOD); scheduleTimeout(TransmitMachine::TX_REPLENISH_RATE); } void TransmitMachine::ntt(LACPDU lacpdu) { CHECK(controller_.evb()->inRunningEventBaseThread()); if (transmissionsLeft_ == 0) { // TODO(samank): figure out stale ntt details XLOG(DBG4) << "TransmitMachine[" << controller_.portID() << "]: " << "skipping ntt request"; return; } auto outPort = controller_.portID(); if (!servicer_->transmit(lacpdu, outPort)) { return; } XLOG(DBG4) << "TransmitMachine[" << controller_.portID() << "]: " << "TX(" << lacpdu.describe() << ")"; --transmissionsLeft_; XLOG(DBG4) << transmissionsLeft_ << " transmissions left"; } const std::chrono::seconds MuxMachine::AGGREGATE_WAIT_DURATION(2); MuxMachine::MuxMachine( LacpController& controller, folly::EventBase* evb, LacpServicerIf* servicer) : AsyncTimeout(evb), controller_(controller), servicer_(servicer) {} MuxMachine::~MuxMachine() {} void MuxMachine::selected(AggregatePortID selection) { CHECK(controller_.evb()->inRunningEventBaseThread()); selection_ = selection; switch (state_) { case MuxState::DETACHED: XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: SELECTED in " << state_; waiting(true); break; case MuxState::WAITING: XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: SELECTED in " << state_; attached(); case MuxState::ATTACHED: case MuxState::COLLECTING_DISTRIBUTING: XLOG(WARNING) << "MuxMachine[" << controller_.portID() << "]: Ignoring SELECTED in " << state_; break; } } void MuxMachine::unselected() { CHECK(controller_.evb()->inRunningEventBaseThread()); switch (state_) { case MuxState::DETACHED: XLOG(WARNING) << "MuxMachine[" << controller_.portID() << "]: Ignoring UNSELECTED in " << state_; break; case MuxState::WAITING: case MuxState::ATTACHED: XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: UNSELECTED in " << state_; detached(); break; case MuxState::COLLECTING_DISTRIBUTING: XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: UNSELECTED in " << state_; attached(); break; } } void MuxMachine::standby() { CHECK(controller_.evb()->inRunningEventBaseThread()); switch (state_) { case MuxState::DETACHED: XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: STANDBY in " << state_; waiting(false); break; case MuxState::WAITING: XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: Ignoring STANDBY in " << state_; break; case MuxState::ATTACHED: case MuxState::COLLECTING_DISTRIBUTING: XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: STANDBY in " << state_; detached(); break; } } void MuxMachine::matched(bool partnerChanged) { CHECK(controller_.evb()->inRunningEventBaseThread()); matched_ = true; switch (state_) { case MuxState::DETACHED: case MuxState::WAITING: XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: Ignoring MATCHED in " << state_; break; case MuxState::COLLECTING_DISTRIBUTING: if (partnerChanged) { XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: MATCHED partner changed in " << state_; enableCollectingDistributing(); } else { XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: Ignoring MATCHED in " << state_; } break; case MuxState::ATTACHED: XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: MATCHED in " << state_; collectingDistributing(); break; } } void MuxMachine::notMatched() { CHECK(controller_.evb()->inRunningEventBaseThread()); matched_ = false; switch (state_) { case MuxState::DETACHED: case MuxState::WAITING: case MuxState::ATTACHED: XLOG(WARNING) << "MuxMachine[" << controller_.portID() << "]: Ignoring NOT MATCHED in " << state_; break; case MuxState::COLLECTING_DISTRIBUTING: XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: NOT MATCHED in " << state_; attached(); break; } } void MuxMachine::start() {} void MuxMachine::stop() {} void MuxMachine::enableCollectingDistributing() const { auto portID = controller_.portID(); auto aggPortID = selection_; /* In testing, we've found that it can take up to 100 ms _after_ a link-up * interrupt has been issued for the link to actually be usable (ie. able to * transmit and receive frames). Thus, immediately expanding the LAG group * to include the newly-up port can result in packet loss. * Packet loss of this nature is avoided in the case of dynamic link * aggregation by an application of fate-sharing: instead of immediately * expanding the LAG group, the LACP machinery is first notified of the event, * resulting in a LAC PDU exchange between the local interface and the remote * interface. Once the LACP handshake has completed, the LAG group is expanded * to include the newly-up port. Since the link must have been able to * transmit and receive to complete the handshake, expanding the LAG group * afterwards must avoid packet loss. Conversely, if the LACP handshake does * not complete, whether due to the link not yet being usable or due to normal * LACP operation, then we would not want to add the newly-up port to the LAG * group, as dictated by LACP. */ servicer_->enableForwardingAndSetPartnerState( portID, aggPortID, controller_.partnerInfo()); } void MuxMachine::disableCollectingDistributing() const { auto portID = controller_.portID(); auto aggPortID = selection_; servicer_->disableForwardingAndSetPartnerState( portID, aggPortID, controller_.partnerInfo()); } void MuxMachine::detached() { updateState(MuxState::DETACHED); // Detach_Mux_From_Aggregator controller_.setActorState( controller_.actorState() & ~(LacpState::IN_SYNC | LacpState::COLLECTING)); disableCollectingDistributing(); controller_.setActorState( controller_.actorState() & ~LacpState::DISTRIBUTING); controller_.ntt(); } void MuxMachine::waiting(bool shouldScheduleTimeout) { updateState(MuxState::WAITING); if (shouldScheduleTimeout) { scheduleTimeout(AGGREGATE_WAIT_DURATION); } } void MuxMachine::timeoutExpired() noexcept { try { attached(); if (matched_) { collectingDistributing(); } } catch (...) { std::exception_ptr e = std::current_exception(); CHECK(e); XLOG(FATAL) << "MuxMachine::timeoutExpired(): " << folly::exceptionStr(e); } } void MuxMachine::attached() { updateState(MuxState::ATTACHED); // Attach_Mux_To_Aggregator // TODO(samank): this needs to change on port down for min-links controller_.setActorState(controller_.actorState() | LacpState::IN_SYNC); controller_.setActorState(controller_.actorState() & ~LacpState::COLLECTING); disableCollectingDistributing(); controller_.setActorState( controller_.actorState() & ~LacpState::DISTRIBUTING); controller_.ntt(); } void MuxMachine::collectingDistributing() { updateState(MuxState::COLLECTING_DISTRIBUTING); controller_.setActorState(controller_.actorState() | LacpState::DISTRIBUTING); enableCollectingDistributing(); controller_.setActorState(controller_.actorState() | LacpState::COLLECTING); controller_.ntt(); } void MuxMachine::updateState(MuxState nextState) { prevState_ = state_; state_ = nextState; XLOG(DBG4) << "MuxMachine[" << controller_.portID() << "]: " << prevState_ << "-->" << state_; } void MuxMachine::restoreState(void) { updateState(MuxState::COLLECTING_DISTRIBUTING); } Selector::Selector(LacpController& controller, uint8_t minLinkCount) : controller_(controller), minLinkCount_(minLinkCount) {} void Selector::start() {} void Selector::stop() {} // TODO(samank): Factor out into MininumLinksSelector void Selector::select() { auto targetLagID = LinkAggregationGroupID::from( controller_.actorInfo(), controller_.partnerInfo()); // If the Link Aggregation Group this port has SELECTED or STANDBYed is the // same as that identified by targetLagID, then there's nothing to do. auto maybeSelection = getSelectionIf(); if (maybeSelection && maybeSelection->lagID == targetLagID) { XLOG(DBG4) << "Selection[" << controller_.portID() << "]: skipping selection logic"; return; } auto chooseTargetLag = [this, targetLagID](SelectionState state) { Selector::portToSelection().insert( std::make_pair(controller_.portID(), Selection(targetLagID, state))); if (state == SelectionState::SELECTED) { XLOG(DBG4) << "Selection[" << controller_.portID() << "]: selected " << targetLagID.describe(); controller_.selected(); } else { XLOG(DBG4) << "Selection[" << controller_.portID() << "]: standby " << targetLagID.describe(); controller_.standby(); } }; // If either the actor or the partner has specified it must operate as // INDIVIDUAL, we must select the Aggregator that uniquely belongs to it if ((controller_.partnerInfo().state & LacpState::AGGREGATABLE) == 0 || (controller_.actorInfo().state & LacpState::AGGREGATABLE) == 0) { chooseTargetLag(SelectionState::SELECTED); return; } bool targetLagSelectedByOtherPort = false; for (const auto& portAndSelection : portToSelection()) { if (portAndSelection.second.lagID == targetLagID) { targetLagSelectedByOtherPort = true; break; } } bool chosen = false; if (targetLagSelectedByOtherPort) { chooseTargetLag(SelectionState::STANDBY); chosen = true; } else { // The target Link Aggregation Group can only be chosen if no other port has // SELECTED or STANDBYed a LAG with the same AggregatePortID (ie. the other // port's LinkAggregationGroupID.actorKey == controller_.aggregatePortID()) if (isAvailable(controller_.aggregatePortID())) { chooseTargetLag(SelectionState::STANDBY); chosen = true; } } if (chosen) { auto targetLagMemberCount = std::count_if( portToSelection().begin(), portToSelection().end(), [targetLagID](PortIDToSelection::value_type el) { Selection s = el.second; return s.lagID == targetLagID && s.state == SelectionState::STANDBY; }); if (targetLagMemberCount == minLinkCount_) { auto portsToSignal = getPortsWithSelection( Selection(targetLagID, SelectionState::STANDBY)); controller_.selected( folly::range(portsToSignal.begin(), portsToSignal.end())); } } // TODO(samank): aggregate with the nil aggregator } Selector::Selection Selector::getSelection() { auto it = portToSelection().find(controller_.portID()); if (it == portToSelection().end()) { throw LACPError(); } return it->second; } std::optional<Selector::Selection> Selector::getSelectionIf() { auto it = portToSelection().find(controller_.portID()); if (it == portToSelection().end()) { return std::nullopt; } return it->second; } bool Selector::isAvailable(AggregatePortID aggPortID) const { for (const auto& portAndSelection : portToSelection()) { Selection s = portAndSelection.second; if (static_cast<AggregatePortID>(s.lagID.actorKey) == aggPortID) { return false; } } return true; } Selector::PortIDToSelection& Selector::portToSelection() { static Selector::PortIDToSelection portToSelection_; return portToSelection_; } std::vector<PortID> Selector::getPortsWithSelection(Selection s) const { std::vector<PortID> ports; for (const auto& portIDAndSelection : portToSelection()) { auto portID = portIDAndSelection.first; auto selection = portIDAndSelection.second; if (selection == s) { ports.push_back(portID); } } return ports; } void Selector::portDown() { auto maybeSelection = getSelectionIf(); if (!maybeSelection) { XLOG(DBG4) << "Selection[" << controller_.portID() << "]: no LAG chosen"; return; } // ReceiveMachine::portDown() was invoked prior to this, which will eventually // signal NOT partner.SYNC. If the MuxMachine was in state // MuxState::COLLECTING_DISTRIBUTING, then it would have transitioned to // MuxMachine::ATTACHED on this signal. Signalling UNSELECTED at this point // will transition it to MuxMachine::DETACHED. controller_.unselected(); } void Selector::unselected() { auto it = portToSelection().find(controller_.portID()); if (it == portToSelection().end()) { return; } LinkAggregationGroupID myLagID = it->second.lagID; portToSelection().erase(it); auto ports = getPortsWithSelection(Selection(myLagID, SelectionState::SELECTED)); controller_.standby(folly::range(ports.begin(), ports.end())); } void Selector::selected() { auto it = portToSelection().find(controller_.portID()); CHECK(it != portToSelection().end()); it->second.state = SelectionState::SELECTED; } void Selector::standby() { auto it = portToSelection().find(controller_.portID()); if (it == portToSelection().end()) { return; } it->second.state = SelectionState::STANDBY; } void Selector::restoreState() { auto targetLagID = LinkAggregationGroupID::from( controller_.actorInfo(), controller_.partnerInfo()); Selector::portToSelection().insert(std::make_pair( controller_.portID(), Selection(targetLagID, SelectionState::SELECTED))); XLOG(DBG4) << "Selection[" << controller_.portID() << "]: selected " << targetLagID.describe(); controller_.selected(); } } // namespace facebook::fboss