in src/open_r1/rewards.py [0:0]
def ioi_code_reward(completions, test_batch_size: int = 1, provider_type: str = "piston", **kwargs) -> list[float]:
    """Reward function that evaluates IOI problems using a specified execution client.

    Assumes the dataset has the same format as hf.co/datasets/open-r1/ioi

    Args:
        completions: List of model completions to evaluate
        test_batch_size: Evaluate this many test cases in parallel, then check whether any of them failed
            (0 score); if so, stop evaluating, otherwise continue with the next batch of test cases.
        provider_type: The execution provider to use (default: "piston"). Supported values: "piston", "morph"
        **kwargs: Additional arguments passed from the dataset
    """
    # Get the appropriate client based on provider_type
    if provider_type == "morph":
        execution_client = get_morph_client_from_env()
    else:
        # for info on setting up piston workers, see slurm/piston/README.md
        execution_client = get_piston_client_from_env()

    code_snippets = [
        # note: grading is automatically skipped if no code is extracted
        add_includes(extract_code(completion[-1]["content"], "cpp"), problem_id)
        for completion, problem_id in zip(completions, kwargs["id"])
    ]

    async def run_catch_exceptions(task):
        # a single worker failure should not crash the whole batch: log it and
        # fall back to a default SubtaskResult for that completion
        try:
            return await task
        except Exception as e:
            print(f"Error from {provider_type} worker: {e}")
            return SubtaskResult()

    # transpose the column-wise dataset kwargs into one dict per problem
    problems_data = [dict(zip(kwargs.keys(), values)) for values in zip(*kwargs.values())]

    # schedule one grading task per (problem, code) pair and run them concurrently
    loop = _init_event_loop()
    evals = [
        loop.create_task(
            run_catch_exceptions(
                score_subtask(
                    execution_client,
                    problem_data,
                    code,
                    test_batch_size=test_batch_size,
                )
            )
        )
        for problem_data, code in zip(problems_data, code_snippets)
    ]
    results = loop.run_until_complete(asyncio.gather(*evals))

    return [result.score for result in results]
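
For reference, a minimal, hypothetical sketch of how this reward could be invoked. The completion format (a list of chat messages per sample, with a cpp code block in the final message) matches what extract_code is given above; the dataset columns beyond "id" depend on the open-r1/ioi dataset and on what score_subtask consumes, so the batch shown here is only illustrative.

# Hypothetical usage sketch (not part of rewards.py); assumes a configured
# piston backend and a dataset batch with the same columns as open-r1/ioi.
completions = [
    [{"role": "assistant", "content": "```cpp\n#include <cstdio>\nint main() { return 0; }\n```"}],
]
dataset_batch = {
    "id": ["example_problem"],  # column used above to attach per-problem includes
    # ...other open-r1/ioi columns are forwarded row-wise to score_subtask
}

scores = ioi_code_reward(
    completions,
    test_batch_size=2,        # grade two test cases at a time, stopping early on a failure
    provider_type="piston",   # or "morph" if a Morph execution client is configured
    **dataset_batch,
)
# scores is a list[float], one aggregated subtask score per completion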