python/qpid_dispatch_internal/router/engine.py (126 lines of code) (raw):

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import time
from traceback import format_exc, extract_stack

from .data import MessageHELLO, MessageRA, MessageLSU, MessageLSR, \
    isCompatibleVersion, getIdAndVersion
from .hello import HelloProtocol
from .link import LinkStateEngine
from .path import PathEngine
from .node import NodeTracker
from .message import Message

##
# Import the Dispatch adapters from the environment.  If they are not found
# (i.e. we are in a test bench, etc.), load the stub versions.
##
from ..dispatch import IoAdapter, LogAdapter, LOG_TRACE, LOG_INFO, LOG_ERROR, LOG_WARNING, LOG_STACK_LIMIT
from ..dispatch import TREATMENT_MULTICAST_FLOOD


class RouterEngine:
    """
    Python-side router engine.

    Owns the log and I/O adapters, creates the NodeTracker, HelloProtocol,
    LinkStateEngine and PathEngine sub-modules, and dispatches timer ticks
    and inbound control messages (HELLO, RA, LSU, LSR) to them.
    """

    def __init__(self, router_adapter, router_id, area, max_routers, config_override=None):
        """
        Initialize an instance of a router for a domain.

        router_adapter  -- host-side adapter; used by the `config` property
                           (via get_agent()) and by node_updated().
        router_id       -- identifier of this router, stored as `self.id`.
        area            -- stored as `self.area`.
        max_routers     -- passed through to the NodeTracker.
        config_override -- accepted for interface compatibility; it is not
                           consulted anywhere in this class.
        """
        ##
        # Record important information about this router instance
        ##
        self.domain = "domain"
        self.router_adapter = router_adapter
        self._config = None  # Not yet loaded; see the `config` property

        self._log_hello = LogAdapter("ROUTER_HELLO")
        self._log_ls = LogAdapter("ROUTER_LS")
        # log_ma() requires this adapter; without it every call raised
        # AttributeError.
        # NOTE(review): module name "ROUTER_MA" follows the naming of the
        # sibling adapters -- confirm it is the intended log module.
        self._log_ma = LogAdapter("ROUTER_MA")
        self._log_general = LogAdapter("ROUTER")

        self.io_adapter = [IoAdapter(self.receive, "qdrouter", 'L', '0', TREATMENT_MULTICAST_FLOOD),
                           IoAdapter(self.receive, "qdrouter", 'T', '0', TREATMENT_MULTICAST_FLOOD),
                           IoAdapter(self.receive, "qdhello", 'L', '0', TREATMENT_MULTICAST_FLOOD)]

        self.max_routers = max_routers
        self.id = router_id
        self.instance = int(time.time())
        self.area = area
        self.incompatIds = []  # router ids already warned about (version mismatch)

        self.log(LOG_INFO, "Router Engine Instantiated: id=%s instance=%d max_routers=%d" %
                 (self.id, self.instance, self.max_routers))

        ##
        # Launch the sub-module engines
        ##
        self.node_tracker = NodeTracker(self, self.max_routers)
        self.hello_protocol = HelloProtocol(self, self.node_tracker)
        self.link_state_engine = LinkStateEngine(self)
        self.path_engine = PathEngine(self)

    # ========================================================================================
    # Adapter Entry Points - invoked from the adapter
    # ========================================================================================
    def getId(self):
        """
        Return the router's ID
        """
        return self.id

    @property
    def config(self):
        """
        Return the 'router' configuration entity, fetching it from the
        adapter's agent on first access and caching it afterwards.

        Raises ValueError if the agent reports no entity of type 'router'.
        """
        # Compare against None so that a found-but-falsy entity is still
        # cached instead of being looked up again on every access.
        if self._config is None:
            try:
                self._config = self.router_adapter.get_agent().find_entity_by_type('router')[0]
            except IndexError:
                raise ValueError("No router configuration found")
        return self._config

    def setMobileSeq(self, router_maskbit, mobile_seq):
        """
        Another router's mobile sequence number has been changed and the Python router needs to store
        this number.
        """
        self.node_tracker.set_mobile_seq(router_maskbit, mobile_seq)

    def setMyMobileSeq(self, mobile_seq):
        """
        This router's mobile sequence number has been changed and the Python router needs to store
        this number and immediately send a router-advertisement message to reflect the change.
        """
        self.link_state_engine.set_mobile_seq(mobile_seq)
        self.link_state_engine.send_ra(time.time())

    def linkLost(self, link_id):
        """
        The control-link to a neighbor has been dropped.  We can cancel the neighbor from the
        link-state immediately instead of waiting for the hello-timeout to expire.
        """
        self.node_tracker.link_lost(link_id)

    def handleTimerTick(self):
        """
        Drive the periodic processing of the sub-modules.

        The hello protocol, link-state engine and node tracker are ticked, in
        that order, with the same timestamp.  Any exception is logged at
        LOG_ERROR and not propagated to the caller.
        """
        try:
            now = time.time()
            self.hello_protocol.tick(now)
            self.link_state_engine.tick(now)
            self.node_tracker.tick(now)
        except Exception:
            self.log(LOG_ERROR, "Exception in timer processing\n%s" % format_exc(LOG_STACK_LIMIT))

    def handleControlMessage(self, opcode, body, link_id, cost):
        """
        Decode an inbound control message and hand it to the matching handler.

        opcode  -- one of 'HELLO', 'RA', 'LSU', 'LSR'; other values are
                   silently ignored.
        body    -- message body, passed to the Message* constructors.
        link_id -- forwarded to the hello handler only.
        cost    -- forwarded to the hello handler only.

        Messages whose body fails isCompatibleVersion() are dropped; a warning
        is logged the first time each offending router id is seen.  Exceptions
        raised while decoding or handling are logged and not propagated.
        """
        if not isCompatibleVersion(body):
            rid, version = getIdAndVersion(body)
            if rid not in self.incompatIds:
                # Warn only once per router id.
                self.incompatIds.append(rid)
                self.log(LOG_WARNING, "Received %s at protocol version %d from %s. Ignoring." %
                         (opcode, version, rid))
            return

        try:
            now = time.time()
            if opcode == 'HELLO':
                msg = MessageHELLO(body)
                self.log_hello(LOG_TRACE, "RCVD: %r" % msg)
                self.hello_protocol.handle_hello(msg, now, link_id, cost)

            elif opcode == 'RA':
                msg = MessageRA(body)
                self.log_ls(LOG_TRACE, "RCVD: %r" % msg)
                self.link_state_engine.handle_ra(msg, now)

            elif opcode == 'LSU':
                msg = MessageLSU(body)
                self.log_ls(LOG_TRACE, "RCVD: %r" % msg)
                self.link_state_engine.handle_lsu(msg, now)

            elif opcode == 'LSR':
                msg = MessageLSR(body)
                self.log_ls(LOG_TRACE, "RCVD: %r" % msg)
                self.link_state_engine.handle_lsr(msg, now)

        except Exception:
            self.log(LOG_ERROR, "Exception in control message processing\n%s" % format_exc(LOG_STACK_LIMIT))
            self.log(LOG_ERROR, "Control message error: opcode=%s body=%r" % (opcode, body))

    def receive(self, message, link_id, cost):
        """
        This is the IoAdapter message-receive handler

        Extracts the 'opcode' property and the body from `message` and passes
        them to handleControlMessage().  Any exception (including a missing
        'opcode' property) is logged and not propagated.
        """
        try:
            self.handleControlMessage(message.properties['opcode'], message.body, link_id, cost)
        except Exception:
            self.log(LOG_ERROR, "Exception in raw message processing\n%s" % format_exc(LOG_STACK_LIMIT))
            self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r" %
                     (message.properties, message.body))

    def getRouterData(self, kind):
        """
        Return a dictionary describing some aspect of the router's state.

        kind -- 'help', 'link-state' or 'link-state-set'.  Any other value
                yields a notice pointing the caller at kind="help".
        """
        if kind == 'help':
            # NOTE(review): 'next-hops' is advertised here but has no handler
            # below; requesting it returns the generic notice.
            return {'help' : "Get list of supported values for kind",
                    'link-state' : "This router's link state",
                    'link-state-set' : "The set of link states from known routers",
                    'next-hops' : "Next hops to each known router"
                    }
        if kind == 'link-state':
            # NOTE(review): `neighbor_engine` is never assigned in this class,
            # so this branch raises AttributeError unless the attribute is set
            # elsewhere.  The intended source of the local link state is not
            # visible from here -- confirm and repoint.
            return self.neighbor_engine.link_state.to_dict()
        if kind == 'link-state-set':
            # NOTE(review): relies on LinkStateEngine exposing `collection` -- confirm.
            return {_id: _ls.to_dict() for _id, _ls in self.link_state_engine.collection.items()}

        return {'notice': 'Use kind="help" to get a list of possibilities'}

    # ========================================================================================
    # Adapter Calls - outbound calls to Dispatch
    # ========================================================================================
    def log(self, level, text):
        """
        Emit a log message to the host's event log
        (general "ROUTER" adapter), tagged with the caller's file and line.
        """
        info = extract_stack(limit=2)[0]  # Caller frame info
        self._log_general.log(level, text, info[0], info[1])

    def log_hello(self, level, text):
        """
        Emit a log message to the host's event log
        ("ROUTER_HELLO" adapter), tagged with the caller's file and line.
        """
        info = extract_stack(limit=2)[0]  # Caller frame info
        self._log_hello.log(level, text, info[0], info[1])

    def log_ls(self, level, text):
        """
        Emit a log message to the host's event log
        ("ROUTER_LS" adapter), tagged with the caller's file and line.
        """
        info = extract_stack(limit=2)[0]  # Caller frame info
        self._log_ls.log(level, text, info[0], info[1])

    def log_ma(self, level, text):
        """
        Emit a log message to the host's event log
        (`_log_ma` adapter), tagged with the caller's file and line.
        """
        info = extract_stack(limit=2)[0]  # Caller frame info
        self._log_ma.log(level, text, info[0], info[1])

    def send(self, dest, msg):
        """
        Send a control message to another router.

        dest -- destination address for the message.
        msg  -- control message object; its get_opcode() value becomes the
                'opcode' application property and its to_dict() the body.

        The message is always sent through the first IoAdapter in
        `self.io_adapter`.
        """
        app_props = {'opcode' : msg.get_opcode()}
        self.io_adapter[0].send(Message(address=dest, properties=app_props, body=msg.to_dict()), True, True)

    def node_updated(self, addr, reachable, neighbor):
        """
        Forward a node-update notification to the router adapter.

        NOTE(review): this invokes `router_adapter` as a callable, whereas the
        `config` property uses it as an object with get_agent().  Confirm that
        the adapter supports being called this way.
        """
        self.router_adapter(addr, reachable, neighbor)