From 29c47fafc9e7ce4637524e40855e6728a9787b9a Mon Sep 17 00:00:00 2001 From: aGallea Date: Wed, 25 Feb 2026 13:04:55 +0200 Subject: [PATCH 1/7] feat(index): add real-time progress updates, informative logs, verbosity filter, and stuck detection - Add intermediate progress updates every 10 rows instead of only on batch boundaries - Add on_log callback to indexer for rich phase-level logging (CSV loading, model loading, batch progress, completion) - Add verbosity levels (low/medium/high) with frontend dropdown filter on Live Logs panel - Wire on_log to WebSocket broadcast in index route - Add 3-second heartbeat from backend for connection health monitoring - Add client-side elapsed timer with server time resync for smooth display - Add stuck detection: warning banner at 15s silence, error banner at 30s - Add try/except around model loading with specific error reporting via on_log - Guard heartbeat cleanup with contextlib.suppress for closed event loops - Update test mock signatures for on_log parameter compatibility - Fix ruff formatting in test_server_search.py and test_utils.py --- embedding_cluster/indexer.py | 105 ++++++++++++++-- embedding_cluster/server/routes/index.py | 89 +++++++++++--- .../src/components/index/IndexProgress.tsx | 87 ++++++++++++-- frontend/src/hooks/useIndexWebSocket.ts | 112 +++++++++++++++--- tests/test_server_index.py | 6 +- tests/test_server_search.py | 12 +- tests/test_utils.py | 2 + 7 files changed, 349 insertions(+), 64 deletions(-) diff --git a/embedding_cluster/indexer.py b/embedding_cluster/indexer.py index 5532aad..a265b41 100644 --- a/embedding_cluster/indexer.py +++ b/embedding_cluster/indexer.py @@ -29,12 +29,23 @@ logger = logging.getLogger(__name__) +PROGRESS_UPDATE_INTERVAL = 10 + async def main_indexer( settings: Settings, on_progress: Callable[[dict[str, Any]], None] | None = None, + on_log: Callable[[str, str, str], None] | None = None, cancel_event: asyncio.Event | None = None, ) -> None: + def _emit_log( + message: str, 
+ level: str = "info", + verbosity: str = "low", + ) -> None: + if on_log is not None: + on_log(message, level, verbosity) + chromadb_client: ClientAPI = chromadb.PersistentClient(path="./chromadb") chromadb_docs_collections: dict[str, ChromaDocsCollection] = ( init_chroma_docs_collection(settings) @@ -53,27 +64,49 @@ async def main_indexer( and len(settings.image_embedding_fields) > 0 ): logger.info("Loading image model: %s", settings.image_model_name) - image_model = CLIPModel.from_pretrained(settings.image_model_name).to( - settings.process_unit_device - ) - image_model_processor = CLIPProcessor.from_pretrained(settings.image_model_name) + _emit_log(f"Loading image model: {settings.image_model_name}...") + try: + image_model = CLIPModel.from_pretrained(settings.image_model_name).to( + settings.process_unit_device + ) + image_model_processor = CLIPProcessor.from_pretrained( + settings.image_model_name + ) + _emit_log("Image model loaded successfully") + except Exception as exc: + _emit_log( + f"Failed to load image model: {exc}", + level="error", + ) + raise if ( settings.text_embedding_fields is not None and len(settings.text_embedding_fields) > 0 ): logger.info("Loading text model: %s", settings.text_model_name) - text_model_transformer = SentenceTransformer(settings.text_model_name).to( - settings.process_unit_device - ) + _emit_log(f"Loading text model: {settings.text_model_name}...") + try: + text_model_transformer = SentenceTransformer(settings.text_model_name).to( + settings.process_unit_device + ) + _emit_log("Text model loaded successfully") + except Exception as exc: + _emit_log( + f"Failed to load text model: {exc}", + level="error", + ) + raise start_time = time.perf_counter() + _emit_log("Loading CSV file...") with open(settings.local_csv_filename) as csv_file: csv_iter = csv.DictReader(csv_file) + _emit_log("CSV file opened, reading rows...") rows_read = 0 curr_rows: list[dict[str, Any]] = [] - + batch_num = 0 skipped_rows = 0 if 
settings.index_start_line is not None: skipped_rows = 1 @@ -84,16 +117,42 @@ async def main_indexer( for row in csv_iter: if cancel_event is not None and cancel_event.is_set(): - logger.info("Indexing cancelled at row %d", rows_read + skipped_rows) + logger.info( + "Indexing cancelled at row %d", + rows_read + skipped_rows, + ) + _emit_log( + f"Indexing cancelled at row {rows_read + skipped_rows}", + level="warning", + ) break rows_read += 1 curr_rows.append(row) + if on_progress is not None and rows_read % PROGRESS_UPDATE_INTERVAL == 0: + on_progress( + { + "rows_indexed": rows_read, + "total_rows": None, + "errors": 0, + "elapsed_seconds": (time.perf_counter() - start_time), + } + ) + _emit_log( + f"Processing row {rows_read}...", + verbosity="high", + ) if ( settings.index_end_line is not None and settings.index_end_line == rows_read + skipped_rows ): break if len(curr_rows) == settings.index_bulk_size: + batch_num += 1 + batch_start = rows_read - len(curr_rows) + 1 + _emit_log( + f"Processing batch {batch_num} ({batch_start}-{rows_read})...", + verbosity="medium", + ) await _handle_batch( settings=settings, rows=curr_rows, @@ -104,6 +163,10 @@ async def main_indexer( chromadb_docs_collections=chromadb_docs_collections, chromadb_collections=chromadb_collections, ) + _emit_log( + f"Batch {batch_num} complete, writing to ChromaDB...", + verbosity="medium", + ) curr_rows = [] chromadb_docs_collections = init_chroma_docs_collection(settings) if on_progress is not None: @@ -112,15 +175,25 @@ async def main_indexer( "rows_indexed": rows_read, "total_rows": None, "errors": 0, - "elapsed_seconds": time.perf_counter() - start_time, + "elapsed_seconds": (time.perf_counter() - start_time), } ) + _emit_log( + f"Indexed {rows_read} rows so far", + verbosity="medium", + ) logger.info( "Indexed %d rows. 
[%d]", rows_read, skipped_rows + rows_read, ) if len(curr_rows) > 0: + batch_num += 1 + batch_start = rows_read - len(curr_rows) + 1 + _emit_log( + f"Processing batch {batch_num} ({batch_start}-{rows_read})...", + verbosity="medium", + ) await _handle_batch( settings=settings, rows=curr_rows, @@ -131,16 +204,26 @@ async def main_indexer( chromadb_docs_collections=chromadb_docs_collections, chromadb_collections=chromadb_collections, ) + _emit_log( + f"Batch {batch_num} complete, writing to ChromaDB...", + verbosity="medium", + ) if on_progress is not None: on_progress( { "rows_indexed": rows_read, "total_rows": None, "errors": 0, - "elapsed_seconds": time.perf_counter() - start_time, + "elapsed_seconds": (time.perf_counter() - start_time), } ) + elapsed = time.perf_counter() - start_time + _emit_log( + f"Indexing complete: {rows_read} rows in {elapsed:.1f}s", + level="success", + ) + async def _handle_batch( settings: Settings, diff --git a/embedding_cluster/server/routes/index.py b/embedding_cluster/server/routes/index.py index 5ff5951..f1c322a 100644 --- a/embedding_cluster/server/routes/index.py +++ b/embedding_cluster/server/routes/index.py @@ -1,7 +1,9 @@ from __future__ import annotations import asyncio +import contextlib import logging +import time from pathlib import Path from typing import Any @@ -39,8 +41,22 @@ def resolve_csv_path(csv_filename: str) -> Path: return Path("./uploads") / candidate +def _get_collection_names(settings: Settings) -> list[str]: + """Build collection names from settings.""" + names: list[str] = [] + prefix = settings.chromadb_collection_prefix + if settings.image_embedding_fields: + for field in settings.image_embedding_fields: + names.append(f"{prefix}{field}") + if settings.text_embedding_fields: + for field in settings.text_embedding_fields: + names.append(f"{prefix}{field}") + return names + + async def _run_indexing(task_state: TaskState, request: IndexRequest) -> None: """Run indexing in background, updating task state and 
broadcasting progress.""" + heartbeat_task: asyncio.Task[None] | None = None try: # Construct Settings from IndexRequest try: @@ -67,6 +83,7 @@ async def _run_indexing(task_state: TaskState, request: IndexRequest) -> None: # Update task status to RUNNING task_state.status = TaskStatus.RUNNING + start_time = time.monotonic() # Define progress callback def on_progress(progress_data: dict[str, Any]) -> None: @@ -79,19 +96,35 @@ def on_progress(progress_data: dict[str, Any]) -> None: # ruff: noqa: RUF006 asyncio.create_task(ws_manager.broadcast(task_state.job_id, progress_data)) - rows_indexed = progress_data.get("rows_indexed") - if isinstance(rows_indexed, int) and rows_indexed > 0: - # ruff: noqa: RUF006 - asyncio.create_task( - ws_manager.broadcast( - task_state.job_id, - { - "type": "log", - "level": "info", - "message": f"Indexed {rows_indexed} rows", - }, - ) + # Define log callback + def on_log(message: str, level: str, verbosity: str) -> None: + # ruff: noqa: RUF006 + asyncio.create_task( + ws_manager.broadcast( + task_state.job_id, + { + "type": "log", + "level": level, + "message": message, + "verbosity": verbosity, + }, ) + ) + + # Heartbeat background task + async def _heartbeat() -> None: + while True: + await asyncio.sleep(3) + elapsed = time.monotonic() - start_time + await ws_manager.broadcast( + task_state.job_id, + { + "type": "heartbeat", + "elapsed_seconds": elapsed, + }, + ) + + heartbeat_task = asyncio.create_task(_heartbeat()) total_rows = request.total_rows on_progress( @@ -103,13 +136,33 @@ def on_progress(progress_data: dict[str, Any]) -> None: } ) - # Run indexer with callback and cancel event + # Run indexer with callbacks and cancel event await main_indexer( - settings, on_progress=on_progress, cancel_event=task_state.cancel_event + settings, + on_progress=on_progress, + on_log=on_log, + cancel_event=task_state.cancel_event, ) - # Success + # Success — send completion message task_state.status = TaskStatus.COMPLETED + elapsed = 
time.monotonic() - start_time + collection_names = _get_collection_names(settings) + rows_indexed = task_state.progress.get("rows_indexed", 0) + # ruff: noqa: RUF006 + asyncio.create_task( + ws_manager.broadcast( + task_state.job_id, + { + "type": "completed", + "status": "completed", + "progress": task_state.progress, + "total_indexed": rows_indexed, + "collection_names": collection_names, + "elapsed_seconds": elapsed, + }, + ) + ) except Exception as e: logger.exception("Indexing failed for job %s", task_state.job_id) task_state.status = TaskStatus.FAILED @@ -119,12 +172,18 @@ def on_progress(progress_data: dict[str, Any]) -> None: ws_manager.broadcast( task_state.job_id, { + "type": "error", "status": task_state.status.value, "error": task_state.error, + "message": str(e), "progress": task_state.progress, }, ) ) + finally: + if heartbeat_task is not None: + with contextlib.suppress(RuntimeError): + heartbeat_task.cancel() @router.post("/start", response_model=IndexStartResponse) diff --git a/frontend/src/components/index/IndexProgress.tsx b/frontend/src/components/index/IndexProgress.tsx index 27aae7c..b83ba42 100644 --- a/frontend/src/components/index/IndexProgress.tsx +++ b/frontend/src/components/index/IndexProgress.tsx @@ -1,27 +1,42 @@ -import { useEffect, useRef } from 'react'; +import { useEffect, useMemo, useRef, useState } from 'react'; import { useMutation } from '@tanstack/react-query'; -import { useIndexWebSocket } from '../../hooks/useIndexWebSocket'; +import { useIndexWebSocket, type LogMessage } from '../../hooks/useIndexWebSocket'; import { cancelIndex } from '../../api/indexing'; +type VerbosityLevel = 'low' | 'medium' | 'high'; + +const VERBOSITY_LEVELS: VerbosityLevel[] = ['low', 'medium', 'high']; + +const VERBOSITY_INCLUDES: Record = { + low: ['low'], + medium: ['low', 'medium'], + high: ['low', 'medium', 'high'], +}; + interface IndexProgressProps { jobId: string; onDone: () => void; } export default function IndexProgress({ jobId, onDone 
}: IndexProgressProps) { - const { progress, logs, status, isConnected } = useIndexWebSocket(jobId); + const { progress, logs, status, isConnected, isStuckWarning, isStuckError } = useIndexWebSocket(jobId); const logsEndRef = useRef(null); + const [verbosity, setVerbosity] = useState('medium'); const cancelMutation = useMutation({ mutationFn: cancelIndex, }); - // Auto-scroll logs + const filteredLogs = useMemo(() => { + const allowed = VERBOSITY_INCLUDES[verbosity]; + return logs.filter((log: LogMessage) => allowed.includes(log.verbosity as VerbosityLevel)); + }, [logs, verbosity]); + useEffect(() => { if (logsEndRef.current) { logsEndRef.current.scrollIntoView({ behavior: 'smooth' }); } - }, [logs]); + }, [filteredLogs]); const formatTime = (seconds: number) => { const mins = Math.floor(seconds / 60); @@ -39,6 +54,45 @@ export default function IndexProgress({ jobId, onDone }: IndexProgressProps) { return (
+ {/* Stuck error modal */} + {isStuckError && !isFinished && ( +
+
+
+ + + +
+
+

Backend Not Responding

+

+ No messages received for 30+ seconds. The backend may have crashed or become unresponsive. + Consider cancelling and checking server logs. +

+
+
+
+ )} + + {/* Stuck warning banner */} + {isStuckWarning && !isStuckError && !isFinished && ( +
+
+
+ + + +
+
+

Slow Response

+

+ No messages received for 15+ seconds. The backend may be processing a large batch. +

+
+
+
+ )} + {/* Header / Status Badge */}
@@ -102,9 +156,26 @@ export default function IndexProgress({ jobId, onDone }: IndexProgressProps) { {/* Logs Panel */}
-

Live Logs

+
+

Live Logs

+
+ + +
+
- {logs.length === 0 && !progress.error ? ( + {filteredLogs.length === 0 && !progress.error ? (
Waiting for logs...
) : ( <> @@ -113,7 +184,7 @@ export default function IndexProgress({ jobId, onDone }: IndexProgressProps) { [error] {progress.error}
)} - {logs.map((log, index) => ( + {filteredLogs.map((log, index) => (
([]); const [status, setStatus] = useState('pending'); const [isConnected, setIsConnected] = useState(false); + const [isStuckWarning, setIsStuckWarning] = useState(false); + const [isStuckError, setIsStuckError] = useState(false); const wsRef = useRef(null); + const timerRef = useRef | null>(null); + const lastMessageRef = useRef(Date.now()); + const stuckIntervalRef = useRef | null>(null); + // Track the last server-reported elapsed_seconds to anchor the client timer + const serverElapsedRef = useRef(0); + const serverElapsedAtRef = useRef(Date.now()); + + const resetStuckTimer = useCallback(() => { + lastMessageRef.current = Date.now(); + setIsStuckWarning(false); + setIsStuckError(false); + }, []); useEffect(() => { if (!jobId) { @@ -67,46 +88,88 @@ export function useIndexWebSocket(jobId: string | null): UseIndexWebSocketResult setLogs([]); setStatus('pending'); setIsConnected(false); + setIsStuckWarning(false); + setIsStuckError(false); + serverElapsedRef.current = 0; + serverElapsedAtRef.current = Date.now(); + lastMessageRef.current = Date.now(); const ws = createIndexWebSocket(jobId); wsRef.current = ws; + // Client-side elapsed timer — ticks every second for smooth display + timerRef.current = setInterval(() => { + const now = Date.now(); + const delta = (now - serverElapsedAtRef.current) / 1000; + setProgress(prev => ({ + ...prev, + elapsed_seconds: serverElapsedRef.current + delta, + })); + }, 1000); + + // Stuck detection interval — checks every 5s + stuckIntervalRef.current = setInterval(() => { + const silence = Date.now() - lastMessageRef.current; + if (silence >= STUCK_ERROR_MS) { + setIsStuckError(true); + setIsStuckWarning(true); + } else if (silence >= STUCK_WARNING_MS) { + setIsStuckWarning(true); + setIsStuckError(false); + } else { + setIsStuckWarning(false); + setIsStuckError(false); + } + }, 5000); + ws.onopen = () => { console.log('WebSocket connected'); setIsConnected(true); + resetStuckTimer(); }; ws.onmessage = (event) => { try { const 
data = JSON.parse(event.data) as WebSocketMessage; + resetStuckTimer(); + + // Sync server elapsed time for client timer anchor + if (typeof data.elapsed_seconds === 'number') { + serverElapsedRef.current = data.elapsed_seconds; + serverElapsedAtRef.current = Date.now(); + } // Handle explicit status updates if (data.status) { - setStatus(data.status); + setStatus(data.status); } if (typeof data.error === 'string') { setProgress(prev => ({ ...prev, - error: data.error + error: data.error as string, })); } if (data.progress && typeof data.progress === 'object') { - const progress = data.progress as WebSocketMessage; + const progressMsg = data.progress as WebSocketMessage; + if (typeof progressMsg.elapsed_seconds === 'number') { + serverElapsedRef.current = progressMsg.elapsed_seconds; + serverElapsedAtRef.current = Date.now(); + } setProgress(prev => ({ ...prev, - rows_indexed: typeof progress.rows_indexed === 'number' - ? progress.rows_indexed + rows_indexed: typeof progressMsg.rows_indexed === 'number' + ? progressMsg.rows_indexed : prev.rows_indexed, - total_rows: typeof progress.total_rows === 'number' - ? progress.total_rows + total_rows: typeof progressMsg.total_rows === 'number' + ? progressMsg.total_rows : prev.total_rows, - errors: typeof progress.errors === 'number' ? progress.errors : prev.errors, - elapsed_seconds: typeof progress.elapsed_seconds === 'number' - ? progress.elapsed_seconds + errors: typeof progressMsg.errors === 'number' ? progressMsg.errors : prev.errors, + elapsed_seconds: typeof progressMsg.elapsed_seconds === 'number' + ? progressMsg.elapsed_seconds : prev.elapsed_seconds, - error: typeof progress.error === 'string' ? progress.error : prev.error, + error: typeof progressMsg.error === 'string' ? 
progressMsg.error : prev.error, })); } @@ -125,23 +188,28 @@ export function useIndexWebSocket(jobId: string | null): UseIndexWebSocketResult } else if (data.type === 'log') { setLogs(prev => [...prev, { level: data.level || 'info', - message: data.message || '' + message: data.message || '', + verbosity: data.verbosity || 'low', }]); + } else if (data.type === 'heartbeat') { + // Heartbeat keeps stuck detection happy — elapsed already synced above } else if (data.type === 'completed') { setStatus('completed'); setLogs(prev => [...prev, { level: 'success', - message: `Indexing completed. Total indexed: ${data.total_indexed}. Collections: ${Array.isArray(data.collection_names) ? data.collection_names.join(', ') : ''}` + message: `Indexing completed. Total indexed: ${data.total_indexed}. Collections: ${Array.isArray(data.collection_names) ? data.collection_names.join(', ') : ''}`, + verbosity: 'low', }]); } else if (data.type === 'error') { setStatus('error'); setLogs(prev => [...prev, { level: 'error', - message: data.message || 'Unknown error occurred' + message: data.message || 'Unknown error occurred', + verbosity: 'low', }]); setProgress(prev => ({ ...prev, - error: data.message || prev.error || 'Unknown error occurred' + error: data.message || prev.error || 'Unknown error occurred', })); } } catch (err) { @@ -167,8 +235,16 @@ export function useIndexWebSocket(jobId: string | null): UseIndexWebSocketResult wsRef.current.close(); wsRef.current = null; } + if (timerRef.current) { + clearInterval(timerRef.current); + timerRef.current = null; + } + if (stuckIntervalRef.current) { + clearInterval(stuckIntervalRef.current); + stuckIntervalRef.current = null; + } }; - }, [jobId]); + }, [jobId, resetStuckTimer]); - return { progress, logs, status, isConnected }; + return { progress, logs, status, isConnected, isStuckWarning, isStuckError }; } diff --git a/tests/test_server_index.py b/tests/test_server_index.py index b837674..d4b4057 100644 --- 
a/tests/test_server_index.py +++ b/tests/test_server_index.py @@ -49,7 +49,7 @@ async def client(app): def mock_indexer(): """Mock main_indexer to avoid loading ML models in tests.""" - async def fake_indexer(settings, on_progress=None, cancel_event=None): + async def fake_indexer(settings, on_progress=None, on_log=None, cancel_event=None): if on_progress: on_progress( { @@ -138,7 +138,7 @@ async def test_status_success(client, mock_indexer): async def test_status_includes_error_on_failure(client, mock_indexer): - async def failing_indexer(settings, on_progress=None, cancel_event=None): + async def failing_indexer(settings, on_progress=None, on_log=None, cancel_event=None): raise RuntimeError("boom") with patch( @@ -192,7 +192,7 @@ async def test_cancel_running_job(client, mock_indexer): """Test cancelling a running job.""" # Need a longer-running job for cancellation - async def slow_indexer(settings, on_progress=None, cancel_event=None): + async def slow_indexer(settings, on_progress=None, on_log=None, cancel_event=None): for i in range(10): if cancel_event and cancel_event.is_set(): return diff --git a/tests/test_server_search.py b/tests/test_server_search.py index 8950109..95a7c6d 100644 --- a/tests/test_server_search.py +++ b/tests/test_server_search.py @@ -284,9 +284,7 @@ async def test_search_image_download_failure( assert response.status_code == 500 -async def test_search_text_on_image_collection_uses_clip( - client, mock_chromadb_client -): +async def test_search_text_on_image_collection_uses_clip(client, mock_chromadb_client): """Text query on an image (CLIP) collection should use CLIP text encoder.""" import torch @@ -333,9 +331,7 @@ async def test_search_text_on_image_collection_uses_clip( mock_clip.get_text_features.assert_called_once() -async def test_search_uses_stored_text_model_name( - client, mock_chromadb_client -): +async def test_search_uses_stored_text_model_name(client, mock_chromadb_client): """Text query should use the model name stored in 
collection metadata.""" mock_coll = MagicMock() mock_coll.count.return_value = 100 @@ -410,9 +406,7 @@ async def test_search_uses_stored_image_model_name( ) as mock_downloader_cls, ): mock_instance = MagicMock() - mock_instance.download_image_exp_backoff = AsyncMock( - return_value=mock_image - ) + mock_instance.download_image_exp_backoff = AsyncMock(return_value=mock_image) mock_downloader_cls.return_value = mock_instance response = await client.post( diff --git a/tests/test_utils.py b/tests/test_utils.py index 05a45ac..47e9123 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -178,6 +178,7 @@ def test_with_image_fields(self, monkeypatch: pytest.MonkeyPatch) -> None: "model_type": "image", }, ) + def test_with_text_fields(self, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("TEXT_EMBEDDING_FIELDS", '["description"]') monkeypatch.setenv("CHROMADB_COLLECTION_PREFIX", "pre_") @@ -196,6 +197,7 @@ def test_with_text_fields(self, monkeypatch: pytest.MonkeyPatch) -> None: "model_type": "text", }, ) + def test_with_no_fields(self) -> None: settings = Settings() mock_client = MagicMock() From 39ab6759fc29009aea9ec7dd3020f04b5ef990e0 Mon Sep 17 00:00:00 2001 From: aGallea Date: Wed, 25 Feb 2026 13:17:13 +0200 Subject: [PATCH 2/7] fix(index): await log broadcasts for real-time delivery and stop timers on completion - Change on_log from sync fire-and-forget to async awaited callback so WebSocket writes complete before the next row, eliminating batchy logs - Wrap CLIPModel and SentenceTransformer loading in asyncio.to_thread to keep the event loop responsive during model init - Add stopTimers helper in useIndexWebSocket to clear elapsed and stuck detection intervals on completed/error - Sync final elapsed_seconds from server on completion --- embedding_cluster/indexer.py | 56 +++++++++++++----------- embedding_cluster/server/routes/index.py | 21 ++++----- frontend/src/hooks/useIndexWebSocket.ts | 18 ++++++++ 3 files changed, 57 insertions(+), 38 
deletions(-) diff --git a/embedding_cluster/indexer.py b/embedding_cluster/indexer.py index a265b41..507b527 100644 --- a/embedding_cluster/indexer.py +++ b/embedding_cluster/indexer.py @@ -7,7 +7,7 @@ from typing import TYPE_CHECKING, Any if TYPE_CHECKING: - from collections.abc import Callable + from collections.abc import Awaitable, Callable from chromadb.api import ClientAPI from chromadb.api.models.Collection import Collection @@ -35,16 +35,16 @@ async def main_indexer( settings: Settings, on_progress: Callable[[dict[str, Any]], None] | None = None, - on_log: Callable[[str, str, str], None] | None = None, + on_log: Callable[[str, str, str], Awaitable[None]] | None = None, cancel_event: asyncio.Event | None = None, ) -> None: - def _emit_log( + async def _emit_log( message: str, level: str = "info", verbosity: str = "low", ) -> None: if on_log is not None: - on_log(message, level, verbosity) + await on_log(message, level, verbosity) chromadb_client: ClientAPI = chromadb.PersistentClient(path="./chromadb") chromadb_docs_collections: dict[str, ChromaDocsCollection] = ( @@ -64,17 +64,19 @@ def _emit_log( and len(settings.image_embedding_fields) > 0 ): logger.info("Loading image model: %s", settings.image_model_name) - _emit_log(f"Loading image model: {settings.image_model_name}...") + await _emit_log(f"Loading image model: {settings.image_model_name}...") try: - image_model = CLIPModel.from_pretrained(settings.image_model_name).to( - settings.process_unit_device + image_model = await asyncio.to_thread( + lambda: CLIPModel.from_pretrained(settings.image_model_name).to( + settings.process_unit_device + ) ) - image_model_processor = CLIPProcessor.from_pretrained( - settings.image_model_name + image_model_processor = await asyncio.to_thread( + CLIPProcessor.from_pretrained, settings.image_model_name ) - _emit_log("Image model loaded successfully") + await _emit_log("Image model loaded successfully") except Exception as exc: - _emit_log( + await _emit_log( f"Failed to 
load image model: {exc}", level="error", ) @@ -85,14 +87,16 @@ def _emit_log( and len(settings.text_embedding_fields) > 0 ): logger.info("Loading text model: %s", settings.text_model_name) - _emit_log(f"Loading text model: {settings.text_model_name}...") + await _emit_log(f"Loading text model: {settings.text_model_name}...") try: - text_model_transformer = SentenceTransformer(settings.text_model_name).to( - settings.process_unit_device + text_model_transformer = await asyncio.to_thread( + lambda: SentenceTransformer(settings.text_model_name).to( + settings.process_unit_device + ) ) - _emit_log("Text model loaded successfully") + await _emit_log("Text model loaded successfully") except Exception as exc: - _emit_log( + await _emit_log( f"Failed to load text model: {exc}", level="error", ) @@ -100,10 +104,10 @@ def _emit_log( start_time = time.perf_counter() - _emit_log("Loading CSV file...") + await _emit_log("Loading CSV file...") with open(settings.local_csv_filename) as csv_file: csv_iter = csv.DictReader(csv_file) - _emit_log("CSV file opened, reading rows...") + await _emit_log("CSV file opened, reading rows...") rows_read = 0 curr_rows: list[dict[str, Any]] = [] batch_num = 0 @@ -121,7 +125,7 @@ def _emit_log( "Indexing cancelled at row %d", rows_read + skipped_rows, ) - _emit_log( + await _emit_log( f"Indexing cancelled at row {rows_read + skipped_rows}", level="warning", ) @@ -137,7 +141,7 @@ def _emit_log( "elapsed_seconds": (time.perf_counter() - start_time), } ) - _emit_log( + await _emit_log( f"Processing row {rows_read}...", verbosity="high", ) @@ -149,7 +153,7 @@ def _emit_log( if len(curr_rows) == settings.index_bulk_size: batch_num += 1 batch_start = rows_read - len(curr_rows) + 1 - _emit_log( + await _emit_log( f"Processing batch {batch_num} ({batch_start}-{rows_read})...", verbosity="medium", ) @@ -163,7 +167,7 @@ def _emit_log( chromadb_docs_collections=chromadb_docs_collections, chromadb_collections=chromadb_collections, ) - _emit_log( + await 
_emit_log( f"Batch {batch_num} complete, writing to ChromaDB...", verbosity="medium", ) @@ -178,7 +182,7 @@ def _emit_log( "elapsed_seconds": (time.perf_counter() - start_time), } ) - _emit_log( + await _emit_log( f"Indexed {rows_read} rows so far", verbosity="medium", ) @@ -190,7 +194,7 @@ def _emit_log( if len(curr_rows) > 0: batch_num += 1 batch_start = rows_read - len(curr_rows) + 1 - _emit_log( + await _emit_log( f"Processing batch {batch_num} ({batch_start}-{rows_read})...", verbosity="medium", ) @@ -204,7 +208,7 @@ def _emit_log( chromadb_docs_collections=chromadb_docs_collections, chromadb_collections=chromadb_collections, ) - _emit_log( + await _emit_log( f"Batch {batch_num} complete, writing to ChromaDB...", verbosity="medium", ) @@ -219,7 +223,7 @@ def _emit_log( ) elapsed = time.perf_counter() - start_time - _emit_log( + await _emit_log( f"Indexing complete: {rows_read} rows in {elapsed:.1f}s", level="success", ) diff --git a/embedding_cluster/server/routes/index.py b/embedding_cluster/server/routes/index.py index f1c322a..dbc0f69 100644 --- a/embedding_cluster/server/routes/index.py +++ b/embedding_cluster/server/routes/index.py @@ -97,18 +97,15 @@ def on_progress(progress_data: dict[str, Any]) -> None: asyncio.create_task(ws_manager.broadcast(task_state.job_id, progress_data)) # Define log callback - def on_log(message: str, level: str, verbosity: str) -> None: - # ruff: noqa: RUF006 - asyncio.create_task( - ws_manager.broadcast( - task_state.job_id, - { - "type": "log", - "level": level, - "message": message, - "verbosity": verbosity, - }, - ) + async def on_log(message: str, level: str, verbosity: str) -> None: + await ws_manager.broadcast( + task_state.job_id, + { + "type": "log", + "level": level, + "message": message, + "verbosity": verbosity, + }, ) # Heartbeat background task diff --git a/frontend/src/hooks/useIndexWebSocket.ts b/frontend/src/hooks/useIndexWebSocket.ts index a5d1135..1a27c5d 100644 --- a/frontend/src/hooks/useIndexWebSocket.ts 
+++ b/frontend/src/hooks/useIndexWebSocket.ts @@ -94,6 +94,18 @@ export function useIndexWebSocket(jobId: string | null): UseIndexWebSocketResult serverElapsedAtRef.current = Date.now(); lastMessageRef.current = Date.now(); + // Helper to stop elapsed timer and stuck detection when indexing finishes + const stopTimers = () => { + if (timerRef.current) { + clearInterval(timerRef.current); + timerRef.current = null; + } + if (stuckIntervalRef.current) { + clearInterval(stuckIntervalRef.current); + stuckIntervalRef.current = null; + } + }; + const ws = createIndexWebSocket(jobId); wsRef.current = ws; @@ -195,6 +207,11 @@ export function useIndexWebSocket(jobId: string | null): UseIndexWebSocketResult // Heartbeat keeps stuck detection happy — elapsed already synced above } else if (data.type === 'completed') { setStatus('completed'); + stopTimers(); + // Sync final elapsed time from server + if (typeof data.elapsed_seconds === 'number') { + setProgress(prev => ({ ...prev, elapsed_seconds: data.elapsed_seconds as number })); + } setLogs(prev => [...prev, { level: 'success', message: `Indexing completed. Total indexed: ${data.total_indexed}. Collections: ${Array.isArray(data.collection_names) ? 
data.collection_names.join(', ') : ''}`, @@ -202,6 +219,7 @@ export function useIndexWebSocket(jobId: string | null): UseIndexWebSocketResult }]); } else if (data.type === 'error') { setStatus('error'); + stopTimers(); setLogs(prev => [...prev, { level: 'error', message: data.message || 'Unknown error occurred', From 1b9bb8a9e1bc56fd595282f0ec367cebe07d7539 Mon Sep 17 00:00:00 2001 From: aGallea Date: Wed, 25 Feb 2026 15:01:35 +0200 Subject: [PATCH 3/7] feat(frontend): enhance CSV preview with image hover, upload reset, row limit, and pagination - Add image URL hover preview with floating tooltip in data preview table - Add 'Upload Different CSV' button to return to upload step - Add row limit dropdown (10/25/50/100) wired to preview API - Add client-side pagination (25 rows per page) with full navigation controls Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus --- frontend/src/components/csv/CsvPreview.tsx | 247 +++++++++++++++++++-- frontend/src/pages/IndexPage.tsx | 20 +- 2 files changed, 245 insertions(+), 22 deletions(-) diff --git a/frontend/src/components/csv/CsvPreview.tsx b/frontend/src/components/csv/CsvPreview.tsx index 7814840..947aec6 100644 --- a/frontend/src/components/csv/CsvPreview.tsx +++ b/frontend/src/components/csv/CsvPreview.tsx @@ -1,10 +1,108 @@ +import { useState, useRef, useCallback } from 'react'; + interface CsvPreviewProps { columns: string[]; rows: Record[]; totalRows: number; + previewLimit: number; + onLimitChange: (limit: number) => void; } -export default function CsvPreview({ columns, rows, totalRows }: CsvPreviewProps) { +const IMAGE_URL_PATTERN = + /^https?:\/\/.+\.(jpg|jpeg|png|gif|webp|svg|bmp|avif)(\?.*)?$/i; + +function isImageUrl(value: string): boolean { + return IMAGE_URL_PATTERN.test(value.trim()); +} + +interface ImagePreviewState { + url: string; + x: number; + y: number; +} + +function ImageCell({ value }: { value: string }) { + const [preview, setPreview] = 
useState(null); + const cellRef = useRef(null); + const hideTimeout = useRef | null>(null); + + const showPreview = useCallback( + (e: React.MouseEvent) => { + if (hideTimeout.current) { + clearTimeout(hideTimeout.current); + hideTimeout.current = null; + } + setPreview({ url: value.trim(), x: e.clientX, y: e.clientY }); + }, + [value], + ); + + const movePreview = useCallback( + (e: React.MouseEvent) => { + if (preview) { + setPreview({ url: value.trim(), x: e.clientX, y: e.clientY }); + } + }, + [preview, value], + ); + + const hidePreview = useCallback(() => { + hideTimeout.current = setTimeout(() => setPreview(null), 100); + }, []); + + return ( + + + {value || ''} + + {preview && ( +
+
+ Preview { + (e.target as HTMLImageElement).style.display = 'none'; + }} + /> +
+
+ )} + + ); +} + +const PAGE_SIZE = 25; + +export default function CsvPreview({ + columns, + rows, + totalRows, + previewLimit, + onLimitChange, +}: CsvPreviewProps) { + const [currentPage, setCurrentPage] = useState(1); + + // Reset to page 1 when rows change (limit changed, new CSV, etc.) + const rowCountRef = useRef(rows.length); + if (rows.length !== rowCountRef.current) { + rowCountRef.current = rows.length; + if (currentPage !== 1) setCurrentPage(1); + } + if (!rows || rows.length === 0) { return (
@@ -13,13 +111,57 @@ export default function CsvPreview({ columns, rows, totalRows }: CsvPreviewProps ); } + // Detect which columns contain image URLs by checking whether any preview row holds one + const imageColumns = new Set( + columns.filter((col) => + rows.some((row) => { + const val = row[col]; + return val && isImageUrl(val); + }), + ), + ); + + // Pagination + const totalPages = Math.max(1, Math.ceil(rows.length / PAGE_SIZE)); + const startIndex = (currentPage - 1) * PAGE_SIZE; + const paginatedRows = rows.length > PAGE_SIZE + ? rows.slice(startIndex, startIndex + PAGE_SIZE) + : rows; + + // Generate visible page numbers (show up to 5 around current) + const pageNumbers: number[] = []; + const maxVisible = 5; + let startPage = Math.max(1, currentPage - Math.floor(maxVisible / 2)); + const endPage = Math.min(totalPages, startPage + maxVisible - 1); + startPage = Math.max(1, endPage - maxVisible + 1); + for (let i = startPage; i <= endPage; i++) { + pageNumbers.push(i); + } + return (

Data Preview

- - Showing {rows.length} of {totalRows} total rows - +
+ + + + Showing {rows.length} of {totalRows} total rows + +
@@ -27,30 +169,97 @@ export default function CsvPreview({ columns, rows, totalRows }: CsvPreviewProps {columns.map((column) => ( - + {column} ))} - {rows.map((row, index) => ( - - {columns.map((column) => ( - - {row[column] || ''} - - ))} - - ))} + {paginatedRows.map((row, index) => { + const globalIndex = startIndex + index; + return ( + + {columns.map((column) => + imageColumns.has(column) && row[column] ? ( + + ) : ( + + {row[column] || ''} + + ), + )} + + ); + })}
+ + {totalPages > 1 && ( +
+ + Page {currentPage} of {totalPages} + +
+ + + {pageNumbers.map((page) => ( + + ))} + + +
+
+ )}
); } diff --git a/frontend/src/pages/IndexPage.tsx b/frontend/src/pages/IndexPage.tsx index 07f1710..b2cc654 100644 --- a/frontend/src/pages/IndexPage.tsx +++ b/frontend/src/pages/IndexPage.tsx @@ -16,11 +16,12 @@ export default function IndexPage() { const [columns, setColumns] = useState([]); const [totalRows, setTotalRows] = useState(0); const [indexJobId, setIndexJobId] = useState(null); + const [previewLimit, setPreviewLimit] = useState(10); // Query for CSV Preview const { data: previewData, isLoading: isPreviewLoading } = useQuery({ - queryKey: ['previewCsv', uploadedFilename], - queryFn: () => previewCsv(uploadedFilename!), + queryKey: ['previewCsv', uploadedFilename, previewLimit], + queryFn: () => previewCsv(uploadedFilename!, previewLimit), enabled: !!uploadedFilename && step === 'preview', }); @@ -79,7 +80,18 @@ export default function IndexPage() {
) : ( <> -
+
+