Skip to content
8 changes: 4 additions & 4 deletions eval_protocol/mcp/execution/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ async def _execute_with_semaphore(idx):
messages.append(Message.model_validate(msg_dict))

evaluation_rows[idx].messages = messages
evaluation_rows[idx].input_metadata.row_id = envs.dataset_rows[idx].id
evaluation_rows[idx].input_metadata.dataset_info = asdict(envs.dataset_rows[idx])
# evaluation_rows[idx].input_metadata.row_id = envs.dataset_rows[idx].id
# evaluation_rows[idx].input_metadata.dataset_info = asdict(envs.dataset_rows[idx])
evaluation_rows[idx].tools = shared_tool_schema
evaluation_rows[idx].usage = CompletionUsage(**trajectory.usage)
evaluation_rows[idx].input_metadata.completion_params = CompletionParams(
Expand Down Expand Up @@ -482,11 +482,11 @@ async def _execute_rollout(
trajectory.control_plane_summary.update({"error_message": f"{failure_reason}"})
try:
await envs.connection_manager.reset_session(session)
except:
except: # noqa: E722
logger.error(f"Error resetting session {session.session_id}")
try:
await envs.connection_manager.close_session(session)
except:
except: # noqa: E722
logger.error(f"Error closing session {session.session_id}")
return trajectory

Expand Down
19 changes: 18 additions & 1 deletion eval_protocol/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,21 @@ class InputMetadata(BaseModel):
)


class EvaluationThreshold(BaseModel):
    """Pass/fail criteria applied to an aggregated evaluation run.

    ``success`` is mandatory: every test must state the minimum success rate
    it requires. ``standard_deviation`` is optional: when supplied, the run
    must additionally keep its score spread at or below this ceiling.
    """

    # Required lower bound on the aggregated score (fraction of total, 0.0-1.0).
    success: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Minimum success rate threshold (fraction of total score, 0.0 to 1.0)",
    )
    # Optional upper bound on score variability (fraction of total, 0.0-1.0).
    standard_deviation: Optional[float] = Field(
        None,
        ge=0.0,
        le=1.0,
        description="Maximum standard deviation threshold (fraction of total score, 0.0 to 1.0)",
    )


class EvalMetadata(BaseModel):
"""Metadata about the evaluation that was run."""

Expand All @@ -216,7 +231,9 @@ class EvalMetadata(BaseModel):
)
num_runs: int = Field(..., description="Number of times the evaluation was repeated")
aggregation_method: str = Field(..., description="Method used to aggregate scores across runs")
threshold_of_success: Optional[float] = Field(None, description="Threshold score for test success")
passed_threshold: Optional[EvaluationThreshold] = Field(
None, description="Threshold configuration for test success"
)
passed: Optional[bool] = Field(None, description="Whether the evaluation passed based on the threshold")


Expand Down
82 changes: 57 additions & 25 deletions eval_protocol/pytest/evaluation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@
import math
import os
import statistics
from typing import Any, Callable, Dict, List, Literal, Optional
from typing import Any, Callable, Dict, List, Literal, Optional, Union

import pytest

from eval_protocol.dataset_logger import default_logger
from eval_protocol.dataset_logger.dataset_logger import DatasetLogger
from eval_protocol.human_id import generate_id
from eval_protocol.models import CompletionParams, EvalMetadata, EvaluationRow, InputMetadata, Message
from eval_protocol.models import (
CompletionParams,
EvalMetadata,
EvaluationRow,
EvaluationThreshold,
InputMetadata,
Message,
)
from eval_protocol.pytest.default_dataset_adapter import default_dataset_adapter
from eval_protocol.pytest.default_no_op_rollout_process import default_no_op_rollout_processor
from eval_protocol.pytest.types import (
Expand Down Expand Up @@ -47,7 +54,7 @@ def evaluation_test( # noqa: C901
rollout_processor: RolloutProcessor = default_no_op_rollout_processor,
evaluation_test_kwargs: Optional[List[EvaluationInputParam]] = None,
aggregation_method: AggregationMethod = "mean",
threshold_of_success: Optional[float] = None,
passed_threshold: Optional[Union[EvaluationThreshold, float]] = None,
num_runs: int = 1,
max_dataset_rows: Optional[int] = None,
mcp_config_path: Optional[str] = None,
Expand Down Expand Up @@ -108,8 +115,8 @@ def evaluation_test( # noqa: C901
rollout_processor: Function used to perform the rollout.
evaluation_test_kwargs: Kwargs for the evaluation function.
aggregation_method: How to aggregate scores across rows.
threshold_of_success: If set, fail the test if the aggregated score is
below this threshold.
passed_threshold: Threshold configuration for test success.
Success rate must be above success, and if set, standard deviation must be below standard_deviation.
num_runs: Number of times to repeat the rollout and evaluations.
max_dataset_rows: Limit dataset to the first N rows.
mcp_config_path: Path to MCP config file that follows MCPMultiClientConfiguration schema
Expand All @@ -127,6 +134,14 @@ def evaluation_test( # noqa: C901
def decorator(
test_func: TestFunction,
):
if passed_threshold is not None:
if isinstance(passed_threshold, float):
threshold = EvaluationThreshold(success=passed_threshold)
else:
threshold = EvaluationThreshold(**passed_threshold)
else:
threshold = None

sig = inspect.signature(test_func)

# For pointwise/rowwise mode, we expect a different signature
Expand Down Expand Up @@ -285,7 +300,7 @@ def create_wrapper_with_signature() -> Callable:
def wrapper_body(**kwargs):
model_name = kwargs["model"]
eval_metadata = None
all_results: List[EvaluationRow] = []
all_results: List[List[EvaluationRow]] = [[] for _ in range(num_runs)]

cohort_id = generate_id()

Expand Down Expand Up @@ -346,7 +361,7 @@ def _log_eval_error(
status="running",
num_runs=num_runs,
aggregation_method=aggregation_method,
threshold_of_success=threshold_of_success,
passed_threshold=threshold,
passed=None,
)

Expand Down Expand Up @@ -386,11 +401,11 @@ def _log_eval_error(
logger=active_logger,
)

for _ in range(num_runs):
for i in range(num_runs):
# Regenerate outputs each run by deep-copying the pristine dataset
# so model responses are not reused across runs.
run_id = generate_id()
fresh_dataset = [copy.deepcopy(r) for r in data]
fresh_dataset = [r.model_copy(deep=True) for r in data]

# apply new run_id to fresh_dataset
for row in fresh_dataset:
Expand Down Expand Up @@ -418,7 +433,7 @@ def _log_eval_error(
raise ValueError(
f"Test function {test_func.__name__} did not return an EvaluationRow instance. You must return an EvaluationRow instance from your test function decorated with @evaluation_test."
)
all_results.append(result)
all_results[i].append(result)
else:
# Batch mode: call the test function with the full dataset
results = execute_with_params(
Expand All @@ -442,17 +457,21 @@ def _log_eval_error(
raise ValueError(
f"Test function {test_func.__name__} returned a list containing non-EvaluationRow instances. You must return a list of EvaluationRow instances from your test function decorated with @evaluation_test."
)
all_results.extend(results)
all_results[i] = results

scores = [r.evaluation_result.score for r in all_results if r.evaluation_result]
scores = [
sum([r.evaluation_result.score for r in result if r.evaluation_result]) / len(result)
for result in all_results
]
agg_score = aggregate(scores, aggregation_method)
score_std = statistics.stdev(scores) if len(scores) > 1 else 0.0

# Compute 95% confidence interval for the fixed-set mean μ (by-question, using repeats)
ci_low: float | None = None
ci_high: float | None = None
if aggregation_method == "mean":
try:
result_ci = compute_fixed_set_mu_ci(all_results)
result_ci = compute_fixed_set_mu_ci([item for sublist in all_results for item in sublist])
mu_ci_low, mu_ci_high = result_ci[1], result_ci[2]
if mu_ci_low is not None and mu_ci_high is not None:
ci_low = float(mu_ci_low)
Expand All @@ -464,23 +483,32 @@ def _log_eval_error(

# Determine if the evaluation passed based on threshold
passed = None
if threshold_of_success is not None:
passed = agg_score >= threshold_of_success

if threshold is not None:
success_passed, std_passed = True, True

success_passed = agg_score >= threshold.success

if threshold.standard_deviation is not None:
std_passed = score_std <= threshold.standard_deviation

passed = success_passed and std_passed

# Update eval metadata status and passed field for all results
for r in all_results:
if r.eval_metadata is not None:
r.eval_metadata.status = "finished"
r.eval_metadata.passed = passed
active_logger.log(r)
for result in all_results:
for r in result:
if r.eval_metadata is not None:
r.eval_metadata.status = "finished"
r.eval_metadata.passed = passed
default_logger.log(r)

# Optional: print and/or persist a summary artifact for CI
try:
should_print = os.getenv("EP_PRINT_SUMMARY") == "1"
summary_path = os.getenv("EP_SUMMARY_JSON")
suite_name = test_func.__name__
model_used = model_name
total_rows = len(all_results)
total_rows = len([item for sublist in all_results for item in sublist])
summary_obj = {
"suite": suite_name,
"model": model_used,
Expand All @@ -497,7 +525,7 @@ def _log_eval_error(
from collections import defaultdict

metric_scores: Dict[str, list] = defaultdict(list)
for r in all_results:
for r in [item for sublist in all_results for item in sublist]:
if r.evaluation_result and r.evaluation_result.metrics:
for m_name, m_res in r.evaluation_result.metrics.items():
if m_res is not None and getattr(m_res, "score", None) is not None:
Expand Down Expand Up @@ -614,10 +642,14 @@ def _extract_effort_tag(params: dict) -> str | None:
# pass

# Check threshold after logging
if threshold_of_success is not None and not passed:
if threshold is not None and not passed:
assert (
agg_score >= threshold_of_success
), f"Aggregated score {agg_score:.3f} below threshold {threshold_of_success}"
agg_score >= threshold.success
), f"Aggregated score {agg_score:.3f} below threshold {threshold.success}"
if threshold.standard_deviation is not None:
assert (
score_std <= threshold.standard_deviation
), f"Standard deviation {score_std:.3f} above threshold {threshold.standard_deviation}"

except AssertionError:
_log_eval_error("finished", data if "data" in locals() else None, passed=False)
Expand Down
15 changes: 6 additions & 9 deletions tests/pytest/test_apps_coding.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ def apps_dataset_to_evaluation_row(data: List[Dict[str, Any]]) -> List[Evaluatio
Convert entries from APPS dataset to EvaluationRow objects.
"""
return [
EvaluationRow(
messages=[Message(role="user", content=row["question"])],
ground_truth=row["input_output"]
)
EvaluationRow(messages=[Message(role="user", content=row["question"])], ground_truth=row["input_output"])
for row in data
]

Expand All @@ -31,7 +28,7 @@ def apps_dataset_to_evaluation_row(data: List[Dict[str, Any]]) -> List[Evaluatio
dataset_adapter=apps_dataset_to_evaluation_row,
model=["fireworks_ai/accounts/fireworks/models/kimi-k2-instruct"],
rollout_input_params=[{"temperature": 0.0, "max_tokens": 4096}],
threshold_of_success=0.33,
passed_threshold=0.33,
rollout_processor=default_single_turn_rollout_processor,
num_runs=1,
mode="pointwise",
Expand All @@ -42,7 +39,7 @@ def test_apps_code_evaluation(row: EvaluationRow) -> EvaluationRow:

Args:
row: EvaluationRow containing the conversation messages and ground_truth as JSON string

Returns:
EvaluationRow with the evaluation result
"""
Expand All @@ -51,8 +48,8 @@ def test_apps_code_evaluation(row: EvaluationRow) -> EvaluationRow:
messages=row.messages,
ground_truth=row.ground_truth,
)

# Set the evaluation result on the row
row.evaluation_result = result
return row

return row
38 changes: 17 additions & 21 deletions tests/pytest/test_basic_coding.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from eval_protocol.models import EvaluateResult, EvaluationRow, Message
from eval_protocol.pytest import default_single_turn_rollout_processor, evaluation_test
from eval_protocol.rewards.code_execution import extract_code_blocks, execute_python_code
from eval_protocol.rewards.code_execution import execute_python_code, extract_code_blocks


def coding_dataset_to_evaluation_row(data: List[Dict[str, Any]]) -> List[EvaluationRow]:
Expand All @@ -18,8 +18,8 @@ def coding_dataset_to_evaluation_row(data: List[Dict[str, Any]]) -> List[Evaluat
"""
return [
EvaluationRow(
messages=[Message(role="user", content=f"{row['prompt']} Input: {row['input']}")],
ground_truth=row["expected_output"]
messages=[Message(role="user", content=f"{row['prompt']} Input: {row['input']}")],
ground_truth=row["expected_output"],
)
for row in data
]
Expand All @@ -30,63 +30,59 @@ def coding_dataset_to_evaluation_row(data: List[Dict[str, Any]]) -> List[Evaluat
dataset_adapter=coding_dataset_to_evaluation_row,
model=["fireworks_ai/accounts/fireworks/models/kimi-k2-instruct"],
rollout_input_params=[{"temperature": 0.0, "max_tokens": 4096}],
threshold_of_success=0.8,
passed_threshold=0.8,
rollout_processor=default_single_turn_rollout_processor,
num_runs=1,
mode="pointwise",
)
def test_coding_code_evaluation(row: EvaluationRow) -> EvaluationRow:
"""
Evaluation function that tests code correctness by executing it locally.

This function:
1. Extracts Python code from the assistant's response
2. Executes the code locally with timeout=10
3. Compares the output to ground_truth
4. Returns a score of 1.0 if output matches, 0.0 otherwise

Args:
row: EvaluationRow containing the conversation messages and expected_output in ground_truth

Returns:
EvaluationRow with the evaluation result
"""
# Check if we have an assistant response
if len(row.messages) < 2 or row.messages[-1].role != "assistant":
row.evaluation_result = EvaluateResult(score=0.0, reason="No assistant response found")
return row

assistant_content = row.messages[-1].content or ""
expected_output = (row.ground_truth or "").strip()

# Extract Python code blocks
code_blocks = extract_code_blocks(assistant_content, language="python")
if not code_blocks:
row.evaluation_result = EvaluateResult(score=0.0, reason="No Python code block found")
return row

code = code_blocks[0]["code"]

# Execute the code locally
execution_result = execute_python_code(code, timeout=10)

if not execution_result.get("success", False):
error_msg = execution_result.get("error", "Code execution failed")
row.evaluation_result = EvaluateResult(score=0.0, reason=f"Execution error: {error_msg}")
return row

# Compare output with expected
actual_output = (execution_result.get("output", "") or "").strip()

if actual_output == expected_output:
row.evaluation_result = EvaluateResult(
score=1.0,
reason=f"✅ Output matches: '{actual_output}'"
)
row.evaluation_result = EvaluateResult(score=1.0, reason=f"✅ Output matches: '{actual_output}'")
else:
row.evaluation_result = EvaluateResult(
score=0.0,
reason=f"❌ Expected: '{expected_output}', Got: '{actual_output}'"
score=0.0, reason=f"❌ Expected: '{expected_output}', Got: '{actual_output}'"
)

return row
Loading
Loading