diff --git a/embedding_cluster/indexer.py b/embedding_cluster/indexer.py index 5532aad..b7bb623 100644 --- a/embedding_cluster/indexer.py +++ b/embedding_cluster/indexer.py @@ -7,7 +7,7 @@ from typing import TYPE_CHECKING, Any if TYPE_CHECKING: - from collections.abc import Callable + from collections.abc import Awaitable, Callable from chromadb.api import ClientAPI from chromadb.api.models.Collection import Collection @@ -29,12 +29,23 @@ logger = logging.getLogger(__name__) +PROGRESS_UPDATE_INTERVAL = 10 + async def main_indexer( settings: Settings, on_progress: Callable[[dict[str, Any]], None] | None = None, + on_log: Callable[[str, str, str], Awaitable[None]] | None = None, cancel_event: asyncio.Event | None = None, ) -> None: + async def _emit_log( + message: str, + level: str = "info", + verbosity: str = "low", + ) -> None: + if on_log is not None: + await on_log(message, level, verbosity) + chromadb_client: ClientAPI = chromadb.PersistentClient(path="./chromadb") chromadb_docs_collections: dict[str, ChromaDocsCollection] = ( init_chroma_docs_collection(settings) @@ -53,27 +64,53 @@ async def main_indexer( and len(settings.image_embedding_fields) > 0 ): logger.info("Loading image model: %s", settings.image_model_name) - image_model = CLIPModel.from_pretrained(settings.image_model_name).to( - settings.process_unit_device - ) - image_model_processor = CLIPProcessor.from_pretrained(settings.image_model_name) + await _emit_log(f"Loading image model: {settings.image_model_name}...") + try: + image_model = await asyncio.to_thread( + lambda: CLIPModel.from_pretrained(settings.image_model_name).to( + settings.process_unit_device + ) + ) + image_model_processor = await asyncio.to_thread( + CLIPProcessor.from_pretrained, settings.image_model_name + ) + await _emit_log("Image model loaded successfully") + except Exception as exc: + await _emit_log( + f"Failed to load image model: {exc}", + level="error", + ) + raise if ( settings.text_embedding_fields is not None and len(settings.text_embedding_fields) > 0 ): logger.info("Loading text model: %s", settings.text_model_name) - text_model_transformer = SentenceTransformer(settings.text_model_name).to( - settings.process_unit_device - ) + await _emit_log(f"Loading text model: {settings.text_model_name}...") + try: + text_model_transformer = await asyncio.to_thread( + lambda: SentenceTransformer(settings.text_model_name).to( + settings.process_unit_device + ) + ) + await _emit_log("Text model loaded successfully") + except Exception as exc: + await _emit_log( + f"Failed to load text model: {exc}", + level="error", + ) + raise start_time = time.perf_counter() + await _emit_log("Loading CSV file...") with open(settings.local_csv_filename) as csv_file: csv_iter = csv.DictReader(csv_file) + await _emit_log("CSV file opened, reading rows...") rows_read = 0 curr_rows: list[dict[str, Any]] = [] - + batch_num = 0 skipped_rows = 0 if settings.index_start_line is not None: skipped_rows = 1 @@ -84,16 +121,42 @@ async def main_indexer( for row in csv_iter: if cancel_event is not None and cancel_event.is_set(): - logger.info("Indexing cancelled at row %d", rows_read + skipped_rows) + logger.info( + "Indexing cancelled at row %d", + rows_read + skipped_rows, + ) + await _emit_log( + f"Indexing cancelled at row {rows_read + skipped_rows}", + level="warning", + ) break rows_read += 1 curr_rows.append(row) + if on_progress is not None and rows_read % PROGRESS_UPDATE_INTERVAL == 0: + on_progress( + { + "rows_indexed": rows_read, + "total_rows": None, + "errors": 0, + "elapsed_seconds": (time.perf_counter() - start_time), + } + ) + await _emit_log( + f"Reading row {rows_read}...", + verbosity="high", + ) if ( settings.index_end_line is not None and settings.index_end_line == rows_read + skipped_rows ): break if len(curr_rows) == settings.index_bulk_size: + batch_num += 1 + batch_start = rows_read - len(curr_rows) + 1 + await _emit_log( + f"Processing batch {batch_num} ({batch_start}-{rows_read})...", + verbosity="medium", + ) await _handle_batch( settings=settings, rows=curr_rows, @@ -104,6 +167,10 @@ async def main_indexer( chromadb_docs_collections=chromadb_docs_collections, chromadb_collections=chromadb_collections, ) + await _emit_log( + f"Batch {batch_num} complete", + verbosity="medium", + ) curr_rows = [] chromadb_docs_collections = init_chroma_docs_collection(settings) if on_progress is not None: @@ -112,15 +179,25 @@ async def main_indexer( "rows_indexed": rows_read, "total_rows": None, "errors": 0, - "elapsed_seconds": time.perf_counter() - start_time, + "elapsed_seconds": (time.perf_counter() - start_time), } ) + await _emit_log( + f"Indexed {rows_read} rows so far", + verbosity="medium", + ) logger.info( "Indexed %d rows. [%d]", rows_read, skipped_rows + rows_read, ) if len(curr_rows) > 0: + batch_num += 1 + batch_start = rows_read - len(curr_rows) + 1 + await _emit_log( + f"Processing batch {batch_num} ({batch_start}-{rows_read})...", + verbosity="medium", + ) await _handle_batch( settings=settings, rows=curr_rows, @@ -131,16 +208,26 @@ async def main_indexer( chromadb_docs_collections=chromadb_docs_collections, chromadb_collections=chromadb_collections, ) + await _emit_log( + f"Batch {batch_num} complete", + verbosity="medium", + ) if on_progress is not None: on_progress( { "rows_indexed": rows_read, "total_rows": None, "errors": 0, - "elapsed_seconds": time.perf_counter() - start_time, + "elapsed_seconds": (time.perf_counter() - start_time), } ) + elapsed = time.perf_counter() - start_time + await _emit_log( + f"Indexing complete: {rows_read} rows in {elapsed:.1f}s", + level="success", + ) + async def _handle_batch( settings: Settings, diff --git a/embedding_cluster/server/routes/index.py b/embedding_cluster/server/routes/index.py index 5ff5951..b1d115b 100644 --- a/embedding_cluster/server/routes/index.py +++ b/embedding_cluster/server/routes/index.py @@ -1,7 +1,9 @@ from __future__ import annotations import asyncio +import contextlib import logging +import time from pathlib import Path from typing import Any @@ -39,8 +41,22 @@ def resolve_csv_path(csv_filename: str) -> Path: return Path("./uploads") / candidate +def _get_collection_names(settings: Settings) -> list[str]: + """Build collection names from settings.""" + names: list[str] = [] + prefix = settings.chromadb_collection_prefix + if settings.image_embedding_fields: + for field in settings.image_embedding_fields: + names.append(f"{prefix}{field}") + if settings.text_embedding_fields: + for field in settings.text_embedding_fields: + names.append(f"{prefix}{field}") + return names + + async def _run_indexing(task_state: TaskState, request: IndexRequest) -> None: """Run indexing in background, updating task state and broadcasting progress.""" + heartbeat_task: asyncio.Task[None] | None = None try: # Construct Settings from IndexRequest try: @@ -67,6 +83,7 @@ async def _run_indexing(task_state: TaskState, request: IndexRequest) -> None: # Update task status to RUNNING task_state.status = TaskStatus.RUNNING + start_time = time.monotonic() # Define progress callback def on_progress(progress_data: dict[str, Any]) -> None: @@ -79,20 +96,33 @@ def on_progress(progress_data: dict[str, Any]) -> None: # ruff: noqa: RUF006 asyncio.create_task(ws_manager.broadcast(task_state.job_id, progress_data)) - rows_indexed = progress_data.get("rows_indexed") - if isinstance(rows_indexed, int) and rows_indexed > 0: - # ruff: noqa: RUF006 - asyncio.create_task( - ws_manager.broadcast( - task_state.job_id, - { - "type": "log", - "level": "info", - "message": f"Indexed {rows_indexed} rows", - }, - ) + # Define log callback + async def on_log(message: str, level: str, verbosity: str) -> None: + await ws_manager.broadcast( + task_state.job_id, + { + "type": "log", + "level": level, + "message": message, + "verbosity": verbosity, + }, + ) + + # Heartbeat background task + async def _heartbeat() -> None: + while True: + await asyncio.sleep(3) + elapsed = time.monotonic() - start_time + await ws_manager.broadcast( + task_state.job_id, + { + "type": "heartbeat", + "elapsed_seconds": elapsed, + }, ) + heartbeat_task = asyncio.create_task(_heartbeat()) + total_rows = request.total_rows on_progress( { @@ -103,13 +133,51 @@ def on_progress(progress_data: dict[str, Any]) -> None: } ) - # Run indexer with callback and cancel event + # Run indexer with callbacks and cancel event await main_indexer( - settings, on_progress=on_progress, cancel_event=task_state.cancel_event + settings, + on_progress=on_progress, + on_log=on_log, + cancel_event=task_state.cancel_event, ) - # Success - task_state.status = TaskStatus.COMPLETED + elapsed = time.monotonic() - start_time + rows_indexed = task_state.progress.get("rows_indexed", 0) + + # Check if cancelled (cancel_event set by cancel endpoint) + if task_state.status == TaskStatus.CANCELLED: + logger.info("Indexing cancelled for job %s", task_state.job_id) + # ruff: noqa: RUF006 + asyncio.create_task( + ws_manager.broadcast( + task_state.job_id, + { + "type": "cancelled", + "status": "cancelled", + "progress": task_state.progress, + "total_indexed": rows_indexed, + "elapsed_seconds": elapsed, + }, + ) + ) + else: + # Success — send completion message + task_state.status = TaskStatus.COMPLETED + collection_names = _get_collection_names(settings) + # ruff: noqa: RUF006 + asyncio.create_task( + ws_manager.broadcast( + task_state.job_id, + { + "type": "completed", + "status": "completed", + "progress": task_state.progress, + "total_indexed": rows_indexed, + "collection_names": collection_names, + "elapsed_seconds": elapsed, + }, + ) + ) except Exception as e: logger.exception("Indexing failed for job %s", task_state.job_id) task_state.status = TaskStatus.FAILED @@ -119,12 +187,18 @@ def on_progress(progress_data: dict[str, Any]) -> None: ws_manager.broadcast( task_state.job_id, { + "type": "error", "status": task_state.status.value, "error": task_state.error, + "message": str(e), "progress": task_state.progress, }, ) ) + finally: + if heartbeat_task is not None: + with contextlib.suppress(RuntimeError): + heartbeat_task.cancel() @router.post("/start", response_model=IndexStartResponse) diff --git a/frontend/src/components/csv/CsvPreview.tsx b/frontend/src/components/csv/CsvPreview.tsx index 7814840..947aec6 100644 --- a/frontend/src/components/csv/CsvPreview.tsx +++ b/frontend/src/components/csv/CsvPreview.tsx @@ -1,10 +1,108 @@ +import { useState, useRef, useCallback } from 'react'; + interface CsvPreviewProps { columns: string[]; rows: Record[]; totalRows: number; + previewLimit: number; + onLimitChange: (limit: number) => void; } -export default function CsvPreview({ columns, rows, totalRows }: CsvPreviewProps) { +const IMAGE_URL_PATTERN = + /^https?:\/\/.+\.(jpg|jpeg|png|gif|webp|svg|bmp|avif)(\?.*)?$/i; + +function isImageUrl(value: string): boolean { + return IMAGE_URL_PATTERN.test(value.trim()); +} + +interface ImagePreviewState { + url: string; + x: number; + y: number; +} + +function ImageCell({ value }: { value: string }) { + const [preview, setPreview] = useState(null); + const cellRef = useRef(null); + const hideTimeout = useRef | null>(null); + + const showPreview = useCallback( + (e: React.MouseEvent) => { + if (hideTimeout.current) { + clearTimeout(hideTimeout.current); + hideTimeout.current = null; + } + setPreview({ url: value.trim(), x: e.clientX, y: e.clientY }); + }, + [value], + ); + + const movePreview = useCallback( + (e: React.MouseEvent) => { + if (preview) { + setPreview({ url: value.trim(), x: e.clientX, y: e.clientY }); + } + }, + [preview, value], + ); + + const hidePreview = useCallback(() => { + hideTimeout.current = setTimeout(() => setPreview(null), 100); + }, []); + + return ( + + + {value || ''} + + {preview && ( +
+
+ Preview { + (e.target as HTMLImageElement).style.display = 'none'; + }} + /> +
+
+ )} + + ); +} + +const PAGE_SIZE = 25; + +export default function CsvPreview({ + columns, + rows, + totalRows, + previewLimit, + onLimitChange, +}: CsvPreviewProps) { + const [currentPage, setCurrentPage] = useState(1); + + // Reset to page 1 when rows change (limit changed, new CSV, etc.) + const rowCountRef = useRef(rows.length); + if (rows.length !== rowCountRef.current) { + rowCountRef.current = rows.length; + if (currentPage !== 1) setCurrentPage(1); + } + if (!rows || rows.length === 0) { return (
@@ -13,13 +111,57 @@ export default function CsvPreview({ columns, rows, totalRows }: CsvPreviewProps ); } + // Detect which columns contain image URLs by sampling first few rows + const imageColumns = new Set( + columns.filter((col) => + rows.some((row) => { + const val = row[col]; + return val && isImageUrl(val); + }), + ), + ); + + // Pagination + const totalPages = Math.max(1, Math.ceil(rows.length / PAGE_SIZE)); + const startIndex = (currentPage - 1) * PAGE_SIZE; + const paginatedRows = rows.length > PAGE_SIZE + ? rows.slice(startIndex, startIndex + PAGE_SIZE) + : rows; + + // Generate visible page numbers (show up to 5 around current) + const pageNumbers: number[] = []; + const maxVisible = 5; + let startPage = Math.max(1, currentPage - Math.floor(maxVisible / 2)); + const endPage = Math.min(totalPages, startPage + maxVisible - 1); + startPage = Math.max(1, endPage - maxVisible + 1); + for (let i = startPage; i <= endPage; i++) { + pageNumbers.push(i); + } + return (

Data Preview

- - Showing {rows.length} of {totalRows} total rows - +
+ + + + Showing {rows.length} of {totalRows} total rows + +
@@ -27,30 +169,97 @@ export default function CsvPreview({ columns, rows, totalRows }: CsvPreviewProps {columns.map((column) => ( - + {column} ))} - {rows.map((row, index) => ( - - {columns.map((column) => ( - - {row[column] || ''} - - ))} - - ))} + {paginatedRows.map((row, index) => { + const globalIndex = startIndex + index; + return ( + + {columns.map((column) => + imageColumns.has(column) && row[column] ? ( + + ) : ( + + {row[column] || ''} + + ), + )} + + ); + })}
+ + {totalPages > 1 && ( +
+ + Page {currentPage} of {totalPages} + +
+ + + {pageNumbers.map((page) => ( + + ))} + + +
+
+ )}
); } diff --git a/frontend/src/components/index/IndexProgress.tsx b/frontend/src/components/index/IndexProgress.tsx index 27aae7c..3a7eba3 100644 --- a/frontend/src/components/index/IndexProgress.tsx +++ b/frontend/src/components/index/IndexProgress.tsx @@ -1,27 +1,43 @@ -import { useEffect, useRef } from 'react'; +import { useEffect, useMemo, useRef, useState } from 'react'; import { useMutation } from '@tanstack/react-query'; -import { useIndexWebSocket } from '../../hooks/useIndexWebSocket'; +import { useIndexWebSocket, type LogMessage } from '../../hooks/useIndexWebSocket'; import { cancelIndex } from '../../api/indexing'; +type VerbosityLevel = 'low' | 'medium' | 'high'; + +const VERBOSITY_LEVELS: VerbosityLevel[] = ['low', 'medium', 'high']; + +const VERBOSITY_INCLUDES: Record = { + low: ['low'], + medium: ['low', 'medium'], + high: ['low', 'medium', 'high'], +}; + interface IndexProgressProps { jobId: string; onDone: () => void; } export default function IndexProgress({ jobId, onDone }: IndexProgressProps) { - const { progress, logs, status, isConnected } = useIndexWebSocket(jobId); + const { progress, logs, status, isConnected, isStuckWarning, isStuckError, markCancelled } = useIndexWebSocket(jobId); const logsEndRef = useRef(null); + const [verbosity, setVerbosity] = useState('medium'); const cancelMutation = useMutation({ mutationFn: cancelIndex, + onSuccess: () => markCancelled(), }); - // Auto-scroll logs + const filteredLogs = useMemo(() => { + const allowed = VERBOSITY_INCLUDES[verbosity]; + return logs.filter((log: LogMessage) => allowed.includes(log.verbosity as VerbosityLevel)); + }, [logs, verbosity]); + useEffect(() => { if (logsEndRef.current) { logsEndRef.current.scrollIntoView({ behavior: 'smooth' }); } - }, [logs]); + }, [filteredLogs]); const formatTime = (seconds: number) => { const mins = Math.floor(seconds / 60); @@ -39,6 +55,45 @@ export default function IndexProgress({ jobId, onDone }: IndexProgressProps) { return (
+ {/* Stuck error modal */} + {isStuckError && !isFinished && ( +
+
+
+ + + +
+
+

Backend Not Responding

+

+ No messages received for 30+ seconds. The backend may have crashed or become unresponsive. + Consider cancelling and checking server logs. +

+
+
+
+ )} + + {/* Stuck warning banner */} + {isStuckWarning && !isStuckError && !isFinished && ( +
+
+
+ + + +
+
+

Slow Response

+

+ No messages received for 15+ seconds. The backend may be processing a large batch. +

+
+
+
+ )} + {/* Header / Status Badge */}
@@ -102,9 +157,26 @@ export default function IndexProgress({ jobId, onDone }: IndexProgressProps) { {/* Logs Panel */}
-

Live Logs

+
+

Live Logs

+
+ + +
+
- {logs.length === 0 && !progress.error ? ( + {filteredLogs.length === 0 && !progress.error ? (
Waiting for logs...
) : ( <> @@ -113,7 +185,7 @@ export default function IndexProgress({ jobId, onDone }: IndexProgressProps) { [error] {progress.error}
)} - {logs.map((log, index) => ( + {filteredLogs.map((log, index) => (
void; } interface WebSocketMessage { @@ -26,6 +33,7 @@ interface WebSocketMessage { status?: string; level?: string; message?: string; + verbosity?: string; rows_indexed?: number; total_rows?: number | null; errors?: number; @@ -48,8 +56,23 @@ export function useIndexWebSocket(jobId: string | null): UseIndexWebSocketResult const [logs, setLogs] = useState([]); const [status, setStatus] = useState('pending'); const [isConnected, setIsConnected] = useState(false); + const [isStuckWarning, setIsStuckWarning] = useState(false); + const [isStuckError, setIsStuckError] = useState(false); const wsRef = useRef(null); + const timerRef = useRef | null>(null); + const lastMessageRef = useRef(Date.now()); + const stuckIntervalRef = useRef | null>(null); + const stopTimersRef = useRef<(() => void) | null>(null); + // Track the last server-reported elapsed_seconds to anchor the client timer + const serverElapsedRef = useRef(0); + const serverElapsedAtRef = useRef(Date.now()); + + const resetStuckTimer = useCallback(() => { + lastMessageRef.current = Date.now(); + setIsStuckWarning(false); + setIsStuckError(false); + }, []); useEffect(() => { if (!jobId) { @@ -67,46 +90,101 @@ export function useIndexWebSocket(jobId: string | null): UseIndexWebSocketResult setLogs([]); setStatus('pending'); setIsConnected(false); + setIsStuckWarning(false); + setIsStuckError(false); + serverElapsedRef.current = 0; + serverElapsedAtRef.current = Date.now(); + lastMessageRef.current = Date.now(); + + // Helper to stop elapsed timer and stuck detection when indexing finishes + const stopTimers = () => { + if (timerRef.current) { + clearInterval(timerRef.current); + timerRef.current = null; + } + if (stuckIntervalRef.current) { + clearInterval(stuckIntervalRef.current); + stuckIntervalRef.current = null; + } + }; + stopTimersRef.current = stopTimers; const ws = createIndexWebSocket(jobId); wsRef.current = ws; + // Client-side elapsed timer — ticks every second for smooth display + timerRef.current = setInterval(() => { + const now = Date.now(); + const delta = (now - serverElapsedAtRef.current) / 1000; + setProgress(prev => ({ + ...prev, + elapsed_seconds: serverElapsedRef.current + delta, + })); + }, 1000); + + // Stuck detection interval — checks every 5s + stuckIntervalRef.current = setInterval(() => { + const silence = Date.now() - lastMessageRef.current; + if (silence >= STUCK_ERROR_MS) { + setIsStuckError(true); + setIsStuckWarning(true); + } else if (silence >= STUCK_WARNING_MS) { + setIsStuckWarning(true); + setIsStuckError(false); + } else { + setIsStuckWarning(false); + setIsStuckError(false); + } + }, 5000); + ws.onopen = () => { console.log('WebSocket connected'); setIsConnected(true); + resetStuckTimer(); }; ws.onmessage = (event) => { try { const data = JSON.parse(event.data) as WebSocketMessage; + resetStuckTimer(); + + // Sync server elapsed time for client timer anchor + if (typeof data.elapsed_seconds === 'number') { + serverElapsedRef.current = data.elapsed_seconds; + serverElapsedAtRef.current = Date.now(); + } // Handle explicit status updates if (data.status) { - setStatus(data.status); + setStatus(data.status); } if (typeof data.error === 'string') { setProgress(prev => ({ ...prev, - error: data.error + error: data.error as string, })); } if (data.progress && typeof data.progress === 'object') { - const progress = data.progress as WebSocketMessage; + const progressMsg = data.progress as WebSocketMessage; + if (typeof progressMsg.elapsed_seconds === 'number') { + serverElapsedRef.current = progressMsg.elapsed_seconds; + serverElapsedAtRef.current = Date.now(); + } setProgress(prev => ({ ...prev, - rows_indexed: typeof progress.rows_indexed === 'number' - ? progress.rows_indexed + rows_indexed: typeof progressMsg.rows_indexed === 'number' + ? progressMsg.rows_indexed : prev.rows_indexed, - total_rows: typeof progress.total_rows === 'number' - ? progress.total_rows + total_rows: typeof progressMsg.total_rows === 'number' + ? progressMsg.total_rows : prev.total_rows, - errors: typeof progress.errors === 'number' ? progress.errors : prev.errors, - elapsed_seconds: typeof progress.elapsed_seconds === 'number' - ? progress.elapsed_seconds + errors: typeof progressMsg.errors === 'number' ? progressMsg.errors : prev.errors, + elapsed_seconds: typeof progressMsg.elapsed_seconds === 'number' + ? progressMsg.elapsed_seconds : prev.elapsed_seconds, - error: typeof progress.error === 'string' ? progress.error : prev.error, + error: typeof progressMsg.error === 'string' ? progressMsg.error : prev.error, })); } @@ -125,23 +203,46 @@ export function useIndexWebSocket(jobId: string | null): UseIndexWebSocketResult } else if (data.type === 'log') { setLogs(prev => [...prev, { level: data.level || 'info', - message: data.message || '' + message: data.message || '', + verbosity: data.verbosity || 'low', }]); + } else if (data.type === 'heartbeat') { + // Heartbeat keeps stuck detection happy — elapsed already synced above } else if (data.type === 'completed') { setStatus('completed'); + stopTimers(); + // Sync final elapsed time from server + if (typeof data.elapsed_seconds === 'number') { + setProgress(prev => ({ ...prev, elapsed_seconds: data.elapsed_seconds as number })); + } setLogs(prev => [...prev, { level: 'success', - message: `Indexing completed. Total indexed: ${data.total_indexed}. Collections: ${Array.isArray(data.collection_names) ? data.collection_names.join(', ') : ''}` + message: `Indexing completed. Total indexed: ${data.total_indexed}. Collections: ${Array.isArray(data.collection_names) ? data.collection_names.join(', ') : ''}`, + verbosity: 'low', + }]); + } else if (data.type === 'cancelled') { + setStatus('cancelled'); + stopTimers(); + // Sync final elapsed time from server + if (typeof data.elapsed_seconds === 'number') { + setProgress(prev => ({ ...prev, elapsed_seconds: data.elapsed_seconds as number })); + } + setLogs(prev => [...prev, { + level: 'warning', + message: `Indexing cancelled. Rows indexed so far: ${data.total_indexed ?? 0}.`, + verbosity: 'low', }]); } else if (data.type === 'error') { setStatus('error'); + stopTimers(); setLogs(prev => [...prev, { level: 'error', - message: data.message || 'Unknown error occurred' + message: data.message || 'Unknown error occurred', + verbosity: 'low', }]); setProgress(prev => ({ ...prev, - error: data.message || prev.error || 'Unknown error occurred' + error: data.message || prev.error || 'Unknown error occurred', })); } } catch (err) { @@ -158,8 +259,8 @@ export function useIndexWebSocket(jobId: string | null): UseIndexWebSocketResult ws.onclose = () => { console.log('WebSocket disconnected'); setIsConnected(false); - // Don't overwrite 'completed' or 'failed' status on close - setStatus(prev => (prev === 'completed' || prev === 'failed' || prev === 'error') ? prev : 'disconnected'); + // Don't overwrite terminal status on close + setStatus(prev => (prev === 'completed' || prev === 'failed' || prev === 'error' || prev === 'cancelled') ? prev : 'disconnected'); }; return () => { @@ -167,8 +268,23 @@ export function useIndexWebSocket(jobId: string | null): UseIndexWebSocketResult wsRef.current.close(); wsRef.current = null; } + if (timerRef.current) { + clearInterval(timerRef.current); + timerRef.current = null; + } + if (stuckIntervalRef.current) { + clearInterval(stuckIntervalRef.current); + stuckIntervalRef.current = null; + } }; - }, [jobId]); + }, [jobId, resetStuckTimer]); + + const markCancelled = useCallback(() => { + setStatus('cancelled'); + setIsStuckWarning(false); + setIsStuckError(false); + stopTimersRef.current?.(); + }, []); - return { progress, logs, status, isConnected }; + return { progress, logs, status, isConnected, isStuckWarning, isStuckError, markCancelled }; } diff --git a/frontend/src/pages/IndexPage.tsx b/frontend/src/pages/IndexPage.tsx index 07f1710..b2cc654 100644 --- a/frontend/src/pages/IndexPage.tsx +++ b/frontend/src/pages/IndexPage.tsx @@ -16,11 +16,12 @@ export default function IndexPage() { const [columns, setColumns] = useState([]); const [totalRows, setTotalRows] = useState(0); const [indexJobId, setIndexJobId] = useState(null); + const [previewLimit, setPreviewLimit] = useState(10); // Query for CSV Preview const { data: previewData, isLoading: isPreviewLoading } = useQuery({ - queryKey: ['previewCsv', uploadedFilename], - queryFn: () => previewCsv(uploadedFilename!), + queryKey: ['previewCsv', uploadedFilename, previewLimit], + queryFn: () => previewCsv(uploadedFilename!, previewLimit), enabled: !!uploadedFilename && step === 'preview', }); @@ -79,7 +80,18 @@ export default function IndexPage() {
) : ( <> -
+
+