packages/python-packages/apiview-copilot/evals/run.py (397 lines of code) (raw):
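# Illustrative invocations -- a sketch based on the argparse options defined under
# `if __name__ == "__main__"` below (flags, defaults, and behavior live there):
#
#   python run.py                       # all *.jsonl tests for python, 3 runs each, median kept
#   python run.py -l java -n 1          # a single run against the java tests
#   python run.py -t some_test.jsonl    # limit the run to one (hypothetical) test file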

import os
import json
import pathlib
import argparse
from typing import Set, Tuple, Any

import prompty
import prompty.azure_beta
import copy
import sys
import yaml

# set before azure.ai.evaluation import to make PF output less noisy
os.environ["PF_LOGGING_LEVEL"] = "CRITICAL"

import dotenv
from tabulate import tabulate
from azure.ai.evaluation import evaluate, SimilarityEvaluator, GroundednessEvaluator

dotenv.load_dotenv()

NUM_RUNS: int = 3

# for best results, this should always be a different model from the one we are evaluating
MODEL_JUDGE = "gpt-4.1"

model_config: dict[str, str] = {
    "azure_endpoint": os.environ["AZURE_OPENAI_ENDPOINT"],
    "api_key": os.environ["AZURE_OPENAI_API_KEY"],
    "azure_deployment": MODEL_JUDGE,
    "api_version": "2025-03-01-preview",
}


class CustomAPIViewEvaluator:
    """Evaluator for comparing expected and actual APIView comments."""

    def __init__(self):
        ...

    def _get_comment_matches(self, expected: dict[str, Any], actual: dict[str, Any]) -> Tuple[Set, Set, Set]:
        """Compare comments based on both line numbers and rule IDs."""
        exact_matches = set()
        rule_matches_wrong_line = set()

        # Filter out summary comments
        filtered_actual_comments = [c for c in actual["comments"] if c.get("source") != "summary"]
        filtered_expected_comments = [c for c in expected["comments"] if c.get("source") != "summary"]

        # Create a copy to work with
        comments_left = copy.deepcopy(filtered_actual_comments)

        for expected_comment in filtered_expected_comments:
            e_line = expected_comment["line_no"]
            e_rules = frozenset(expected_comment["rule_ids"])
            for actual_comment in comments_left:
                a_line = actual_comment["line_no"]
                a_rules = frozenset(actual_comment["rule_ids"])
                rule_match = any(rule for rule in a_rules if rule in e_rules)
                if e_line == a_line and rule_match:
                    exact_matches.add((e_line, tuple(sorted(e_rules))))
                    # Remove the matched actual comment to avoid double counting
                    comments_left.remove(actual_comment)
                    break
                if rule_match:
                    if abs(e_line - a_line) <= 5:
                        # If the line numbers are close, consider it a match
                        rule_matches_wrong_line.add((tuple(sorted(e_rules)), e_line, a_line))
                        comments_left.remove(actual_comment)
                        break

        return exact_matches, rule_matches_wrong_line, comments_left

    def _evaluate_generic_comments(self, query: str, language: str, generic_comments: list[dict[str, Any]]) -> None:
        """Evaluate generic comments. If invalid, they count as false positives and receive a penalty."""
        filter_path = pathlib.Path(__file__).parent.parent / "metadata" / language / "filter.yaml"
        with open(filter_path, "r") as f:
            filter_data = yaml.safe_load(f)
        exceptions = filter_data["exceptions"].strip().split("\n")
        exceptions = [e.split(". ", 1)[1] for e in exceptions]

        for comment in generic_comments:
            line_no = comment["line_no"]
            start_idx = max(0, line_no - 10)
            end_idx = min(len(query), line_no + 10)
            context = query[start_idx:end_idx]
            prompt_path = os.path.abspath(
                os.path.join(os.path.dirname(__file__), "..", "prompts", "eval_judge_prompt.prompty")
            )
            response = prompty.execute(
                prompt_path,
                inputs={
                    "code": context,
                    "comment": comment["comment"],
                    "exceptions": exceptions,
                    "language": language,
                },
            )
            comment["valid"] = "true" in response.lower()

    def __call__(self, *, response: str, query: str, language: str, output: str, **kwargs):
        expected = json.loads(response)
        actual = json.loads(output)

        # Filter out summary comments
        expected["comments"] = [c for c in expected["comments"] if c.get("source") != "summary"]
        actual["comments"] = [c for c in actual["comments"] if c.get("source") != "summary"]

        exact_matches, rule_matches_wrong_line, generic_comments = self._get_comment_matches(expected, actual)
        self._evaluate_generic_comments(query, language, generic_comments)
        expected_comments = len([c for c in expected["comments"] if c["rule_ids"]])
        valid_generic_comments = len([c for c in generic_comments if c["valid"]])

        review_eval = {
            "expected_comments": expected_comments,
            "comments_found": len(actual["comments"]),
            "true_positives": len(exact_matches),
            "valid_generic_comments": valid_generic_comments,
            "false_positives": len(actual["comments"])
            - (len(exact_matches) + len(rule_matches_wrong_line))
            - valid_generic_comments,
            "false_negatives": expected_comments - (len(exact_matches) + len(rule_matches_wrong_line)),
            "percent_coverage": ((len(exact_matches) / expected_comments * 100) if expected_comments else 0),
            "rule_matches_wrong_line": len(rule_matches_wrong_line),
            "wrong_line_details": list(rule_matches_wrong_line),
        }
        return review_eval


class CustomSimilarityEvaluator:
    """Wraps the SimilarityEvaluator to make sure we only check similarity for comments with rule IDs."""

    def __init__(self):
        self._similarity_eval = SimilarityEvaluator(model_config=model_config)

    def __call__(self, *, query: str, language: str, output: str, ground_truth: str, **kwargs):
        output = json.loads(output)
        ground_truth = json.loads(ground_truth)

        # Filter out summary comments
        output["comments"] = [c for c in output["comments"] if c.get("source") != "summary"]
        ground_truth["comments"] = [c for c in ground_truth["comments"] if c.get("source") != "summary"]

        actual = [c for c in output["comments"] if c["rule_ids"]]
        if not actual:
            return {"similarity": 0.0}
        similarity = self._similarity_eval(
            response=json.dumps(actual),
            query=query,
            ground_truth=json.dumps([c for c in ground_truth["comments"] if c["rule_ids"]]),
        )
        return similarity


class CustomGroundednessEvaluator:
    """Wraps the GroundednessEvaluator to make sure we only check groundedness for comments with rule IDs."""

    def __init__(self):
        self._groundedness_eval = GroundednessEvaluator(model_config=model_config)

    def __call__(self, *, query: str, language: str, output: str, context: str, **kwargs):
        output = json.loads(output)

        # Filter out summary comments
        output["comments"] = [c for c in output["comments"] if c.get("source") != "summary"]

        actual = [c for c in output["comments"] if c["rule_ids"]]
        if not actual:
            return {"groundedness": 0.0, "groundedness_reason": "No comments found."}
        groundedness = self._groundedness_eval(response=json.dumps(actual), context=context)
        return groundedness


def review_apiview(query: str, language: str):
    # Add project root to sys.path
    sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
    from src._apiview_reviewer import (  # pylint: disable=import-error,no-name-in-module
        ApiViewReview,
    )

    reviewer = ApiViewReview(target=query, base=None, language=language)
    review = reviewer.run()
    reviewer.close()
    return {"response": review.model_dump_json()}


def calculate_overall_score(row: dict[str, Any]) -> float:
    """Calculate weighted score based on various metrics."""
    weights = {
        "exact_match_weight": 0.7,  # Exact match (rule id and line number)
        "groundedness_weight": 0.2,  # Staying grounded in guidelines
        "similarity_weight": 0.1,  # Similarity between expected and actual
        "false_positive_penalty": 0.3,  # Penalty for false positives
        "fuzzy_match_bonus": 0.2,  # Bonus for fuzzy match (right rule, wrong line)
    }

    if row["outputs.custom_eval.expected_comments"] == 0:
        # Tests with no violations are all or nothing, but still give credit
        # when the only comments found are valid generic comments.
        if (
            row["outputs.custom_eval.comments_found"] == 0
            or row["outputs.custom_eval.comments_found"] == row["outputs.custom_eval.valid_generic_comments"]
        ):
            return 100.0
        return 0.0

    exact_match_score = row["outputs.custom_eval.true_positives"] / row["outputs.custom_eval.expected_comments"]
    remaining_comments = row["outputs.custom_eval.expected_comments"] - row["outputs.custom_eval.true_positives"]
    fuzzy_match_score = (
        row["outputs.custom_eval.rule_matches_wrong_line"] / remaining_comments if remaining_comments > 0 else 0.0
    )
    false_positive_rate = (
        row["outputs.custom_eval.false_positives"] / row["outputs.custom_eval.comments_found"]
        if row["outputs.custom_eval.comments_found"] > 0
        else 0.0
    )

    groundedness_normalized = (row["outputs.groundedness.groundedness"] - 1) / 4
    similarity_normalized = (row["outputs.similarity.similarity"] - 1) / 4

    score = (
        weights["exact_match_weight"] * exact_match_score
        + weights["groundedness_weight"] * groundedness_normalized
        + weights["similarity_weight"] * similarity_normalized
        + weights["fuzzy_match_bonus"] * fuzzy_match_score
        - weights["false_positive_penalty"] * false_positive_rate
    )

    normalized_score = max(0, min(100, score * 100))
    return round(normalized_score)
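

# Worked example for calculate_overall_score (hypothetical numbers, not from a real run):
#   expected_comments=4, true_positives=2, rule_matches_wrong_line=1,
#   comments_found=5, false_positives=1, groundedness=5.0, similarity=5.0
#   exact_match_score   = 2 / 4       = 0.5
#   fuzzy_match_score   = 1 / (4 - 2) = 0.5
#   false_positive_rate = 1 / 5       = 0.2
#   groundedness_normalized = (5 - 1) / 4 = 1.0
#   similarity_normalized   = (5 - 1) / 4 = 1.0
#   score = 0.7*0.5 + 0.2*1.0 + 0.1*1.0 + 0.2*0.5 - 0.3*0.2 = 0.69 -> overall score 69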


def format_terminal_diff(new: float, old: float, format_str: str = ".1f", reverse: bool = False) -> str:
    """Format difference with ANSI colors for terminal output."""
    diff = new - old
    if diff > 0:
        if reverse:
            return f" (\033[31m+{diff:{format_str}}\033[0m)"  # Red
        return f" (\033[32m+{diff:{format_str}}\033[0m)"  # Green
    elif diff < 0:
        if reverse:
            return f" (\033[32m{diff:{format_str}}\033[0m)"  # Green
        return f" (\033[31m{diff:{format_str}}\033[0m)"  # Red
    return f" ({diff:{format_str}})"


def show_results(args: argparse.Namespace, all_results: dict[str, Any]) -> None:
    """Display results in a table format."""
    for name, test_results in all_results.items():
        baseline_results = {}
        baseline_path = pathlib.Path(__file__).parent / "results" / args.language / name[:-1]
        if baseline_path.exists():
            with open(baseline_path, "r") as f:
                baseline_data = json.load(f)
                for result in baseline_data[:-1]:  # Skip summary
                    baseline_results[result["testcase"]] = result
                baseline_results["average_score"] = baseline_data[-1]["average_score"]

        output_table(baseline_results, test_results, name)


def output_table(baseline_results: dict[str, Any], eval_results: list[dict[str, Any]], file_name: str) -> None:
    headers = [
        "Test Case",
        "Score",
        "Violations found",
        "Exact matches (TP)",
        "Valid generic comments",
        "Fuzzy matches",
        "False positives (FP)",
        "Groundedness",
        "Similarity",
    ]
    terminal_rows = []

    for result in eval_results[:-1]:  # Skip summary object
        testcase = result["testcase"]
        score = result["overall_score"]
        exact = result["true_positives"]
        rule = result["rule_matches_wrong_line"]
        fp = result["false_positives"]
        ground = result["groundedness"]
        sim = result["similarity"]
        valid_generic = result["valid_generic_comments"]
        comments_found = f"{result['comments_found']} / {result['expected_comments']}"

        terminal_row = [testcase]
        if testcase in baseline_results:
            base = baseline_results[testcase]
            terminal_row.extend(
                [
                    f"{score:.1f}{format_terminal_diff(score, base['overall_score'])}",
                    comments_found,
                    f"{exact}{format_terminal_diff(exact, base['true_positives'], 'd')}",
                    f"{valid_generic}{format_terminal_diff(valid_generic, base['valid_generic_comments'], 'd')}",
                    f"{rule}{format_terminal_diff(rule, base['rule_matches_wrong_line'], 'd')}",
                    f"{fp}{format_terminal_diff(fp, base['false_positives'], 'd', reverse=True)}",
                    f"{ground:.1f}{format_terminal_diff(ground, base['groundedness'])}",
                    f"{sim:.1f}{format_terminal_diff(sim, base['similarity'])}",
                ]
            )
        else:
            values = [
                f"{score:.1f}",
                comments_found,
                f"{exact}",
                f"{valid_generic}",
                str(rule),
                str(fp),
                f"{ground:.1f}",
                f"{sim:.1f}",
            ]
            terminal_row.extend(values)
        terminal_rows.append(terminal_row)

    print("====================================================")
    print(f"\n\n✨ {file_name} results:\n")
    print(tabulate(terminal_rows, headers, tablefmt="simple"))
    if baseline_results:
        print(
            f"\n{file_name} average score: {eval_results[-1]['average_score']} {format_terminal_diff(eval_results[-1]['average_score'], baseline_results['average_score'])}\n\n"
        )


def calculate_coverage(args: argparse.Namespace, rule_ids: set[str]) -> None:
    """Calculate and output the coverage of tests based on the rule IDs."""
    if args.test_file == "all":
        # only update coverage if all tests are run
        output_path = pathlib.Path(__file__).parent / "results" / args.language / "coverage.json"
        guidelines_path = pathlib.Path(__file__).parent.parent / "guidelines" / args.language
        guidelines = []
        for file in guidelines_path.glob("*.json"):
            with open(file, "r") as f:
                guidelines.extend(json.loads(f.read()))

        guideline_rule_ids = [rule["id"] for rule in guidelines]
        difference = set(guideline_rule_ids).difference(rule_ids)
        with open(str(output_path), "w+") as f:
            f.write(
                json.dumps(
                    {
                        "tested": list(rule_ids),
                        "not_tested": list(difference),
                        "coverage": len(rule_ids) / len(guideline_rule_ids) * 100,
                    },
                    indent=4,
                )
            )
        print(f"\nTest coverage for {args.language}: {len(rule_ids) / len(guideline_rule_ids) * 100:.2f}%")
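

# coverage.json (written by calculate_coverage above) has the shape
# {"tested": [...], "not_tested": [...], "coverage": percentage}; note that it is only
# refreshed when the full suite runs, i.e. when --test-file is left at "all".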


def establish_baseline(args: argparse.Namespace, all_results: dict[str, Any]) -> None:
    """Establish the current results as the new baseline."""
    establish_baseline = input("\nDo you want to establish this as the new baseline? (y/n): ")
    if establish_baseline.lower() == "y":
        for name, result in all_results.items():
            output_path = pathlib.Path(__file__).parent / "results" / args.language / name[:-1]
            with open(str(output_path), "w") as f:
                json.dump(result, indent=4, fp=f)

    # whether or not we establish a baseline, we want to write results to a temp dir
    log_path = pathlib.Path(__file__).parent / "results" / args.language / ".log"
    if not log_path.exists():
        log_path.mkdir(parents=True, exist_ok=True)
    for name, result in all_results.items():
        output_path = log_path / name[:-1]
        with open(str(output_path), "w") as f:
            json.dump(result, indent=4, fp=f)


def record_run_result(result: dict[str, Any], rule_ids: Set[str]) -> list[dict[str, Any]]:
    run_result = []
    total_score = 0

    for row in result["rows"]:
        score = calculate_overall_score(row)
        total_score += score
        rules = [rule["rule_ids"] for rule in json.loads(row["inputs.response"])["comments"]]
        rule_ids.update(*rules)
        run_result.append(
            {
                "testcase": row["inputs.testcase"],
                "expected": json.loads(row["inputs.response"]),
                "actual": json.loads(row["outputs.response"]),
                "expected_comments": row["outputs.custom_eval.expected_comments"],
                "comments_found": row["outputs.custom_eval.comments_found"],
                "valid_generic_comments": row["outputs.custom_eval.valid_generic_comments"],
                "true_positives": row["outputs.custom_eval.true_positives"],
                "false_positives": row["outputs.custom_eval.false_positives"],
                "false_negatives": row["outputs.custom_eval.false_negatives"],
                "percent_coverage": row["outputs.custom_eval.percent_coverage"],
                "rule_matches_wrong_line": row["outputs.custom_eval.rule_matches_wrong_line"],
                "wrong_line_details": row["outputs.custom_eval.wrong_line_details"],
                "similarity": row["outputs.similarity.similarity"],
                "groundedness": row["outputs.groundedness.groundedness"],
                "groundedness_reason": row["outputs.groundedness.groundedness_reason"],
                "overall_score": score,
            }
        )

    average_score = total_score / len(result["rows"])
    run_result.append({"average_score": average_score, "total_evals": len(result["rows"])})
    return run_result
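

# The *.jsonl test rows are expected to carry at least the fields referenced by the column
# mappings and row lookups below: "testcase", "query", "language", "response" (expected
# review JSON) and "context". This list is inferred from how the rows are consumed here;
# the files under the per-language tests/ directory are the source of truth.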


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run evals for APIView copilot.")
    parser.add_argument(
        "--language",
        "-l",
        type=str,
        default="python",
        help="The language to run evals for. Defaults to python.",
    )
    parser.add_argument(
        "--num-runs",
        "-n",
        type=int,
        default=NUM_RUNS,
        help="The number of runs to perform, with the median of results kept. Defaults to 3.",
    )
    parser.add_argument(
        "--test-file",
        "-t",
        type=str,
        default="all",
        help="Only run a particular jsonl test file, takes the name or path to the file. Defaults to all.",
    )
    args = parser.parse_args()

    custom_eval = CustomAPIViewEvaluator()
    groundedness = CustomGroundednessEvaluator()
    similarity_eval = CustomSimilarityEvaluator()
    rule_ids = set()
    tests_directory = pathlib.Path(__file__).parent / "tests" / args.language
    args.test_file = pathlib.Path(args.test_file).name

    all_results = {}
    for file in tests_directory.glob("*.jsonl"):
        if args.test_file != "all" and file.name != args.test_file:
            continue

        run_results = []
        for run in range(args.num_runs):
            print(f"Running evals {run + 1}/{args.num_runs} for {file.name}...")
            result = evaluate(
                data=str(file),
                evaluators={
                    "custom_eval": custom_eval,
                    "similarity": similarity_eval,
                    "groundedness": groundedness,
                },
                evaluator_config={
                    "similarity": {
                        "column_mapping": {
                            "output": "${target.response}",
                            "query": "${data.query}",
                            "language": "${data.language}",
                            "ground_truth": "${data.response}",
                        },
                    },
                    "groundedness": {
                        "column_mapping": {
                            "output": "${target.response}",
                            "query": "${data.query}",
                            "language": "${data.language}",
                            "context": "${data.context}",
                        },
                    },
                    "custom_eval": {
                        "column_mapping": {
                            "response": "${data.response}",
                            "query": "${data.query}",
                            "language": "${data.language}",
                            "output": "${target.response}",
                        },
                    },
                },
                target=review_apiview,
                # TODO we can send data to our foundry project for history / more graphical insights
                # azure_ai_project={
                #     "subscription_id": os.environ["AZURE_SUBSCRIPTION_ID"],
                #     "resource_group_name": os.environ["AZURE_FOUNDRY_RESOURCE_GROUP"],
                #     "project_name": os.environ["AZURE_FOUNDRY_PROJECT_NAME"],
                # }
            )
            run_result = record_run_result(result, rule_ids)
            print(f"Average score for {file.name} run {run + 1}/{args.num_runs}: {run_result[-1]['average_score']:.2f}")
            run_results.append(run_result)

        # take the median run based on the average score
        median_result = sorted(run_results, key=lambda x: x[-1]["average_score"])[len(run_results) // 2]
        all_results[file.name] = median_result

    if not all_results:
        raise ValueError(f"No tests found for arguments: {args}")

    show_results(args, all_results)
    establish_baseline(args, all_results)
    calculate_coverage(args, rule_ids)