# run_tests/selection_simulator.py

from collections import deque


def get_problem_scores(selected_dataset_samples) -> float:
    """Score a problem as the sum, over subtasks, of the best weighted score
    achieved on that subtask by any selected submission."""
    if not selected_dataset_samples:
        return 0.0
    subtask_scores = {
        subtask['subtask']: 0
        for subtask in selected_dataset_samples[0]['all_subtasks_results']
    }
    for submission in selected_dataset_samples:
        for subtask_result in submission['all_subtasks_results']:
            subtask_scores[subtask_result['subtask']] = max(
                subtask_scores[subtask_result['subtask']],
                subtask_result['weighted_score'],
            )
    return sum(subtask_scores.values())


def get_submission_cot_length(submission) -> int:
    """Approximate the length of a submission's chain of thought, preferring
    token counts from metadata and falling back to the raw generation length."""
    if "metadata" in submission:
        if 'output_tokens' in submission['metadata']['usage']:
            return submission['metadata']['usage']['output_tokens']
        return submission['metadata']['usage']['completion_tokens']
    # No token info: use the raw length of the generated text.
    if 'generation' in submission:
        return len(submission['generation'])
    # No usable signal at all.
    return 0


def simulate_round_robin(all_submissions) -> list:
    """Select up to 50 submissions by cycling over target subtasks (hardest
    first), then fill the remaining slots with the best leftover submissions."""
    if not all_submissions:
        return []
    subtasks = [x['subtask'] for x in all_submissions[0]['all_subtasks_results']]
    submissions_by_target_subtask = {subtask: [] for subtask in subtasks}
    for submission in all_submissions:
        # Skip submissions with no code or that failed to compile.
        if not submission["code"] or submission['all_subtasks_results'][0]['status'] == 'CE':
            continue
        submissions_by_target_subtask[submission['target_subtask']].append(submission)

    for target_subtask in submissions_by_target_subtask:
        # We only have access to the first subtask (examples/public tests),
        # so rank by its score, breaking ties with chain-of-thought length.
        submissions_by_target_subtask[target_subtask] = deque(
            sorted(
                submissions_by_target_subtask[target_subtask],
                key=lambda x: (x['all_subtasks_results'][0]['score'], get_submission_cot_length(x)),
                reverse=True,
            )
        )

    exhausted_subtasks = set(
        subtask for subtask in submissions_by_target_subtask
        if len(submissions_by_target_subtask[subtask]) == 0
    )
    # We don't explicitly care about solving the examples subtask.
    solved_subtasks = set([subtasks[0]])

    # Only up to 50 submissions may be selected.
    selected_submissions = []
    subtask_i = len(subtasks) - 1
    while len(selected_submissions) < 50 and len(exhausted_subtasks.union(solved_subtasks)) < len(subtasks):
        subtask = subtasks[subtask_i]
        if subtask not in solved_subtasks and subtask not in exhausted_subtasks:
            sol = submissions_by_target_subtask[subtask].popleft()
            selected_submissions.append(sol)
            # Mark every subtask this submission fully solves (by name, so the
            # membership checks above actually match).
            for subtask_result in sol['all_subtasks_results']:
                if subtask_result['score'] == 1.0:
                    solved_subtasks.add(subtask_result['subtask'])
            if len(submissions_by_target_subtask[subtask]) == 0:
                exhausted_subtasks.add(subtask)
        subtask_i = (subtask_i - 1) % len(subtasks)

    # Fill any remaining slots with the best leftover submissions, ranked by
    # public-test score, chain-of-thought length, and target-subtask position.
    remaining_submissions = deque(sorted(
        [
            submission
            for subtask_submissions in submissions_by_target_subtask.values()
            for submission in subtask_submissions
        ],
        key=lambda x: (
            x['all_subtasks_results'][0]['score'],
            get_submission_cot_length(x),
            subtasks.index(x['target_subtask']) if x['target_subtask'] in subtasks else 0,
        ),
        reverse=True,
    ))
    while len(selected_submissions) < 50 and remaining_submissions:
        selected_submissions.append(remaining_submissions.popleft())

    return selected_submissions
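

# --- Illustrative usage sketch (not part of the original module) ---
# The submission dict below uses hypothetical field values; it only mirrors
# the structure the functions above expect ('code', 'target_subtask', and
# per-subtask results with 'subtask', 'status', 'score', 'weighted_score').
if __name__ == '__main__':
    example_submissions = [
        {
            'code': 'int main() { return 0; }',
            'target_subtask': 'subtask2',
            'generation': 'some chain of thought...',
            'all_subtasks_results': [
                {'subtask': 'examples', 'status': 'AC', 'score': 1.0, 'weighted_score': 0.0},
                {'subtask': 'subtask1', 'status': 'AC', 'score': 1.0, 'weighted_score': 40.0},
                {'subtask': 'subtask2', 'status': 'WA', 'score': 0.0, 'weighted_score': 0.0},
            ],
        },
    ]
    selected = simulate_round_robin(example_submissions)
    print(len(selected), 'submissions selected')
    print(get_problem_scores(selected), 'points under best-per-subtask scoring')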