226 changes: 226 additions & 0 deletions docs/cross_process_events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
# Event Bus System

The eval protocol includes a flexible event bus system that supports both in-process and cross-process event communication. This is particularly useful for scenarios where you have:

- An evaluation test running in one process
- A logs server running in another process
- Real-time updates between processes

## Architecture

The event bus system consists of:

1. **EventBus**: The core interface for event communication
2. **SqliteEventBus**: An implementation that adds cross-process capabilities using SQLite

### Core EventBus Interface

The `EventBus` class provides the basic event bus functionality:

```python
from eval_protocol.event_bus import EventBus

event_bus = EventBus()

def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)
event_bus.emit("test_event", {"data": "value"})
```

### SqliteEventBus Implementation

The `SqliteEventBus` extends `EventBus` to add cross-process communication capabilities using the existing SQLite database infrastructure. Events are stored in the same database as evaluation rows, providing:

- **No additional dependencies** - Uses existing peewee/SQLite infrastructure
- **Reliable delivery** - Database transactions ensure event persistence
- **Automatic cleanup** - Old events are automatically cleaned up
- **Process isolation** - Each process has a unique ID to avoid processing its own events

### Database Schema

Events are stored in a new `Event` table with the following structure:

- `event_id`: Unique identifier for each event
- `event_type`: Type of event (e.g., "row_upserted")
- `data`: JSON data payload
- `timestamp`: When the event was created
- `process_id`: ID of the process that created the event
- `processed`: Whether the event has been processed by other processes
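
As a rough SQL sketch of that schema (hypothetical: the actual table is created through peewee, so the exact column names and types may differ):

```python
import sqlite3

# Hypothetical DDL mirroring the fields above; the real table is defined
# via peewee models, so this is illustrative only.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE IF NOT EXISTS event (
        event_id   TEXT PRIMARY KEY,
        event_type TEXT NOT NULL,     -- e.g. "row_upserted"
        data       TEXT NOT NULL,     -- JSON payload, stored as text
        timestamp  TEXT NOT NULL,     -- when the event was created
        process_id TEXT NOT NULL,     -- emitting process, so it can skip its own events
        processed  INTEGER NOT NULL DEFAULT 0
    )
    """
)
conn.execute(
    "INSERT INTO event VALUES (?, ?, ?, datetime('now'), ?, 0)",
    ("e1", "row_upserted", '{"row_id": "r1"}', "proc-a"),
)
conn.commit()
```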

## Usage

### Basic Usage (In-Process)

```python
from eval_protocol.event_bus import EventBus

# Create a basic event bus for in-process communication
event_bus = EventBus()

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Emit events
event_bus.emit("test_event", {"data": "value"})
```

### Cross-Process Usage

```python
from eval_protocol.event_bus import SqliteEventBus

# Create a cross-process event bus
event_bus = SqliteEventBus()

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Start listening for cross-process events
event_bus.start_listening()

# Emit events (will be broadcast to other processes)
event_bus.emit("row_upserted", evaluation_row)
```

### Using the Global Event Bus

The global `event_bus` instance is a `SqliteEventBus` that provides cross-process functionality:

```python
from eval_protocol.event_bus import event_bus

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Start listening for cross-process events
event_bus.start_listening()

# Emit events
event_bus.emit("row_upserted", evaluation_row)
```

### In Evaluation Tests

The event bus is automatically used by the dataset logger. When you log evaluation rows, they are automatically broadcast to all listening processes:

```python
from eval_protocol.dataset_logger import default_logger

# This will automatically emit a "row_upserted" event
default_logger.log(evaluation_row)
```

### In Logs Server

The logs server automatically starts listening for cross-process events and broadcasts them to connected WebSocket clients:

```python
from eval_protocol.utils.logs_server import serve_logs

# This will start the server and listen for cross-process events
serve_logs()
```

## Configuration

### EventBus Configuration

The basic `EventBus` requires no configuration - it works entirely in-memory.

### SqliteEventBus Configuration

The `SqliteEventBus` automatically uses the same SQLite database as the evaluation row store, so no additional configuration is required. The database is located at:

- Default: `~/.eval_protocol/logs.db`
- Custom: Can be specified when creating the event bus

#### Custom Database Path

```python
from eval_protocol.event_bus import SqliteEventBus

# Use a custom database path
event_bus = SqliteEventBus(db_path="/path/to/custom.db")
```

## Performance Considerations

### EventBus Performance

- **In-memory**: Events are processed immediately with no latency
- **Memory usage**: Events are not persisted, so memory usage is minimal
- **Scalability**: Suitable for high-frequency events within a single process

### SqliteEventBus Performance

- **Database-based**: Events are stored in SQLite with proper indexing
- **Polling frequency**: Events are checked every 100ms by default
- **Memory usage**: Events are automatically cleaned up after 24 hours
- **Latency**: ~100ms latency due to polling interval
- **Scalability**: Suitable for moderate event volumes (< 1000 events/second)
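
The polling model behind those numbers can be illustrated with a minimal stand-in (a sketch using the assumed table and column names from the schema section, not the actual `SqliteEventBus` internals):

```python
import json
import sqlite3
import uuid

# Each bus instance tags events with its own process ID so it can
# skip events it emitted itself.
PROCESS_ID = uuid.uuid4().hex

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event (event_id TEXT PRIMARY KEY, event_type TEXT, "
    "data TEXT, process_id TEXT, processed INTEGER DEFAULT 0)"
)

received = []

def handle(event_type: str, data) -> None:
    received.append((event_type, data))

def poll_once(conn) -> int:
    """Deliver unprocessed events from *other* processes, then mark them processed."""
    rows = conn.execute(
        "SELECT event_id, event_type, data FROM event "
        "WHERE processed = 0 AND process_id != ?",
        (PROCESS_ID,),
    ).fetchall()
    for event_id, event_type, data in rows:
        handle(event_type, json.loads(data))
        conn.execute("UPDATE event SET processed = 1 WHERE event_id = ?", (event_id,))
    conn.commit()
    return len(rows)

# Simulate an event written by another process, then run one poll cycle.
conn.execute(
    "INSERT INTO event (event_id, event_type, data, process_id) VALUES (?, ?, ?, ?)",
    ("e1", "row_upserted", '{"row_id": "r1"}', "other-process"),
)
delivered = poll_once(conn)
```

In the real bus, a cycle like `poll_once` runs roughly every 100ms, which is where the ~100ms delivery latency comes from.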

## Event Types

The following event types are currently supported:

- `row_upserted`: Emitted when an evaluation row is logged
- `log`: Legacy event type (handled the same as `row_upserted`)
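
Because every subscriber receives every event, a callback typically dispatches on `event_type`. A sketch using a minimal stand-in bus (`MiniBus` is illustrative only; it mimics the `subscribe`/`emit` interface of `EventBus`):

```python
from typing import Any, Callable, List

class MiniBus:
    """Stand-in for eval_protocol's EventBus, for illustration only."""

    def __init__(self) -> None:
        self._listeners: List[Callable[[str, Any], None]] = []

    def subscribe(self, callback: Callable[[str, Any], None]) -> None:
        self._listeners.append(callback)

    def emit(self, event_type: str, data: Any) -> None:
        for callback in self._listeners:
            callback(event_type, data)

rows = []

def handle_event(event_type: str, data: Any) -> None:
    # Treat the legacy "log" type the same as "row_upserted".
    if event_type in ("row_upserted", "log"):
        rows.append(data)

bus = MiniBus()
bus.subscribe(handle_event)
bus.emit("log", {"row_id": "r1"})          # handled (legacy alias)
bus.emit("heartbeat", {"ignored": True})   # hypothetical type, ignored here
```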

## Testing

You can test the cross-process event bus using the provided example:

1. Start the logs server in one terminal:
```bash
python examples/cross_process_events_example.py server
```

2. Run the evaluation in another terminal:
```bash
python examples/cross_process_events_example.py eval
```

## Troubleshooting

### Events Not Received

1. Check that the event bus has started listening: `event_bus.start_listening()`
2. Verify the database is accessible and writable
3. Check for database lock issues (multiple processes accessing the same database)
4. Ensure both processes are using the same database path

### Database Lock Issues

SQLite has limitations with concurrent access. If you experience database locks:

1. Ensure processes are not writing to the database simultaneously
2. Consider using a different database backend for high-concurrency scenarios
3. The event bus automatically handles some concurrency issues
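
One standard SQLite mitigation is to enable WAL journaling and a busy timeout, which lets readers coexist with a writer. This is generic SQLite advice, not something the event bus configures for you:

```python
import sqlite3
import tempfile

# Stand-in path; in practice this would be ~/.eval_protocol/logs.db
db_path = tempfile.mktemp(suffix=".db")

conn = sqlite3.connect(db_path, timeout=5.0)  # wait up to 5s when the db is locked
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
conn.execute("PRAGMA busy_timeout=5000")      # per-connection lock wait, in ms
```

WAL mode is persistent per database file, so it only needs to be set once.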

### Large Database Size

The system automatically cleans up old processed events after 24 hours. If you're seeing high database size:

1. Check the database file size: `~/.eval_protocol/logs.db`
2. Manually clean up old events if needed
3. Adjust the cleanup interval in the code if necessary
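
If you need to reclaim space before the automatic cleanup runs, a manual pass might look like this (a sketch: it assumes the `event` table and columns described earlier, which may differ from the actual schema):

```python
import sqlite3

def cleanup_events(db_path: str, max_age_hours: int = 24) -> int:
    """Delete processed events older than max_age_hours; return rows removed."""
    conn = sqlite3.connect(db_path)
    cur = conn.execute(
        "DELETE FROM event "
        "WHERE processed = 1 AND timestamp < datetime('now', ?)",
        (f"-{max_age_hours} hours",),
    )
    conn.commit()
    conn.execute("VACUUM")  # compact the file after large deletes
    conn.close()
    return cur.rowcount
```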

### Performance Issues

If you're experiencing performance issues:

1. Check the polling interval (currently 100ms)
2. Monitor database size and cleanup frequency
3. Consider reducing the number of events emitted
4. Profile the database queries for bottlenecks
4 changes: 2 additions & 2 deletions eval_protocol/dataset_logger/__init__.py
@@ -1,3 +1,3 @@
-from eval_protocol.dataset_logger.local_fs_dataset_logger_adapter import LocalFSDatasetLoggerAdapter
+from eval_protocol.dataset_logger.sqlite_dataset_logger_adapter import SqliteDatasetLoggerAdapter

-default_logger = LocalFSDatasetLoggerAdapter()
+default_logger = SqliteDatasetLoggerAdapter()
2 changes: 2 additions & 0 deletions eval_protocol/dataset_logger/dataset_logger.py
@@ -4,6 +4,8 @@
if TYPE_CHECKING:
    from eval_protocol.models import EvaluationRow

LOG_EVENT_TYPE = "log"


class DatasetLogger(ABC):
"""
39 changes: 39 additions & 0 deletions eval_protocol/dataset_logger/sqlite_dataset_logger_adapter.py
@@ -0,0 +1,39 @@
import os
from typing import List, Optional

from eval_protocol.dataset_logger.dataset_logger import LOG_EVENT_TYPE, DatasetLogger
from eval_protocol.dataset_logger.sqlite_evaluation_row_store import SqliteEvaluationRowStore
from eval_protocol.directory_utils import find_eval_protocol_dir
from eval_protocol.event_bus import event_bus
from eval_protocol.event_bus.logger import logger
from eval_protocol.models import EvaluationRow


class SqliteDatasetLoggerAdapter(DatasetLogger):
    def __init__(self, db_path: Optional[str] = None, store: Optional[SqliteEvaluationRowStore] = None):
        if db_path is not None and store is not None:
            raise ValueError("Provide only one of db_path or store, not both.")
        if store is not None:
            self.db_path = store.db_path
            self._store = store
        else:
            eval_protocol_dir = find_eval_protocol_dir()
            self.db_path = db_path if db_path is not None else os.path.join(eval_protocol_dir, "logs.db")
            self._store = SqliteEvaluationRowStore(self.db_path)

    def log(self, row: "EvaluationRow") -> None:
        row_id = row.input_metadata.row_id
        data = row.model_dump(exclude_none=True, mode="json")
        self._store.upsert_row(row_id=row_id, data=data)
        try:
            event_bus.emit(LOG_EVENT_TYPE, EvaluationRow(**data))
        except Exception as e:
            # Avoid breaking storage due to event emission issues
            logger.error(f"Failed to emit {LOG_EVENT_TYPE} event: {e}")

    def read(self, row_id: Optional[str] = None) -> List["EvaluationRow"]:
        results = self._store.read_rows(row_id=row_id)
        return [EvaluationRow(**data) for data in results]
57 changes: 57 additions & 0 deletions eval_protocol/dataset_logger/sqlite_evaluation_row_store.py
@@ -0,0 +1,57 @@
import os
from typing import List, Optional

from peewee import CharField, Model, SqliteDatabase
from playhouse.sqlite_ext import JSONField


class SqliteEvaluationRowStore:
    """
    Lightweight reusable SQLite store for evaluation rows.

    Stores arbitrary row data as JSON keyed by a unique string `row_id`.
    """

    def __init__(self, db_path: str):
        parent_dir = os.path.dirname(db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self._db_path = db_path
        self._db = SqliteDatabase(self._db_path)

        class BaseModel(Model):
            class Meta:
                database = self._db

        class EvaluationRow(BaseModel):  # type: ignore
            row_id = CharField(unique=True)
            data = JSONField()

        self._EvaluationRow = EvaluationRow

        self._db.connect()
        self._db.create_tables([EvaluationRow])

    @property
    def db_path(self) -> str:
        return self._db_path

    def upsert_row(self, row_id: str, data: dict) -> None:
        if self._EvaluationRow.select().where(self._EvaluationRow.row_id == row_id).exists():
            self._EvaluationRow.update(data=data).where(self._EvaluationRow.row_id == row_id).execute()
        else:
            self._EvaluationRow.create(row_id=row_id, data=data)

    def read_rows(self, row_id: Optional[str] = None) -> List[dict]:
        if row_id is None:
            query = self._EvaluationRow.select().dicts()
        else:
            query = self._EvaluationRow.select().dicts().where(self._EvaluationRow.row_id == row_id)
        return [result["data"] for result in query]

    def delete_row(self, row_id: str) -> int:
        return self._EvaluationRow.delete().where(self._EvaluationRow.row_id == row_id).execute()

    def delete_all_rows(self) -> int:
        return self._EvaluationRow.delete().execute()
5 changes: 5 additions & 0 deletions eval_protocol/event_bus/__init__.py
@@ -0,0 +1,5 @@
# Global event bus instance - uses SqliteEventBus for cross-process functionality
from eval_protocol.event_bus.event_bus import EventBus
from eval_protocol.event_bus.sqlite_event_bus import SqliteEventBus

event_bus: EventBus = SqliteEventBus()
50 changes: 50 additions & 0 deletions eval_protocol/event_bus/event_bus.py
@@ -0,0 +1,50 @@
from typing import Any, Callable, List

from eval_protocol.event_bus.logger import logger


class EventBus:
    """Core event bus interface for decoupling components in the evaluation system."""

    def __init__(self):
        self._listeners: List[Callable[[str, Any], None]] = []

    def subscribe(self, callback: Callable[[str, Any], None]) -> None:
        """Subscribe to events.

        Args:
            callback: Function that takes (event_type, data) parameters
        """
        self._listeners.append(callback)

    def unsubscribe(self, callback: Callable[[str, Any], None]) -> None:
        """Unsubscribe from events.

        Args:
            callback: The callback function to remove
        """
        try:
            self._listeners.remove(callback)
        except ValueError:
            pass  # Callback wasn't subscribed

    def emit(self, event_type: str, data: Any) -> None:
        """Emit an event to all subscribers.

        Args:
            event_type: Type of event (e.g., "row_upserted")
            data: Event data
        """
        for listener in self._listeners:
            try:
                listener(event_type, data)
            except Exception as e:
                logger.debug(f"Event listener failed for {event_type}: {e}")

    def start_listening(self) -> None:
        """Start listening for cross-process events. Override in subclasses."""
        pass

    def stop_listening(self) -> None:
        """Stop listening for cross-process events. Override in subclasses."""
        pass