#!/usr/bin/env python3
# Copyright 2004-present Facebook. All Rights Reserved.
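
"""Library for fetching FBOSS link snapshots (IPHY, XPHY, and transceiver
info) for a switch port, either by grepping logs on the switch over SSH or by
querying Scuba's network_event_log table.

Illustrative usage (hostname and port are placeholders):

    client = await get_client()
    snapshots = await client.fetch_recent_snapshots("rsw001.example", "eth2/1/1")
    iphy, xphy, tcvr = snapshots.unpack()
"""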
from __future__ import annotations
import enum
import logging
import re
import typing as t
from datetime import datetime, timedelta
import MySQLdb
from libfb.py.employee import get_current_unix_user_fbid
from neteng.fboss.phy.phy.types import PhyInfo, LinkSnapshot
from neteng.fboss.transceiver.types import TransceiverInfo
from nettools.nowa.building_blocks.all.ngt.link_check.common import ssh_util
from rfe.client_py3 import get_client as get_rfe_client
from rfe.RockfortExpress.types import QueryCommon
from thrift.py3 import deserialize, Protocol

T = t.TypeVar("T")
NGT_SERVICE_FBID = 89002005347827
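
# Scuba SQL template over the network_event_log table; formatted with
# (hostname, port_name, start_ns, end_ns), where the bounds are nanosecond
# timestamps (see datetime_to_ns below).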
SQL = """
SELECT
`msg`
FROM `network_event_log`
WHERE
((`device_name`) IN ('{}'))
AND ((`originator`) IN ('wedge_agent', 'qsfp_service'))
AND (CONTAINS(`msg`, ARRAY('linkSnapshot')))
AND (CONTAINS(`msg`, ARRAY('<port:{}>')))
AND `timestamp` BETWEEN {} and {}
"""
SNAPSHOT_FORMAT = r"{}LINK_SNAPSHOT_EVENT(LINK_SNAPSHOT): Collected snapshot for ports <port:{}> <linkSnapshot:{}>"
SNAPSHOT_REGEX = (
SNAPSHOT_FORMAT.replace("(", r"\(")
.replace(")", r"\)")
.replace(r"<port:{}> ", r"(<port:{}> )+")
.format(r".*", r"eth\d+/\d+/\d+", r"(.*)")
)
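
# In SNAPSHOT_REGEX, group(1) matches the (possibly repeated) <port:...> tag
# and group(2) captures the serialized linkSnapshot JSON payload.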
DEFAULT_TIME_RANGE = timedelta(minutes=1)
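
# zgrep searches plain-text and gzipped logs alike; lines mentioning sshd are
# filtered out.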
FETCH_SNAPSHOT_LOG_COMMAND = "zgrep 'LINK_SNAPSHOT_EVENT' {} | grep {} | grep -v sshd"
ARCHIVE_PATH = "/var/facebook/logs/fboss/archive/"
AGENT_CURRENT_LOG = "/var/facebook/logs/fboss/wedge_agent.log"
QSFP_CURRENT_LOG = "/var/facebook/logs/fboss/qsfp_service.log"
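
# Rotated logs are archived as e.g. wedge_agent.log-YYYYMMDDHHMM.gz; the five
# capture groups below are the rotation's year, month, day, hour, and minute.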
FILENAME_REGEX = (
    ARCHIVE_PATH
    + r"(?:wedge_agent|qsfp_service)\.log-(\d\d\d\d)(\d\d)(\d\d)(\d\d)(\d\d)\.gz"
)
DEFAULT_LOGGER = logging.getLogger()
DEFAULT_LOGGER.setLevel("INFO")


class Backend(enum.Enum):
SCUBA = enum.auto()
SSH = enum.auto()


class SnapshotCollection:
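    """Bundle of IPHY, XPHY, and transceiver snapshots, keyed by collection timestamp."""
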
def __init__(
self,
iphy_snaps: t.Mapping[int, PhyInfo],
xphy_snaps: t.Mapping[int, PhyInfo],
tcvr_snaps: t.Mapping[int, TransceiverInfo],
):
self._iphy_snaps: t.Mapping[int, PhyInfo] = iphy_snaps
self._xphy_snaps: t.Mapping[int, PhyInfo] = xphy_snaps
self._tcvr_snaps: t.Mapping[int, TransceiverInfo] = tcvr_snaps

    # Unpacks (iphy, xphy, transceiver) snapshots
def unpack(
self,
) -> t.Tuple[
t.Mapping[int, PhyInfo],
t.Mapping[int, PhyInfo],
t.Mapping[int, TransceiverInfo],
]:
return self._iphy_snaps, self._xphy_snaps, self._tcvr_snaps

    def empty(self) -> bool:
return not (
len(self._iphy_snaps) or len(self._xphy_snaps) or len(self._tcvr_snaps)
)


def escape_sql(sql: str) -> str:
    return MySQLdb.escape_string(sql.encode()).decode()


# Convert a datetime object to a nanosecond timestamp
def datetime_to_ns(dt: datetime) -> int:
    return int(dt.timestamp() * 1e9)


def filter_timeseries(
    mapping: t.Mapping[int, T], time_start: datetime, time_end: datetime
) -> t.Mapping[int, T]:
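    """Keep entries whose timestamp key (in seconds) lies in [time_start, time_end]."""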
return {
ts: v
for ts, v in mapping.items()
if time_start.timestamp() <= ts <= time_end.timestamp()
}


class SnapshotClient:
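    """Client that fetches link snapshots for a host/port via SSH or Scuba."""
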
def __init__(self, user: int, logger: logging.Logger = DEFAULT_LOGGER):
self._user = user
self._logger: logging.Logger = logger

    async def get_logfiles_in_timeframe(
self, hostname: str, time_start: datetime, time_end: datetime
) -> t.List[str]:
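        """Return the wedge_agent/qsfp_service logs that may cover the time window."""
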
        # Returns a list of (timestamp, logfile) pairs parsed from archive names
def parse_timestamps(filenames: t.List[str]) -> t.List[t.Tuple[float, str]]:
timestamps: t.List[t.Tuple[float, str]] = []
for filename in filenames:
match = re.fullmatch(FILENAME_REGEX, filename)
if match is None:
raise Exception(
"File has invalid timestamp format\nFile name was: {}".format(
filename
)
)
else:
dt = datetime(
year=int(match.group(1)),
month=int(match.group(2)),
day=int(match.group(3)),
hour=int(match.group(4)),
minute=int(match.group(5)),
)
timestamps.append((dt.timestamp(), filename))
return timestamps
# Filter which logfiles could possibly contain the given timeframe.
def filter_by_timestamp(
log_timestamps: t.List[t.Tuple[float, str]], current_log: str
) -> t.List[str]:
possible_logfiles: t.List[str] = []
            for timestamp, filename in log_timestamps:
                # Keep every log rotated at or after time_start; the first log
                # rotated at or after time_end may still contain lines from
                # inside the window, so include it too and stop there.
                if timestamp >= time_start.timestamp():
                    possible_logfiles.append(filename)
                if timestamp >= time_end.timestamp():
                    break
if (
len(log_timestamps) == 0
or time_end.timestamp() >= log_timestamps[-1][0]
):
possible_logfiles.append(current_log)
return possible_logfiles
logfiles = (
(
await ssh_util.run_ssh_cmd(
hostname,
# List the archive directory, look for wedge_agent and qsfp
# archives. Don't raise exception if output is empty (error
# code 1).
f"find {ARCHIVE_PATH} | grep -e wedge_agent -e qsfp_service || [[ $? == 1 ]]",
self._logger,
)
)
.strip()
.split("\n")
)
# split("") = ['']. Remove empty strs to clean this up
logfiles = [x for x in logfiles if x != ""]
log_timestamps = parse_timestamps(logfiles)
return filter_by_timestamp(
[(ts, log) for ts, log in log_timestamps if "wedge_agent" in log],
AGENT_CURRENT_LOG,
) + filter_by_timestamp(
[(ts, log) for ts, log in log_timestamps if "qsfp_service" in log],
QSFP_CURRENT_LOG,
)

    # Fetch snapshots from roughly the past 3 hours (a ±3 hour window around now)
async def fetch_recent_snapshots(
self,
hostname: str,
port_name: str,
backend: Backend = Backend.SSH,
) -> SnapshotCollection:
return await self.fetch_snapshots_around_time(
hostname, port_name, datetime.now(), timedelta(hours=3), backend
)

    # Fetch the snapshots posted between (timestamp - time_delta) and
    # (timestamp + time_delta). Note that for Scuba this filters on the time
    # the snapshot was posted, not the time it was collected.
async def fetch_snapshots_around_time(
self,
hostname: str,
port_name: str,
timestamp: datetime,
time_delta: timedelta = DEFAULT_TIME_RANGE,
backend: Backend = Backend.SSH,
) -> SnapshotCollection:
return await self.fetch_snapshots_in_timespan(
hostname, port_name, timestamp - time_delta, timestamp + time_delta, backend
)

    async def fetch_snapshots_in_timespan(
self,
hostname: str,
port_name: str,
time_start: datetime,
time_end: datetime,
backend: Backend = Backend.SSH,
) -> SnapshotCollection:
        self._logger.info(
            f"Fetching snapshots for host {hostname}:{port_name} between "
            f"{time_start.isoformat()} and {time_end.isoformat()} via {backend}"
        )
if backend == Backend.SSH:
return await self.fetch_snapshots_in_timespan_via_ssh(
hostname, port_name, time_start, time_end
)
elif backend == Backend.SCUBA:
return await self.fetch_snapshots_in_timespan_via_scuba(
hostname, port_name, time_start, time_end
)
else:
raise Exception(
f"Invalid backend passed to fetch_snapshots_in_timespan: {str(backend)}"
)

    async def fetch_snapshots_in_timespan_via_ssh(
self,
hostname: str,
port_name: str,
time_start: datetime,
time_end: datetime,
) -> SnapshotCollection:
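        """Grep snapshot lines from on-box logs, then trim to the time window.

        The selected log files may cover more than [time_start, time_end], so
        parsed snapshots are filtered again by their collected timestamps.
        """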
possible_logfiles: t.List[str] = await self.get_logfiles_in_timeframe(
hostname, time_start, time_end
)
cmd = FETCH_SNAPSHOT_LOG_COMMAND.format(" ".join(possible_logfiles), port_name)
output = (
await ssh_util.run_ssh_cmd(
hostname,
cmd,
self._logger,
len(possible_logfiles) * ssh_util.BYTES_STDOUT_LIMIT,
)
).strip()
        # Guard against empty output: "".split("\n") would yield [""]
        collection = await self.process_snapshot_lines(
            [line for line in output.split("\n") if line]
        )
iphy, xphy, tcvr = collection.unpack()
iphy = filter_timeseries(iphy, time_start, time_end)
xphy = filter_timeseries(xphy, time_start, time_end)
tcvr = filter_timeseries(tcvr, time_start, time_end)
return SnapshotCollection(iphy, xphy, tcvr)

    async def fetch_snapshots_in_timespan_via_scuba(
self,
hostname: str,
port_name: str,
time_start: datetime,
time_end: datetime,
) -> SnapshotCollection:
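        """Query Scuba's network_event_log for snapshot lines and parse them."""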
qc = QueryCommon(
source="snapshot_manager",
user_id=self._user,
instance="network_event_log",
)
sql = SQL.format(
escape_sql(hostname),
escape_sql(port_name),
datetime_to_ns(time_start),
datetime_to_ns(time_end),
)
async with get_rfe_client() as client:
scuba_entries = await client.querySQL(qc, sql)
lines = [row[0] for row in scuba_entries.value]
return await self.process_snapshot_lines(lines)

    async def process_snapshot_lines(self, lines: t.List[str]) -> SnapshotCollection:
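        """Parse raw snapshot log lines into per-type maps keyed by timeCollected."""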
iphy_snapshots: t.Dict[int, PhyInfo] = {}
xphy_snapshots: t.Dict[int, PhyInfo] = {}
tcvr_snapshots: t.Dict[int, TransceiverInfo] = {}
for line in lines:
match = re.fullmatch(SNAPSHOT_REGEX, line)
if match is None:
raise Exception(
"link snapshot has invalid format\nSnapshot was: {}".format(line)
)
else:
snapshot_str = match.group(2)
# TODO(ccpowers): remove this line once we fix qsfp service to
# not read compliance code on CMIS modules (which don't support it)
snapshot_str = snapshot_str.replace(
'"extendedSpecificationComplianceCode":32,', ""
)
snapshot = deserialize(
LinkSnapshot, snapshot_str.encode(), protocol=Protocol.JSON
)
if snapshot.type is LinkSnapshot.Type.phyInfo:
ts = snapshot.phyInfo.timeCollected
if snapshot.phyInfo.system is None:
iphy_snapshots[ts] = snapshot.phyInfo
else:
xphy_snapshots[ts] = snapshot.phyInfo
elif snapshot.type is LinkSnapshot.Type.transceiverInfo:
ts = snapshot.transceiverInfo.timeCollected
if ts is None:
raise Exception("No timestamp found for transceiver snapshots")
tcvr_snapshots[ts] = snapshot.transceiverInfo
else:
raise Exception("Invalid type for link snapshot: ", snapshot.type)
return SnapshotCollection(iphy_snapshots, xphy_snapshots, tcvr_snapshots)


async def get_client(
user: t.Optional[int] = None, logger: logging.Logger = DEFAULT_LOGGER
) -> SnapshotClient:
if not user:
user = get_current_unix_user_fbid() or NGT_SERVICE_FBID
return SnapshotClient(user=user, logger=logger)