modules/testrail_integration.py (455 lines of code) (raw):

import logging import os import re import subprocess import sys from modules import taskcluster as tc from modules import testrail as tr from modules.testrail import TestRail FX_PRERC_VERSION_RE = re.compile(r"(\d+)\.(\d\d?)[ab](\d\d?)-build(\d+)") FX_RC_VERSION_RE = re.compile(r"(\d+)\.(\d\d?)(.*)") FX_DEVED_VERSION_RE = re.compile(r"(\d+)\.(\d\d?)b(\d\d?)") FX_RELEASE_VERSION_RE = re.compile(r"(\d+)\.(\d\d?)\.(\d\d?)(.*)") TESTRAIL_RUN_FMT = ( "[{channel} {major}] Automated testing {major}.{minor}b{beta}-build{build}" ) PLAN_NAME_RE = re.compile(r"\[(\w+) (\d+)\]") CONFIG_GROUP_ID = 95 TESTRAIL_FX_DESK_PRJ = 17 def get_plan_title(version_str: str, channel: str) -> str: """Given a version string, get the plan_title""" version_match = FX_PRERC_VERSION_RE.match(version_str) if channel == "Devedition": logging.info(f"DevEdition: {version_str}") version_match = FX_DEVED_VERSION_RE.match(version_str) (major, minor, beta) = [version_match[n] for n in range(1, 4)] plan_title = ( TESTRAIL_RUN_FMT.replace("{channel}", channel) .replace("{major}", major) .replace("{minor}", minor) .replace("{beta}", beta) .split("-")[0] ) elif version_match: logging.info(version_match) (major, minor, beta, build) = [version_match[n] for n in range(1, 5)] logging.info(f"major {major} minor {minor} beta {beta} build {build}") plan_title = ( TESTRAIL_RUN_FMT.replace("{channel}", channel) .replace("{major}", major) .replace("{minor}", minor) .replace("{beta}", beta) .replace("{build}", build) ) else: # Version doesn't look like a normal beta, someone updated to the RC version_match = FX_RC_VERSION_RE.match(version_str) (major, minor) = [version_match[n] for n in range(1, 3)] plan_title = ( TESTRAIL_RUN_FMT.replace("{channel}", channel) .replace("{major}", major) .replace("{minor}", minor) .replace("{beta}", "rc") ) return plan_title def tc_reportable(): """For CI: return True if run is reportable, but get TC creds first""" creds = tc.get_tc_secret() if creds: os.environ["TESTRAIL_USERNAME"] = creds.get("TESTRAIL_USERNAME") os.environ["TESTRAIL_API_KEY"] = creds.get("TESTRAIL_API_KEY") os.environ["TESTRAIL_BASE_URL"] = creds.get("TESTRAIL_BASE_URL") else: sys.exit(100) if reportable(): sys.exit(0) else: sys.exit(100) def reportable(platform_to_test=None): """Return true if we should report to TestRail""" import platform if not os.environ.get("TESTRAIL_REPORT"): logging.warning("TESTRAIL_REPORT not set, session not reportable.") return False # If we ask for reporting, we can force a report if os.environ.get("REPORTABLE"): logging.warning("REPORTABLE=true; we will report this session.") return True # Find the correct test plan sys_platform = platform_to_test or platform.system() if platform_to_test: os.environ["FX_PLATFORM"] = platform_to_test version = ( subprocess.check_output([sys.executable, "./collect_executables.py", "-n"]) .strip() .decode() ) logging.warning(f"Got version from collect_executable.py! {version}") tr_session = testrail_init() major_number, second_half = version.split(".") if "-" in second_half: minor_num, _ = second_half.split("-") else: minor_num = second_half channel = os.environ.get("FX_CHANNEL") or "beta" channel = channel.title() if not channel: if "b" in minor_num: channel = "Beta" else: channel = "Release" major_version = f"Firefox {major_number}" major_milestone = tr_session.matching_milestone(TESTRAIL_FX_DESK_PRJ, major_version) if not major_milestone: logging.warning( f"Not reporting: Could not find matching milestone: Firefox {major_version}" ) return False channel_milestone = tr_session.matching_submilestone( major_milestone, f"{channel} {major_number}" ) if not channel_milestone: if channel == "Devedition": channel_milestone = tr_session.matching_submilestone( major_milestone, f"Beta {major_number}" ) if not channel_milestone: logging.warning( f"Not reporting: Could not find matching submilestone for {channel} {major_number}" ) return False plan_title = get_plan_title(version, channel) logging.warning(f"Plan title: {plan_title}") this_plan = tr_session.matching_plan_in_milestone( TESTRAIL_FX_DESK_PRJ, channel_milestone.get("id"), plan_title ) if not this_plan: logging.warning( f"Session reportable: could not find {plan_title} (milestone: {channel_milestone.get('id')})" ) return True if platform_to_test: sys_platform = platform_to_test platform = "MacOS" if sys_platform == "Darwin" else sys_platform plan_entries = this_plan.get("entries") covered_suites = 0 for entry in plan_entries: for run_ in entry.get("runs"): if run_.get("config") and platform in run_.get("config"): covered_suites += 1 num_suites = 0 for test_dir_name in os.listdir("tests"): test_dir = os.path.join("tests", test_dir_name) if os.path.isdir(test_dir) and not os.path.exists( os.path.join(test_dir, "skip_reporting") ): num_suites += 1 logging.warning( f"Potentially matching run found for {platform}, may be reportable. ({covered_suites} out of {num_suites} suites already reported.)" ) return covered_suites < num_suites def testrail_init() -> TestRail: """Connect to a TestRail API session""" local = os.environ.get("TESTRAIL_BASE_URL").split("/")[2].startswith("127") tr_session = tr.TestRail( os.environ.get("TESTRAIL_BASE_URL"), os.environ.get("TESTRAIL_USERNAME"), os.environ.get("TESTRAIL_API_KEY"), local, ) return tr_session def merge_results(*result_sets) -> dict: """Merge dictionaries of test results""" output = {} for results in result_sets: for key in results: if not output.get(key): output[key] = results[key] continue if key in ["passed", "skipped", "xfailed", "failed"]: for run_id in results.get(key): if not output.get(key).get(run_id): output[key][run_id] = results[key][run_id] continue output[key][run_id] += results[key][run_id] return output def mark_results(testrail_session: TestRail, test_results): """For each type of result, and per run, mark tests to status in batches""" logging.info(f"mark results: object\n{test_results}") existing_results = {} for category in ["passed", "skipped", "xfailed", "failed"]: for run_id in test_results[category]: if not existing_results.get(run_id): existing_results[run_id] = testrail_session.get_test_results(run_id) current_results = { result.get("case_id"): result.get("status_id") for result in existing_results[run_id] } suite_id = test_results[category][run_id][0].get("suite_id") all_test_cases = [ result.get("test_case") for result in test_results[category][run_id] ] # Don't set passed tests to another status. test_cases = [tc for tc in all_test_cases if current_results.get(tc) != 1] logging.warn( f"Setting the following test cases in run {run_id} to {category}: {test_cases}" ) testrail_session.update_test_cases( test_results.get("project_id"), testrail_run_id=run_id, testrail_suite_id=suite_id, test_case_ids=test_cases, status=category, ) def organize_entries(testrail_session: TestRail, expected_plan: dict, suite_info: dict): """ When we get to the level of entries on a TestRail plan, we need to make sure: * the entry exists or is created * a run matching the current config / platform exists or is created * the test cases we care about are on that run or are added * test results are batched by run and result type (passed, skipped, failed) """ # Suite and milestone info suite_id = suite_info.get("id") suite_description = suite_info.get("description") milestone_id = suite_info.get("milestone_id") # Config config = suite_info.get("config") config_id = suite_info.get("config_id") # Cases and results cases_in_suite = suite_info.get("cases") cases_in_suite = [int(n) for n in cases_in_suite] results = suite_info.get("results") plan_title = expected_plan.get("name") suite_entries = [ entry for entry in expected_plan.get("entries") if entry.get("suite_id") == suite_id ] # Add a missing entry to a plan plan_id = expected_plan.get("id") if not suite_entries: # If no entry, create entry for suite logging.info(f"Create entry in plan {plan_id} for suite {suite_id}") logging.info(f"cases: {cases_in_suite}") entry = testrail_session.create_new_plan_entry( plan_id=plan_id, suite_id=suite_id, name=suite_description, description="Automation-generated test plan entry", case_ids=cases_in_suite, ) expected_plan = testrail_session.matching_plan_in_milestone( TESTRAIL_FX_DESK_PRJ, milestone_id, plan_title ) suite_entries = [ entry for entry in expected_plan.get("entries") if entry.get("suite_id") == suite_id ] if len(suite_entries) != 1: logging.info("Suite entries are broken somehow") # There should only be one entry per suite per plan # Check that this entry has a run with the correct config # And if not, make that run entry = suite_entries[0] config_runs = [run for run in entry.get("runs") if run.get("config") == config] logging.info(f"config runs {config_runs}") if not config_runs: expected_plan = testrail_session.create_test_run_on_plan_entry( plan_id, entry.get("id"), [config_id], description=f"Auto test plan entry: {suite_description}", case_ids=cases_in_suite, ) suite_entries = [ entry for entry in expected_plan.get("entries") if entry.get("suite_id") == suite_id ] entry = suite_entries[0] logging.info(f"new entry: {entry}") config_runs = [run for run in entry.get("runs") if run.get("config") == config] run = testrail_session.get_run(config_runs[0].get("id")) # If the run is missing cases, add them run_cases = [ t.get("case_id") for t in testrail_session.get_test_results(run.get("id")) ] if run_cases: expected_case_ids = list(set(run_cases + cases_in_suite)) if len(expected_case_ids) > len(run_cases): testrail_session.update_run_in_entry( run.get("id"), case_ids=expected_case_ids, include_all=False ) run = testrail_session.get_run(config_runs[0].get("id")) run_cases = [ t.get("case_id") for t in testrail_session.get_test_results(run.get("id")) ] if run.get("is_completed"): logging.info(f"Run {run.get('id')} is already completed.") return {} run_id = run.get("id") # Gather the test results by category of result passkey = { "passed": ["passed", "xpassed", "warnings"], "failed": ["failed", "error"], "xfailed": ["xfailed"], "skipped": ["skipped", "deselected"], } test_results = { "project_id": TESTRAIL_FX_DESK_PRJ, "passed": {}, "failed": {}, "xfailed": {}, "skipped": {}, } for test_case, outcome in results.items(): logging.info(f"{test_case}: {outcome}") if outcome == "rerun": logging.info("Rerun result...skipping...") continue category = next(status for status in passkey if outcome in passkey.get(status)) if not test_results[category].get(run_id): test_results[category][run_id] = [] test_results[category][run_id].append( {"suite_id": suite_id, "test_case": test_case} ) return test_results def collect_changes(testrail_session: TestRail, report): """ Determine what structure needs to be built so that we can report TestRail results. * Construct config and plan name * Find the right milestone to report to * Find the right submilestone * Find the right plan to report to, or create it * Find the right config to attach to the run, or create it * Use organize_entries to create the rest of the structure and gather results * Use mark_results to update the test runs """ # Find milestone to attach to channel = os.environ.get("FX_CHANNEL") or "beta" channel = channel.title() if channel == "Release": raise ValueError("Release reporting currently not supported") metadata = None for test in report.get("tests"): if test.get("metadata"): metadata = test.get("metadata") break if not metadata: logging.error("No metadata collected. Exiting without report.") return False version_str = metadata.get("fx_version") plan_title = get_plan_title(version_str, channel) logging.info(plan_title) plan_match = PLAN_NAME_RE.match(plan_title) (_, major) = [plan_match[n] for n in range(1, 3)] config = metadata.get("machine_config") if "linux" in config.lower(): os_name = "Linux" for word in config.split(" "): if word.startswith("x"): arch = word release = subprocess.check_output(["lsb_release", "-d"]).decode() release = release.split("\t")[-1].strip() release = ".".join(release.split(".")[:-1]) config = f"{os_name} {release} {arch}" logging.warning(f"Reporting for config: {config}") if not config.strip(): raise ValueError("Config cannot be blank.") with open(".tmp_testrail_info", "w") as fh: fh.write(f"{plan_title}|{config}") major_milestone = testrail_session.matching_milestone( TESTRAIL_FX_DESK_PRJ, f"Firefox {major}" ) logging.info(f"{channel} {major}") channel_milestone = testrail_session.matching_submilestone( major_milestone, f"{channel} {major}" ) if (not channel_milestone) and channel == "Devedition": channel_milestone = testrail_session.matching_submilestone( major_milestone, f"Beta {major}" ) # Find plan to attach runs to, create if doesn't exist logging.info(f"Plan title: {plan_title}") milestone_id = channel_milestone.get("id") expected_plan = testrail_session.matching_plan_in_milestone( TESTRAIL_FX_DESK_PRJ, milestone_id, plan_title ) if expected_plan is None: logging.info(f"Create plan '{plan_title}' in milestone {milestone_id}") expected_plan = testrail_session.create_new_plan( TESTRAIL_FX_DESK_PRJ, plan_title, description="Automation-generated test plan", milestone_id=milestone_id, ) elif expected_plan.get("is_completed"): logging.info(f"Plan found ({expected_plan.get('id')}) but is completed.") return None # Find or add correct config for session config_matches = None tried = False while not config_matches: config_matches = testrail_session.matching_configs( TESTRAIL_FX_DESK_PRJ, CONFIG_GROUP_ID, config ) if tried: break if not config_matches: logging.info("Creating config...") testrail_session.add_config(CONFIG_GROUP_ID, config) tried = True if len(config_matches) == 1: config_id = config_matches[0].get("id") logging.info(f"config id: {config_id}") else: raise ValueError(f"Should only have one matching TR config: {config}") # Find or add suite-based runs on the plan # Store test results for later last_suite_id = None last_description = None results_by_suite = {} full_test_results = {} tests = [ test for test in report.get("tests") if "metadata" in test and "suite_id" in test.get("metadata") ] tests = sorted(tests, key=lambda item: item.get("metadata").get("suite_id")) # Iterate through the tests; when we finish a line of same-suite tests, gather them for test in tests: (suite_id_str, suite_description) = test.get("metadata").get("suite_id") try: suite_id = int(suite_id_str.replace("S", "")) except (ValueError, TypeError): logging.info("No suite number, not reporting...") continue test_case = test.get("metadata").get("test_case") try: int(test_case) except (ValueError, TypeError): logging.info("No test case number, not reporting...") continue outcome = test.get("outcome") # Tests reported as rerun are a problem -- we need to know pass/fail if outcome == "rerun": outcome = test.get("call").get("outcome") logging.info(f"TC: {test_case}: {outcome}") if not results_by_suite.get(suite_id): results_by_suite[suite_id] = {} results_by_suite[suite_id][test_case] = outcome if suite_id != last_suite_id: # When we get the last test_case in a suite, add entry, run, results if last_suite_id: logging.info("n-1 run") cases_in_suite = list(results_by_suite[last_suite_id].keys()) cases_in_suite = [int(n) for n in cases_in_suite] suite_info = { "id": last_suite_id, "description": last_description, "milestone_id": milestone_id, "config": config, "config_id": config_id, "cases": cases_in_suite, "results": results_by_suite[last_suite_id], } full_test_results = merge_results( full_test_results, organize_entries(testrail_session, expected_plan, suite_info), ) last_suite_id = suite_id last_description = suite_description # We do need to run this again because we will always have one last suite. cases_in_suite = list(results_by_suite[last_suite_id].keys()) suite_info = { "id": last_suite_id, "description": last_description, "milestone_id": milestone_id, "config": config, "config_id": config_id, "cases": cases_in_suite, "results": results_by_suite[last_suite_id], } logging.info(f"n run {last_suite_id}, {last_description}") full_test_results = merge_results( full_test_results, organize_entries(testrail_session, expected_plan, suite_info) ) return full_test_results def update_all_test_cases(testrail_session, field_to_update, field_content): """ Sets the field of every test case to the new content """ print(f"updating all test cases to have {field_content} in {field_to_update}") test_suites = [d for d in os.listdir("./tests/")] for test_suite in test_suites: for test in os.listdir(f"./tests/{test_suite}"): if test[0:4] != "test": continue # Find tests and parse test case number file_name = f"tests/{test_suite}/{test}" with open(file_name) as f: for line in f: if line.startswith("def test_case():"): break line = f.readline().strip() ind = line.find('"') + 1 test_case = line[ind:-1] if test_case in ("", "N/A"): continue # Update the test case print(f"updating {test_case}: {file_name}") testrail_session.update_case_field( test_case, field_to_update, field_content )