fbnet/command_runner/device_db.py (66 lines of code) (raw):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import typing
from .base_service import PeriodicServiceTask
from .exceptions import LookupErrorException, NotImplementedErrorException
from .options import Option
if typing.TYPE_CHECKING:
from fbnet.command_runner_asyncio.CommandRunner.ttypes import Device
from .device_info import DeviceIP, DeviceInfo
class BaseDeviceDB(PeriodicServiceTask):
"""
Interface to device database.
Adapt this to get devices from your backend system.
"""
DEVICE_DB_UPDATE_INTERVAL = Option(
"--device_db_update_interval",
help="device db update interval (in seconds)",
type=int,
default=30 * 60,
)
DEVICE_NAME_FILTER = Option(
"--device_name_filter",
help="A regex to restrict the database to matching device names",
)
def __init__(self, service, name=None, period=None):
super().__init__(
service,
name or self.__class__.__name__,
period=period or self.DEVICE_DB_UPDATE_INTERVAL,
)
self._data_valid = False
self._devices = {}
@property
def data_valid(self):
return self._data_valid
async def run(self):
"""
Fetch data for devices. This is called periodically.
Here we are reloading data from JSON file periodically. How you get
your data depends on your local setup. Override this according to your
setup
"""
try:
await self._fetch_devices(name_filter=self.DEVICE_NAME_FILTER)
except Exception as ex:
self.service.logger.error(
f"Failed to fetch device info, error message: {str(ex)}"
)
else:
self._data_valid = True
async def _fetch_devices(self, name_filter=None, hostname=None):
devices = await self._fetch_device_data(name_filter, hostname)
for d in devices:
self._devices[d.hostname] = d
if d.alias:
self._devices[d.alias] = d
async def _fetch_device_data(self, name_filter=None, hostname=None):
"""
Fetch device data
Override this to get the device information from your backend systems
"""
raise NotImplementedErrorException(
"Please implement this to get host information"
)
async def wait_for_data(self):
"""Wait for the data to be fetched"""
while not self._data_valid:
self.logger.info("Waiting for data")
await self.wait()
self.logger.info("Device data valid")
async def get(
self, device: typing.Union["Device", str], autofetch: bool = True
) -> "DeviceInfo":
"""
Get device information for a given device.
* First we lookup in our local cache
* If not found then we will try to fetch the specific device from backend if autofetch is true
device parameter can be the hostname string or the Device struct
"""
hostname = device if isinstance(device, str) else device.hostname
if hostname not in self._devices and autofetch:
# Try to fetch the device info
await self._fetch_devices(hostname=hostname)
if hostname in self._devices:
# Found the device
return self._devices.get(hostname)
# still not able to find device, raise an exeception
raise LookupErrorException("Device not found", hostname)
def is_pingable(self, ip: typing.Union[str, "DeviceIP"]) -> bool:
raise NotImplementedErrorException(
"Please implement this to check if an ip is pingable"
)