Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions frontend/app/home/page.tsx
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
'use client';
import ReactFlowView from '@/components/ui-react-flow/react-flow-view';
import { GlobalProvider } from '@/context/GlobalContext';
import { WebSocketProvider } from '@/context/WebSocketContext';
import AppHeader from '@/components/ui-header/app-header';
import SettingsBar from '@/components/ui-header/settings-bar';
import { NotificationsProvider } from '@/components/notifications';

export default function Home() {
return (
<GlobalProvider>
<WebSocketProvider>
<NotificationsProvider>
<div className="h-screen flex flex-col">
{/* Top section for header and settings bar */}
Expand All @@ -22,6 +24,7 @@ export default function Home() {
</div>
</div>
</NotificationsProvider>
</WebSocketProvider>
</GlobalProvider>
);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Card } from '@/components/ui/card';
import { Handle, Position, useReactFlow } from '@xyflow/react';
import useWebsocket from '@/hooks/useWebsocket';
import useNodeData from '@/hooks/useNodeData';
import React from 'react';
import { useGlobalContext } from '@/context/GlobalContext';
import { ArrowUpRight } from 'lucide-react';
Expand All @@ -16,7 +16,7 @@ import {
import SignalGraphView from './signal-graph-full';

export default function SignalGraphNode({ id }: { id?: string }) {
const { renderData } = useWebsocket(20, 10);
const { renderData } = useNodeData(20, 10);

const processedData = renderData;
const reactFlowInstance = useReactFlow();
Expand Down
158 changes: 158 additions & 0 deletions frontend/context/WebSocketContext.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
'use client';

import React, { createContext, useContext, useEffect, useRef, useCallback, ReactNode } from 'react';
import { useGlobalContext } from './GlobalContext';
import { ProcessingConfig } from '@/lib/processing';

export type DataPoint = {
time: string;
signal1: number;
signal2: number;
signal3: number;
signal4: number;
};

type Subscriber = (points: DataPoint[]) => void;

type WebSocketContextType = {
subscribe: (fn: Subscriber) => () => void;
sendProcessingConfig: (config: ProcessingConfig) => void;
};

const WebSocketContext = createContext<WebSocketContextType | undefined>(undefined);

const DEFAULT_PROCESSING_CONFIG: ProcessingConfig = {
apply_bandpass: false,
use_iir: false,
l_freq: null,
h_freq: null,
downsample_factor: null,
sfreq: 256,
n_channels: 4,
};

function formatTimestamp(raw: any): string {
const s = String(raw);
// ISO 8601: "2026-03-11T03:55:22.715574979Z" - extract "03:55:22"
if (s.includes('T')) return s.slice(11, 19);
return s;
}

function normalizeBatch(batch: any): DataPoint[] {
return batch.timestamps.map((time: any, i: number) => ({
time: formatTimestamp(time),
signal1: batch.signals[0][i],
signal2: batch.signals[1][i],
signal3: batch.signals[2][i],
signal4: batch.signals[3][i],
}));
}

export function WebSocketProvider({ children }: { children: ReactNode }) {
const { dataStreaming } = useGlobalContext();
const wsRef = useRef<WebSocket | null>(null);
const processingConfigRef = useRef<ProcessingConfig | null>(null);
const subscribersRef = useRef<Set<Subscriber>>(new Set());
const closingTimeoutRef = useRef<NodeJS.Timeout | null>(null);
const isClosingGracefullyRef = useRef(false);

const subscribe = useCallback((fn: Subscriber) => {
subscribersRef.current.add(fn);
return () => subscribersRef.current.delete(fn);
}, []);

const sendProcessingConfig = useCallback((config: ProcessingConfig) => {
processingConfigRef.current = config;
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify(config));
console.log('Sent processing config:', config);
}
}, []);

// Forward processing-config-update events from filter node to backend
useEffect(() => {
const handler = (event: Event) => {
sendProcessingConfig((event as CustomEvent<ProcessingConfig>).detail);
};
window.addEventListener('processing-config-update', handler);
return () => window.removeEventListener('processing-config-update', handler);
}, [sendProcessingConfig]);

// Manage WebSocket lifecycle
useEffect(() => {
if (!dataStreaming) {
if (wsRef.current?.readyState === WebSocket.OPEN && !isClosingGracefullyRef.current) {
isClosingGracefullyRef.current = true;
wsRef.current.send('clientClosing');
closingTimeoutRef.current = setTimeout(() => {
console.warn('Timeout: no confirmed closing received. Forcing close.');
wsRef.current?.close();
isClosingGracefullyRef.current = false;
}, 5000);
}
return;
}

if (wsRef.current && wsRef.current.readyState !== WebSocket.CLOSED) return;

console.log('Opening WebSocket connection...');
const ws = new WebSocket('ws://localhost:8080');
wsRef.current = ws;

ws.onopen = () => {
console.log('WebSocket connection opened.');
ws.send(JSON.stringify(processingConfigRef.current ?? DEFAULT_PROCESSING_CONFIG));
};

ws.onmessage = (event) => {
const message = event.data;
if (message === 'confirmed closing') {
console.log("Received 'confirmed closing' from server.");
if (closingTimeoutRef.current) clearTimeout(closingTimeoutRef.current);
ws.close();
isClosingGracefullyRef.current = false;
} else {
try {
const points = normalizeBatch(JSON.parse(message));
subscribersRef.current.forEach((fn) => fn(points));
} catch (e) {
console.error('Failed to parse WebSocket message:', e);
}
}
};

ws.onclose = (event) => {
console.log('WebSocket connection closed:', event.code, event.reason);
wsRef.current = null;
isClosingGracefullyRef.current = false;
};

ws.onerror = () => {
if (closingTimeoutRef.current) clearTimeout(closingTimeoutRef.current);
isClosingGracefullyRef.current = false;
};

return () => {
if (closingTimeoutRef.current) clearTimeout(closingTimeoutRef.current);
if (ws.readyState === WebSocket.OPEN && !isClosingGracefullyRef.current) {
ws.send('clientClosing');
closingTimeoutRef.current = setTimeout(() => ws.close(), 5000);
} else if (ws.readyState !== WebSocket.CLOSED) {
ws.close();
}
wsRef.current = null;
};
}, [dataStreaming]);

return (
<WebSocketContext.Provider value={{ subscribe, sendProcessingConfig }}>
{children}
</WebSocketContext.Provider>
);
}

export function useWebSocketContext() {
const ctx = useContext(WebSocketContext);
if (!ctx) throw new Error('useWebSocketContext must be used within a WebSocketProvider');
return ctx;
}
45 changes: 45 additions & 0 deletions frontend/hooks/useNodeData.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { useEffect, useRef, useState } from 'react';
import { useWebSocketContext, DataPoint } from '@/context/WebSocketContext';
import { useGlobalContext } from '@/context/GlobalContext';

export default function useNodeData(chartSize: number, batchesPerSecond: number) {
const { subscribe } = useWebSocketContext();
const { dataStreaming } = useGlobalContext();
const [renderData, setRenderData] = useState<DataPoint[]>([]);
const bufferRef = useRef<DataPoint[]>([]);

// Subscribe to incoming data points from the shared WebSocket
useEffect(() => {
const unsubscribe = subscribe((points) => {
bufferRef.current.push(...points);
});
return unsubscribe;
}, [subscribe]);

// Drain the buffer into renderData at the node's own rate
useEffect(() => {
if (!dataStreaming || batchesPerSecond <= 0) return;

const intervalTime = 1000 / batchesPerSecond;
const id = setInterval(() => {
if (bufferRef.current.length > 0) {
const batch = bufferRef.current.splice(
0,
Math.min(bufferRef.current.length, chartSize)
);
setRenderData((prev) => [...prev, ...batch].slice(-chartSize));
}
}, intervalTime);

return () => clearInterval(id);
}, [dataStreaming, batchesPerSecond, chartSize]);

// Always clear the buffer on start/stop to prevent backlog buildup.
// renderData is never cleared so the chart holds its last frame when paused
// and new data naturally replaces it as it arrives.
useEffect(() => {
bufferRef.current = [];
}, [dataStreaming]);

return { renderData };
}
Loading
Loading