diff --git a/frontend/app/home/page.tsx b/frontend/app/home/page.tsx index cadf0d9..1dca67a 100644 --- a/frontend/app/home/page.tsx +++ b/frontend/app/home/page.tsx @@ -1,6 +1,7 @@ 'use client'; import ReactFlowView from '@/components/ui-react-flow/react-flow-view'; import { GlobalProvider } from '@/context/GlobalContext'; +import { WebSocketProvider } from '@/context/WebSocketContext'; import AppHeader from '@/components/ui-header/app-header'; import SettingsBar from '@/components/ui-header/settings-bar'; import { NotificationsProvider } from '@/components/notifications'; @@ -8,6 +9,7 @@ import { NotificationsProvider } from '@/components/notifications'; export default function Home() { return ( + {/* Top section for header and settings bar */} @@ -22,6 +24,7 @@ export default function Home() { + ); } diff --git a/frontend/components/nodes/signal-graph-node/signal-graph-node.tsx b/frontend/components/nodes/signal-graph-node/signal-graph-node.tsx index 5b33465..544e67d 100644 --- a/frontend/components/nodes/signal-graph-node/signal-graph-node.tsx +++ b/frontend/components/nodes/signal-graph-node/signal-graph-node.tsx @@ -1,6 +1,6 @@ import { Card } from '@/components/ui/card'; import { Handle, Position, useReactFlow } from '@xyflow/react'; -import useWebsocket from '@/hooks/useWebsocket'; +import useNodeData from '@/hooks/useNodeData'; import React from 'react'; import { useGlobalContext } from '@/context/GlobalContext'; import { ArrowUpRight } from 'lucide-react'; @@ -16,7 +16,7 @@ import { import SignalGraphView from './signal-graph-full'; export default function SignalGraphNode({ id }: { id?: string }) { - const { renderData } = useWebsocket(20, 10); + const { renderData } = useNodeData(20, 10); const processedData = renderData; const reactFlowInstance = useReactFlow(); diff --git a/frontend/context/WebSocketContext.tsx b/frontend/context/WebSocketContext.tsx new file mode 100644 index 0000000..7e09eb4 --- /dev/null +++ b/frontend/context/WebSocketContext.tsx @@ -0,0 +1,158 @@ +'use client'; + +import React, { createContext, useContext, useEffect, useRef, useCallback, ReactNode } from 'react'; +import { useGlobalContext } from './GlobalContext'; +import { ProcessingConfig } from '@/lib/processing'; + +export type DataPoint = { + time: string; + signal1: number; + signal2: number; + signal3: number; + signal4: number; +}; + +type Subscriber = (points: DataPoint[]) => void; + +type WebSocketContextType = { + subscribe: (fn: Subscriber) => () => void; + sendProcessingConfig: (config: ProcessingConfig) => void; +}; + +const WebSocketContext = createContext(undefined); + +const DEFAULT_PROCESSING_CONFIG: ProcessingConfig = { + apply_bandpass: false, + use_iir: false, + l_freq: null, + h_freq: null, + downsample_factor: null, + sfreq: 256, + n_channels: 4, +}; + +function formatTimestamp(raw: any): string { + const s = String(raw); + // ISO 8601: "2026-03-11T03:55:22.715574979Z" - extract "03:55:22" + if (s.includes('T')) return s.slice(11, 19); + return s; +} + +function normalizeBatch(batch: any): DataPoint[] { + return batch.timestamps.map((time: any, i: number) => ({ + time: formatTimestamp(time), + signal1: batch.signals[0][i], + signal2: batch.signals[1][i], + signal3: batch.signals[2][i], + signal4: batch.signals[3][i], + })); +} + +export function WebSocketProvider({ children }: { children: ReactNode }) { + const { dataStreaming } = useGlobalContext(); + const wsRef = useRef(null); + const processingConfigRef = useRef(null); + const subscribersRef = useRef>(new Set()); + const closingTimeoutRef = useRef(null); + const isClosingGracefullyRef = useRef(false); + + const subscribe = useCallback((fn: Subscriber) => { + subscribersRef.current.add(fn); + return () => subscribersRef.current.delete(fn); + }, []); + + const sendProcessingConfig = useCallback((config: ProcessingConfig) => { + processingConfigRef.current = config; + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send(JSON.stringify(config)); + console.log('Sent processing config:', config); + } + }, []); + + // Forward processing-config-update events from filter node to backend + useEffect(() => { + const handler = (event: Event) => { + sendProcessingConfig((event as CustomEvent).detail); + }; + window.addEventListener('processing-config-update', handler); + return () => window.removeEventListener('processing-config-update', handler); + }, [sendProcessingConfig]); + + // Manage WebSocket lifecycle + useEffect(() => { + if (!dataStreaming) { + if (wsRef.current?.readyState === WebSocket.OPEN && !isClosingGracefullyRef.current) { + isClosingGracefullyRef.current = true; + wsRef.current.send('clientClosing'); + closingTimeoutRef.current = setTimeout(() => { + console.warn('Timeout: no confirmed closing received. Forcing close.'); + wsRef.current?.close(); + isClosingGracefullyRef.current = false; + }, 5000); + } + return; + } + + if (wsRef.current && wsRef.current.readyState !== WebSocket.CLOSED) return; + + console.log('Opening WebSocket connection...'); + const ws = new WebSocket('ws://localhost:8080'); + wsRef.current = ws; + + ws.onopen = () => { + console.log('WebSocket connection opened.'); + ws.send(JSON.stringify(processingConfigRef.current ?? DEFAULT_PROCESSING_CONFIG)); + }; + + ws.onmessage = (event) => { + const message = event.data; + if (message === 'confirmed closing') { + console.log("Received 'confirmed closing' from server."); + if (closingTimeoutRef.current) clearTimeout(closingTimeoutRef.current); + ws.close(); + isClosingGracefullyRef.current = false; + } else { + try { + const points = normalizeBatch(JSON.parse(message)); + subscribersRef.current.forEach((fn) => fn(points)); + } catch (e) { + console.error('Failed to parse WebSocket message:', e); + } + } + }; + + ws.onclose = (event) => { + console.log('WebSocket connection closed:', event.code, event.reason); + wsRef.current = null; + isClosingGracefullyRef.current = false; + }; + + ws.onerror = () => { + if (closingTimeoutRef.current) clearTimeout(closingTimeoutRef.current); + isClosingGracefullyRef.current = false; + }; + + return () => { + if (closingTimeoutRef.current) clearTimeout(closingTimeoutRef.current); + if (ws.readyState === WebSocket.OPEN && !isClosingGracefullyRef.current) { + ws.send('clientClosing'); + closingTimeoutRef.current = setTimeout(() => ws.close(), 5000); + } else if (ws.readyState !== WebSocket.CLOSED) { + ws.close(); + } + wsRef.current = null; + }; + }, [dataStreaming]); + + return ( + + {children} + + ); +} + +export function useWebSocketContext() { + const ctx = useContext(WebSocketContext); + if (!ctx) throw new Error('useWebSocketContext must be used within a WebSocketProvider'); + return ctx; +} diff --git a/frontend/hooks/useNodeData.ts b/frontend/hooks/useNodeData.ts new file mode 100644 index 0000000..0a432aa --- /dev/null +++ b/frontend/hooks/useNodeData.ts @@ -0,0 +1,45 @@ +import { useEffect, useRef, useState } from 'react'; +import { useWebSocketContext, DataPoint } from '@/context/WebSocketContext'; +import { useGlobalContext } from '@/context/GlobalContext'; + +export default function useNodeData(chartSize: number, batchesPerSecond: number) { + const { subscribe } = useWebSocketContext(); + const { dataStreaming } = useGlobalContext(); + const [renderData, setRenderData] = useState([]); + const bufferRef = useRef([]); + + // Subscribe to incoming data points from the shared WebSocket + useEffect(() => { + const unsubscribe = subscribe((points) => { + bufferRef.current.push(...points); + }); + return unsubscribe; + }, [subscribe]); + + // Drain the buffer into renderData at the node's own rate + useEffect(() => { + if (!dataStreaming || batchesPerSecond <= 0) return; + + const intervalTime = 1000 / batchesPerSecond; + const id = setInterval(() => { + if (bufferRef.current.length > 0) { + const batch = bufferRef.current.splice( + 0, + Math.min(bufferRef.current.length, chartSize) + ); + setRenderData((prev) => [...prev, ...batch].slice(-chartSize)); + } + }, intervalTime); + + return () => clearInterval(id); + }, [dataStreaming, batchesPerSecond, chartSize]); + + // Always clear the buffer on start/stop to prevent backlog buildup. + // renderData is never cleared so the chart holds its last frame when paused + // and new data naturally replaces it as it arrives. + useEffect(() => { + bufferRef.current = []; + }, [dataStreaming]); + + return { renderData }; +} diff --git a/frontend/hooks/useWebsocket.tsx b/frontend/hooks/useWebsocket.tsx deleted file mode 100644 index 6223e59..0000000 --- a/frontend/hooks/useWebsocket.tsx +++ /dev/null @@ -1,173 +0,0 @@ -import { useEffect, useState, useRef } from 'react'; -import { useGlobalContext } from '@/context/GlobalContext'; -import { ProcessingConfig } from '@/lib/processing'; - -const DEFAULT_PROCESSING_CONFIG: ProcessingConfig = { - apply_bandpass: false, - use_iir: false, - l_freq: null, - h_freq: null, - downsample_factor: null, - sfreq: 256, - n_channels: 4, -}; - -export default function useWebsocket( - chartSize: number, - batchesPerSecond: number -) { - const { dataStreaming } = useGlobalContext(); - const [renderData, setRenderData] = useState([]); - const bufferRef = useRef([]); - const wsRef = useRef(null); - const closingTimeoutRef = useRef(null); - const [isClosingGracefully, setIsClosingGracefully] = useState(false); - const processingConfigRef = useRef(null); - - const intervalTime = 1000 / batchesPerSecond; - - const sendProcessingConfig = (config: ProcessingConfig) => { - processingConfigRef.current = config - - if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { - wsRef.current.send(JSON.stringify(config)) - console.log('Sent processing config:', config) - } - } - - useEffect(() => { - const handleConfigUpdate = (event: Event) => { - sendProcessingConfig((event as CustomEvent).detail); - }; - window.addEventListener('processing-config-update', handleConfigUpdate); - return () => window.removeEventListener('processing-config-update', handleConfigUpdate); - }, []); - - const normalizeBatch = (batch: any) => { - return batch.timestamps.map((time: number, i: number) => ({ - time, - signal1: batch.signals[0][i], - signal2: batch.signals[1][i], - signal3: batch.signals[2][i], - signal4: batch.signals[3][i], - })); - }; - - useEffect(() => { - console.log('data streaming:', dataStreaming); - - if (!dataStreaming && wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { - if (!isClosingGracefully) { - console.log("Initiating graceful close..."); - setIsClosingGracefully(true); - wsRef.current.send('clientClosing'); - - closingTimeoutRef.current = setTimeout(() => { - console.warn("Timeout: No 'confirmed closing' received. Forcing WebSocket close."); - if (wsRef.current) { - wsRef.current.close(); - } - setIsClosingGracefully(false); - }, 5000); - } - return; - } - - if (!dataStreaming && (!wsRef.current || wsRef.current.readyState === WebSocket.CLOSED)) { - return; - } - - if (dataStreaming && (!wsRef.current || wsRef.current.readyState === WebSocket.CLOSED)) { - console.log("Opening new WebSocket connection..."); - const ws = new WebSocket('ws://localhost:8080'); - wsRef.current = ws; - - ws.onopen = () => { - console.log('WebSocket connection opened.'); - ws.send(JSON.stringify(processingConfigRef.current ?? DEFAULT_PROCESSING_CONFIG)); - }; - - ws.onmessage = (event) => { - const message = event.data; - if (message === 'confirmed closing') { - console.log("Received 'confirmed closing' from server. Proceeding to close."); - if (closingTimeoutRef.current) { - clearTimeout(closingTimeoutRef.current); - } - if (wsRef.current) { - wsRef.current.close(); - } - setIsClosingGracefully(false); - } else { - try { - const parsedData = JSON.parse(message); - const normalizedPoints = normalizeBatch(parsedData); - bufferRef.current.push(...normalizedPoints); - } catch (e) { - console.error("Failed to parse non-confirmation message as JSON:", e, message); - } - } - }; - - ws.onclose = (event) => { - console.log('WebSocket connection closed:', event.code, event.reason); - wsRef.current = null; - setIsClosingGracefully(false); - }; - - ws.onerror = (error) => { - console.error('WebSocket error:', error); - if (closingTimeoutRef.current) { - clearTimeout(closingTimeoutRef.current); - } - setIsClosingGracefully(false); - }; - } - - const updateRenderData = () => { - if (bufferRef.current.length > 0) { - const nextBatch = bufferRef.current.splice( - 0, - Math.min(bufferRef.current.length, chartSize) - ); - setRenderData((prevData) => - [...(Array.isArray(prevData) ? prevData : []), ...nextBatch].slice(-chartSize) - ); - } - }; - - let intervalId: NodeJS.Timeout | null = null; - if (dataStreaming) { - intervalId = setInterval(updateRenderData, intervalTime); - } - - return () => { - console.log("Cleanup function running."); - if (intervalId) { - clearInterval(intervalId); - } - - if (closingTimeoutRef.current) { - clearTimeout(closingTimeoutRef.current); - } - - if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN && !isClosingGracefully) { - console.log("Component unmounting or dependencies changed: Initiating graceful close during cleanup."); - wsRef.current.send('clientClosing'); - closingTimeoutRef.current = setTimeout(() => { - console.warn("Timeout: No 'confirmed closing' received during cleanup. Forcing WebSocket close."); - if (wsRef.current) { - wsRef.current.close(); - } - }, 5000); - } else if (wsRef.current && wsRef.current.readyState !== WebSocket.CLOSED) { - console.log("Forcing immediate WebSocket close during cleanup."); - wsRef.current.close(); - } - wsRef.current = null; - setIsClosingGracefully(false); - }; - }, [chartSize, batchesPerSecond, dataStreaming, isClosingGracefully]); - - return { renderData, sendProcessingConfig }; -} \ No newline at end of file