diff --git a/eval_protocol/benchmarks/__init__.py b/eval_protocol/benchmarks/__init__.py new file mode 100644 index 00000000..18a872c7 --- /dev/null +++ b/eval_protocol/benchmarks/__init__.py @@ -0,0 +1,9 @@ +from .registry import export_benchmark, get_benchmark_runner, list_benchmarks + +__all__ = [ + "export_benchmark", + "get_benchmark_runner", + "list_benchmarks", +] + + diff --git a/eval_protocol/benchmarks/registry.py b/eval_protocol/benchmarks/registry.py new file mode 100644 index 00000000..1e3b3e7b --- /dev/null +++ b/eval_protocol/benchmarks/registry.py @@ -0,0 +1,174 @@ +""" +Benchmark registry and export decorator. + +This module provides a lightweight registry for benchmarks and a decorator +`@export_benchmark(name)` that can be stacked with `@evaluation_test`. + +It registers a runnable handle that executes the exact same evaluation pipeline +as the pytest flow by calling `run_evaluation_test_direct` with the parameters +captured from the decorated function. + +Usage in a suite module (stack under @evaluation_test): + + from eval_protocol.benchmarks.registry import export_benchmark + + @export_benchmark("aime25_low") + @evaluation_test(...) + def test_aime_pointwise(row: EvaluationRow) -> EvaluationRow: + ... + +Programmatic run: + + from eval_protocol.benchmarks.registry import get_benchmark_runner + get_benchmark_runner("aime25_low")(model="fireworks_ai/...", print_summary=True, out="artifacts/aime.json") +""" + +from __future__ import annotations + +import json +import os +from typing import Any, Callable, Dict, List, Optional + + +# Global registry: name -> callable runner +_BENCHMARK_REGISTRY: Dict[str, Callable[..., Any]] = {} + + +def list_benchmarks() -> List[str]: + return sorted(_BENCHMARK_REGISTRY.keys()) + + +def get_benchmark_runner(name: str) -> Callable[..., Any]: + try: + return _BENCHMARK_REGISTRY[name] + except KeyError as exc: + raise KeyError(f"Benchmark '{name}' not found. 
Available: {list_benchmarks()}") from exc + + +def export_benchmark(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """ + Decorator to export a benchmark test into the global registry. + + This expects to be stacked with `@evaluation_test`, so the decorated function + should carry `__ep_config` and `__ep_original_test_func` attributes that the + decorator can read to construct a direct runner. + + The registered runner supports a subset of convenient overrides and maps them + to the same EP_* environment variables used by the pytest plugin to ensure + identical summaries and JSON artifact behavior. + """ + + def _decorator(test_wrapper: Callable[..., Any]) -> Callable[..., Any]: + # Pull through metadata attached by evaluation_test + ep_config: Dict[str, Any] = getattr(test_wrapper, "__ep_config", {}) + original_test_func: Optional[Callable[..., Any]] = getattr( + test_wrapper, "__ep_original_test_func", None + ) + + def _runner( + *, + model: Optional[str] = None, + print_summary: bool = False, + out: Optional[str] = None, + reasoning_effort: Optional[str] = None, + max_rows: Optional[int | str] = None, + num_runs: Optional[int] = None, + input_params_override: Optional[Dict[str, Any]] = None, + max_concurrency: Optional[int] = None, + ) -> Any: + # Map convenience flags to EP_* env used by the pytest flow + if print_summary: + os.environ["EP_PRINT_SUMMARY"] = "1" + if out: + os.environ["EP_SUMMARY_JSON"] = out + # Merge reasoning effort and arbitrary overrides into EP_INPUT_PARAMS_JSON + merged: Dict[str, Any] = {} + if reasoning_effort: + # Fireworks OpenAI-compatible endpoint expects extra_body.reasoning_effort, not nested reasoning dict + merged.setdefault("extra_body", {})["reasoning_effort"] = str(reasoning_effort) + if input_params_override: + def _deep_update(base: Dict[str, Any], over: Dict[str, Any]) -> Dict[str, Any]: + for k, v in over.items(): + if isinstance(v, dict) and isinstance(base.get(k), dict): + _deep_update(base[k], v) + 
else: + base[k] = v + return base + merged = _deep_update(merged, dict(input_params_override)) + if merged: + os.environ["EP_INPUT_PARAMS_JSON"] = json.dumps(merged) + + if max_rows is not None: + if isinstance(max_rows, str) and max_rows.strip().lower() == "all": + os.environ["EP_MAX_DATASET_ROWS"] = "None" + else: + os.environ["EP_MAX_DATASET_ROWS"] = str(max_rows) + + # Build effective parameters, preferring overrides + models: List[str] = ep_config.get("model") or [] + model_to_use = model or (models[0] if models else None) + if not model_to_use: + raise ValueError( + f"No model provided and none captured from evaluation_test for benchmark '{name}'" + ) + + input_messages = ep_config.get("input_messages") + input_dataset = ep_config.get("input_dataset") + dataset_adapter = ep_config.get("dataset_adapter") + rollout_input_params_list = ep_config.get("rollout_input_params") + rollout_processor = ep_config.get("rollout_processor") + aggregation_method = ep_config.get("aggregation_method") + # evaluation_test stores this under the "passed_threshold" key (see __ep_config) + threshold = ep_config.get("passed_threshold") + default_num_runs = ep_config.get("num_runs") + max_dataset_rows = ep_config.get("max_dataset_rows") + mcp_config_path = ep_config.get("mcp_config_path") + max_concurrent_rollouts = ep_config.get("max_concurrent_rollouts") + if max_concurrency is not None: + max_concurrent_rollouts = int(max_concurrency) + server_script_path = ep_config.get("server_script_path") + steps = ep_config.get("steps") + mode = ep_config.get("mode") + combine_datasets = ep_config.get("combine_datasets") + + # Choose the first rollout param set by default + rollout_params = None + if isinstance(rollout_input_params_list, list) and rollout_input_params_list: + rollout_params = rollout_input_params_list[0] + + # Import runner lazily to avoid hard import dependencies and circulars + import importlib + + _mod = importlib.import_module("eval_protocol.pytest.evaluation_test") + run_evaluation_test_direct = getattr(_mod, "run_evaluation_test_direct") + + 
return run_evaluation_test_direct( + test_func=original_test_func or test_wrapper, + model=model_to_use, + input_messages=input_messages, + input_dataset=input_dataset, + dataset_adapter=dataset_adapter, + rollout_input_params=rollout_params, + rollout_processor=rollout_processor, + aggregation_method=aggregation_method, + threshold_of_success=threshold, + num_runs=(num_runs if num_runs is not None else default_num_runs), + max_dataset_rows=max_dataset_rows, + mcp_config_path=mcp_config_path, + max_concurrent_rollouts=max_concurrent_rollouts, + server_script_path=server_script_path, + steps=steps, + mode=mode, + combine_datasets=combine_datasets, + ) + + # Register runner; the latest definition for a given name wins + _BENCHMARK_REGISTRY[name] = _runner + + return test_wrapper + + return _decorator + + diff --git a/eval_protocol/benchmarks/run.py b/eval_protocol/benchmarks/run.py new file mode 100644 index 00000000..9195666f --- /dev/null +++ b/eval_protocol/benchmarks/run.py @@ -0,0 +1,100 @@ +""" +Minimal CLI runner for exported benchmarks. + +Usage: + + python -m eval_protocol.benchmarks.run aime25 \ + --model fireworks_ai/accounts/fireworks/models/gpt-oss-120b \ + --print-summary \ + --out artifacts/aime25.json \ + --max-rows 50 \ + --reasoning-effort low +""" + +from __future__ import annotations + +import argparse +from typing import Any + +from importlib import import_module +import pkgutil +import eval_protocol.benchmarks.suites as suites_pkg +from eval_protocol.benchmarks.registry import get_benchmark_runner, list_benchmarks + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Run an exported eval-protocol benchmark") + parser.add_argument("name", help=f"Benchmark name. 
Known: {', '.join(list_benchmarks()) or '(none)'}") + parser.add_argument("--model", required=True, help="Model identifier (provider/model)") + parser.add_argument("--print-summary", action="store_true", help="Print concise EP summary line") + parser.add_argument("--out", help="Write JSON summary artifact to path or directory") + parser.add_argument( + "--reasoning-effort", + choices=["low", "medium", "high"], + help="Sets extra_body.reasoning_effort via EP_INPUT_PARAMS_JSON", + ) + parser.add_argument( + "--max-rows", + help="Limit rows: integer or 'all' for no limit (maps to EP_MAX_DATASET_ROWS)", + ) + parser.add_argument("--num-runs", type=int, help="Override num_runs if provided") + parser.add_argument("--max-tokens", type=int, help="Override max_tokens for generation requests") + parser.add_argument("--max-concurrency", type=int, help="Override max concurrent rollouts") + # Reasoning-effort override (low/medium/high) is handled by --reasoning-effort above; + # when omitted, the suite default is used. 
+ return parser.parse_args() + + +def main() -> int: + args = _parse_args() + # Import all suite modules so their @export_benchmark decorators register + import sys + import traceback + for modinfo in pkgutil.iter_modules(suites_pkg.__path__): + mod_name = f"{suites_pkg.__name__}.{modinfo.name}" + try: + import_module(mod_name) + except Exception as e: + print(f"[bench] failed to import suite module: {mod_name}: {e}", file=sys.stderr) + traceback.print_exc() + # Fallback: if nothing registered yet and a known suite was requested, try explicit import + if not list_benchmarks(): + known_map = { + "aime25": "eval_protocol.benchmarks.suites.aime25", + "gpqa": "eval_protocol.benchmarks.suites.gpqa", + } + forced = known_map.get(args.name) + if forced: + try: + import_module(forced) + except Exception as e: + print(f"[bench] explicit import failed for {forced}: {e}", file=sys.stderr) + runner = get_benchmark_runner(args.name) + max_rows: int | str | None = None + if args.max_rows is not None: + try: + max_rows = int(args.max_rows) + except Exception: + max_rows = str(args.max_rows) + # Build input params override if needed + ip_override = {} + if args.max_tokens is not None: + ip_override["max_tokens"] = int(args.max_tokens) + + _ = runner( + model=args.model, + print_summary=args.print_summary, + out=args.out, + reasoning_effort=args.reasoning_effort, + max_rows=max_rows, + num_runs=args.num_runs, + input_params_override=(ip_override or None), + max_concurrency=args.max_concurrency, + ) + # Non-zero exit on failure gate is handled within the runner via assertions + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) + + diff --git a/eval_protocol/benchmarks/suites/__init__.py b/eval_protocol/benchmarks/suites/__init__.py new file mode 100644 index 00000000..04746ad6 --- /dev/null +++ b/eval_protocol/benchmarks/suites/__init__.py @@ -0,0 +1,3 @@ +# Suite modules are auto-imported by eval_protocol.benchmarks.run to register 
benchmarks. + + diff --git a/examples/aime2025_chat_completion/tests/test_aime2025.py b/eval_protocol/benchmarks/suites/aime25.py similarity index 62% rename from examples/aime2025_chat_completion/tests/test_aime2025.py rename to eval_protocol/benchmarks/suites/aime25.py index a0ef92ad..d6572bf9 100644 --- a/examples/aime2025_chat_completion/tests/test_aime2025.py +++ b/eval_protocol/benchmarks/suites/aime25.py @@ -1,43 +1,53 @@ -import os -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from eval_protocol.models import EvaluateResult, EvaluationRow, Message, MetricResult from eval_protocol.pytest.default_single_turn_rollout_process import ( default_single_turn_rollout_processor, ) from eval_protocol.pytest.evaluation_test import evaluation_test -from examples.aime2025_chat_completion.main import _extract_boxed_text, _normalize_to_int_or_none +from eval_protocol.benchmarks.registry import export_benchmark + SYSTEM_PROMPT = ( - "You are a helpful math assistant. Please reason step by step, and put your " "final answer within \\boxed{...}." + "You are a helpful math assistant. Please reason step by step, and put your " + "final answer within \\boxed{...}." ) -""" -This test consumes the AIME2025 dataset directly from Hugging Face JSONL URLs via -the evaluation_test dataset loader + adapter. By default, max_dataset_rows=2 to -keep CI fast; set it to None to run the full dataset. 
-""" +def _extract_boxed_text(text: str) -> str: + import re + + if not text: + return "" + + pattern_boxed = r"boxed{(.*?)}|framebox{(.*?)}" + matches = re.findall(pattern_boxed, text, re.DOTALL) + if matches: + for match in matches[::-1]: + for group in match: + if group: + return group.split(",")[-1].strip() + matches_digits = re.findall(r"\d+", text, re.DOTALL) + if matches_digits: + return matches_digits[-1] + return "" + + +def _normalize_to_int_or_none(s: Optional[str]) -> Optional[int]: + import re -def _ep_int(var_name: str, default_value: int | None) -> int | None: - """Read EP_*-prefixed integer or 'None' from environment for easy overrides.""" - raw = os.getenv(var_name) - if raw is None: - return default_value - raw_stripped = raw.strip().lower() - if raw_stripped == "none": + if s is None: + return None + m = re.match(r"\d+", str(s).strip()) + if not m: return None try: - return int(raw_stripped) + return int(m.group(0)) except ValueError: - return default_value + return None def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: - """ - Convert raw AIME2025 rows (with keys 'question' and 'answer') to EvaluationRow. - Limits handled by evaluation_test's max_dataset_rows, so adapter is simple. 
- """ converted: List[EvaluationRow] = [] for r in rows: question = r.get("question", "") @@ -46,10 +56,13 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: Message(role="system", content=SYSTEM_PROMPT), Message(role="user", content=str(question)), ] - converted.append(EvaluationRow(messages=messages, ground_truth=str(answer) if answer is not None else None)) + converted.append( + EvaluationRow(messages=messages, ground_truth=str(answer) if answer is not None else None) + ) return converted +@export_benchmark("aime25") @evaluation_test( model=["fireworks_ai/accounts/fireworks/models/gpt-oss-120b"], input_dataset=[ @@ -57,26 +70,21 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: "https://huggingface.co/datasets/opencompass/AIME2025/raw/main/aime2025-II.jsonl", ], dataset_adapter=aime2025_dataset_adapter, - rollout_input_params=[{"extra_body": {"reasoning_effort": "low"}}], + rollout_input_params=[{"max_tokens": 131000, "extra_body": {"reasoning_effort": "low"}}], rollout_processor=default_single_turn_rollout_processor, aggregation_method="mean", - passed_threshold=None, - num_runs=2, + threshold_of_success=None, + num_runs=8, max_dataset_rows=2, max_concurrent_rollouts=4, mode="pointwise", ) -def test_aime2025_pointwise(row: EvaluationRow) -> EvaluationRow: - """ - Pointwise evaluation of AIME2025 rows: extract final integer from assistant message and compare to ground truth. 
- """ - # After rollout, the last message should be assistant's response +def test_aime25_pointwise(row: EvaluationRow) -> EvaluationRow: assistant_msgs = [m for m in row.messages if m.role == "assistant"] content = assistant_msgs[-1].content if assistant_msgs else "" extracted_text = _extract_boxed_text(content or "") extracted_int = _normalize_to_int_or_none(extracted_text) - # Ground truth comes from dataset_adapter gt_int = _normalize_to_int_or_none(row.ground_truth or "") is_valid = extracted_int is not None and gt_int is not None @@ -106,3 +114,5 @@ def test_aime2025_pointwise(row: EvaluationRow) -> EvaluationRow: metrics=metrics, ) return row + + diff --git a/eval_protocol/benchmarks/suites/gpqa.py b/eval_protocol/benchmarks/suites/gpqa.py new file mode 100644 index 00000000..2b89284c --- /dev/null +++ b/eval_protocol/benchmarks/suites/gpqa.py @@ -0,0 +1,100 @@ +from typing import List + +import csv +import io +import re +import requests + +from eval_protocol.models import EvaluateResult, EvaluationRow, Message, MetricResult +from eval_protocol.pytest.evaluation_test import evaluation_test +from eval_protocol.pytest.default_single_turn_rollout_process import ( + default_single_turn_rollout_processor, +) +from eval_protocol.benchmarks.registry import export_benchmark + + +SYSTEM_PROMPT = ( + "You are a helpful assistant. Read the question and options carefully. " + "Express your final answer strictly as a single letter: A, B, C, or D." 
+) + + +def _load_gpqa_messages_from_csv() -> List[List[Message]]: + import random + + url = "https://openaipublic.blob.core.windows.net/simple-evals/gpqa_diamond.csv" + resp = requests.get(url, timeout=60) + resp.raise_for_status() + + # Fixed seed so every run sees the same option order (reproducible scores) + rng = random.Random(0) + messages_list: List[List[Message]] = [] + reader = csv.DictReader(io.StringIO(resp.text)) + for ex in reader: + q = str(ex.get("Question", "")) + correct = str(ex.get("Correct Answer", "")).strip() + inc1 = str(ex.get("Incorrect Answer 1", "")) + inc2 = str(ex.get("Incorrect Answer 2", "")) + inc3 = str(ex.get("Incorrect Answer 3", "")) + choices = [correct, inc1, inc2, inc3] + # Shuffle so the correct answer is not always option (A); a fixed "A" ground + # truth lets position-biased models score spuriously well + rng.shuffle(choices) + gt_letter = "ABCD"[choices.index(correct)] + user_content = ( + f"{q}\n\n(A) {choices[0]}\n(B) {choices[1]}\n(C) {choices[2]}\n(D) {choices[3]}\n\nAnswer with one letter." + ) + messages_list.append( + [ + Message(role="system", content=SYSTEM_PROMPT), + Message(role="user", content=user_content), + # Ground-truth letter tracks the shuffled position of the correct answer + Message(role="system", content=f"__GT__:{gt_letter}"), + ] + ) + if not messages_list: + raise RuntimeError("Failed to load GPQA messages: no rows found from source") + return messages_list + + +def _extract_abcd_letter(text: str) -> str | None: + if not text: + return None + m = re.search(r"\b([ABCD])\b", text.upper()) + return m.group(1) if m else None + + +_GPQA_INPUT_MESSAGES = _load_gpqa_messages_from_csv() + + +@export_benchmark("gpqa") +@evaluation_test( + model=["fireworks_ai/accounts/fireworks/models/gpt-oss-120b"], + input_messages=_GPQA_INPUT_MESSAGES, + rollout_input_params=[{"extra_body": {"reasoning_effort": "low"}}], + rollout_processor=default_single_turn_rollout_processor, + aggregation_method="mean", + threshold_of_success=None, + num_runs=8, + mode="pointwise", +) +def gpqa_pointwise(row: EvaluationRow) -> EvaluationRow: + assistant_msgs = [m for m in row.messages if m.role == "assistant"] + content = assistant_msgs[-1].content if assistant_msgs else "" + + pred = _extract_abcd_letter(content or "") + # Retrieve GT from the trailing system message we appended + 
gt_tokens = [m.content for m in row.messages if m.role == "system" and (m.content or "").startswith("__GT__:")] + gt = gt_tokens[-1].split(":", 1)[1].strip() if gt_tokens else None + + is_valid = pred is not None and gt in {"A", "B", "C", "D"} + score = 1.0 if (is_valid and pred == gt) else 0.0 + + row.evaluation_result = EvaluateResult( + score=score, + reason=("Correct option" if score == 1.0 else "Incorrect option"), + is_score_valid=is_valid, + metrics={ + "exact_match": MetricResult( + score=score, + is_score_valid=is_valid, + reason=("Matched" if score == 1.0 else "Not matched"), + data={"pred": pred, "gt": gt}, + ) + }, + ) + return row + + diff --git a/eval_protocol/pytest/default_single_turn_rollout_process.py b/eval_protocol/pytest/default_single_turn_rollout_process.py index 69966b39..95613ebc 100644 --- a/eval_protocol/pytest/default_single_turn_rollout_process.py +++ b/eval_protocol/pytest/default_single_turn_rollout_process.py @@ -36,11 +36,22 @@ async def process_row(row: EvaluationRow) -> EvaluationRow: request_params = {"model": config.model, "messages": messages_payload, **config.input_params} # Ensure caching is disabled only for this request (review feedback) request_params["cache"] = {"no-cache": True} - # Allow passing reasoning effort to Fireworks via LiteLLM using extra_body - # Expected: config.input_params may contain {"reasoning": {"effort": "low|medium|high"}} - if "reasoning" in config.input_params: + # Single-level reasoning effort: expect `reasoning_effort` only + effort_val = None + if isinstance(config.input_params, dict): + if "reasoning_effort" in config.input_params: + effort_val = str(config.input_params["reasoning_effort"]) # flat shape + elif isinstance(config.input_params.get("extra_body"), dict) and "reasoning_effort" in config.input_params["extra_body"]: + # Accept if user passed it directly inside extra_body + effort_val = str(config.input_params["extra_body"]["reasoning_effort"]) # already in extra_body + + if effort_val: 
+ # Always under extra_body so LiteLLM forwards to provider-specific param set request_params.setdefault("extra_body", {}) - request_params["extra_body"]["reasoning"] = config.input_params["reasoning"] + request_params["extra_body"]["reasoning_effort"] = effort_val + # Ensure unsupported top-level keys are not present + if "reasoning_effort" in request_params: + request_params.pop("reasoning_effort", None) if row.tools is not None: request_params["tools"] = row.tools @@ -87,7 +98,10 @@ async def process_row(row: EvaluationRow) -> EvaluationRow: async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow: async with semaphore: - return await process_row(r) + try: + return await process_row(r) + except Exception as e: + return r tasks = [_sem_wrapper(row) for row in rows] dataset = list(await asyncio.gather(*tasks)) diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index ef516f6b..7557ae3d 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -754,6 +754,334 @@ def dual_mode_wrapper(*args, **kwargs): # Create the dual mode wrapper dual_mode_wrapper = create_dual_mode_wrapper() + # Attach metadata so non-pytest runners (e.g., export_benchmark) can reconstruct runs + try: + dual_mode_wrapper.__ep_original_test_func = test_func # type: ignore[attr-defined] + dual_mode_wrapper.__ep_config = { + "model": model, + "input_messages": input_messages, + "input_dataset": input_dataset, + "dataset_adapter": dataset_adapter, + "rollout_input_params": rollout_input_params, + "rollout_processor": rollout_processor, + "evaluation_test_kwargs": evaluation_test_kwargs, + "aggregation_method": aggregation_method, + "passed_threshold": passed_threshold, + "num_runs": num_runs, + "max_dataset_rows": max_dataset_rows, + "mcp_config_path": mcp_config_path, + "max_concurrent_rollouts": max_concurrent_rollouts, + "server_script_path": server_script_path, + "steps": steps, + "mode": mode, + 
"combine_datasets": combine_datasets, + } # type: ignore[attr-defined] + + # Provide a direct runner method to avoid external imports + def __ep_run_direct( + *, + model_override: str | None = None, + num_runs_override: int | None = None, + rollout_input_params_override: Dict[str, Any] | None = None, + ): + cfg = dual_mode_wrapper.__ep_config # type: ignore[attr-defined] + models = cfg.get("model") or [] + _model = model_override or (models[0] if models else None) + if not _model: + raise ValueError("No model provided for direct run") + rip = rollout_input_params_override + if rip is None: + rip_list = cfg.get("rollout_input_params") + rip = rip_list[0] if isinstance(rip_list, list) and rip_list else {} + return run_evaluation_test_direct( + test_func=dual_mode_wrapper.__ep_original_test_func, # type: ignore[attr-defined] + model=_model, + input_messages=cfg.get("input_messages"), + input_dataset=cfg.get("input_dataset"), + dataset_adapter=cfg.get("dataset_adapter"), + rollout_input_params=rip, + rollout_processor=cfg.get("rollout_processor"), + aggregation_method=cfg.get("aggregation_method"), + threshold_of_success=cfg.get("passed_threshold"), + num_runs=(num_runs_override if num_runs_override is not None else cfg.get("num_runs")), + max_dataset_rows=cfg.get("max_dataset_rows"), + mcp_config_path=cfg.get("mcp_config_path"), + max_concurrent_rollouts=cfg.get("max_concurrent_rollouts"), + server_script_path=cfg.get("server_script_path"), + steps=cfg.get("steps"), + mode=cfg.get("mode"), + combine_datasets=cfg.get("combine_datasets"), + ) + + dual_mode_wrapper.__ep_run_direct = __ep_run_direct # type: ignore[attr-defined] + except Exception: + # Best-effort; never fail pytest setup due to metadata attachment + pass + return dual_mode_wrapper return decorator + + +def run_evaluation_test_direct( + *, + test_func: TestFunction, + model: str, + input_messages: Optional[List[InputMessagesParam]] = None, + input_dataset: Optional[List[DatasetPathParam]] = None, + 
dataset_adapter: Callable[[List[Dict[str, Any]]], Dataset] = default_dataset_adapter, + rollout_input_params: Optional[RolloutInputParam] = None, + rollout_processor: RolloutProcessor = default_no_op_rollout_processor, + aggregation_method: AggregationMethod = "mean", + threshold_of_success: Optional[float] = None, + num_runs: int = 1, + max_dataset_rows: Optional[int] = None, + mcp_config_path: Optional[str] = None, + max_concurrent_rollouts: int = 8, + server_script_path: Optional[str] = None, + steps: int = 30, + mode: EvaluationTestMode = "batch", + combine_datasets: bool = True, +) -> Dict[str, Any]: + """ + Programmatic runner that executes the same pipeline as @evaluation_test without pytest. + Honors EP_* env overrides and emits the same summary/JSON artifact. + Returns a dict with keys: summary, results. + """ + + def _parse_ep_max_rows(default_value: int | None) -> int | None: + raw = os.getenv("EP_MAX_DATASET_ROWS") + if raw is None: + return default_value + s = raw.strip().lower() + if s == "none": + return None + try: + return int(s) + except ValueError: + return default_value + + def _deep_update_dict(base: dict, override: dict) -> dict: + for key, value in override.items(): + if isinstance(value, dict) and isinstance(base.get(key), dict): + _deep_update_dict(base[key], value) + else: + base[key] = value + return base + + # Build dataset/messages + data: List[EvaluationRow] = [] + if input_dataset is not None: + # Concatenate rows across multiple paths/URLs + data_jsonl: List[Dict[str, Any]] = [] + for p in input_dataset: + data_jsonl.extend(load_jsonl(p)) + effective_max_rows = _parse_ep_max_rows(max_dataset_rows) + if effective_max_rows is not None: + data_jsonl = data_jsonl[:effective_max_rows] + data = dataset_adapter(data_jsonl) + elif input_messages is not None: + effective_max_rows = _parse_ep_max_rows(max_dataset_rows) + msgs = input_messages + if effective_max_rows is not None and isinstance(msgs, list): + msgs = msgs[:effective_max_rows] # 
type: ignore + if isinstance(msgs, list) and msgs and isinstance(msgs[0], Message): + data = [EvaluationRow(messages=msgs)] # type: ignore[arg-type] + else: + data = [EvaluationRow(messages=m) for m in msgs] # type: ignore + else: + raise ValueError("No input dataset or input messages provided") + + # Build input params and apply env JSON override + input_params: Dict[str, Any] = rollout_input_params or {} + try: + import json as _json + + _env_override = os.getenv("EP_INPUT_PARAMS_JSON") + if _env_override: + override_obj = _json.loads(_env_override) + if isinstance(override_obj, dict): + input_params = _deep_update_dict(dict(input_params), override_obj) + except Exception: + pass + + # Prepare metadata + eval_metadata = EvalMetadata( + name=test_func.__name__, + description=test_func.__doc__, + status="running", + num_runs=num_runs, + aggregation_method=aggregation_method, + threshold_of_success=threshold_of_success, + passed=None, + ) + + completion_params = CompletionParams( + model=model, + temperature=input_params.get("temperature"), + max_tokens=input_params.get("max_tokens"), + max_tool_calls=input_params.get("max_tool_calls"), + ) + + for row in data: + if row.input_metadata is None: + row.input_metadata = InputMetadata() + row.input_metadata.completion_params = completion_params + if row.input_metadata.session_data is None: + row.input_metadata.session_data = {} + row.input_metadata.session_data["mode"] = mode + row.eval_metadata = eval_metadata + row.pid = os.getpid() + default_logger.log(row) + + config = RolloutProcessorConfig( + model=model, + input_params=input_params, + mcp_config_path=mcp_config_path or "", + max_concurrent_rollouts=max_concurrent_rollouts, + server_script_path=server_script_path, + steps=steps, + ) + + all_results: List[EvaluationRow] = [] + try: + for _ in range(num_runs): + fresh_rows = [copy.deepcopy(r) for r in data] + processed_rows = execute_function(rollout_processor, rows=fresh_rows, config=config) + if mode == 
"pointwise": + for row in processed_rows: + result = execute_function(test_func, row=row) + if result is None or not isinstance(result, EvaluationRow): + raise ValueError( + f"Test function {test_func.__name__} did not return an EvaluationRow instance." + ) + all_results.append(result) + else: + results = execute_function(test_func, rows=processed_rows) + if results is None or not isinstance(results, list) or not results: + raise ValueError( + f"Test function {test_func.__name__} did not return a non-empty list of EvaluationRow instances." + ) + if not all(isinstance(r, EvaluationRow) for r in results): + raise ValueError( + f"Test function {test_func.__name__} returned a list containing non-EvaluationRow instances." + ) + all_results.extend(results) + + scores = [r.evaluation_result.score for r in all_results if r.evaluation_result] + agg_score = aggregate(scores, aggregation_method) + + ci_low: float | None = None + ci_high: float | None = None + if aggregation_method == "mean": + try: + result_ci = compute_fixed_set_mu_ci(all_results) + mu_ci_low, mu_ci_high = result_ci[1], result_ci[2] + if mu_ci_low is not None and mu_ci_high is not None: + ci_low = float(mu_ci_low) + ci_high = float(mu_ci_high) + except Exception: + ci_low = None + ci_high = None + + passed = None + if threshold_of_success is not None: + passed = agg_score >= threshold_of_success + for r in all_results: + if r.eval_metadata is not None: + r.eval_metadata.status = "finished" + r.eval_metadata.passed = passed + default_logger.log(r) + + # Summary/JSON artifact (same EP_* env behavior) + summary_obj: Dict[str, Any] = {} + try: + should_print = os.getenv("EP_PRINT_SUMMARY") == "1" + summary_path = os.getenv("EP_SUMMARY_JSON") + suite_name = test_func.__name__ + total_rows = len(all_results) + summary_obj = { + "suite": suite_name, + "model": model, + "agg_score": float(agg_score) if agg_score is not None else None, + "num_runs": num_runs, + "rows": total_rows, + } + if ci_low is not None and 
ci_high is not None: + summary_obj["agg_ci_low"] = ci_low + summary_obj["agg_ci_high"] = ci_high + if should_print: + if ci_low is not None and ci_high is not None: + print( + f"EP Summary | suite={suite_name} model={model} agg={summary_obj['agg_score']:.3f} ci95=[{ci_low:.3f},{ci_high:.3f}] runs={num_runs} rows={total_rows}" + ) + else: + print( + f"EP Summary | suite={suite_name} model={model} agg={summary_obj['agg_score']:.3f} runs={num_runs} rows={total_rows}" + ) + if summary_path: + import json as _json + import pathlib as _pathlib + import time as _time + import re as _re + + def _sanitize_filename(text: str) -> str: + safe = _re.sub(r"[^A-Za-z0-9._-]+", "-", text.strip()) + return safe[:120] + + def _extract_effort_tag(params: dict) -> str | None: + try: + if not isinstance(params, dict): + return None + if "extra_body" in params and isinstance(params["extra_body"], dict): + eb = params["extra_body"] + if isinstance(eb.get("reasoning"), dict) and "effort" in eb["reasoning"]: + return str(eb["reasoning"]["effort"]).lower() + if "reasoning_effort" in eb: + return str(eb["reasoning_effort"]).lower() + if "reasoning" in params and isinstance(params["reasoning"], dict) and "effort" in params["reasoning"]: + return str(params["reasoning"]["effort"]).lower() + except Exception: + return None + return None + + model_slug = _sanitize_filename(model) + effort_tag = _extract_effort_tag(input_params) or "" + effort_suffix = f"__effort-{_sanitize_filename(effort_tag)}" if effort_tag else "" + base_name = f"{suite_name}__{model_slug}{effort_suffix}__{mode}__runs{num_runs}.json" + + p = _pathlib.Path(summary_path) + summary_obj["timestamp"] = int(_time.time()) + if p.suffix.lower() != ".json" or str(summary_path).endswith("/") or p.is_dir(): + out_dir = p + out_dir.mkdir(parents=True, exist_ok=True) + out_file = out_dir / base_name + else: + parent = p.parent + parent.mkdir(parents=True, exist_ok=True) + if effort_tag: + out_file = parent / 
f"{p.stem}__{_sanitize_filename(effort_tag)}{p.suffix}" + else: + out_file = p + with open(out_file, "w", encoding="utf-8") as f: + _json.dump(summary_obj, f) + except Exception: + pass + + if threshold_of_success is not None and not passed: + assert agg_score >= threshold_of_success, ( + f"Aggregated score {agg_score:.3f} below threshold {threshold_of_success}" + ) + + return {"summary": summary_obj, "results": all_results} + except Exception: + # Mark errors on rows + if eval_metadata is not None: + eval_metadata.status = "error" + eval_metadata.passed = False + for r in (data or []): + if r.eval_metadata is not None: + r.eval_metadata.status = "error" + r.eval_metadata.passed = False + default_logger.log(r) + raise diff --git a/eval_protocol/pytest/plugin.py b/eval_protocol/pytest/plugin.py index 5eb9a946..6c58d1e2 100644 --- a/eval_protocol/pytest/plugin.py +++ b/eval_protocol/pytest/plugin.py @@ -131,10 +131,9 @@ def pytest_configure(config) -> None: merged[k] = v reasoning_effort = config.getoption("--ep-reasoning-effort") if reasoning_effort: - # Standardize into extra_body.reasoning.effort in EP_INPUT_PARAMS_JSON + # Always place under extra_body to avoid LiteLLM rejecting top-level params eb = merged.setdefault("extra_body", {}) - reasoning = eb.setdefault("reasoning", {}) - reasoning["effort"] = str(reasoning_effort) + eb["reasoning_effort"] = str(reasoning_effort) if merged: os.environ["EP_INPUT_PARAMS_JSON"] = _json.dumps(merged) except Exception: