Commit 58eba6e

merge
2 parents 073d99a + 1a1046e commit 58eba6e

44 files changed: +1952 −615 lines changed

docs/cross_process_events.md

Lines changed: 226 additions & 0 deletions
# Event Bus System

The eval protocol includes a flexible event bus system that supports both in-process and cross-process event communication. This is particularly useful for scenarios where you have:

- An evaluation test running in one process
- A logs server running in another process
- Real-time updates between processes

## Architecture

The event bus system consists of:

1. **EventBus**: The core interface for event communication
2. **SqliteEventBus**: An implementation that adds cross-process capabilities using SQLite

### Core EventBus Interface

The `EventBus` class provides the basic event bus functionality:

```python
from eval_protocol.event_bus import EventBus

event_bus = EventBus()

def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)
event_bus.emit("test_event", {"data": "value"})
```
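The interface above can be sketched in a few lines: subscribers are plain callbacks, and `emit` delivers to each of them in turn. This `SimpleEventBus` is an illustrative stand-in, not the library's actual implementation:

```python
from typing import Any, Callable, List

Handler = Callable[[str, Any], None]

class SimpleEventBus:
    """Minimal in-process event bus: subscribers are plain callbacks."""

    def __init__(self) -> None:
        self._handlers: List[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        self._handlers.append(handler)

    def emit(self, event_type: str, data: Any) -> None:
        # Deliver synchronously to every subscriber, in subscription order.
        for handler in list(self._handlers):
            handler(event_type, data)

bus = SimpleEventBus()
received = []
bus.subscribe(lambda event_type, data: received.append((event_type, data)))
bus.emit("test_event", {"data": "value"})
```

Delivery here is synchronous: a slow handler blocks `emit`, which is one reason the cross-process variant below hands events off through a database instead.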
### SqliteEventBus Implementation

The `SqliteEventBus` extends `EventBus` to add cross-process communication capabilities using the existing SQLite database infrastructure. Events are stored in the same database as evaluation rows, providing:

- **No additional dependencies** - uses the existing peewee/SQLite infrastructure
- **Reliable delivery** - database transactions ensure event persistence
- **Automatic cleanup** - old events are automatically cleaned up
- **Process isolation** - each process has a unique ID to avoid processing its own events

### Database Schema

Events are stored in a new `Event` table with the following structure:

- `event_id`: Unique identifier for each event
- `event_type`: Type of event (e.g., "row_upserted")
- `data`: JSON data payload
- `timestamp`: When the event was created
- `process_id`: ID of the process that created the event
- `processed`: Whether the event has been processed by other processes
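Expressed as plain SQL, a table with this shape might look like the sketch below. The actual table is defined through peewee, so the exact types, defaults, and indexes here are assumptions based only on the field list above:

```python
import sqlite3
import uuid

# Hypothetical DDL matching the fields described above; the real table is
# defined via peewee, so names and types here are illustrative only.
DDL = """
CREATE TABLE IF NOT EXISTS event (
    event_id   TEXT PRIMARY KEY,
    event_type TEXT NOT NULL,
    data       TEXT NOT NULL,              -- JSON payload
    timestamp  REAL NOT NULL,              -- creation time (epoch seconds)
    process_id TEXT NOT NULL,              -- emitting process
    processed  INTEGER NOT NULL DEFAULT 0  -- consumed by other processes?
)
"""

conn = sqlite3.connect(":memory:")
conn.execute(DDL)
conn.execute(
    "INSERT INTO event VALUES (?, ?, ?, ?, ?, 0)",
    (str(uuid.uuid4()), "row_upserted", "{}", 0.0, "proc-a"),
)
row = conn.execute("SELECT event_type, processed FROM event").fetchone()
```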
## Usage

### Basic Usage (In-Process)

```python
from eval_protocol.event_bus import EventBus

# Create a basic event bus for in-process communication
event_bus = EventBus()

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Emit events
event_bus.emit("test_event", {"data": "value"})
```

### Cross-Process Usage

```python
from eval_protocol.event_bus import SqliteEventBus

# Create a cross-process event bus
event_bus = SqliteEventBus()

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Start listening for cross-process events
event_bus.start_listening()

# Emit events (will be broadcast to other processes)
event_bus.emit("row_upserted", evaluation_row)
```
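One way such cross-process delivery can work is a periodic poll that picks up unprocessed events written by other processes and marks them processed. The sketch below shows a single polling pass against a stand-in `event` table using stdlib `sqlite3`; the real `SqliteEventBus` internals may differ:

```python
import sqlite3

def poll_once(conn: sqlite3.Connection, my_process_id: str):
    """Fetch events emitted by other processes and mark them processed."""
    rows = conn.execute(
        "SELECT event_id, event_type, data FROM event "
        "WHERE processed = 0 AND process_id != ?",
        (my_process_id,),
    ).fetchall()
    for event_id, _, _ in rows:
        conn.execute("UPDATE event SET processed = 1 WHERE event_id = ?", (event_id,))
    conn.commit()
    return rows

# Demo with an in-memory database standing in for ~/.eval_protocol/logs.db
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event (event_id TEXT, event_type TEXT, data TEXT, "
    "process_id TEXT, processed INTEGER DEFAULT 0)"
)
conn.execute("INSERT INTO event VALUES ('e1', 'row_upserted', '{}', 'other-proc', 0)")
conn.execute("INSERT INTO event VALUES ('e2', 'row_upserted', '{}', 'me', 0)")

events = poll_once(conn, my_process_id="me")  # picks up only 'e1'
```

A real listener would run such a pass in a background thread, sleeping between iterations; the polling interval is where the delivery latency discussed under Performance Considerations comes from. Note how the `process_id` filter keeps a process from consuming its own events.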
### Using the Global Event Bus

The global `event_bus` instance is a `SqliteEventBus` that provides cross-process functionality:

```python
from eval_protocol.event_bus import event_bus

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Start listening for cross-process events
event_bus.start_listening()

# Emit events
event_bus.emit("row_upserted", evaluation_row)
```

### In Evaluation Tests

The event bus is used automatically by the dataset logger. When you log evaluation rows, they are broadcast to all listening processes:

```python
from eval_protocol.dataset_logger import default_logger

# This will automatically emit a "row_upserted" event
default_logger.log(evaluation_row)
```

### In Logs Server

The logs server automatically starts listening for cross-process events and broadcasts them to connected WebSocket clients:

```python
from eval_protocol.utils.logs_server import serve_logs

# This will start the server and listen for cross-process events
serve_logs()
```
## Configuration

### EventBus Configuration

The basic `EventBus` requires no configuration; it works entirely in memory.

### SqliteEventBus Configuration

The `SqliteEventBus` automatically uses the same SQLite database as the evaluation row store, so no additional configuration is required. The database is located at:

- Default: `~/.eval_protocol/logs.db`
- Custom: can be specified when creating the event bus

#### Custom Database Path

```python
from eval_protocol.event_bus import SqliteEventBus

# Use a custom database path
event_bus = SqliteEventBus(db_path="/path/to/custom.db")
```

## Performance Considerations

### EventBus Performance

- **In-memory**: events are delivered immediately, with no polling latency
- **Memory usage**: events are not persisted, so memory usage is minimal
- **Scalability**: suitable for high-frequency events within a single process

### SqliteEventBus Performance

- **Database-based**: events are stored in SQLite with proper indexing
- **Polling frequency**: events are checked every 100 ms by default
- **Memory usage**: events are automatically cleaned up after 24 hours
- **Latency**: ~100 ms, due to the polling interval
- **Scalability**: suitable for moderate event volumes (< 1000 events/second)
## Event Types

The following event types are currently supported:

- `row_upserted`: Emitted when an evaluation row is logged
- `log`: Legacy event type (handled the same as `row_upserted`)
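Since `log` is just a legacy alias, a subscriber can normalize both types onto the same code path. A small illustrative handler (not part of the library; the handler body is a placeholder):

```python
# Treat the legacy "log" event type as an alias for "row_upserted".
ROW_EVENT_TYPES = {"row_upserted", "log"}

handled = []

def handle_event(event_type: str, data):
    if event_type in ROW_EVENT_TYPES:
        handled.append(data)  # e.g. forward the row to WebSocket clients

handle_event("row_upserted", {"row_id": 1})
handle_event("log", {"row_id": 2})        # legacy alias, handled identically
handle_event("heartbeat", {"row_id": 3})  # unknown type, ignored
```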
## Testing

You can test the cross-process event bus using the provided example:

1. Start the logs server in one terminal:

   ```bash
   python examples/cross_process_events_example.py server
   ```

2. Run the evaluation in another terminal:

   ```bash
   python examples/cross_process_events_example.py eval
   ```
## Troubleshooting

### Events Not Received

1. Check that the event bus has started listening: `event_bus.start_listening()`
2. Verify the database is accessible and writable
3. Check for database lock issues (multiple processes accessing the same database)
4. Ensure both processes are using the same database path

### Database Lock Issues

SQLite has limitations with concurrent access. If you experience database locks:

1. Ensure processes are not writing to the database simultaneously
2. Consider a different database backend for high-concurrency scenarios
3. The event bus automatically handles some concurrency issues

### High Database Size

The system automatically cleans up old processed events after 24 hours. If the database is growing too large:

1. Check the database file size: `~/.eval_protocol/logs.db`
2. Manually clean up old events if needed
3. Adjust the cleanup interval in the code if necessary
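Manual cleanup amounts to deleting processed events older than the retention window. A sketch against a stand-in `event` table using stdlib `sqlite3` (column names are assumptions based on the schema section above):

```python
import sqlite3
import time

RETENTION_SECONDS = 24 * 60 * 60  # matches the documented 24-hour window

def cleanup_old_events(conn: sqlite3.Connection, now: float) -> int:
    """Delete processed events older than the retention window; return the count."""
    cur = conn.execute(
        "DELETE FROM event WHERE processed = 1 AND timestamp < ?",
        (now - RETENTION_SECONDS,),
    )
    conn.commit()
    return cur.rowcount

# Demo with an in-memory stand-in table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event (event_id TEXT, timestamp REAL, processed INTEGER)")
now = time.time()
conn.execute("INSERT INTO event VALUES ('old', ?, 1)", (now - 2 * RETENTION_SECONDS,))
conn.execute("INSERT INTO event VALUES ('new', ?, 1)", (now,))
conn.execute("INSERT INTO event VALUES ('old-unprocessed', ?, 0)", (now - 2 * RETENTION_SECONDS,))

deleted = cleanup_old_events(conn, now)  # removes only the old, processed event
```

Unprocessed events are deliberately kept regardless of age, so a slow consumer does not lose them.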
### Performance Issues

If you're experiencing performance issues:

1. Check the polling interval (currently 100 ms)
2. Monitor database size and cleanup frequency
3. Consider reducing the number of events emitted
4. Profile the database queries for bottlenecks

eval_protocol/__init__.py

Lines changed: 4 additions & 3 deletions
```diff
@@ -10,15 +10,16 @@

 import warnings

-from .adapters.braintrust import reward_fn_to_scorer, scorer_to_reward_fn
+from eval_protocol.adapters.braintrust import reward_fn_to_scorer, scorer_to_reward_fn
+
 from .auth import get_fireworks_account_id, get_fireworks_api_key
 from .common_utils import load_jsonl
 from .config import RewardKitConfig, get_config, load_config
 from .mcp_env import (
     AnthropicPolicy,
-    OpenAIPolicy,
-    LiteLLMPolicy,
     FireworksPolicy,
+    LiteLLMPolicy,
+    OpenAIPolicy,
     make,
     rollout,
     test_mcp,
```

eval_protocol/common_utils.py

Lines changed: 14 additions & 20 deletions
```diff
@@ -1,9 +1,7 @@
 import json
-import logging
+import re
 from typing import Any, Dict, List

-logger = logging.getLogger(__name__)
-

 def load_jsonl(file_path: str) -> List[Dict[str, Any]]:
     """
@@ -14,23 +12,19 @@ def load_jsonl(file_path: str) -> List[Dict[str, Any]]:

     Returns:
         A list of dictionaries, where each dictionary is a parsed JSON object from a line.
-        Returns an empty list if the file is not found or if errors occur during parsing,
-        with errors logged.
+        Returns an empty list if the file is not found or if errors occur during parsing.
     """
     data: List[Dict[str, Any]] = []
-    try:
-        with open(file_path, "r", encoding="utf-8") as f:
-            for i, line in enumerate(f):
-                try:
-                    data.append(json.loads(line.strip()))
-                except json.JSONDecodeError as e:
-                    logger.error(f"Error decoding JSON on line {i+1} in {file_path}: {e} - Line: '{line.strip()}'")
-                    # Optionally, re-raise, or return partial data, or handle as per desired strictness
-                    # For now, we'll log and continue, returning successfully parsed lines.
-    except FileNotFoundError:
-        logger.error(f"File not found: {file_path}")
-        return []
-    except Exception as e:
-        logger.error(f"An unexpected error occurred while reading {file_path}: {e}")
-        return []
+    with open(file_path, "r", encoding="utf-8") as f:
+        for line_number, line in enumerate(f, start=1):
+            try:
+                data.append(json.loads(line.strip()))
+            except json.JSONDecodeError as e:
+                print(f"Error parsing JSON line for file {file_path} at line {line_number}")
+                # Try to recover the "row_id" field from the malformed line
+                row_id_index = line.find("row_id")
+                if row_id_index != -1:
+                    match = re.search(r'"row_id": (.*),', line[row_id_index:])
+                    row_id = match.group(1) if match else "unknown"
+                    raise ValueError(f"{e.msg} at line {line_number}: {line} (row_id={row_id})")
+                raise e
     return data
```
Lines changed: 2 additions & 2 deletions
```diff
@@ -1,3 +1,3 @@
-from eval_protocol.dataset_logger.local_fs_dataset_logger_adapter import LocalFSDatasetLoggerAdapter
+from eval_protocol.dataset_logger.sqlite_dataset_logger_adapter import SqliteDatasetLoggerAdapter

-default_logger = LocalFSDatasetLoggerAdapter()
+default_logger = SqliteDatasetLoggerAdapter()
```

eval_protocol/dataset_logger/dataset_logger.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -4,6 +4,8 @@
 if TYPE_CHECKING:
     from eval_protocol.models import EvaluationRow

+LOG_EVENT_TYPE = "log"
+

 class DatasetLogger(ABC):
     """
```
