From 25c8ba77dae1a1c3d1af4630a7d75d12b0afa465 Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 16:13:33 +0800 Subject: [PATCH 01/12] fix(hub,web): deduplicate sessions by agent session ID When multiple CLI wrappers independently resume the same Codex thread, each generates a random tag, causing the hub to create duplicate session records for a single underlying thread. This leads to duplicate conversations in the web UI and messages routing to the wrong session. Add two-layer deduplication: - Hub: when a metadata update sets an agent session ID (codexSessionId, claudeSessionId, etc.) that already exists on another session in the same namespace, automatically merge the duplicate into the current session using the existing mergeSessions logic. - Web: deduplicate the session list display by agentSessionId as a safety net, keeping the active/most-recent session visible. Closes #446 --- hub/src/sync/sessionCache.ts | 44 +++++++++++++ hub/src/sync/sessionModel.test.ts | 100 +++++++++++++++++++++++++++++ hub/src/sync/syncEngine.ts | 16 +++++ shared/src/sessionSummary.ts | 9 ++- web/src/components/SessionList.tsx | 31 ++++++++- 5 files changed, 198 insertions(+), 2 deletions(-) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index ea4d3d575..701e9e0e1 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -10,6 +10,7 @@ export class SessionCache { private readonly sessions: Map = new Map() private readonly lastBroadcastAtBySessionId: Map = new Map() private readonly todoBackfillAttemptedSessionIds: Set = new Set() + private readonly deduplicateInProgress: Set = new Set() constructor( private readonly store: Store, @@ -537,4 +538,47 @@ export class SessionCache { return changed ? merged : newMetadata } + + private extractAgentSessionId( + metadata: NonNullable + ): { field: 'codexSessionId' | 'claudeSessionId' | 'geminiSessionId' | 'opencodeSessionId' | 'cursorSessionId'; value: string } | null { + if (metadata.codexSessionId) return { field: 'codexSessionId', value: metadata.codexSessionId } + if (metadata.claudeSessionId) return { field: 'claudeSessionId', value: metadata.claudeSessionId } + if (metadata.geminiSessionId) return { field: 'geminiSessionId', value: metadata.geminiSessionId } + if (metadata.opencodeSessionId) return { field: 'opencodeSessionId', value: metadata.opencodeSessionId } + if (metadata.cursorSessionId) return { field: 'cursorSessionId', value: metadata.cursorSessionId } + return null + } + + async deduplicateByAgentSessionId(sessionId: string): Promise { + const session = this.sessions.get(sessionId) + if (!session?.metadata) return + + const agentId = this.extractAgentSessionId(session.metadata) + if (!agentId) return + + if (this.deduplicateInProgress.has(agentId.value)) return + this.deduplicateInProgress.add(agentId.value) + + try { + const duplicates: string[] = [] + for (const [existingId, existing] of this.sessions) { + if (existingId === sessionId) continue + if (existing.namespace !== session.namespace) continue + if (!existing.metadata) continue + if (existing.metadata[agentId.field] !== agentId.value) continue + duplicates.push(existingId) + } + + for (const duplicateId of duplicates) { + try { + await this.mergeSessions(duplicateId, sessionId, session.namespace) + } catch { + // best-effort: duplicate remains if merge fails + } + } + } finally { + this.deduplicateInProgress.delete(agentId.value) + } + } } diff --git a/hub/src/sync/sessionModel.test.ts b/hub/src/sync/sessionModel.test.ts index a923118b2..775729638 100644 --- a/hub/src/sync/sessionModel.test.ts +++ b/hub/src/sync/sessionModel.test.ts @@ -440,4 +440,104 @@ describe('session model', () => { engine.stop() } }) + + describe('session dedup by agent session ID', () => { + it('merges duplicate when codexSessionId collides', async () => { + const store = new Store(':memory:') + const events: SyncEvent[] = [] + const cache = new SessionCache(store, createPublisher(events)) + + const s1 = cache.getOrCreateSession( + 'tag-1', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + null, + 'default' + ) + + // Add a message to s1 + store.messages.addMessage(s1.id, { type: 'text', text: 'hello from s1' }, 'local-1') + + const s2 = cache.getOrCreateSession( + 'tag-2', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + null, + 'default' + ) + + expect(s1.id).not.toBe(s2.id) + + await cache.deduplicateByAgentSessionId(s2.id) + + expect(cache.getSession(s1.id)).toBeUndefined() + expect(cache.getSession(s2.id)).toBeDefined() + + const messages = store.messages.getMessages(s2.id, 100) + expect(messages.length).toBeGreaterThanOrEqual(1) + }) + + it('preserves sessions with different agent session IDs', async () => { + const store = new Store(':memory:') + const events: SyncEvent[] = [] + const cache = new SessionCache(store, createPublisher(events)) + + const s1 = cache.getOrCreateSession( + 'tag-1', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + null, + 'default' + ) + const s2 = cache.getOrCreateSession( + 'tag-2', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-Y' }, + null, + 'default' + ) + + await cache.deduplicateByAgentSessionId(s2.id) + + expect(cache.getSession(s1.id)).toBeDefined() + expect(cache.getSession(s2.id)).toBeDefined() + }) + + it('does not merge across namespaces', async () => { + const store = new Store(':memory:') + const events: SyncEvent[] = [] + const cache = new SessionCache(store, createPublisher(events)) + + const s1 = cache.getOrCreateSession( + 'tag-1', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + null, + 'ns1' + ) + const s2 = cache.getOrCreateSession( + 'tag-2', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + null, + 'ns2' + ) + + await cache.deduplicateByAgentSessionId(s2.id) + + expect(cache.getSession(s1.id)).toBeDefined() + expect(cache.getSession(s2.id)).toBeDefined() + }) + + it('no-op when session has no agent session ID', async () => { + const store = new Store(':memory:') + const events: SyncEvent[] = [] + const cache = new SessionCache(store, createPublisher(events)) + + const s1 = cache.getOrCreateSession( + 'tag-1', + { path: '/tmp/project', host: 'localhost', flavor: 'codex' }, + null, + 'default' + ) + + await cache.deduplicateByAgentSessionId(s1.id) + + expect(cache.getSession(s1.id)).toBeDefined() + }) + }) }) diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index 112759520..a0ae824fa 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -163,7 +163,12 @@ export class SyncEngine { handleRealtimeEvent(event: SyncEvent): void { if (event.type === 'session-updated' && event.sessionId) { + const before = this.sessionCache.getSession(event.sessionId) this.sessionCache.refreshSession(event.sessionId) + const after = this.sessionCache.getSession(event.sessionId) + if (after?.metadata && !this.hasSameAgentSessionIds(before?.metadata ?? null, after.metadata)) { + void this.sessionCache.deduplicateByAgentSessionId(event.sessionId).catch(() => {}) + } return } @@ -441,6 +446,17 @@ export class SyncEngine { return { type: 'success', sessionId: spawnResult.sessionId } } + private hasSameAgentSessionIds( + prev: Session['metadata'] | null, + next: NonNullable + ): boolean { + return (prev?.codexSessionId ?? null) === (next.codexSessionId ?? null) + && (prev?.claudeSessionId ?? null) === (next.claudeSessionId ?? null) + && (prev?.geminiSessionId ?? null) === (next.geminiSessionId ?? null) + && (prev?.opencodeSessionId ?? null) === (next.opencodeSessionId ?? null) + && (prev?.cursorSessionId ?? null) === (next.cursorSessionId ?? null) + } + async waitForSessionActive(sessionId: string, timeoutMs: number = 15_000): Promise { const start = Date.now() while (Date.now() - start < timeoutMs) { diff --git a/shared/src/sessionSummary.ts b/shared/src/sessionSummary.ts index e717a57dd..86298208c 100644 --- a/shared/src/sessionSummary.ts +++ b/shared/src/sessionSummary.ts @@ -7,6 +7,7 @@ export type SessionSummaryMetadata = { summary?: { text: string } flavor?: string | null worktree?: WorktreeMetadata + agentSessionId?: string } export type SessionSummary = { @@ -31,7 +32,13 @@ export function toSessionSummary(session: Session): SessionSummary { machineId: session.metadata.machineId ?? undefined, summary: session.metadata.summary ? { text: session.metadata.summary.text } : undefined, flavor: session.metadata.flavor ?? null, - worktree: session.metadata.worktree + worktree: session.metadata.worktree, + agentSessionId: session.metadata.codexSessionId + ?? session.metadata.claudeSessionId + ?? session.metadata.geminiSessionId + ?? session.metadata.opencodeSessionId + ?? session.metadata.cursorSessionId + ?? undefined } : null const todoProgress = session.todos?.length ? { diff --git a/web/src/components/SessionList.tsx b/web/src/components/SessionList.tsx index 9fef3c97b..4b025647d 100644 --- a/web/src/components/SessionList.tsx +++ b/web/src/components/SessionList.tsx @@ -39,6 +39,35 @@ function getGroupDisplayName(directory: string): string { export const UNKNOWN_MACHINE_ID = '__unknown__' +function deduplicateSessionsByAgentId(sessions: SessionSummary[]): SessionSummary[] { + const byAgentId = new Map() + const result: SessionSummary[] = [] + + for (const session of sessions) { + const agentId = session.metadata?.agentSessionId + if (!agentId) { + result.push(session) + continue + } + const group = byAgentId.get(agentId) + if (group) { + group.push(session) + } else { + byAgentId.set(agentId, [session]) + } + } + + for (const group of byAgentId.values()) { + group.sort((a, b) => { + if (a.active !== b.active) return a.active ? -1 : 1 + return b.updatedAt - a.updatedAt + }) + result.push(group[0]) + } + + return result +} + function groupSessionsByDirectory(sessions: SessionSummary[]): SessionGroup[] { const groups = new Map() @@ -453,7 +482,7 @@ export function SessionList(props: { const { t } = useTranslation() const { renderHeader = true, api, selectedSessionId, machineLabelsById = {} } = props const groups = useMemo( - () => groupSessionsByDirectory(props.sessions), + () => groupSessionsByDirectory(deduplicateSessionsByAgentId(props.sessions)), [props.sessions] ) const [collapseOverrides, setCollapseOverrides] = useState>( From 8d4030f87c3c7ecb69327e5aba37f89b54d3f470 Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 16:20:26 +0800 Subject: [PATCH 02/12] chore: add review-driven comments for dedup clarity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Explain single-threaded assumption in before/after metadata comparison - Document merge direction rationale (duplicate → active session) - Document deduplicateInProgress guard as known limitation - Add catch comment explaining web safety net fallback --- hub/src/sync/sessionCache.ts | 5 +++++ hub/src/sync/syncEngine.ts | 6 +++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index 701e9e0e1..e0f825442 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -557,6 +557,8 @@ export class SessionCache { const agentId = this.extractAgentSessionId(session.metadata) if (!agentId) return + // Guard: skip if another dedup for this agent ID is already in progress. + // A skipped trigger is acceptable — the web-side display dedup hides any remaining duplicates. if (this.deduplicateInProgress.has(agentId.value)) return this.deduplicateInProgress.add(agentId.value) @@ -570,6 +572,9 @@ export class SessionCache { duplicates.push(existingId) } + // Merge direction: duplicate → current session. The current session is the one + // whose CLI just connected and set the agent session ID, so its Socket.IO room + // is active. Messages from duplicates are moved into it; no data is lost. for (const duplicateId of duplicates) { try { await this.mergeSessions(duplicateId, sessionId, session.namespace) diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index a0ae824fa..79c6f4418 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -163,11 +163,15 @@ export class SyncEngine { handleRealtimeEvent(event: SyncEvent): void { if (event.type === 'session-updated' && event.sessionId) { + // Snapshot agent session IDs before refresh — safe because JS is single-threaded + // and refreshSession replaces the Map entry with a new object. const before = this.sessionCache.getSession(event.sessionId) this.sessionCache.refreshSession(event.sessionId) const after = this.sessionCache.getSession(event.sessionId) if (after?.metadata && !this.hasSameAgentSessionIds(before?.metadata ?? null, after.metadata)) { - void this.sessionCache.deduplicateByAgentSessionId(event.sessionId).catch(() => {}) + void this.sessionCache.deduplicateByAgentSessionId(event.sessionId).catch(() => { + // best-effort: dedup failure is harmless, web-side safety net hides remaining duplicates + }) } return } From 6a09b583367f1fc878f026746d8a878c9ed549ae Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 16:35:59 +0800 Subject: [PATCH 03/12] fix: address review feedback from bot, Opus, and Codex - Skip active duplicates during hub-side dedup to avoid deleting sessions with live CLI sockets and pending agent state - Pass selectedSessionId into web dedup sort to prevent hiding the session the user is currently viewing - Add test for active-duplicate-not-merged case --- hub/src/sync/sessionCache.ts | 4 ++++ hub/src/sync/sessionModel.test.ts | 29 +++++++++++++++++++++++++++++ web/src/components/SessionList.tsx | 9 ++++++--- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index e0f825442..33a5ad3c8 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -569,6 +569,10 @@ export class SessionCache { if (existing.namespace !== session.namespace) continue if (!existing.metadata) continue if (existing.metadata[agentId.field] !== agentId.value) continue + // Only merge inactive duplicates. Active ones still have a live CLI socket + // whose keepalive/messages would fail if we deleted their session record. + // The web-side display dedup hides active duplicates from the UI. + if (existing.active) continue duplicates.push(existingId) } diff --git a/hub/src/sync/sessionModel.test.ts b/hub/src/sync/sessionModel.test.ts index 775729638..1c8de9545 100644 --- a/hub/src/sync/sessionModel.test.ts +++ b/hub/src/sync/sessionModel.test.ts @@ -539,5 +539,34 @@ describe('session model', () => { expect(cache.getSession(s1.id)).toBeDefined() }) + + it('does not merge active duplicates', async () => { + const store = new Store(':memory:') + const events: SyncEvent[] = [] + const cache = new SessionCache(store, createPublisher(events)) + + const s1 = cache.getOrCreateSession( + 'tag-1', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + null, + 'default' + ) + + // Mark s1 as active (simulating a live CLI connection) + cache.handleSessionAlive({ sid: s1.id, time: Date.now(), thinking: false }) + + const s2 = cache.getOrCreateSession( + 'tag-2', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + null, + 'default' + ) + + await cache.deduplicateByAgentSessionId(s2.id) + + // s1 is active, so it should NOT be merged/deleted + expect(cache.getSession(s1.id)).toBeDefined() + expect(cache.getSession(s2.id)).toBeDefined() + }) }) }) diff --git a/web/src/components/SessionList.tsx b/web/src/components/SessionList.tsx index 4b025647d..06be10729 100644 --- a/web/src/components/SessionList.tsx +++ b/web/src/components/SessionList.tsx @@ -39,7 +39,7 @@ function getGroupDisplayName(directory: string): string { export const UNKNOWN_MACHINE_ID = '__unknown__' -function deduplicateSessionsByAgentId(sessions: SessionSummary[]): SessionSummary[] { +function deduplicateSessionsByAgentId(sessions: SessionSummary[], selectedSessionId?: string | null): SessionSummary[] { const byAgentId = new Map() const result: SessionSummary[] = [] @@ -59,6 +59,9 @@ function deduplicateSessionsByAgentId(sessions: SessionSummary[]): SessionSummar for (const group of byAgentId.values()) { group.sort((a, b) => { + // Keep the currently selected session visible so the sidebar row doesn't vanish + if (a.id === selectedSessionId) return -1 + if (b.id === selectedSessionId) return 1 if (a.active !== b.active) return a.active ? -1 : 1 return b.updatedAt - a.updatedAt }) @@ -482,8 +485,8 @@ export function SessionList(props: { const { t } = useTranslation() const { renderHeader = true, api, selectedSessionId, machineLabelsById = {} } = props const groups = useMemo( - () => groupSessionsByDirectory(deduplicateSessionsByAgentId(props.sessions)), - [props.sessions] + () => groupSessionsByDirectory(deduplicateSessionsByAgentId(props.sessions, selectedSessionId)), + [props.sessions, selectedSessionId] ) const [collapseOverrides, setCollapseOverrides] = useState>( () => new Map() From 11084cfcd8259d1b264372ed151489830a2fb6c3 Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 16:48:22 +0800 Subject: [PATCH 04/12] fix: retry dedup on session-end and preserve agentState in merge - Trigger dedup when a session ends (handleSessionEnd), so active duplicates skipped during earlier dedup get merged once they disconnect - Preserve agentState from old session during mergeSessions when the new session has no agentState (mirrors existing model/effort/todos preservation pattern) - Extract triggerDedupIfNeeded helper for reuse across trigger points --- hub/src/sync/sessionCache.ts | 9 +++++++++ hub/src/sync/syncEngine.ts | 12 ++++++++++++ 2 files changed, 21 insertions(+) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index 33a5ad3c8..c9e419797 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -471,6 +471,15 @@ export class SessionCache { ) } + if (oldStored.agentState !== null && newStored.agentState === null) { + this.store.sessions.updateSessionAgentState( + newSessionId, + oldStored.agentState, + newStored.agentStateVersion, + namespace + ) + } + if (oldStored.teamState !== null && oldStored.teamStateUpdatedAt !== null) { this.store.sessions.setSessionTeamState( newSessionId, diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index 79c6f4418..814160e65 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -206,6 +206,9 @@ export class SyncEngine { handleSessionEnd(payload: { sid: string; time: number }): void { this.sessionCache.handleSessionEnd(payload) + // Retry dedup now that this session is inactive — a prior dedup may have + // skipped it because it was still active at the time. + this.triggerDedupIfNeeded(payload.sid) } handleBackgroundTaskDelta(sessionId: string, delta: { started: number; completed: number }): void { @@ -461,6 +464,15 @@ export class SyncEngine { && (prev?.cursorSessionId ?? null) === (next.cursorSessionId ?? null) } + private triggerDedupIfNeeded(sessionId: string): void { + const session = this.sessionCache.getSession(sessionId) + if (session?.metadata) { + void this.sessionCache.deduplicateByAgentSessionId(sessionId).catch(() => { + // best-effort: web-side safety net hides remaining duplicates + }) + } + } + async waitForSessionActive(sessionId: string, timeoutMs: number = 15_000): Promise { const start = Date.now() while (Date.now() - start < timeoutMs) { From 0f4536c62922c88f40fe8214eb804ef3998bd0be Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 17:22:44 +0800 Subject: [PATCH 05/12] fix(web): prefer active session over selected in dedup sort Active session always wins the dedup tie-break so the live connection is never hidden in favor of a selected inactive duplicate. Among inactive duplicates the selected one is still preferred. --- hub/src/sync/sessionCache.ts | 2 ++ web/src/components/SessionList.tsx | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index c9e419797..6dd34a3de 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -471,6 +471,8 @@ export class SessionCache { ) } + // Preserve agentState if the new session has none. Only inactive duplicates reach + // this point (active ones are skipped), so the old agentState is typically stale. if (oldStored.agentState !== null && newStored.agentState === null) { this.store.sessions.updateSessionAgentState( newSessionId, diff --git a/web/src/components/SessionList.tsx b/web/src/components/SessionList.tsx index 06be10729..79aa54c09 100644 --- a/web/src/components/SessionList.tsx +++ b/web/src/components/SessionList.tsx @@ -59,10 +59,11 @@ function deduplicateSessionsByAgentId(sessions: SessionSummary[], selectedSessio for (const group of byAgentId.values()) { group.sort((a, b) => { - // Keep the currently selected session visible so the sidebar row doesn't vanish + // Active session always wins — it's the live connection + if (a.active !== b.active) return a.active ? -1 : 1 + // Among inactive duplicates, keep the selected one visible if (a.id === selectedSessionId) return -1 if (b.id === selectedSessionId) return 1 - if (a.active !== b.active) return a.active ? -1 : 1 return b.updatedAt - a.updatedAt }) result.push(group[0]) From 5b970108e138152f4f11d15a2f8dc258c8473d0f Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 17:38:18 +0800 Subject: [PATCH 06/12] fix: dedup on inactivity timeout and deep-merge agentState - expireInactive now returns expired session IDs so SyncEngine can trigger dedup for sessions that timed out (crash/network drop) instead of only on explicit session-end - mergeSessions now deep-merges agentState requests/completedRequests from both sessions instead of only copying when new is null --- hub/src/sync/sessionCache.ts | 50 ++++++++++++++++++++++++++++-------- hub/src/sync/syncEngine.ts | 5 +++- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index 6dd34a3de..4fc36c01c 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -281,16 +281,20 @@ export class SessionCache { this.publisher.emit({ type: 'session-updated', sessionId: session.id, data: { active: false, thinking: false, backgroundTaskCount: 0 } }) } - expireInactive(now: number = Date.now()): void { + expireInactive(now: number = Date.now()): string[] { const sessionTimeoutMs = 30_000 + const expired: string[] = [] for (const session of this.sessions.values()) { if (!session.active) continue if (now - session.activeAt <= sessionTimeoutMs) continue session.active = false session.thinking = false + expired.push(session.id) this.publisher.emit({ type: 'session-updated', sessionId: session.id, data: { active: false } }) } + + return expired } applySessionConfig( @@ -471,15 +475,20 @@ export class SessionCache { ) } - // Preserve agentState if the new session has none. Only inactive duplicates reach - // this point (active ones are skipped), so the old agentState is typically stale. - if (oldStored.agentState !== null && newStored.agentState === null) { - this.store.sessions.updateSessionAgentState( - newSessionId, - oldStored.agentState, - newStored.agentStateVersion, - namespace - ) + // Merge agentState: union requests/completedRequests from both sessions so pending + // approvals on the duplicate are not lost. Only inactive duplicates reach this point + // (active ones are skipped by deduplicateByAgentSessionId). + const mergedAgentState = this.mergeAgentState(oldStored.agentState, newStored.agentState) + if (mergedAgentState !== null && mergedAgentState !== newStored.agentState) { + const latest = this.store.sessions.getSessionByNamespace(newSessionId, namespace) + if (latest) { + this.store.sessions.updateSessionAgentState( + newSessionId, + mergedAgentState, + latest.agentStateVersion, + namespace + ) + } } if (oldStored.teamState !== null && oldStored.teamStateUpdatedAt !== null) { @@ -550,6 +559,27 @@ export class SessionCache { return changed ? merged : newMetadata } + private mergeAgentState(oldState: unknown | null, newState: unknown | null): unknown | null { + if (oldState === null) return newState + if (newState === null) return oldState + + const oldObj = oldState as Record + const newObj = newState as Record + + return { + ...oldObj, + ...newObj, + requests: { + ...((oldObj.requests as Record | undefined) ?? {}), + ...((newObj.requests as Record | undefined) ?? {}) + }, + completedRequests: { + ...((oldObj.completedRequests as Record | undefined) ?? {}), + ...((newObj.completedRequests as Record | undefined) ?? {}) + } + } + } + private extractAgentSessionId( metadata: NonNullable ): { field: 'codexSessionId' | 'claudeSessionId' | 'geminiSessionId' | 'opencodeSessionId' | 'cursorSessionId'; value: string } | null { diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index 814160e65..5105e780c 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -220,7 +220,10 @@ export class SyncEngine { } private expireInactive(): void { - this.sessionCache.expireInactive() + const expired = this.sessionCache.expireInactive() + for (const sessionId of expired) { + this.triggerDedupIfNeeded(sessionId) + } this.machineCache.expireInactive() } From 722cb161901efd7347e0a0ccc6e285b570b43e94 Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 17:47:58 +0800 Subject: [PATCH 07/12] fix: exclude completed requests from merged pending set Filter out request IDs that already appear in completedRequests when merging agentState, preventing completed permission prompts from resurrecting as pending after session dedup. --- hub/src/sync/sessionCache.ts | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index 4fc36c01c..c1d914e75 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -566,18 +566,20 @@ export class SessionCache { const oldObj = oldState as Record const newObj = newState as Record - return { - ...oldObj, - ...newObj, - requests: { + const completedRequests = { + ...((oldObj.completedRequests as Record | undefined) ?? {}), + ...((newObj.completedRequests as Record | undefined) ?? {}) + } + // Filter out requests that are already completed to avoid resurrecting them as pending + const completedIds = new Set(Object.keys(completedRequests)) + const requests = Object.fromEntries( + Object.entries({ ...((oldObj.requests as Record | undefined) ?? {}), ...((newObj.requests as Record | undefined) ?? {}) - }, - completedRequests: { - ...((oldObj.completedRequests as Record | undefined) ?? {}), - ...((newObj.completedRequests as Record | undefined) ?? {}) - } - } + }).filter(([id]) => !completedIds.has(id)) + ) + + return { ...oldObj, ...newObj, requests, completedRequests } } private extractAgentSessionId( From 55251af6b13670cb31ae7f80faadbdb40d82169d Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 17:56:38 +0800 Subject: [PATCH 08/12] fix: guard resume merge against prior auto-dedup The automatic dedup (triggered when the spawned CLI sets its agent session ID) can delete the old session before resumeSession reaches its own explicit mergeSessions call. Skip the merge if the old session no longer exists instead of failing the resume with a false error. --- hub/src/sync/syncEngine.ts | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index 5105e780c..6ad8ab75b 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -445,11 +445,17 @@ export class SyncEngine { } if (spawnResult.sessionId !== access.sessionId) { - try { - await this.sessionCache.mergeSessions(access.sessionId, spawnResult.sessionId, namespace) - } catch (error) { - const message = error instanceof Error ? error.message : 'Failed to merge resumed session' - return { type: 'error', message, code: 'resume_failed' } + // The old session may have already been merged by the automatic dedup path + // (triggered when the spawned CLI sets its agent session ID in metadata). + // Only attempt the explicit merge if the old session still exists. + const oldSession = this.sessionCache.getSessionByNamespace(access.sessionId, namespace) + if (oldSession) { + try { + await this.sessionCache.mergeSessions(access.sessionId, spawnResult.sessionId, namespace) + } catch (error) { + const message = error instanceof Error ? error.message : 'Failed to merge resumed session' + return { type: 'error', message, code: 'resume_failed' } + } } } From e3a17424334398f50b87f5feea6d9030ee09782d Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 18:36:43 +0800 Subject: [PATCH 09/12] test: add coverage for dedup retry paths and web dedup sort Hub tests: - session-end triggers dedup retry for previously-active duplicates - inactivity timeout expiry triggers dedup retry - agentState deep merge filters completed requests from pending set Web tests: - basic dedup by agentSessionId - active session wins over inactive duplicate - selected session preferred among inactive duplicates - active always wins over selected inactive - sessions without agentSessionId pass through - independent dedup across different agentSessionIds --- hub/src/sync/sessionModel.test.ts | 129 +++++++++++++++++++++++++ web/src/components/SessionList.test.ts | 82 ++++++++++++++++ web/src/components/SessionList.tsx | 2 +- 3 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 web/src/components/SessionList.test.ts diff --git a/hub/src/sync/sessionModel.test.ts b/hub/src/sync/sessionModel.test.ts index 1c8de9545..9b7642d95 100644 --- a/hub/src/sync/sessionModel.test.ts +++ b/hub/src/sync/sessionModel.test.ts @@ -568,5 +568,134 @@ describe('session model', () => { expect(cache.getSession(s1.id)).toBeDefined() expect(cache.getSession(s2.id)).toBeDefined() }) + + it('merges duplicate after it becomes inactive via session-end', async () => { + const store = new Store(':memory:') + const engine = new SyncEngine( + store, + {} as never, + new RpcRegistry(), + { broadcast() {} } as never + ) + + try { + const s1 = engine.getOrCreateSession( + 'tag-1', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + null, + 'default' + ) + const s2 = engine.getOrCreateSession( + 'tag-2', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + null, + 'default' + ) + + // Mark s1 as active + engine.handleSessionAlive({ sid: s1.id, time: Date.now() }) + + // s1 is active, dedup from s2 should skip it + const events: SyncEvent[] = [] + const cache = (engine as any).sessionCache as SessionCache + await cache.deduplicateByAgentSessionId(s2.id) + expect(cache.getSession(s1.id)).toBeDefined() + expect(cache.getSession(s2.id)).toBeDefined() + + // Now s1 ends — handleSessionEnd should trigger dedup retry + engine.handleSessionEnd({ sid: s1.id, time: Date.now() }) + + // Give the fire-and-forget dedup a tick to complete + await new Promise((r) => setTimeout(r, 50)) + + // One of them should be merged away + const s1Exists = cache.getSession(s1.id) + const s2Exists = cache.getSession(s2.id) + expect(!s1Exists || !s2Exists).toBe(true) + } finally { + engine.stop() + } + }) + + it('merges duplicate after inactivity timeout expires it', async () => { + const store = new Store(':memory:') + const events: SyncEvent[] = [] + const cache = new SessionCache(store, createPublisher(events)) + + const s1 = cache.getOrCreateSession( + 'tag-1', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + null, + 'default' + ) + const s2 = cache.getOrCreateSession( + 'tag-2', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + null, + 'default' + ) + + // Mark s1 as active now + cache.handleSessionAlive({ sid: s1.id, time: Date.now() }) + + // s1 is active — dedup skips it + await cache.deduplicateByAgentSessionId(s2.id) + expect(cache.getSession(s1.id)).toBeDefined() + + // Simulate time passing beyond the 30s timeout + const expired = cache.expireInactive(Date.now() + 60_000) + expect(expired).toContain(s1.id) + + // Now s1 is inactive — dedup should merge it + await cache.deduplicateByAgentSessionId(s2.id) + expect(cache.getSession(s1.id)).toBeUndefined() + expect(cache.getSession(s2.id)).toBeDefined() + }) + + it('deep-merges agentState and filters completed requests', async () => { + const store = new Store(':memory:') + const events: SyncEvent[] = [] + const cache = new SessionCache(store, createPublisher(events)) + + const s1 = cache.getOrCreateSession( + 'tag-1', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + { + requests: { + 'req-1': { tool: 'Bash', arguments: {} }, + 'req-2': { tool: 'Bash', arguments: {} } + }, + completedRequests: {} + }, + 'default' + ) + const s2 = cache.getOrCreateSession( + 'tag-2', + { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, + { + requests: { + 'req-3': { tool: 'Bash', arguments: {} } + }, + completedRequests: { + 'req-1': { tool: 'Bash', arguments: {}, status: 'approved' } + } + }, + 'default' + ) + + await cache.deduplicateByAgentSessionId(s2.id) + + const session = cache.getSession(s2.id) + expect(session).toBeDefined() + const state = session!.agentState! + + // req-1 was completed in s2 — should NOT appear in requests + expect(state.requests?.['req-1']).toBeUndefined() + // req-2 and req-3 are still pending + expect(state.requests?.['req-2']).toBeDefined() + expect(state.requests?.['req-3']).toBeDefined() + // completedRequests has req-1 + expect(state.completedRequests?.['req-1']).toBeDefined() + }) }) }) diff --git a/web/src/components/SessionList.test.ts b/web/src/components/SessionList.test.ts new file mode 100644 index 000000000..b830e8018 --- /dev/null +++ b/web/src/components/SessionList.test.ts @@ -0,0 +1,82 @@ +import { describe, expect, it } from 'vitest' +import type { SessionSummary } from '@/types/api' +import { deduplicateSessionsByAgentId } from './SessionList' + +function makeSession(overrides: Partial & { id: string }): SessionSummary { + return { + active: false, + thinking: false, + activeAt: 0, + updatedAt: 0, + metadata: null, + todoProgress: null, + pendingRequestsCount: 0, + model: null, + effort: null, + ...overrides + } +} + +describe('deduplicateSessionsByAgentId', () => { + it('deduplicates sessions with the same agentSessionId', () => { + const sessions = [ + makeSession({ id: 'a', metadata: { path: '/p', agentSessionId: 'thread-1' }, updatedAt: 100 }), + makeSession({ id: 'b', metadata: { path: '/p', agentSessionId: 'thread-1' }, updatedAt: 200 }) + ] + const result = deduplicateSessionsByAgentId(sessions) + expect(result).toHaveLength(1) + expect(result[0].id).toBe('b') // more recent wins + }) + + it('keeps active session over inactive duplicate', () => { + const sessions = [ + makeSession({ id: 'a', active: true, metadata: { path: '/p', agentSessionId: 'thread-1' }, updatedAt: 100 }), + makeSession({ id: 'b', metadata: { path: '/p', agentSessionId: 'thread-1' }, updatedAt: 200 }) + ] + const result = deduplicateSessionsByAgentId(sessions) + expect(result).toHaveLength(1) + expect(result[0].id).toBe('a') // active wins despite older updatedAt + }) + + it('prefers selected session among inactive duplicates', () => { + const sessions = [ + makeSession({ id: 'a', metadata: { path: '/p', agentSessionId: 'thread-1' }, updatedAt: 100 }), + makeSession({ id: 'b', metadata: { path: '/p', agentSessionId: 'thread-1' }, updatedAt: 200 }) + ] + const result = deduplicateSessionsByAgentId(sessions, 'a') + expect(result).toHaveLength(1) + expect(result[0].id).toBe('a') // selected wins despite older updatedAt + }) + + it('active always wins over selected inactive', () => { + const sessions = [ + makeSession({ id: 'a', metadata: { path: '/p', agentSessionId: 'thread-1' }, updatedAt: 200 }), + makeSession({ id: 'b', active: true, metadata: { path: '/p', agentSessionId: 'thread-1' }, updatedAt: 100 }) + ] + const result = deduplicateSessionsByAgentId(sessions, 'a') + expect(result).toHaveLength(1) + expect(result[0].id).toBe('b') // active wins over selected + }) + + it('passes through sessions without agentSessionId', () => { + const sessions = [ + makeSession({ id: 'a', metadata: { path: '/p' } }), + makeSession({ id: 'b', metadata: { path: '/p', agentSessionId: 'thread-1' } }), + makeSession({ id: 'c', metadata: null }) + ] + const result = deduplicateSessionsByAgentId(sessions) + expect(result).toHaveLength(3) + }) + + it('deduplicates independently across different agentSessionIds', () => { + const sessions = [ + makeSession({ id: 'a', metadata: { path: '/p', agentSessionId: 'thread-1' }, updatedAt: 100 }), + makeSession({ id: 'b', metadata: { path: '/p', agentSessionId: 'thread-1' }, updatedAt: 200 }), + makeSession({ id: 'c', metadata: { path: '/p', agentSessionId: 'thread-2' }, updatedAt: 100 }), + makeSession({ id: 'd', metadata: { path: '/p', agentSessionId: 'thread-2' }, updatedAt: 200 }) + ] + const result = deduplicateSessionsByAgentId(sessions) + expect(result).toHaveLength(2) + expect(result.map(s => s.id).sort()).toEqual(['b', 'd']) + }) +}) diff --git a/web/src/components/SessionList.tsx b/web/src/components/SessionList.tsx index 79aa54c09..115cc266e 100644 --- a/web/src/components/SessionList.tsx +++ b/web/src/components/SessionList.tsx @@ -39,7 +39,7 @@ function getGroupDisplayName(directory: string): string { export const UNKNOWN_MACHINE_ID = '__unknown__' -function deduplicateSessionsByAgentId(sessions: SessionSummary[], selectedSessionId?: string | null): SessionSummary[] { +export function deduplicateSessionsByAgentId(sessions: SessionSummary[], selectedSessionId?: string | null): SessionSummary[] { const byAgentId = new Map() const result: SessionSummary[] = [] From e34e67c249ebfb558bf084d705b2deb3ff2caa01 Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 19:20:05 +0800 Subject: [PATCH 10/12] fix: read latest agentState before merge write to avoid overwriting live updates Re-read the target session's agentState right before writing the merged result, with a version-mismatch retry loop, so concurrent update-state events from the active CLI are not lost during dedup merge. --- hub/src/sync/sessionCache.ts | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index c1d914e75..2093fe146 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -478,16 +478,21 @@ export class SessionCache { // Merge agentState: union requests/completedRequests from both sessions so pending // approvals on the duplicate are not lost. Only inactive duplicates reach this point // (active ones are skipped by deduplicateByAgentSessionId). - const mergedAgentState = this.mergeAgentState(oldStored.agentState, newStored.agentState) - if (mergedAgentState !== null && mergedAgentState !== newStored.agentState) { - const latest = this.store.sessions.getSessionByNamespace(newSessionId, namespace) - if (latest) { - this.store.sessions.updateSessionAgentState( + // Read the latest target state right before writing to avoid overwriting live updates. + if (oldStored.agentState !== null) { + for (let attempt = 0; attempt < 2; attempt += 1) { + const latest = this.store.sessions.getSessionByNamespace(newSessionId, namespace) + if (!latest) break + const mergedAgentState = this.mergeAgentState(oldStored.agentState, latest.agentState) + if (mergedAgentState === null || mergedAgentState === latest.agentState) break + const result = this.store.sessions.updateSessionAgentState( newSessionId, mergedAgentState, latest.agentStateVersion, namespace ) + if (result.result !== 'version-mismatch') break + // version-mismatch: retry with fresh snapshot } } From ec1034d8c9400d7268a367c7473855c7da06d801 Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 19:30:12 +0800 Subject: [PATCH 11/12] fix: sort expired sessions by recency before dedup When multiple duplicates for the same agent thread expire in a single sweep, process the most recent one first so it becomes the merge target and survives, rather than keeping the oldest by arbitrary iteration order. --- hub/src/sync/syncEngine.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index 6ad8ab75b..9708ae2bd 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -221,8 +221,14 @@ export class SyncEngine { private expireInactive(): void { const expired = this.sessionCache.expireInactive() - for (const sessionId of expired) { - this.triggerDedupIfNeeded(sessionId) + // Sort by most recent first so dedup keeps the newest session when multiple + // duplicates for the same agent thread expire in the same sweep. + const sorted = expired + .map((id) => this.sessionCache.getSession(id)) + .filter((s): s is NonNullable => s != null) + .sort((a, b) => (b.activeAt - a.activeAt) || (b.updatedAt - a.updatedAt)) + for (const session of sorted) { + this.triggerDedupIfNeeded(session.id) } this.machineCache.expireInactive() } From 1ee71991a3c14ff917cd2633aa5a126475055e3c Mon Sep 17 00:00:00 2001 From: Haoqing Wang <1506751656@qq.com> Date: Mon, 13 Apr 2026 19:37:00 +0800 Subject: [PATCH 12/12] fix: select most recent session as merge target in dedup deduplicateByAgentSessionId now collects all inactive candidates (including the caller) and picks the one with the highest activeAt (then updatedAt) as the merge target. This ensures the newest session survives regardless of which trigger point or ordering calls the dedup. --- hub/src/sync/sessionCache.ts | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index 2093fe146..902bda9a6 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -611,7 +611,7 @@ export class SessionCache { this.deduplicateInProgress.add(agentId.value) try { - const duplicates: string[] = [] + const candidates: { id: string; session: Session }[] = [{ id: sessionId, session }] for (const [existingId, existing] of this.sessions) { if (existingId === sessionId) continue if (existing.namespace !== session.namespace) continue @@ -621,15 +621,22 @@ export class SessionCache { // whose keepalive/messages would fail if we deleted their session record. // The web-side display dedup hides active duplicates from the UI. if (existing.active) continue - duplicates.push(existingId) + candidates.push({ id: existingId, session: existing }) } - // Merge direction: duplicate → current session. The current session is the one - // whose CLI just connected and set the agent session ID, so its Socket.IO room - // is active. Messages from duplicates are moved into it; no data is lost. - for (const duplicateId of duplicates) { + if (candidates.length <= 1) return + + // Keep the most recent session as the merge target so newer state survives. + candidates.sort((a, b) => + (b.session.activeAt - a.session.activeAt) || (b.session.updatedAt - a.session.updatedAt) + ) + const targetId = candidates[0].id + const targetNamespace = candidates[0].session.namespace + + for (const { id } of candidates.slice(1)) { + if (id === targetId) continue try { - await this.mergeSessions(duplicateId, sessionId, session.namespace) + await this.mergeSessions(id, targetId, targetNamespace) } catch { // best-effort: duplicate remains if merge fails }