pci_lib/pci_lib.py (627 lines of code) (raw):
'''
Copyright (c) Facebook, Inc. and its affiliates.
This source code is licensed under the MIT license found in the
LICENSE file in the root directory of this source tree.
'''
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from collections import defaultdict, namedtuple
from os.path import realpath
from contextlib import contextmanager, closing
from pci_vpd_lib.pci_vpd_lib import (
VitalProductDataReader,
VPDDataException,
)
from six import int2byte, iterbytes
import re
import os
import logging
import struct
import subprocess
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class cached_property(object):
    """
    Descriptor that evaluates the wrapped method on first access and stores
    the result in the instance's ``__dict__`` under the method's name, so
    later lookups find the plain attribute. Deleting that attribute makes the
    next access compute the value again.
    """

    def __init__(self, func):
        self.__doc__ = func.__doc__
        self.func = func

    def __get__(self, obj, cls):
        # Accessed on the class itself: hand back the descriptor.
        if obj is None:
            return self
        computed = self.func(obj)
        # Shadow the descriptor with an ordinary instance attribute.
        obj.__dict__[self.func.__name__] = computed
        return computed
# Short names mapped to 24-bit PCI class codes.
CLASS_ALIASES = {  # noqa
    'nvme': 0x010802,
    'ethernet': 0x020000,
    'raid': 0x010400,
    'gpu': 0x030200,
}
# Short names mapped to (value, mask) pairs.
# NOTE(review): presumably a device matches when (class_id & mask) == value;
# nothing in the visible part of this file consumes this table -- confirm
# against callers.
PCI_CLASS_MASKS = {
    'nvme': (0x010802, 0xFFFFFF),
    'ethernet': (0x020000, 0xFFFF00),
    'raid': (0x010400, 0xFFFF00),
    'gpu': (0x030000, 0xFF0000),
}
# Directory holding one sysfs entry per PCI device (trailing slash required:
# paths are built by plain string formatting as well as os.path.join).
SYSFS_PCI_BUS_DEVICES = "/sys/bus/pci/devices/"
# Bit tested in the status register to see whether a capability list exists.
PCI_HAS_CAP_LIST = 0x10
# Offset find_capability() reads and compares against 0xffff to detect an
# inaccessible device.
# NOTE(review): the standard PCI header places Vendor ID at 0x00 and the
# Command register at 0x04 -- confirm this value is intended.
PCI_CONFIG_VENDOR_OFFSET = 0x4
# Offset of the byte that points at the first capability.
PCI_CAP_LIST_PTR = 0x34
# The offsets below are added to the PCI Express capability's own offset.
PCI_CAP_FLAGS = 0x2
# Capability ID searched for to locate the PCI Express capability.
PCI_CAP_EXPRESS = 0x10
PCI_EXP_LNKCAP = 0xc
PCI_EXP_LNKSTA = 0x12
PCI_EXP_LNKCTL2 = 0x30
PCI_EXP_SLTCAP = 0x14
PCI_EXP_SLTCTL = 0x18
PCI_EXP_SLTSTA = 0x1a
# Bit masks applied to the slot capability / control / status values.
PCI_EXP_SLOT_CAP_POWER = 1 << 1
PCI_EXP_SLOT_CAP_ATTN_LED = 1 << 3
PCI_EXP_SLOT_CTL_POWER = 1 << 10
PCI_EXP_SLOT_PRESENCE = 1 << 6
PCI_EXP_SLOT_CTL_ATTN_LED_MASK = 0x00c0
# Bit in the Express capability flags tested before decoding slot registers.
PCI_EXPRESS_FLAG_SLOT = 0x0100
# Offset of the 16-bit status register in the config header.
PCI_STATUS_REGISTER = 0x6
# Slot-control value (after masking with PCI_EXP_SLOT_CTL_ATTN_LED_MASK)
# mapped to the attention LED state name.
PCI_EXP_SLOT_CTL_ATTN_LED_VALUES = {
    0x00: 'reserved',
    0x00c0: 'off',
    0x0080: 'blink',
    0x0040: 'on',
}
# Long PCI address format is as follows
# Domain(32bits):Bus(8bits):Device(5bits):Function(3bits)
# Domain is *not* always 0! (ARM systems have multiple ones)
# Groups: (domain, bus, device, function).  The domain may be written with
# 2 to 8 hex digits, the function separator may be ':' or '.', and leading
# zeros before the function digit are accepted but not captured.
LONG_PCI_ADDR_REGEX = re.compile(
    r'^([0-9a-fA-F]{2,8}):([0-9a-fA-F]{2}):([01][0-9a-fA-F])[:\.]0*([0-7])$')
# Short PCI address format is as follows
# Bus(8bits):Device(5bits).Function(3bits)
# Groups: (bus, device, function).
SHORT_PCI_ADDR_REGEX = re.compile(r'^([0-9a-fA-F]{2}):([01][0-9a-fA-F])\.([0-7])$')
class NonZeroDomain(Exception):
    """Raised when asked to shorten a PCI address whose domain is not zero."""
class CapabilityDecodeError(Exception):
    """Raised/recorded when PCI capabilities cannot be decoded from config space."""
def read_u32(data, offset):
    """Return the little-endian unsigned 32-bit value at `offset` in `data`."""
    return struct.unpack('<L', data[offset:offset + 4])[0]
def read_u16(data, offset):
    """Return the little-endian unsigned 16-bit value at `offset` in `data`."""
    return struct.unpack('<H', data[offset:offset + 2])[0]
def read_u8(data, offset):
    """Return the unsigned byte at `offset` in `data`."""
    return struct.unpack('B', data[offset:offset + 1])[0]
def find_capability(config, cap):
    """
    Finds the offset of a particular capability register in a PCI configuration
    space

    @param config: object supporting slice reads and len(), with writable
        `been_there` and `exceptions` attributes (e.g. a PCIConfigSpace)
    @param cap: capability ID to look for
    @return offset of the matching capability, or None when the device is
        inaccessible, has no capability list, the list ends without a match,
        or the list loops. Inaccessible devices and loops are also recorded
        in `config.exceptions`.
    """
    vendor = read_u16(config, PCI_CONFIG_VENDOR_OFFSET)
    if vendor == 0xffff:
        # This indicates the device has likely gone missing
        config.exceptions.add(
            CapabilityDecodeError('PCI config space for device is inaccessible')
        )
        return None
    status = read_u16(config, PCI_STATUS_REGISTER)
    if (status & PCI_HAS_CAP_LIST) == 0:
        return None
    # detect looping: remember every pointer location we have followed
    config.been_there = defaultdict(bool)
    pos = PCI_CAP_LIST_PTR
    while pos < len(config):
        if config.been_there[pos]:
            config.exceptions.add(
                CapabilityDecodeError('Detected looping in capability decoding')
            )
            return None
        config.been_there[pos] = True
        # The low two bits of capability pointers are reserved; mask them.
        pos = read_u8(config, pos) & 0xfc
        if pos == 0:
            # A null next-pointer terminates the list. Without this check
            # the walk would carry on into the header bytes at offset 0.
            return None
        cap_id = read_u8(config, pos)
        if cap_id == cap:
            return pos
        # The next-capability pointer is the byte after the capability ID.
        pos += 1
    # we exhausted the config space without finding it
    return None
@contextmanager
def raw_open(path, flags):
    """
    Context manager around os.open() that yields the raw file descriptor.

    @param path: file to open
    @param flags: os.O_* flags passed straight to os.open()

    The descriptor is closed when the block exits, including when the block
    raises.
    """
    fd = os.open(path, flags)
    try:
        yield fd
    finally:
        # Without the finally, an exception in the with-body leaked the fd.
        os.close(fd)
# Device/port type field of the Express capability flags ((flags & 0xf0) >> 4)
# mapped to a short type name.
EXPRESS_TYPES = {
    0x0: 'endpoint',
    0x1: 'legacy_endpoint',
    0x4: 'root_port',
    0x5: 'upstream_port',
    0x6: 'downstream_port',
    0x7: 'pci_bridge',
    0x8: 'pcie_bridge',
    0x9: 'root_complex_endpoint',
    0xa: 'root_complex_event_collector',
}
# Link speed field (low 4 bits of link status / capability / control 2)
# mapped to a display string. 5 and 6 cover PCIe 5.0 and 6.0 links, which
# previously decoded as 'unknown'.
EXPRESS_SPEED = {
    1: "2.5GT/s",
    2: "5GT/s",
    3: "8GT/s",
    4: "16GT/s",
    5: "32GT/s",
    6: "64GT/s",
}
# Decoded link state returned by PCIDevice.express_link.
PCIExpressLink = namedtuple('PCIExpressLink',
                            ('cur_speed', 'cur_width',
                             'capable_speed', 'capable_width',
                             'target_speed'))
# Decoded slot state returned by PCIDevice.express_slot.
PCIExpressSlot = namedtuple('PCIExpressSlot',
                            ('slot', 'presence', 'power', 'attn_led'))
def align32(start, stop):
    """
    Widen the half-open range [start, stop) to 32-bit (4-byte) boundaries.

    `start` is rounded down to a multiple of 4 and `stop` is rounded up to
    one, unless it already is one. Returns the (start, stop) pair.
    """
    low_bits = 0b11
    astart = start & ~low_bits
    if stop & low_bits:
        # Unaligned end: drop the low bits, then step to the next word.
        astop = (stop & ~low_bits) + 4
    else:
        astop = stop
    return astart, astop
class PCIConfigSpace(object):
    """Caching view of a PCI(e) device's config space

    Index with slices to get bytes

    Usage:
        config = PCIConfigSpace.get(address)
        bytes = config[10:15]

    Instances are registered per device name; use `get()` rather than the
    constructor so an existing instance (and its cache) is reused.
    """
    # Every instance created so far, keyed by device name.
    configspaces = {}
    # While True, close() only records the instance in `defer`.
    deferclose = False
    # Instances whose close() has been postponed until end_defer().
    defer = set()

    def __init__(self, devname):
        """
        Open the sysfs `config` file of `devname` and register the instance.

        Raises Exception if a config space for `devname` already exists.
        """
        if devname in PCIConfigSpace.configspaces:
            # Format the name, not `self`: no attribute is set yet, so
            # __repr__ would raise AttributeError and hide this message.
            raise Exception(
                'Attempt to open PCI config space twice for device: {}'.
                format(devname)
            )
        self.device_name = devname
        self.path = '{}{}/config'.format(
            SYSFS_PCI_BUS_DEVICES, self.device_name
        )
        self.fd = None
        self.open()
        self.size = self._stat_size()
        # One slot per config byte; None means "not read yet".
        self.cache = [None] * self.size
        self.been_there = defaultdict(bool)
        self.exceptions = set()
        PCIConfigSpace.configspaces[devname] = self

    @classmethod
    def begin_defer(cls):
        """Start postponing close() calls on all instances."""
        cls.deferclose = True

    @classmethod
    def end_defer(cls):
        """Stop postponing and close every instance recorded meanwhile."""
        cls.deferclose = False
        for inst in cls.defer:
            inst.close()
        cls.defer = set()

    def _stat_size(self):
        """Size of the config file as reported by fstat on the open fd."""
        return os.fstat(self.fd).st_size

    # soft errors which are of note but do not end execution
    def encountered_errors(self):
        return self.exceptions

    # use after closing to determine entire affected PCI config space
    # use during to determine space used for certain operations
    def used_config_space(self):
        return list(self.been_there.keys())

    def open(self):
        """Open the config file read-only unless it is already open."""
        if self.fd is None:
            self.fd = os.open(self.path, os.O_RDONLY)

    def close(self):
        """Close the fd, or just note the request while closes are deferred."""
        if PCIConfigSpace.deferclose:
            PCIConfigSpace.defer.add(self)
            return
        if self.fd is not None:
            os.close(self.fd)
            self.fd = None

    def __getitem__(self, k):
        """
        Return the config bytes for slice `k`, reading from the device only
        for bytes that are not cached yet.

        Raises Exception for non-slice keys, stepped slices, ranges past the
        end of the config space, and reads the kernel cut short.
        """
        if not isinstance(k, slice):
            raise Exception('Can only use slices to index config space')
        if k.step is not None:
            raise Exception('Cannot step in PCI config space')
        # Open-ended slices cover the start / end of the config space.
        start = 0 if k.start is None else k.start
        stop = self.size if k.stop is None else k.stop
        if stop > self.size:
            raise Exception(
                'Attempt to read at {}, past end of config space: {}'.format(
                    (start, stop), self
                )
            )
        data = self.cache[start:stop]
        if any(d is None for d in data):
            self.open()
            # Align to 32 bit words
            rstart, rstop = align32(start, stop)
            rstop = min(rstop, self.size)
            os.lseek(self.fd, rstart, os.SEEK_SET)
            raw = os.read(self.fd, rstop - rstart)
            fetched = list(iterbytes(raw))
            # Store only what was actually returned: assigning a short read
            # over the full range would shrink the cache and shift every
            # later offset.
            self.cache[rstart:rstart + len(fetched)] = fetched
            data = self.cache[start:stop]
            if any(d is None for d in data):
                raise Exception(
                    'Short read at {} from config space: {}'.format(
                        (start, stop), self
                    )
                )
        return b''.join(int2byte(b) for b in data)

    def __len__(self):
        return self.size

    @classmethod
    def get(cls, devname):
        """Return the registered instance for `devname`, creating it if needed."""
        if devname in cls.configspaces:
            return cls.configspaces[devname]
        return PCIConfigSpace(devname)

    def __repr__(self):
        return 'PCIConfigSpace(\'{}\', sz={})'.format(
            self.device_name, self.size
        )
@contextmanager
def defer_closes():
    """
    Context manager that makes pci_lib keep config-space fds open for the
    duration of the block and close them all when the block exits.

    Usage:
        with pci_lib.defer_closes():
            [... do some stuff with PCIDevice objects ...]
    """
    PCIConfigSpace.begin_defer()
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions alike.
        PCIConfigSpace.end_defer()
class PCIDevice(namedtuple('PCIDevice', ('device_name', 'vendor_id',
                                         'device_id', 'class_id',
                                         'subsystem_vendor',
                                         'subsystem_device'))):
    """
    A PCI device: its long-form address (`device_name`) plus numeric IDs.

    The address components are derived from `device_name`; the PCIe
    properties are decoded lazily from the device's config space and cached
    on the instance.
    """

    @property
    def domain_id(self):
        """Domain part of the address, as an int."""
        return int(self.device_name.split(":", 1)[0], 16)

    @property
    def bus_id(self):
        """Bus part of the address, as an int."""
        return int(self.device_name.split(":", 2)[1], 16)

    @property
    def bus_device_id(self):
        """Device (slot on the bus) part of the address, as an int."""
        return int(self.device_name.rsplit(":", 1)[1].split(".", 1)[0], 16)

    @property
    def device_function_id(self):
        """Function part of the address, as an int."""
        return int(self.device_name.rsplit(".", 1)[1], 16)

    @cached_property
    def parent(self):
        """
        Get this device's parent device.

        @return a PCIDevice, or None if this device is a top-level device
        """
        mypath = os.path.join(SYSFS_PCI_BUS_DEVICES, self.device_name)
        mypath = realpath(mypath)
        parentpath = realpath(os.path.join(mypath, '..'))
        parentdev = os.path.basename(parentpath)
        # A parent directory that is not named like a PCI address means we
        # have reached the top of the device hierarchy.
        if not re.match(LONG_PCI_ADDR_REGEX, parentdev):
            return None
        return map_pci_device(parentdev)

    @cached_property
    def name(self):
        """
        Human readable device name, if available. Uses the pci.ids database
        (via lookup_device) to map vendor and device ID to names, falling
        back to the numeric IDs for whatever is unknown.
        """
        vendor, device = lookup_device(self.vendor_id, self.device_id)
        if vendor and device:
            return '{} ({:04x}) {} ({:04x})'.format(
                vendor, self.vendor_id, device, self.device_id)
        if vendor:
            return '{} ({:04x}), device {:04x}'.format(
                vendor, self.vendor_id, self.device_id)
        return '{:04x}:{:04x}'.format(self.vendor_id, self.device_id)

    @cached_property
    def location(self):
        """
        A string describing how this device is connected to the system

        None if this is a built-in device
        """
        path = self.get_path()
        slotmap = get_dmidecode_pci_slots()
        # Everything upstream of this device itself.
        connection = path[1:]
        if not connection:
            return None
        pathels = []
        for dev in connection:
            dmislot = slotmap.get(dev.device_name)
            if dmislot:
                # Prefer the designation dmidecode reports for this address.
                pathels.append(dmislot['designation'])
                continue
            pathpart = []
            explink = dev.express_link
            exptype = dev.express_type
            slot = dev.express_slot
            if slot:
                pathpart.append('slot {}'.format(slot.slot))
            if explink:
                pathpart.append('{}'.format(exptype))
            if pathpart:
                pathels.append(', '.join(pathpart))
        return ' -> '.join(pathels)

    @cached_property
    def vpd(self):
        """
        Vital Product Data read from the device's sysfs `vpd` file.

        @return dict with 'identifier_string' and 'fields', or None when the
            file is missing, unreadable or cannot be parsed
        """
        vpdfile = '{}{}/vpd'.format(SYSFS_PCI_BUS_DEVICES, self.device_name)
        if os.path.exists(vpdfile):
            try:
                vpdreader = VitalProductDataReader(vpdfile)
                return {'identifier_string': vpdreader.identifier_string,
                        'fields': vpdreader.fields}
            except VPDDataException:
                return None
            except OSError:
                # VPD was unreadable, rather than readable but unparseable.
                return None
        return None

    def get_path(self):
        """
        Get a list of PCIDevices including this device and all devices up to
        the root port it is connected to.
        """
        path = []
        current = self
        while current:
            path.append(current)
            current = current.parent
        return path

    def get_debugging_details(self):
        """
        Get a set of errors we encountered while trying to request information
        about the device until this point, as well as the used config space.
        """
        with closing(PCIConfigSpace.get(self.device_name)) as config:
            return (config.encountered_errors(), config.used_config_space())

    @cached_property
    def express_cap_version(self):
        """
        Version number of the Express capability register.

        None if not a PCIe device.
        """
        with closing(PCIConfigSpace.get(self.device_name)) as config:
            express = find_capability(config, PCI_CAP_EXPRESS)
            if express is None:
                return None
            flags = read_u16(config, express + PCI_CAP_FLAGS)
            version = flags & 0xf
            return version

    @cached_property
    def express_slot(self):
        """
        Retrieve PCI-Express slot information.

        Returns None if this is not a PCIe slot, otherwise returns a
        PCIExpressSlot

        The 'power' field will be a bool indicating this slot's current power
        setting if it has power control capability, otherwise the 'power' field
        will be set to None.
        """
        with closing(PCIConfigSpace.get(self.device_name)) as config:
            express = find_capability(config, PCI_CAP_EXPRESS)
            if express is None:
                return None
            flags = read_u16(config, express + PCI_CAP_FLAGS)
            if (flags & PCI_EXPRESS_FLAG_SLOT) == 0:
                return None
            slotcap = read_u32(config, express + PCI_EXP_SLTCAP)
            # Slot Status and Slot Control are 16-bit registers; a 32-bit
            # read would also pull in the neighbouring register.
            slotstatus = read_u16(config, express + PCI_EXP_SLTSTA)
            slotctl = read_u16(config, express + PCI_EXP_SLTCTL)
            slotnum = slotcap >> 19
            presence = (slotstatus & PCI_EXP_SLOT_PRESENCE) != 0
            if slotcap & PCI_EXP_SLOT_CAP_ATTN_LED != 0:
                attn_led = PCI_EXP_SLOT_CTL_ATTN_LED_VALUES.get(
                    slotctl & PCI_EXP_SLOT_CTL_ATTN_LED_MASK, 'off')
            else:
                attn_led = 'unsupported'
            power = None
            if (slotcap & PCI_EXP_SLOT_CAP_POWER) != 0:
                # 1 is powered -off-, 0 is powered -on-
                power = (slotctl & PCI_EXP_SLOT_CTL_POWER) == 0
            return PCIExpressSlot(slotnum, presence, power, attn_led)

    @cached_property
    def express_aer(self):
        """
        Retrieve the device's PCIe Advanced Error Reporting (AER) statistics,
        if the device is AER capable and the kernel provides the corresponding
        pseudo fs interface under /sys/bus/pci/devices/. The information is
        gleaned from the following files, when present:

        For devices:
            /sys/bus/pci/devices/<dev>/aer_dev_correctable
            /sys/bus/pci/devices/<dev>/aer_dev_fatal
            /sys/bus/pci/devices/<dev>/aer_dev_nonfatal

        along with the following for Root Port error counts:
            /sys/bus/pci/devices/<dev>/aer_rootport_total_err_cor
            /sys/bus/pci/devices/<dev>/aer_rootport_total_err_fatal
            /sys/bus/pci/devices/<dev>/aer_rootport_total_err_nonfatal

        Returns a dictionary with device and rootport dictionaries containing
        various key/value pairs or counts provided via the pseudo fs files.
        Empty device/rootport dictionaries are not included and None is
        returned when no AER information has been found.
        """
        if self.express_type is None:
            return None
        aer = {}
        dev_stats = aer_dev_stats(self.device_name, ['aer_dev_correctable',
                                                     'aer_dev_fatal',
                                                     'aer_dev_nonfatal'])
        if dev_stats is not None:
            aer["device"] = dev_stats
        if self.express_type == 'root_port':
            rp_counts = aer_rootport_counts(self.device_name,
                                            ['aer_rootport_total_err_cor',
                                             'aer_rootport_total_err_fatal',
                                             'aer_rootport_total_err_nonfatal'])
            if rp_counts is not None:
                aer["rootport"] = rp_counts
        if not aer:
            return None
        return aer

    @cached_property
    def express_type(self):
        """
        @return PCI-Express device type, None if this is a PCI device.
        """
        with closing(PCIConfigSpace.get(self.device_name)) as config:
            express = find_capability(config, PCI_CAP_EXPRESS)
            if express is None:
                return None
            flags = read_u16(config, express + PCI_CAP_FLAGS)
            exptype = EXPRESS_TYPES.get((flags & 0xf0) >> 4)
            return exptype

    @cached_property
    def express_link(self):
        """
        Retrieve the PCI-Express link status.

        Returns None if this is not a PCIe device, if its Express type is
        unknown, or if it is a root complex endpoint / event collector (no
        link registers are decoded for those); otherwise returns a
        PCIExpressLink
        """
        with closing(PCIConfigSpace.get(self.device_name)) as config:
            express = find_capability(config, PCI_CAP_EXPRESS)
            if express is None:
                return None
            flags = read_u16(config, express + PCI_CAP_FLAGS)
            exptype = EXPRESS_TYPES.get((flags & 0xf0) >> 4)
            if exptype is None:
                return None
            if exptype in ('root_complex_endpoint',
                           'root_complex_event_collector'):
                # Explicit result for the types excluded from link decoding.
                return None
            lnksta = read_u16(config, express + PCI_EXP_LNKSTA)
            lnkcap = read_u16(config, express + PCI_EXP_LNKCAP)
            lnkctl2 = None
            # Link Control 2 is only read for capability version 2 and up.
            if self.express_cap_version >= 2:
                lnkctl2 = read_u16(config, express + PCI_EXP_LNKCTL2)
            cur_speed = EXPRESS_SPEED.get(lnksta & 0xf, 'unknown')
            cur_width = ((lnksta & 0x3f0) >> 4)
            capable_speed = EXPRESS_SPEED.get(lnkcap & 0xf, 'unknown')
            capable_width = ((lnkcap & 0x3f0) >> 4)
            target_speed = None
            if lnkctl2 is not None:
                target_speed = EXPRESS_SPEED.get(lnkctl2 & 0xf, None)
            return PCIExpressLink(cur_speed, cur_width,
                                  capable_speed, capable_width, target_speed)

    def __repr__(self):
        return ('<PCIDevice vendor_id=0x{p.vendor_id:04X} '
                'device_id=0x{p.device_id:04X} class_id=0x{p.class_id:04x} '
                'domain_id=0x{p.domain_id:04X} bus_id=0x{p.bus_id:04X} '
                'bus_device_id=0x{p.bus_device_id:04X} '
                'device_function_id=0x{p.device_function_id:04X}>').format(
                    p=self)

    def __str__(self):
        return self.device_name
def _parse_dmidecode_slots(dmiout):
    """Split `dmidecode -t slot` text into one {key: value} dict per record."""
    slots = []
    slot = None
    for line in dmiout.splitlines():
        line = line.strip()
        if line == 'System Slot Information':
            slot = {}
        # Ignore "key: value" lines seen before the first slot header.
        if ': ' in line and slot is not None:
            k, v = line.split(': ', 1)
            slot[k] = v
        # A blank line ends the current record.
        if slot and not line:
            slots.append(slot)
            slot = {}
    # Keep the final record even when the output has no trailing blank line.
    if slot:
        slots.append(slot)
    return slots


def get_dmidecode_pci_slots():
    """
    Map PCI bus addresses to the slot information reported by dmidecode.

    Runs `/sbin/dmidecode -t slot` and returns a dict keyed by lower-cased
    'Bus Address', each value holding the slot's 'designation' and 'type'.
    This is best effort: an empty dict is returned when dmidecode cannot be
    run, exits with an error, or its output cannot be decoded. Records that
    lack an address, designation or type are skipped.
    """
    try:
        dmiout = subprocess.check_output(['/sbin/dmidecode', '-t', 'slot'])
        dmiout = dmiout.decode('utf-8')
    except (OSError, subprocess.CalledProcessError, UnicodeDecodeError) as exc:
        logging.getLogger(__name__).debug(
            'Could not get slot information from dmidecode: %s', exc)
        return {}
    slotmap = {}
    for slot in _parse_dmidecode_slots(dmiout):
        try:
            addr = slot['Bus Address'].lower()
            slotmap[addr] = {
                'designation': slot['Designation'],
                'type': slot['Type'],
            }
        except KeyError:
            # incomplete record: skip
            continue
    return slotmap
def aer_dev_stats(device_name, stat_names):
    """
    Gather PCIe device AER information, when available.

    @param device_name: PCI address, long or short form
    @param stat_names: names of the sysfs files to read under the device
    @return dict mapping each stat file that exists to a {key: int} dict of
        its contents, or None when the address is invalid or none of the
        files exist
    """
    device_name = expand_pci_addr(device_name)
    if not device_name:
        return None
    dev_stats = {}
    for stat_name in stat_names:
        filename = os.path.join(SYSFS_PCI_BUS_DEVICES, device_name, stat_name)
        if not os.path.isfile(filename):
            continue
        # For AER device stats we expect multiple lines, each with a key
        # and value. For example:
        #   RxErr 0
        #   BadTLP 0
        #   BadDLLP 0
        stats = {}
        with open(filename) as file_obj:
            for line in file_obj:
                fields = line.split()
                if len(fields) != 2:
                    # Blank or unexpectedly shaped line: skip it so the
                    # two-value unpack below cannot raise.
                    continue
                key, value = fields
                try:
                    stats[key] = int(value)
                except ValueError:
                    # Non-numeric value: leave this key out.
                    pass
        dev_stats[stat_name] = stats
    if not dev_stats:
        return None
    return dev_stats
def aer_rootport_counts(device_name, count_names):
    """
    Collect AER error totals for a PCIe root port, when available.

    Each name in `count_names` is a sysfs file under the device that holds a
    single integer on its first line. Returns a dict of name -> count for
    the files that exist and parse, or None when the address is invalid or
    nothing could be read.
    """
    device_name = expand_pci_addr(device_name)
    if not device_name:
        return None
    rootport_counts = {}
    for count_name in count_names:
        filename = os.path.join(SYSFS_PCI_BUS_DEVICES, device_name, count_name)
        if os.path.isfile(filename):
            with open(filename) as file_obj:
                first_line = file_obj.readline()
            try:
                # The count is stored under the name of the file it came from.
                rootport_counts[count_name] = int(first_line)
            except ValueError:
                pass
    return rootport_counts if rootport_counts else None
def map_pci_device(device_name):
    """
    Build a PCIDevice for a PCI address by reading its sysfs ID files.

    Returns None when `device_name` is not a valid long or short PCI
    address. The vendor, device, class, subsystem_vendor and
    subsystem_device files are each parsed as a hexadecimal integer.
    """
    device_name = expand_pci_addr(device_name)
    if not device_name:
        return None
    dev_path = os.path.join(SYSFS_PCI_BUS_DEVICES, device_name)

    def read_hex(attribute):
        with open(os.path.join(dev_path, attribute)) as handle:
            return int(handle.read(), 16)

    # Same order as the PCIDevice fields that follow device_name.
    ids = [read_hex(attribute) for attribute in (
        "vendor", "device", "class", "subsystem_vendor", "subsystem_device")]
    return PCIDevice(device_name, *ids)
def list_devices():
    """Yield the map_pci_device() result for every entry in the sysfs PCI device directory."""
    for entry in os.listdir(SYSFS_PCI_BUS_DEVICES):
        mapped = map_pci_device(entry)
        yield mapped
def find_devices(**kwargs):
    """
    Yield the devices from list_devices() whose attributes match every
    keyword argument.

    A list, tuple or set value matches when the attribute is one of its
    members; any other value must equal the attribute. With no keyword
    arguments every device is yielded.
    """
    # XXX: check kwargs against PCIDevice
    def mismatch(found, wanted):
        if isinstance(wanted, (list, tuple, set)):
            return found not in wanted
        return found != wanted

    for pci_device in list_devices():
        # any() stops at the first key that fails, like a loop with break.
        rejected = any(
            mismatch(getattr(pci_device, key), val)
            for key, val in kwargs.items()
        )
        if not rejected:
            yield pci_device
def expand_pci_addr(pci_addr):
    '''
    Return the normalized long form (dddd:bb:dd.f, lower-case hex) of a PCI
    address given in long or short form. Short addresses get domain 0.
    Returns None when the address matches neither form.
    '''
    long_match = LONG_PCI_ADDR_REGEX.match(pci_addr)
    short_match = SHORT_PCI_ADDR_REGEX.match(pci_addr)
    if long_match:
        numbers = [int(group, 16) for group in long_match.groups()]
    elif short_match:
        # No domain in the short form: assume domain 0.
        numbers = [0] + [int(group, 16) for group in short_match.groups()]
    else:
        return None
    return '{:04x}:{:02x}:{:02x}.{:x}'.format(*numbers)
def maybe_shorten_pci_addr(pci_addr):
    '''
    Shorten a PCI address when possible: addresses that shorten_pci_addr
    rejects with NonZeroDomain are returned unchanged.
    '''
    try:
        shortened = shorten_pci_addr(pci_addr)
    except NonZeroDomain:
        shortened = pci_addr
    return shortened
def shorten_pci_addr(pci_addr):
    '''
    Convert a long pci address to the short version, nothing to be done if pci
    address is already a short version.

    Short addresses do not necessarily uniquely identify a device! Only use
    this for displaying address to humans. This will raise NonZeroDomain if
    passed an address whose domain is not zero. Consider
    `maybe_shorten_pci_addr`

    Returns None (and logs an error) when the address is in neither form.
    '''
    long_match = LONG_PCI_ADDR_REGEX.match(pci_addr)
    if long_match:
        # Compare numerically: the long form accepts 2 to 8 domain digits,
        # so a zero domain is not always spelled '0000'.
        if int(long_match.group(1), 16) != 0:
            raise NonZeroDomain()
        return '{}:{}.{}'.format(
            long_match.group(2), long_match.group(3), long_match.group(4))
    if SHORT_PCI_ADDR_REGEX.match(pci_addr):
        return pci_addr
    log.error('Invalid pci address %s', pci_addr)
    return None
def load_pci_ids(pci_ids_locations=None):
    """
    Parse a pci.ids database into a dict.

    @param pci_ids_locations: optional sequence of candidate file paths; the
        first one that exists is used. Defaults to the hwdata and misc
        locations under /usr/share.
    @return dict with three kinds of keys:
        vendor_id -> vendor name
        (vendor_id, device_id) -> device name
        (vendor_id, device_id, subvendor_id, subdevice_id) -> subsystem name
    @raise RuntimeError: none of the candidate files exists
    """
    if pci_ids_locations is None:
        pci_ids_locations = [
            "/usr/share/hwdata/pci.ids",
            "/usr/share/misc/pci.ids",
        ]
    pci_ids_location = None
    for loc in pci_ids_locations:
        if os.path.isfile(loc):
            pci_ids_location = loc
            break
    if not pci_ids_location:
        raise RuntimeError(
            "No pci.ids file avail in %r" % (pci_ids_locations,)
        )
    db = {}
    with open(pci_ids_location) as f:
        vid = None
        did = None
        for line in f:
            if line.startswith('#'):
                continue
            if line.startswith('C '):
                # Device class section: its indented lines are not
                # vendor/device entries, so drop the current context.
                vid = None
                did = None
                continue
            if not line.strip():
                continue
            if line.startswith("\t\t"):
                # Subsystem line: "<subvendor> <subdevice>  <name>".
                # Compare against None so ID 0x0000 still counts as set.
                if vid is None or did is None:
                    continue
                fields = line.split(None, 2)
                if len(fields) < 3:
                    continue
                subvid = int(fields[0], 16)
                subdid = int(fields[1], 16)
                db[(vid, did, subvid, subdid)] = fields[2].strip()
                continue
            # Vendor and device lines are both "<id>  <name>"; splitting on
            # any whitespace run works whatever the separator width.
            fields = line.split(None, 1)
            if len(fields) < 2:
                continue
            name = fields[1].strip()
            if line.startswith("\t"):
                if vid is None:
                    continue
                did = int(fields[0], 16)
                db[(vid, did)] = name
                continue
            vid = int(fields[0], 16)
            db[vid] = name
            did = None
    return db
# Parsed pci.ids database, loaded on first use by get_pci_db().
pci_db = None
# Set once loading has failed so later calls do not retry.
no_pci_db = False


def get_pci_db():
    """
    Return the pci.ids database, loading it on first use.

    @return the dict produced by load_pci_ids(), or None when no usable
        pci.ids file is available. A failed load is remembered and not
        retried.
    """
    global pci_db
    global no_pci_db
    if no_pci_db:
        return None
    # Compare with None so an empty database is not reloaded every call.
    if pci_db is None:
        try:
            pci_db = load_pci_ids()
        except (RuntimeError, IOError, OSError, ValueError) as exc:
            # load_pci_ids() raises when the file is missing, so without
            # this handler the no_pci_db fallback could never be reached.
            log.warning('PCI ID database unavailable: %s', exc)
            pci_db = None
        if pci_db is None:
            no_pci_db = True
    return pci_db
def lookup_device(vendorid, deviceid):
    """
    Look up the names of a vendor and of one of its devices.

    Returns a (vendor_name, device_name) pair; either entry is None when
    the database has no entry for it, and both are None when get_pci_db()
    returns no database.
    """
    db = get_pci_db()
    if db is None:
        return None, None
    return db.get(vendorid), db.get((vendorid, deviceid))