library/compiler/pycodegen.py (2,453 lines of code) (raw):

# Portions copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com) # pyre-unsafe from __future__ import annotations import ast import importlib.util import itertools import marshal import os import sys from ast import AST, ClassDef from builtins import compile as builtin_compile from contextlib import contextmanager from . import consts36, consts38, future, misc, pyassem, symbols from .consts import ( CO_ASYNC_GENERATOR, CO_COROUTINE, CO_GENERATOR, CO_NESTED, CO_VARARGS, CO_VARKEYWORDS, SC_CELL, SC_FREE, SC_GLOBAL_EXPLICIT, SC_GLOBAL_IMPLICIT, SC_LOCAL, PyCF_MASK_OBSOLETE, PyCF_ONLY_AST, PyCF_SOURCE_IS_UTF8, ) from .optimizer import AstOptimizer from .py38.optimizer import AstOptimizer38 from .pyassem import PyFlowGraph from .symbols import SymbolVisitor from .unparse import to_expr from .visitor import ASTVisitor, walk TYPE_CHECKING = False if TYPE_CHECKING: from typing import List, Optional, Sequence, Union, Type try: import _parser # pyre-ignore[21] except ImportError: parse_callable = builtin_compile else: parse_callable = _parser.parse callfunc_opcode_info = { # (Have *args, Have **args) : opcode (0, 0): "CALL_FUNCTION", (1, 0): "CALL_FUNCTION_VAR", (0, 1): "CALL_FUNCTION_KW", (1, 1): "CALL_FUNCTION_VAR_KW", } # This is a super set of all Python versions LOOP = 1 EXCEPT = 2 TRY_FINALLY = 3 END_FINALLY = 4 WHILE_LOOP = 5 FOR_LOOP = 6 TRY_FINALLY_BREAK = 7 WITH = 8 ASYNC_WITH = 9 HANDLER_CLEANUP = 10 _ZERO = (0).to_bytes(4, "little") _DEFAULT_MODNAME = sys.intern("<module>") def make_header(mtime, size): return _ZERO + mtime.to_bytes(4, "little") + size.to_bytes(4, "little") def compileFile(filename, display=0, compiler=None, modname=_DEFAULT_MODNAME): # compile.c uses marshal to write a long directly, with # calling the interface that would also generate a 1-byte code # to indicate the type of the value. simplest way to get the # same effect is to call marshal and then skip the code. 
fileinfo = os.stat(filename) with open(filename, "U") as f: buf = f.read() code = compile(buf, filename, "exec", compiler=compiler, modname=modname) with open(filename + "c", "wb") as f: hdr = make_header(int(fileinfo.st_mtime), fileinfo.st_size) f.write(importlib.util.MAGIC_NUMBER) f.write(hdr) marshal.dump(code, f) def compile( source, filename, mode, flags=0, dont_inherit=None, optimize=-1, compiler=None, modname=_DEFAULT_MODNAME, ): """Replacement for builtin compile() function""" if dont_inherit is not None: raise RuntimeError("not implemented yet") result = make_compiler(source, filename, mode, flags, optimize, compiler, modname) if flags & PyCF_ONLY_AST: return result return result.getCode() def parse(source, filename, mode, flags): return parse_callable(source, filename, mode, flags | PyCF_ONLY_AST) def make_compiler( source, filename, mode, flags=0, optimize=-1, generator=None, modname=_DEFAULT_MODNAME, peephole_enabled=True, ast_optimizer_enabled=True, ): if mode not in ("single", "exec", "eval"): raise ValueError("compile() mode must be 'exec', 'eval' or 'single'") if generator is None: generator = get_default_generator() consts = generator.consts if flags & ~(consts.PyCF_MASK | PyCF_MASK_OBSOLETE | consts.PyCF_COMPILE_MASK): raise ValueError("compile(): unrecognised flags", hex(flags)) flags |= PyCF_SOURCE_IS_UTF8 if isinstance(source, ast.AST): tree = source else: tree = parse(source, filename, mode, flags & consts.PyCF_MASK) if flags & PyCF_ONLY_AST: return tree optimize = sys.flags.optimize if optimize == -1 else optimize return generator.make_code_gen( modname, tree, filename, flags, optimize, peephole_enabled=peephole_enabled, ast_optimizer_enabled=ast_optimizer_enabled, ) def is_const(node): is_const_node = isinstance( node, (ast.Num, ast.Str, ast.Ellipsis, ast.Bytes, ast.NameConstant, ast.Constant), ) is_debug = isinstance(node, ast.Name) and node.id == "__debug__" return is_const_node or is_debug def all_items_const(seq, begin, end): for item in 
seq[begin:end]: if not is_const(item): return False return True CONV_STR = ord("s") CONV_REPR = ord("r") CONV_ASCII = ord("a") class CodeGenerator(ASTVisitor): """Defines basic code generator for Python bytecode This class is an abstract base class. Concrete subclasses must define an __init__() that defines self.graph and then calls the __init__() defined in this class. """ optimized = 0 # is namespace access optimized? __initialized = None class_name = None # provide default for instance variable future_flags = 0 flow_graph = pyassem.PyFlowGraph consts = consts36 def __init__( self, parent: Optional[CodeGenerator], node: AST, symbols: SymbolVisitor, graph: PyFlowGraph, flags=0, optimization_lvl=0, ): super().__init__() self.module_gen = self if parent is None else parent.module_gen self.tree = node self.symbols = symbols self.graph = graph self.scopes = symbols.scopes self.setups = misc.Stack() self.last_lineno = None self._setupGraphDelegation() self.interactive = False self.graph.setFlag(self.module_gen.future_flags) self.scope = self.scopes[node] self.flags = flags self.optimization_lvl = optimization_lvl self.strip_docstrings = optimization_lvl == 2 self.__with_count = 0 self.did_setup_annotations = False self._qual_name = None def _setupGraphDelegation(self): self.emit = self.graph.emit self.newBlock = self.graph.newBlock self.nextBlock = self.graph.nextBlock def getCode(self): """Return a code object""" return self.graph.getCode() def set_qual_name(self, qualname): pass @contextmanager def noEmit(self): self.graph.do_not_emit_bytecode += 1 try: yield finally: self.graph.do_not_emit_bytecode -= 1 @contextmanager def noOp(self): yield def maybeEmit(self, test): return self.noOp() if test else self.noEmit() def skip_visit(self): """On <3.8 if we aren't emitting bytecode we shouldn't even visit the nodes.""" return self.graph.do_not_emit_bytecode def mangle(self, name): if self.class_name is not None: return misc.mangle(name, self.class_name) else: return name 
def get_module(self): raise RuntimeError("should be implemented by subclasses") # Next five methods handle name access def storeName(self, name): self._nameOp("STORE", name) def loadName(self, name): self._nameOp("LOAD", name) def delName(self, name): self._nameOp("DELETE", name) def _nameOp(self, prefix, name): name = self.mangle(name) scope = self.scope.check_name(name) if scope == SC_LOCAL: if not self.optimized: self.emit(prefix + "_NAME", name) else: self.emit(prefix + "_FAST", name) elif scope == SC_GLOBAL_EXPLICIT: self.emit(prefix + "_GLOBAL", name) elif scope == SC_GLOBAL_IMPLICIT: if not self.optimized: self.emit(prefix + "_NAME", name) else: self.emit(prefix + "_GLOBAL", name) elif scope == SC_FREE or scope == SC_CELL: if isinstance(self.scope, symbols.ClassScope): if prefix == "STORE" and name not in self.scope.nonlocals: self.emit(prefix + "_NAME", name) return if isinstance(self.scope, symbols.ClassScope) and prefix == "LOAD": self.emit(prefix + "_CLASSDEREF", name) else: self.emit(prefix + "_DEREF", name) else: raise RuntimeError("unsupported scope for var %s: %d" % (name, scope)) def _implicitNameOp(self, prefix, name): """Emit name ops for names generated implicitly by for loops The interpreter generates names that start with a period or dollar sign. The symbol table ignores these names because they aren't present in the program text. """ if self.optimized: self.emit(prefix + "_FAST", name) else: self.emit(prefix + "_NAME", name) def set_lineno(self, node): if hasattr(node, "lineno"): self.graph.lineno = node.lineno self.graph.lineno_set = False def update_lineno(self, node): if hasattr(node, "lineno") and node.lineno != self.graph.lineno: self.set_lineno(node) def skip_docstring(self, body): """Given list of statements, representing body of a function, class, or module, skip docstring, if any. 
""" if ( body and isinstance(body[0], ast.Expr) and isinstance(body[0].value, ast.Str) ): return body[1:] return body # The first few visitor methods handle nodes that generator new # code objects. They use class attributes to determine what # specialized code generators to use. def visitInteractive(self, node): self.interactive = True self.visit(node.body) self.emit("LOAD_CONST", None) self.emit("RETURN_VALUE") def findFutures(self, node): consts = self.consts future_flags = self.flags & consts.PyCF_MASK for feature in future.find_futures(node): if feature == "generator_stop": future_flags |= consts.CO_FUTURE_GENERATOR_STOP elif feature == "barry_as_FLUFL": future_flags |= consts.CO_FUTURE_BARRY_AS_BDFL return future_flags def visitModule(self, node): self.future_flags = self.findFutures(node) self.graph.setFlag(self.future_flags) if node.body: self.set_lineno(node.body[0]) # Set current line number to the line number of first statement. # This way line number for SETUP_ANNOTATIONS will always # coincide with the line number of first "real" statement in module. # If body is empy, then lineno will be set later in assemble. if self.findAnn(node.body): self.emit("SETUP_ANNOTATIONS") self.did_setup_annotations = True doc = self.get_docstring(node) if doc is not None: self.emit("LOAD_CONST", doc) self.storeName("__doc__") self.visit(self.skip_docstring(node.body)) # See if the was a live statement, to later set its line number as # module first line. If not, fall back to first line of 1. 
if not self.graph.first_inst_lineno: self.graph.first_inst_lineno = 1 self.emit_module_return(node) def emit_module_return(self, node: ast.Module) -> None: self.emit("LOAD_CONST", None) self.emit("RETURN_VALUE") def visitExpression(self, node): self.visit(node.body) self.emit("RETURN_VALUE") def visitFunctionDef(self, node): self.set_lineno(node) self._visitFuncOrLambda(node, isLambda=0) self.storeName(node.name) visitAsyncFunctionDef = visitFunctionDef def visitJoinedStr(self, node): self.update_lineno(node) for value in node.values: self.visit(value) if len(node.values) != 1: self.emit("BUILD_STRING", len(node.values)) def visitFormattedValue(self, node): self.update_lineno(node) self.visit(node.value) if node.conversion == CONV_STR: oparg = pyassem.FVC_STR elif node.conversion == CONV_REPR: oparg = pyassem.FVC_REPR elif node.conversion == CONV_ASCII: oparg = pyassem.FVC_ASCII else: assert node.conversion == -1, str(node.conversion) oparg = pyassem.FVC_NONE if node.format_spec: self.visit(node.format_spec) oparg |= pyassem.FVS_HAVE_SPEC self.emit("FORMAT_VALUE", oparg) def visitLambda(self, node): self.update_lineno(node) self._visitFuncOrLambda(node, isLambda=1) def processBody(self, node, body, gen): if isinstance(body, list): for stmt in body: gen.visit(stmt) else: gen.visit(body) def _visitAnnotation(self, node): return self.visit(node) skip_func_docstring = skip_docstring def _visitFuncOrLambda(self, node, isLambda=0): if not isLambda and node.decorator_list: for decorator in node.decorator_list: self.visit(decorator) ndecorators = len(node.decorator_list) first_lineno = node.decorator_list[0].lineno else: ndecorators = 0 first_lineno = node.lineno flags = 0 name = sys.intern("<lambda>") if isLambda else node.name gen = self.make_func_codegen(node, name, first_lineno) body = node.body if not isLambda: body = self.skip_func_docstring(body) self.processBody(node, body, gen) gen.finishFunction() if node.args.defaults: for default in node.args.defaults: 
self.visit(default) flags |= 0x01 self.emit("BUILD_TUPLE", len(node.args.defaults)) kwdefaults = [] for kwonly, default in zip(node.args.kwonlyargs, node.args.kw_defaults): if default is not None: kwdefaults.append(self.mangle(kwonly.arg)) self.visit(default) if kwdefaults: self.emit("LOAD_CONST", tuple(kwdefaults)) self.emit("BUILD_CONST_KEY_MAP", len(kwdefaults)) flags |= 0x02 ann_args = self.annotate_args(node.args) # Cannot annotate return type for lambda if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.returns: self._visitAnnotation(node.returns) ann_args.append("return") if ann_args: flags |= 0x04 self.emit("LOAD_CONST", tuple(ann_args)) self.emit("BUILD_CONST_KEY_MAP", len(ann_args)) self._makeClosure(gen, flags) for _ in range(ndecorators): self.emit("CALL_FUNCTION", 1) def annotate_args(self, args: ast.arguments) -> List[str]: ann_args = [] for arg in args.args: self.annotate_arg(arg, ann_args) if args.vararg: # pyre-fixme[6]: Expected `arg` for 1st param but got `Optional[_ast.arg]`. self.annotate_arg(args.vararg, ann_args) for arg in args.kwonlyargs: self.annotate_arg(arg, ann_args) if args.kwarg: # pyre-fixme[6]: Expected `arg` for 1st param but got `Optional[_ast.arg]`. 
self.annotate_arg(args.kwarg, ann_args) return ann_args def annotate_arg(self, arg: ast.arg, ann_args: List[str]): if arg.annotation: self._visitAnnotation(arg.annotation) ann_args.append(self.mangle(arg.arg)) def visitClassDef(self, node): self.set_lineno(node) first_lineno = None for decorator in node.decorator_list: if first_lineno is None: first_lineno = decorator.lineno self.visit(decorator) gen = self.make_class_codegen(node, first_lineno or node.lineno) gen.emit("LOAD_NAME", "__name__") gen.storeName("__module__") gen.emit("LOAD_CONST", gen.get_qual_prefix(gen) + gen.name) gen.storeName("__qualname__") if gen.findAnn(node.body): gen.did_setup_annotations = True gen.emit("SETUP_ANNOTATIONS") doc = gen.get_docstring(node) if doc is not None: gen.update_lineno(node.body[0]) gen.emit("LOAD_CONST", doc) gen.storeName("__doc__") self.walkClassBody(node, gen) gen.graph.startExitBlock() if "__class__" in gen.scope.cells: gen.emit("LOAD_CLOSURE", "__class__") gen.emit("DUP_TOP") gen.emit("STORE_NAME", "__classcell__") else: gen.emit("LOAD_CONST", None) gen.emit("RETURN_VALUE") self.emit("LOAD_BUILD_CLASS") self._makeClosure(gen, 0) self.emit("LOAD_CONST", node.name) self._call_helper(2, node.bases, node.keywords) for _ in range(len(node.decorator_list)): self.emit("CALL_FUNCTION", 1) self.store_type_name_and_flags(node) def store_type_name_and_flags(self, node: ClassDef) -> None: self.storeName(node.name) def walkClassBody(self, node: ClassDef, gen: "CodeGenerator"): walk(self.skip_docstring(node.body), gen) # The rest are standard visitor methods # The next few implement control-flow statements def visitIf(self, node): self.set_lineno(node) test = node.test test_const = self.get_bool_const(test) # Emulate co_firstlineno behavior of C compiler if test_const is False and not node.orelse: self.graph.maybeEmitSetLineno() end = self.newBlock("if_end") orelse = None if node.orelse: orelse = self.newBlock("if_else") if test_const is None: self.compileJumpIf(test, orelse or 
end, False) with self.maybeEmit(test_const is not False): self.nextBlock() self.visit(node.body) if node.orelse: if test_const is None: self.emit("JUMP_FORWARD", end) with self.maybeEmit(test_const is not True): self.nextBlock(orelse) self.visit(node.orelse) self.nextBlock(end) def visitWhile(self, node): self.set_lineno(node) test_const = self.get_bool_const(node.test) if test_const is False: if node.orelse: self.visit(node.orelse) return loop = self.newBlock("while_loop") else_ = self.newBlock("while_else") after = self.newBlock("while_after") self.emit("SETUP_LOOP", after) self.nextBlock(loop) self.setups.push((LOOP, loop)) if test_const is not True: self.compileJumpIf(node.test, else_ or after, False) self.nextBlock(label="while_body") self.visit(node.body) self.emit("JUMP_ABSOLUTE", loop) if not self.get_bool_const(node.test): self.nextBlock(else_ or after) # or just the POPs if not else clause self.emit("POP_BLOCK") self.setups.pop() if node.orelse: self.visit(node.orelse) self.nextBlock(after) def push_loop(self, kind, start, end): self.emit("SETUP_LOOP", end) self.setups.push((LOOP, start)) def pop_loop(self): self.emit("POP_BLOCK") self.setups.pop() def visitFor(self, node): start = self.newBlock() anchor = self.newBlock() after = self.newBlock() self.set_lineno(node) self.push_loop(FOR_LOOP, start, after) self.visit(node.iter) self.emit("GET_ITER") self.nextBlock(start) self.emit("FOR_ITER", anchor) self.visit(node.target) self.visit(node.body) self.emit("JUMP_ABSOLUTE", start) self.nextBlock(anchor) self.pop_loop() if node.orelse: self.visit(node.orelse) self.nextBlock(after) def emitAsyncIterYieldFrom(self): self.emit("LOAD_CONST", None) self.emit("YIELD_FROM") def visitAsyncFor(self, node): try_ = self.newBlock("async_for_try") except_ = self.newBlock("except") end = self.newBlock("end") after_try = self.newBlock("after_try") try_cleanup = self.newBlock("try_cleanup") after_loop_else = self.newBlock("after_loop_else") self.set_lineno(node) 
self.emit("SETUP_LOOP", end) self.setups.push((LOOP, try_)) self.visit(node.iter) self.emit("GET_AITER") self.emitAsyncIterYieldFrom() self.nextBlock(try_) self.emit("SETUP_EXCEPT", except_) self.setups.push((EXCEPT, try_)) self.emit("GET_ANEXT") self.emit("LOAD_CONST", None) self.emit("YIELD_FROM") self.visit(node.target) self.emit("POP_BLOCK") self.setups.pop() self.emit("JUMP_FORWARD", after_try) self.nextBlock(except_) self.emit("DUP_TOP") self.emit("LOAD_GLOBAL", "StopAsyncIteration") self.emit("COMPARE_OP", "exception match") self.emit("POP_JUMP_IF_TRUE", try_cleanup) self.emit("END_FINALLY") self.nextBlock(after_try) self.visit(node.body) self.emit("JUMP_ABSOLUTE", try_) self.nextBlock(try_cleanup) self.emit("POP_TOP") self.emit("POP_TOP") self.emit("POP_TOP") self.emit("POP_EXCEPT") self.emit("POP_TOP") self.emit("POP_BLOCK") self.setups.pop() self.nextBlock(after_loop_else) if node.orelse: self.visit(node.orelse) self.nextBlock(end) def visitBreak(self, node): if not self.setups: raise SyntaxError("'break' outside loop", self.syntax_error_position(node)) self.set_lineno(node) self.emit("BREAK_LOOP") def visitContinue(self, node): if not self.setups: raise SyntaxError( "'continue' not properly in loop", self.syntax_error_position(node) ) self.set_lineno(node) kind, block = self.setups.top() if kind == LOOP: self.emit("JUMP_ABSOLUTE", block) self.nextBlock() elif kind == EXCEPT or kind == TRY_FINALLY: # find the block that starts the loop top = len(self.setups) while top > 0: top = top - 1 kind, loop_block = self.setups[top] if kind == LOOP: break elif kind == END_FINALLY: raise SyntaxError( "'continue' not supported inside 'finally' clause", self.syntax_error_position(node), ) if kind != LOOP: raise SyntaxError( "'continue' not properly in loop", self.syntax_error_position(node) ) self.emit("CONTINUE_LOOP", loop_block) self.nextBlock() elif kind == END_FINALLY: raise SyntaxError( "'continue' not supported inside 'finally' clause", 
self.syntax_error_position(node), ) def syntax_error_position(self, node): import linecache source_line = linecache.getline(self.graph.filename, node.lineno) return self.graph.filename, node.lineno, node.col_offset, source_line or None def syntax_error(self, msg, node): import linecache source_line = linecache.getline(self.graph.filename, node.lineno) return SyntaxError( msg, (self.graph.filename, node.lineno, node.col_offset, source_line or None), ) def compileJumpIfPop(self, test, label, is_if_true): self.visit(test) self.emit( "JUMP_IF_TRUE_OR_POP" if is_if_true else "JUMP_IF_FALSE_OR_POP", label ) def visitTest(self, node, is_if_true: bool): end = self.newBlock() for child in node.values[:-1]: self.compileJumpIfPop(child, end, is_if_true) self.nextBlock() self.visit(node.values[-1]) self.nextBlock(end) def visitBoolOp(self, node): self.visitTest(node, type(node.op) == ast.Or) _cmp_opcode = { ast.Eq: "==", ast.NotEq: "!=", ast.Lt: "<", ast.LtE: "<=", ast.Gt: ">", ast.GtE: ">=", ast.Is: "is", ast.IsNot: "is not", ast.In: "in", ast.NotIn: "not in", } def compileJumpIf(self, test, next, is_if_true): self.visit(test) self.emit("POP_JUMP_IF_TRUE" if is_if_true else "POP_JUMP_IF_FALSE", next) def visitIfExp(self, node): endblock = self.newBlock() elseblock = self.newBlock() self.compileJumpIf(node.test, elseblock, False) self.visit(node.body) self.emit("JUMP_FORWARD", endblock) self.nextBlock(elseblock) self.visit(node.orelse) self.nextBlock(endblock) def emitChainedCompareStep(self, op, value, cleanup, jump="JUMP_IF_FALSE_OR_POP"): self.visit(value) self.emit("DUP_TOP") self.emit("ROT_THREE") self.defaultEmitCompare(op) self.emit(jump, cleanup) self.nextBlock(label="compare_or_cleanup") def defaultEmitCompare(self, op): self.emit("COMPARE_OP", self._cmp_opcode[type(op)]) def visitCompare(self, node): self.update_lineno(node) self.visit(node.left) cleanup = self.newBlock("cleanup") for op, code in zip(node.ops[:-1], node.comparators[:-1]): 
self.emitChainedCompareStep(op, code, cleanup) # now do the last comparison if node.ops: op = node.ops[-1] code = node.comparators[-1] self.visit(code) self.defaultEmitCompare(op) if len(node.ops) > 1: end = self.newBlock("end") self.emit("JUMP_FORWARD", end) self.nextBlock(cleanup) self.emit("ROT_TWO") self.emit("POP_TOP") self.nextBlock(end) def get_qual_prefix(self, gen): prefix = "" if gen.scope.global_scope: return prefix # Construct qualname prefix parent = gen.scope.parent while not isinstance(parent, symbols.ModuleScope): # Only real functions use "<locals>", nested scopes like # comprehensions don't. if type(parent) in (symbols.FunctionScope, symbols.LambdaScope): prefix = parent.name + ".<locals>." + prefix else: prefix = parent.name + "." + prefix if parent.global_scope: break parent = parent.parent return prefix def _makeClosure(self, gen, flags): prefix = "" if not isinstance(gen.tree, ast.ClassDef): prefix = self.get_qual_prefix(gen) frees = gen.scope.get_free_vars() if frees: for name in frees: self.emit("LOAD_CLOSURE", name) self.emit("BUILD_TUPLE", len(frees)) flags |= 0x08 gen.set_qual_name(prefix + gen.name) self.emit("LOAD_CONST", gen) self.emit("LOAD_CONST", prefix + gen.name) # py3 qualname self.emit("MAKE_FUNCTION", flags) def visitDelete(self, node): self.set_lineno(node) self.visit(node.targets) def compile_comprehension(self, node, name, elt, val, opcode, oparg=0): node.args = self.conjure_arguments([ast.arg(".0", None)]) node.body = [] self.update_lineno(node) gen = self.make_func_codegen(node, name, node.lineno) if opcode: gen.emit(opcode, oparg) gen.compile_comprehension_body(node.generators, 0, elt, val, type(node), True) if not isinstance(node, ast.GeneratorExp): gen.emit("RETURN_VALUE") gen.finishFunction() self._makeClosure(gen, 0) # precomputation of outmost iterable self.visit(node.generators[0].iter) if node.generators[0].is_async: self.emit("GET_AITER") self.emitAsyncIterYieldFrom() else: self.emit("GET_ITER") 
self.emit("CALL_FUNCTION", 1) if gen.scope.coroutine and type(node) is not ast.GeneratorExp: self.emit("GET_AWAITABLE") self.emit("LOAD_CONST", None) self.emit("YIELD_FROM") def visitGeneratorExp(self, node): self.compile_comprehension(node, sys.intern("<genexpr>"), node.elt, None, None) def visitListComp(self, node): self.compile_comprehension( node, sys.intern("<listcomp>"), node.elt, None, "BUILD_LIST" ) def visitSetComp(self, node): self.compile_comprehension( node, sys.intern("<setcomp>"), node.elt, None, "BUILD_SET" ) def visitDictComp(self, node): self.compile_comprehension( node, sys.intern("<dictcomp>"), node.key, node.value, "BUILD_MAP" ) def compile_comprehension_body(self, generators, gen_index, elt, val, type, load_iter_from_dot0=False): if generators[gen_index].is_async: self.compile_async_comprehension(generators, gen_index, elt, val, type, load_iter_from_dot0) else: self.compile_sync_comprehension(generators, gen_index, elt, val, type, load_iter_from_dot0) def compile_async_comprehension(self, generators, gen_index, elt, val, type, load_iter_from_dot0): try_ = self.newBlock("try") after_try = self.newBlock("after_try") except_ = self.newBlock("except") if_cleanup = self.newBlock("if_cleanup") try_cleanup = self.newBlock("try_cleanup") gen = generators[gen_index] if load_iter_from_dot0: self.loadName(".0") else: self.visit(gen.iter) self.emit("GET_AITER") self.emitAsyncIterYieldFrom() self.nextBlock(try_) self.emit("SETUP_EXCEPT", except_) self.setups.push((EXCEPT, try_)) self.emit("GET_ANEXT") self.emit("LOAD_CONST", None) self.emit("YIELD_FROM") self.visit(gen.target) self.emit("POP_BLOCK") self.setups.pop() self.emit("JUMP_FORWARD", after_try) self.nextBlock(except_) self.emit("DUP_TOP") self.emit("LOAD_GLOBAL", "StopAsyncIteration") self.emit("COMPARE_OP", "exception match") self.emit("POP_JUMP_IF_TRUE", try_cleanup) self.emit("END_FINALLY") self.nextBlock(after_try) for if_ in gen.ifs: self.compileJumpIf(if_, if_cleanup, False) self.newBlock() 
gen_index += 1 if gen_index < len(generators): self.compile_comprehension_body(generators, gen_index, elt, val, type) elif type is ast.GeneratorExp: self.visit(elt) self.emit("YIELD_VALUE") self.emit("POP_TOP") elif type is ast.ListComp: self.visit(elt) self.emit("LIST_APPEND", gen_index + 1) elif type is ast.SetComp: self.visit(elt) self.emit("SET_ADD", gen_index + 1) elif type is ast.DictComp: self.compile_dictcomp_element(elt, val) self.emit("MAP_ADD", gen_index + 1) else: raise NotImplementedError("unknown comprehension type") self.nextBlock(if_cleanup) self.emit("JUMP_ABSOLUTE", try_) self.nextBlock(try_cleanup) self.emit("POP_TOP") self.emit("POP_TOP") self.emit("POP_TOP") self.emit("POP_EXCEPT") # for SETUP_EXCEPT self.emit("POP_TOP") def compile_sync_comprehension(self, generators, gen_index, elt, val, type, load_iter_from_dot0): start = self.newBlock("start") skip = self.newBlock("skip") if_cleanup = self.newBlock("if_cleanup") anchor = self.newBlock("anchor") gen = generators[gen_index] if load_iter_from_dot0: self.loadName(".0") else: self.visit(gen.iter) self.emit("GET_ITER") self.nextBlock(start) self.emit("FOR_ITER", anchor) self.nextBlock() self.visit(gen.target) for if_ in gen.ifs: self.compileJumpIf(if_, if_cleanup, False) self.newBlock() gen_index += 1 if gen_index < len(generators): self.compile_comprehension_body(generators, gen_index, elt, val, type) else: if type is ast.GeneratorExp: self.visit(elt) self.emit("YIELD_VALUE") self.emit("POP_TOP") elif type is ast.ListComp: self.visit(elt) self.emit("LIST_APPEND", gen_index + 1) elif type is ast.SetComp: self.visit(elt) self.emit("SET_ADD", gen_index + 1) elif type is ast.DictComp: self.compile_dictcomp_element(elt, val) self.emit("MAP_ADD", gen_index + 1) else: raise NotImplementedError("unknown comprehension type") self.nextBlock(skip) self.nextBlock(if_cleanup) self.emit("JUMP_ABSOLUTE", start) self.nextBlock(anchor) def compile_dictcomp_element(self, elt, val): self.visit(val) self.visit(elt) 
# exception related def visitAssert(self, node): # XXX would be interesting to implement this via a # transformation of the AST before this stage if not self.optimization_lvl: end = self.newBlock() self.set_lineno(node) # XXX AssertionError appears to be special case -- it is always # loaded as a global even if there is a local name. I guess this # is a sort of renaming op. self.nextBlock() self.compileJumpIf(node.test, end, True) self.nextBlock() self.emit("LOAD_GLOBAL", "AssertionError") if node.msg: self.visit(node.msg) self.emit("CALL_FUNCTION", 1) self.emit("RAISE_VARARGS", 1) else: self.emit("RAISE_VARARGS", 1) self.nextBlock(end) def visitRaise(self, node): self.set_lineno(node) n = 0 if node.exc: self.visit(node.exc) n = n + 1 if node.cause: self.visit(node.cause) n = n + 1 self.emit("RAISE_VARARGS", n) def visitTry(self, node): self.set_lineno(node) if node.finalbody: if node.handlers: self.emit_try_finally( node, lambda: self.visitTryExcept(node), lambda: self.visit(node.finalbody), ) else: self.emit_try_finally( node, lambda: self.visit(node.body), lambda: self.visit(node.finalbody), ) return self.visitTryExcept(node) def visitTryExcept(self, node): body = self.newBlock("try_body") handlers = self.newBlock("try_handlers") end = self.newBlock("try_end") if node.orelse: lElse = self.newBlock("try_else") else: lElse = end self.emit("SETUP_EXCEPT", handlers) self.nextBlock(body) self.setups.push((EXCEPT, body)) self.visit(node.body) self.emit("POP_BLOCK") self.setups.pop() self.emit("JUMP_FORWARD", lElse) self.nextBlock(handlers) last = len(node.handlers) - 1 for i in range(len(node.handlers)): handler = node.handlers[i] expr = handler.type target = handler.name body = handler.body self.set_lineno(handler) if expr: self.emit("DUP_TOP") self.visit(expr) self.emit("COMPARE_OP", "exception match") next = self.newBlock() self.emit("POP_JUMP_IF_FALSE", next) self.nextBlock() elif i < last: raise SyntaxError( "default 'except:' must be last", 
self.syntax_error_position(handler), ) else: self.set_lineno(handler) self.emit("POP_TOP") self.emit_except_local(handler) if target: def clear_name(): self.emit("LOAD_CONST", None) self.storeName(target) self.delName(target) self.emit_try_finally(node, lambda: self.visit(body), clear_name, True) else: # "block" param shouldn't matter, so just pass None self.setups.push((EXCEPT, None)) self.visit(body) self.emit("POP_EXCEPT") self.setups.pop() self.emit("JUMP_FORWARD", end) if expr: self.nextBlock(next) else: self.nextBlock(label="handler_end") self.emit("END_FINALLY") if node.orelse: self.nextBlock(lElse) self.visit(node.orelse) self.nextBlock(end) def emit_except_local(self, handler: ast.ExceptHandler): target = handler.name if target: self.update_lineno(handler.type) self.storeName(target) else: self.emit("POP_TOP") self.emit("POP_TOP") def emit_try_finally(self, node, try_body, finalbody, except_protect=False): raise NotImplementedError("missing overridde") def visitWith(self, node): self.set_lineno(node) body = self.newBlock() stack = [] for withitem in node.items: final = self.newBlock() stack.append(final) self.__with_count += 1 self.visit(withitem.context_expr) self.emit("SETUP_WITH", final) if withitem.optional_vars is None: self.emit("POP_TOP") else: self.visit(withitem.optional_vars) self.setups.push((TRY_FINALLY, body)) self.nextBlock(body) self.visit(node.body) while stack: final = stack.pop() self.emit("POP_BLOCK") self.setups.pop() self.emit("LOAD_CONST", None) self.nextBlock(final) self.setups.push((END_FINALLY, final)) self.emit("WITH_CLEANUP_START") self.emit("WITH_CLEANUP_FINISH") self.emit("END_FINALLY") self.setups.pop() self.__with_count -= 1 def visitAsyncWith(self, node): self.set_lineno(node) body = self.newBlock() stack = [] for withitem in node.items: final = self.newBlock() stack.append(final) self.__with_count += 1 self.visit(withitem.context_expr) self.emit("BEFORE_ASYNC_WITH") self.emit("GET_AWAITABLE") self.emit("LOAD_CONST", None) 
self.emit("YIELD_FROM") self.emit("SETUP_ASYNC_WITH", final) if withitem.optional_vars is None: self.emit("POP_TOP") else: self.visit(withitem.optional_vars) self.setups.push((TRY_FINALLY, body)) self.nextBlock(body) self.visit(node.body) while stack: final = stack.pop() self.emit("POP_BLOCK") self.setups.pop() self.emit("LOAD_CONST", None) self.nextBlock(final) self.setups.push((END_FINALLY, final)) self.emit("WITH_CLEANUP_START") self.emit("GET_AWAITABLE") self.emit("LOAD_CONST", None) self.emit("YIELD_FROM") self.emit("WITH_CLEANUP_FINISH") self.emit("END_FINALLY") self.setups.pop() self.__with_count -= 1 # misc def visitExpr(self, node): self.set_lineno(node) # CPy3.6 discards lots of constants if self.interactive: self.visit(node.value) self.emit("PRINT_EXPR") elif not is_const(node.value): self.visit(node.value) self.emit("POP_TOP") def visitNum(self, node): self.update_lineno(node) self.emit("LOAD_CONST", node.n) def visitStr(self, node): self.update_lineno(node) self.emit("LOAD_CONST", node.s) def visitBytes(self, node): self.update_lineno(node) self.emit("LOAD_CONST", node.s) def visitNameConstant(self, node): self.update_lineno(node) self.emit("LOAD_CONST", node.value) def visitConst(self, node): self.update_lineno(node) self.emit("LOAD_CONST", node.value) def visitKeyword(self, node): self.emit("LOAD_CONST", node.name) self.visit(node.expr) def visitGlobal(self, node): self.set_lineno(node) # no code to generate def visitNonlocal(self, node): self.set_lineno(node) # no code to generate def visitName(self, node): self.update_lineno(node) if isinstance(node.ctx, ast.Store): self.storeName(node.id) elif isinstance(node.ctx, ast.Del): self.delName(node.id) elif node.id == "__debug__": self.emit("LOAD_CONST", not bool(self.optimization_lvl)) else: self.loadName(node.id) def visitPass(self, node): self.set_lineno(node) def visitImport(self, node): self.set_lineno(node) level = 0 for alias in node.names: name = alias.name asname = alias.asname 
def visitAssign(self, node):
    """Generate code for an assignment statement, including chained targets.

    The right-hand side is evaluated once.  For ``a = b = value`` the value
    is duplicated with ``DUP_TOP`` before every target except the last, so
    that each target's store consumes one copy.
    """
    self.set_lineno(node)
    self.visit(node.value)
    last = len(node.targets) - 1
    for index, target in enumerate(node.targets):
        if index < last:
            # Keep a copy of the value for the targets still to come.
            self.emit("DUP_TOP")
        if isinstance(target, ast.AST):
            self.visit(target)
def visitAnnAssign(self, node):
    """Generate code for an annotated assignment (``target: ann [= value]``).

    When a value is present the plain assignment is emitted first.  The
    annotation itself is then handled according to the target kind:

    * simple name in a module or class body: the annotation is stored via
      ``emitStoreAnnotation``;
    * attribute / subscript without a value: the target's sub-expressions
      are evaluated and discarded via ``checkAnnExpr`` / ``checkAnnSubscr``.

    For non-simple targets the annotation is additionally passed to
    ``checkAnnotation``.

    Raises:
        SystemError: If the target is not a Name, Attribute or Subscript.
    """
    self.set_lineno(node)
    target = node.target
    if node.value:
        # Perform the actual assignment first.
        self.visit(node.value)
        self.visit(target)
    if isinstance(target, ast.Name):
        # If we have a simple name in a module or class, store the annotation
        if node.simple and isinstance(self.tree, (ast.Module, ast.ClassDef)):
            self.emitStoreAnnotation(target.id, node.annotation)
    elif isinstance(target, ast.Attribute):
        if not node.value:
            self.checkAnnExpr(target.value)
    elif isinstance(target, ast.Subscript):
        if not node.value:
            self.checkAnnExpr(target.value)
            self.checkAnnSubscr(target.slice)
    else:
        # Report the offending *target* kind; the statement node itself is
        # always an AnnAssign, which made the previous message useless.
        raise SystemError(
            f"invalid node type {type(target).__name__} for annotated assignment"
        )
    if not node.simple:
        self.checkAnnotation(node)
def visitAugAttribute(self, node, mode):
    """Emit the load or store half of an augmented attribute assignment.

    ``mode == "load"`` evaluates the object, duplicates it and loads the
    attribute; ``mode == "store"`` rotates the computed value under the
    saved object and stores the attribute.  Any other mode emits nothing.
    """
    if mode == "load":
        self.visit(node.value)
        self.emit("DUP_TOP")
        attr_opcode = "LOAD_ATTR"
    elif mode == "store":
        self.emit("ROT_TWO")
        attr_opcode = "STORE_ATTR"
    else:
        return
    self.emit(attr_opcode, self.mangle(node.attr))
def _call_helper(self, argcnt, args, kwargs):
    """Emit argument evaluation and the call opcode for a call expression.

    Three call shapes are produced:

    * ``CALL_FUNCTION_EX`` when any positional argument is starred or any
      keyword is a ``**`` unpacking (``kw.arg is None``);
    * ``CALL_FUNCTION_KW`` when there are only plain keyword arguments;
    * ``CALL_FUNCTION`` otherwise.

    Args:
        argcnt: Count of positional values the caller has already pushed;
            it seeds the positional counter and is added to the call oparg.
        args: Positional argument nodes (may include ``ast.Starred``).
        kwargs: Keyword nodes exposing ``.arg`` and ``.value``.
    """
    mustdictunpack = any(arg.arg is None for arg in kwargs)
    nelts = len(args)
    nkwelts = len(kwargs)
    # the number of tuples and dictionaries on the stack
    nsubkwargs = nsubargs = 0
    nseen = argcnt  # the number of positional arguments on the stack
    for arg in args:
        if isinstance(arg, ast.Starred):
            # A star-arg: first pack the plain positionals seen so far
            # into their own tuple, then push the starred iterable.
            if nseen:
                self.emit("BUILD_TUPLE", nseen)
                nseen = 0
                nsubargs += 1
            self.visit(arg.value)
            nsubargs += 1
        else:
            self.visit(arg)
            nseen += 1

    if nsubargs or mustdictunpack:
        # Unpacking call: positionals must end up as exactly one tuple.
        if nseen:
            # Pack up any trailing plain positional arguments.
            self.emit("BUILD_TUPLE", nseen)
            nsubargs += 1
        if nsubargs > 1:
            self.emit("BUILD_TUPLE_UNPACK_WITH_CALL", nsubargs)
        elif nsubargs == 0:
            # No positionals at all: still supply an empty tuple.
            self.emit("BUILD_TUPLE", 0)

        nseen = 0  # the number of keyword arguments on the stack following
        for i, kw in enumerate(kwargs):
            if kw.arg is None:
                if nseen:
                    # A keyword argument unpacking: flush the run of plain
                    # keywords preceding it into a single mapping.
                    self.compiler_subkwargs(kwargs, i - nseen, i)
                    nsubkwargs += 1
                    nseen = 0
                self.visit(kw.value)
                nsubkwargs += 1
            else:
                nseen += 1
        if nseen:
            # Pack up any trailing plain keyword arguments.
            self.compiler_subkwargs(kwargs, nkwelts - nseen, nkwelts)
            nsubkwargs += 1
        if nsubkwargs > 1:
            self.emit("BUILD_MAP_UNPACK_WITH_CALL", nsubkwargs)
        # oparg is 1 when a keyword mapping was pushed, else 0.
        self.emit("CALL_FUNCTION_EX", int(nsubkwargs > 0))
    elif nkwelts:
        # Plain keywords only: push the values, then the tuple of names.
        for kw in kwargs:
            self.visit(kw.value)
        self.emit("LOAD_CONST", tuple(arg.arg for arg in kwargs))
        self.emit("CALL_FUNCTION_KW", nelts + nkwelts + argcnt)
    else:
        self.emit("CALL_FUNCTION", nelts + argcnt)
def visitSubscript(self, node, aug_flag=None):
    """Generate code for ``value[slice]`` in load, store or delete context.

    The container and the index are always evaluated first.  With a Store
    context and a truthy ``aug_flag`` (the load half of an augmented
    assignment) both operands are duplicated and the current item is
    loaded instead of stored.
    """
    self.update_lineno(node)
    self.visit(node.value)
    self.visit(node.slice)

    context = node.ctx
    opcodes = ()
    if isinstance(context, ast.Load):
        opcodes = ("BINARY_SUBSCR",)
    elif isinstance(context, ast.Store):
        if aug_flag:
            opcodes = ("DUP_TOP_TWO", "BINARY_SUBSCR")
        else:
            opcodes = ("STORE_SUBSCR",)
    elif isinstance(context, ast.Del):
        opcodes = ("DELETE_SUBSCR",)
    else:
        assert 0

    for opcode in opcodes:
        self.emit(opcode)
def hasStarred(self, elts):
    """Return True if any element of *elts* is an ``ast.Starred`` node.

    Args:
        elts: Iterable of AST nodes (e.g. the elements of a tuple/list/set
            display).

    Returns:
        bool: ``True`` as soon as a starred element is found, else ``False``
        (including for an empty iterable).
    """
    return any(isinstance(elt, ast.Starred) for elt in elts)
def visitSlice(self, node):
    """Generate code that builds a slice object from ``lower:upper[:step]``.

    A missing lower or upper bound is replaced by the constant ``None``.
    ``BUILD_SLICE`` is emitted with 3 when a step is present, otherwise 2.
    """

    def push_bound(bound):
        # Evaluate the bound when present, else push None in its place.
        if bound:
            self.visit(bound)
        else:
            self.emit("LOAD_CONST", None)

    push_bound(node.lower)
    push_bound(node.upper)

    step = node.step
    if step:
        self.visit(step)
        operand_count = 3
    else:
        operand_count = 2
    self.emit("BUILD_SLICE", operand_count)
def visitDict_by_one(self, node): self.update_lineno(node) self.emit("BUILD_MAP", 0) for k, v in zip(node.keys, node.values): self.emit("DUP_TOP") self.visit(k) self.visit(v) self.emit("ROT_THREE") self.emit("STORE_SUBSCR") def _const_value(self, node): if isinstance(node, (ast.NameConstant, ast.Constant)): return node.value elif isinstance(node, ast.Num): return node.n elif isinstance(node, (ast.Str, ast.Bytes)): return node.s elif isinstance(node, ast.Ellipsis): return ... else: assert isinstance(node, ast.Name) and node.id == "__debug__" return not self.optimized def get_bool_const(self, node): """Return True if node represent constantly true value, False if constantly false value, and None otherwise (non-constant).""" if isinstance(node, ast.Num): return bool(node.n) if isinstance(node, ast.NameConstant): return bool(node.value) if isinstance(node, ast.Str): return bool(node.s) if isinstance(node, ast.Name): if node.id == "__debug__": return not bool(self.optimization_lvl) if isinstance(node, ast.Constant): return bool(node.value) def compile_subdict(self, node, begin, end): n = end - begin if n > 1 and all_items_const(node.keys, begin, end): for i in range(begin, end): self.visit(node.values[i]) self.emit( "LOAD_CONST", tuple(self._const_value(x) for x in node.keys[begin:end]) ) self.emit("BUILD_CONST_KEY_MAP", n) else: for i in range(begin, end): self.visit(node.keys[i]) self.visit(node.values[i]) self.emit("BUILD_MAP", n) def visitDict(self, node): self.update_lineno(node) containers = elements = 0 is_unpacking = False for i, (k, v) in enumerate(zip(node.keys, node.values)): is_unpacking = k is None if elements == 0xFFFF or (elements and is_unpacking): self.compile_subdict(node, i - elements, i) containers += 1 elements = 0 if is_unpacking: self.visit(v) containers += 1 else: elements += 1 if elements or containers == 0: self.compile_subdict(node, len(node.keys) - elements, len(node.keys)) containers += 1 self.emitMapUnpack(containers, is_unpacking) def 
def finishFunction(self):
    """Terminate the current function body with a return, if still needed.

    Nothing is emitted when the graph's current block already returns.
    Otherwise the exit block is started and ``RETURN_VALUE`` is emitted,
    preceded by ``LOAD_CONST None`` for everything except a lambda (whose
    expression value is returned instead).
    """
    graph = self.graph
    if not graph.current.returns:
        graph.startExitBlock()
        is_lambda = isinstance(self.tree, ast.Lambda)
        if not is_lambda:
            self.emit("LOAD_CONST", None)
        self.emit("RETURN_VALUE")
def get_graph_flags(self, func, scope):
    """Compute the code-object flag bits for a function-like node.

    Args:
        func: Node exposing ``args.vararg`` and ``args.kwarg``.
        scope: Symbol-table scope exposing ``nested``, ``generator`` and
            ``coroutine``.

    Returns:
        int: Bitwise OR of the applicable ``CO_*`` flags.
    """
    arguments = func.args
    flags = 0
    if arguments.vararg:
        flags |= CO_VARARGS
    if arguments.kwarg:
        flags |= CO_VARKEYWORDS
    if scope.nested:
        flags |= CO_NESTED

    # generator / coroutine / async generator are mutually exclusive kinds.
    if scope.generator:
        flags |= CO_ASYNC_GENERATOR if scope.coroutine else CO_GENERATOR
    elif scope.coroutine:
        flags |= CO_COROUTINE
    return flags
@classmethod
def optimize_tree(cls, optimize: int, tree: AST):
    """Run the AST optimizer over *tree* and return the resulting tree.

    Args:
        optimize: Optimization level; any value greater than zero is passed
            to the optimizer as ``optimize=True``.
        tree: Root AST node to optimize.

    Returns:
        Whatever ``AstOptimizer(...).visit(tree)`` returns for the root.
    """
    # First parameter renamed from ``self`` to ``cls``: this is a
    # classmethod, and the 3.8 override of this method already uses ``cls``.
    return AstOptimizer(optimize=optimize > 0).visit(tree)
def checkAsyncWith(self, node):
    """Reject ``async with`` when the enclosing scope is not a coroutine.

    Raises:
        SyntaxError: If ``self.scope.coroutine`` is false; the error carries
            the position reported by ``syntax_error_position(node)``.
    """
    if self.scope.coroutine:
        return
    position = self.syntax_error_position(node)
    raise SyntaxError("'async with' outside async function", position)
class Entry:
    """One record on the code generator's setup stack (``self.setups``).

    Each entry describes an enclosing construct (loop, try, with, handler
    cleanup, ...) that ``break``/``continue``/``return`` must unwind.
    """

    # One of the module-level frame-kind constants (WHILE_LOOP, FOR_LOOP,
    # EXCEPT, TRY_FINALLY, WITH, ...).
    kind: int
    # Block at which the construct starts; ``continue`` jumps here for loops.
    block: pyassem.Block
    # Exit/cleanup block, or None; ``break`` jumps here for loops and the
    # unwinding code uses it as the CALL_FINALLY target.
    exit: Optional[pyassem.Block]

    def __init__(self, kind, block, exit):
        self.kind = kind
        self.block = block
        self.exit = exit

    def __repr__(self):
        # Added for debuggability of the setup stack; purely informational.
        return (
            f"{type(self).__name__}(kind={self.kind!r}, "
            f"block={self.block!r}, exit={self.exit!r})"
        )
def visitWhile(self, node):
    """Generate code for a ``while`` loop, special-casing constant tests.

    ``get_bool_const`` classifies the test as constantly false, constantly
    true, or non-constant (``None``):

    * constantly false: test and body are visited with emission disabled
      and only the ``else`` clause (if any) produces code;
    * constantly true: the conditional jump and the else-entry block are
      wrapped in ``maybeEmit(False)``;
    * otherwise: a regular test / body / jump-back loop is emitted.
    """
    self.set_lineno(node)

    test_const = self.get_bool_const(node.test)
    loop = self.newBlock("while_loop")
    else_ = self.newBlock("while_else")
    after = self.newBlock("while_after")

    # Register the loop so break/continue inside the body can find it.
    self.push_loop(WHILE_LOOP, loop, after)

    if test_const is False:
        # NOTE(review): noEmit() presumably suppresses output while the
        # nodes are still visited (e.g. for error checking) - confirm.
        with self.noEmit():
            self.visit(node.test)
            self.visit(node.body)
        self.pop_loop()
        if node.orelse:
            self.visit(node.orelse)
        self.nextBlock(after)
        return
    elif test_const is True:
        # emulate co_firstlineno behavior of C compiler
        self.graph.maybeEmitSetLineno()

    self.nextBlock(loop)

    # The conditional exit is only wanted when the test is not a constant
    # true value.
    with self.maybeEmit(test_const is not True):
        # NOTE(review): else_ comes straight from newBlock(), so the
        # ``or after`` fallback looks unreachable - confirm.
        self.compileJumpIf(node.test, else_ or after, False)

    self.nextBlock(label="while_body")
    self.visit(node.body)
    self.emit("JUMP_ABSOLUTE", loop)

    with self.maybeEmit(test_const is not True):
        self.nextBlock(else_ or after)  # or just the POPs if not else clause

    self.pop_loop()
    if node.orelse:
        self.visit(node.orelse)

    self.nextBlock(after)
def unwind_setup_entry(self, e: Entry, preserve_tos: int) -> None:
    """Emit the code needed to leave one enclosing setup-stack entry early.

    Used when control leaves a construct before its normal end (callers in
    this class: ``continue``, ``break`` and ``return``).

    Args:
        e: The setup-stack entry being unwound.
        preserve_tos: Truthy when a value on top of the stack (the return
            value) must survive the unwinding; the extra ROT_* opcodes
            below are emitted only in that case.

    Raises:
        Exception: If ``e.kind`` is not one of the kinds handled here.

    NOTE(review): the per-kind sequences appear to mirror
    ``compiler_unwind_fblock`` in CPython 3.8's compile.c - confirm before
    changing any ordering.
    """
    if e.kind == WHILE_LOOP:
        # Nothing to clean up for a while loop.
        return
    if e.kind == END_FINALLY:
        # Clearing ``exit`` is how emit_try_finally later detects that the
        # finally body was left early (it checks ``exit is None``).
        e.exit = None
        self.emit("POP_FINALLY", preserve_tos)
        if preserve_tos:
            self.emit("ROT_TWO")
        self.emit("POP_TOP")
    elif e.kind == FOR_LOOP:
        # Drop the loop's iterator, keeping the preserved value on top.
        if preserve_tos:
            self.emit("ROT_TWO")
        self.emit("POP_TOP")
    elif e.kind == EXCEPT:
        self.emit("POP_BLOCK")
    elif e.kind == TRY_FINALLY:
        self.emit("POP_BLOCK")
        self.emit("CALL_FINALLY", e.exit)
    elif e.kind == TRY_FINALLY_BREAK:
        # emit_try_finally pushes an extra None for this kind; it has to be
        # removed either before or after running the finally body.
        self.emit("POP_BLOCK")
        if preserve_tos:
            self.emit("ROT_TWO")
            self.emit("POP_TOP")
            self.emit("CALL_FINALLY", e.exit)
        else:
            self.emit("CALL_FINALLY", e.exit)
            self.emit("POP_TOP")
    elif e.kind in (WITH, ASYNC_WITH):
        self.emit("POP_BLOCK")
        if preserve_tos:
            self.emit("ROT_TWO")
        self.emit("BEGIN_FINALLY")
        self.emit("WITH_CLEANUP_START")
        if e.kind == ASYNC_WITH:
            # Await the result of the asynchronous exit call.
            self.emit("GET_AWAITABLE")
            self.emit("LOAD_CONST", None)
            self.emit("YIELD_FROM")
        self.emit("WITH_CLEANUP_FINISH")
        self.emit("POP_FINALLY", 0)
    elif e.kind == HANDLER_CLEANUP:
        if preserve_tos:
            self.emit("ROT_FOUR")
        if e.exit:
            # Handler bound a name (``except ... as name``): run its
            # cleanup block as well.
            self.emit("POP_BLOCK")
            self.emit("POP_EXCEPT")
            self.emit("CALL_FINALLY", e.exit)
        else:
            self.emit("POP_EXCEPT")
    else:
        raise Exception(f"Unexpected kind {e.kind}")
def visitReturn(self, node):
    """Generate code for ``return``, unwinding all enclosing setup entries.

    A non-constant return value is evaluated *before* unwinding and kept on
    top of the stack while the enclosing entries are unwound.  A constant
    value (or the implicit ``None``) is loaded only after unwinding.
    """
    self.checkReturn(node)
    self.set_lineno(node)

    result = node.value
    has_value = bool(result)
    preserve_tos = has_value and not isinstance(result, ast.Constant)

    if preserve_tos:
        self.visit(result)
        self.unwind_setup_entries(True)
    else:
        self.unwind_setup_entries(False)
        if has_value:
            self.visit(result)
        else:
            self.emit("LOAD_CONST", None)
    self.emit("RETURN_VALUE")
def visitTryExcept(self, node):
    """Generate code for ``try: ... except ...: ... [else: ...]``.

    Layout of the emitted blocks:

    * ``try_body``: the protected body, guarded by ``SETUP_FINALLY``;
    * ``try_handlers``: a chain of handlers, each testing the exception
      against its type and falling through to the next on mismatch;
    * after the last handler: ``END_FINALLY``;
    * ``try_else``: the ``else`` clause, reached only when the body
      completed without raising;
    * ``try_end``: common exit.

    Raises:
        SyntaxError: If a bare ``except:`` is not the last handler.
    """
    body = self.newBlock("try_body")
    except_ = self.newBlock("try_handlers")
    orElse = self.newBlock("try_else")
    end = self.newBlock("try_end")

    self.emit("SETUP_FINALLY", except_)
    self.nextBlock(body)

    # Track the protected region so early exits from the body unwind it.
    self.setups.push(Entry(EXCEPT, body, None))
    self.visit(node.body)
    self.emit("POP_BLOCK")
    self.setups.pop()

    # Normal completion skips every handler and runs the else clause.
    self.emit("JUMP_FORWARD", orElse)
    self.nextBlock(except_)

    last = len(node.handlers) - 1
    for i in range(len(node.handlers)):
        handler = node.handlers[i]
        expr = handler.type
        target = handler.name
        body = handler.body
        self.set_lineno(handler)
        # From here on ``except_`` names the block of the *next* handler,
        # i.e. where to go when this handler's type does not match.
        except_ = self.newBlock(f"try_except_{i}")
        if expr:
            self.emit("DUP_TOP")
            self.visit(expr)
            self.emit("COMPARE_OP", "exception match")
            self.emit("POP_JUMP_IF_FALSE", except_)
        elif i < last:
            raise SyntaxError(
                "default 'except:' must be last",
                self.syntax_error_position(handler),
            )
        else:
            self.set_lineno(handler)
        self.emit("POP_TOP")
        if target:
            # ``except E as name``: bind the exception, run the body under
            # a finally-style guard, then clear and delete the name.
            cleanup_end = self.newBlock(f"try_cleanup_end{i}")
            cleanup_body = self.newBlock(f"try_cleanup_body{i}")

            self.storeName(target)
            self.emit("POP_TOP")

            self.emit("SETUP_FINALLY", cleanup_end)
            self.nextBlock(cleanup_body)
            self.setups.push(Entry(HANDLER_CLEANUP, cleanup_body, cleanup_end))
            self.visit(body)
            self.emit("POP_BLOCK")
            self.emit("BEGIN_FINALLY")
            self.setups.pop()

            self.nextBlock(cleanup_end)
            self.setups.push(Entry(END_FINALLY, cleanup_end, None))
            # name = None; del name
            self.emit("LOAD_CONST", None)
            self.storeName(target)
            self.delName(target)

            self.emit("END_FINALLY")
            self.emit("POP_EXCEPT")
            self.setups.pop()
        else:
            # No name bound: discard the remaining two exception items and
            # run the handler body directly.
            cleanup_body = self.newBlock(f"try_cleanup_body{i}")

            self.emit("POP_TOP")
            self.emit("POP_TOP")
            self.nextBlock(cleanup_body)
            self.setups.push(Entry(HANDLER_CLEANUP, cleanup_body, None))
            self.visit(body)
            self.emit("POP_EXCEPT")
            self.setups.pop()
        self.emit("JUMP_FORWARD", end)
        self.nextBlock(except_)

    # No handler matched.
    self.emit("END_FINALLY")
    self.nextBlock(orElse)
    self.visit(node.orelse)
    self.nextBlock(end)
ASYNC_WITH: self.emit("GET_AWAITABLE") self.emit("LOAD_CONST", None) self.emit("YIELD_FROM") self.emit("WITH_CLEANUP_FINISH") self.emit("END_FINALLY") self.setups.pop() def visitWith(self, node): self.set_lineno(node) self.visitWith_(node, WITH, 0) def visitAsyncWith(self, node, pos=0): self.checkAsyncWith(node) self.visitWith_(node, ASYNC_WITH, 0) def annotate_args(self, args: ast.arguments) -> List[str]: ann_args = [] for arg in args.args: self.annotate_arg(arg, ann_args) for arg in args.posonlyargs: self.annotate_arg(arg, ann_args) if args.vararg: # pyre-fixme[6]: Expected `arg` for 1st param but got `Optional[_ast.arg]`. self.annotate_arg(args.vararg, ann_args) for arg in args.kwonlyargs: self.annotate_arg(arg, ann_args) if args.kwarg: # pyre-fixme[6]: Expected `arg` for 1st param but got `Optional[_ast.arg]`. self.annotate_arg(args.kwarg, ann_args) return ann_args def conjure_arguments(self, args: List[ast.arg]) -> ast.arguments: return ast.arguments([], args, None, [], [], None, []) def skip_visit(self): """On Py38 we never want to skip visiting nodes.""" return False def visit(self, node: Union[Sequence[AST], AST], *args): # Note down the old line number could_be_multiline_expr = isinstance(node, ast.expr) old_lineno = self.graph.lineno if could_be_multiline_expr else None ret = super().visit(node, *args) if old_lineno is not None and old_lineno != self.graph.lineno: self.graph.lineno = old_lineno self.graph.lineno_set = False return ret class CinderCodeGenerator(Python38CodeGenerator): flow_graph = pyassem.PyFlowGraphCinder def set_qual_name(self, qualname): self._qual_name = qualname def getCode(self): code = super().getCode() # pyre-fixme [21]: cinder from cinder import _set_qualname _set_qualname(code, self._qual_name) return code def _nameOp(self, prefix, name) -> None: if ( prefix == "LOAD" and name == "super" and isinstance(self.scope, symbols.FunctionScope) ): scope = self.scope.check_name(name) if scope in (SC_GLOBAL_EXPLICIT, SC_GLOBAL_IMPLICIT): 
self.scope.suppress_jit = True super()._nameOp(prefix, name) def _is_super_call(self, node): if ( not isinstance(node, ast.Call) or not isinstance(node.func, ast.Name) or node.func.id != "super" or node.keywords ): return False # check that 'super' only appear as implicit global: # it is not defined in local or modules scope if ( self.scope.check_name("super") != SC_GLOBAL_IMPLICIT or self.module_gen.scope.check_name("super") != SC_GLOBAL_IMPLICIT ): return False if len(node.args) == 2: return True if len(node.args) == 0: if len(self.scope.params) == 0: return False return self.scope.check_name("__class__") == SC_FREE return False def _emit_args_for_super(self, super_call, attr): if len(super_call.args) == 0: self.loadName("__class__") self.loadName(next(iter(self.scope.params))) else: for arg in super_call.args: self.visit(arg) return (self.mangle(attr), len(super_call.args) == 0) def visitAttribute(self, node): if isinstance(node.ctx, ast.Load) and self._is_super_call(node.value): self.emit("LOAD_GLOBAL", "super") load_arg = self._emit_args_for_super(node.value, node.attr) self.emit("LOAD_ATTR_SUPER", load_arg) else: super().visitAttribute(node) def visitCall(self, node): if ( not isinstance(node.func, ast.Attribute) or not isinstance(node.func.ctx, ast.Load) or node.keywords or any(isinstance(arg, ast.Starred) for arg in node.args) ): # We cannot optimize this call return super().visitCall(node) self.update_lineno(node) if self._is_super_call(node.func.value): self.emit("LOAD_GLOBAL", "super") load_arg = self._emit_args_for_super(node.func.value, node.func.attr) self.emit("LOAD_METHOD_SUPER", load_arg) for arg in node.args: self.visit(arg) self.emit("CALL_METHOD", len(node.args)) return self.visit(node.func.value) self.emit("LOAD_METHOD", self.mangle(node.func.attr)) for arg in node.args: self.visit(arg) self.emit("CALL_METHOD", len(node.args)) def get_default_generator(): if "cinder" in sys.version: return CinderCodeGenerator if sys.version_info >= (3, 8): 
return Python38CodeGenerator if sys.version_info >= (3, 7): return Python37CodeGenerator return CodeGenerator def get_docstring(node): if ( node.body and isinstance(node.body[0], ast.Expr) and isinstance(node.body[0].value, ast.Str) ): return node.body[0].value.s def findOp(node): """Find the op (DELETE, LOAD, STORE) in an AssTuple tree""" v = OpFinder() v.VERBOSE = 0 walk(node, v) return v.op class OpFinder: def __init__(self): self.op = None def visitAssName(self, node): if self.op is None: self.op = node.flags elif self.op != node.flags: raise ValueError("mixed ops in stmt") visitAssAttr = visitAssName visitSubscript = visitAssName class Delegator: """Base class to support delegation for augmented assignment nodes To generator code for augmented assignments, we use the following wrapper classes. In visitAugAssign, the left-hand expression node is visited twice. The first time the visit uses the normal method for that node . The second time the visit uses a different method that generates the appropriate code to perform the assignment. These delegator classes wrap the original AST nodes in order to support the variant visit methods. """ def __init__(self, obj): self.obj = obj def __getattr__(self, attr): return getattr(self.obj, attr) def __eq__(self, other): return other == self.obj def __hash__(self): return hash(self.obj) class AugAttribute(Delegator): pass class AugName(Delegator): pass class AugSubscript(Delegator): pass class CompInner(Delegator): def __init__(self, obj, nested_scope, init_inst, elt_nodes, elt_insts): Delegator.__init__(self, obj) self.nested_scope = nested_scope self.init_inst = init_inst self.elt_nodes = elt_nodes self.elt_insts = elt_insts wrapper = { ast.Attribute: AugAttribute, ast.Name: AugName, ast.Subscript: AugSubscript, } def wrap_aug(node): return wrapper[node.__class__](node) PythonCodeGenerator = get_default_generator() if __name__ == "__main__": for file in sys.argv[1:]: compileFile(file)