Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 109 additions & 1 deletion hub/src/sync/sessionCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export class SessionCache {
// Authoritative in-memory view of all known sessions, keyed by session ID.
private readonly sessions: Map<string, Session> = new Map()
// Last broadcast timestamp (ms) per session — presumably used to throttle
// repeated emits for the same session; TODO confirm against the broadcast path.
private readonly lastBroadcastAtBySessionId: Map<string, number> = new Map()
// Session IDs for which a todo backfill was already attempted — NOTE(review):
// assumed to be a one-shot guard; verify against the backfill caller.
private readonly todoBackfillAttemptedSessionIds: Set<string> = new Set()
// Agent-session IDs with a dedup merge currently running; acts as a
// re-entrancy guard in deduplicateByAgentSessionId.
private readonly deduplicateInProgress: Set<string> = new Set()

constructor(
private readonly store: Store,
Expand Down Expand Up @@ -280,16 +281,20 @@ export class SessionCache {
this.publisher.emit({ type: 'session-updated', sessionId: session.id, data: { active: false, thinking: false, backgroundTaskCount: 0 } })
}

expireInactive(now: number = Date.now()): void {
expireInactive(now: number = Date.now()): string[] {
const sessionTimeoutMs = 30_000
const expired: string[] = []

for (const session of this.sessions.values()) {
if (!session.active) continue
if (now - session.activeAt <= sessionTimeoutMs) continue
session.active = false
session.thinking = false
expired.push(session.id)
this.publisher.emit({ type: 'session-updated', sessionId: session.id, data: { active: false } })
}

return expired
}

applySessionConfig(
Expand Down Expand Up @@ -470,6 +475,27 @@ export class SessionCache {
)
}

// Merge agentState: union requests/completedRequests from both sessions so pending
// approvals on the duplicate are not lost. Only inactive duplicates reach this point
// (active ones are skipped by deduplicateByAgentSessionId).
// Read the latest target state right before writing to avoid overwriting live updates.
if (oldStored.agentState !== null) {
for (let attempt = 0; attempt < 2; attempt += 1) {
const latest = this.store.sessions.getSessionByNamespace(newSessionId, namespace)
if (!latest) break
const mergedAgentState = this.mergeAgentState(oldStored.agentState, latest.agentState)
if (mergedAgentState === null || mergedAgentState === latest.agentState) break
const result = this.store.sessions.updateSessionAgentState(
newSessionId,
mergedAgentState,
latest.agentStateVersion,
namespace
)
if (result.result !== 'version-mismatch') break
// version-mismatch: retry with fresh snapshot
}
}

if (oldStored.teamState !== null && oldStored.teamStateUpdatedAt !== null) {
this.store.sessions.setSessionTeamState(
newSessionId,
Expand Down Expand Up @@ -537,4 +563,86 @@ export class SessionCache {

return changed ? merged : newMetadata
}

/**
 * Merges two agent-state snapshots during session deduplication.
 *
 * Unions `requests` and `completedRequests` from both states so pending
 * approvals on the duplicate are preserved, then drops any request whose ID
 * already appears in the merged completed set so finished requests are not
 * resurrected as pending. For all other keys the newer state wins.
 *
 * @param oldState - Agent state of the duplicate (merged-away) session.
 * @param newState - Agent state of the surviving session.
 * @returns The merged state, or the non-null side when only one is present.
 */
private mergeAgentState(oldState: unknown | null, newState: unknown | null): unknown | null {
  if (oldState === null) return newState
  if (newState === null) return oldState

  const previous = oldState as Record<string, unknown>
  const current = newState as Record<string, unknown>
  const asRecord = (value: unknown): Record<string, unknown> =>
    (value as Record<string, unknown> | undefined) ?? {}

  const completedRequests: Record<string, unknown> = {
    ...asRecord(previous.completedRequests),
    ...asRecord(current.completedRequests)
  }

  // Own enumerable keys only (mirrors Object.keys) — avoids prototype-chain hits.
  const completedIds = new Set(Object.keys(completedRequests))

  const requests: Record<string, unknown> = {}
  const mergedRequests = { ...asRecord(previous.requests), ...asRecord(current.requests) }
  for (const [id, request] of Object.entries(mergedRequests)) {
    if (completedIds.has(id)) continue
    requests[id] = request
  }

  return { ...previous, ...current, requests, completedRequests }
}

/**
 * Returns the first agent-session identifier present on the metadata, in
 * fixed priority order (codex > claude > gemini > opencode > cursor), or
 * null when none is set.
 *
 * @param metadata - Non-null session metadata to inspect.
 * @returns The matching field name and its value, or null.
 */
private extractAgentSessionId(
  metadata: NonNullable<Session['metadata']>
): { field: 'codexSessionId' | 'claudeSessionId' | 'geminiSessionId' | 'opencodeSessionId' | 'cursorSessionId'; value: string } | null {
  const orderedFields = [
    'codexSessionId',
    'claudeSessionId',
    'geminiSessionId',
    'opencodeSessionId',
    'cursorSessionId'
  ] as const

  for (const field of orderedFields) {
    const value = metadata[field]
    if (value) return { field, value }
  }

  return null
}

/**
 * Collapses duplicate sessions that share the same agent-session ID within a
 * namespace by merging every inactive duplicate into the most recent session.
 *
 * Only inactive duplicates are merged: active ones still have a live CLI
 * socket whose keepalive/messages would fail if their session record were
 * deleted; the web-side display dedup hides those from the UI.
 *
 * NOTE(review): dedup is only triggered when the agent-session ID changes, so
 * an active duplicate skipped here is never retried once it later becomes
 * inactive — consider also triggering on the active→inactive transition in
 * SyncEngine.handleRealtimeEvent. TODO confirm trigger sites.
 *
 * @param sessionId - The session whose agent-session ID seeds the search.
 */
async deduplicateByAgentSessionId(sessionId: string): Promise<void> {
  const session = this.sessions.get(sessionId)
  if (!session?.metadata) return

  const agentId = this.extractAgentSessionId(session.metadata)
  if (!agentId) return

  // Guard: skip if another dedup for this agent ID is already in progress.
  // A skipped trigger is acceptable — the web-side display dedup hides any
  // remaining duplicates.
  if (this.deduplicateInProgress.has(agentId.value)) return
  this.deduplicateInProgress.add(agentId.value)

  try {
    const candidates: { id: string; session: Session }[] = [{ id: sessionId, session }]
    for (const [existingId, existing] of this.sessions) {
      if (existingId === sessionId) continue
      if (existing.namespace !== session.namespace) continue
      if (!existing.metadata) continue
      if (existing.metadata[agentId.field] !== agentId.value) continue
      // Only merge inactive duplicates (see doc comment above).
      if (existing.active) continue
      candidates.push({ id: existingId, session: existing })
    }

    if (candidates.length <= 1) return

    // Keep the most recent session as the merge target so newer state survives.
    candidates.sort((a, b) =>
      (b.session.activeAt - a.session.activeAt) || (b.session.updatedAt - a.session.updatedAt)
    )
    const targetId = candidates[0].id
    const targetNamespace = candidates[0].session.namespace

    for (const { id } of candidates.slice(1)) {
      if (id === targetId) continue
      try {
        await this.mergeSessions(id, targetId, targetNamespace)
      } catch {
        // best-effort: duplicate remains if merge fails
      }
    }
  } finally {
    this.deduplicateInProgress.delete(agentId.value)
  }
}
}
Loading
Loading