fbnet/command_runner/console_session.py (294 lines of code) (raw):

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (c) Facebook, Inc. and its affiliates. # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. import asyncio import re from typing import ( List, TYPE_CHECKING, Any, AnyStr, Dict, NamedTuple, Optional, Pattern, Tuple, Union, ) from fbnet.command_runner.exceptions import ( PermissionErrorException, RuntimeErrorException, ) from .command_session import SSHCommandSession from .device_info import IPInfo from .options import Option if TYPE_CHECKING: from asyncio import AbstractEventLoop from fbnet.command_runner.service import FcrServiceBase from .command_session import ResponseMatch from .device_info import DeviceInfo class ConsoleInfo(NamedTuple): """ Information about the console """ contype: str host: str server: str port: Union[str, int] def __repr__(self) -> str: """ pretty representation of console information """ return "host:{s.host} {s.contype}: {s.server}:{s.port}".format(s=self) class ConsoleCommandSession(SSHCommandSession): """ A command session that runs over a console connections. Currently we only support SSH connection to the console server """ # The three default class variables below let you preset the prompts in-case vendor is unknown. # If you would like to hard-code or preset the prompts for specific devices, you can do so # by hardcoding _CONFIG_CONSOLE_PROMPTS_RE_DICT and _CONFIG_INTERACT_PROMPTS_RE_DICT _DEFAULT_INTERACT_PROMPTS: Dict[bytes, bytes] = { b"Y": rb"Do you acknowledge\? \(Y/N\)\?" } _DEFAULT_CONSOLE_PROMPTS: Dict[bytes, bytes] = { # For login we need to ignore output like: # Last login: Mon May 8 13:53:17 on ttyS0 b"login": rb".*((?<!Last ).ogin|.sername):\s*$", b"passwd": rb"\n.*assword\s?:\s*$", # Ignore login failure message like P64639613 b"prompt": b"\n.*[$#>%](?!Login incorrect)", b"interact_prompts": rb"Do you acknowledge\? \(Y/N\)\?", # Ignore these prompts during login attempts b"ignore_prompts": b"( to cli \\])|(who is on this device.\\]\\r\\n)|(Press RETURN to get started\r\n)", } _DEFAULT_CONSOLE_PROMPTS_RE: Optional[Pattern] = None _DEFAULT_LOGOUT_COMMAND: bytes = b"exit" _CONSOLE_EXPECT_DELAY: int = 5 _CONSOLE_LOGIN_TIMEOUT_S = Option( "--console_login_timeout_s", help="The upper bound of the time (in seconds) that FCR waits for a " "console server to login to the target device (only applies when a " "console is used to connect to a device). (default: %(default)s)", type=int, default=60, ) _CONFIG_CONSOLE_PROMPTS_RE_DICT: Optional[Dict[bytes, Pattern]] = None _CONFIG_INTERACT_PROMPTS_RE_DICT: Optional[Dict[bytes, Dict[bytes, bytes]]] = None def __init__( self, service: "FcrServiceBase", devinfo: "DeviceInfo", options: Dict[str, Any], loop: "AbstractEventLoop", ) -> None: super().__init__(service, devinfo, options, loop) self._console: str = options["console"] @classmethod def _build_and_set_prompts_re_dict( cls, vendor_prompts: Optional[Dict[bytes, Dict[bytes, bytes]]] = None, vendor_interact_prompts: Optional[Dict[bytes, Dict[bytes, bytes]]] = None, ) -> None: """ This method takes in a dictionary of a dictionary of vendor specific prompts, builds each set of vendor prompts and adds them to the instance dictionary of regexs CONFIG_CONSOLE_PROMPTS_RE_DICT to be matched against console prompts. Also sets the interact_prompts dictionary, if applicable (unneccessary to compile regexes until there is a match). """ cls._CONFIG_CONSOLE_PROMPTS_RE_DICT = {} cls._CONFIG_INTERACT_PROMPTS_RE_DICT = {} if vendor_prompts: for vendor, prompts in vendor_prompts.items(): cls._CONFIG_CONSOLE_PROMPTS_RE_DICT[ # pyre-ignore vendor ] = cls._build_individual_prompt_re(prompts) if vendor_interact_prompts: cls._CONFIG_INTERACT_PROMPTS_RE_DICT = vendor_interact_prompts @classmethod def _build_individual_prompt_re( cls, prompts: Optional[Dict[bytes, bytes]] = None ) -> Pattern: """ This function takes in a dictionary of regexes of a single vendor to match prompts against, then compiles all the prompts into a single grouped regex and returns the regex. """ prompts_re = [] if prompts: prompts_re = [ b"(?P<%s>%s)" % (group, regex) for group, regex in prompts.items() ] prompt_re = b"|".join(prompts_re) return re.compile(prompt_re + rb"\s*$") @classmethod def get_default_console_prompt_re(cls) -> Pattern: # This check ensures that _DEFAULT_CONSOLE_PROMPTS_RE is not None, ignoring pyre warning if not cls._DEFAULT_CONSOLE_PROMPTS_RE: cls._DEFAULT_CONSOLE_PROMPTS_RE = cls._build_individual_prompt_re( cls._DEFAULT_CONSOLE_PROMPTS ) return cls._DEFAULT_CONSOLE_PROMPTS_RE # pyre-ignore @classmethod def get_prompt_re(cls, vendor_name: Optional[str] = None) -> Pattern: """ This method takes in a vendor name and retrieves the pre-compiled group regex corresponding to that vendor from the _CONFIG_CONSOLE_PROMPTS_RE_DICT. If the vendor is not given or does not already exist in the dictionary, return the default (hard-coded) regex. If the default regex is not already built, build it and then set it. """ # This check ensure that _CONFIG_CONSOLE_PROMPTS_RE_DICT is not None, ignoring pyre warning if cls._CONFIG_CONSOLE_PROMPTS_RE_DICT and vendor_name: return cls._CONFIG_CONSOLE_PROMPTS_RE_DICT.get( # pyre-ignore vendor_name.encode("utf-8"), cls.get_default_console_prompt_re() ) return cls.get_default_console_prompt_re() async def dest_info(self) -> Tuple[List[IPInfo], int, str, str]: console = await self.get_console_info() self.logger.info(f"{str(console)}") # By default we assume a console is directly specified by the user, # so we don't want to raise error messages that it is not pingable # Set is_pingable to True since check_ip method always returns True is_pingable = True return ( [IPInfo(console.server, is_pingable)], console.port, self._username, self._password, ) async def expect( self, regex: Pattern, timeout: int = _CONSOLE_EXPECT_DELAY ) -> "ResponseMatch": try: return await asyncio.wait_for( self.wait_prompt(prompt_re=regex, timeout=timeout), timeout, loop=self._loop, ) except asyncio.TimeoutError as ex: data = [] # This statement ensures that _stream_reader is not none if self._stream_reader: data = await self._stream_reader.drain() raise asyncio.TimeoutError( "Timeout during waiting for prompt." f"Currently received data: {data[-200:]}" ) from ex def send(self, data: AnyStr, end: bytes = b"\n") -> None: """ send some data and optionally wait for some data """ if isinstance(data, str): data = data.encode("utf8") # This check ensure _stream_writer is not none, ignoring the pyre warning if self._stream_writer: self._stream_writer.write(data + end) # pyre-ignore async def _try_login( # noqa C901 self, username: Optional[str] = None, passwd: Optional[str] = None, kickstart: bool = False, username_tried: bool = False, pwd_tried: bool = False, get_response_timeout: int = _CONSOLE_EXPECT_DELAY, ) -> None: """ A helper function that tries to login into the device. kickstart gives the option to send a clear line to the console before entering the username and password to prevent false TimeoutError, since sometimes user needs to hit an Enter before logging in. username_tried and pwd_tried are an indicator showing that the FCR has received a login error from the console after sending the username and password to the console, if this triggers, we would raise a PermissionError stating that the console has failed to login, this will also prevent false TimeoutError to happen. """ try: res = await self._get_response(timeout=get_response_timeout) except asyncio.TimeoutError: if kickstart and username: # We likely didn't get anything from the console. Try sending a # newline to kickstart the login process self.logger.debug("kickstart console login") # Clear the current line and send a newline self._send_clearline() return await self._try_login( username=username, passwd=passwd, username_tried=username_tried, pwd_tried=pwd_tried, ) else: raise if res.groupdict.get("ignore_prompts"): # If we match anything in the ignore prompts, set a \r\n self._send_newline(end=b"") await asyncio.sleep(0.2) # Let the console catch up # Now again try to login. return await self._try_login( username=username, passwd=passwd, username_tried=username_tried, pwd_tried=pwd_tried, ) elif res.groupdict.get("login"): if username_tried: raise PermissionErrorException( "Login failure, possibly incorrect username or password, " "or device refuses to login." ) # The device is requesting login information # If we don't have a username, then likely we already sent a # username. The consoles are slow, we may have send extra # carriage returns, resulting in multiple login prompts. We will # simply ignore the subsequent login prompts. if username is not None: # TODO: It seems the original author of this session used the # arguments `username` and `passwd` to indicate whether the # username or password have been sent to the device (it will # set the corresponding argument to None after sending the # credential). This seems to be duplicated with the arguments # `username_tried` and `pwd_tried` we added later. If so, we'll # need to remove one of them self.send(self._username) # if we don't have username, we are likely waiting for password return await self._try_login( passwd=passwd, username_tried=True, pwd_tried=pwd_tried ) elif res.groupdict.get("passwd"): if pwd_tried: raise PermissionErrorException( "Login failure, possibly incorrect username or password, " "or device refuses to login." ) if passwd is None: # passwd information not available # Likely we have alreay sent the password. Bail out instead # of getting stuck in a loop. raise RuntimeErrorException("Failed to login: Missing password") self.send(self._password) return await self._try_login( username_tried=username_tried, pwd_tried=True, get_response_timeout=self._CONSOLE_LOGIN_TIMEOUT_S, ) elif res.groupdict.get("interact_prompts"): # E.g. send 'Y' to to satisfy the post login prompt on Nokia device self._interact_prompts_action(res.groupdict.get("interact_prompts")) return await self._try_login( username_tried=username_tried, pwd_tried=pwd_tried ) elif res.groupdict.get("prompt"): # Finally we matched a prompt. we are done return self._send_newline() else: raise RuntimeErrorException("Matched no group: %s" % (res.groupdict)) async def _get_response(self, timeout: int) -> "ResponseMatch": # A small delay to avoid having to match extraneous input await asyncio.sleep(0.1) res = await self.expect( self.get_prompt_re(self._devinfo.vendor_name), timeout=timeout ) return res async def _try_logout(self, kick_shutdown: bool = False) -> None: """ Run logout command and wait for the login prompt to show up (the login prompt indicates that it successfully logs out and is waiting for the next login). This is to ensure we cleanly diconnect from the device after running the command """ if not self._stream_writer: return logout_cmd = ( self._devinfo.vendor_data.exit_command or self._DEFAULT_LOGOUT_COMMAND ) self.logger.info(f"Logout from device, running logout command: {logout_cmd}") self._stream_writer.write(logout_cmd + b"\n") # Make sure we logout of the system while True: try: res = await self._get_response(timeout=self._CONSOLE_EXPECT_DELAY) except asyncio.TimeoutError: if kick_shutdown: self.logger.info( "Received first timeout while matching console prompt, " "sending a new line character to the console" ) self._send_newline() return await self._try_logout() else: self.logger.exception("Console session timed out while logging out") return except Exception: self.logger.exception("Console session log out failure") return if res.groupdict.get("ignore_prompts"): # If we match anything in the ignore prompts, set a \r\n self._send_newline(end=b"") await asyncio.sleep(0.2) # Let the console catch up elif res.groupdict.get("login"): self.logger.info("Logout successfully") return else: self.logger.error(f"Get unexpected prompt when logging out: {res}") return async def _close(self) -> None: await self._try_logout(kick_shutdown=True) await super()._close() def _send_clearline(self) -> None: self.send(b"\x15\r\n") def _send_newline(self, end: bytes = b"\n") -> None: self.send(b"\r", end) def _interact_prompts_action(self, prompt_match: AnyStr) -> None: # Check if to use default interactive prompts or configured prompts vendor_interact_prompts = self._DEFAULT_CONSOLE_PROMPTS if ( self._CONFIG_INTERACT_PROMPTS_RE_DICT and self._devinfo.vendor_name.encode("utf8") in self._CONFIG_INTERACT_PROMPTS_RE_DICT ): vendor_interact_prompts = self._CONFIG_INTERACT_PROMPTS_RE_DICT.get( self._devinfo.vendor_name.encode("utf8") ) interact_prompts = [ b"(?P<%s>%s)" % (group, regex) # pyre-fixme[16]: `Optional` has no attribute `items`. for group, regex in vendor_interact_prompts.items() ] interact_prompts_re = b"|".join(interact_prompts) interact_prompt_match = re.match( interact_prompts_re, prompt_match # pyre-ignore ) # Ignoring the pyre warning since the logic in _try_login ensures we # find a match for action in interact_prompt_match.groupdict(): # pyre-ignore self.send(action) async def _setup_connection(self) -> None: if self._opts.get("raw_session"): await asyncio.sleep(1) else: # Since this is a normal session, try to login to device. await self._try_login(self._username, self._password, kickstart=True) # Now send the setup commands await super()._setup_connection() async def get_console_info(self) -> ConsoleInfo: """ By default we assume a console is directly specified by the user. Depending on your system, you may want to get this information from your local database. In such case you can override this method according to your needs """ con_srv, con_port = self._console.split(":") return ConsoleInfo("CON", self.hostname, con_srv, con_port) async def _run_command( self, command: bytes, timeout: Optional[int] = None, prompt_re: Optional[Pattern] = None, ) -> bytes: if self._opts.get("raw_session"): # This statement ensures _stream_reader is not none if self._stream_reader: await self._stream_reader.drain() self.send(command) resp = await self.wait_prompt(prompt_re) return resp.data + resp.matched else: return await super()._run_command(command, timeout, prompt_re)