Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions eval_protocol/utils/logs_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ def __init__(

# Subscribe to events and start listening for cross-process events
event_bus.subscribe(self._handle_event)
event_bus.start_listening()

logger.info(f"LogsServer initialized on {host}:{port}")

Expand Down Expand Up @@ -288,6 +287,12 @@ def _handle_event(self, event_type: str, data: Any) -> None:
data = EvaluationRow(**data)
self.websocket_manager.broadcast_row_upserted(data)

def start_loops(self):
    """Start the background components this server depends on.

    Runs, in this order: the WebSocket manager's broadcast loop, the
    evaluation watcher, and the event bus listener.
    """
    broadcaster = self.websocket_manager
    broadcaster.start_broadcast_loop()

    watcher = self.evaluation_watcher
    watcher.start()

    # The event bus is the module-level `event_bus`, not an attribute of self.
    event_bus.start_listening()

async def run_async(self):
"""
Run the logs server asynchronously with file watching.
Expand All @@ -300,11 +305,7 @@ async def run_async(self):
logger.info(f"Serving files from: {self.build_dir}")
logger.info("WebSocket endpoint available at /ws")

# Start the broadcast loop
self.websocket_manager.start_broadcast_loop()

# Start the evaluation watcher
self.evaluation_watcher.start()
self.start_loops()

config = uvicorn.Config(
self.app,
Expand Down Expand Up @@ -336,24 +337,26 @@ def run(self):

def create_app(host: str = "localhost", port: int = 8000, build_dir: Optional[str] = None) -> FastAPI:
"""
Factory function to create a FastAPI app instance.
Factory function to create a FastAPI app instance and start the server with async loops.

This allows uvicorn to call it with parameters and avoids top-level variable instantiation.
This creates a LogsServer instance and starts it in a background thread to ensure
all async loops (WebSocket broadcast, evaluation watching) are running.

Args:
host: Host to bind to
port: Port to bind to
build_dir: Optional custom build directory path

Returns:
FastAPI app instance
FastAPI app instance with server running in background
"""
if build_dir is None:
build_dir = os.path.abspath(
os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "vite-app", "dist")
)

server = LogsServer(host=host, port=port, build_dir=build_dir)
server.start_loops()
return server.app


Expand Down
2 changes: 1 addition & 1 deletion examples/aime2025_chat_completion/tests/test_aime2025.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]:
rollout_input_params=[{"extra_body": {"reasoning_effort": "low"}}],
rollout_processor=default_single_turn_rollout_processor,
aggregation_method="mean",
threshold_of_success=None,
passed_threshold=None,
num_runs=2,
max_dataset_rows=2,
max_concurrent_rollouts=4,
Expand Down
2 changes: 1 addition & 1 deletion examples/gpqa/tests/test_gpqa.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def _load_gpqa_messages_from_csv() -> List[List[Message]]:
], # default to low effort; override via CLI plugin
rollout_processor=default_single_turn_rollout_processor,
aggregation_method="mean",
threshold_of_success=None,
passed_threshold=None,
num_runs=8,
mode="pointwise",
)
Expand Down
14 changes: 4 additions & 10 deletions examples/healthbench/tests/test_evaluation.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from typing import Dict, List

import json
from typing import Dict, List

from eval_protocol.models import EvaluateResult, EvaluationRow, Message, MetricResult
from eval_protocol.pytest.evaluation_test import evaluation_test
from eval_protocol.pytest.default_single_turn_rollout_process import (
default_single_turn_rollout_processor,
)
from eval_protocol.pytest.evaluation_test import evaluation_test


SYSTEM_PROMPT = (
"You are a clinician assistant. Provide safe, accurate guidance."
)
SYSTEM_PROMPT = "You are a clinician assistant. Provide safe, accurate guidance."


# Inline two small HealthBench-like samples and attach tiny rubrics in-memory
Expand Down Expand Up @@ -56,7 +52,7 @@
rollout_input_params=[{"temperature": 0.2, "max_tokens": 512}],
rollout_processor=default_single_turn_rollout_processor,
aggregation_method="mean",
threshold_of_success=None,
passed_threshold=None,
num_runs=1,
max_dataset_rows=2,
mode="pointwise",
Expand Down Expand Up @@ -91,5 +87,3 @@ def test_healthbench_pointwise(row: EvaluationRow) -> EvaluationRow:
},
)
return row


2 changes: 1 addition & 1 deletion tests/pytest/test_markdown_highlighting.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def markdown_dataset_to_evaluation_row(data: List[Dict[str, Any]]) -> List[Evalu
@evaluation_test(
input_dataset=["tests/pytest/data/markdown_dataset.jsonl"],
dataset_adapter=markdown_dataset_to_evaluation_row,
model=["fireworks_ai/accounts/fireworks/models/kimi-k2-instruct"],
model=["fireworks_ai/accounts/fireworks/models/gpt-oss-120b"],
rollout_input_params=[{"temperature": 0.0, "max_tokens": 4096}],
passed_threshold=0.5,
rollout_processor=default_single_turn_rollout_processor,
Expand Down
3 changes: 2 additions & 1 deletion tests/pytest/test_pytest_function_calling.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
from typing import Any, Dict, List

from eval_protocol.models import EvaluationRow
from eval_protocol.pytest import default_single_turn_rollout_processor, evaluation_test
from eval_protocol.rewards.function_calling import exact_tool_match_reward
Expand All @@ -19,7 +20,7 @@ def function_calling_to_evaluation_row(rows: List[Dict[str, Any]]) -> List[Evalu

@evaluation_test(
input_dataset=["tests/pytest/data/function_calling.jsonl"],
model=["fireworks_ai/accounts/fireworks/models/kimi-k2-instruct"],
model=["fireworks_ai/accounts/fireworks/models/gpt-oss-120b"],
mode="pointwise",
dataset_adapter=function_calling_to_evaluation_row,
rollout_processor=default_single_turn_rollout_processor,
Expand Down
89 changes: 36 additions & 53 deletions tests/test_logs_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, Mock, patch

import httpx
import psutil
import pytest
from fastapi import FastAPI
Expand Down Expand Up @@ -332,10 +333,14 @@ async def test_handle_event(self, temp_build_dir):
# The event should be queued for broadcasting
assert not server.websocket_manager._broadcast_queue.empty()

def test_create_app_factory(self, temp_build_dir):
@pytest.mark.asyncio
async def test_create_app_factory(self, temp_build_dir):
"""Test the create_app factory function."""
app = create_app(build_dir=str(temp_build_dir))
assert isinstance(app, FastAPI)
with patch("eval_protocol.utils.logs_server.LogsServer.start_loops") as mock_start_loops:
app = create_app(build_dir=str(temp_build_dir))
assert isinstance(app, FastAPI)
# Verify that start_loops was called
mock_start_loops.assert_called_once()

def test_serve_logs_convenience_function(self, temp_build_dir):
"""Test the serve_logs convenience function."""
Expand Down Expand Up @@ -475,13 +480,11 @@ def test_health_endpoint(self, temp_build_dir_with_files):
assert data["status"] == "ok"

@pytest.mark.asyncio
async def test_server_runs_on_specific_port(self, temp_build_dir_with_files):
"""Integration test: verify that LogsServer actually runs on the specified port (async requests)."""
async def test_server_runs_on_specific_port(self):
"""Integration test: verify that LogsServer runs on specified port and handles port parameters correctly."""
import multiprocessing
import socket

import httpx

# Find an available port for testing
def find_free_port():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
Expand All @@ -491,54 +494,34 @@ def find_free_port():

test_port = find_free_port()

# Create and start server in background
server = LogsServer(build_dir=str(temp_build_dir_with_files), port=test_port)

# Start server in background task
server_task = asyncio.create_task(server.run_async())

try:
# Wait longer for server to start and be ready
await asyncio.sleep(3)

async with httpx.AsyncClient() as client:
# Test that we can actually connect to the server on the specified port
response = await client.get(f"http://localhost:{test_port}/", timeout=10)
assert response.status_code == 200
assert "Test" in response.text
# Start the server in a separate process on the dynamically chosen port
server_process = multiprocessing.Process(target=serve_logs, kwargs={"port": test_port}, daemon=True)
server_process.start()

# Test the health endpoint
response = await client.get(f"http://localhost:{test_port}/health", timeout=10)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"

finally:
# Clean up
server_task.cancel()
# Wait for server to be ready
for _ in range(20):
try:
await server_task
except asyncio.CancelledError:
response = httpx.get(f"http://localhost:{test_port}/health", timeout=1)
if response.status_code == 200:
break
except httpx.RequestError:
pass

def test_serve_logs_port_parameter_integration(self, temp_build_dir_with_files):
"""Integration test: verify that serve_logs function actually works with port parameter."""
# This test verifies that serve_logs creates LogsServer with the correct port
# without actually starting the server
test_port = 9999

# Use a different approach - mock the LogsServer class and verify the port parameter
with patch("eval_protocol.utils.logs_server.LogsServer") as mock_logs_server_class:
mock_server_instance = Mock()
mock_logs_server_class.return_value = mock_server_instance

# Call serve_logs with specific port
serve_logs(port=test_port)

# Verify that LogsServer was created with the correct port
mock_logs_server_class.assert_called_once_with(port=test_port)
# Verify that the run method was called on the instance
mock_server_instance.run.assert_called_once()
await asyncio.sleep(1)

async with httpx.AsyncClient() as client:
# Test health endpoint
response = await client.get(f"http://localhost:{test_port}/health", timeout=10)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"

# Clean up server
if server_process.is_alive():
server_process.terminate()
server_process.join(timeout=2)
if server_process.is_alive():
server_process.kill()
server_process.join(timeout=1)


@pytest.mark.asyncio
Expand Down
15 changes: 15 additions & 0 deletions vite-app/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ export const discoverServerConfig = async (): Promise<void> => {
return;
}

// Check if we're in Vite development mode
if (import.meta.env.DEV) {
// In dev mode, use localhost:8000
config.websocket.host = 'localhost';
config.websocket.port = '8000';
config.websocket.protocol = 'ws';

config.api.host = 'localhost';
config.api.port = '8000';
config.api.protocol = 'http';

console.log('Using Vite dev config (localhost:8000):', config);
return;
}

// Fallback: Try to discover server configuration from the current location
const currentHost = window.location.hostname;
const currentPort = window.location.port;
Expand Down
11 changes: 11 additions & 0 deletions vite-app/src/typings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,14 @@ declare module '*.png' {
const content: string;
export default content;
}
/// <reference types="vite/client" />

/**
 * Typed view of the `import.meta.env` flags this app reads.
 *
 * NOTE(review): the `/// <reference types="vite/client" />` line just above
 * appears after a declaration; TypeScript only honours triple-slash
 * directives at the top of a file, so confirm whether it should be moved.
 */
interface ImportMetaEnv {
  /** Whether the app is running in development mode. */
  readonly DEV: boolean;
  /** Whether the app is running in production mode. */
  readonly PROD: boolean;
  /** Name of the mode the app is running in. */
  readonly MODE: string;
}

/** Types `import.meta.env` as `ImportMetaEnv`. */
interface ImportMeta {
  readonly env: ImportMetaEnv;
}
Loading