tools/pynuttx/nxgdb/mm.py (485 lines of code) (raw):

############################################################################
# tools/pynuttx/nxgdb/mm.py
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.  The
# ASF licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the
# License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
# License for the specific language governing permissions and limitations
# under the License.
#
############################################################################

from __future__ import annotations

import argparse
from typing import Generator, List, Optional, Tuple

import gdb

from . import lists, utils
from .protocols import mm as p
from .utils import Value

# Normalised backtrace depth: -1 when the symbol cannot be resolved, otherwise
# the integer value of CONFIG_MM_BACKTRACE.
CONFIG_MM_BACKTRACE = utils.get_symbol_value("CONFIG_MM_BACKTRACE")
CONFIG_MM_BACKTRACE = -1 if CONFIG_MM_BACKTRACE is None else int(CONFIG_MM_BACKTRACE)

PID_MM_INVALID = -100
PID_MM_MEMPOOL = -1


class MemPoolBlock:
    """
    Memory pool block instance.
    """

    MAGIC_ALLOC = 0x5555_5555
    mempool_backtrace_s = utils.lookup_type("struct mempool_backtrace_s")

    def __init__(self, addr: int, blocksize: int, overhead: int) -> None:
        """
        Initialize the memory pool block instance.
        addr: must be start address of the block,
        blocksize: block size without backtrace overhead,
        overhead: backtrace overhead size.
        """
        self.overhead = overhead
        self.from_pool = True
        self.is_orphan = False
        self.address = addr
        self.blocksize = int(blocksize)
        self.nodesize = int(blocksize) + self.overhead
        # Lazy evaluation, None means "not read from the target yet"
        self._backtrace = self._pid = self._seqno = self._magic = self._blk = None

    def __repr__(self) -> str:
        return f"block@{hex(self.address)},size:{self.blocksize},seqno:{self.seqno},pid:{self.pid}"

    def __str__(self) -> str:
        return self.__repr__()

    def __hash__(self) -> int:
        return hash((self.pid, self.nodesize, self.backtrace))

    def __eq__(self, value: MemPoolBlock) -> bool:
        # Two blocks compare equal when pid, node size and backtrace all match;
        # the address is deliberately not part of the comparison.
        return (
            self.pid == value.pid
            and self.nodesize == value.nodesize
            and self.backtrace == value.backtrace
        )

    def contains(self, address: int) -> bool:
        """Check if the address is in block's range, excluding overhead"""
        return self.address <= address < self.address + self.blocksize

    @property
    def blk(self) -> p.MemPoolBlock:
        """The backtrace record read from right after the block payload"""
        if self._blk is None:
            addr = int(self.address) + self.blocksize
            self._blk = (
                gdb.Value(addr).cast(self.mempool_backtrace_s.pointer()).dereference()
            )
        return self._blk

    @property
    def is_free(self) -> bool:
        """True when the magic differs from MAGIC_ALLOC.

        Always False when backtrace support is not available, since there is
        no magic field to inspect in that case.
        """
        if CONFIG_MM_BACKTRACE < 0:
            return False

        if self._magic is None:
            self._magic = int(self.blk["magic"])

        return self._magic != self.MAGIC_ALLOC

    @property
    def seqno(self) -> int:
        """Sequence number of the block, -100 without backtrace support"""
        if self._seqno is None:
            self._seqno = int(self.blk["seqno"]) if CONFIG_MM_BACKTRACE >= 0 else -100
        return self._seqno

    @property
    def pid(self) -> int:
        """PID recorded in the block, PID_MM_INVALID without backtrace support"""
        if self._pid is None:
            self._pid = (
                int(self.blk["pid"]) if CONFIG_MM_BACKTRACE >= 0 else PID_MM_INVALID
            )
        return self._pid

    @property
    def backtrace(self) -> Tuple[int, ...]:
        """Recorded backtrace addresses, empty when no depth is configured"""
        if CONFIG_MM_BACKTRACE <= 0:
            return ()

        if self._backtrace is None:
            self._backtrace = tuple(
                int(self.blk["backtrace"][i]) for i in range(CONFIG_MM_BACKTRACE)
            )
        return self._backtrace

    @property
    def prevnode(self) -> MemPoolBlock:
        """Block located one node size before this one"""
        addr = self.address - self.nodesize
        return MemPoolBlock(addr, self.blocksize, self.overhead)

    @property
    def nextnode(self) -> MemPoolBlock:
        """Block located one node size after this one"""
        addr = self.address + self.nodesize
        return MemPoolBlock(addr, self.blocksize, self.overhead)

    def read_memory(self) -> memoryview:
        """Read the block payload (without overhead) from the inferior"""
        return gdb.selected_inferior().read_memory(self.address, self.blocksize)


class MemPool(Value, p.MemPool):
    """
    Memory pool instance.
    """

    def __init__(self, mpool: Value, name=None) -> None:
        if mpool.type.code == gdb.TYPE_CODE_PTR:
            mpool = mpool.dereference()
        super().__init__(mpool)
        # Lazy evaluation, None means "not computed yet"
        self._blksize = None
        self._nfree = None
        self._nifree = None
        self._overhead = None

    def __repr__(self) -> str:
        return f"{self.name}@{hex(self.address)},size:{self.size}/{self['blocksize']},nused:{self.nused},nfree:{self.nfree}"

    def __str__(self) -> str:
        return self.__repr__()

    @property
    def name(self) -> str:
        """Pool name taken from procfs, "<noname>" when it cannot be read"""
        try:
            return self.procfs.name.string()
        except Exception:
            # Best effort: any failure to read the name is not fatal.
            return "<noname>"

    @property
    def memranges(self) -> Generator[Tuple[int, int], None, None]:
        """Memory ranges of the pool

        Yields (start, end) for the interrupt region first, if any, then for
        every entry of the expand queue. Each queue entry address is used as
        the end of its range.
        """
        sq_entry_t = utils.lookup_type("sq_entry_t")
        blksize = self.size

        if self.ibase:
            blks = int(self.interruptsize) // blksize
            base = int(self.ibase)
            yield (base, base + blks * blksize)

        if not self.equeue.head:
            return

        # First queue has size of initialsize
        ninit = int(self.initialsize)
        ninit = ninit and (ninit - sq_entry_t.sizeof) // blksize
        nexpand = (int(self.expandsize) - sq_entry_t.sizeof) // blksize

        for entry in lists.NxSQueue(self.equeue):
            blks = ninit or nexpand
            ninit = 0
            yield (int(entry) - blks * blksize, int(entry))

    @property
    def nqueue(self) -> int:
        """Number of entries in the expand queue"""
        return lists.sq_count(self.equeue)

    @property
    def size(self) -> int:
        """Real block size including backtrace overhead"""
        if self._blksize is None:
            blksize = int(self["blocksize"])
            # Use the normalised module constant; the raw symbol value may be
            # None and cannot be compared against an integer.
            if CONFIG_MM_BACKTRACE >= 0:
                mempool_backtrace_s = utils.lookup_type("struct mempool_backtrace_s")
                size_t = utils.lookup_type("size_t")
                align = int(
                    utils.get_symbol_value("CONFIG_MM_DEFAULT_ALIGNMENT")
                    or 2 * size_t.sizeof
                )
                blksize = blksize + mempool_backtrace_s.sizeof
                # Round up to the alignment
                blksize = (blksize + align - 1) & ~(align - 1)
            self._blksize = int(blksize)
        return self._blksize

    @property
    def overhead(self) -> int:
        """Per-block overhead: real block size minus the pool's blocksize"""
        if self._overhead is None:
            self._overhead = self.size - int(self["blocksize"])
        return self._overhead

    @property
    def nwaiter(self) -> int:
        return -int(self.waitsem.semcount) if self.wait and self.expandsize == 0 else 0

    @property
    def nused(self) -> int:
        return int(self.nalloc)

    @property
    def free(self) -> int:
        """Free bytes in the pool"""
        # nfree already accounts for the interrupt queue, do not add it twice.
        return self.nfree * self.size

    @property
    def nfree(self) -> int:
        """Free blocks count, including the interrupt queue"""
        if self._nfree is None:
            self._nfree = lists.sq_count(self.queue)
        return self._nfree + self.nifree

    @property
    def nifree(self) -> int:
        """Interrupt pool free blocks count"""
        if self._nifree is None:
            self._nifree = lists.sq_count(self.iqueue)
        return self._nifree

    @property
    def total(self) -> int:
        """Total bytes: one sq_entry_t per expand queue entry plus all blocks"""
        nqueue = lists.sq_count(self.equeue)
        sq_entry_t = utils.lookup_type("sq_entry_t")
        blocks = self.nused + self.nfree
        return int(nqueue * sq_entry_t.sizeof + blocks * self.size)

    @property
    def blks(self) -> Generator[MemPoolBlock, None, None]:
        """Iterate over all blocks in the pool"""
        blksize = self.size  # Real block size including backtrace overhead
        blocksize = self["blocksize"]
        overhead = self.overhead

        # Every range is a whole number of real-size blocks laid out back to
        # back, so stepping through it visits each block start once.
        for start, end in self.memranges:
            for base in range(start, end, blksize):
                yield MemPoolBlock(base, blocksize, overhead)

    def contains(self, address: int) -> Tuple[bool, Value]:
        """Check whether the address falls into any memory range of the pool

        Returns (True, None) on a hit and (False, None) otherwise.
        """
        for start, end in self.memranges:
            if start <= address < end:
                return True, None
        return False, None

    def find(self, address: int) -> Optional[MemPoolBlock]:
        """Find the block that contains the given address"""
        blksize = self.size
        blocksize = self["blocksize"]

        for start, end in self.memranges:
            if start <= address < end:
                # Round down to the start of the block within this range
                blkstart = start + (address - start) // blksize * blksize
                return MemPoolBlock(blkstart, blocksize, self.overhead)

        return None

    def blks_free(self) -> Generator[MemPoolBlock, None, None]:
        """Iterate over all free blocks in the pool"""
        blocksize = self["blocksize"]
        for entry in lists.NxSQueue(self.queue):
            yield MemPoolBlock(int(entry), blocksize, self.overhead)

    def blks_used(self) -> Generator[MemPoolBlock, None, None]:
        """Iterate over all used blocks in the pool"""
        return filter(lambda blk: not blk.is_free, self.blks)


class MemPoolMultiple(Value, p.MemPoolMultiple):
    """
    Multiple level memory pool instance.
    """

    def __init__(self, mpool: Value, name=None) -> None:
        if mpool.type.code == gdb.TYPE_CODE_PTR:
            mpool = mpool.dereference()
        super().__init__(mpool)

    def __repr__(self) -> str:
        return f"Multiple Level Memory Pool: {self.address}"

    def __str__(self) -> str:
        return self.__repr__()

    @property
    def pools(self) -> Generator[MemPool, None, None]:
        """Iterate over the npools entries of the pools array"""
        for pool in utils.ArrayIterator(self["pools"], self.npools):
            yield MemPool(pool)

    @property
    def free(self) -> int:
        """Sum of free bytes over all pools"""
        return sum(pool.free for pool in self.pools)


class MMNode(gdb.Value, p.MMFreeNode):
    """
    One memory node in the memory manager heap, either free or allocated.
    The instance is always dereferenced to the actual node.
    """

    MM_ALLOC_BIT = 0x1
    MM_PREVFREE_BIT = 0x2
    MM_MASK_BIT = MM_ALLOC_BIT | MM_PREVFREE_BIT
    MM_SIZEOF_ALLOCNODE = utils.sizeof("struct mm_allocnode_s")
    MM_ALLOCNODE_OVERHEAD = MM_SIZEOF_ALLOCNODE - utils.sizeof("mmsize_t")
    MM_MIN_CHUNK = utils.get_symbol_value("MM_MIN_CHUNK", locspec="mm_initialize")

    def __init__(self, node: gdb.Value):
        if node.type.code == gdb.TYPE_CODE_PTR:
            node = node.dereference()
        # Lazy evaluation, None means "not read from the target yet"
        self._backtrace = None
        self._address = None
        self._nodesize = None
        super().__init__(node)

    def __repr__(self):
        return (
            f"{hex(self.address)}({'F' if self.is_free else 'A'}{'F' if self.is_prev_free else 'A'})"
            f" size:{self.nodesize}/{self.prevsize if self.is_prev_free else '-'}"
            f" seq:{self.seqno} pid:{self.pid} "
        )

    def __str__(self) -> str:
        return self.__repr__()

    def __hash__(self) -> int:
        return hash((self.pid, self.nodesize, self.backtrace))

    def __eq__(self, value: MMNode) -> bool:
        # Nodes compare equal when pid, node size and backtrace all match;
        # the address is deliberately not part of the comparison.
        return (
            self.pid == value.pid
            and self.nodesize == value.nodesize
            and self.backtrace == value.backtrace
        )

    def contains(self, address):
        """Check if the address is in node's range, excluding overhead"""
        return (
            self.address + self.overhead
            <= address
            < self.address + self.nodesize - MMNode.MM_ALLOCNODE_OVERHEAD
        )

    def read_memory(self):
        """Read the node content after the allocnode overhead from the inferior"""
        addr = int(self.address) + MMNode.MM_ALLOCNODE_OVERHEAD
        size = self.nodesize - MMNode.MM_ALLOCNODE_OVERHEAD
        return gdb.selected_inferior().read_memory(addr, size)

    @property
    def address(self) -> int:
        """Change 'void *' to int"""
        if self._address is None:
            self._address = int(super().address)
        return self._address

    @property
    def prevsize(self) -> int:
        """Size of preceding chunk size"""
        return int(self["preceding"]) & ~MMNode.MM_MASK_BIT

    @property
    def nodesize(self) -> int:
        """Size of this chunk, including overhead"""
        if self._nodesize is None:
            self._nodesize = int(self["size"]) & ~MMNode.MM_MASK_BIT
        return self._nodesize

    @property
    def usersize(self) -> int:
        """Size of this chunk, excluding overhead"""
        return self.nodesize - MMNode.MM_ALLOCNODE_OVERHEAD

    @property
    def flink(self):
        # Only free node has flink and blink
        return MMNode(self["flink"]) if self.is_free and self["flink"] else None

    @property
    def blink(self):
        # Only free node has flink and blink
        return MMNode(self["blink"]) if self.is_free and self["blink"] else None

    @property
    def pid(self) -> int:
        # Only available when CONFIG_MM_BACKTRACE >= 0
        if CONFIG_MM_BACKTRACE >= 0:
            return int(self["pid"])
        return PID_MM_INVALID

    @property
    def seqno(self) -> int:
        """Sequence number of the node, -1 without backtrace support"""
        return int(self["seqno"]) if CONFIG_MM_BACKTRACE >= 0 else -1

    @property
    def backtrace(self) -> Tuple[int, ...]:
        """Recorded backtrace addresses, empty when no depth is configured"""
        if CONFIG_MM_BACKTRACE <= 0:
            return ()

        if self._backtrace is None:
            self._backtrace = tuple(
                int(self["backtrace"][i]) for i in range(CONFIG_MM_BACKTRACE)
            )
        return self._backtrace

    @property
    def prevnode(self) -> Optional[MMNode]:
        """The preceding node, only known when it is marked free"""
        if not self.is_prev_free:
            return None

        addr = int(self.address) - self.prevsize
        freenode_ptr = utils.lookup_type("struct mm_freenode_s").pointer()
        return MMNode(gdb.Value(addr).cast(freenode_ptr))

    @property
    def nextnode(self) -> Optional[MMNode]:
        """The following node, None (with a warning) when this node has size 0"""
        if not self.nodesize:
            gdb.write(f"\n\x1b[31;1m Node corrupted: {self} \x1b[m\n")
            return None

        addr = int(self.address) + self.nodesize
        freenode_ptr = utils.lookup_type("struct mm_freenode_s").pointer()
        # Use gdb.Value for better performance
        return MMNode(gdb.Value(addr).cast(freenode_ptr))

    @property
    def is_free(self) -> bool:
        return not self["size"] & MMNode.MM_ALLOC_BIT

    @property
    def is_prev_free(self) -> bool:
        return bool(self["size"] & MMNode.MM_PREVFREE_BIT)

    @property
    def is_orphan(self) -> bool:
        # Report orphaned node and node likely to be orphaned(free-used-used-free)
        if self.is_prev_free:
            return True

        # nextnode is None for a corrupted (zero sized) node
        following = self.nextnode
        return following is not None and following.is_free

    @property
    def from_pool(self) -> bool:
        return False

    @property
    def overhead(self) -> int:
        return MMNode.MM_ALLOCNODE_OVERHEAD


class MMHeap(Value, p.MMHeap):
    """
    One memory manager heap. It may contains multiple regions.
    """

    def __init__(self, heap: Value, name=None) -> None:
        if heap.type.code == gdb.TYPE_CODE_PTR:
            heap = heap.dereference()
        super().__init__(heap)

        self.name = name or "<noname>"
        # Lazy evaluation, None means "not computed yet"
        self._regions = None

    def __repr__(self) -> str:
        regions = [
            f"{hex(start.address)}~{hex(end.address)}" for start, end in self.regions
        ]
        return f"{self.name}@{self.address}, {int(self.heapsize) / 1024 :.1f}kB {self.nregions}regions: {','.join(regions)}"

    def __str__(self) -> str:
        return self.__repr__()

    @property
    def curused(self) -> int:
        return int(self.mm_curused)

    @property
    def heapsize(self) -> int:
        return int(self.mm_heapsize)

    @property
    def free(self) -> int:
        return self.heapsize - self.curused

    @property
    def nregions(self) -> int:
        """Number of regions, 1 when the heap has no mm_nregions field"""
        return int(utils.get_field(self, "mm_nregions", default=1))

    @property
    def mm_mpool(self) -> Value:
        """The heap's mm_mpool field, None when the field does not exist"""
        return utils.get_field(self, "mm_mpool", default=None)

    @property
    def regions(self):
        """List of (start node, end node) pairs, one per region"""
        if self._regions is None:
            regions = self.nregions
            self._regions = [
                (MMNode(start), MMNode(end))
                for start, end in zip(
                    utils.ArrayIterator(self.mm_heapstart, regions),
                    utils.ArrayIterator(self.mm_heapend, regions),
                )
            ]
        return self._regions

    @property
    def nodes(self) -> Generator[MMNode, None, None]:
        """Walk every node of every region, from start node to end node"""
        for start, end in self.regions:
            node = start
            while node and node.address <= end.address:
                yield node
                node = node.nextnode

    def nodes_free(self) -> Generator[MMNode, None, None]:
        return filter(lambda node: node.is_free, self.nodes)

    def nodes_used(self) -> Generator[MMNode, None, None]:
        return filter(lambda node: not node.is_free, self.nodes)

    def contains(self, address: int) -> bool:
        """Check whether the address is inside the heap struct or any region"""
        ranges = [[int(start.address), int(end.address)] for start, end in self.regions]
        if not ranges:
            return False

        ranges[0][0] = int(self.address)  # The heap itself is also in the range
        return any(start <= address <= end for start, end in ranges)

    def find(self, address: int) -> Optional[MMNode]:
        """Find the node whose span covers the address, None when not found"""
        for node in self.nodes:
            if node.address <= address < node.address + node.nodesize:
                return node
        return None


def get_heaps() -> List[MMHeap]:
    """Collect all heaps.

    Heaps are taken from the g_procfs_meminfo list; when that cannot be
    evaluated, g_mmheap is used as the single heap instead.
    """
    # parse g_procfs_meminfo to get all heaps
    heaps = []
    meminfo: p.ProcfsMeminfoEntry = utils.gdb_eval_or_none("g_procfs_meminfo")
    if not meminfo and (heap := gdb.parse_and_eval("g_mmheap")):
        heaps.append(MMHeap(heap))

    while meminfo:
        heaps.append(MMHeap(meminfo.heap, name=meminfo.name.string()))
        meminfo = meminfo.next

    return heaps


def get_pools(heaps: Optional[List[Value]] = None) -> Generator[MemPool, None, None]:
    """Iterate over the memory pools of the given heaps.

    heaps: heaps to inspect, every heap from get_heaps() when empty or None.
    Heaps without a mm_mpool are skipped.
    """
    for heap in heaps or get_heaps():
        if not (mm_pool := heap.mm_mpool):
            continue

        yield from MemPoolMultiple(mm_pool).pools


class MMHeapInfo(gdb.Command):
    """Show basic heap information"""

    def __init__(self):
        super().__init__("mm heap", gdb.COMMAND_USER)

    def invoke(self, arg: str, from_tty: bool) -> None:
        for heap in get_heaps():
            regions = [(start.address, end.address) for start, end in heap.regions]
            gdb.write(f"{heap} - has {len(list(heap.nodes))} nodes, regions:")
            gdb.write(" ".join(f"{hex(start)}~{hex(end)}" for start, end in regions))
            gdb.write("\n")


class MMPoolInfo(gdb.Command):
    """Show basic memory pool information"""

    def __init__(self):
        super().__init__("mm pool", gdb.COMMAND_USER)
        utils.alias("mempool", "mm pool")

    def invoke(self, arg: str, from_tty: bool) -> None:
        parser = argparse.ArgumentParser(description="Dump memory pool information.")
        parser.add_argument(
            "--heap", type=str, help="Which heap's pool to show", default=None
        )

        try:
            args = parser.parse_args(gdb.string_to_argv(arg))
        except SystemExit:
            # argparse exits on bad arguments and on --help
            return

        # Wrap the user supplied expression like get_heaps() does, so that
        # get_pools() can read the heap's mm_mpool.
        heaps = [MMHeap(gdb.parse_and_eval(args.heap))] if args.heap else get_heaps()
        if not (pools := list(get_pools(heaps))):
            gdb.write("No pools found.\n")
            return

        count = len(pools)
        gdb.write(f"Total {count} pools\n")

        name_max = max(len(pool.name) for pool in pools) + 11  # 11: "@0x12345678"
        formatter = (
            "{:>%d} {:>11} {:>9} {:>9} {:>9} {:>9} {:>9} {:>9} {:>9} {:>9}\n" % name_max
        )
        head = (
            "",
            "total",
            "blocksize",
            "bsize",
            "overhead",
            "nused",
            "nfree",
            "nifree",
            "nwaiter",
            "nqueue",
        )

        gdb.write(formatter.format(*head))
        for pool in pools:
            gdb.write(
                formatter.format(
                    f"{pool.name}@{pool.address:#x}",
                    pool.total,
                    pool.blocksize,
                    pool.size,
                    pool.overhead,
                    pool.nused,
                    pool.nfree,
                    pool.nifree,
                    pool.nwaiter,
                    pool.nqueue,
                )
            )