fbnet/command_runner/utils.py (171 lines of code) (raw):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import re
import xml.etree.ElementTree as et
from collections import namedtuple
from functools import lru_cache, wraps
from typing import Optional, Set, Union, List, Dict, Any, NamedTuple, TYPE_CHECKING
from fbnet.command_runner.exceptions import (
ValidationErrorException,
LookupErrorException,
)
from fbnet.command_runner_asyncio.CommandRunner import ttypes
if TYPE_CHECKING:
from fbnet.command_runner.device_info import DeviceInfo
from fbnet.command_runner.service import FcrServiceBase
_XML_NAMESPACE_REGEX: str = r"""\{[^}]*\}"""
_NETCONF_BASE_CAPABILITY_REGEX: str = ".*netconf:base:[0-9]+[.][0-9]+$"
CommandInfo = namedtuple("CommandInfo", "cmd precmd prompt_re")
DeviceIP = namedtuple("DeviceIP", ["name", "addr", "mgmt_ip"])
IPInfo = NamedTuple("IPInfo", [("addr", str), ("is_pingable", bool)])
def canonicalize(val):
"""
A helper function to convert all 'str' to 'bytes' in given value. The
values can either be a string or a list. We will recursively convert each
member of the list.
"""
if isinstance(val, list):
return [canonicalize(v) for v in val]
if isinstance(val, str):
return val.encode("utf8")
return val
def _check_device(device: Optional[ttypes.Device]) -> None:
if not device:
raise ValidationErrorException("Required argument (device) cannot be None.")
missing_list = []
if not device.hostname:
missing_list.append("hostname")
if not device.username:
missing_list.append("username")
# Here we check strictly whether the password is None. This is for
# sometimes when the device is unprovisioned, it does not require
# password to login. In this case, we allow user to enter an empty
# string
if device.password is None:
missing_list.append("password")
if missing_list:
raise ValidationErrorException(
f"Following required Device fields are missing: {missing_list}"
)
def _check_session(session: Optional[ttypes.Session]) -> None:
if not session:
raise ValidationErrorException("Required argument (session) cannot be None.")
missing_list = []
if not session.hostname:
missing_list.append("hostname")
if not session.id:
missing_list.append("id")
if not session.name:
missing_list.append("name")
if missing_list:
raise ValidationErrorException(
f"Following required Session fields are missing: {missing_list}"
)
# TODO: Expose cache size to cli option
@lru_cache(maxsize=128)
def construct_netconf_capability_set(
netconf_hello_msg: Optional[Union[str, bytes]]
) -> Set[str]:
"""
Given a str or a bytes of netconf hello message, return a set of netconf base
capabilities in that hello message
For example, given a netconf hello message:
<?xml version="1.0" encoding="UTF-8" ?>
<hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<capabilities>
<capability>urn:ietf:params:netconf:base:1.0</capability>
<capability>urn:ietf:params:netconf:capability:rollback-on-error:1.0</capability>
<capability>urn:ietf:params:netconf:capability:validate:1.1</capability>
<capability>urn:ietf:params:netconf:capability:confirmed-commit:1.1</capability>
</capabilities>
</hello>
This function will return {"urn:ietf:params:netconf:base:1.0"}
"""
capabilities = set()
if not netconf_hello_msg:
return capabilities
root = et.fromstring(netconf_hello_msg)
# Retrieve the xml namespace from the tag of root of ElementTree
# For later usage of the iter method that searches for capability
namespace_match = re.search(_XML_NAMESPACE_REGEX, root.tag, re.IGNORECASE)
ns = namespace_match.group(0) if namespace_match else ""
# Iteratively look for element in the xml that has the tag that matches
# the pattern 'namespace' + capability. For example, a typical Netconf 1.0
# hello message would look like this after constructing the element tree:
# <{urn:ietf:params:xml:ns:netconf:base:1.0}hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
# <{urn:ietf:params:xml:ns:netconf:base:1.0}capabilities>
# <{urn:ietf:params:xml:ns:netconf:base:1.0}capability>urn:ietf:params:netconf:base:1.0</capability>
# </capabilities>
# </hello>
# In order to match capability items, we need to include the namespace while searching
for capability in root.iter(f"{ns}capability"):
if not capability.text:
# sanity check
continue
elif re.search(
_NETCONF_BASE_CAPABILITY_REGEX,
capability.text,
re.IGNORECASE,
):
capabilities.add(capability.text)
return capabilities
def input_fields_validator(fn): # noqa C901
@wraps(fn)
async def wrapper(self, *args, **kwargs):
for i, arg in enumerate(args):
if arg is None:
raise ValidationErrorException(
f"The ({i + 1})th argument cannot be None."
)
elif isinstance(arg, ttypes.Device):
_check_device(arg)
elif isinstance(arg, ttypes.Session):
_check_session(arg)
# This if-statememt will match device_to_commands or device_to_configlets
elif isinstance(arg, dict):
for device in arg:
_check_device(device)
for argument, val in kwargs.items():
if argument == "command" and not val:
raise ValidationErrorException(
"Required argument (command) cannot be None."
)
elif argument == "device":
_check_device(val)
elif argument == "session":
_check_session(val)
elif argument == "device_to_commands" or argument == "device_to_configlets":
if not val:
raise ValidationErrorException(
f"Required argument ({argument}) cannot be None."
)
for device in val:
_check_device(device)
return await fn(self, *args, **kwargs)
return wrapper
class IPUtils:
@classmethod
def proxy_required(cls, ip: str) -> bool:
"""
Returns a boolean stating whether an IP address requires proxy connectivity
"""
return False
@classmethod
def should_nat(cls, ip: str, service: Optional["FcrServiceBase"] = None) -> bool:
"""
Returns a boolean stating whether an IP address requires NAT connectivity
"""
return False
@classmethod
async def translate_address(
cls, ip: str, service: Optional["FcrServiceBase"] = None
) -> str:
"""
Returns the translated address (NAT) for a given IP address
"""
return ip
@classmethod
def check_ip(cls, ip: str, service: Optional["FcrServiceBase"] = None) -> bool:
"""
Returns a boolean stating whether an IP address is good for use
Common indicators are pingability / reachability
"""
return True
@classmethod
def is_mgmt_ip(cls, ip: DeviceIP) -> bool:
"""
Returns a boolean stating whether an IP address is a management IP or not
"""
return False
@classmethod
def get_ip(
cls, options: Dict[str, Any], devinfo: "DeviceInfo", service: "FcrServiceBase"
) -> List[IPInfo]:
"""
Returns list of Tuple with IP address of the given DeviceInfo and whether it is pingable or not.
first_ip = devinfo.get_ip(...)[0]
ip_address = first_ip.addr
is_pingable = first_ip.is_pingable
"""
# If user specified an ip address, then use it directly
ip_list: List[IPInfo] = []
ip_address = options.get("ip_address")
if ip_address:
return [IPInfo(ip_address, cls.check_ip(ip_address, service))]
# If use_mgmt_ip is True, then return list of MGMT IP addresses
use_mgmt_ip = options.get("mgmt_ip", False)
if use_mgmt_ip:
devinfo.inc_counter("device_info.mgmt_ip")
ip_list = cls._get_ip_list(
use_mgmt_ip=True, service=service, devinfo=devinfo
)
if len(ip_list) == 0:
# No valid MGMT IPs were found when user specifies use_mgmt_ip, raise
# LookupError
raise LookupErrorException(
"User has set 'mgmt_ip=True' in the request but no mgmt ip is "
f"found for {devinfo.hostname}"
)
return ip_list
# Return all valid IP addresses sorted by pingability
devinfo.inc_counter("device_info.default_ip")
ip_list = cls._get_ip_list(
use_mgmt_ip=use_mgmt_ip, service=service, devinfo=devinfo
)
if len(ip_list) == 0:
# None of the IPs is valid, raise LookupError
raise LookupErrorException(
f"No Valid IP address was found for the device {devinfo.hostname}"
)
return ip_list
@classmethod
def _get_ip_list(
cls, devinfo: "DeviceInfo", service: "FcrServiceBase", use_mgmt_ip: bool = False
) -> List[IPInfo]:
"""
A helper method for get_ip method
"""
pingable_list: List[IPInfo] = []
non_pingable_list: List[IPInfo] = []
for ip in devinfo._pref_ips + [devinfo._ip]:
# ip.addr is None
if not ip.addr:
continue
# Check if MGMT IP and go to the next IP if current IP is not MGMT
if use_mgmt_ip and not cls.is_mgmt_ip(ip):
continue
# Check if its pingable
if cls.check_ip(ip, service):
pingable_list.append(IPInfo(ip.addr, True))
else:
if ip.addr == devinfo._ip.addr:
non_pingable_list = [IPInfo(ip.addr, False)] + non_pingable_list
else:
non_pingable_list.append(IPInfo(ip.addr, False))
# Give preference to IPs that are pingable
return pingable_list + non_pingable_list