From 91a1b8cef5f50795718bfef491cae1b9364a138c Mon Sep 17 00:00:00 2001 From: onyillto <56700691+onyillto@users.noreply.github.com> Date: Tue, 31 Mar 2026 07:25:47 +0100 Subject: [PATCH 1/2] Create useSSE.ts created the useSSE hook and ensure the UI correctly reflects the connection state. --- .../src/app/components/providers/useSSE.ts | 143 ++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 frontend/src/app/components/providers/useSSE.ts diff --git a/frontend/src/app/components/providers/useSSE.ts b/frontend/src/app/components/providers/useSSE.ts new file mode 100644 index 00000000..28eb2eaf --- /dev/null +++ b/frontend/src/app/components/providers/useSSE.ts @@ -0,0 +1,143 @@ +import { useEffect, useRef, useState, useCallback } from "react"; + +/** + * SSEStatus represents the lifecycle of the real-time event stream. + */ +export type SSEStatus = "connecting" | "connected" | "fallback" | "disconnected"; + +interface UseSSEOptions { + /** The SSE endpoint URL. If null, the connection is closed. */ + url: string | null; + /** Callback triggered when a new message is received via SSE. */ + onMessage: (data: T) => void; + /** Optional callback for successful connection. */ + onOpen?: () => void; + /** Optional callback for connection errors. */ + onError?: (err: Event) => void; + /** Callback for fallback polling logic (e.g., refetching data). */ + onPoll?: () => void; + /** Polling interval in milliseconds. Defaults to 30s. */ + pollingInterval?: number; +} + +/** + * useSSE hook provides real-time updates with automatic exponential backoff + * and a polling fallback mechanism for resilience. + */ +export function useSSE({ + url, + onMessage, + onOpen, + onError, + onPoll, + pollingInterval = 30000, +}: UseSSEOptions) { + const [status, setStatus] = useState("disconnected"); + + const eventSourceRef = useRef(null); + const reconnectTimeoutRef = useRef(null); + const pollingIntervalRef = useRef(null); + const backoffRef = useRef(1000); // Start with 1s backoff + + // Store callbacks in a ref to avoid effect re-runs while keeping logic fresh + const callbacks = useRef({ onMessage, onOpen, onError, onPoll }); + useEffect(() => { + callbacks.current = { onMessage, onOpen, onError, onPoll }; + }, [onMessage, onOpen, onError, onPoll]); + + const cleanup = useCallback(() => { + if (eventSourceRef.current) { + eventSourceRef.current.close(); + eventSourceRef.current = null; + } + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); + reconnectTimeoutRef.current = null; + } + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + pollingIntervalRef.current = null; + } + }, []); + + const startPolling = useCallback(() => { + if (pollingIntervalRef.current) return; + + setStatus("fallback"); + if (callbacks.current.onPoll) { + callbacks.current.onPoll(); // Trigger immediate refresh + pollingIntervalRef.current = setInterval(() => { + callbacks.current.onPoll?.(); + }, pollingInterval); + } + }, [pollingInterval]); + + useEffect(() => { + if (!url) { + cleanup(); + setStatus("disconnected"); + return; + } + + let isCancelled = false; + + const connect = () => { + if (isCancelled) return; + + // Close existing SSE but keep polling active while attempting reconnection + if (eventSourceRef.current) { + eventSourceRef.current.close(); + } + + setStatus("connecting"); + + const es = new EventSource(url, { withCredentials: true }); + eventSourceRef.current = es; + + es.onopen = () => { + if (isCancelled) return; + backoffRef.current = 1000; // Reset backoff on success + setStatus("connected"); + + // Stop polling fallback once SSE is live + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + pollingIntervalRef.current = null; + } + callbacks.current.onOpen?.(); + }; + + es.onmessage = (event) => { + if (isCancelled) return; + try { + const data = JSON.parse(event.data); + callbacks.current.onMessage(data); + } catch { /* Ignore malformed JSON */ } + }; + + es.onerror = (err) => { + if (isCancelled) return; + es.close(); + eventSourceRef.current = null; + callbacks.current.onError?.(err); + + // Fallback to polling while SSE is down + startPolling(); + + // Reconnect SSE with exponential backoff (cap at 30s) + const delay = Math.min(backoffRef.current, 30000); + backoffRef.current = Math.min(delay * 2, 30000); + reconnectTimeoutRef.current = setTimeout(connect, delay); + }; + }; + + connect(); + + return () => { + isCancelled = true; + cleanup(); + }; + }, [url, pollingInterval, cleanup, startPolling]); + + return status; +} \ No newline at end of file From 080feacb6315f2d80bc3e3a5bbb2bfb3eacd61c2 Mon Sep 17 00:00:00 2001 From: onyillto <56700691+onyillto@users.noreply.github.com> Date: Tue, 31 Mar 2026 07:37:17 +0100 Subject: [PATCH 2/2] Update useSSE.ts --- .../src/app/components/providers/useSSE.ts | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/frontend/src/app/components/providers/useSSE.ts b/frontend/src/app/components/providers/useSSE.ts index 28eb2eaf..1a9bcc64 100644 --- a/frontend/src/app/components/providers/useSSE.ts +++ b/frontend/src/app/components/providers/useSSE.ts @@ -21,7 +21,7 @@ interface UseSSEOptions { } /** - * useSSE hook provides real-time updates with automatic exponential backoff + * useSSE hook provides real-time updates with automatic exponential backoff * and a polling fallback mechanism for resilience. */ export function useSSE({ @@ -33,7 +33,7 @@ export function useSSE({ pollingInterval = 30000, }: UseSSEOptions) { const [status, setStatus] = useState("disconnected"); - + const eventSourceRef = useRef(null); const reconnectTimeoutRef = useRef(null); const pollingIntervalRef = useRef(null); @@ -62,7 +62,7 @@ export function useSSE({ const startPolling = useCallback(() => { if (pollingIntervalRef.current) return; - + setStatus("fallback"); if (callbacks.current.onPoll) { callbacks.current.onPoll(); // Trigger immediate refresh @@ -83,12 +83,12 @@ export function useSSE({ const connect = () => { if (isCancelled) return; - + // Close existing SSE but keep polling active while attempting reconnection if (eventSourceRef.current) { eventSourceRef.current.close(); } - + setStatus("connecting"); const es = new EventSource(url, { withCredentials: true }); @@ -98,7 +98,7 @@ export function useSSE({ if (isCancelled) return; backoffRef.current = 1000; // Reset backoff on success setStatus("connected"); - + // Stop polling fallback once SSE is live if (pollingIntervalRef.current) { clearInterval(pollingIntervalRef.current); @@ -112,7 +112,9 @@ export function useSSE({ try { const data = JSON.parse(event.data); callbacks.current.onMessage(data); - } catch { /* Ignore malformed JSON */ } + } catch { + /* Ignore malformed JSON */ + } }; es.onerror = (err) => { @@ -140,4 +142,4 @@ export function useSSE({ }, [url, pollingInterval, cleanup, startPolling]); return status; -} \ No newline at end of file +}