Skip to content

Commit cc0abb2

Browse files
author
Dylan Huang
committed
Enhance singleton lock functionality and file locking in LocalFSDatasetLoggerAdapter
- Updated `is_process_running` to include a timeout parameter, allowing for more flexible process monitoring.
- Implemented file locking mechanisms in `LocalFSDatasetLoggerAdapter` to prevent race conditions during logging operations, ensuring data integrity when multiple processes access log files.
- Added methods for acquiring and releasing file locks, improving the robustness of the logging process.
1 parent 4ff7912 commit cc0abb2

File tree

4 files changed

+601
-70
lines changed

4 files changed

+601
-70
lines changed

eval_protocol/dataset_logger/local_fs_dataset_logger_adapter.py

Lines changed: 80 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
import json
22
import os
3-
import shutil
4-
import tempfile
3+
import time
54
from datetime import datetime, timezone
5+
from pathlib import Path
66
from typing import TYPE_CHECKING, List, Optional
77

88
from eval_protocol.common_utils import load_jsonl
99
from eval_protocol.dataset_logger.dataset_logger import DatasetLogger
1010
from eval_protocol.dataset_logger.directory_utils import find_eval_protocol_datasets_dir
11+
from eval_protocol.singleton_lock import acquire_singleton_lock, release_singleton_lock
1112

1213
if TYPE_CHECKING:
1314
from eval_protocol.models import EvaluationRow
1415

1516

1617
class LocalFSDatasetLoggerAdapter(DatasetLogger):
1718
"""
18-
Logger that stores logs in the local filesystem.
19+
Logger that stores logs in the local filesystem with file locking to prevent race conditions.
1920
"""
2021

2122
def __init__(self):
@@ -39,6 +40,44 @@ def current_jsonl_path(self) -> str:
3940
"""
4041
return os.path.join(self.datasets_dir, f"{self.current_date}.jsonl")
4142

43+
def _acquire_file_lock(self, file_path: str, timeout: float = 30.0) -> bool:
    """
    Acquire an inter-process lock for a specific file using the singleton
    lock mechanism, polling until the lock is obtained or ``timeout`` elapses.

    Args:
        file_path: Path to the file to lock.
        timeout: Maximum time to wait for lock acquisition in seconds.
            A value <= 0 still performs a single acquisition attempt.

    Returns:
        True if the lock was acquired, False if the timeout expired first.
    """
    # Lock name is derived from the basename and the lock file lives in the
    # file's own directory, so each file path maps to its own lock.
    lock_name = f"file_lock_{os.path.basename(file_path)}"
    base_dir = Path(os.path.dirname(file_path))

    # monotonic() is immune to wall-clock adjustments; always try at least
    # once so timeout=0 does not silently fail without an attempt.
    deadline = time.monotonic() + timeout
    while True:
        # acquire_singleton_lock returns None on success, or information
        # about the current holder when the lock is already taken.
        if acquire_singleton_lock(base_dir, lock_name) is None:
            return True
        if time.monotonic() >= deadline:
            # Lock remained held by another process for the whole window.
            return False
        time.sleep(0.1)  # brief back-off before retrying
69+
70+
def _release_file_lock(self, file_path: str) -> None:
    """
    Release the inter-process lock previously taken for ``file_path``.

    Args:
        file_path: Path to the file to unlock.
    """
    # Must mirror the name/base-dir derivation used by _acquire_file_lock.
    directory = Path(os.path.dirname(file_path))
    release_singleton_lock(directory, f"file_lock_{os.path.basename(file_path)}")
80+
4281
def log(self, row: "EvaluationRow") -> None:
4382
"""Log a row, updating existing row with same ID or appending new row."""
4483
row_id = row.input_metadata.row_id
@@ -49,25 +88,35 @@ def log(self, row: "EvaluationRow") -> None:
4988
if filename.endswith(".jsonl"):
5089
file_path = os.path.join(self.datasets_dir, filename)
5190
if os.path.exists(file_path):
52-
with open(file_path, "r") as f:
53-
lines = f.readlines()
54-
55-
# Find the line with matching ID
56-
for i, line in enumerate(lines):
91+
if self._acquire_file_lock(file_path):
5792
try:
58-
line_data = json.loads(line.strip())
59-
if line_data["input_metadata"]["row_id"] == row_id:
60-
# Update existing row
61-
lines[i] = row.model_dump_json(exclude_none=True) + os.linesep
62-
with open(file_path, "w") as f:
63-
f.writelines(lines)
64-
return
65-
except json.JSONDecodeError:
66-
continue
93+
with open(file_path, "r") as f:
94+
lines = f.readlines()
95+
96+
# Find the line with matching ID
97+
for i, line in enumerate(lines):
98+
try:
99+
line_data = json.loads(line.strip())
100+
if line_data["input_metadata"]["row_id"] == row_id:
101+
# Update existing row
102+
lines[i] = row.model_dump_json(exclude_none=True) + os.linesep
103+
with open(file_path, "w") as f:
104+
f.writelines(lines)
105+
return
106+
except json.JSONDecodeError:
107+
continue
108+
finally:
109+
self._release_file_lock(file_path)
67110

68111
# If no existing row found, append new row to current file
69-
with open(self.current_jsonl_path, "a") as f:
70-
f.write(row.model_dump_json(exclude_none=True) + os.linesep)
112+
if self._acquire_file_lock(self.current_jsonl_path):
113+
try:
114+
with open(self.current_jsonl_path, "a") as f:
115+
f.write(row.model_dump_json(exclude_none=True) + os.linesep)
116+
finally:
117+
self._release_file_lock(self.current_jsonl_path)
118+
else:
119+
raise RuntimeError(f"Failed to acquire lock for log file {self.current_jsonl_path}")
71120

72121
def read(self, row_id: Optional[str] = None) -> List["EvaluationRow"]:
73122
"""Read rows from all JSONL files in the datasets directory. Also
@@ -82,14 +131,18 @@ def read(self, row_id: Optional[str] = None) -> List["EvaluationRow"]:
82131
for filename in os.listdir(self.datasets_dir):
83132
if filename.endswith(".jsonl"):
84133
file_path = os.path.join(self.datasets_dir, filename)
85-
data = load_jsonl(file_path)
86-
for r in data:
87-
row = EvaluationRow(**r)
88-
if row.input_metadata.row_id not in existing_row_ids:
89-
existing_row_ids.add(row.input_metadata.row_id)
90-
else:
91-
raise ValueError(f"Duplicate Row ID {row.input_metadata.row_id} already exists")
92-
all_rows.append(row)
134+
if self._acquire_file_lock(file_path):
135+
try:
136+
data = load_jsonl(file_path)
137+
for r in data:
138+
row = EvaluationRow(**r)
139+
if row.input_metadata.row_id not in existing_row_ids:
140+
existing_row_ids.add(row.input_metadata.row_id)
141+
else:
142+
raise ValueError(f"Duplicate Row ID {row.input_metadata.row_id} already exists")
143+
all_rows.append(row)
144+
finally:
145+
self._release_file_lock(file_path)
93146

94147
if row_id:
95148
# Filter by row_id if specified

eval_protocol/pytest/eval_watcher.py

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from eval_protocol.dataset_logger.directory_utils import find_eval_protocol_dir
2424
from eval_protocol.logging_utils import get_logger
2525
from eval_protocol.models import EvaluationRow
26-
from eval_protocol.utils.singleton_lock import (
26+
from eval_protocol.singleton_lock import (
2727
acquire_singleton_lock,
2828
get_lock_file_paths,
2929
get_lock_holder_pid,
@@ -213,7 +213,7 @@ def _start_watcher_process(check_interval: float) -> Optional[int]:
213213
return None
214214

215215

216-
def ensure_singleton_watcher(check_interval: float = 2.0) -> bool:
216+
def ensure_singleton_watcher(check_interval: float = 2.0) -> Optional[int]:
    """
    Ensure the singleton EvaluationWatcher instance exists and is running.
    This function is OS-level global - only one watcher will run across all processes.

    Args:
        check_interval: How often to check for terminated processes (seconds)

    Returns:
        PID of the watcher process if it was started successfully, None if it failed to start
        or if a watcher is already running
    """
    # Check if a watcher is already running before attempting to start a new one
    if is_watcher_running():
        logger.info("🔍 Evaluation watcher is already running")
        return None

    # Start the watcher in a completely independent background process.
    # _start_watcher_process returns None when process creation fails, so
    # report that instead of logging a bogus "PID: None" success line.
    pid = _start_watcher_process(check_interval)
    if pid is None:
        logger.error("❌ Failed to start evaluation watcher: process creation failed")
        return None

    logger.info(f"🔍 Started evaluation watcher in independent background process (PID: {pid})")
    return pid
263237

264238

265239
def is_watcher_running() -> bool:
266240
"""Check if the evaluation watcher is currently running."""
267241
return is_lock_held(get_eval_protocol_dir(), LOCK_NAME)
268242

269243

270-
def get_watcher_pid() -> Optional[int]:
271-
"""Get the PID of the currently running evaluation watcher."""
272-
return get_lock_holder_pid(get_eval_protocol_dir(), LOCK_NAME)
244+
def get_watcher_pid(timeout: float = 10.0) -> Optional[int]:
    """
    Get the PID of the currently running evaluation watcher.

    Polls the lock holder for up to ``timeout`` seconds (not a fixed 10 s),
    returning as soon as a holder PID is observed.

    Args:
        timeout: Maximum time in seconds to keep polling. A value <= 0
            still performs a single lookup.

    Returns:
        The watcher's PID, or None if no watcher held the lock within the
        polling window.
    """
    interval = 0.1
    # monotonic() avoids surprises from wall-clock adjustments; looping with
    # a post-check guarantees at least one lookup even when timeout <= 0.
    deadline = time.monotonic() + timeout
    while True:
        pid = get_lock_holder_pid(get_eval_protocol_dir(), LOCK_NAME)
        if pid is not None:
            return pid
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)
273254

274255

275256
def stop_watcher() -> bool:
@@ -280,7 +261,7 @@ def stop_watcher() -> bool:
280261
return False
281262

282263
try:
283-
os.kill(pid, signal.SIGTERM)
264+
os.kill(pid, signal.SIGKILL)
284265
logger.info(f"🔍 Sent SIGTERM to evaluation watcher process {pid}")
285266
return True
286267
except OSError as e:

eval_protocol/singleton_lock.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"""
1414

1515
import os
16+
import time
1617
from pathlib import Path
1718
from typing import Optional, Tuple
1819

@@ -124,7 +125,7 @@ def release_singleton_lock(base_dir: Path, lock_name: str) -> None:
124125
pass
125126

126127

127-
def is_process_running(pid: int) -> bool:
128+
def is_process_running(pid: int, timeout: float = 10.0) -> bool:
    """
    Check whether a process is still running, polling for up to ``timeout`` seconds.

    The process is probed with ``os.kill(pid, 0)`` — signal 0 delivers nothing
    and only performs existence/permission checking. If the process disappears
    at any point during the polling window, False is returned immediately.
    NOTE(review): an OSError from a permission failure (EPERM) is treated the
    same as "not running", matching the original implementation.

    Args:
        pid: Process ID to check.
        timeout: How long (in seconds) to keep polling before concluding the
            process is still running. A value <= 0 performs a single probe.

    Returns:
        True if the process was alive for the whole polling window,
        False as soon as it is observed to be gone.
    """

    def _probe(target_pid: int) -> bool:
        try:
            os.kill(target_pid, 0)
            return True
        except OSError:
            return False

    # Always probe at least once so timeout=0 still yields a real answer
    # (the original returned True without ever checking in that case).
    if not _probe(pid):
        return False

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if not _probe(pid):
            return False
        # Sleep between probes — the original busy-waited with no sleep and
        # pinned a CPU core for the full timeout whenever the process was alive.
        time.sleep(0.1)
    return True
142151

143152

144153
def is_lock_held(base_dir: Path, lock_name: str) -> bool:

0 commit comments

Comments
 (0)