fbnet/command_runner/device_vendor.py (173 lines of code) (raw):

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (c) Facebook, Inc. and its affiliates. # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. import json import os import re from fbnet.command_runner_asyncio.CommandRunner.ttypes import SessionType from . import utils from .base_service import ServiceObj from .command_session import SSHCommandSession from .options import Option from .ssh_netconf import SSHNetconf class VendorConfig: def __init__(self, defaults, session_names): self._cfg = {} self._session_names = session_names self.update(defaults) def __getattr__(self, attr): return self._cfg.get(attr) def update(self, cfg): for prop, val in cfg.items(): self._cfg[prop] = utils.canonicalize(val) if "supported_sessions" in cfg: # Refresh supported_sessions only if it's updated self._cfg["supported_sessions"] = { self._session_names[s] for s in self._cfg["supported_sessions"] } if "session_type" in cfg: self._cfg["session_type"] = self._session_names[self._cfg["session_type"]] # Default session type should be supported self._cfg["supported_sessions"].add(self._cfg["session_type"]) class DeviceVendor(ServiceObj): _DEFAULTS = { "cli_setup": [b"term len 0", b"term width 511"], "prompt_regex": [rb"[\w.]+[>#$]"], "cmd_timeout_sec": 30, "clear_command": b"\x15", "session_type": b"ssh", "supported_sessions": {b"ssh", b"netconf"}, "autocomplete": True, "port": 22, } _PROMPTS_RE = re.compile( # pyre-fixme[16]: `int` has no attribute `__iter__`. b"|".join([b"(%s)" % p for p in _DEFAULTS["prompt_regex"]]), re.M, ) _SESSION_NAMES = {b"ssh": SessionType.SSH, b"netconf": SessionType.SSH_NETCONF} _SESSION_TYPES = { SessionType.SSH: SSHCommandSession, SessionType.SSH_NETCONF: SSHNetconf, } def __init__(self, vendor_name, service): super().__init__(service, "DeviceVendor") self._vendor_name = vendor_name self._config = VendorConfig(self._DEFAULTS, self._SESSION_NAMES) self._prompt_re = self._PROMPTS_RE def __repr__(self): props = { "cli_setup": self._config.cli_setup, "prompt_regex": self._config.prompt_regex, "cmd_timeout_sec": self._config.cmd_timeout_sec, "autocomplete": self._config.autocomplete, } return "DeviceVendor(%s) %s" % (self.vendor_name, props) @classmethod def register_counters(cls, stats_mgr): for session_type in cls._SESSION_TYPES.values(): session_type.register_counters(stats_mgr) stats_mgr.register_counter("device_vendor.all_sessions") stats_mgr.register_counter("device_vendor.unsupported_session") def get_prompt_re(self, trailer=None): """ Get prompt regex for the device. Optionally a trailer can be specified. This is extra text expected after the prompt. Mostly useful for interactive command. E.g. when we get a list of completion, the intial command is inserted after the prompt """ if not trailer: return self._prompt_re return self._get_prompt_re(trailer) def get_port(self): return self._config.port @property def vendor_name(self): return self._vendor_name @property def cmd_timeout_sec(self): return self._config.cmd_timeout_sec @property def clear_command(self): return self._config.clear_command @property def exit_command(self): return self._config.exit_command @property def cli_setup(self): return self._config.cli_setup @property def session_type(self): return self._SESSION_TYPES[self._config.session_type] @property def autocomplete(self): return self._config.autocomplete def select_session_type(self, options): """ Select session type for given set of options. Users can override session type here, by specifying session_type in options. This needs to be implemented for vendors supporting multiple session types """ self.inc_counter("device_vendor.all_sessions") session_type = options.get("session_type", None) if session_type in self._config.supported_sessions: return self._SESSION_TYPES.get(session_type, self.session_type) else: if session_type is not None: self.logger.warning( "Device vendor {} does not support session {}".format( self._vendor_name, session_type ) ) self.inc_counter("device_vendor.unsupported_session") return self.session_type def update_config(self, vendor_config): self._config.update(vendor_config) self._update_prompts_re() def set_user_prompts(self, prompts): self._config.update({"user_prompts": prompts}) self._update_prompts_re() def _update_prompts_re(self): self._prompt_re = self._get_prompt_re() def _get_prompt_re(self, trailer=None): prompts = self._config.prompt_regex if self._config.shell_prompts: prompts += self._config.shell_prompts if self._config.user_prompts: prompts += self._config.user_prompts if self._config.bootstrap_prompts: prompts += self._config.bootstrap_prompts return self._build_prompt_re(prompts, trailer) @classmethod def _build_prompt_re(cls, prompts, trailer=None): all_prompts = (b"(%s)" % prompt for prompt in prompts) trailer = trailer or b"" # the prompt must be at the start of the line. # Also since we are sending one command at a time, it must also be the # last text in the text. Although still not perfect, this greatly # reduces the probability of this matching some random text in the # output. Not that we are matching at end of the text, not at the end of # each line in text (re.M is not specified) return re.compile( b"(?<=[\n\r])(?P<prompt>" + b"|".join(all_prompts) + rb")\s*" + trailer + b"$", re.M, ) class DeviceVendors(ServiceObj): # User specified device vendor information device_vendors = Option( "--device_vendors", help="A JSON file containing vendor information", default=None, ) def __init__(self, service, name=None): super().__init__(service, name) self._vendors = {} self._load_vendors_data() @classmethod def register_counters(cls, stats_mgr): DeviceVendor.register_counters(stats_mgr) def get(self, name): return self._vendors.get(name) or self._createVendor(name) def _update_user_prompts(self, path, cfg): if cfg is not None: for vendor, prompts in cfg["prompt_regexs"].items(): self.get(vendor).set_user_prompts(prompts) def _update_device_vendors(self, path, cfg): # now load the vendor information for name, props in cfg["vendor_config"].items(): vendor = self.get(name) vendor.update_config(props) def load_vendors(self, path, json_str): cfg = json.loads(json_str) return self._update_device_vendors(path, cfg) def _load_device_vendors(self): """ Load device vendors specified on command line """ if self.device_vendors and os.path.exists(self.device_vendors): self.logger.info("loading local file") with open(self.device_vendors, "rb") as fh: jsonb = fh.read() return self.load_vendors(self.device_vendors, jsonb.decode("utf-8")) def _load_vendors_data(self): """ Load vendors information """ self._load_device_vendors() def _createVendor(self, name): vendor = DeviceVendor(name, self.service) self._vendors[name] = vendor return vendor