# evaluation_pipeline/evaluation.py
import os
import sys
import ast
import argparse
import re
import json
from textwrap import wrap

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import wandb
from dotenv import load_dotenv
from sklearn.manifold import TSNE
from sklearn.metrics import ndcg_score
# Add the parent directory of `evaluation_pipeline` to sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.constants import EMBEDDING_MODELS_DICT
from src.feature_extractor import FeatureExtractor
from src.llm_judge import llm_as_judge
def convert_to_list(value):
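    """
    Parse a stringified list from a CSV cell back into a Python list.
    Strips numpy repr wrappers such as np.int64(...) / np.float64(...) before
    calling ast.literal_eval; non-string values are returned unchanged.
    """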
if isinstance(value, str):
        value = value.replace('np.int64(', '').replace('np.float64(', '').replace(')', '')
return ast.literal_eval(value)
return value
def calc_precision_at_k(relevant_docs, retrieved_docs, k):
"""
Compute Precision@K.
Parameters:
relevant_docs (set): Set of relevant document IDs.
retrieved_docs (list): List of retrieved document IDs in ranked order.
k (int): Number of top results to consider.
Returns:
float: Precision@K score.
"""
retrieved_at_k = retrieved_docs[:k]
intersection = set(retrieved_at_k) & set(relevant_docs)
return len(intersection) / float(k)
def calc_recall_at_k(relevant_docs, retrieved_docs, k):
"""
Compute Recall@K.
Parameters:
relevant_docs (set): Set of relevant document IDs.
retrieved_docs (list): List of retrieved document IDs in ranked order.
k (int): Number of top results to consider.
Returns:
float: Recall@K score.
"""
    retrieved_at_k = retrieved_docs[:k]
    intersection = set(retrieved_at_k) & set(relevant_docs)
    if not relevant_docs:
        # Avoid division by zero when a query has no relevant documents
        return 0.0
    return len(intersection) / len(relevant_docs)
def calc_ndcg(relevant_docs, retrieved_docs, k, score_type='rank', retrieved_distances=None):
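    """
    Compute NDCG@K using scikit-learn's ndcg_score.
    Parameters:
    relevant_docs (set): Set of relevant document IDs.
    retrieved_docs (list): List of retrieved document IDs in ranked order.
    k (int): Number of top results to consider.
    score_type (str): 'rank' scores documents by 1/rank; 'distance' uses 1 - distance.
    retrieved_distances (list, optional): Distances aligned with retrieved_docs (required for 'distance').
    Returns:
    float: NDCG@K score (0.0 if fewer than two documents were retrieved).
    """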
    if len(retrieved_docs) < 2:
        return 0.0
    y_true = np.array([[1 if doc in relevant_docs else 0 for doc in retrieved_docs]])
    if score_type == 'rank':
        # Earlier ranks receive higher scores (1, 1/2, 1/3, ...)
        y_scores = np.array([[1 / (rank + 1) for rank in range(len(retrieved_docs))]])
    elif score_type == 'distance':
        # Convert distances to similarities so that closer documents score higher
        y_scores = np.array([[1 - dist for dist in retrieved_distances]])
    else:
        raise ValueError(f"Unknown score_type: {score_type}")
    return ndcg_score(y_true, y_scores, k=k)
def calc_reciprocal_rank(relevant_docs, retrieved_docs):
"""
Compute Reciprocal Rank (RR).
Parameters:
relevant_docs (set): Set of relevant document IDs.
retrieved_docs (list): List of retrieved document IDs in ranked order.
    Returns:
    float: Reciprocal Rank (RR) score; 0.0 if no relevant document is retrieved.
    Can be averaged over a set of queries Q to compute the Mean Reciprocal Rank (MRR).
"""
for rank, doc in enumerate(retrieved_docs, start=1):
if doc in relevant_docs:
return 1 / rank
return 0.0
def calc_average_precision(relevant_docs, retrieved_docs, k):
"""
Compute Average Precision (AP).
Parameters:
    relevant_docs (set): Set of relevant document IDs.
    retrieved_docs (list): List of retrieved document IDs in ranked order.
    k (int): Number of positions used as the denominator when averaging precision.
    Returns:
    float: AP score.
    Can be averaged over a set of queries Q to compute the Mean Average Precision (MAP).
"""
if len(retrieved_docs) < 2:
return 0.0
total_precision = 0.0
for i in range(1, len(retrieved_docs)+1):
precision = calc_precision_at_k(relevant_docs, retrieved_docs, k=i)
total_precision += precision
    if k > len(retrieved_docs):
        print(f"Warning: k ({k}) is greater than the number of retrieved documents ({len(retrieved_docs)})")
    elif k < len(retrieved_docs):
        print(f"Warning: k ({k}) is smaller than the number of retrieved documents ({len(retrieved_docs)})")
    average_precision = total_precision / k
return average_precision
def format_judge_response(answer):
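    """
    Extract and parse the JSON object from an LLM judge response.
    Tries progressively more aggressive clean-up (stripping newlines, then
    Markdown code fences); falls back to returning the raw generated text.
    """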
    try:
        formatted_answer = json.loads(answer[0]['generated_text'])
    except Exception:
        try:
            print("Retrying JSON parse after stripping newlines")
            formatted_answer = json.loads(answer[0]['generated_text'].replace("\n", ""))
        except Exception:
            try:
                print("Retrying JSON parse after stripping Markdown code fences")
                formatted_answer = json.loads(re.sub(r'```json|```', '', answer[0]['generated_text'].replace("\n", "")).strip())
            except Exception:
                # Give up and return the raw generated text
                formatted_answer = answer[0]['generated_text']
    return formatted_answer
def run_llm_judge(judge, query_id, query, retrieved_texts, k):
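    """
    Ask the LLM judge whether each retrieved text is on topic for the query.
    Returns a row dict with the per-document binary decisions plus
    on_topic_number@k and on_topic_rate@k.
    """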
row = {'query_id': query_id}
row['query'] = query
    # ask the LLM judge to score each retrieved text against the query
    decisions = []
    for retrieved_text in retrieved_texts:
        try:
            llm_judge_response = judge.evaluation_prompt(query, retrieved_text)
            # clean up the JSON returned by the LLM
            response = format_judge_response(llm_judge_response)
            decisions.append(response['binary_decision'])
        except Exception:
            # count unparseable or failed judgements as off topic
            decisions.append(0)
# store results
row['decisions'] = decisions
row[f'on_topic_number@{k}'] = sum(decisions)
row[f'on_topic_rate@{k}'] = sum(decisions) / float(k)
return row
def run_traditional_eval(query_id, query, relevant_docs, retrieved_docs, retrieved_distances, k):
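    """
    Compute the traditional IR metrics (precision@k, recall@k, NDCG@k,
    reciprocal rank, average precision) for a single query and return them as a row dict.
    """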
row = {'query_id': query_id}
row['query'] = query
    # calculate traditional IR metrics
    precision = calc_precision_at_k(relevant_docs, retrieved_docs, k)
    recall = calc_recall_at_k(relevant_docs, retrieved_docs, k)
    ndcg = calc_ndcg(relevant_docs, retrieved_docs, k, score_type='rank', retrieved_distances=retrieved_distances)
    reciprocal_rank = calc_reciprocal_rank(relevant_docs, retrieved_docs)
    average_precision = calc_average_precision(relevant_docs, retrieved_docs, k=k)
# store in row
row['retrieved_ids'] = retrieved_docs
row['relevant_docs'] = relevant_docs
row[f'precision@{k}'] = precision
row[f'recall@{k}'] = recall
row[f'ndcg@{k}'] = ndcg
row['reciprocal_rank'] = reciprocal_rank
row['average_precision'] = average_precision
return row
def get_combined_texts_uniform_k(df, k):
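    """
    Collect the retrieval_<i>_combined_text columns in rank order and return,
    for each row, the list of the first k retrieved texts.
    """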
# Identify retrieval columns and sort them numerically
retrieval_cols = sorted(
[col for col in df.columns if 'retrieval_' in col and '_combined_text' in col],
key=lambda x: int(x.split('_')[1])
)
# Extract relevant retrieval columns as a NumPy array
retrieval_matrix = df[retrieval_cols].to_numpy()
# Slice the matrix up to `k` columns for all rows
sliced_matrix = retrieval_matrix[:, :k]
# Convert to a list of lists
result = sliced_matrix.tolist()
return result
def load_retrieved(file_path):
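    """
    Load a retrieval results CSV, parse the list-valued columns, and return
    the dataframe, the maximum k used, and a filesystem-safe model name.
    """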
df = pd.read_csv(file_path,
converters={
'relevant_docs': convert_to_list,
'retrieved_ids': convert_to_list,
'combined_text': convert_to_list,
'retrieved_distances': convert_to_list,
})
k = df['k'].max()
model_name = df['model_name'].unique()[0]
model_name_normalized = model_name.replace("/","_").replace("-","_").replace(".","_")
df['combined_text'] = get_combined_texts_uniform_k(df, k)
return df, k, model_name_normalized
def vectorized_evaluation(row, k):
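    """Wrapper used with DataFrame.apply to run the traditional metrics on one row."""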
    return run_traditional_eval(
        query_id=row['query_id'],
        query=row['query'],
        relevant_docs=row['relevant_docs'],
        retrieved_docs=row['retrieved_ids'],
        retrieved_distances=row['retrieved_distances'],
        k=k
    )
def vectorized_llm_evaluation(row, k, judge):
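    """Wrapper used with DataFrame.apply to run the LLM judge on one row."""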
    return run_llm_judge(
        judge=judge,
        query_id=row['query_id'],
        query=row['query'],
        retrieved_texts=row['combined_text'],
        k=k
    )
def run_vectorized_traditional_eval(df, k):
# Apply the function row-wise, passing k as a constant
df['evaluation'] = df.apply(lambda row: vectorized_evaluation(row, k), axis=1)
# Return the evaluations as a DataFrame
return pd.DataFrame(df['evaluation'].tolist())
def run_vectorized_llm_eval(df, k, judge):
# Apply the function row-wise, passing k as a constant
df['llm_evaluation'] = df.apply(lambda row: vectorized_llm_evaluation(row, k, judge), axis=1)
# Return the evaluations as a DataFrame
return pd.DataFrame(df['llm_evaluation'].tolist())
def wandb_logging(df, k):
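    """
    Load wandb credentials from the environment and start a run that records
    the model name and k for this evaluation.
    """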
load_dotenv()
api_key = os.getenv("WANDB_API_KEY")
project = os.getenv("WANDB_PROJECT")
entity = os.getenv("WANDB_ENTITY")
    wandb.login(key=api_key)  # authenticate with the API key from .env
    wandb.init(
        # set the wandb project and entity where this run will be logged
        project=project,
        entity=entity,
        # track hyperparameters and run metadata
        config={"model_name": df['model_name'].unique(),
                "k": k,
                "notes": ""
                }
    )
def eval_pipeline(run_llm_judge: bool, file_path, log_to_wandb, specific_k=None):
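    """
    Run the full evaluation over a retrieval results file: traditional IR metrics,
    optionally the LLM judge, with results written to CSV and optionally logged to wandb.
    """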
retrieval_df, k, model_name = load_retrieved(file_path=file_path)
    # give option to override k
    if specific_k is not None:
        k = specific_k
if log_to_wandb:
wandb_logging(retrieval_df, k)
results_df = run_vectorized_traditional_eval(retrieval_df, k)
    dfs_to_return = [results_df]
    df_labels = ["traditional_eval"]
if run_llm_judge:
judge = llm_as_judge()
llm_results_df = run_vectorized_llm_eval(retrieval_df, k, judge)
dfs_to_return.append(llm_results_df)
df_labels.append("llm_eval")
    for df, label in zip(dfs_to_return, df_labels):
        df['model_name'] = model_name
        summary_df = df.describe().reset_index()
        # drop query_id from the summary statistics; averaging IDs is meaningless
        summary_df = summary_df.drop(columns=['query_id'], errors='ignore')
        averages = summary_df[summary_df['index'] == 'mean']
        if log_to_wandb:
            summary_table = wandb.Table(dataframe=summary_df)
            wandb.log({label: summary_table})
            wandb.log({"Averages": averages})
        os.makedirs("evaluation_results", exist_ok=True)
        summary_df.to_csv(f"evaluation_results/{model_name}_{label}_aggregate_metrics.csv")
        df.to_csv(f"evaluation_results/{model_name}_{label}_detailed_metrics.csv")
    # finish the wandb run only after every results table has been logged
    if log_to_wandb:
        wandb.finish()
def visualize_embeddings(model_name, queries):
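    """
    Embed the given queries with the specified model, project them to 2D with
    t-SNE, annotate each point with its (wrapped) query text, and save the plot.
    """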
fe = FeatureExtractor(EMBEDDING_MODELS_DICT, model_name=model_name)
model_name_normalized = model_name.replace("/","_").replace("-","_").replace(".","_")
embeddings = []
for query in queries:
query_embeddings = fe.get_embeddings([query])[0]
embeddings.append(query_embeddings)
    # perplexity must be smaller than the number of samples, so cap it for small query sets
    tsne = TSNE(n_components=2, random_state=42, perplexity=min(25, len(queries) - 1))
reduced_embeddings = tsne.fit_transform(np.array(embeddings))
    # Wrap long query strings to a maximum of 25 characters per line
    wrapped_labels = ["\n".join(wrap(label, width=25)) for label in queries]
# Plot with wrapped labels
fig, ax = plt.subplots(figsize=(10, 7))
plt.scatter([x[0] for x in reduced_embeddings], [x[1] for x in reduced_embeddings], c='orange', alpha=0.6)
for i, txt in enumerate(wrapped_labels):
        ax.annotate(txt, (reduced_embeddings[i, 0], reduced_embeddings[i, 1]), fontsize=7)
ax.set_xlabel('Dimension 1')
ax.set_ylabel('Dimension 2')
ax.set_title(f't-SNE Visualization of Embeddings for {model_name}')
    try:
        # log the figure to wandb if a run is active
        wandb.log({"chart": wandb.Image(fig)})
    except Exception:
        pass
    os.makedirs("figs", exist_ok=True)
    plt.savefig(f'figs/t_sne_embeddings_{model_name_normalized}.png', format='png', dpi=300, bbox_inches='tight')
plt.close()
def main(use_llm_judge, file_path, log_to_wandb):
    eval_pipeline(run_llm_judge=use_llm_judge, file_path=file_path, log_to_wandb=log_to_wandb)
if __name__ == "__main__":
# Create the argument parser
parser = argparse.ArgumentParser(description="Run the evaluation pipeline with specified parameters.")
parser.add_argument("-f", type=str, help="Path to the file to be processed")
parser.add_argument("-log_to_wandb", type=str, default=False, help="Whether to log to Weights & Biases")
parser.add_argument(
"-llm",
type=lambda x: str(x).lower() in ['true', '1', 'yes'],
default=False,
help="Whether or not to use LLM judge (default: False)"
)
# Parse the command-line arguments
args = parser.parse_args()
main(use_llm_judge=args.llm, file_path=args.f, log_to_wandb=args.log_to_wandb)
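# Example invocation (the CSV path below is hypothetical, shown for illustration only):
#   python evaluation_pipeline/evaluation.py -f evaluation_results/retrievals.csv -llm true -log_to_wandb false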