Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions frontend/src/hooks/useTaskSubscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import { useEffect, useRef, useState, useCallback } from 'react'
import { API_BASE, getTaskStatus } from '../api'

export interface UseTaskSubscriptionOptions {
taskId: string
onStatus?: (status: string) => void
onPhase?: (phase: string) => void
onOutput?: (chunk: string) => void
pollingInterval?: number
maxReconnectAttempts?: number
reconnectBaseDelay?: number
}

export interface UseTaskSubscriptionResult {
isConnected: boolean
isPolling: boolean
error: string | null
}

export function useTaskSubscription({
taskId,
onStatus,
onPhase,
onOutput,
pollingInterval = 5000,
maxReconnectAttempts = 5,
reconnectBaseDelay = 1000,
}: UseTaskSubscriptionOptions): UseTaskSubscriptionResult {
const [isConnected, setIsConnected] = useState(false)
const [isPolling, setIsPolling] = useState(false)
const [error, setError] = useState<string | null>(null)

const onStatusRef = useRef(onStatus)
const onPhaseRef = useRef(onPhase)
const onOutputRef = useRef(onOutput)
const esRef = useRef<EventSource | null>(null)
const pollIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null)
const reconnectAttemptRef = useRef(0)
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
const lastStatusRef = useRef<string | null>(null)
const seenOutputsRef = useRef<Set<string>>(new Set())
const cleanupRef = useRef(false)

onStatusRef.current = onStatus
onPhaseRef.current = onPhase
onOutputRef.current = onOutput

const cleanupAll = useCallback(() => {
cleanupRef.current = true
if (esRef.current) {
esRef.current.close()
esRef.current = null
}
if (pollIntervalRef.current) {
clearInterval(pollIntervalRef.current)
pollIntervalRef.current = null
}
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current)
reconnectTimerRef.current = null
}
}, [])

const startPolling = useCallback(() => {
if (cleanupRef.current) return
setIsPolling(true)
setIsConnected(false)
pollIntervalRef.current = setInterval(async () => {
if (cleanupRef.current) return
try {
const data = await getTaskStatus(taskId) as { status?: string }
if (cleanupRef.current) return
if (data.status && data.status !== lastStatusRef.current) {
lastStatusRef.current = data.status
onStatusRef.current?.(data.status)
}
if (data.status && ['completed', 'failed', 'cancelled'].includes(data.status)) {
cleanupAll()
setIsPolling(false)
}
} catch {
}
}, pollingInterval)
}, [taskId, pollingInterval, cleanupAll])

const connectSSE = useCallback(() => {
if (cleanupRef.current) return
if (esRef.current) {
esRef.current.close()
esRef.current = null
}

const url = `${API_BASE}/task/${taskId}/stream`
const es = new EventSource(url)
esRef.current = es

es.addEventListener('status', (e: MessageEvent) => {
if (cleanupRef.current) return
try {
const data = JSON.parse(e.data) as { status: string; scan_phase?: string }
if (data.scan_phase) {
onPhaseRef.current?.(data.scan_phase)
}
if (data.status && data.status !== lastStatusRef.current) {
lastStatusRef.current = data.status
onStatusRef.current?.(data.status)
}
if (['completed', 'failed', 'cancelled'].includes(data.status)) {
cleanupAll()
setIsConnected(false)
setIsPolling(false)
}
} catch {
}
})

es.addEventListener('phase', (e: MessageEvent) => {
if (cleanupRef.current) return
try {
const data = JSON.parse(e.data) as { scan_phase: string }
if (data.scan_phase) {
onPhaseRef.current?.(data.scan_phase)
}
} catch {
}
})

es.addEventListener('output', (e: MessageEvent) => {
if (cleanupRef.current) return
try {
const data = JSON.parse(e.data) as { chunk: string }
if (data.chunk && !seenOutputsRef.current.has(data.chunk)) {
seenOutputsRef.current.add(data.chunk)
onOutputRef.current?.(data.chunk)
}
} catch {
}
})

es.onerror = () => {
if (cleanupRef.current) return
es.close()
esRef.current = null
setIsConnected(false)
setError('SSE connection lost')

if (reconnectAttemptRef.current < maxReconnectAttempts) {
const delay = reconnectBaseDelay * Math.pow(2, reconnectAttemptRef.current)
reconnectAttemptRef.current++
reconnectTimerRef.current = setTimeout(() => {
if (!cleanupRef.current) connectSSE()
}, delay)
} else {
startPolling()
}
}

es.onopen = () => {
if (cleanupRef.current) return
reconnectAttemptRef.current = 0
setIsConnected(true)
setIsPolling(false)
setError(null)
}
}, [taskId, maxReconnectAttempts, reconnectBaseDelay, cleanupAll, startPolling])

useEffect(() => {
cleanupRef.current = false
lastStatusRef.current = null
seenOutputsRef.current = new Set()
reconnectAttemptRef.current = 0

connectSSE()

return () => {
cleanupAll()
}
}, [taskId, connectSSE, cleanupAll])

return { isConnected, isPolling, error }
}
58 changes: 16 additions & 42 deletions frontend/src/pages/TaskDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
Refresh01Icon,
} from '@hugeicons/core-free-icons'
import { API_BASE, getPluginSchema, getTaskResult, getTaskStatus, PluginFieldSchema, PluginSchemaResponse, startTask } from '../api'
import { useTaskSubscription } from '../hooks/useTaskSubscription'
import { routes, routePath } from '../routes'
import { parseDateSafe, formatDateLong, formatLocaleTime } from '../utils/date'
import {
Expand Down Expand Up @@ -361,50 +362,23 @@ export default function TaskDetails() {

useEffect(() => {
loadTask()
}, [taskId])

const es = new EventSource(`${API_BASE}/task/${taskId}/stream`)

es.addEventListener('status', (e) => {
try {
const data = JSON.parse(e.data)
setTask((prev: Task | null) => prev ? { ...prev, status: data.status } : null)
if (data.scan_phase) {
setScanPhase(data.scan_phase)
}
if (['completed', 'failed', 'cancelled'].includes(data.status)) {
es.close()
loadTask()
}
} catch (err) {
console.error("Status stream error", err)
}
})

es.addEventListener('phase', (e) => {
try {
const data = JSON.parse(e.data)
setScanPhase(data.scan_phase)
} catch (err) {
console.error("Phase stream error", err)
useTaskSubscription({
taskId: taskId!,
onStatus: (status) => {
setTask((prev: Task | null) => prev ? { ...prev, status } : null)
if (['completed', 'failed', 'cancelled'].includes(status)) {
loadTask()
}
})

es.addEventListener('output', (e) => {
try {
const data = JSON.parse(e.data)
setRawOutput(prev => prev + data.chunk)
} catch (err) {
console.error("Output stream error", err)
}
})

es.onerror = (err) => {
console.error("EventSource error:", err)
es.close()
}

return () => es.close()
}, [taskId])
},
onPhase: (phase) => {
setScanPhase(phase)
},
onOutput: (chunk) => {
setRawOutput((prev) => prev + chunk)
},
})

async function loadTask() {
try {
Expand Down
Loading
Loading