-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththread_safe_api.py
More file actions
233 lines (193 loc) · 7.6 KB
/
thread_safe_api.py
File metadata and controls
233 lines (193 loc) · 7.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
"""
Thread-safe state bridge for GitHub search operations.
Provides a clean interface between worker threads and the Streamlit UI layer.
Batch and pattern progress are passed as structured data (ints/dicts); the
search phase is still inferred from keywords in the status messages.
"""
import threading
from queue import Queue
import logging
from typing import Dict, Any, Optional, List
import time
logger = logging.getLogger(__name__)
class ThreadSafeState:
    """Thread-safe state container for search operations.

    Worker threads write through the setters and the main/Streamlit thread
    polls the getters. Every public method takes ``self.lock`` before it
    reads or writes an attribute.
    """

    def __init__(self):
        # Re-entrant, so a method holding the lock can call another locked
        # method (start_search -> reset).
        self.lock = threading.RLock()
        self.progress_value: float = 0.0
        self.status_message: str = ""
        self.is_running: bool = False
        self.error: Optional[str] = None
        self.results: Optional[List[Dict[str, Any]]] = None
        self.cancel_requested: bool = False
        # Snapshot of search_stats taken when a running search is marked as
        # finished (see set_running); None until that first happens.
        self.completed_stats: Optional[Dict[str, Any]] = None
        self.search_stats: Dict[str, Any] = self._default_stats()

    def _default_stats(self) -> Dict[str, Any]:
        """Return a fresh stats dict with every field at its idle default."""
        return {
            "total_fetched": 0,
            "current_batch": 0,
            "total_batches": 0,
            "completed_patterns": 0,
            "total_patterns": 0,
            "rate_limit_hits": 0,
            "requests_made": 0,
            "is_extended_search": False,
            "search_query": "",
            "result_limit": 0,
            "cooldown_time": 40,
            "search_phase": "idle",
            "start_time": None,
            "elapsed_time": 0,
        }

    # --- Setters (called from worker threads) ---

    def start_search(self, query: str, limit: int, extended: bool) -> None:
        """Reset the live state and mark a new search as running.

        Args:
            query: Search query, stored in the stats as ``search_query``.
            limit: Requested result limit, stored as ``result_limit``.
            extended: Stored as ``is_extended_search``.
        """
        with self.lock:
            self.reset()
            self.is_running = True
            self.cancel_requested = False
            stats = self.search_stats
            stats["start_time"] = time.time()
            stats["search_query"] = query
            stats["result_limit"] = limit
            stats["is_extended_search"] = extended
            stats["search_phase"] = "starting"

    def set_progress(self, value: float) -> None:
        """Store the progress fraction, clamped to the range 0.0-1.0.

        The value is coerced to ``float`` so that an integer argument such
        as ``1`` is stored as ``1.0``; get_progress() therefore always
        returns a float, as annotated.
        """
        with self.lock:
            self.progress_value = min(max(0.0, float(value)), 1.0)
            self._update_elapsed()

    def set_status(self, message: str) -> None:
        """Store the status message and derive the search phase from it.

        The message is matched case-insensitively against a fixed list of
        keywords; only the first matching rule is applied.
        """
        with self.lock:
            self.status_message = message
            self._update_elapsed()

            # Auto-detect phase from keywords. The rules are mutually
            # exclusive and evaluated in this order.
            # NOTE(review): because "rate limit" is the last rule, a message
            # that also contains an earlier keyword (e.g. "cooling down")
            # does not increment rate_limit_hits - confirm this is intended.
            msg_lower = message.lower()
            stats = self.search_stats
            if "cooling down" in msg_lower:
                stats["search_phase"] = "cooling_down"
            elif "progress:" in msg_lower:
                stats["search_phase"] = "fetching"
                stats["requests_made"] += 1
            elif "batch" in msg_lower and "processing" in msg_lower:
                stats["search_phase"] = "batch_processing"
            elif "completed" in msg_lower:
                # Also covers "search completed", which contains "completed".
                stats["search_phase"] = "completed"
            elif "rate limit" in msg_lower:
                stats["rate_limit_hits"] += 1

    def update_batch_info(self, current: int, total: int) -> None:
        """Directly update batch progress (structured - no string parsing)."""
        with self.lock:
            self.search_stats["current_batch"] = current
            self.search_stats["total_batches"] = total

    def update_pattern_info(self, completed: int, total: int) -> None:
        """Directly update pattern progress (structured - no string parsing)."""
        with self.lock:
            self.search_stats["completed_patterns"] = completed
            self.search_stats["total_patterns"] = total

    def set_error(self, error_message: Optional[str]) -> None:
        """Set the error message (``None`` clears it)."""
        with self.lock:
            self.error = error_message

    def set_results(self, results: List[Dict[str, Any]]) -> None:
        """Set search results."""
        with self.lock:
            self.results = results

    def set_running(self, is_running: bool) -> None:
        """Set the running flag.

        On a running -> not-running transition the elapsed time is refreshed
        and a copy of the live stats is stored in ``completed_stats``.
        """
        with self.lock:
            previous = self.is_running
            self.is_running = is_running
            if previous and not is_running:
                self._update_elapsed()
                self.completed_stats = self.search_stats.copy()
                logger.info("Search completed, stored final stats")

    def request_cancel(self) -> None:
        """Request cancellation of the running search."""
        with self.lock:
            self.cancel_requested = True
            logger.info("Search cancellation requested")

    # --- Getters (called from main/Streamlit thread) ---

    def get_progress(self) -> float:
        """Return the stored progress fraction (0.0-1.0)."""
        with self.lock:
            return self.progress_value

    def get_status(self) -> str:
        """Return the most recent status message."""
        with self.lock:
            return self.status_message

    def get_error(self) -> Optional[str]:
        """Return the stored error message, or ``None``."""
        with self.lock:
            return self.error

    def get_results(self) -> Optional[List[Dict[str, Any]]]:
        """Return the stored results object (not a copy), or ``None``."""
        with self.lock:
            return self.results

    def is_search_running(self) -> bool:
        """Return the running flag."""
        with self.lock:
            return self.is_running

    def is_cancel_requested(self) -> bool:
        """Return whether cancellation has been requested."""
        with self.lock:
            return self.cancel_requested

    def get_stats(self) -> Dict[str, Any]:
        """Return a shallow copy of the current or completed stats.

        While a search is running the live stats are returned. Otherwise the
        snapshot stored when a search last finished is returned if there is
        one, falling back to the live stats.
        """
        with self.lock:
            if self.is_running:
                return self.search_stats.copy()
            if self.completed_stats:
                return self.completed_stats.copy()
            return self.search_stats.copy()

    # --- Internal helpers ---

    def _update_elapsed(self) -> None:
        """Update elapsed time since search started.

        Does not take the lock itself; the callers in this class invoke it
        while already holding ``self.lock``.
        """
        start = self.search_stats.get("start_time")
        # Explicit None check: "not started" is encoded as None, and a
        # numeric start time must never be mistaken for it.
        if start is not None:
            self.search_stats["elapsed_time"] = time.time() - start

    def reset(self) -> None:
        """Reset the live state for a new search.

        Clears progress, status, running flag, error, cancel flag, results
        and the live stats. ``completed_stats`` is left untouched, so
        get_stats() keeps returning the previous search's final stats until
        a new search is marked as running.
        """
        with self.lock:
            self.progress_value = 0.0
            self.status_message = ""
            self.is_running = False
            self.error = None
            self.cancel_requested = False
            self.results = None
            self.search_stats = self._default_stats()
# Module-level shared instance; it is also the default ``state`` argument of
# thread_safe_search_github.
thread_safe_state = ThreadSafeState()
def thread_safe_search_github(
    query: str,
    limit: int,
    extended: bool = False,
    cooldown: int = 40,
    state: ThreadSafeState = thread_safe_state,
) -> List[Dict[str, Any]]:
    """Thread-safe wrapper for GitHub search.

    Runs ``github_api.search_github`` and routes its progress, status, error
    and cancellation callbacks through ``state``, which can safely be polled
    from the main Streamlit thread.

    Args:
        query: Search query forwarded to the search function.
        limit: Result limit forwarded to the search function.
        extended: Forwarded as ``extended``.
        cooldown: Forwarded as ``cooldown_time`` and recorded in the stats.
        state: State object to update; defaults to the module singleton.

    Returns:
        Whatever the search function returns (also stored via
        ``state.set_results``), or an empty list if it raised. In the error
        case the message is stored via ``state.set_error`` and the stored
        results are left unset.
    """
    from github_api import search_github as _search_github

    # Initialize search state. The cooldown is written under the same lock
    # as the rest of the initialization so a polling thread never sees the
    # stats half-initialized (the lock is re-entrant).
    with state.lock:
        state.start_search(query, limit, extended)
        state.search_stats["cooldown_time"] = cooldown

    try:
        # The state's own bound methods serve as the callbacks.
        results = _search_github(
            query=query,
            limit=limit,
            on_progress=state.set_progress,
            on_status=state.set_status,
            on_error=state.set_error,
            extended=extended,
            cooldown_time=cooldown,
            should_cancel=state.is_cancel_requested,
        )
        state.set_results(results)
        return results
    except Exception as e:
        # Top-level boundary of the worker: log with traceback and surface
        # the failure through the shared state instead of killing the thread.
        logger.exception("Error in thread_safe_search_github: %s", e)
        state.set_error(f"Search error: {e}")
        return []
    finally:
        # start_search() marked the state as running; always mark it as
        # finished so pollers do not wait forever. This also snapshots the
        # final stats. A later set_running(False) by the caller is a no-op.
        state.set_running(False)