Commit dcbbfad

Merge branch 'implement_aime_gpqa_health' into derekx/pipelining

2 parents: 3cf966a + 0ac1fce

File tree

118 files changed: +15325 -1253 lines

docs/cross_process_events.md

Lines changed: 226 additions & 0 deletions
# Event Bus System

The eval protocol includes a flexible event bus system that supports both in-process and cross-process event communication. This is particularly useful for scenarios where you have:

- An evaluation test running in one process
- A logs server running in another process
- Real-time updates between processes

## Architecture

The event bus system consists of:

1. **EventBus**: The core interface for event communication
2. **SqliteEventBus**: An implementation that adds cross-process capabilities using SQLite

### Core EventBus Interface

The `EventBus` class provides the basic event bus functionality:

```python
from eval_protocol.event_bus import EventBus

event_bus = EventBus()

def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)
event_bus.emit("test_event", {"data": "value"})
```

### SqliteEventBus Implementation

The `SqliteEventBus` extends `EventBus` to add cross-process communication capabilities using the existing SQLite database infrastructure. Events are stored in the same database as evaluation rows, providing:

- **No additional dependencies**: uses the existing peewee/SQLite infrastructure
- **Reliable delivery**: database transactions ensure event persistence
- **Automatic cleanup**: old events are automatically cleaned up
- **Process isolation**: each process has a unique ID to avoid processing its own events

### Database Schema

Events are stored in a new `Event` table with the following structure:

- `event_id`: Unique identifier for each event
- `event_type`: Type of event (e.g., "row_upserted")
- `data`: JSON data payload
- `timestamp`: When the event was created
- `process_id`: ID of the process that created the event
- `processed`: Whether the event has been processed by other processes
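Rendered as plain SQL, the table above looks roughly like this. This is a sketch for illustration only: the real table is created through peewee models, so exact column types and constraints may differ.

```python
import sqlite3

# Illustrative DDL matching the fields listed above; the actual table is
# defined via peewee, so names and types here are assumptions.
DDL = """
CREATE TABLE IF NOT EXISTS event (
    event_id   TEXT PRIMARY KEY,  -- unique identifier for each event
    event_type TEXT NOT NULL,     -- e.g. "row_upserted"
    data       TEXT NOT NULL,     -- JSON payload
    timestamp  REAL NOT NULL,     -- creation time (Unix seconds)
    process_id TEXT NOT NULL,     -- which process emitted the event
    processed  INTEGER DEFAULT 0  -- consumed by other processes yet?
)
"""

conn = sqlite3.connect(":memory:")
conn.execute(DDL)
columns = [row[1] for row in conn.execute("PRAGMA table_info(event)")]
print(columns)  # → ['event_id', 'event_type', 'data', 'timestamp', 'process_id', 'processed']
```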

## Usage

### Basic Usage (In-Process)

```python
from eval_protocol.event_bus import EventBus

# Create a basic event bus for in-process communication
event_bus = EventBus()

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Emit events
event_bus.emit("test_event", {"data": "value"})
```

### Cross-Process Usage

```python
from eval_protocol.event_bus import SqliteEventBus

# Create a cross-process event bus
event_bus = SqliteEventBus()

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Start listening for cross-process events
event_bus.start_listening()

# Emit events (will be broadcast to other processes)
event_bus.emit("row_upserted", evaluation_row)
```

### Using the Global Event Bus

The global `event_bus` instance is a `SqliteEventBus` that provides cross-process functionality:

```python
from eval_protocol.event_bus import event_bus

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Start listening for cross-process events
event_bus.start_listening()

# Emit events
event_bus.emit("row_upserted", evaluation_row)
```

### In Evaluation Tests

The event bus is automatically used by the dataset logger. When you log evaluation rows, they are automatically broadcast to all listening processes:

```python
from eval_protocol.dataset_logger import default_logger

# This will automatically emit a "row_upserted" event
default_logger.log(evaluation_row)
```

### In Logs Server

The logs server automatically starts listening for cross-process events and broadcasts them to connected WebSocket clients:

```python
from eval_protocol.utils.logs_server import serve_logs

# This will start the server and listen for cross-process events
serve_logs()
```

## Configuration

### EventBus Configuration

The basic `EventBus` requires no configuration; it works entirely in-memory.

### SqliteEventBus Configuration

The `SqliteEventBus` automatically uses the same SQLite database as the evaluation row store, so no additional configuration is required. The database is located at:

- Default: `~/.eval_protocol/logs.db`
- Custom: can be specified when creating the event bus

#### Custom Database Path

```python
from eval_protocol.event_bus import SqliteEventBus

# Use a custom database path
event_bus = SqliteEventBus(db_path="/path/to/custom.db")
```

## Performance Considerations

### EventBus Performance

- **In-memory**: events are processed immediately with no latency
- **Memory usage**: events are not persisted, so memory usage is minimal
- **Scalability**: suitable for high-frequency events within a single process

### SqliteEventBus Performance

- **Database-based**: events are stored in SQLite with proper indexing
- **Polling frequency**: events are checked every 100 ms by default
- **Memory usage**: events are automatically cleaned up after 24 hours
- **Latency**: roughly 100 ms, due to the polling interval
- **Scalability**: suitable for moderate event volumes (< 1000 events/second)
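The poll-then-mark-processed cycle described above can be sketched with nothing but the standard library. The `event` table layout, function names, and process IDs below are illustrative stand-ins, not the actual `SqliteEventBus` internals, which go through peewee:

```python
import json
import sqlite3
import time
import uuid

POLL_INTERVAL = 0.1  # ~100 ms, matching the default polling interval above

def emit(conn, process_id, event_type, data):
    # One transaction per event: it is either fully persisted or not at all.
    with conn:
        conn.execute(
            "INSERT INTO event VALUES (?, ?, ?, ?, ?, 0)",
            (uuid.uuid4().hex, event_type, json.dumps(data), time.time(), process_id),
        )

def poll_once(conn, process_id, handler):
    # One polling tick: deliver unprocessed events from *other* processes,
    # then mark them processed so they are not delivered twice.
    rows = conn.execute(
        "SELECT event_id, event_type, data FROM event "
        "WHERE processed = 0 AND process_id != ?",
        (process_id,),
    ).fetchall()
    for event_id, event_type, data in rows:
        handler(event_type, json.loads(data))
        with conn:
            conn.execute("UPDATE event SET processed = 1 WHERE event_id = ?", (event_id,))

# Simulate two processes sharing one database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event (event_id TEXT PRIMARY KEY, event_type TEXT, "
    "data TEXT, timestamp REAL, process_id TEXT, processed INTEGER)"
)
received = []
emit(conn, "proc-A", "row_upserted", {"rollout_id": "r1"})
emit(conn, "proc-B", "row_upserted", {"rollout_id": "r2"})
poll_once(conn, "proc-B", lambda event_type, data: received.append(data["rollout_id"]))
print(received)  # → ['r1']: proc-B sees proc-A's event but skips its own
```

A real listener would run `poll_once` in a loop, sleeping `POLL_INTERVAL` between ticks, which is where the ~100 ms delivery latency comes from.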

## Event Types

The following event types are currently supported:

- `row_upserted`: emitted when an evaluation row is logged
- `log`: legacy event type (handled the same as `row_upserted`)
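Subscribers receive every event type, so a handler that only cares about certain types has to filter for itself. A small sketch; the wrapper below is a hypothetical helper, and the `log` alias mirrors the legacy behaviour noted above:

```python
def make_typed_handler(wanted, callback):
    # Subscribers see all events; fire the callback only for wanted types,
    # treating the legacy "log" type as an alias of "row_upserted".
    aliases = {"log": "row_upserted"}
    def handler(event_type, data):
        if aliases.get(event_type, event_type) in wanted:
            callback(data)
    return handler

seen = []
handler = make_typed_handler({"row_upserted"}, seen.append)
handler("row_upserted", {"rollout_id": "r1"})
handler("log", {"rollout_id": "r2"})        # legacy alias, still delivered
handler("heartbeat", {"rollout_id": "r3"})  # not wanted, ignored
print(seen)  # → [{'rollout_id': 'r1'}, {'rollout_id': 'r2'}]
```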

## Testing

You can test the cross-process event bus using the provided example:

1. Start the logs server in one terminal:

   ```bash
   python examples/cross_process_events_example.py server
   ```

2. Run the evaluation in another terminal:

   ```bash
   python examples/cross_process_events_example.py eval
   ```

## Troubleshooting

### Events Not Received

1. Check that the event bus has started listening: `event_bus.start_listening()`
2. Verify the database is accessible and writable
3. Check for database lock issues (multiple processes accessing the same database)
4. Ensure both processes are using the same database path

### Database Lock Issues

SQLite has limitations with concurrent access. If you experience database locks:

1. Ensure processes are not writing to the database simultaneously
2. Consider using a different database backend for high-concurrency scenarios
3. Note that the event bus automatically handles some concurrency issues

### High Database Size

The system automatically cleans up old processed events after 24 hours. If the database is still growing large:

1. Check the database file size: `~/.eval_protocol/logs.db`
2. Manually clean up old events if needed
3. Adjust the cleanup interval in the code if necessary
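A manual cleanup can be sketched with stdlib `sqlite3`. The table and column names below are assumptions based on the schema described earlier (the real store is managed by peewee), and you should stop any running server before modifying the database directly:

```python
import sqlite3
import time

MAX_AGE_SECONDS = 24 * 60 * 60  # mirror the 24-hour retention described above

def cleanup(conn: sqlite3.Connection, now: float) -> int:
    # Delete only events that were already processed AND are past the cutoff;
    # unprocessed events are kept regardless of age.
    with conn:
        cur = conn.execute(
            "DELETE FROM event WHERE processed = 1 AND timestamp < ?",
            (now - MAX_AGE_SECONDS,),
        )
    return cur.rowcount

# Demo on an in-memory copy; point at ~/.eval_protocol/logs.db for the real store.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event (event_id TEXT PRIMARY KEY, event_type TEXT, "
    "data TEXT, timestamp REAL, process_id TEXT, processed INTEGER)"
)
now = time.time()
conn.execute("INSERT INTO event VALUES ('e1', 'log', '{}', ?, 'p1', 1)", (now - 2 * MAX_AGE_SECONDS,))
conn.execute("INSERT INTO event VALUES ('e2', 'log', '{}', ?, 'p1', 0)", (now - 2 * MAX_AGE_SECONDS,))
conn.execute("INSERT INTO event VALUES ('e3', 'log', '{}', ?, 'p1', 1)", (now,))
deleted = cleanup(conn, now)
print(deleted)  # → 1 (only e1 is both old and processed)
```

Running `VACUUM` afterwards reclaims the freed file space.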

### Performance Issues

If you're experiencing performance issues:

1. Check the polling interval (currently 100 ms)
2. Monitor database size and cleanup frequency
3. Consider reducing the number of events emitted
4. Profile the database queries for bottlenecks

eval_protocol/cli.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -289,6 +289,7 @@ def parse_args(args=None):

     # Logs command
     logs_parser = subparsers.add_parser("logs", help="Serve logs with file watching and real-time updates")
+    logs_parser.add_argument("--port", type=int, default=8000, help="Port to bind to (default: 8000)")

     # Run command (for Hydra-based evaluations)
     # This subparser intentionally defines no arguments itself.
```

eval_protocol/cli_commands/logs.py

Lines changed: 4 additions & 3 deletions

```diff
@@ -11,15 +11,16 @@
 def logs_command(args):
     """Serve logs with file watching and real-time updates"""

+    port = args.port
     print(f"🚀 Starting Eval Protocol Logs Server")
-    print(f"🌐 URL: http://localhost:8000")
-    print(f"🔌 WebSocket: ws://localhost:8000/ws")
+    print(f"🌐 URL: http://localhost:{port}")
+    print(f"🔌 WebSocket: ws://localhost:{port}/ws")
     print(f"👀 Watching paths: {['current directory']}")
     print("Press Ctrl+C to stop the server")
     print("-" * 50)

     try:
-        serve_logs()
+        serve_logs(port=args.port)
         return 0
     except KeyboardInterrupt:
         print("\n🛑 Server stopped by user")
```

eval_protocol/common_utils.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -14,7 +14,7 @@ def load_jsonl(file_path: str) -> List[Dict[str, Any]]:

     Returns:
         A list of dictionaries, where each dictionary is a parsed JSON object from a line.
-        Returns an empty list if the file is not found or if errors occur during parsing.
+        Returns an empty list if the file is not found or if errors occur during parsing. Supports HTTP urls and local file paths.
     """
     data: List[Dict[str, Any]] = []
     if file_path.startswith("http://") or file_path.startswith("https://"):
```
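The docstring change above documents that `load_jsonl` accepts both HTTP(S) URLs and local file paths. A stdlib-only sketch of that behavior, for illustration; the real implementation may differ in details such as error logging:

```python
import json
import urllib.request
from typing import Any, Dict, List

def load_jsonl_sketch(file_path: str) -> List[Dict[str, Any]]:
    """Parse one JSON object per line from an HTTP(S) URL or a local file."""
    try:
        if file_path.startswith(("http://", "https://")):
            # Remote source: fetch the whole document over HTTP(S)
            with urllib.request.urlopen(file_path) as resp:
                text = resp.read().decode("utf-8")
        else:
            # Local source: read from disk
            with open(file_path, encoding="utf-8") as f:
                text = f.read()
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    except (OSError, ValueError):
        # Missing file, network error, or malformed JSON → empty list,
        # matching the documented contract
        return []
```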
Lines changed: 2 additions & 2 deletions

```diff
@@ -1,3 +1,3 @@
-from eval_protocol.dataset_logger.local_fs_dataset_logger_adapter import LocalFSDatasetLoggerAdapter
+from eval_protocol.dataset_logger.sqlite_dataset_logger_adapter import SqliteDatasetLoggerAdapter

-default_logger = LocalFSDatasetLoggerAdapter()
+default_logger = SqliteDatasetLoggerAdapter()
```

eval_protocol/dataset_logger/dataset_logger.py

Lines changed: 2 additions & 0 deletions

```diff
@@ -4,6 +4,8 @@

 if TYPE_CHECKING:
     from eval_protocol.models import EvaluationRow

+LOG_EVENT_TYPE = "log"
+

 class DatasetLogger(ABC):
     """
```
Lines changed: 38 additions & 0 deletions

```python
import os
from typing import List, Optional

from eval_protocol.dataset_logger.dataset_logger import LOG_EVENT_TYPE, DatasetLogger
from eval_protocol.dataset_logger.sqlite_evaluation_row_store import SqliteEvaluationRowStore
from eval_protocol.directory_utils import find_eval_protocol_dir
from eval_protocol.event_bus import event_bus
from eval_protocol.event_bus.logger import logger
from eval_protocol.models import EvaluationRow


class SqliteDatasetLoggerAdapter(DatasetLogger):
    def __init__(self, db_path: Optional[str] = None, store: Optional[SqliteEvaluationRowStore] = None):
        eval_protocol_dir = find_eval_protocol_dir()
        if db_path is not None and store is not None:
            raise ValueError("Provide only one of db_path or store, not both.")
        if store is not None:
            self.db_path = store.db_path
            self._store = store
        else:
            self.db_path = db_path if db_path is not None else os.path.join(eval_protocol_dir, "logs.db")
            self._store = SqliteEvaluationRowStore(self.db_path)

    def log(self, row: "EvaluationRow") -> None:
        data = row.model_dump(exclude_none=True, mode="json")
        self._store.upsert_row(data=data)
        try:
            event_bus.emit(LOG_EVENT_TYPE, EvaluationRow(**data))
        except Exception as e:
            # Avoid breaking storage due to event emission issues
            logger.error(f"Failed to emit row_upserted event: {e}")

    def read(self, rollout_id: Optional[str] = None) -> List["EvaluationRow"]:
        results = self._store.read_rows(rollout_id=rollout_id)
        return [EvaluationRow(**data) for data in results]
```
Lines changed: 60 additions & 0 deletions

```python
import os
from typing import List, Optional

from peewee import CharField, Model, SqliteDatabase
from playhouse.sqlite_ext import JSONField

from eval_protocol.models import EvaluationRow


class SqliteEvaluationRowStore:
    """
    Lightweight reusable SQLite store for evaluation rows.

    Stores arbitrary row data as JSON keyed by a unique string `rollout_id`.
    """

    def __init__(self, db_path: str):
        os.makedirs(os.path.dirname(db_path), exist_ok=True)
        self._db_path = db_path
        self._db = SqliteDatabase(self._db_path)

        class BaseModel(Model):
            class Meta:
                database = self._db

        class EvaluationRow(BaseModel):  # type: ignore
            rollout_id = CharField(unique=True)
            data = JSONField()

        self._EvaluationRow = EvaluationRow

        self._db.connect()
        self._db.create_tables([EvaluationRow])

    @property
    def db_path(self) -> str:
        return self._db_path

    def upsert_row(self, data: dict) -> None:
        rollout_id = data["execution_metadata"]["rollout_id"]
        if rollout_id is None:
            raise ValueError("execution_metadata.rollout_id is required to upsert a row")
        if self._EvaluationRow.select().where(self._EvaluationRow.rollout_id == rollout_id).exists():
            self._EvaluationRow.update(data=data).where(self._EvaluationRow.rollout_id == rollout_id).execute()
        else:
            self._EvaluationRow.create(rollout_id=rollout_id, data=data)

    def read_rows(self, rollout_id: Optional[str] = None) -> List[dict]:
        if rollout_id is None:
            query = self._EvaluationRow.select().dicts()
        else:
            query = self._EvaluationRow.select().dicts().where(self._EvaluationRow.rollout_id == rollout_id)
        results = list(query)
        return [result["data"] for result in results]

    def delete_row(self, rollout_id: str) -> int:
        return self._EvaluationRow.delete().where(self._EvaluationRow.rollout_id == rollout_id).execute()

    def delete_all_rows(self) -> int:
        return self._EvaluationRow.delete().execute()
```
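The select-then-update-or-create logic in `upsert_row` above can also be expressed as a single SQLite UPSERT. A stdlib `sqlite3` sketch for illustration; the actual store goes through peewee, and the table layout here is simplified:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE evaluation_row (rollout_id TEXT PRIMARY KEY, data TEXT)")

def upsert_row(data: dict) -> None:
    # Same contract as the store above: rows are keyed by execution_metadata.rollout_id.
    rollout_id = data["execution_metadata"]["rollout_id"]
    if rollout_id is None:
        raise ValueError("execution_metadata.rollout_id is required to upsert a row")
    with conn:
        # Atomic insert-or-update on the unique key (SQLite >= 3.24)
        conn.execute(
            "INSERT INTO evaluation_row (rollout_id, data) VALUES (?, ?) "
            "ON CONFLICT(rollout_id) DO UPDATE SET data = excluded.data",
            (rollout_id, json.dumps(data)),
        )

upsert_row({"execution_metadata": {"rollout_id": "r1"}, "score": 0.1})
upsert_row({"execution_metadata": {"rollout_id": "r1"}, "score": 0.9})  # overwrites, no duplicate
print(conn.execute("SELECT COUNT(*) FROM evaluation_row").fetchone()[0])  # → 1
```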
File renamed without changes.
Lines changed: 5 additions & 0 deletions

```python
# Global event bus instance - uses SqliteEventBus for cross-process functionality
from eval_protocol.event_bus.event_bus import EventBus
from eval_protocol.event_bus.sqlite_event_bus import SqliteEventBus

event_bus: EventBus = SqliteEventBus()
```
