choose_ci_set.py (178 lines of code) (raw):
import os
import re
import sys
from subprocess import check_output
import pytest
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CI_MARK = "@pytest.mark.ci"
HEADED_MARK = "@pytest.mark.headed"
OUTPUT_FILE = "selected_tests"
class CollectionPlugin:
"""Mini plugin to get test names"""
def __init__(self):
self.tests = []
def pytest_report_collectionfinish(self, items):
self.tests = [item.nodeid for item in items]
def snakify(pascal: str) -> str:
"""Convert PascalCase to snake_case"""
chars = pascal[0].lower()
for c in pascal[1:]:
if c == c.upper():
chars = chars + f"_{c.lower()}"
else:
chars = chars + c
return chars
def pascalify(snake: str) -> str:
"""Convert snake_case to PascalCase"""
chars = snake[0].upper()
up_flag = False
for c in snake[1:]:
if up_flag:
chars = chars + c.upper()
up_flag = False
elif c == "_":
up_flag = True
else:
chars = chars + c
return chars
def localify(path: str) -> str:
"""Remove the script dir from an item"""
return path.replace(SCRIPT_DIR, ".")
def get_tests_by_model(
model_name: str, test_paths_and_contents: dict, run_list: list
) -> list:
"""
Given a model name, a dict of paths and their file contents, and the
list of existing tests/dirs to check, return matches by local paths
"""
matching_tests = []
for path, test in test_paths_and_contents.items():
localpath = localify(path)
in_run_list = False
for entry in run_list:
if entry in localpath:
in_run_list = True
if not in_run_list and model_name in test:
matching_tests.append(localpath)
return matching_tests
def dedupe(run_list: list, slash: str) -> list:
"""For a run list, remove entries that are covered by more general entries."""
run_list = list(set(run_list))
dotslashes = []
removes = []
for i, entry in enumerate(run_list):
if (
not entry.startswith(".")
and not entry.startswith("\\")
and not entry.startswith("/")
):
dotslashes.append(i)
for dotslash in dotslashes:
run_list[dotslash] = f".{slash}{run_list[dotslash]}"
for i, entry_a in enumerate(run_list):
for j, entry_b in enumerate(run_list):
if i == j:
continue
candidate = max((i, j))
if entry_a in entry_b and candidate not in removes:
removes.append(candidate)
removes.sort(reverse=True)
for remove in removes:
del run_list[remove]
return run_list
if __name__ == "__main__":
if os.path.exists(".env"):
with open(".env") as fh:
contents = fh.read()
if "TESTRAIL_REPORT='true'" in contents:
os.environ["TESTRAIL_REPORT"] = "true"
if "RUN_ALL='true'" in contents:
os.environ["MANUAL"] = "true"
if os.environ.get("TESTRAIL_REPORT") or os.environ.get("MANUAL"):
# Run all tests if this is a scheduled beta or a manual run
with open(OUTPUT_FILE, "w") as fh:
fh.write("tests")
sys.exit(0)
slash = "/" if "/" in SCRIPT_DIR else "\\"
re_obj = {
"test_re_string": r".*/.*/test_.*\.py",
"suite_conftest_re_string": r".*/.*/conftest\.py",
"selectors_json_re_string": r"modules/data/.*\.components\.json",
"object_model_re_string": r"modules/.*.object.*\.py",
"class_re_string": r"\s*class (\w+)[(A-Za-z0-9_)]*:",
}
for k in list(re_obj.keys()):
if slash == "\\":
re_obj[k] = re_obj.get(k).replace("/", r"\\")
short_name = "_".join(k.split("_")[:-1])
re_obj[short_name] = re.compile(re_obj.get(k))
run_list = []
check_output(["git", "fetch", "--quiet", "--depth=1", "origin", "main"])
committed_files = (
check_output(["git", "--no-pager", "diff", "--name-only", "origin/main"])
.decode()
.replace("/", slash)
.splitlines()
)
main_conftest = "conftest.py"
base_page = os.path.join("modules", "page_base.py")
if main_conftest in committed_files or base_page in committed_files:
# Run all the tests (no files as arguments) if main conftest or basepage changed
with open(OUTPUT_FILE, "w") as fh:
fh.write("tests")
sys.exit(0)
all_tests = []
test_paths_and_contents = {}
for root, _, files in os.walk(os.path.join(SCRIPT_DIR, "tests")):
for f in files:
this_file = os.path.join(root, f)
if re_obj.get("test_re").search(this_file) and "__pycache" not in this_file:
all_tests.append(os.path.join(this_file))
with open(this_file, encoding="utf-8") as fh:
lines = fh.readlines()
test_paths_and_contents[this_file] = "".join(lines)
p = CollectionPlugin()
pytest.main(["--collect-only", "-m", "ci", "-s"], plugins=[p])
ci_paths = [f".{slash}{test}" for test in p.tests]
# Dedupe just in case
ci_paths = list(set(ci_paths))
changed_suite_conftests = [
f for f in committed_files if re_obj.get("suite_conftest_re").match(f)
]
changed_selectors = [
f for f in committed_files if re_obj.get("selectors_json_re").match(f)
]
changed_models = [
f for f in committed_files if re_obj.get("object_model_re").match(f)
]
changed_tests = [f for f in committed_files if re_obj.get("test_re").match(f)]
if changed_suite_conftests:
run_list = [
"." + slash + os.path.join(*suite.split(slash)[-3:-1])
for suite in changed_suite_conftests
]
if changed_selectors:
for selector_file in changed_selectors:
(_, filename) = os.path.split(selector_file)
model_name = pascalify(filename.split(".")[0])
for test_name in get_tests_by_model(
model_name, test_paths_and_contents, run_list
):
run_list.append(test_name)
if changed_models:
for model_file in changed_models:
model_file_contents = "".join([line for line in open(model_file)])
classes = re_obj.get("class_re").findall(model_file_contents)
for model_name in classes:
for test_name in get_tests_by_model(
model_name, test_paths_and_contents, run_list
):
run_list.append(test_name)
if changed_tests:
for changed_test in changed_tests:
found = False
for file in run_list:
# Don't add if already exists in suite changes
pieces = file.split(slash)
if len(pieces) == 3 and pieces[-1] in changed_test:
found = True
# Don't add if already in list
if file in changed_test:
found = True
if not found:
run_list.append(changed_test)
if not run_list:
with open(OUTPUT_FILE, "w") as fh:
fh.write("\n".join(ci_paths))
else:
run_list.extend(ci_paths)
# Dedupe just in case
run_list = dedupe(run_list, slash)
run_list = [entry for entry in run_list if os.path.exists(entry.split("::")[0])]
with open(OUTPUT_FILE, "w") as fh:
fh.write("\n".join(run_list))