Lib/compiler/pyassem.py (697 lines of code) (raw):

# Portions copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com) # pyre-unsafe """A flow graph representation for Python bytecode""" from __future__ import annotations import sys from contextlib import contextmanager from types import CodeType from typing import Generator, List, Optional from . import opcodes, opcode_cinder from .consts import CO_NEWLOCALS, CO_OPTIMIZED, CO_SUPPRESS_JIT from .peephole import Optimizer try: import cinder MAX_BYTECODE_OPT_ITERS = 5 except ImportError: MAX_BYTECODE_OPT_ITERS = 1 def sign(a): if not isinstance(a, float): raise TypeError(f"Must be a real number, not {type(a)}") if a != a: return 1.0 # NaN case return 1.0 if str(a)[0] != "-" else -1.0 def instrsize(oparg): if oparg <= 0xFF: return 1 elif oparg <= 0xFFFF: return 2 elif oparg <= 0xFFFFFF: return 3 else: return 4 def cast_signed_byte_to_unsigned(i): if i < 0: i = 255 + i + 1 return i FVC_MASK = 0x3 FVC_NONE = 0x0 FVC_STR = 0x1 FVC_REPR = 0x2 FVC_ASCII = 0x3 FVS_MASK = 0x4 FVS_HAVE_SPEC = 0x4 class Instruction: __slots__ = ("opname", "oparg", "target", "ioparg") def __init__( self, opname: str, oparg: object, ioparg: int = 0, target: Optional[Block] = None, ): self.opname = opname self.oparg = oparg self.ioparg = ioparg self.target = target def __repr__(self): args = [f"{self.opname!r}", f"{self.oparg!r}", f"{self.ioparg!r}"] if self.target is not None: args.append(f"{self.target!r}") return f"Instruction({', '.join(args)})" class CompileScope: START_MARKER = "compile-scope-start-marker" __slots__ = "blocks" def __init__(self, blocks): self.blocks = blocks class FlowGraph: def __init__(self): self.block_count = 0 # List of blocks in the order they should be output for linear # code. As we deal with structured code, this order corresponds # to the order of source level constructs. (The original # implementation from Python2 used a complex ordering algorithm # which more buggy and erratic than useful.) self.ordered_blocks = [] # Current block being filled in with instructions. self.current = None self.entry = Block("entry") self.exit = Block("exit") self.startBlock(self.entry) # Source line number to use for next instruction. self.lineno = 0 # Whether line number was already output (set to False to output # new line number). self.lineno_set = False # First line of this code block. This field is expected to be set # externally and serve as a reference for generating all other # line numbers in the code block. (If it's not set, it will be # deduced). self.firstline = 0 # Line number of first instruction output. Used to deduce .firstline # if it's not set explicitly. self.first_inst_lineno = 0 # If non-zero, do not emit bytecode self.do_not_emit_bytecode = 0 @contextmanager def new_compile_scope(self) -> Generator[CompileScope, None, None]: prev_current = self.current prev_ordered_blocks = self.ordered_blocks prev_line_no = self.first_inst_lineno try: self.ordered_blocks = [] self.current = self.newBlock(CompileScope.START_MARKER) yield CompileScope(self.ordered_blocks) finally: self.current = prev_current self.ordered_blocks = prev_ordered_blocks self.first_inst_lineno = prev_line_no def apply_from_scope(self, scope: CompileScope): # link current block with the block from out of order result block: Block = scope.blocks[0] assert block.prev is not None assert block.prev.label == CompileScope.START_MARKER block.prev = None self.current.addNext(block) self.ordered_blocks.extend(scope.blocks) self.current = scope.blocks[-1] def startBlock(self, block): if self._debug: if self.current: print("end", repr(self.current)) print(" next", self.current.next) print(" prev", self.current.prev) print(" ", self.current.get_children()) print(repr(block)) block.bid = self.block_count self.block_count += 1 self.current = block if block and block not in self.ordered_blocks: if block is self.exit: if self.ordered_blocks and self.ordered_blocks[-1].has_return(): return self.ordered_blocks.append(block) def nextBlock(self, block=None, label=""): if self.do_not_emit_bytecode: return # XXX think we need to specify when there is implicit transfer # from one block to the next. might be better to represent this # with explicit JUMP_ABSOLUTE instructions that are optimized # out when they are unnecessary. # # I think this strategy works: each block has a child # designated as "next" which is returned as the last of the # children. because the nodes in a graph are emitted in # reverse post order, the "next" block will always be emitted # immediately after its parent. # Worry: maintaining this invariant could be tricky if block is None: block = self.newBlock(label=label) # Note: If the current block ends with an unconditional control # transfer, then it is techically incorrect to add an implicit # transfer to the block graph. Doing so results in code generation # for unreachable blocks. That doesn't appear to be very common # with Python code and since the built-in compiler doesn't optimize # it out we don't either. self.current.addNext(block) self.startBlock(block) def newBlock(self, label=""): b = Block(label) return b def startExitBlock(self): self.nextBlock(self.exit) _debug = 0 def _enable_debug(self): self._debug = 1 def _disable_debug(self): self._debug = 0 def emit(self, opcode: str, oparg: object = 0): self.maybeEmitSetLineno() if opcode != "SET_LINENO" and isinstance(oparg, Block): if not self.do_not_emit_bytecode: self.current.addOutEdge(oparg) self.current.emit(Instruction(opcode, 0, 0, target=oparg)) return ioparg = self.convertArg(opcode, oparg) if not self.do_not_emit_bytecode: self.current.emit(Instruction(opcode, oparg, ioparg)) if opcode == "SET_LINENO" and not self.first_inst_lineno: self.first_inst_lineno = ioparg def maybeEmitSetLineno(self): if not self.do_not_emit_bytecode and not self.lineno_set and self.lineno: self.lineno_set = True self.emit("SET_LINENO", self.lineno) def convertArg(self, opcode: str, oparg: object) -> int: if isinstance(oparg, int): return oparg raise ValueError(f"invalid oparg {oparg!r} for {opcode!r}") def getBlocksInOrder(self): """Return the blocks in the order they should be output.""" return self.ordered_blocks def getBlocks(self): if self.exit not in self.ordered_blocks: return self.ordered_blocks + [self.exit] return self.ordered_blocks def getRoot(self): """Return nodes appropriate for use with dominator""" return self.entry def getContainedGraphs(self): result = [] for b in self.getBlocks(): result.extend(b.getContainedGraphs()) return result class Block: def __init__(self, label=""): self.insts = [] self.outEdges = set() self.label = label self.bid = None self.next = None self.prev = None self.returns = False self.offset = 0 self.seen = False # visited during stack depth calculation self.startdepth = -1 def __repr__(self): data = [] data.append(f"id={self.bid}") data.append(f"startdepth={self.startdepth}") if self.next: data.append(f"next={self.next.bid}") extras = ", ".join(data) if self.label: return f"<block {self.label} {extras}>" else: return f"<block {extras}>" def __str__(self): insts = map(str, self.insts) insts = "\n".join(insts) return f"<block label={self.label} bid={self.bid} startdepth={self.startdepth}: {insts}>" def emit(self, instr: Instruction) -> None: if instr.opname == "RETURN_VALUE": self.returns = True self.insts.append(instr) def getInstructions(self): return self.insts def addOutEdge(self, block): self.outEdges.add(block) def addNext(self, block): assert self.next is None, next self.next = block assert block.prev is None, block.prev block.prev = self _uncond_transfer = ( "RETURN_PRIMITIVE", "RETURN_VALUE", "RAISE_VARARGS", "JUMP_ABSOLUTE", "JUMP_FORWARD", ) def has_unconditional_transfer(self): """Returns True if there is an unconditional transfer to an other block at the end of this block. This means there is no risk for the bytecode executer to go past this block's bytecode.""" if self.insts and self.insts[-1][0] in self._uncond_transfer: return True def has_return(self): return self.insts and self.insts[-1].opname == "RETURN_VALUE" def get_children(self): return list(self.outEdges) + [self.next] def get_followers(self): """Get the whole list of followers, including the next block.""" followers = {self.next} # Blocks that must be emitted *after* this one, because of # bytecode offsets (e.g. relative jumps) pointing to them. for inst in self.insts: if inst[0] in self.opcode.hasjrel: followers.add(inst[1]) return followers def getContainedGraphs(self): """Return all graphs contained within this block. For example, a MAKE_FUNCTION block will contain a reference to the graph for the function body. """ contained = [] for inst in self.insts: if len(inst) == 1: continue op = inst[1] if hasattr(op, "graph"): contained.append(op.graph) return contained # flags for code objects # the FlowGraph is transformed in place; it exists in one of these states ACTIVE = "ACTIVE" # accepting calls to .emit() CLOSED = "CLOSED" # closed to new instructions, ready for codegen FLAT = "FLAT" # flattened DONE = "DONE" class IndexedSet: """Container that behaves like a `set` that assigns stable dense indexes to each element. Put another way: This behaves like a `list` where you check `x in <list>` before doing any insertion to avoid duplicates. But contrary to the list this does not require an O(n) member check.""" __delitem__ = None def __init__(self, iterable=()): self.keys = {} for item in iterable: self.get_index(item) def __add__(self, iterable): result = IndexedSet() for item in self.keys.keys(): result.get_index(item) for item in iterable: result.get_index(item) return result def __contains__(self, item): return item in self.keys def __iter__(self): # This relies on `dict` maintaining insertion order. return iter(self.keys.keys()) def __len__(self): return len(self.keys) def get_index(self, item): """Return index of name in collection, appending if necessary""" assert type(item) is str idx = self.keys.get(item) if idx is not None: return idx idx = len(self.keys) self.keys[item] = idx return idx def index(self, item): assert type(item) is str idx = self.keys.get(item) if idx is not None: return idx raise ValueError() def update(self, iterable): for item in iterable: self.get_index(item) class PyFlowGraph(FlowGraph): super_init = FlowGraph.__init__ opcode = opcodes.opcode def __init__( self, name: str, filename: str, scope, flags: int = 0, args=(), kwonlyargs=(), starargs=(), optimized: int = 0, klass: bool = False, docstring: Optional[str] = None, firstline: int = 0, peephole_enabled: bool = True, posonlyargs: int = 0, ) -> None: self.super_init() self.name = name self.filename = filename self.scope = scope self.docstring = None self.args = args self.kwonlyargs = kwonlyargs self.posonlyargs = posonlyargs self.starargs = starargs self.klass = klass self.stacksize = 0 self.docstring = docstring self.peephole_enabled = peephole_enabled self.flags = flags if optimized: self.setFlag(CO_OPTIMIZED | CO_NEWLOCALS) self.consts = {} self.names = IndexedSet() # Free variables found by the symbol table scan, including # variables used only in nested scopes, are included here. self.freevars = IndexedSet(self.scope.get_free_vars()) self.cellvars = IndexedSet(self.scope.get_cell_vars()) # The closure list is used to track the order of cell # variables and free variables in the resulting code object. # The offsets used by LOAD_CLOSURE/LOAD_DEREF refer to both # kinds of variables. self.closure = self.cellvars + self.freevars varnames = IndexedSet() varnames.update(args) varnames.update(kwonlyargs) varnames.update(starargs) self.varnames = varnames self.stage = ACTIVE self.firstline = firstline self.first_inst_lineno = 0 self.lineno_set = False self.lineno = 0 # Add any extra consts that were requested to the const pool self.extra_consts = [] self.initializeConsts() self.fast_vars = set() def setFlag(self, flag: int) -> None: self.flags |= flag def checkFlag(self, flag: int) -> Optional[int]: if self.flags & flag: return 1 def initializeConsts(self) -> None: # Docstring is first entry in co_consts for normal functions # (Other types of code objects deal with docstrings in different # manner, e.g. lambdas and comprehensions don't have docstrings, # classes store them as __doc__ attribute. if self.name == "<lambda>": self.consts[self.get_const_key(None)] = 0 elif not self.name.startswith("<") and not self.klass: if self.docstring is not None: self.consts[self.get_const_key(self.docstring)] = 0 else: self.consts[self.get_const_key(None)] = 0 def convertArg(self, opcode: str, oparg: object) -> int: assert self.stage == ACTIVE, self.stage if self.do_not_emit_bytecode and opcode in self._quiet_opcodes: # return -1 so this errors if it ever ends up in non-dead-code due # to a bug. return -1 conv = self._converters.get(opcode) if conv is not None: return conv(self, oparg) return super().convertArg(opcode, oparg) def getCode(self): """Get a Python code object""" assert self.stage == ACTIVE, self.stage self.stage = CLOSED self.computeStackDepth() self.flattenGraph() assert self.stage == FLAT, self.stage self.makeByteCode() assert self.stage == DONE, self.stage code = self.newCodeObject() return code def dump(self, io=None): if io: save = sys.stdout sys.stdout = io pc = 0 for block in self.getBlocks(): print(repr(block)) for instr in block.getInstructions(): opname = instr.opname if opname == "SET_LINENO": continue if instr.target is None: print("\t", "%3d" % pc, opname, instr.oparg) elif instr.target.label: print( "\t", f"{pc:3} {opname} {instr.target.bid} ({instr.target.label})", ) else: print("\t", f"{pc:3} {opname} {instr.target.bid}") pc += self.opcode.CODEUNIT_SIZE if io: sys.stdout = save def push_block(self, worklist: List[Block], block: Block, depth: int): assert ( block.startdepth < 0 or block.startdepth >= depth ), f"{block!r}: {block.startdepth} vs {depth}" if block.startdepth < depth: block.startdepth = depth worklist.append(block) def stackdepth_walk(self, block): maxdepth = 0 worklist = [] self.push_block(worklist, block, 0) while worklist: block = worklist.pop() next = block.next depth = block.startdepth assert depth >= 0 for instr in block.getInstructions(): if instr.opname == "SET_LINENO": continue delta = self.opcode.stack_effect_raw(instr.opname, instr.oparg, False) new_depth = depth + delta if new_depth > maxdepth: maxdepth = new_depth assert depth >= 0 op = self.opcode.opmap[instr.opname] if self.opcode.has_jump(op): delta = self.opcode.stack_effect_raw( instr.opname, instr.oparg, True ) target_depth = depth + delta if target_depth > maxdepth: maxdepth = target_depth assert target_depth >= 0 self.push_block(worklist, instr.target, target_depth) depth = new_depth if instr.opname in ( "JUMP_ABSOLUTE", "JUMP_FORWARD", "RETURN_VALUE", "RAISE_VARARGS", ): # Remaining code is dead next = None break # TODO(dinoviehland): we could save the delta we came up with here and # reapply it on subsequent walks rather than having to walk all of the # instructions again. if next: self.push_block(worklist, next, depth) return maxdepth def computeStackDepth(self): """Compute the max stack depth. Find the flow path that needs the largest stack. We assume that cycles in the flow graph have no net effect on the stack depth. """ assert self.stage == CLOSED, self.stage for block in self.getBlocksInOrder(): # We need to get to the first block which actually has instructions if block.getInstructions(): self.stacksize = self.stackdepth_walk(block) break def flattenGraph(self): """Arrange the blocks in order and resolve jumps""" assert self.stage == CLOSED, self.stage # This is an awful hack that could hurt performance, but # on the bright side it should work until we come up # with a better solution. # # The issue is that in the first loop blocksize() is called # which calls instrsize() which requires i_oparg be set # appropriately. There is a bootstrap problem because # i_oparg is calculated in the second loop. # # So we loop until we stop seeing new EXTENDED_ARGs. # The only EXTENDED_ARGs that could be popping up are # ones in jump instructions. So this should converge # fairly quickly. extended_arg_recompile = True while extended_arg_recompile: extended_arg_recompile = False self.insts = insts = [] pc = 0 for b in self.getBlocksInOrder(): b.offset = pc for inst in b.getInstructions(): insts.append(inst) if inst.opname != "SET_LINENO": pc += instrsize(inst.ioparg) pc = 0 for inst in insts: if inst.opname == "SET_LINENO": continue pc += instrsize(inst.ioparg) op = self.opcode.opmap[inst.opname] if self.opcode.has_jump(op): oparg = inst.ioparg target = inst.target offset = target.offset if op in self.opcode.hasjrel: offset -= pc offset *= 2 if instrsize(oparg) != instrsize(offset): extended_arg_recompile = True assert offset >= 0, "Offset value: %d" % offset inst.ioparg = offset self.stage = FLAT def sort_cellvars(self): self.closure = self.cellvars + self.freevars def _convert_LOAD_CONST(self, arg: object) -> int: getCode = getattr(arg, "getCode", None) if getCode is not None: arg = getCode() key = self.get_const_key(arg) res = self.consts.get(key, self) if res is self: res = self.consts[key] = len(self.consts) return res def get_const_key(self, value: object): if isinstance(value, float): return type(value), value, sign(value) elif isinstance(value, complex): return type(value), value, sign(value.real), sign(value.imag) elif isinstance(value, (tuple, frozenset)): return ( type(value), value, tuple(self.get_const_key(const) for const in value), ) return type(value), value def _convert_LOAD_FAST(self, arg: object) -> int: self.fast_vars.add(arg) return self.varnames.get_index(arg) def _convert_LOAD_LOCAL(self, arg: object) -> int: self.fast_vars.add(arg) assert isinstance(arg, tuple), "invalid oparg {arg!r}" return self._convert_LOAD_CONST((self.varnames.get_index(arg[0]), arg[1])) def _convert_NAME(self, arg: object) -> int: return self.names.get_index(arg) def _convert_LOAD_SUPER(self, arg: object) -> int: assert isinstance(arg, tuple), "invalid oparg {arg!r}" return self._convert_LOAD_CONST((self._convert_NAME(arg[0]), arg[1])) def _convert_DEREF(self, arg: object) -> int: # Sometimes, both cellvars and freevars may contain the same var # (e.g., for class' __class__). In this case, prefer freevars. if arg in self.freevars: return self.freevars.get_index(arg) + len(self.cellvars) return self.closure.get_index(arg) # similarly for other opcodes... _converters = { "LOAD_CONST": _convert_LOAD_CONST, "INVOKE_FUNCTION": _convert_LOAD_CONST, "INVOKE_METHOD": _convert_LOAD_CONST, "LOAD_FIELD": _convert_LOAD_CONST, "STORE_FIELD": _convert_LOAD_CONST, "CAST": _convert_LOAD_CONST, "PRIMITIVE_BOX": _convert_LOAD_CONST, "PRIMITIVE_UNBOX": _convert_LOAD_CONST, "TP_ALLOC": _convert_LOAD_CONST, "CHECK_ARGS": _convert_LOAD_CONST, "BUILD_CHECKED_MAP": _convert_LOAD_CONST, "BUILD_CHECKED_LIST": _convert_LOAD_CONST, "PRIMITIVE_LOAD_CONST": _convert_LOAD_CONST, "LOAD_FAST": _convert_LOAD_FAST, "STORE_FAST": _convert_LOAD_FAST, "DELETE_FAST": _convert_LOAD_FAST, "LOAD_LOCAL": _convert_LOAD_LOCAL, "STORE_LOCAL": _convert_LOAD_LOCAL, "LOAD_NAME": _convert_NAME, "LOAD_CLOSURE": lambda self, arg: self.closure.get_index(arg), "COMPARE_OP": lambda self, arg: self.opcode.CMP_OP.index(arg), "LOAD_GLOBAL": _convert_NAME, "STORE_GLOBAL": _convert_NAME, "DELETE_GLOBAL": _convert_NAME, "CONVERT_NAME": _convert_NAME, "STORE_NAME": _convert_NAME, "STORE_ANNOTATION": _convert_NAME, "DELETE_NAME": _convert_NAME, "IMPORT_NAME": _convert_NAME, "IMPORT_FROM": _convert_NAME, "STORE_ATTR": _convert_NAME, "LOAD_ATTR": _convert_NAME, "DELETE_ATTR": _convert_NAME, "LOAD_METHOD": _convert_NAME, "LOAD_DEREF": _convert_DEREF, "STORE_DEREF": _convert_DEREF, "DELETE_DEREF": _convert_DEREF, "LOAD_CLASSDEREF": _convert_DEREF, "REFINE_TYPE": _convert_LOAD_CONST, "LOAD_METHOD_SUPER": _convert_LOAD_SUPER, "LOAD_ATTR_SUPER": _convert_LOAD_SUPER, "LOAD_TYPE": _convert_LOAD_CONST, "FUNC_CREDENTIAL": _convert_LOAD_CONST, "READONLY_OPERATION": _convert_LOAD_CONST, } # Opcodes which do not add names to co_consts/co_names/co_varnames in dead code (self.do_not_emit_bytecode) _quiet_opcodes = { "LOAD_CONST", "IMPORT_NAME", "STORE_ATTR", "LOAD_ATTR", "DELETE_ATTR", "LOAD_METHOD", "STORE_FAST", "LOAD_FAST", } def makeByteCode(self): assert self.stage == FLAT, self.stage self.lnotab = lnotab = LineAddrTable(self.opcode) lnotab.setFirstLine(self.firstline or self.first_inst_lineno or 1) for t in self.insts: if t.opname == "SET_LINENO": lnotab.nextLine(t.oparg) continue oparg = t.ioparg assert 0 <= oparg <= 0xFFFFFFFF, oparg if oparg > 0xFFFFFF: lnotab.addCode(self.opcode.EXTENDED_ARG, (oparg >> 24) & 0xFF) if oparg > 0xFFFF: lnotab.addCode(self.opcode.EXTENDED_ARG, (oparg >> 16) & 0xFF) if oparg > 0xFF: lnotab.addCode(self.opcode.EXTENDED_ARG, (oparg >> 8) & 0xFF) lnotab.addCode(self.opcode.opmap[t.opname], oparg & 0xFF) self.stage = DONE def newCodeObject(self): assert self.stage == DONE, self.stage if (self.flags & CO_NEWLOCALS) == 0: nlocals = len(self.fast_vars) else: nlocals = len(self.varnames) firstline = self.firstline # For module, .firstline is initially not set, and should be first # line with actual bytecode instruction (skipping docstring, optimized # out instructions, etc.) if not firstline: firstline = self.first_inst_lineno # If no real instruction, fallback to 1 if not firstline: firstline = 1 consts = self.getConsts() code = self.lnotab.getCode() lnotab = self.lnotab.getTable() if self.peephole_enabled: for _i in range(MAX_BYTECODE_OPT_ITERS): opt = self.make_optimizer(code, consts, lnotab).optimize() if opt is not None: if code == opt.byte_code: break code, consts, lnotab = opt.byte_code, opt.consts, opt.lnotab consts = consts + tuple(self.extra_consts) return self.make_code(nlocals, code, consts, firstline, lnotab) def make_optimizer(self, code, consts, lnotab) -> Optimizer: return Optimizer(code, consts, lnotab, self.opcode) def make_code(self, nlocals, code, consts, firstline, lnotab) -> CodeType: return CodeType( len(self.args), self.posonlyargs, len(self.kwonlyargs), nlocals, self.stacksize, self.flags, code, consts, tuple(self.names), tuple(self.varnames), self.filename, self.name, firstline, lnotab, tuple(self.freevars), tuple(self.cellvars), ) def getConsts(self): """Return a tuple for the const slot of the code object""" # Just return the constant value, removing the type portion. Order by const index. return tuple( const[1] for const, idx in sorted(self.consts.items(), key=lambda o: o[1]) ) class PyFlowGraphCinder(PyFlowGraph): opcode = opcode_cinder.opcode def make_code(self, nlocals, code, consts, firstline, lnotab) -> CodeType: flags = self.flags | CO_SUPPRESS_JIT if self.scope.suppress_jit else self.flags return CodeType( len(self.args), self.posonlyargs, len(self.kwonlyargs), nlocals, self.stacksize, flags, code, consts, tuple(self.names), tuple(self.varnames), self.filename, self.name, firstline, lnotab, tuple(self.freevars), tuple(self.cellvars), ) class LineAddrTable: """lnotab This class builds the lnotab, which is documented in compile.c. Here's a brief recap: For each SET_LINENO instruction after the first one, two bytes are added to lnotab. (In some cases, multiple two-byte entries are added.) The first byte is the distance in bytes between the instruction for the last SET_LINENO and the current SET_LINENO. The second byte is offset in line numbers. If either offset is greater than 255, multiple two-byte entries are added -- see compile.c for the delicate details. """ def __init__(self, opcode): self.code = [] self.codeOffset = 0 self.firstline = 0 self.lastline = 0 self.lastoff = 0 self.lnotab = [] self.opcode = opcode def setFirstLine(self, lineno): self.firstline = lineno self.lastline = lineno def addCode(self, opcode, oparg): self.code.append(opcode) self.code.append(oparg) self.codeOffset += self.opcode.CODEUNIT_SIZE def nextLine(self, lineno): if self.firstline == 0: self.firstline = lineno self.lastline = lineno else: # compute deltas addr_delta = self.codeOffset - self.lastoff line_delta = lineno - self.lastline if not addr_delta and not line_delta: return push = self.lnotab.append while addr_delta > 255: push(255) push(0) addr_delta -= 255 if line_delta < -128 or 127 < line_delta: if line_delta < 0: k = -128 ncodes = (-line_delta) // 128 else: k = 127 ncodes = line_delta // 127 line_delta -= ncodes * k push(addr_delta) push(cast_signed_byte_to_unsigned(k)) addr_delta = 0 for _ in range(ncodes - 1): push(0) push(cast_signed_byte_to_unsigned(k)) assert -128 <= line_delta and line_delta <= 127 push(addr_delta) push(cast_signed_byte_to_unsigned(line_delta)) self.lastline = lineno self.lastoff = self.codeOffset def getCode(self): return bytes(self.code) def getTable(self): return bytes(self.lnotab)