python/pyspark/errors/exceptions/tblib.py (200 lines of code) (raw):

""" Class for parsing Python tracebacks. This module was adapted from the `tblib` package https://github.com/ionelmc/python-tblib modified to also recover line content from the traceback. BSD 2-Clause License Copyright (c) 2013-2023, Ionel Cristian Mărieș. All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ import re import sys from types import CodeType, FrameType, TracebackType from typing import Any, Dict, List, Optional __version__ = "3.0.0" __all__ = "Traceback", "TracebackParseError", "Frame", "Code" FRAME_RE = re.compile( r'^\s*File "(?P<co_filename>.+)", line (?P<tb_lineno>\d+)(, in (?P<co_name>.+))?$' ) class _AttrDict(dict): __slots__ = () def __getattr__(self, name: str) -> Any: try: return self[name] except KeyError: raise AttributeError(name) from None # noinspection PyPep8Naming class __traceback_maker(Exception): pass class TracebackParseError(Exception): pass class Code: """ Class that replicates just enough of the builtin Code object to enable serialization and traceback rendering. """ co_code: Optional[bytes] = None def __init__(self, code: CodeType) -> None: self.co_filename = code.co_filename self.co_name: Optional[str] = code.co_name self.co_argcount = 0 self.co_kwonlyargcount = 0 self.co_varnames = () self.co_nlocals = 0 self.co_stacksize = 0 self.co_flags = 64 self.co_firstlineno = 0 class Frame: """ Class that replicates just enough of the builtin Frame object to enable serialization and traceback rendering. Args: get_locals (callable): A function that take a frame argument and returns a dict. See :class:`Traceback` class for example. """ def __init__(self, frame: FrameType, *, get_locals: Any = None) -> None: self.f_locals = {} if get_locals is None else get_locals(frame) self.f_globals = {k: v for k, v in frame.f_globals.items() if k in ("__file__", "__name__")} self.f_code = Code(frame.f_code) self.f_lineno = frame.f_lineno def clear(self) -> None: """ For compatibility with PyPy 3.5; clear() was added to frame in Python 3.4 and is called by traceback.clear_frames(), which in turn is called by unittest.TestCase.assertRaises """ class LineCacheEntry(list): """ The list of lines in a file where only some of the lines are available. """ def set_line(self, lineno: int, line: str) -> None: self.extend([""] * (lineno - len(self))) self[lineno - 1] = line class Traceback: """ Class that wraps builtin Traceback objects. Args: get_locals (callable): A function that take a frame argument and returns a dict. Ideally you will only return exactly what you need, and only with simple types that can be json serializable. Example: .. code:: python def get_locals(frame): if frame.f_locals.get("__tracebackhide__"): return {"__tracebackhide__": True} else: return {} """ tb_next: Optional["Traceback"] = None def __init__(self, tb: TracebackType, *, get_locals: Any = None): self.tb_frame = Frame(tb.tb_frame, get_locals=get_locals) self.tb_lineno = int(tb.tb_lineno) self.cached_lines: Dict[str, Dict[int, str]] = {} # filename -> lineno -> line """ Lines shown in the parsed traceback. """ # Build in place to avoid exceeding the recursion limit _tb = tb.tb_next prev_traceback = self cls = type(self) while _tb is not None: traceback = object.__new__(cls) traceback.tb_frame = Frame(_tb.tb_frame, get_locals=get_locals) traceback.tb_lineno = int(_tb.tb_lineno) prev_traceback.tb_next = traceback prev_traceback = traceback _tb = _tb.tb_next def populate_linecache(self) -> None: """ For each cached line, update the linecache if the file is not present. This helps us show the original lines even if the source file is not available, for example when the parsed traceback comes from a different host. """ import linecache for filename, lines in self.cached_lines.items(): entry: list[str] = linecache.getlines(filename, module_globals=None) if entry: if not isinstance(entry, LineCacheEntry): # no need to update the cache if the file is present continue else: entry = LineCacheEntry() linecache.cache[filename] = (1, None, entry, filename) for lineno, line in lines.items(): entry.set_line(lineno, line) def as_traceback(self) -> Optional[TracebackType]: """ Convert to a builtin Traceback object that is usable for raising or rendering a stacktrace. """ current: Optional[Traceback] = self top_tb = None tb = None stub = compile( "raise __traceback_maker", "<string>", "exec", ) while current: f_code = current.tb_frame.f_code code = stub.replace( co_firstlineno=current.tb_lineno, co_argcount=0, co_filename=f_code.co_filename, co_name=f_code.co_name or stub.co_name, co_freevars=(), co_cellvars=(), ) # noinspection PyBroadException try: exec( code, dict(current.tb_frame.f_globals), dict(current.tb_frame.f_locals) ) # noqa: S102 except Exception: next_tb = sys.exc_info()[2].tb_next # type: ignore if top_tb is None: top_tb = next_tb if tb is not None: tb.tb_next = next_tb tb = next_tb del next_tb current = current.tb_next try: return top_tb finally: del top_tb del tb to_traceback = as_traceback def as_dict(self) -> dict: """ Converts to a dictionary representation. You can serialize the result to JSON as it only has builtin objects like dicts, lists, ints or strings. """ if self.tb_next is None: tb_next = None else: tb_next = self.tb_next.as_dict() code = { "co_filename": self.tb_frame.f_code.co_filename, "co_name": self.tb_frame.f_code.co_name, } frame = { "f_globals": self.tb_frame.f_globals, "f_locals": self.tb_frame.f_locals, "f_code": code, "f_lineno": self.tb_frame.f_lineno, } return { "tb_frame": frame, "tb_lineno": self.tb_lineno, "tb_next": tb_next, } to_dict = as_dict @classmethod def from_dict(cls, dct: dict) -> "Traceback": """ Creates an instance from a dictionary with the same structure as ``.as_dict()`` returns. """ if dct["tb_next"]: tb_next = cls.from_dict(dct["tb_next"]) else: tb_next = None code = _AttrDict( co_filename=dct["tb_frame"]["f_code"]["co_filename"], co_name=dct["tb_frame"]["f_code"]["co_name"], ) frame = _AttrDict( f_globals=dct["tb_frame"]["f_globals"], f_locals=dct["tb_frame"].get("f_locals", {}), f_code=code, f_lineno=dct["tb_frame"]["f_lineno"], ) tb = _AttrDict( tb_frame=frame, tb_lineno=dct["tb_lineno"], tb_next=tb_next, ) return cls(tb, get_locals=get_all_locals) # type: ignore @classmethod def from_string(cls, string: str, strict: bool = True) -> "Traceback": """ Creates an instance by parsing a stacktrace. Strict means that parsing stops when lines are not indented by at least two spaces anymore. """ frames: List[Dict[str, str]] = [] cached_lines: Dict[str, Dict[int, str]] = {} lines = string.splitlines()[::-1] if strict: # skip the header while lines: line = lines.pop() if line == "Traceback (most recent call last):": break while lines: line = lines.pop() frame_match = FRAME_RE.match(line) if frame_match: frames.append(frame_match.groupdict()) if lines and lines[-1].startswith(" "): # code for the frame code = lines.pop().strip() filename = frame_match.group("co_filename") lineno = int(frame_match.group("tb_lineno")) cached_lines.setdefault(filename, {}).setdefault(lineno, code) elif line.startswith(" "): pass elif strict: break # traceback ended if frames: previous = None for frame in reversed(frames): previous = _AttrDict( frame, tb_frame=_AttrDict( frame, f_globals=_AttrDict( __file__=frame["co_filename"], __name__="?", ), f_locals={}, f_code=_AttrDict(frame), f_lineno=int(frame["tb_lineno"]), ), tb_next=previous, ) self = cls(previous) # type: ignore self.cached_lines = cached_lines return self else: raise TracebackParseError("Could not find any frames in %r." % string) def get_all_locals(frame: FrameType) -> dict: return dict(frame.f_locals)