evaluations/genai_evaluation.py (213 lines of code) (raw):

#!/usr/bin/env python3 """ evaluation.py A script to evaluate questions either by sending them to a REST API or processing them locally using the Orchestrator class. Additionally, results are saved to an Excel spreadsheet. Usage: bash: export PYTHONPATH=./:$PYTHONPATH python evaluation.py --test-data path/to/test_data.jsonl Powershell: $env:PYTHONPATH = "./;$env:PYTHONPATH" python evaluation.py --test-data path/to/test_data.jsonl Environment Variables: - USE_REST_API: Set to "True" to use the REST API for processing questions. Otherwise, local execution is used. - ORCHESTRATOR_ENDPOINT: The API endpoint URI (required if USE_REST_API is "True"). - FUNCTION_KEY: The API access key (required if USE_REST_API is "True"). - AZURE_OPENAI_ENDPOINT: Azure OpenAI endpoint. - AZURE_OPENAI_API_VERSION: Azure OpenAI API version. - AZURE_OPENAI_API_KEY: Azure OpenAI API key. - AZURE_SUBSCRIPTION_ID: Azure subscription ID. - AZURE_RESOURCE_GROUP: Azure resource group. - AZUREAI_PROJECT_NAME: Azure AI project name. Requirements: - Python 3.x - requests library (`pip install requests`) - python-dotenv library (`pip install python-dotenv`) - promptflow library (`pip install promptflow`) - pandas library (`pip install pandas`) - openpyxl library (`pip install openpyxl`) Security Note: Ensure that your `.env` file is not committed to version control systems as it contains sensitive information. Add `.env` to your `.gitignore` file. """ import os import sys import json import requests import datetime import logging import logging.config from dotenv import load_dotenv import asyncio import argparse import pandas as pd # Import pandas import time # Import Orchestrator for local execution try: from orchestration import Orchestrator except ImportError: print("Error: Could not import Orchestrator from 'orchestration' module.") sys.exit(1) # Configure logging LOGGING_CONFIG = { 'version': 1, 'disable_existing_loggers': False, # Allow existing loggers to propagate 'formatters': { 'standard': { 'format': '%(asctime)s - %(levelname)s - %(name)s - %(message)s', }, }, 'handlers': { 'file_handler': { 'class': 'logging.FileHandler', 'filename': 'evaluation.log', 'mode': 'a', 'formatter': 'standard', 'level': 'INFO', }, 'console_handler': { 'class': 'logging.StreamHandler', 'stream': sys.stdout, 'formatter': 'standard', 'level': 'ERROR', }, }, 'root': { 'handlers': ['file_handler', 'console_handler'], 'level': 'DEBUG', }, } logging.config.dictConfig(LOGGING_CONFIG) logger = logging.getLogger(__name__) # Use a module-specific logger def get_rest_api_config(): """ Load environment variables required for REST API configuration. Returns: tuple: (orchestrator_endpoint, function_key) """ orchestrator_endpoint = os.getenv('ORCHESTRATOR_ENDPOINT') function_key = os.getenv('FUNCTION_KEY') if not orchestrator_endpoint: logger.error("ORCHESTRATOR_ENDPOINT not found in environment variables.") sys.exit(1) if not function_key: logger.error("FUNCTION_KEY not found in environment variables.") sys.exit(1) return orchestrator_endpoint, function_key def send_question_to_rest_api(uri, x_functions_key, question, conversation_id): """ Send the question to the orchestrator API and return the response. Args: uri (str): The API endpoint URI. x_functions_key (str): The API access key. question (str): The question to send. conversation_id (str): The conversation ID. Returns: dict: The API response parsed as a JSON object. """ headers = { 'x-functions-key': x_functions_key, 'Content-Type': 'application/json' } body = { 'conversation_id': conversation_id, 'question': question } try: response = requests.post(uri, headers=headers, json=body) response.raise_for_status() # Raises HTTPError for bad responses try: response_data = response.json() if not isinstance(response_data, dict): logger.error("Response JSON is not a dictionary.") return {"error": "Invalid response format from orchestrator API."} return response_data except json.JSONDecodeError: logger.error("Response is not valid JSON.") return {"error": "Response is not valid JSON."} except requests.exceptions.RequestException as e: logger.exception(f"HTTP Request failed: {e}") return {"error": f"HTTP Request failed: {e}"} def send_question_to_python(question, conversation_id): """ Process the question using the Orchestrator locally. Args: question (str): The user's question. conversation_id (str): The conversation ID. Returns: dict: The response from the Orchestrator. """ client_principal = { 'id': '00000000-0000-0000-0000-000000000000', 'name': 'anonymous' } if question: try: orchestrator = Orchestrator(conversation_id, client_principal) result = asyncio.run(orchestrator.answer(question)) if not isinstance(result, dict): logger.error("Expected result to be a dictionary.") return {"error": "Invalid response format from orchestrator."} return result except Exception as e: logger.exception(f"An error occurred while orchestrating the question: {e}") return {"error": "An error occurred while processing your question."} else: logger.warning("No question provided to orchestrate.") return {"error": "No question provided."} def process_question(question, use_rest_api, orchestrator_endpoint, function_key, conversation_id): """ Process a single question either via REST API or locally. Args: question (str): The question to process. use_rest_api (bool): Flag to determine the method of processing. orchestrator_endpoint (str): The API endpoint URI. function_key (str): The API access key. conversation_id (str): The conversation ID. Returns: dict: The response from the chosen processing method. """ if use_rest_api: response_data = send_question_to_rest_api( orchestrator_endpoint, function_key, question, conversation_id) else: response_data = send_question_to_python(question, conversation_id) return response_data def prettify_jsonl_file(input_file): # Check if the input file exists if not os.path.isfile(input_file): print(f"Error: The file '{input_file}' does not exist.") return try: # Read the entire content from the input file first with open(input_file, 'r', encoding='utf-8') as infile: lines = infile.readlines() # Prettify the content and write back to the same file with open(input_file, 'w', encoding='utf-8') as outfile: for line in lines: # Load the JSON object from the line json_obj = json.loads(line.strip()) # Write the pretty-printed JSON back to the file json.dump(json_obj, outfile, indent=4, ensure_ascii=False) outfile.write('\n') # Add a newline after each JSON object print(f"Prettified JSONL content has been written to '{input_file}'") except Exception as e: print(f"Error occurred while processing the file: {e}") def parse_arguments(): """ Parse command-line arguments. Returns: Namespace: Parsed arguments. """ parser = argparse.ArgumentParser( description="Evaluate questions either by sending them to a REST API or processing them locally." ) parser.add_argument( "--test-data", type=str, required=True, help="Path to the test dataset file in JSONL format.", ) return parser.parse_args() def main(): """ Main function to execute the evaluation process. """ args = parse_arguments() data_file_to_use = args.test_data # Check if the specified data file exists if not os.path.exists(data_file_to_use): logger.error(f"The specified data file '{data_file_to_use}' does not exist.") sys.exit(1) print(f"Using data file: {data_file_to_use}") load_dotenv() current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") use_rest_api = os.getenv('USE_REST_API', "False").lower() == "true" if use_rest_api: orchestrator_endpoint, function_key = get_rest_api_config() print("Configured to use REST API for processing questions.") else: orchestrator_endpoint = None function_key = None print("Configured to use local execution for processing questions.") # Azure configuration (used if needed) azure_config = { "aoai_endpoint": os.environ.get('AZURE_OPENAI_ENDPOINT', ''), "aoai_api_version": os.environ.get('AZURE_OPENAI_API_VERSION', '2024-02-01'), "aoai_api_key": os.environ.get('AZURE_OPENAI_API_KEY', ''), "subscription_id": os.environ.get('AZURE_SUBSCRIPTION_ID', ''), "resource_group": os.environ.get('AZURE_RESOURCE_GROUP', ''), "project_name": os.environ.get('AZUREAI_PROJECT_NAME', '') } print("Azure configuration loaded.") # Ensure 'evaluations' directory exists os.makedirs('evaluations', exist_ok=True) print("Ensured 'evaluations' directory exists.") output_jsonl_file = f"evaluations/responses_{current_time}.jsonl" output_excel_file = f"evaluations/responses_{current_time}.xlsx" # Excel output file conversation_id = "" last_response_data = None # Initialize a list to collect all output data for Excel excel_data = [] # Process each question in the test dataset with open(data_file_to_use, 'r', encoding='utf-8') as f_in, open(output_jsonl_file, 'w', encoding='utf-8') as f_out: print("Opened data file and output JSONL file.") for line_number, line in enumerate(f_in, start=1): line = line.strip() if not line: continue try: data = json.loads(line) question = data.get('question', '') ground_truth = data.get('ground_truth', '') print(f"Processing question {line_number}: {question}") start_time = time.time() # Process the question response_data = process_question( question, use_rest_api, orchestrator_endpoint if use_rest_api else None, function_key if use_rest_api else None, conversation_id ) duration = time.time() - start_time # Prepare the output data output_data = { "Question": question, "Ground ground_truth": ground_truth, "Answer": response_data.get('answer', 'No answer provided.'), "Context": response_data.get('data_points', 'No data points provided.'), "Reasoning": response_data.get('reasoning', 'No reasoning provided.'), "Processing Time (seconds)": duration } f_out.write(json.dumps(output_data) + '\n') # Append to excel_data list excel_data.append(output_data) except Exception as e: logger.exception(f"Error processing line {line_number}: {e}") print("Finished processing all questions.") # Optionally prettify the JSONL file prettify_jsonl_file(output_jsonl_file) # Save results to Excel try: df = pd.DataFrame(excel_data) df.to_excel(output_excel_file, index=False) print(f"Results have been saved to Excel file: '{output_excel_file}'") except Exception as e: logger.exception(f"Failed to save results to Excel: {e}") print(f"Error: Could not save results to Excel file. {e}") if __name__ == '__main__': try: main() except Exception as e: logger.exception(f"An unhandled exception occurred: {e}") sys.exit(1)