Skip to content

Commit 7f6b4ec

Browse files
committed
feat(sse): add real-time task status updates (RML-716)
- Enhanced SSE endpoint to include current task status in events - Modified db.getRecentTaskEvents to join with tasks table - Updated frontend SSE types to include taskStatus field - Task store now updates task status in-place from SSE events - Full refresh only triggered for terminal events or new tasks This reduces API load and provides instant status updates in the dashboard without polling.
1 parent 7ecfdff commit 7f6b4ec

4 files changed

Lines changed: 51 additions & 14 deletions

File tree

packages/api/src/integrations/db.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -398,34 +398,42 @@ export const db = {
398398
},
399399
taskId?: string,
400400
limit: number = 50,
401-
): Promise<TaskEvent[]> {
401+
): Promise<(TaskEvent & { taskStatus?: string })[]> {
402402
const sql = getDb();
403403
let results;
404404

405+
// Join with tasks table to get current task status for SSE updates
405406
if (taskId) {
406407
results = await sql`
407-
SELECT * FROM task_events
408+
SELECT e.*, t.status as task_status
409+
FROM task_events e
410+
LEFT JOIN tasks t ON e.task_id = t.id
408411
WHERE (
409-
created_at > ${since.createdAt}
410-
OR (created_at = ${since.createdAt} AND id > ${since.id})
412+
e.created_at > ${since.createdAt}
413+
OR (e.created_at = ${since.createdAt} AND e.id > ${since.id})
411414
)
412-
AND task_id = ${taskId}
413-
ORDER BY created_at ASC, id ASC
415+
AND e.task_id = ${taskId}
416+
ORDER BY e.created_at ASC, e.id ASC
414417
LIMIT ${limit}
415418
`;
416419
} else {
417420
results = await sql`
418-
SELECT * FROM task_events
421+
SELECT e.*, t.status as task_status
422+
FROM task_events e
423+
LEFT JOIN tasks t ON e.task_id = t.id
419424
WHERE (
420-
created_at > ${since.createdAt}
421-
OR (created_at = ${since.createdAt} AND id > ${since.id})
425+
e.created_at > ${since.createdAt}
426+
OR (e.created_at = ${since.createdAt} AND e.id > ${since.id})
422427
)
423-
ORDER BY created_at ASC, id ASC
428+
ORDER BY e.created_at ASC, e.id ASC
424429
LIMIT ${limit}
425430
`;
426431
}
427432

428-
return results.map(this.mapTaskEvent);
433+
return results.map((row: any) => ({
434+
...this.mapTaskEvent(row),
435+
taskStatus: row.task_status,
436+
}));
429437
},
430438

431439
// ============================================

packages/api/src/router.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3629,6 +3629,8 @@ route("GET", "/api/logs/stream", async (req) => {
36293629
level: getLogLevel(event.eventType),
36303630
tokensUsed: event.tokensUsed,
36313631
durationMs: event.durationMs,
3632+
// Include current task status for real-time UI updates (RML-716)
3633+
taskStatus: (event as any).taskStatus,
36323634
})}\n\n`,
36333635
),
36343636
);

packages/web/src/services/sse.service.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ export interface SSEEvent {
99
level?: "info" | "warn" | "error" | "success";
1010
tokensUsed?: number;
1111
durationMs?: number;
12+
// Current task status for real-time updates (RML-716)
13+
taskStatus?: string;
1214
}
1315

1416
export type SSEEventHandler = (event: SSEEvent) => void;
@@ -94,7 +96,7 @@ export class SSEService {
9496
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
9597

9698
console.log(
97-
`[SSE] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`
99+
`[SSE] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`,
98100
);
99101

100102
setTimeout(() => {

packages/web/src/stores/task.store.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,10 +435,35 @@ export class TaskStore {
435435
},
436436
...this.liveEvents,
437437
].slice(0, 100);
438+
439+
// RML-716: Update task status in-place if provided (avoids full refresh)
440+
if (event.taskStatus && event.taskId) {
441+
const taskIndex = this.tasks.findIndex((t) => t.id === event.taskId);
442+
if (taskIndex !== -1) {
443+
const currentStatus = this.tasks[taskIndex].status;
444+
if (currentStatus !== event.taskStatus) {
445+
this.tasks[taskIndex] = {
446+
...this.tasks[taskIndex],
447+
status: event.taskStatus as TaskStatus,
448+
updated_at: event.timestamp || new Date().toISOString(),
449+
};
450+
}
451+
}
452+
}
438453
});
439454

440-
// Debounce task refresh (avoid hammering API on rapid events)
441-
this.debouncedRefresh();
455+
// Only do full refresh for terminal events or if task not found locally
456+
const terminalEvents = [
457+
"PR_OPENED",
458+
"COMPLETED",
459+
"FAILED",
460+
"BATCH_PR_CREATED",
461+
];
462+
const taskExists = this.tasks.some((t) => t.id === event.taskId);
463+
464+
if (!taskExists || terminalEvents.includes(event.eventType || "")) {
465+
this.debouncedRefresh();
466+
}
442467
}
443468
}
444469

0 commit comments

Comments
 (0)