
Commit 69ba4ac
Author: Dylan Huang

sqlite_dataset_logger_adapter.py (#38)
* Add logging of process ID in evaluation tests - Enhanced the `evaluation_test` function to log the process ID using `default_logger`, improving traceability of evaluation processes during testing.
* init
* add peewee
* works
* TODO: set default logger to sqlite
* ensure update/create a log works
* fix
* Add EventBus for event handling and integrate with SqliteEvaluationRowStore
  - Introduced a new EventBus class to facilitate decoupling of components in the evaluation system.
  - Integrated event emission for "row_upserted" in SqliteEvaluationRowStore to notify subscribers of new data.
  - Updated SqliteDatasetLoggerAdapter to allow initialization with either a database path or a store instance, ensuring flexibility in logging setup.
  - Enhanced test cases to validate logging functionality and event handling with multiple evaluation rows.
* TODO: implement event_bus across processes
* save
* Refactor serialization in SqliteEventBusDatabase and update tests for event handling
  - Updated the serialization of event data to exclude None values when using Pydantic models.
  - Modified tests to ensure event data is deserialized correctly into EvaluationRow and to clarify the behavior of event reception in local processes.
* Introduce LOG_EVENT_TYPE constant and update event emission in SqliteDatasetLoggerAdapter
  - Added a constant LOG_EVENT_TYPE for event type consistency.
  - Updated event emission in SqliteDatasetLoggerAdapter to use LOG_EVENT_TYPE instead of a hardcoded string.
  - Enhanced error handling during event emission with logging for failures.
* Enhance WebSocket message handling for logs
  - Added support for a "log" message type in WebSocket handling, allowing individual log entries to be processed.
  - Updated logging statements to provide clearer context for received logs.
  - Introduced LogMessageSchema for runtime validation of log messages.
* works with broadcast queue
* vite build
* watcher to set status to stopped for certain tests
1 parent 986452f · commit 69ba4ac

23 files changed: +1171 −235 lines

docs/cross_process_events.md

Lines changed: 226 additions & 0 deletions
@@ -0,0 +1,226 @@
# Event Bus System

The eval protocol includes a flexible event bus system that supports both in-process and cross-process event communication. This is particularly useful when you have:

- An evaluation test running in one process
- A logs server running in another process
- Real-time updates flowing between the two

## Architecture

The event bus system consists of:

1. **EventBus**: The core interface for event communication
2. **SqliteEventBus**: An implementation that adds cross-process capabilities using SQLite

### Core EventBus Interface

The `EventBus` class provides the basic event bus functionality:

```python
from eval_protocol.event_bus import EventBus

event_bus = EventBus()

def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)
event_bus.emit("test_event", {"data": "value"})
```

### SqliteEventBus Implementation

The `SqliteEventBus` extends `EventBus` to add cross-process communication using the existing SQLite database infrastructure. Events are stored in the same database as evaluation rows, providing:

- **No additional dependencies** - uses the existing peewee/SQLite infrastructure
- **Reliable delivery** - database transactions ensure event persistence
- **Automatic cleanup** - old events are cleaned up automatically
- **Process isolation** - each process has a unique ID so it does not process its own events

### Database Schema

Events are stored in a new `Event` table with the following structure:

- `event_id`: Unique identifier for each event
- `event_type`: Type of event (e.g., "row_upserted")
- `data`: JSON data payload
- `timestamp`: When the event was created
- `process_id`: ID of the process that created the event
- `processed`: Whether the event has been processed by other processes
## Usage

### Basic Usage (In-Process)

```python
from eval_protocol.event_bus import EventBus

# Create a basic event bus for in-process communication
event_bus = EventBus()

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Emit events
event_bus.emit("test_event", {"data": "value"})
```

### Cross-Process Usage

```python
from eval_protocol.event_bus import SqliteEventBus

# Create a cross-process event bus
event_bus = SqliteEventBus()

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Start listening for cross-process events
event_bus.start_listening()

# Emit events (these will be broadcast to other processes)
event_bus.emit("row_upserted", evaluation_row)
```
### Using the Global Event Bus

The global `event_bus` instance is a `SqliteEventBus`, so it provides cross-process functionality out of the box:

```python
from eval_protocol.event_bus import event_bus

# Subscribe to events
def handle_event(event_type: str, data):
    print(f"Received {event_type}: {data}")

event_bus.subscribe(handle_event)

# Start listening for cross-process events
event_bus.start_listening()

# Emit events
event_bus.emit("row_upserted", evaluation_row)
```

### In Evaluation Tests

The event bus is used automatically by the dataset logger. When you log evaluation rows, they are broadcast to all listening processes:

```python
from eval_protocol.dataset_logger import default_logger

# This will automatically emit an event for the logged row
default_logger.log(evaluation_row)
```

### In Logs Server

The logs server automatically starts listening for cross-process events and broadcasts them to connected WebSocket clients:

```python
from eval_protocol.utils.logs_server import serve_logs

# This starts the server and listens for cross-process events
serve_logs()
```
## Configuration

### EventBus Configuration

The basic `EventBus` requires no configuration - it works entirely in-memory.

### SqliteEventBus Configuration

The `SqliteEventBus` automatically uses the same SQLite database as the evaluation row store, so no additional configuration is required. The database is located at:

- Default: `~/.eval_protocol/logs.db`
- Custom: a path can be specified when creating the event bus

#### Custom Database Path

```python
from eval_protocol.event_bus import SqliteEventBus

# Use a custom database path
event_bus = SqliteEventBus(db_path="/path/to/custom.db")
```

## Performance Considerations

### EventBus Performance

- **In-memory**: Events are processed immediately, with no polling latency
- **Memory usage**: Events are not persisted, so memory usage is minimal
- **Scalability**: Suitable for high-frequency events within a single process

### SqliteEventBus Performance

- **Database-based**: Events are stored in SQLite with proper indexing
- **Polling frequency**: Events are checked every 100 ms by default
- **Memory usage**: Events are automatically cleaned up after 24 hours
- **Latency**: ~100 ms, due to the polling interval
- **Scalability**: Suitable for moderate event volumes (< 1000 events/second)
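The polling cycle described above can be pictured with a stand-alone sketch. This uses the stdlib `sqlite3` module rather than the actual `SqliteEventBus` internals; the table layout follows the schema section, and `poll_once` is a hypothetical helper name.

```python
import json
import sqlite3

def poll_once(conn: sqlite3.Connection, my_process_id: str) -> int:
    """Fetch unprocessed events from other processes and mark them processed."""
    rows = conn.execute(
        "SELECT event_id, event_type, data FROM event "
        "WHERE processed = 0 AND process_id != ?",
        (my_process_id,),
    ).fetchall()
    for event_id, event_type, data in rows:
        print(f"Received {event_type}: {json.loads(data)}")
        conn.execute("UPDATE event SET processed = 1 WHERE event_id = ?", (event_id,))
    conn.commit()
    return len(rows)

# Demo: one shared database, two "processes" (a writer and a reader).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event (event_id TEXT PRIMARY KEY, event_type TEXT, "
    "data TEXT, process_id TEXT, processed INTEGER DEFAULT 0)"
)
conn.execute(
    "INSERT INTO event VALUES ('e1', 'row_upserted', ?, 'writer', 0)",
    (json.dumps({"row_id": "abc"}),),
)
conn.commit()

# A real listener would run this in a loop with a ~100 ms sleep between polls.
handled = poll_once(conn, my_process_id="reader")
```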
## Event Types

The following event types are currently supported:

- `row_upserted`: Emitted when an evaluation row is logged
- `log`: Legacy event type (handled the same as `row_upserted`)
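Since `log` and `row_upserted` carry the same payload, a subscriber can simply accept both types in its callback. This is a hypothetical pattern, not part of the library:

```python
rows_seen = []

def handle_event(event_type: str, data):
    # Treat the legacy "log" type the same as "row_upserted".
    if event_type in ("row_upserted", "log"):
        rows_seen.append(data)

# Any EventBus-style emitter would invoke the callback like this:
handle_event("row_upserted", {"row_id": "a"})
handle_event("log", {"row_id": "b"})
handle_event("heartbeat", {"row_id": "c"})  # ignored: not a row event
```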
## Testing

You can test the cross-process event bus using the provided example:

1. Start the logs server in one terminal:

```bash
python examples/cross_process_events_example.py server
```

2. Run the evaluation in another terminal:

```bash
python examples/cross_process_events_example.py eval
```
## Troubleshooting

### Events Not Received

1. Check that the event bus has started listening: `event_bus.start_listening()`
2. Verify the database is accessible and writable
3. Check for database lock issues (multiple processes accessing the same database)
4. Ensure both processes are using the same database path

### Database Lock Issues

SQLite has limitations with concurrent access. If you experience database locks:

1. Ensure processes are not writing to the database simultaneously
2. Consider a different database backend for high-concurrency scenarios
3. The event bus automatically handles some concurrency issues
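If locks persist, one mitigation worth trying (an assumption about your deployment, not something the event bus configures for you) is SQLite's WAL journal mode, which lets readers proceed while a single writer is active:

```python
import os
import sqlite3
import tempfile

# Hypothetical database path for illustration; substitute your actual logs.db.
db_path = os.path.join(tempfile.mkdtemp(), "logs.db")

conn = sqlite3.connect(db_path)
# WAL mode allows concurrent readers alongside one writer, reducing
# "database is locked" errors under moderate concurrency.
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
# A busy timeout makes writers wait briefly instead of failing immediately.
conn.execute("PRAGMA busy_timeout=5000")
```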
### High Database Size

The system automatically cleans up processed events after 24 hours. If the database is still growing:

1. Check the database file size: `~/.eval_protocol/logs.db`
2. Manually clean up old events if needed
3. Adjust the cleanup interval in the code if necessary

### Performance Issues

If you're experiencing performance issues:

1. Check the polling interval (currently 100 ms)
2. Monitor database size and cleanup frequency
3. Consider reducing the number of events emitted
4. Profile the database queries for bottlenecks
Lines changed: 2 additions & 2 deletions

@@ -1,3 +1,3 @@
-from eval_protocol.dataset_logger.local_fs_dataset_logger_adapter import LocalFSDatasetLoggerAdapter
+from eval_protocol.dataset_logger.sqlite_dataset_logger_adapter import SqliteDatasetLoggerAdapter

-default_logger = LocalFSDatasetLoggerAdapter()
+default_logger = SqliteDatasetLoggerAdapter()

eval_protocol/dataset_logger/dataset_logger.py

Lines changed: 2 additions & 0 deletions

@@ -4,6 +4,8 @@
 if TYPE_CHECKING:
     from eval_protocol.models import EvaluationRow

+LOG_EVENT_TYPE = "log"
+

 class DatasetLogger(ABC):
     """
Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
import os
from typing import List, Optional

from eval_protocol.dataset_logger.dataset_logger import LOG_EVENT_TYPE, DatasetLogger
from eval_protocol.dataset_logger.sqlite_evaluation_row_store import SqliteEvaluationRowStore
from eval_protocol.directory_utils import find_eval_protocol_dir
from eval_protocol.event_bus import event_bus
from eval_protocol.event_bus.logger import logger
from eval_protocol.models import EvaluationRow


class SqliteDatasetLoggerAdapter(DatasetLogger):
    def __init__(self, db_path: Optional[str] = None, store: Optional[SqliteEvaluationRowStore] = None):
        eval_protocol_dir = find_eval_protocol_dir()
        if db_path is not None and store is not None:
            raise ValueError("Provide only one of db_path or store, not both.")
        if store is not None:
            self.db_path = store.db_path
            self._store = store
        else:
            self.db_path = db_path if db_path is not None else os.path.join(eval_protocol_dir, "logs.db")
            self._store = SqliteEvaluationRowStore(self.db_path)

    def log(self, row: "EvaluationRow") -> None:
        row_id = row.input_metadata.row_id
        data = row.model_dump(exclude_none=True, mode="json")
        self._store.upsert_row(row_id=row_id, data=data)
        try:
            event_bus.emit(LOG_EVENT_TYPE, EvaluationRow(**data))
        except Exception as e:
            # Avoid breaking storage due to event emission issues
            logger.error(f"Failed to emit row_upserted event: {e}")
            pass

    def read(self, row_id: Optional[str] = None) -> List["EvaluationRow"]:
        from eval_protocol.models import EvaluationRow

        results = self._store.read_rows(row_id=row_id)
        return [EvaluationRow(**data) for data in results]
Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@
import os
from typing import List, Optional

from peewee import CharField, Model, SqliteDatabase
from playhouse.sqlite_ext import JSONField

from eval_protocol.models import EvaluationRow


class SqliteEvaluationRowStore:
    """
    Lightweight reusable SQLite store for evaluation rows.

    Stores arbitrary row data as JSON keyed by a unique string `row_id`.
    """

    def __init__(self, db_path: str):
        os.makedirs(os.path.dirname(db_path), exist_ok=True)
        self._db_path = db_path
        self._db = SqliteDatabase(self._db_path)

        class BaseModel(Model):
            class Meta:
                database = self._db

        class EvaluationRow(BaseModel):  # type: ignore
            row_id = CharField(unique=True)
            data = JSONField()

        self._EvaluationRow = EvaluationRow

        self._db.connect()
        self._db.create_tables([EvaluationRow])

    @property
    def db_path(self) -> str:
        return self._db_path

    def upsert_row(self, row_id: str, data: dict) -> None:
        if self._EvaluationRow.select().where(self._EvaluationRow.row_id == row_id).exists():
            self._EvaluationRow.update(data=data).where(self._EvaluationRow.row_id == row_id).execute()
        else:
            self._EvaluationRow.create(row_id=row_id, data=data)

    def read_rows(self, row_id: Optional[str] = None) -> List[dict]:
        if row_id is None:
            query = self._EvaluationRow.select().dicts()
        else:
            query = self._EvaluationRow.select().dicts().where(self._EvaluationRow.row_id == row_id)
        results = list(query)
        return [result["data"] for result in results]

    def delete_row(self, row_id: str) -> int:
        return self._EvaluationRow.delete().where(self._EvaluationRow.row_id == row_id).execute()

    def delete_all_rows(self) -> int:
        return self._EvaluationRow.delete().execute()
File renamed without changes.
Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
# Global event bus instance - uses SqliteEventBus for cross-process functionality
from eval_protocol.event_bus.event_bus import EventBus
from eval_protocol.event_bus.sqlite_event_bus import SqliteEventBus

event_bus: EventBus = SqliteEventBus()
Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
from typing import Any, Callable, List

from eval_protocol.event_bus.logger import logger


class EventBus:
    """Core event bus interface for decoupling components in the evaluation system."""

    def __init__(self):
        self._listeners: List[Callable[[str, Any], None]] = []

    def subscribe(self, callback: Callable[[str, Any], None]) -> None:
        """Subscribe to events.

        Args:
            callback: Function that takes (event_type, data) parameters
        """
        self._listeners.append(callback)

    def unsubscribe(self, callback: Callable[[str, Any], None]) -> None:
        """Unsubscribe from events.

        Args:
            callback: The callback function to remove
        """
        try:
            self._listeners.remove(callback)
        except ValueError:
            pass  # Callback wasn't subscribed

    def emit(self, event_type: str, data: Any) -> None:
        """Emit an event to all subscribers.

        Args:
            event_type: Type of event (e.g., "row_upserted")
            data: Event data
        """
        for listener in self._listeners:
            try:
                listener(event_type, data)
            except Exception as e:
                logger.debug(f"Event listener failed for {event_type}: {e}")

    def start_listening(self) -> None:
        """Start listening for cross-process events. Override in subclasses."""
        pass

    def stop_listening(self) -> None:
        """Stop listening for cross-process events. Override in subclasses."""
        pass
