ptr.py (948 lines of code) (raw):

#!/usr/bin/env python3 # Copyright © Meta Platforms, Inc. and affiliates # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. # coding=utf8 # Can remove once we're >= 3.7 so asyncio has a .run() method # pyre-ignore-all-errors[0] # pyre-ignore-all-errors[16] import argparse import ast import asyncio import logging import sys from collections import defaultdict, namedtuple from configparser import ConfigParser from enum import Enum from json import dump from os import chdir, cpu_count, environ, getcwd, getpid from os.path import sep from pathlib import Path from platform import system from shutil import rmtree from subprocess import CalledProcessError from tempfile import gettempdir from time import time from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union LOG = logging.getLogger(__name__) MACOSX = system() == "Darwin" # To make main use asyncio.run and unittests to test approrpiately for older cpython # Using 3.8 rather than 3.7 due to subprocess.exec in < 3.8 only support main loop # https://bugs.python.org/issue35621 PY_38_OR_GREATER = sys.version_info >= (3, 8) WINDOWS = system() == "Windows" # Windows needs to use a ProactorEventLoop for subprocesses # Need to use sys.platform for mypy to understand # https://mypy.readthedocs.io/en/latest/common_issues.html#python-version-and-system-platform-checks # noqa: B950 # pylint: disable=C0301 if sys.platform == "win32": asyncio.set_event_loop(asyncio.ProactorEventLoop()) def _config_default() -> ConfigParser: if WINDOWS: venv_pkgs = "black coverage flake8 mypy pip pylint setuptools usort" else: venv_pkgs = "black coverage flake8 mypy pip pylint pyre-check setuptools usort" LOG.info("Using default config settings") cp = ConfigParser() cp["ptr"] = {} cp["ptr"]["atonce"] = str(int((cpu_count() or 20) / 2) or 1) cp["ptr"]["exclude_patterns"] = "build* yocto" cp["ptr"]["pypi_url"] = "https://pypi.org/simple/" cp["ptr"]["venv_pkgs"] = venv_pkgs return cp def _config_read( cwd: str, conf_name: str = ".ptrconfig", cp: Optional[ConfigParser] = None ) -> ConfigParser: """Look from cwd to / for a "conf_name" file - If so read it in""" if cp is None: cp = _config_default() cwd_path = Path(cwd) # type: Path root_path = Path(f"{cwd_path.drive}\\") if WINDOWS else Path("/") while cwd_path: ptrconfig_path = cwd_path / conf_name if ptrconfig_path.exists(): cp.read(str(ptrconfig_path)) LOG.info(f"Loading found config @ {ptrconfig_path}") break if cwd_path == root_path: break cwd_path = cwd_path.parent return cp CWD = getcwd() CONFIG = _config_read(CWD) PIP_CONF_TEMPLATE = """\ [global] index-url = {} timeout = {}""" # Windows venv + pip are super slow VENV_TIMEOUT = 120 class StepName(Enum): pip_install = 1 tests_run = 2 analyze_coverage = 3 mypy_run = 4 usort_run = 5 black_run = 6 pylint_run = 7 flake8_run = 8 pyre_run = 9 coverage_line = namedtuple("coverage_line", ["stmts", "miss", "cover", "missing"]) step = namedtuple( "step", ["step_name", "run_condition", "cmds", "log_message", "timeout"] ) test_result = namedtuple( "test_result", ["setup_py_path", "returncode", "output", "runtime", "timeout"] ) def _get_site_packages_path(venv_path: Path) -> Optional[Path]: lib_path = venv_path / ("Lib" if WINDOWS else "lib") for apath in lib_path.iterdir(): if apath.is_dir() and apath.match("python*"): return apath / "site-packages" if apath.is_dir() and apath.name == "site-packages": return apath LOG.error(f"Unable to find a python lib dir in {lib_path}") return None def _remove_pct_symbol(pct_str: str) -> str: return pct_str.strip().replace("%", "") def _analyze_coverage( venv_path: Path, setup_py_path: Path, required_cov: Dict[str, float], coverage_report: str, stats: Dict[str, int], test_run_start_time: float, ) -> Optional[test_result]: module_path = setup_py_path.parent site_packages_path = _get_site_packages_path(venv_path) if not site_packages_path: LOG.error("Analyze coverage is unable to find site-packages path") return None relative_site_packages = str(site_packages_path.relative_to(venv_path)) + sep if not coverage_report: LOG.error( f"No coverage report for {setup_py_path} - Unable to enforce coverage" " requirements" ) return None if not required_cov: LOG.error(f"No required coverage to enforce for {setup_py_path}") return None coverage_lines = {} for line in coverage_report.splitlines(): if not line or line.startswith("-") or line.startswith("Name"): continue module_path_str = None sl_path = None sl = line.split(maxsplit=4) if sl[0] != "TOTAL": sl_path = _max_osx_private_handle(sl[0], site_packages_path) if sl_path and sl_path.is_absolute() and site_packages_path: for possible_abs_path in (module_path, site_packages_path): try: module_path_str = str(sl_path.relative_to(possible_abs_path)) except ValueError as ve: LOG.debug(ve) elif sl_path: module_path_str = str(sl_path).replace(relative_site_packages, "") else: module_path_str = sl[0] if not module_path_str: LOG.error( f"[{setup_py_path}] Unable to find path relative path for {sl[0]}" ) continue if len(sl) == 4: coverage_lines[module_path_str] = coverage_line( float(sl[1]), float(sl[2]), float(_remove_pct_symbol(sl[3])), "" ) else: coverage_lines[module_path_str] = coverage_line( float(sl[1]), float(sl[2]), float(_remove_pct_symbol(sl[3])), sl[4] ) if sl[0] != "TOTAL": stats[ f"suite.{module_path.name}_coverage.file.{module_path_str}" ] = coverage_lines[module_path_str].cover else: stats[f"suite.{module_path.name}_coverage.total"] = coverage_lines[ module_path_str ].cover failed_output = "The following files did not meet coverage requirements:\n" failed_coverage = False for afile, cov_req in required_cov.items(): try: cover = coverage_lines[afile].cover except KeyError: err = ( f"{afile} has not reported any coverage. Does the file exist? " + "Does it get ran during tests? Remove from setup config." ) keyerror_runtime = int(time() - test_run_start_time) return test_result( setup_py_path, StepName.analyze_coverage.value, err, keyerror_runtime, False, ) if cover < cov_req: failed_coverage = True cov_lines = coverage_lines[afile] failed_output += f" {afile}: {cov_lines.cover} < {cov_req} - Missing: {cov_lines.missing}\n" if failed_coverage: failed_cov_runtime = int(time() - test_run_start_time) return test_result( setup_py_path, StepName.analyze_coverage.value, failed_output, failed_cov_runtime, False, ) return None def _max_osx_private_handle( potenital_path: str, site_packages_path: Path ) -> Optional[Path]: """On Mac OS X `coverage` seems to always resolve /private for anything stored in /var. ptr's usage of gettempdir() seems to result in using dirs within there This function strips /private if it exists on the path supplied from coverage ONLY IF site_packages_path is not based in /private""" if not MACOSX: return Path(potenital_path) private_path = Path("/private") try: site_packages_path.relative_to(private_path) return Path(potenital_path) except ValueError: pass return Path(potenital_path.replace("/private", "")) def _write_stats_file(stats_file: str, stats: Dict[str, int]) -> None: stats_file_path = Path(stats_file) if not stats_file_path.is_absolute(): stats_file_path = Path(CWD) / stats_file_path try: with stats_file_path.open("w", encoding="utf8") as sfp: dump(stats, sfp, indent=2, sort_keys=True) except OSError as ose: LOG.exception( f"Unable to write out JSON statistics file to {stats_file} ({ose})" ) def _generate_black_cmd(module_dir: Path, black_exe: Path) -> Tuple[str, ...]: return (str(black_exe), "--check", ".") def _generate_install_cmd( pip_exe: str, module_dir: str, config: Dict[str, Any] ) -> Tuple[str, ...]: cmds = [pip_exe, "-v", "install", module_dir] if "tests_require" in config and config["tests_require"]: for dep in config["tests_require"]: cmds.append(dep) return tuple(cmds) def _generate_test_suite_cmd(coverage_exe: Path, config: Dict) -> Tuple[str, ...]: if config.get("test_suite", False): return (str(coverage_exe), "run", "-m", config["test_suite"]) return () def _generate_mypy_cmd( module_dir: Path, mypy_exe: Path, config: Dict ) -> Tuple[str, ...]: if config.get("run_mypy", False): mypy_entry_point = module_dir / f"{config['entry_point_module']}.py" else: return () cmds = [str(mypy_exe)] mypy_ini_path = module_dir / "mypy.ini" if mypy_ini_path.exists(): cmds.extend(["--config", str(mypy_ini_path)]) cmds.append(str(mypy_entry_point)) return tuple(cmds) def _generate_flake8_cmd( module_dir: Path, flake8_exe: Path, config: Dict ) -> Tuple[str, ...]: if not config.get("run_flake8", False): return () cmds = [str(flake8_exe)] flake8_config = module_dir / ".flake8" if flake8_config.exists(): cmds.extend(["--config", str(flake8_config)]) return tuple(cmds) def _generate_pylint_cmd( module_dir: Path, pylint_exe: Path, config: Dict ) -> Tuple[str, ...]: if not config.get("run_pylint", False): return () py_files = set() # type: Set[str] find_py_files(py_files, module_dir) cmds = [str(pylint_exe)] pylint_config = module_dir / ".pylint" if pylint_config.exists(): cmds.extend(["--rcfile", str(pylint_config)]) return tuple([*cmds, *sorted(py_files)]) def _generate_pyre_cmd( module_dir: Path, pyre_exe: Path, config: Dict ) -> Tuple[str, ...]: if not config.get("run_pyre", False) or WINDOWS: return () return (str(pyre_exe), "--source-directory", str(module_dir), "check") def _generate_usort_cmd( module_dir: Path, usort_exe: Path, config: Dict ) -> Tuple[str, ...]: if not config.get("run_usort", False): return () py_files = set() # type: Set[str] find_py_files(py_files, module_dir) return (str(usort_exe), "check", *sorted(py_files)) def _parse_setup_params(setup_py: Path) -> Dict[str, Any]: with setup_py.open("r", encoding="utf8") as sp: setup_tree = ast.parse(sp.read()) LOG.debug(f"AST visiting {setup_py}") for node in ast.walk(setup_tree): if isinstance(node, ast.Assign): for target in node.targets: target_id = getattr(target, "id", None) if not target_id: continue if target_id == "ptr_params": LOG.debug(f"Found ptr_params in {setup_py}") return dict(ast.literal_eval(node.value)) return {} def _get_test_modules( base_path: Path, stats: Dict[str, int], run_disabled: bool, print_non_configured: bool, ) -> Dict[Path, Dict]: get_tests_start_time = time() all_setup_pys = find_setup_pys( base_path, set(CONFIG["ptr"]["exclude_patterns"].split()) if CONFIG["ptr"]["exclude_patterns"] else set(), ) stats["total.setup_pys"] = len(all_setup_pys) non_configured_modules = [] # type: List[Path] test_modules = {} # type: Dict[Path, Dict] for setup_py in all_setup_pys: disabled_err_msg = f"Not running {setup_py} as ptr is disabled via config" # If a setup.cfg exists lets prefer it, if there is a [ptr] section ptr_params = parse_setup_cfg(setup_py) if not ptr_params: ptr_params = _parse_setup_params(setup_py) if ptr_params: if ptr_params.get("disabled", False) and not run_disabled: LOG.info(disabled_err_msg) stats["total.disabled"] += 1 else: test_modules[setup_py] = ptr_params if setup_py not in test_modules: non_configured_modules.append(setup_py) if print_non_configured and non_configured_modules: print_non_configured_modules(non_configured_modules) stats["total.non_ptr_setup_pys"] = len(non_configured_modules) stats["total.ptr_setup_pys"] = len(test_modules) stats["runtime.parse_setup_pys"] = int(time() - get_tests_start_time) return test_modules def _handle_debug(debug: bool) -> bool: """Turn on debugging if asked otherwise INFO default""" log_level = logging.DEBUG if debug else logging.INFO logging.basicConfig( format="[%(asctime)s] %(levelname)s: %(message)s (%(filename)s:%(lineno)d)", level=log_level, ) return debug def _validate_base_dir(base_dir: str) -> Path: base_dir_path = Path(base_dir) if not base_dir_path.is_absolute(): base_dir_path = Path(CWD) / base_dir_path if not base_dir_path.exists(): LOG.error(f"{base_dir} does not exit. Not running tests") sys.exit(69) return base_dir_path async def _gen_check_output( cmd: Sequence[str], timeout: Union[int, float] = 30, env: Optional[Dict[str, str]] = None, cwd: Optional[Path] = None, ) -> Tuple[bytes, bytes]: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, env=env, cwd=cwd, ) try: (stdout, stderr) = await asyncio.wait_for(process.communicate(), timeout) except asyncio.TimeoutError: process.kill() await process.wait() raise if process.returncode != 0: cmd_str = " ".join(cmd) raise CalledProcessError( process.returncode or -1, cmd_str, output=stdout, stderr=stderr ) return (stdout, stderr) async def _progress_reporter( progress_interval: float, queue: asyncio.Queue, total_tests: int ) -> None: while queue.qsize() > 0: done_count = total_tests - queue.qsize() done_pct = int((done_count / total_tests) * 100) LOG.info(f"{done_count} / {total_tests} test suites ran ({done_pct}%)") await asyncio.sleep(progress_interval) LOG.debug("progress_reporter finished") def _set_build_env(build_base_path: Optional[Path]) -> Dict[str, str]: build_environ = environ.copy() if not build_base_path or not build_base_path.exists(): if build_base_path: LOG.error( f"Configured local build env path {build_base_path} does not exist" ) return build_environ if build_base_path.exists(): build_env_vars = [ ( ("PATH", build_base_path / "Scripts") if WINDOWS else ("PATH", build_base_path / "sbin") ), ("C_INCLUDE_PATH", build_base_path / "include"), ("CPLUS_INCLUDE_PATH", build_base_path / "include"), ] if not WINDOWS: build_env_vars.append(("PATH", build_base_path / "bin")) for var_name, value in build_env_vars: if var_name in build_environ: build_environ[var_name] = f"{value}:{build_environ[var_name]}" else: build_environ[var_name] = str(value) else: LOG.error( f"{build_base_path} does not exist. Not add int PATH + INCLUDE Env variables" ) return build_environ def _set_pip_mirror( venv_path: Path, mirror: str = CONFIG["ptr"]["pypi_url"], timeout: int = 2 ) -> None: pip_conf_path = venv_path / "pip.conf" with pip_conf_path.open("w", encoding="utf8") as pcfp: print(PIP_CONF_TEMPLATE.format(mirror, timeout), file=pcfp) async def _test_steps_runner( # pylint: disable=R0914 test_run_start_time: int, tests_to_run: Dict[Path, Dict], setup_py_path: Path, venv_path: Path, env: Dict, stats: Dict[str, int], error_on_warnings: bool, print_cov: bool = False, ) -> Tuple[Optional[test_result], int]: bin_dir = "Scripts" if WINDOWS else "bin" exe = ".exe" if WINDOWS else "" black_exe = venv_path / bin_dir / f"black{exe}" coverage_exe = venv_path / bin_dir / f"coverage{exe}" flake8_exe = venv_path / bin_dir / f"flake8{exe}" mypy_exe = venv_path / bin_dir / f"mypy{exe}" pip_exe = venv_path / bin_dir / f"pip{exe}" pylint_exe = venv_path / bin_dir / f"pylint{exe}" pyre_exe = venv_path / bin_dir / f"pyre{exe}" usort_exe = venv_path / bin_dir / f"usort{exe}" config = tests_to_run[setup_py_path] steps = ( step( StepName.pip_install, True, _generate_install_cmd(str(pip_exe), str(setup_py_path.parent), config), f"Installing {setup_py_path} + deps", config["test_suite_timeout"], ), step( StepName.tests_run, bool("test_suite" in config and config["test_suite"]), _generate_test_suite_cmd(coverage_exe, config), f"Running {config.get('test_suite', '')} tests via coverage", config["test_suite_timeout"], ), step( StepName.analyze_coverage, bool( print_cov or ( "required_coverage" in config and config["required_coverage"] and len(config["required_coverage"]) > 0 ) ), (str(coverage_exe), "report", "-m"), f"Analyzing coverage report for {setup_py_path}", config["test_suite_timeout"], ), step( StepName.mypy_run, bool("run_mypy" in config and config["run_mypy"]), _generate_mypy_cmd(setup_py_path.parent, mypy_exe, config), f"Running mypy for {setup_py_path}", config["test_suite_timeout"], ), step( StepName.usort_run, bool("run_usort" in config and config["run_usort"]), _generate_usort_cmd(setup_py_path.parent, usort_exe, config), f"Running usort for {setup_py_path}", config["test_suite_timeout"], ), step( StepName.black_run, bool("run_black" in config and config["run_black"]), _generate_black_cmd(setup_py_path.parent, black_exe), f"Running black for {setup_py_path}", config["test_suite_timeout"], ), step( StepName.flake8_run, bool("run_flake8" in config and config["run_flake8"]), _generate_flake8_cmd(setup_py_path.parent, flake8_exe, config), f"Running flake8 for {setup_py_path}", config["test_suite_timeout"], ), step( StepName.pylint_run, bool("run_pylint" in config and config["run_pylint"]), _generate_pylint_cmd(setup_py_path.parent, pylint_exe, config), f"Running pylint for {setup_py_path}", config["test_suite_timeout"], ), step( StepName.pyre_run, bool("run_pyre" in config and config["run_pyre"] and not WINDOWS), _generate_pyre_cmd(setup_py_path.parent, pyre_exe, config), f"Running pyre for {setup_py_path}", config["test_suite_timeout"], ), ) steps_ran = 0 for a_step in steps: a_test_result = None # Skip test if disabled if not a_step.run_condition: LOG.info(f"Not running {a_step.log_message} step") continue LOG.info(a_step.log_message) stdout = b"" steps_ran += 1 try: if a_step.cmds: LOG.debug(f"CMD: {' '.join(a_step.cmds)}") # If we're running tests and we want warnings to be errors step_env = env if a_step.step_name in [StepName.tests_run, StepName.pyre_run]: step_env = env.copy() step_env["PYTHONPATH"] = getcwd() if a_step.step_name == StepName.tests_run and error_on_warnings: step_env["PYTHONWARNINGS"] = "error" LOG.debug("Setting PYTHONWARNINGS to error") stdout, _stderr = await _gen_check_output( a_step.cmds, a_step.timeout, env=step_env, cwd=setup_py_path.parent ) else: LOG.debug(f"Skipping running a cmd for {a_step} step") except CalledProcessError as cpe: err_output = cpe.stdout.decode("utf8") LOG.debug(f"{a_step.log_message} FAILED for {setup_py_path}") a_test_result = test_result( setup_py_path, a_step.step_name.value, err_output, int(time() - test_run_start_time), False, ) except asyncio.TimeoutError as toe: LOG.debug(f"{setup_py_path} timed out running {a_step.log_message} ({toe})") a_test_result = test_result( setup_py_path, a_step.step_name.value, f"Timeout during {a_step.log_message}", a_step.timeout, True, ) if a_step.step_name is StepName.analyze_coverage: cov_report = stdout.decode("utf8") if stdout else "" if print_cov: print(f"{setup_py_path}:\n{cov_report}") if "required_coverage" not in config: # Add fake 0% TOTAL coverage required so step passes config["required_coverage"] = {"TOTAL": 0} if a_step.run_condition: a_test_result = _analyze_coverage( venv_path, setup_py_path, config["required_coverage"], cov_report, stats, test_run_start_time, ) # If we've had a failure return if a_test_result: return a_test_result, steps_ran return None, steps_ran async def _test_runner( queue: asyncio.Queue, tests_to_run: Dict[Path, Dict], test_results: List[test_result], venv_path: Path, print_cov: bool, stats: Dict[str, int], error_on_warnings: bool, idx: int, ) -> None: extra_build_env_path = ( Path(CONFIG["ptr"]["extra_build_env_prefix"]) if "extra_build_env_prefix" in CONFIG["ptr"] else None ) env = _set_build_env(extra_build_env_path) while True: try: setup_py_path = queue.get_nowait() except asyncio.QueueEmpty: LOG.debug(f"test_runner {idx} exiting") return test_run_start_time = int(time()) test_fail_result, steps_ran = await _test_steps_runner( test_run_start_time, tests_to_run, setup_py_path, venv_path, env, stats, error_on_warnings, print_cov, ) total_success_runtime = int(time() - test_run_start_time) if test_fail_result: test_results.append(test_fail_result) else: success_output = f"{setup_py_path} has passed all configured tests" LOG.info(success_output) test_results.append( test_result( setup_py_path, 0, success_output, total_success_runtime, False ) ) stats_name = setup_py_path.parent.name stats[f"suite.{stats_name}_runtime"] = total_success_runtime stats[f"suite.{stats_name}_completed_steps"] = steps_ran queue.task_done() async def create_venv( mirror: str, py_exe: str = sys.executable, install_pkgs: bool = True, timeout: float = VENV_TIMEOUT, system_site_packages: bool = False, ) -> Optional[Path]: start_time = time() venv_path = Path(gettempdir()) / f"ptr_venv_{getpid()}" if WINDOWS: pip_exe = venv_path / "Scripts" / "pip.exe" else: pip_exe = venv_path / "bin" / "pip" install_cmd: List[str] = [] try: cmd = [py_exe, "-m", "venv", str(venv_path)] if system_site_packages: cmd.append("--system-site-packages") await _gen_check_output(cmd, timeout=timeout) _set_pip_mirror(venv_path, mirror) if install_pkgs: install_cmd = [str(pip_exe), "install"] install_cmd.extend(CONFIG["ptr"]["venv_pkgs"].split()) await _gen_check_output(install_cmd, timeout=timeout) except CalledProcessError as cpe: LOG.exception(f"Failed to setup venv @ {venv_path} - '{install_cmd}'' ({cpe})") if cpe.stderr: LOG.debug(f"venv stderr:\n{cpe.stderr.decode('utf8')}") if cpe.output: LOG.debug(f"venv stdout:\n{cpe.output.decode('utf8')}") return None runtime = int(time() - start_time) LOG.info(f"Successfully created venv @ {venv_path} to run tests ({runtime}s)") return venv_path def find_py_files(py_files: Set[str], base_dir: Path) -> None: dirs = [d for d in base_dir.iterdir() if d.is_dir() and not d.name.startswith(".")] py_files.update( {str(x) for x in base_dir.iterdir() if x.is_file() and x.suffix == ".py"} ) for directory in dirs: find_py_files(py_files, directory) def _recursive_find_files( files: Set[Path], base_dir: Path, exclude_patterns: Set[str], follow_symlinks: bool ) -> None: dirs = [d for d in base_dir.iterdir() if d.is_dir()] files.update( {x for x in base_dir.iterdir() if x.is_file() and x.name == "setup.py"} ) for directory in dirs: if not follow_symlinks and directory.is_symlink(): continue skip_dir = False for exclude_pattern in exclude_patterns: if directory.match(exclude_pattern): skip_dir = True LOG.debug( f"Skipping {directory} due to exclude pattern {exclude_pattern}" ) if not skip_dir: _recursive_find_files(files, directory, exclude_patterns, follow_symlinks) def find_setup_pys( base_path: Path, exclude_patterns: Set[str], follow_symlinks: bool = False ) -> Set[Path]: setup_pys = set() # type: Set[Path] _recursive_find_files(setup_pys, base_path, exclude_patterns, follow_symlinks) return setup_pys def parse_setup_cfg(setup_py: Path) -> Dict[str, Any]: req_cov_key_strip = "required_coverage_" ptr_params = {} # type: Dict[str, Any] setup_cfg = setup_py.parent / "setup.cfg" if not setup_cfg.exists(): return ptr_params cp = ConfigParser() # Have configparser maintain key case - T484 error = callable cp.optionxform = str # type: ignore cp.read(setup_cfg) if "ptr" not in cp: LOG.info("{} does not have a ptr section") return ptr_params # Create a setup.py like ptr_params to return ptr_params["required_coverage"] = {} for key, value in cp["ptr"].items(): if key.startswith(req_cov_key_strip): key = key.strip(req_cov_key_strip) ptr_params["required_coverage"][key] = int(value) elif key.startswith("run_") or key == "disabled": ptr_params[key] = cp.getboolean("ptr", key) elif key == "test_suite_timeout": ptr_params[key] = cp.getint("ptr", key) else: ptr_params[key] = value return ptr_params def print_non_configured_modules(modules: List[Path]) -> None: print(f"== {len(modules)} non ptr configured modules ==") for module in sorted(modules): print(f" - {str(module)}") def print_test_results( test_results: Sequence[test_result], stats: Optional[Dict[str, int]] = None ) -> Dict[str, int]: if not stats: stats = defaultdict(int) stats["total.test_suites"] = len(test_results) fail_output = "" for result in sorted(test_results): if result.returncode: if result.timeout: stats["total.timeouts"] += 1 else: stats["total.fails"] += 1 fail_output += ( f"{result.setup_py_path} (failed '{StepName(result.returncode).name}' " + f"step):\n{result.output}\n" ) else: stats["total.passes"] += 1 total_time = -1 if "runtime.all_tests" not in stats else stats["runtime.all_tests"] print(f"-- Summary (total time {total_time}s):\n") # TODO: Hardcode some workaround to ensure Windows always prints UTF8 # https://github.com/facebookincubator/ptr/issues/34 print(f"✅ PASS: {stats['total.passes']}") print(f"❌ FAIL: {stats['total.fails']}") print(f"⌛ TIMEOUT: {stats['total.timeouts']}") print(f"🔒 DISABLED: {stats['total.disabled']}") print(f"💩 TOTAL: {stats['total.test_suites']}\n") if "total.setup_pys" in stats: stats["pct.setup_py_ptr_enabled"] = int( (stats["total.test_suites"] / stats["total.setup_pys"]) * 100 ) print( f"-- {stats['total.test_suites']} / {stats['total.setup_pys']} " + f"({stats['pct.setup_py_ptr_enabled']}%) `setup.py`'s have " + "`ptr` tests running\n" ) if fail_output: print("-- Failure Output --\n") print(fail_output) return stats async def run_tests( atonce: int, mirror: str, tests_to_run: Dict[Path, Dict], progress_interval: Union[float, int], venv_path: Optional[Path], venv_keep: bool, print_cov: bool, stats: Dict[str, int], stats_file: str, venv_timeout: float, error_on_warnings: bool, system_site_packages: bool, ) -> int: tests_start_time = time() if not venv_path or not venv_path.exists(): venv_create_start_time = time() venv_path = await create_venv( mirror=mirror, timeout=venv_timeout, system_site_packages=system_site_packages, ) stats["venv_create_time"] = int(time() - venv_create_start_time) else: venv_keep = True if not venv_path or not venv_path.exists(): LOG.error("Unable to make a venv to run tests in. Exiting") return 3 # Be at the base of the venv to ensure we have a known neutral cwd chdir(str(venv_path)) queue: asyncio.Queue = asyncio.Queue() for test_setup_py in sorted(tests_to_run.keys()): await queue.put(test_setup_py) test_results = [] # type: List[test_result] consumers = [ _test_runner( queue, tests_to_run, test_results, venv_path, print_cov, stats, error_on_warnings, i + 1, ) for i in range(atonce) ] if progress_interval: LOG.debug(f"Adding progress reporter to report every {progress_interval}s") consumers.append( _progress_reporter(progress_interval, queue, len(tests_to_run)) ) LOG.debug("Starting to run tests") await asyncio.gather(*consumers) stats["runtime.all_tests"] = int(time() - tests_start_time) stats = print_test_results(test_results, stats) _write_stats_file(stats_file, stats) if not venv_keep: chdir(gettempdir()) rmtree(str(venv_path)) else: LOG.info(f"Not removing venv @ {venv_path} due to CLI arguments") return stats["total.fails"] + stats["total.timeouts"] async def async_main( atonce: int, base_path: Path, mirror: str, progress_interval: float, venv: str, venv_keep: bool, print_cov: bool, print_non_configured: bool, run_disabled: bool, stats_file: str, venv_timeout: float, error_on_warnings: bool, system_site_packages: bool, ) -> int: stats = defaultdict(int) # type: Dict[str, int] tests_to_run = _get_test_modules( base_path, stats, run_disabled, print_non_configured ) if not tests_to_run: LOG.error( f"{str(base_path)} has no setup.py files with unit tests defined. Exiting" ) return 1 if print_non_configured: return 0 try: venv_path = Path(venv) # type: Optional[Path] if venv_path and not venv_path.exists(): LOG.error(f"{venv_path} venv does not exist. Please correct!") return 2 except TypeError: venv_path = None return await run_tests( atonce, mirror, tests_to_run, progress_interval, venv_path, venv_keep, print_cov, stats, stats_file, venv_timeout, error_on_warnings, system_site_packages, ) def main() -> None: default_stats_file = Path(gettempdir()) / f"ptr_stats_{getpid()}" parser = argparse.ArgumentParser() parser.add_argument( "-a", "--atonce", default=int(CONFIG["ptr"]["atonce"]), type=int, help=f"How many tests to run at once [Default: {int(CONFIG['ptr']['atonce'])}]", ) parser.add_argument( "-b", "--base-dir", default=CWD, help=f"Path to recursively look for setup.py files [Default: {CWD}]", ) parser.add_argument( "-d", "--debug", action="store_true", help="Verbose debug output" ) parser.add_argument( "-e", "--error-on-warnings", action="store_true", help="Have Python warnings raise DeprecationWarning on tests run", ) parser.add_argument( "-k", "--keep-venv", action="store_true", help="Do not remove created venv" ) parser.add_argument( "-m", "--mirror", default=CONFIG["ptr"]["pypi_url"], help=( f"URL for pip to use for Simple API [Default: {CONFIG['ptr']['pypi_url']}]" ), ) parser.add_argument( "--print-cov", action="store_true", help="Print modules coverage report" ) parser.add_argument( "--print-non-configured", action="store_true", help="Print modules not configured to run ptr", ) parser.add_argument( "--progress-interval", default=0, type=float, help="Seconds between status update on test running [Default: Disabled]", ) parser.add_argument( "--run-disabled", action="store_true", help="Force any disabled tests suites to run", ) parser.add_argument( "--stats-file", default=str(default_stats_file), help=f"JSON statistics file [Default: {default_stats_file}]", ) parser.add_argument( "--system-site-packages", action="store_true", help="Give the virtual environment access to the system site-packages dir", ) parser.add_argument("--venv", help="Path to venv to reuse") parser.add_argument( "--venv-timeout", type=int, default=VENV_TIMEOUT, help=( "Timeout in seconds for venv creation + deps install [Default:" f" {VENV_TIMEOUT}]" ), ) args = parser.parse_args() _handle_debug(args.debug) LOG.info(f"Starting {sys.argv[0]}") main_coro = async_main( args.atonce, _validate_base_dir(args.base_dir), args.mirror, args.progress_interval, args.venv, args.keep_venv, args.print_cov, args.print_non_configured, args.run_disabled, args.stats_file, args.venv_timeout, args.error_on_warnings, args.system_site_packages, ) # This is gated to >= 3.8 for unittests # Once we're >= 3.7 tests could be refactored so we can use # asyncio.run in 3.7 if getattr(asyncio, "run", None) and PY_38_OR_GREATER: sys.exit(asyncio.run(main_coro)) else: loop = asyncio.get_event_loop() try: sys.exit(loop.run_until_complete(main_coro)) finally: loop.close() if __name__ == "__main__": main() # pragma: no cover