fbnet/command_runner/ssh_netconf.py (110 lines of code) (raw):

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (c) Facebook, Inc. and its affiliates. # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. import asyncio import re import typing from fbnet.command_runner.command_session import ResponseMatch, SSHCommandSession from fbnet.command_runner.counters import Counters from fbnet.command_runner.exceptions import ( ValidationErrorException, CommandExecutionTimeoutErrorException, UnsupportedDeviceErrorException, ) from fbnet.command_runner_asyncio.CommandRunner.ttypes import CommandResult from .utils import construct_netconf_capability_set if typing.TYPE_CHECKING: from fbnet.command_runner.device_info import DeviceInfo from fbnet.command_runner.service import FcrServiceBase class SSHNetconf(SSHCommandSession): TERM_TYPE: typing.Optional[str] = None DELIM: bytes = b"]]>]]>" PROMPT: re.Pattern = re.compile(DELIM) HELLO_MESSAGE: bytes = b"""<?xml version="1.0" encoding="UTF-8" ?> <hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"> <capabilities> <capability>urn:ietf:params:netconf:base:1.0</capability> </capabilities> </hello> """ def __init__( self, service: "FcrServiceBase", devinfo: "DeviceInfo", options: typing.Dict[str, typing.Any], loop: asyncio.AbstractEventLoop, ) -> None: super().__init__(service, devinfo, options, loop) self.server_hello: typing.Optional[str] = None @classmethod def register_counter(cls, counters: Counters): super().register_counters(counters) stats = ["sum", "avg"] counters.add_stats_counter("netconf_capability_construction.error", stats) counters.add_stats_counter("netconf_capability_construction.all", stats) async def _setup_connection(self) -> None: # Wait for the hello message from the peer. We will save this message # and include this with first reply. resp = await self.wait_prompt(self.PROMPT) self.server_hello = resp.data.strip() # Send our hello message to the server self._send_command(self.HELLO_MESSAGE) self._validate_netconf_capabilities() def _send_command(self, cmd: bytes) -> None: # Send a command followed by a delimiter self._stream_writer.write(b"\n" + cmd + self.DELIM + b"\n") def _format_output(self, cmd: bytes, resp: ResponseMatch) -> bytes: return resp.data.strip() def build_result(self, output: str, status: str, command: str) -> CommandResult: result = super().build_result(output, status, command) if self.server_hello: result.capabilities = self.server_hello self.server_hello = None return result def _validate_netconf_capabilities(self) -> None: """ Validates that the remote netconf host (device) has FCR's netconf base capability. Raise exception if not. """ assert self.server_hello, "We haven't received hello message from Device yet!" self.inc_counter("netconf_capability_construction.all") try: remote_host_netconf_base_capabilities_set = ( construct_netconf_capability_set(self.server_hello) ) local_netconf_base_capabilities_set = construct_netconf_capability_set( self.HELLO_MESSAGE ) except Exception: # Failed at constructing the capability set, let's continue the session # without validating the capabilities self.logger.exception("Failed at constructing remote host's capability set") self.inc_counter("netconf_capability_construction.error") return if not ( remote_host_netconf_base_capabilities_set & local_netconf_base_capabilities_set ): # Device does not share common capability with us, terminate the connection super().close() raise UnsupportedDeviceErrorException( "Remote host and FCR do not share common Netconf base capabilities!\n" f"Current FCR supported Netconf base capabilities: {local_netconf_base_capabilities_set}" ) async def _run_command( self, command: bytes, timeout: typing.Optional[int] = None, prompt_re: typing.Optional[typing.Pattern] = None, ) -> bytes: try: self.logger.info(f"Sending command to device. Command: {command}") self._send_command(command) # Wait for response with timeout resp = await asyncio.wait_for( self.wait_prompt(self.PROMPT), timeout or self._devinfo.vendor_data.cmd_timeout_sec, loop=self._loop, ) return self._format_output(command, resp) except asyncio.TimeoutError: self.logger.error("Timeout waiting for command response") data = await self._stream_reader.drain() raise CommandExecutionTimeoutErrorException( "Command Response Timeout", data[-200:] ) # pyre-fixme: Inconsistent return type async def _connect( self, subsystem: typing.Optional[str] = None, exec_command: typing.Optional[str] = None, ) -> None: command = None device = self._opts.get("device") # One of subsystem/command needs to be specified. If subsystem is specified # we will ignore the commmand subsystem = device.session_data.subsystem if device.session_data else None if not subsystem: command = device.session_data.exec_command if device.session_data else None if not command: raise ValidationErrorException( "either subsystem or exec_command must be specified " "for netconf session" ) # pyre-fixme: Inconsistent return type return await super()._connect(subsystem=subsystem, exec_command=command)