packages/python-packages/apiview-copilot/cli.py (321 lines of code) (raw):

import asyncio
from collections import OrderedDict
import json
import os
from pprint import pprint
import sys
import pathlib
from src._search_manager import SearchManager
from src._apiview_reviewer import (
    DEFAULT_USE_RAG,
)
from knack import CLI, ArgumentsContext, CLICommandsLoader
from knack.commands import CommandGroup
from knack.help_files import helps
from typing import Optional, List

# Group-level help entries (YAML) consumed by knack when rendering `--help`.
helps[
    "review"
] = """
    type: group
    short-summary: Commands for creating APIView reviews.
"""

helps[
    "eval"
] = """
    type: group
    short-summary: Commands for APIView Copilot evaluations.
"""

helps[
    "app"
] = """
    type: group
    short-summary: Commands for the Flask app deployment.
"""

helps[
    "search"
] = """
    type: group
    short-summary: Commands for searching the knowledge base.
"""

# NOTE: the docstrings of the command handlers below double as CLI help text,
# so their summaries are kept short and parameters use reST `:param` fields.


def local_review(
    language: str,
    target: str,
    base: Optional[str] = None,
    use_rag: bool = DEFAULT_USE_RAG,
):
    """
    Generates a review using the locally installed code.

    :param language: The language of the APIView file.
    :param target: The path to the APIView file to review.
    :param base: Optional path to a base APIView file to compare against.
    :param use_rag: Whether the reviewer should run in RAG mode.
    """
    from src._apiview_reviewer import ApiViewReview

    # Derive the output file name from the input file name(s).
    target_name = os.path.splitext(os.path.basename(target))[0]
    if base:
        base_name = os.path.splitext(os.path.basename(base))[0]
        # Factor the shared prefix out so the name reads "<common>_<base>_<target>".
        common_prefix = os.path.commonprefix([target_name, base_name])
        target_suffix = target_name[len(common_prefix) :]
        base_suffix = base_name[len(common_prefix) :]
        filename = f"{common_prefix}_{base_suffix}_{target_suffix}"
    else:
        filename = target_name

    with open(target, "r", encoding="utf-8") as f:
        target_apiview = f.read()
    if base:
        with open(base, "r", encoding="utf-8") as f:
            base_apiview = f.read()
    else:
        base_apiview = None

    reviewer = ApiViewReview(target=target_apiview, base=base_apiview, language=language, use_rag=use_rag)
    try:
        review = reviewer.run()
    finally:
        # Always release the reviewer, even when the run fails.
        reviewer.close()

    output_path = os.path.join("scratch", "output", language)
    os.makedirs(output_path, exist_ok=True)
    output_file = os.path.join(output_path, f"{filename}.json")

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(review.model_dump_json(indent=4))

    print(f"Review written to {output_file}")
    print(f"Found {len(review.comments)} comments.")


def create_test_case(
    language: str,
    test_case: str,
    apiview_path: str,
    expected_path: str,
    test_file: str,
    overwrite: bool = False,
):
    """
    Creates or updates a test case for the APIView reviewer.

    :param language: The language for the test case.
    :param test_case: The name of the test case.
    :param apiview_path: Path to the txt file containing the APIView text.
    :param expected_path: Path to the expected JSON output from the AI reviewer.
    :param test_file: Path to the JSONL test file (created if it does not exist).
    :param overwrite: Replace an existing test case with the same name.
    """
    with open(apiview_path, "r", encoding="utf-8") as f:
        apiview_contents = f.read()

    with open(expected_path, "r", encoding="utf-8") as f:
        expected_contents = json.load(f)

    # Load every guideline defined for the language.
    guidelines_path = pathlib.Path(__file__).parent / "guidelines" / language
    guidelines = []
    for file in guidelines_path.glob("*.json"):
        with open(file, "r", encoding="utf-8") as f:
            guidelines.extend(json.load(f))

    # The context is the text of each guideline referenced by an expected
    # comment, added at most once.
    context = ""
    for violation in expected_contents["comments"]:
        for rule_id in violation["rule_ids"]:
            for rule in guidelines:
                if rule["id"] == rule_id and rule["text"] not in context:
                    context += f"\n{rule['text']}"

    record = {
        "testcase": test_case,
        "query": apiview_contents.replace("\t", ""),
        "language": language,
        "context": context,
        "response": json.dumps(expected_contents),
    }

    if not os.path.exists(test_file):
        with open(test_file, "w", encoding="utf-8") as f:
            json.dump(record, f)
        return

    if not overwrite:
        # Append as a new JSONL line; blank lines are skipped by the readers.
        with open(test_file, "a", encoding="utf-8") as f:
            f.write("\n")
            json.dump(record, f)
        return

    with open(test_file, "r", encoding="utf-8") as f:
        existing_test_cases = [json.loads(line) for line in f if line.strip()]

    # Drop the first entry with the same name (if any) and append the new one.
    for position, existing_test_case in enumerate(existing_test_cases):
        if existing_test_case["testcase"] == record["testcase"]:
            del existing_test_cases[position]
            break
    existing_test_cases.append(record)

    with open(test_file, "w", encoding="utf-8") as f:
        for existing_test_case in existing_test_cases:
            f.write(json.dumps(existing_test_case) + "\n")


def deconstruct_test_case(language: str, test_case: str, test_file: str):
    """
    Deconstructs a test case into its component APIView test and expected results file.

    :param language: The language for the test case.
    :param test_case: The specific test case to deconstruct.
    :param test_file: The full path to the JSONL test file.
    :raises ValueError: If the test case is not present in the test file.
    """
    test_cases = {}
    with open(test_file, "r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                parsed = json.loads(line)
                if "testcase" in parsed:
                    test_cases[parsed["testcase"]] = parsed

    if test_case not in test_cases:
        raise ValueError(f"Test case '{test_case}' not found in the file.")

    apiview = test_cases[test_case].get("query", "")
    # Parse before touching any output file so that a malformed response does
    # not leave a truncated expected-results file behind.
    expected = json.loads(test_cases[test_case].get("response", ""))
    # sort comments by line number
    expected["comments"] = sorted(expected["comments"], key=lambda x: x["line_no"])

    output_dir = pathlib.Path(__file__).parent / "evals" / "tests" / language
    output_dir.mkdir(parents=True, exist_ok=True)
    deconstructed_apiview = output_dir / f"{test_case}.txt"
    deconstructed_expected = output_dir / f"{test_case}.json"

    with open(deconstructed_apiview, "w", encoding="utf-8") as f:
        f.write(apiview)

    with open(deconstructed_expected, "w", encoding="utf-8") as f:
        f.write(json.dumps(expected, indent=4))

    print(f"Deconstructed test case '{test_case}' into {deconstructed_apiview} and {deconstructed_expected}.")


def deploy_flask_app(
    app_name: Optional[str] = None,
    resource_group: Optional[str] = None,
    subscription_id: Optional[str] = None,
):
    """Command to deploy the Flask app."""
    from scripts.deploy_app import deploy_app_to_azure

    deploy_app_to_azure(app_name, resource_group, subscription_id)


def generate_review_from_app(language: str, target: str, base: Optional[str] = None):
    """Generates a review using the deployed Flask app."""
    from scripts.remote_review import generate_remote_review

    # Read the file content
    with open(target, "r", encoding="utf-8") as f:
        target_apiview = f.read()
    if base:
        with open(base, "r", encoding="utf-8") as f:
            base_apiview = f.read()
    else:
        base_apiview = None

    # NOTE(review): `base_apiview` is read (so a bad path still fails fast) but
    # is not forwarded to generate_remote_review; confirm whether the remote
    # API supports a base APIView and pass it through if so.
    response = asyncio.run(generate_remote_review(target_apiview, language))

    # response is already a dict, no need to parse it
    if isinstance(response, dict):
        pprint(response, indent=2)
    else:
        # Handle error responses which are strings
        print(response)


def search_examples(path: str, language: str):
    """Search the examples-index for a query."""
    # The import deliberately shadows this function's own name locally.
    from scripts.search_examples import search_examples

    results = search_examples(path, language)
    print(json.dumps(results, indent=2, cls=CustomJSONEncoder))


def search_guidelines(language: str, text: Optional[str] = None, path: Optional[str] = None):
    """Search the guidelines-index for a query."""
    # The import deliberately shadows this function's own name locally.
    from scripts.search_guidelines import search_guidelines

    if (path and text) or (not path and not text):
        raise ValueError("Provide one of `--path` or `--text`.")
    results = search_guidelines(path or text, language)
    print(json.dumps(results, indent=2, cls=CustomJSONEncoder))


def search_knowledge_base(
    language: str,
    text: Optional[str] = None,
    path: Optional[str] = None,
    index: Optional[List[str]] = None,
    markdown: bool = False,
):
    """
    Queries the Search indexes and returns the resulting Cosmos DB objects,
    resolving all links between objects. This result represents what the AI
    reviewer would receive as context in RAG mode.

    :param language: The language of the APIView file.
    :param text: The text query to search.
    :param path: The path to the file containing query text or code.
    :param index: The indexes to search; defaults to examples and guidelines.
    :param markdown: Render output as markdown instead of JSON.
    :raises ValueError: If both or neither of `path` and `text` are provided.
    """
    if (path and text) or (not path and not text):
        raise ValueError("Provide one of `--path` or `--text`.")

    # Resolve the default here rather than in the signature to avoid a shared
    # mutable default argument.
    if index is None:
        index = ["examples", "guidelines"]

    search = SearchManager(language=language)
    query = text
    if path:
        with open(path, "r", encoding="utf-8") as f:
            query = f.read()

    # Start from empty results so that searching a single index does not leave
    # the other name unbound.
    # NOTE(review): assumes build_context accepts empty lists - confirm.
    examples = []
    guidelines = []
    if "examples" in index:
        examples = search.search_examples(query=query)
    if "guidelines" in index:
        guidelines = search.search_guidelines(query=query)

    context = search.build_context(guidelines, examples)
    if markdown:
        md = context.to_markdown()
        print(md)
    else:
        print(json.dumps(context, indent=2, cls=CustomJSONEncoder))


SUPPORTED_LANGUAGES = [
    "android",
    "clang",
    "cpp",
    "dotnet",
    "golang",
    "ios",
    "java",
    "python",
    "rust",
    "typescript",
]


class CliCommandsLoader(CLICommandsLoader):
    """Registers the command groups and their arguments with knack."""

    def load_command_table(self, args):
        """Map each CLI command to its handler function in this module."""
        with CommandGroup(self, "review", "__main__#{}") as g:
            g.command("local", "local_review")
            g.command("remote", "generate_review_from_app")
        with CommandGroup(self, "eval", "__main__#{}") as g:
            g.command("create", "create_test_case")
            g.command("deconstruct", "deconstruct_test_case")
        with CommandGroup(self, "app", "__main__#{}") as g:
            g.command("deploy", "deploy_flask_app")
        with CommandGroup(self, "search", "__main__#{}") as g:
            g.command("examples", "search_examples")
            g.command("guidelines", "search_guidelines")
            g.command("kb", "search_knowledge_base")
        return OrderedDict(self.command_table)

    def load_arguments(self, command):
        """Declare help text, option names and choices for command arguments."""
        # Arguments shared by every command.
        with ArgumentsContext(self, "") as ac:
            ac.argument(
                "language",
                type=str,
                help="The language of the APIView file",
                options_list=("--language", "-l"),
                choices=SUPPORTED_LANGUAGES,
            )
        with ArgumentsContext(self, "review") as ac:
            ac.argument("path", type=str, help="The path to the APIView file")
            ac.argument(
                "use_rag",
                action="store_true",
                help="Use RAG pattern to generate the review.",
            )
            ac.argument(
                "target",
                type=str,
                help="The path to the APIView file to review.",
                options_list=("--target", "-t"),
            )
            ac.argument(
                "base",
                type=str,
                help="The path to the base APIView file to compare against. If omitted, copilot will review the entire target APIView.",
                options_list=("--base", "-b"),
            )
        with ArgumentsContext(self, "eval create") as ac:
            ac.argument("language", type=str, help="The language for the test case.")
            ac.argument("test_case", type=str, help="The name of the test case")
            ac.argument(
                "apiview_path",
                type=str,
                help="The full path to the txt file containing the APIview text",
            )
            ac.argument(
                "expected_path",
                type=str,
                help="The full path to the expected JSON output from the AI reviewer.",
            )
            ac.argument(
                "test_file",
                type=str,
                help="The full path to the JSONL test file. Can be an existing test file, or will create a new one.",
            )
            ac.argument(
                "overwrite",
                action="store_true",
                help="Overwrite the test case if it already exists.",
            )
        with ArgumentsContext(self, "eval deconstruct") as ac:
            ac.argument("language", type=str, help="The language for the test case.")
            ac.argument("test_case", type=str, help="The specific test case to deconstruct.")
            ac.argument("test_file", type=str, help="The full path to the JSONL test file.")
        with ArgumentsContext(self, "app deploy") as ac:
            ac.argument(
                "app_name",
                options_list=["--app-name"],
                help="The name of the Azure App Service. Env var: AZURE_APP_NAME",
            )
            ac.argument(
                "resource_group",
                options_list=["--resource-group"],
                help="The Azure resource group containing the App Service. Env var: AZURE_RESOURCE_GROUP",
            )
            ac.argument(
                "subscription_id",
                options_list=["--subscription-id"],
                help="The Azure subscription ID. Env var: AZURE_SUBSCRIPTION_ID",
            )
        with ArgumentsContext(self, "search") as ac:
            ac.argument(
                "path",
                type=str,
                help="The path to the file containing query text or code.",
                options_list=["--path"],
            )
            ac.argument(
                "text",
                type=str,
                help="The text query to search.",
            )
            ac.argument(
                "index",
                type=str,
                nargs="+",
                help="The indexes to search. Can be one or more of: examples, guidelines. Defaults to both.",
                options_list=["--index"],
            )
            ac.argument(
                "markdown",
                help="Render output as markdown instead of JSON.",
            )
        super().load_arguments(command)


def run_cli():
    """Invoke the CLI with the process arguments and exit with its exit code."""
    cli = CLI(cli_name="apiviewcopilot", commands_loader_cls=CliCommandsLoader)
    exit_code = cli.invoke(sys.argv[1:])
    sys.exit(exit_code)


class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that falls back to `to_dict()` or `__dict__` for objects."""

    def default(self, obj):
        # If the object has a `to_dict` method, use it
        if hasattr(obj, "to_dict"):
            return obj.to_dict()
        # If the object has a `__dict__` attribute, use it
        elif hasattr(obj, "__dict__"):
            return obj.__dict__
        # Otherwise, use the default serialization
        return super().default(obj)


if __name__ == "__main__":
    run_cli()