# testslide/runner.py
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import inspect
import io
import os
import os.path
import random
import re
import sys
import time
import traceback
from contextlib import redirect_stderr, redirect_stdout
from importlib import import_module
from typing import Any, Callable, Dict, List, Optional, Pattern, Union, cast
import psutil
import pygments
import pygments.formatters
import pygments.lexers
from pygments.token import (
Comment,
Error,
Generic,
Keyword,
Name,
Number,
Operator,
String,
Token,
Whitespace,
)
import testslide
from . import AggregatedExceptions, Context, Example, Skip, _ExampleRunner
##
## Base
##
# Color scheme handed to pygments' TerminalFormatter (as ``colorscheme``) when
# FailurePrinterMixin._print_stack_trace highlights stack trace rows.
# Keys are pygments token types; every value is a pair of color names.
# NOTE(review): presumably the pair is (color on light background, color on
# dark background), as in pygments' own terminal color table -- confirm
# against the pygments TerminalFormatter documentation.
TS_COLORSCHEME = {
Token: ("", ""),
Whitespace: ("gray", "brightblack"),
Comment: ("gray", "brightblack"),
Comment.Preproc: ("cyan", "brightcyan"),
Keyword: ("brightblue", "brightblue"),
Keyword.Type: ("cyan", "brightcyan"),
Operator.Word: ("magenta", "brightmagenta"),
Name.Builtin: ("cyan", "brightcyan"),
Name.Function: ("green", "brightgreen"),
Name.Namespace: ("_cyan_", "_brightcyan_"),
Name.Class: ("_green_", "_brightgreen_"),
Name.Exception: ("cyan", "brightcyan"),
Name.Decorator: ("brightblack", "gray"),
Name.Variable: ("red", "brightred"),
Name.Constant: ("red", "brightred"),
Name.Attribute: ("cyan", "brightcyan"),
Name.Tag: ("brightblue", "brightblue"),
String: ("yellow", "yellow"),
Number: ("brightblue", "brightblue"),
Generic.Deleted: ("brightred", "brightred"),
Generic.Inserted: ("green", "brightgreen"),
Generic.Heading: ("**", "**"),
Generic.Subheading: ("*magenta*", "*brightmagenta*"),
Generic.Prompt: ("**", "**"),
Generic.Error: ("brightred", "brightred"),
Error: ("_brightred_", "_brightred_"),
}
class BaseFormatter:
    """
    Base class for formatters.

    A formatter is paired with Runner: the runner calls the hooks below while
    examples are discovered and executed, and the formatter records and/or
    prints the results. Every hook defined here is either a no-op or plain
    bookkeeping, so subclasses override only what they need.
    """

    def __init__(
        self,
        import_module_names: List[str],
        force_color: bool = False,
        import_secs: Optional[float] = None,
        trim_path_prefix: Optional[str] = None,
        show_testslide_stack_trace: bool = False,
        dsl_debug: bool = False,
    ) -> None:
        # Options, stored exactly as given.
        self.import_module_names = import_module_names
        self.force_color = force_color
        self.import_secs = import_secs
        self.trim_path_prefix = trim_path_prefix
        self.show_testslide_stack_trace = show_testslide_stack_trace
        self.dsl_debug = dsl_debug
        # Starts enabled; SlowImportWarningMixin clears it after warning.
        self._import_secs_warn = True

        # Context hierarchy of the most recently started example.
        self.current_hierarchy: List[Context] = []
        # Outcomes grouped by kind. "success" and "skip" hold examples, "fail"
        # holds {"example": ..., "exception": ...} dicts.
        self.results: Dict[
            str, List[Union[Example, Dict[str, Union[Example, BaseException]]]]
        ] = {
            "success": [],
            "fail": [],
            "skip": [],
        }

        # The clock starts at the creation time of the current process, not
        # when the formatter is built.
        current_process = psutil.Process(os.getpid())
        self.start_time = current_process.create_time()
        self.end_time: Optional[float] = None
        self.duration_secs: Optional[float] = None

    # Example Discovery

    def discovery_start(self) -> None:
        """
        Hook for the beginning of example discovery. No-op by default.
        """

    def example_discovered(self, example: Example) -> None:
        """
        Hook for each discovered example: prints its full name.
        """
        print(example.full_name)

    def discovery_finish(self) -> None:
        """
        Hook for the end of example discovery. No-op by default.
        """

    # Test Execution

    def start(self, example: Example) -> None:
        """
        To be called before each example execution.

        Calls new_context() for every context of the example's hierarchy that
        was not part of the previous example's hierarchy, then new_example(),
        and finally remembers the hierarchy for the next call.
        """
        unseen_contexts = [
            candidate
            for candidate in example.context.hierarchy
            if candidate not in self.current_hierarchy
        ]
        for unseen in unseen_contexts:
            self.new_context(unseen)
        self.new_example(example)
        self.current_hierarchy = example.context.hierarchy

    def new_context(self, context: Context) -> None:
        """
        Called from start() for a context that was not in the hierarchy of the
        previously started example. No-op by default.
        """

    def new_example(self, example: Example) -> None:
        """
        Called from start() before an example execution. No-op by default.
        """

    def success(self, example: Example) -> None:
        """
        Record an example that executed successfully.
        """
        self.results["success"].append(example)

    def fail(self, example: Example, exception: BaseException) -> None:
        """
        Record an example that failed, together with the exception raised.
        """
        failure = {"example": example, "exception": exception}
        self.results["fail"].append(failure)

    def skip(self, example: Example) -> None:
        """
        Record an example whose execution was skipped.
        """
        self.results["skip"].append(example)

    def finish(self, not_executed_examples: List[Example]) -> None:
        """
        Called when all examples finished execution: stores the end time and
        the elapsed seconds since start_time.
        """
        finished_at = time.time()
        self.end_time = finished_at
        self.duration_secs = finished_at - self.start_time

    # DSL
    # No-op hooks, one per kind of DSL code; DSLDebugMixin overrides them.

    def dsl_example(self, example: Example, code: Callable) -> None:
        """No-op hook for "example" code."""

    def dsl_before(self, example: Example, code: Callable) -> None:
        """No-op hook for "before" code."""

    def dsl_after(self, example: Example, code: Callable) -> None:
        """No-op hook for "after" code."""

    def dsl_around(self, example: Example, code: Callable) -> None:
        """No-op hook for "around" code."""

    def dsl_memoize(self, example: Example, code: Callable) -> None:
        """No-op hook for "memoize" code."""

    def dsl_memoize_before(self, example: Example, code: Callable) -> None:
        """No-op hook for "memoize_before" code."""

    def dsl_function(self, example: Example, code: Callable) -> None:
        """No-op hook for "function" code."""
##
## Mixins
##
class ColorFormatterMixin(BaseFormatter):
    """
    Adds ``format_*`` / ``print_*`` helpers that wrap text in ANSI escape
    sequences when color is enabled (see ``colored``) and leave it plain
    otherwise.

    Every helper accepts any number of values; they are converted with
    ``str()`` and concatenated without a separator.
    """

    @property
    def colored(self) -> bool:
        """True when stdout is a TTY or color was forced via ``force_color``."""
        return sys.stdout.isatty() or self.force_color

    def remove_terminal_escape(self, text: str) -> str:
        """Return ``text`` with ANSI attribute escapes (``ESC[...m``) removed."""
        return re.sub("\033\\[[0-9;]+m", "", text)

    def _format_attrs(self, attrs: str, *values: Any) -> str:
        """
        Concatenate ``values`` and, when color is enabled, wrap the result in
        the ANSI attribute sequence ``attrs`` (e.g. "31" or "36;2;4").
        """
        text = "".join(str(value) for value in values)
        if not self.colored:
            return text
        return "\033[0m\033[{attrs}m{text}\033[0m".format(attrs=attrs, text=text)

    def _print_attrs(self, attrs: str, *values: Any, **kwargs: Any) -> None:
        """
        Print ``values`` to stdout, formatted by _format_attrs().

        ``kwargs`` are forwarded to print(). Passing a ``file`` is rejected
        with ValueError: whether to emit escapes is decided by looking at
        sys.stdout, so redirecting to another stream is not supported.
        """
        if kwargs.get("file") is not None:
            raise ValueError(
                "The 'file' argument is not supported: output always goes to stdout."
            )
        # _format_attrs() already returns plain text when color is disabled.
        # Going through it in both modes keeps the printed text identical
        # apart from the escapes (values used to be space-separated only in
        # the uncolored case).
        print(self._format_attrs(attrs, *values), **kwargs)

    def format_bright(self, *values: Any) -> str:
        return self._format_attrs("1", *values)

    def print_bright(self, *values: Any, **kwargs: Any) -> None:
        self._print_attrs("1", *values, **kwargs)

    def format_dim(self, *values: Any) -> str:
        return self._format_attrs("2", *values)

    def print_dim(self, *values: Any, **kwargs: Any) -> None:
        self._print_attrs("2", *values, **kwargs)

    def format_green(self, *values: Any) -> str:
        return self._format_attrs("32", *values)

    def print_green(self, *values: Any, **kwargs: Any) -> None:
        self._print_attrs("32", *values, **kwargs)

    def format_red(self, *values: Any) -> str:
        return self._format_attrs("31", *values)

    def print_red(self, *values: Any, **kwargs: Any) -> None:
        self._print_attrs("31", *values, **kwargs)

    def format_yellow(self, *values: Any) -> str:
        return self._format_attrs("33", *values)

    def format_yellow_bright(self, *values: Any) -> str:
        return self._format_attrs("1;33", *values)

    def print_yellow(self, *values: Any, **kwargs: Any) -> None:
        self._print_attrs("33", *values, **kwargs)

    def format_cyan(self, *values: Any) -> str:
        return self._format_attrs("36", *values)

    def print_cyan(self, *values: Any, **kwargs: Any) -> None:
        self._print_attrs("36", *values, **kwargs)

    def format_cyan_dim_underline(self, *values: Any) -> str:
        return self._format_attrs("36;2;4", *values)

    def print_cyan_dim_underline(self, *values: Any, **kwargs: Any) -> None:
        self._print_attrs("36;2;4", *values, **kwargs)
class FailurePrinterMixin(ColorFormatterMixin):
    """
    Adds print_failed_example(), which prints a failed example followed by the
    stack trace of each of its exceptions (and of their ``__cause__`` chain).
    """

    # Directory of the testslide package. Frames under it are left out of
    # stack traces unless show_testslide_stack_trace is set.
    TESTSLIDE_PATH: str = os.path.abspath(os.path.dirname(testslide.__file__))

    def _get_test_module_index(self, tb: traceback.StackSummary) -> Optional[int]:
        """
        Return the index of the first frame of ``tb`` whose file is one of the
        imported test modules, or None when there is no such frame.
        """
        test_module_paths = {
            import_module(import_module_name).__file__
            for import_module_name in self.import_module_names
        }
        for index, frame in enumerate(tb):
            if frame.filename in test_module_paths:
                return index
        return None

    def _print_stack_trace(self, exception: BaseException, cause_depth: int) -> None:
        """
        Print ``exception`` (class, message and stack trace rows), then recurse
        into its ``__cause__`` with one more level of indentation.

        Unless show_testslide_stack_trace is set, frames before the first test
        module frame and frames located under TESTSLIDE_PATH are omitted.
        """
        indent = " " * cause_depth
        if cause_depth:
            self.print_red(f"{indent} Caused by ", end="")
        self.print_red(
            "{exception_class}: {message}".format(
                exception_class=exception.__class__.__name__,
                # Keep continuation lines of the message aligned.
                message=f"\n{indent} ".join(str(exception).split("\n")),
            )
        )
        tb = traceback.extract_tb(exception.__traceback__)
        test_module_index = self._get_test_module_index(tb)
        for index, (path, line, function_name, text) in enumerate(tb):
            if not self.show_testslide_stack_trace:
                if test_module_index is not None and index < test_module_index:
                    continue
                if os.path.abspath(path).startswith(self.TESTSLIDE_PATH):
                    continue
            # Strip the prefix only from the start of the path. Splitting on
            # the prefix failed to trim paths where it appeared again later.
            if self.trim_path_prefix and path.startswith(self.trim_path_prefix):
                path = path[len(self.trim_path_prefix) :]
            row_text = (
                ' File "{path}", line {line}, in {function_name}\n'
                " {text}\n".format(
                    path=path,
                    line=line,
                    function_name=function_name,
                    text=text,
                )
            )
            if self.colored:
                row_text = pygments.highlight(
                    row_text,
                    pygments.lexers.PythonTracebackLexer(),
                    pygments.formatters.TerminalFormatter(colorscheme=TS_COLORSCHEME),
                )
            # Drop the empty piece after the trailing newline and indent the
            # remaining lines.
            row_text = "\n".join(
                "{indent} {line}".format(indent=indent, line=row_line)
                for row_line in row_text.split("\n")[:-1]
            )
            print(row_text)
        if exception.__cause__ is not None:
            self._print_stack_trace(exception.__cause__, cause_depth=cause_depth + 1)

    def print_failed_example(
        self,
        number: int,
        example: Example,
        exception: BaseException,
    ) -> None:
        """
        Print the numbered header of a failed example, then a numbered stack
        trace for each exception. An AggregatedExceptions is expanded into the
        exceptions it holds; any other exception is printed on its own.
        """
        self.print_bright(
            " {number}) {context}: {example}".format(
                number=number, context=example.context.full_name, example=example
            )
        )
        # isinstance() matches the check the formatters' fail() methods use.
        if isinstance(exception, AggregatedExceptions):
            exception_list = exception.exceptions
        else:
            exception_list = [exception]
        for sub_number, sub_exception in enumerate(exception_list, start=1):
            self.print_red(
                " {number}) ".format(
                    number=sub_number,
                ),
                end="",
            )
            self._print_stack_trace(sub_exception, cause_depth=0)
class SlowImportWarningMixin(ColorFormatterMixin):
    """
    On construction, prints a yellow warning when importing the test modules
    took more than one second, then clears the ``_import_secs_warn`` flag.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        import_secs = self.import_secs
        # Nothing to report: no timing, a fast import, or warning disabled.
        if not import_secs or not import_secs > 1 or not self._import_secs_warn:
            return
        warning = (
            "Warning: Importing test modules alone took %.1fs! To remove this slow "
            "down remove object construction from module level. If not possible, "
            "consider using/ lazy_import(). Try using --import-profiler to profile "
            "your imports." % (import_secs)
        )
        self.print_yellow(warning)
        self._import_secs_warn = False
class DSLDebugMixin:
    """
    Overrides the ``dsl_*`` hooks to print, when ``dsl_debug`` is enabled, which
    piece of DSL code is involved and where it is defined.

    Expects the host class to provide ``dsl_debug``, ``trim_path_prefix`` and
    ``print_cyan()``.
    """

    def get_dsl_debug_indent(self, example: Example) -> str:
        """Indentation placed before each debug line; empty by default."""
        return ""

    def _dsl_print(self, example: Example, description: str, code: Callable) -> None:
        """
        Print ``<indent><description>: <name> @ <file>:<line>`` for ``code``.

        Does nothing when ``dsl_debug`` is off, or when ``code`` is defined
        under the directory holding this module. Whatever cannot be
        determined (file or line number) is shown as "?".
        """
        lineno: Union[str, int]
        if not self.dsl_debug:  # type: ignore
            return
        # Not every callable has __name__ (e.g. functools.partial objects).
        name = getattr(code, "__name__", repr(code))
        file: Optional[str]
        try:
            file = inspect.getsourcefile(code)
        except TypeError:
            try:
                file = inspect.getfile(code)
            except TypeError:
                file = None
        if not file:
            # getsourcefile() may also return None without raising. Treat it
            # like the other "unknown" cases instead of crashing below.
            file = "?"
        elif file.startswith(os.path.dirname(__file__)):
            return
        prefix = self.trim_path_prefix  # type: ignore
        # Strip the prefix only from the start of the path. Splitting on the
        # prefix failed to trim paths where it appeared again later.
        if prefix and file.startswith(prefix):
            file = file[len(prefix) :]
        try:
            _lines, lineno = inspect.getsourcelines(code)
        except (OSError, TypeError):
            # OSError: source not retrievable. TypeError: built-in object.
            lineno = "?"
        self.print_cyan(  # type: ignore
            "{indent}{description}: {name} @ {file_lineno}".format(
                indent=self.get_dsl_debug_indent(example),
                description=description,
                name=name,
                file_lineno=f"{file}:{lineno}",
            )
        )

    def dsl_example(self, example: Example, code: Callable) -> None:
        self._dsl_print(example, "example", code)

    def dsl_before(self, example: Example, code: Callable) -> None:
        self._dsl_print(example, "before", code)

    def dsl_after(self, example: Example, code: Callable) -> None:
        self._dsl_print(example, "after", code)

    def dsl_around(self, example: Example, code: Callable) -> None:
        self._dsl_print(example, "around", code)

    def dsl_memoize(self, example: Example, code: Callable) -> None:
        self._dsl_print(example, "memoize", code)

    def dsl_memoize_before(self, example: Example, code: Callable) -> None:
        self._dsl_print(example, "memoize_before", code)

    def dsl_function(self, example: Example, code: Callable) -> None:
        self._dsl_print(example, "function", code)
class VerboseFinishMixin(ColorFormatterMixin):
    """
    Overrides finish() to print the failures followed by a summary of the
    run, with the ASCII logo next to it when there is enough room.
    """

    def _ansi_attrs(self, attrs: str, text: str) -> str:
        """Wrap ``text`` in the ANSI attributes ``attrs`` when color is enabled."""
        # Same behavior as the inherited helper; reuse it instead of
        # duplicating the escape sequence.
        return self._format_attrs(attrs, text)

    def _bright_attr(self, text: str) -> str:
        return self._ansi_attrs("1", text)

    def _green_bright_attr(self, text: str) -> str:
        return self._ansi_attrs("32;1", text)

    def _red_bright_attr(self, text: str) -> str:
        return self._ansi_attrs("31;1", text)

    def _yellow_bright_attr(self, text: str) -> str:
        return self._ansi_attrs("33;1", text)

    def _get_ascii_logo_lines(self) -> List[str]:
        """Return the lines of the (optionally colored) ASCII logo."""
        # Quotes and backslashes go through variables because they cannot be
        # written directly inside the f-string expressions below.
        quote = '"'
        backslash = "\\"
        # The slice drops the empty pieces produced by the newlines right
        # after the opening and right before the closing triple quotes.
        return f"""
{self._yellow_bright_attr("--_")} {self._green_bright_attr(f"|{quote}{quote}---__")}
{self._red_bright_attr("|'.")}{self._yellow_bright_attr("| |")}{self._green_bright_attr("|")} {self._bright_attr(".")} {self._green_bright_attr(f"{quote}{quote}{quote}|")}
{self._red_bright_attr("| |")}{self._yellow_bright_attr("| |")}{self._green_bright_attr("|")} {self._bright_attr(f"/|{backslash}{quote}{quote}-.")} {self._green_bright_attr("|")}
{self._red_bright_attr("| |")}{self._yellow_bright_attr("| |")}{self._green_bright_attr("|")} {self._bright_attr("| |")} {self._green_bright_attr("|")}
{self._red_bright_attr("| |")}{self._yellow_bright_attr("| |")}{self._green_bright_attr("|")} {self._bright_attr(f"| {backslash}|/")} {self._green_bright_attr("|")}
{self._red_bright_attr(f"|.{quote}")}{self._yellow_bright_attr("| |")}{self._green_bright_attr("|")} {self._bright_attr(f"--{quote}{quote}")} {self._bright_attr("'")}{self._green_bright_attr("__|")}
{self._yellow_bright_attr(f"--{quote}")} {self._green_bright_attr(f"|__---{quote}{quote}{quote}")}
""".split("\n")[1:8]

    def _get_summary_lines(
        self, total: int, success: int, fail: int, skip: int, not_executed_examples: int
    ) -> List[str]:
        """
        Build the summary: an import-time notice (or an empty line), the
        "Executed ..." headline, one line per counter and the docs URL.

        Counters equal to zero are dimmed; the others get their own color.
        """
        summary_lines: List[str] = []
        if self.import_secs and self.import_secs > 2:
            summary_lines.append(
                self.format_yellow_bright("Imports took: %.1fs!" % (self.import_secs))
                + " Profile with "
                + self.format_bright("--import-profiler")
                + "."
            )
        else:
            # Placeholder so the summary always has the same number of lines.
            summary_lines.append("")
        # Singular only for exactly one example ("0 examples", "1 example").
        example = "example" if total == 1 else "examples"
        summary_lines.append(
            self.format_bright(
                "Executed %s %s in %.1fs:"
                % (total, example, cast(float, self.duration_secs)),
            )
        )
        counters = (
            (" Successful: ", success, self.format_green),
            (" Failed: ", fail, self.format_red),
            (" Skipped: ", skip, self.format_yellow),
            (" Not executed: ", not_executed_examples, self.format_cyan),
        )
        for label, count, highlight in counters:
            formatter = highlight if count else self.format_dim
            summary_lines.append(formatter(label, count))
        summary_lines.append(
            self.format_cyan_dim_underline("https://testslide.readthedocs.io/")
        )
        return summary_lines

    def finish(self, not_executed_examples: List[Example]) -> None:
        """
        Print every failure, then the summary. The logo is printed to the
        right of the summary when both fit in the terminal width (capped at
        80 columns); otherwise only the summary is printed.
        """
        super().finish(not_executed_examples)
        success = len(self.results["success"])
        fail = len(self.results["fail"])
        skip = len(self.results["skip"])
        total = success + fail + skip
        if self.results["fail"]:
            self.print_red("\nFailures:")
            for number, result in enumerate(self.results["fail"], start=1):
                result = cast(Dict[str, Union[Example, BaseException]], result)
                print("")
                self.print_failed_example(  # type: ignore
                    number, result["example"], result["exception"]  # type: ignore
                )
        summary_lines = self._get_summary_lines(
            total, success, fail, skip, len(not_executed_examples)
        )
        # Widths are measured on the visible text, without escape sequences.
        max_summary_len = max(
            len(self.remove_terminal_escape(line)) for line in summary_lines
        )
        logo_lines = self._get_ascii_logo_lines()
        max_logo_len = max(len(self.remove_terminal_escape(line)) for line in logo_lines)
        try:
            columns, _lines = os.get_terminal_size()
        except OSError:
            # E.g. stdout is not attached to a terminal.
            columns = 80
        columns = min(columns, 80)
        if max_summary_len + max_logo_len + 1 <= columns:
            logo_start_column = columns - max_logo_len - 2
            for idx, summary_line in enumerate(summary_lines):
                visible_len = len(self.remove_terminal_escape(summary_line))
                padding = " " * (logo_start_column - visible_len)
                # Never fail (or lose summary lines) if the two blocks ever
                # end up with a different number of lines.
                logo_line = logo_lines[idx] if idx < len(logo_lines) else ""
                print(summary_line, padding, end="")
                print(logo_line)
        else:
            for summary_line in summary_lines:
                print(summary_line)
##
## Formatters
##
class QuietFormatter(BaseFormatter):
    """Formatter that adds nothing to BaseFormatter's behavior."""
class ProgressFormatter(DSLDebugMixin, SlowImportWarningMixin, FailurePrinterMixin):
    """
    Simple formatter that outputs one character per example: "." when it
    passes, "F" when it fails and "S" when it is skipped. Failures are
    detailed once the run finishes.
    """

    def new_example(self, example: Example) -> None:
        super().new_example(example)
        if not self.dsl_debug:
            return
        # Start a new line for the DSL debug output of this example.
        print("")

    def success(self, example: Example) -> None:
        super().success(example)
        self.print_green(".", end="")

    def fail(self, example: Example, exception: BaseException) -> None:
        super().fail(example, exception)
        self.print_red("F", end="")

    def skip(self, example: Example) -> None:
        super().skip(example)
        self.print_yellow("S", end="")

    def finish(self, not_executed_examples: List[Example]) -> None:
        super().finish(not_executed_examples)
        failures = self.results["fail"]
        # The failure report is left out when DSL debugging is enabled.
        if failures and not self.dsl_debug:
            self.print_red("\nFailures:")
            for position, failure in enumerate(failures, start=1):
                failure = cast(Dict[str, Union[Example, BaseException]], failure)
                print("")
                self.print_failed_example(
                    position, failure["example"], failure["exception"]  # type: ignore
                )
        # Terminate the line of progress characters.
        print("")
class DocumentFormatter(
    VerboseFinishMixin, DSLDebugMixin, SlowImportWarningMixin, FailurePrinterMixin
):
    """
    Formatter that prints contexts and examples as an outline indented by
    context depth, one line each, colored according to the outcome.
    """

    def get_dsl_debug_indent(self, example: Example) -> str:
        depth = example.context.depth + 1
        return " " * depth

    def new_context(self, context: Context) -> None:
        focus_mark = "*" if context.focus else ""
        self.print_bright("{}{}{}".format(" " * context.depth, focus_mark, context))

    def _color_output(self) -> bool:
        """Same condition as the ``colored`` property."""
        return self.colored

    def success(self, example: Example) -> None:
        super().success(example)
        # Without colors the outcome has to be spelled out.
        if self._color_output():
            outcome = ""
        else:
            outcome = ": PASS"
        self.print_green(
            "{indent}{focus}{example}{pass_text}".format(
                indent=" " * (example.context.depth + 1),
                focus="*" if example.focus else "",
                example=example,
                pass_text=outcome,
            )
        )

    def fail(self, example: Example, exception: BaseException) -> None:
        # Unwrap an aggregate that holds a single exception.
        if (
            isinstance(exception, AggregatedExceptions)
            and len(exception.exceptions) == 1
        ):
            exception = exception.exceptions[0]
        super().fail(example, exception)
        first_message_line = str(exception).split("\n")[0]
        self.print_red(
            "{indent}{focus}{example}: {ex_class}: {ex_message}".format(
                indent=" " * (example.context.depth + 1),
                focus="*" if example.focus else "",
                example=example,
                ex_class=type(exception).__name__,
                ex_message=first_message_line,
            )
        )

    def skip(self, example: Example) -> None:
        super().skip(example)
        # Without colors the outcome has to be spelled out.
        if self._color_output():
            outcome = ""
        else:
            outcome = ": SKIP"
        self.print_yellow(
            "{indent}{focus}{example}{skip_text}".format(
                indent=" " * (example.context.depth + 1),
                focus="*" if example.focus else "",
                example=example,
                skip_text=outcome,
            )
        )
class LongFormatter(
    VerboseFinishMixin, DSLDebugMixin, SlowImportWarningMixin, FailurePrinterMixin
):
    """
    Formatter that prints one line per example: the full name of its context
    followed by the example and its outcome.
    """

    def get_dsl_debug_indent(self, example: Example) -> str:
        return " "

    def new_example(self, example: Example) -> None:
        context = example.context
        focus_mark = "*" if context.focus else ""
        # No newline: the outcome is appended to this line later on.
        self.print_bright("{}{}: ".format(focus_mark, context.full_name), end="")
        if self.dsl_debug:
            # DSL debug lines follow, so finish the line now.
            print("")

    def _color_output(self) -> bool:
        """Same condition as the ``colored`` property."""
        return self.colored

    def success(self, example: Example) -> None:
        super().success(example)
        if self.dsl_debug:
            print(" ", end="")
        # Without colors the outcome has to be spelled out.
        if self._color_output():
            outcome = ""
        else:
            outcome = ": PASS"
        self.print_green(
            "{focus}{example}{pass_text}".format(
                focus="*" if example.focus else "",
                example=example,
                pass_text=outcome,
            )
        )

    def fail(self, example: Example, exception: BaseException) -> None:
        # Unwrap an aggregate that holds a single exception.
        if (
            isinstance(exception, AggregatedExceptions)
            and len(exception.exceptions) == 1
        ):
            exception = exception.exceptions[0]
        super().fail(example, exception)
        if self.dsl_debug:
            print(" ", end="")
        first_message_line = str(exception).split("\n")[0]
        self.print_red(
            "{focus}{example}: {ex_class}: {ex_message}".format(
                focus="*" if example.focus else "",
                example=example,
                ex_class=type(exception).__name__,
                ex_message=first_message_line,
            ),
        )

    def skip(self, example: Example) -> None:
        super().skip(example)
        if self.dsl_debug:
            print(" ", end="")
        # Without colors the outcome has to be spelled out.
        if self._color_output():
            outcome = ""
        else:
            outcome = ": SKIP"
        self.print_yellow(
            "{focus}{example}{skip_text}".format(
                focus="*" if example.focus else "",
                example=example,
                skip_text=outcome,
            )
        )
##
## Runner
##
class Runner:
    """
    Execute examples contained in given contexts.

    Examples can be shuffled (optionally with a fixed seed), restricted to
    focused ones and filtered by name. Every outcome is reported to the
    formatter.
    """

    def __init__(
        self,
        contexts: List[Context],
        formatter: Union[SlowImportWarningMixin, DocumentFormatter],
        shuffle: bool = False,
        seed: Optional[int] = None,
        focus: bool = False,
        fail_fast: bool = False,
        fail_if_focused: bool = False,
        names_text_filter: Optional[str] = None,
        names_regex_filter: Optional[Pattern] = None,
        names_regex_exclude: Optional[Pattern] = None,
        quiet: bool = False,
    ) -> None:
        self.contexts = contexts
        self.formatter = formatter
        self.shuffle = shuffle
        self.seed = seed
        self.focus = focus
        self.fail_fast = fail_fast
        self.fail_if_focused = fail_if_focused
        self.names_text_filter = names_text_filter
        self.names_regex_filter = names_regex_filter
        self.names_regex_exclude = names_regex_exclude
        self.quiet = quiet

    def _run_example(self, example: Example) -> None:
        """
        Run a single example, letting whatever it raises propagate.

        In quiet mode stdout / stderr are captured while the example runs and
        are printed only when it raised something other than Skip.

        Raises AssertionError for a focused example when fail_if_focused is
        set.
        """
        if example.focus and self.fail_if_focused:
            raise AssertionError(
                "Focused example not allowed with --fail-if-focused"
                ". Please remove the focus to allow the test to run."
            )
        if not self.quiet:
            _ExampleRunner(example, self.formatter).run()
            return
        stdout = io.StringIO()
        stderr = io.StringIO()
        example_exception: Optional[BaseException] = None
        with redirect_stdout(stdout), redirect_stderr(stderr):
            try:
                _ExampleRunner(example, self.formatter).run()
            except BaseException as ex:
                # Re-raised below, once the real streams are restored.
                example_exception = ex
        # Compare against None: a falsy exception object (one defining
        # __bool__ or __len__) must not be mistaken for success.
        if example_exception is None:
            return
        if not isinstance(example_exception, Skip):
            if stdout.getvalue():
                print("stdout:\n{}".format(stdout.getvalue()))
            if stderr.getvalue():
                print("stderr:\n{}".format(stderr.getvalue()))
        raise example_exception

    def run(self) -> int:
        """
        Execute all examples in all contexts.

        Returns 0 when no example failed and 1 otherwise. With fail_fast,
        execution stops at the first failure.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        executed_examples = []
        exit_code = 0
        for example in self._to_execute_examples:
            executed_examples.append(example)
            self.formatter.start(example)
            sys.stdout.flush()
            sys.stderr.flush()
            try:
                self._run_example(example)
            except Skip:
                self.formatter.skip(example)
            except BaseException as exception:
                self.formatter.fail(example, exception)
                exit_code = 1
                if self.fail_fast:
                    break
            else:
                self.formatter.success(example)
        not_executed_examples = [
            example
            for example in self._all_examples
            if example not in executed_examples
        ]
        self.formatter.finish(not_executed_examples)
        sys.stdout.flush()
        sys.stderr.flush()
        return exit_code

    def _filter(self, example: Example, focus: bool) -> bool:
        """
        Tell whether ``example`` is selected by the focus requirement and by
        the name filters (regex exclusion, text containment, regex match).
        """
        if focus and not example.focus:
            return False
        full_name = example.full_name
        if self.names_regex_exclude and self.names_regex_exclude.search(full_name):
            return False
        if self.names_text_filter and self.names_text_filter not in full_name:
            return False
        if self.names_regex_filter and not self.names_regex_filter.search(full_name):
            return False
        return True

    @property
    def _all_examples(self) -> List[Example]:
        """
        All examples of all contexts, shuffled when ``shuffle`` is set.

        The list is rebuilt (and reshuffled) on every access; with a seed the
        resulting order is the same each time.
        """
        examples = [
            example for context in self.contexts for example in context.all_examples
        ]
        if self.shuffle:
            # 0 is a valid seed, so test for None rather than truthiness.
            if self.seed is not None:
                random.seed(self.seed)
            random.shuffle(examples)
        return examples

    @property
    def _to_execute_examples(self) -> List[Example]:
        """
        Examples selected for execution. When focus is requested but no
        focused example passes the filters, fall back to the unfocused
        selection.
        """
        examples = [
            example
            for example in self._all_examples
            if self._filter(example, focus=self.focus)
        ]
        if not examples and self.focus:
            return [
                example
                for example in self._all_examples
                if self._filter(example, focus=False)
            ]
        return examples