diff --git a/apps/emdash-desktop/src/main/core/conversations/conversation-session-supervisor.ts b/apps/emdash-desktop/src/main/core/conversations/conversation-session-supervisor.ts index 92ac4e713a..d8fd8ee3da 100644 --- a/apps/emdash-desktop/src/main/core/conversations/conversation-session-supervisor.ts +++ b/apps/emdash-desktop/src/main/core/conversations/conversation-session-supervisor.ts @@ -91,6 +91,17 @@ export class ConversationSessionSupervisor { return pty; } + detachActive(sessionId: string): Pty | undefined { + const runtime = this.runtimes.get(sessionId); + if (!runtime) return undefined; + + runtime.spawnInFlight = undefined; + this.clearRecoveryGraceTimer(runtime); + const pty = runtime.active?.pty; + runtime.active = undefined; + return pty; + } + isDesired(sessionId: string): boolean { return this.runtimes.get(sessionId)?.desired === true; } diff --git a/apps/emdash-desktop/src/main/core/conversations/impl/conversation-provider-respawn.test.ts b/apps/emdash-desktop/src/main/core/conversations/impl/conversation-provider-respawn.test.ts index 96ae778cee..a050fca5c4 100644 --- a/apps/emdash-desktop/src/main/core/conversations/impl/conversation-provider-respawn.test.ts +++ b/apps/emdash-desktop/src/main/core/conversations/impl/conversation-provider-respawn.test.ts @@ -20,6 +20,9 @@ const buildCommandMock = vi.hoisted(() => ); const installPluginMock = vi.hoisted(() => vi.fn(async () => [])); const writeHooksMock = vi.hoisted(() => vi.fn(async () => [])); +const sshConnectionManagerMock = vi.hoisted(() => ({ + handlers: [] as Array<(event: { type: string; connectionId: string }) => void>, +})); vi.mock('@main/core/dependencies/host-dependency-store', () => ({ hostDependencyStore: { @@ -94,6 +97,17 @@ vi.mock('@main/core/pty/ssh2-pty', () => ({ openSsh2Pty, })); +vi.mock('@main/core/ssh/lifecycle/production-ssh-connection-manager', () => ({ + sshConnectionManager: { + on: vi.fn( + (_event: string, handler: (event: { type: string; connectionId: string }) => void) => { + sshConnectionManagerMock.handlers.push(handler); + } + ), + off: vi.fn(), + }, +})); + vi.mock('./keystroke-injection', () => ({ scheduleInitialPromptInjection: vi.fn(), })); @@ -153,6 +167,8 @@ vi.mock('@main/core/settings/settings-service', () => ({ const { events } = await import('@main/lib/events'); const { agentHookService } = await import('@main/core/agent-hooks/agent-hook-service'); const { appSettingsService } = await import('@main/core/settings/settings-service'); +const { sshConnectionManager } = + await import('@main/core/ssh/lifecycle/production-ssh-connection-manager'); type RespawnState = { knownSessionIds: Set; @@ -192,9 +208,11 @@ function sshProvider( { tmux = false, ctx = {} as never, + connectionId = 'ssh-1', }: { tmux?: boolean; ctx?: ConstructorParameters[0]['ctx']; + connectionId?: string; } = {} ) { return new SshConversationProvider({ @@ -204,6 +222,7 @@ function sshProvider( tmux, ctx, proxy: proxy as never, + connectionId, }); } @@ -255,6 +274,8 @@ describe('conversation provider respawn state', () => { installPluginMock.mockResolvedValue([]); writeHooksMock.mockReset(); writeHooksMock.mockResolvedValue([]); + sshConnectionManagerMock.handlers.length = 0; + vi.mocked(sshConnectionManager.off).mockClear(); mockSettings(); vi.mocked(events.emit).mockClear(); vi.mocked(agentHookService.getPort).mockReturnValue(0); @@ -995,4 +1016,120 @@ describe('conversation provider respawn state', () => { expect((provider as unknown as RespawnState).sessions.get(sessionId)).toBe(secondPty); expect(events.emit).not.toHaveBeenCalledWith(agentSessionExitedChannel, expect.anything()); }); + + it('detaches stale SSH conversations on disconnect and resumes them when connected', async () => { + const firstExitHandlers: Array<(info: PtyExitInfo) => void> = []; + const secondExitHandlers: Array<(info: PtyExitInfo) => void> = []; + const firstPty = fakePty(firstExitHandlers); + const secondPty = fakePty(secondExitHandlers); + openSsh2Pty + .mockResolvedValueOnce({ success: true, data: firstPty }) + .mockResolvedValueOnce({ success: true, data: secondPty }); + const provider = sshProvider(undefined, { connectionId: 'ssh-1' }); + const item = conversation(); + const sessionId = makePtySessionId(item.projectId, item.taskId, item.id); + + await provider.startSession(item); + expect((provider as unknown as RespawnState).sessions.get(sessionId)).toBe(firstPty); + + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'disconnected', connectionId: 'ssh-1' }); + } + + expect((provider as unknown as RespawnState).sessions.has(sessionId)).toBe(false); + expect(ptySessionRegistry.get(sessionId)).toBeUndefined(); + expect(firstPty.kill).toHaveBeenCalledOnce(); + + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'connected', connectionId: 'ssh-1' }); + } + await new Promise((resolve) => setImmediate(resolve)); + + expect(openSsh2Pty).toHaveBeenCalledTimes(2); + expect((provider as unknown as RespawnState).sessions.get(sessionId)).toBe(secondPty); + expect(ptySessionRegistry.get(sessionId)).toBe(secondPty); + }); + + it('clears in-flight SSH conversation starts on disconnect so reconnect can resume', async () => { + const firstExitHandlers: Array<(info: PtyExitInfo) => void> = []; + const secondExitHandlers: Array<(info: PtyExitInfo) => void> = []; + const firstPty = fakePty(firstExitHandlers); + const secondPty = fakePty(secondExitHandlers); + let resolveFirstOpen: ((value: { success: true; data: Pty }) => void) | undefined; + openSsh2Pty + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveFirstOpen = resolve; + }) + ) + .mockResolvedValueOnce({ success: true, data: secondPty }); + const provider = sshProvider(undefined, { connectionId: 'ssh-1' }); + const item = conversation(); + const sessionId = makePtySessionId(item.projectId, item.taskId, item.id); + + const firstStart = provider.startSession(item); + await new Promise((resolve) => setImmediate(resolve)); + expect(openSsh2Pty).toHaveBeenCalledTimes(1); + + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'disconnected', connectionId: 'ssh-1' }); + } + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'reconnected', connectionId: 'ssh-1' }); + } + await new Promise((resolve) => setImmediate(resolve)); + + expect(openSsh2Pty).toHaveBeenCalledTimes(2); + expect((provider as unknown as RespawnState).sessions.get(sessionId)).toBe(secondPty); + + resolveFirstOpen?.({ success: true, data: firstPty }); + await firstStart; + + expect(firstPty.kill).toHaveBeenCalledOnce(); + expect((provider as unknown as RespawnState).sessions.get(sessionId)).toBe(secondPty); + }); + + it('cancels in-flight SSH rehydrate starts after detachAll', async () => { + const firstExitHandlers: Array<(info: PtyExitInfo) => void> = []; + const rehydratedExitHandlers: Array<(info: PtyExitInfo) => void> = []; + const firstPty = fakePty(firstExitHandlers); + const rehydratedPty = fakePty(rehydratedExitHandlers); + let resolveRehydrateOpen: ((value: { success: true; data: Pty }) => void) | undefined; + openSsh2Pty.mockResolvedValueOnce({ success: true, data: firstPty }).mockImplementationOnce( + () => + new Promise((resolve) => { + resolveRehydrateOpen = resolve; + }) + ); + const provider = sshProvider(undefined, { connectionId: 'ssh-1' }); + const item = conversation(); + const sessionId = makePtySessionId(item.projectId, item.taskId, item.id); + + await provider.startSession(item); + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'disconnected', connectionId: 'ssh-1' }); + } + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'reconnected', connectionId: 'ssh-1' }); + } + await new Promise((resolve) => setImmediate(resolve)); + + await provider.detachAll(); + resolveRehydrateOpen?.({ success: true, data: rehydratedPty }); + await new Promise((resolve) => setImmediate(resolve)); + + expect(openSsh2Pty).toHaveBeenCalledTimes(2); + expect(rehydratedPty.kill).toHaveBeenCalledOnce(); + expect((provider as unknown as RespawnState).sessions.has(sessionId)).toBe(false); + expect(ptySessionRegistry.get(sessionId)).toBeUndefined(); + }); + + it('unsubscribes SSH connection listeners when detached', async () => { + const provider = sshProvider(undefined, { connectionId: 'ssh-1' }); + + await provider.detachAll(); + + expect(sshConnectionManager.off).toHaveBeenCalledWith('connection-event', expect.any(Function)); + }); }); diff --git a/apps/emdash-desktop/src/main/core/conversations/impl/ssh-conversation.ts b/apps/emdash-desktop/src/main/core/conversations/impl/ssh-conversation.ts index 37d985cedc..b16b7f4d9e 100644 --- a/apps/emdash-desktop/src/main/core/conversations/impl/ssh-conversation.ts +++ b/apps/emdash-desktop/src/main/core/conversations/impl/ssh-conversation.ts @@ -13,7 +13,9 @@ import { openSsh2Pty } from '@main/core/pty/ssh2-pty'; import { getTerminalColorEnv } from '@main/core/pty/terminal-color-scheme'; import { killTmuxSession, makeTmuxSessionName } from '@main/core/pty/tmux-session-name'; import { providerOverrideSettings } from '@main/core/settings/provider-settings-service'; +import { sshConnectionManager } from '@main/core/ssh/lifecycle/production-ssh-connection-manager'; import type { SshClientProxy } from '@main/core/ssh/lifecycle/ssh-client-proxy'; +import type { SshConnectionManagerEvent } from '@main/core/ssh/lifecycle/ssh-connection-manager'; import { events } from '@main/lib/events'; import { log } from '@main/lib/logger'; import { telemetryService } from '@main/lib/telemetry'; @@ -36,7 +38,10 @@ function parseExtraArgs(value: string | undefined): string[] { export class SshConversationProvider implements ConversationProvider { private sessions = new Map(); private knownSessionIds = new Set(); + private conversations = new Map(); + private reconnectSizes = new Map(); private supervisor = new ConversationSessionSupervisor(); + private detached = false; private readonly projectId: string; private readonly taskPath: string; private readonly taskId: string; @@ -45,6 +50,8 @@ export class SshConversationProvider implements ConversationProvider { private readonly shellSetup?: string; private readonly ctx: IExecutionContext; private readonly proxy: SshClientProxy; + private readonly connectionId: string; + private readonly _handleReconnect: (evt: SshConnectionManagerEvent) => void; constructor({ projectId, @@ -55,6 +62,7 @@ export class SshConversationProvider implements ConversationProvider { shellSetup, ctx, proxy, + connectionId, }: { projectId: string; taskPath: string; @@ -64,6 +72,7 @@ export class SshConversationProvider implements ConversationProvider { shellSetup?: string; ctx: IExecutionContext; proxy: SshClientProxy; + connectionId: string; }) { this.projectId = projectId; this.taskPath = taskPath; @@ -73,6 +82,24 @@ export class SshConversationProvider implements ConversationProvider { this.shellSetup = shellSetup; this.ctx = ctx; this.proxy = proxy; + this.connectionId = connectionId; + this._handleReconnect = (evt: SshConnectionManagerEvent) => { + if (evt.connectionId !== this.connectionId) return; + if (evt.type === 'disconnected') { + this.detachStaleSessionsForReconnect(); + return; + } + if (evt.type === 'connected' || evt.type === 'reconnected') { + this.rehydrate().catch((e: unknown) => { + log.error('SshConversationProvider: rehydrate failed after reconnect', { + taskId: this.taskId, + connectionId: this.connectionId, + error: String(e), + }); + }); + } + }; + sshConnectionManager.on('connection-event', this._handleReconnect); } async startSession( @@ -97,12 +124,14 @@ export class SshConversationProvider implements ConversationProvider { requireDesired: boolean, options: { shellRefreshRetried: boolean } ): Promise { + if (this.detached) return; const sessionId = makePtySessionId( conversation.projectId, conversation.taskId, conversation.id ); this.knownSessionIds.add(sessionId); + this.conversations.set(sessionId, conversation); const spawnSize = ptySessionRegistry.getLastSize(sessionId) ?? initialSize; const spawnToken = this.supervisor.beginStart(sessionId, { @@ -271,6 +300,7 @@ export class SshConversationProvider implements ConversationProvider { }, }); this.sessions.set(sessionId, pty); + this.reconnectSizes.delete(sessionId); scheduleInitialPromptInjection({ pty, conversation, @@ -312,6 +342,8 @@ export class SshConversationProvider implements ConversationProvider { this.knownSessionIds.delete(sessionId); this.supervisor.forget(sessionId); } + this.conversations.delete(sessionId); + this.reconnectSizes.delete(sessionId); } async stopSession(conversationId: string): Promise { @@ -334,6 +366,8 @@ export class SshConversationProvider implements ConversationProvider { await killTmuxSession(this.ctx, makeTmuxSessionName(sessionId)); } this.supervisor.forget(sessionId); + this.conversations.delete(sessionId); + this.reconnectSizes.delete(sessionId); } async destroyAll(): Promise { @@ -346,11 +380,17 @@ export class SshConversationProvider implements ConversationProvider { this.supervisor.forget(sessionId); } this.knownSessionIds.clear(); + this.conversations.clear(); + this.reconnectSizes.clear(); } async detachAll(): Promise { - for (const [sessionId, pty] of this.sessions) { + this.detached = true; + sshConnectionManager.off('connection-event', this._handleReconnect); + for (const sessionId of this.knownSessionIds) { this.supervisor.stop(sessionId); + } + for (const [sessionId, pty] of this.sessions) { try { pty.kill(); } catch {} @@ -359,6 +399,47 @@ export class SshConversationProvider implements ConversationProvider { this.sessions.clear(); } + private detachStaleSessionsForReconnect(): void { + for (const [sessionId] of this.conversations) { + const lastSize = ptySessionRegistry.getLastSize(sessionId); + if (lastSize) this.reconnectSizes.set(sessionId, lastSize); + const pty = this.supervisor.detachActive(sessionId) ?? this.sessions.get(sessionId); + this.sessions.delete(sessionId); + if (!pty) continue; + ptySessionRegistry.unregister(sessionId, { pty }); + try { + pty.kill(); + } catch (e) { + log.warn('SshConversationProvider: error detaching stale PTY after disconnect', { + sessionId, + error: String(e), + }); + } + } + } + + private async rehydrate(): Promise { + if (this.detached) return; + await Promise.all( + Array.from(this.conversations.entries()).map(async ([sessionId, conversation]) => { + if (this.detached) return; + if (this.sessions.has(sessionId) || !this.supervisor.isDesired(sessionId)) return; + const initialSize = this.reconnectSizes.get(sessionId) ?? { + cols: DEFAULT_COLS, + rows: DEFAULT_ROWS, + }; + await this.startSessionInternal(conversation, initialSize, true, undefined, true, { + shellRefreshRetried: false, + }).catch((e) => { + log.error('SshConversationProvider: rehydrate failed', { + conversationId: conversation.id, + error: String(e), + }); + }); + }) + ); + } + private scheduleShellRefreshRetry({ conversation, sessionId, diff --git a/apps/emdash-desktop/src/main/core/pty/ssh2-pty.test.ts b/apps/emdash-desktop/src/main/core/pty/ssh2-pty.test.ts index 3de79a095b..977a4f2729 100644 --- a/apps/emdash-desktop/src/main/core/pty/ssh2-pty.test.ts +++ b/apps/emdash-desktop/src/main/core/pty/ssh2-pty.test.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'node:events'; import { describe, expect, it, vi } from 'vitest'; -import { Ssh2PtySession } from './ssh2-pty'; +import { openSsh2Pty, Ssh2PtySession } from './ssh2-pty'; class FakeClientChannel extends EventEmitter { writes: string[] = []; @@ -42,4 +42,90 @@ describe('Ssh2PtySession', () => { expect(channel.closed).toBe(true); expect(exitHandler).toHaveBeenCalledWith({ exitCode: 0, signal: undefined }); }); + + it('fails SSH channel opens that never call back', async () => { + vi.useFakeTimers(); + try { + const destroy = vi.fn(); + const proxy = { + client: { destroy }, + execPty: vi.fn(), + }; + + const resultPromise = openSsh2Pty(proxy as never, { + id: 'ssh-session', + command: 'bash', + cols: 80, + rows: 24, + }); + + await vi.advanceTimersByTimeAsync(15_000); + const result = await resultPromise; + + expect(result.success).toBe(false); + if (!result.success) { + expect(result.error.kind).toBe('channel-open-timeout'); + expect(result.error.message).toContain('timed out'); + } + expect(destroy).toHaveBeenCalledOnce(); + } finally { + vi.useRealTimers(); + } + }); + + it('destroys the client used to open the timed-out channel, not a later reconnect', async () => { + vi.useFakeTimers(); + try { + const originalClient = { destroy: vi.fn() }; + const reconnectedClient = { destroy: vi.fn() }; + let currentClient = originalClient; + const proxy = { + get client() { + return currentClient; + }, + execPty: vi.fn(() => { + currentClient = reconnectedClient; + }), + }; + + const resultPromise = openSsh2Pty(proxy as never, { + id: 'ssh-session', + command: 'bash', + cols: 80, + rows: 24, + }); + + await vi.advanceTimersByTimeAsync(15_000); + const result = await resultPromise; + + expect(result.success).toBe(false); + expect(originalClient.destroy).toHaveBeenCalledOnce(); + expect(reconnectedClient.destroy).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it('returns a failed open when execPty throws synchronously', async () => { + const proxy = { + execPty: vi.fn(() => { + throw new Error('SSH connection is not available'); + }), + }; + + const result = await openSsh2Pty(proxy as never, { + id: 'ssh-session', + command: 'bash', + cols: 80, + rows: 24, + }); + + expect(result.success).toBe(false); + if (!result.success) { + expect(result.error).toEqual({ + kind: 'channel-open-failed', + message: 'SSH connection is not available', + }); + } + }); }); diff --git a/apps/emdash-desktop/src/main/core/pty/ssh2-pty.ts b/apps/emdash-desktop/src/main/core/pty/ssh2-pty.ts index 36cf32a748..8a2232c3cf 100644 --- a/apps/emdash-desktop/src/main/core/pty/ssh2-pty.ts +++ b/apps/emdash-desktop/src/main/core/pty/ssh2-pty.ts @@ -1,15 +1,18 @@ import { err, ok, type Result } from '@emdash/shared'; -import type { ClientChannel } from 'ssh2'; +import type { Client, ClientChannel } from 'ssh2'; import type { SshClientProxy } from '@main/core/ssh/lifecycle/ssh-client-proxy'; import { log } from '@main/lib/logger'; import { normalizeSignal } from './exit-signals'; import type { Pty, PtyDimensions, PtyExitInfo } from './pty'; export type Ssh2OpenError = { - readonly kind: 'channel-open-failed'; + readonly kind: 'channel-open-failed' | 'channel-open-timeout'; readonly message: string; }; +const CHANNEL_OPEN_TIMEOUT_MS = 15_000; +const CHANNEL_OPEN_TIMEOUT_MESSAGE = `SSH channel open timed out after ${CHANNEL_OPEN_TIMEOUT_MS}ms`; + export interface Ssh2SpawnOptions extends PtyDimensions { id: string; command: string; @@ -66,25 +69,53 @@ export async function openSsh2Pty( ): Promise> { const { id, command, cols, rows } = options; return new Promise((resolve) => { - proxy.execPty( - command, - { - pty: { - term: 'xterm-256color', - cols, - rows, - // width/height in pixels — set to 0, terminal uses cols/rows instead - width: 0, - height: 0, + let settled = false; + let clientSnapshot: Client | null = null; + const timer = setTimeout(() => { + if (settled) return; + settled = true; + try { + clientSnapshot?.destroy(); + } catch {} + resolve( + err({ + kind: 'channel-open-timeout', + message: CHANNEL_OPEN_TIMEOUT_MESSAGE, + }) + ); + }, CHANNEL_OPEN_TIMEOUT_MS); + + try { + clientSnapshot = proxy.client; + proxy.execPty( + command, + { + pty: { + term: 'xterm-256color', + cols, + rows, + // width/height in pixels — set to 0, terminal uses cols/rows instead + width: 0, + height: 0, + }, }, - }, - (e, channel) => { - if (e) { - const message = e instanceof Error ? e.message : String(e); - return resolve(err({ kind: 'channel-open-failed', message })); + (e, channel) => { + if (settled) return; + settled = true; + clearTimeout(timer); + if (e) { + const message = e instanceof Error ? e.message : String(e); + return resolve(err({ kind: 'channel-open-failed', message })); + } + resolve(ok(new Ssh2PtySession(id, channel))); } - resolve(ok(new Ssh2PtySession(id, channel))); - } - ); + ); + } catch (e) { + if (settled) return; + settled = true; + clearTimeout(timer); + const message = e instanceof Error ? e.message : String(e); + resolve(err({ kind: 'channel-open-failed', message })); + } }); } diff --git a/apps/emdash-desktop/src/main/core/runtime/legacy/ssh-git.ts b/apps/emdash-desktop/src/main/core/runtime/legacy/ssh-git.ts index 1ed796a54a..d471461e13 100644 --- a/apps/emdash-desktop/src/main/core/runtime/legacy/ssh-git.ts +++ b/apps/emdash-desktop/src/main/core/runtime/legacy/ssh-git.ts @@ -69,6 +69,14 @@ type LegacyWorktreeResource = { repositoryLease: Lease; }; +async function liveModelResult(compute: () => Promise): Promise> { + try { + return ok(await compute()); + } catch (error) { + return err(error); + } +} + /** * Legacy SSH compatibility layer. SSH projects still execute Git through the main * process until the shared Git runtime can run on the remote machine. @@ -243,11 +251,11 @@ class LegacySshGitRepository implements IGitRepository { this.gitCommonDir = gitCommonDir; this.objectStoreDir = `${gitCommonDir}/objects`; this.refsModel = new LiveModel({ - compute: async () => ok(await this.computeRefs()), + compute: () => liveModelResult(() => this.computeRefs()), onError: (error) => log.warn('LegacySshGitRepository: refs refresh failed', { error }), }); this.remotesModel = new LiveModel({ - compute: async () => ok(await this.computeRemotes()), + compute: () => liveModelResult(() => this.computeRemotes()), onError: (error) => log.warn('LegacySshGitRepository: remotes refresh failed', { error }), }); this.timers = [ @@ -418,11 +426,11 @@ class LegacySshGitWorktree implements IGitWorktree { this.worktree = worktreePath; this.repository = repository; this.statusModel = new LiveModel({ - compute: async () => ok(await this.computeStatus()), + compute: () => liveModelResult(() => this.computeStatus()), onError: (error) => log.warn('LegacySshGitWorktree: status refresh failed', { error }), }); this.headModel = new LiveModel({ - compute: async () => ok(await this.computeHead()), + compute: () => liveModelResult(() => this.computeHead()), onError: (error) => log.warn('LegacySshGitWorktree: head refresh failed', { error }), }); this.timers = [ diff --git a/apps/emdash-desktop/src/main/core/terminals/impl/ssh-terminal-provider.test.ts b/apps/emdash-desktop/src/main/core/terminals/impl/ssh-terminal-provider.test.ts index 87fe0b4522..995eab4b5d 100644 --- a/apps/emdash-desktop/src/main/core/terminals/impl/ssh-terminal-provider.test.ts +++ b/apps/emdash-desktop/src/main/core/terminals/impl/ssh-terminal-provider.test.ts @@ -1,6 +1,6 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'; import type { IExecutionContext } from '@main/core/execution-context/types'; -import type { PtyExitInfo } from '@main/core/pty/pty'; +import type { Pty, PtyExitInfo } from '@main/core/pty/pty'; import { ptySessionRegistry } from '@main/core/pty/pty-session-registry'; import type { SshClientProxy } from '@main/core/ssh/lifecycle/ssh-client-proxy'; import { makePtySessionId } from '@shared/core/pty/ptySessionId'; @@ -9,6 +9,33 @@ import { SshTerminalProvider } from './ssh-terminal-provider'; const ptyMock = vi.hoisted(() => ({ exitHandlers: [] as Array<(info: PtyExitInfo) => void>, + ptys: [] as Array<{ + write: ReturnType; + resize: ReturnType; + kill: ReturnType; + onData: ReturnType; + onExit: ReturnType; + }>, +})); + +const openSsh2PtyMock = vi.hoisted(() => + vi.fn(async () => { + const pty = { + write: vi.fn(), + resize: vi.fn(), + kill: vi.fn(), + onData: vi.fn(), + onExit: vi.fn((handler: (info: PtyExitInfo) => void) => { + ptyMock.exitHandlers.push(handler); + }), + }; + ptyMock.ptys.push(pty); + return { success: true, data: pty }; + }) +); + +const sshConnectionManagerMock = vi.hoisted(() => ({ + handlers: [] as Array<(event: { type: string; connectionId: string }) => void>, })); const previewServerServiceMock = vi.hoisted(() => ({ @@ -21,30 +48,24 @@ const terminalUrlDetectorMock = vi.hoisted(() => ({ })); vi.mock('@main/core/pty/ssh2-pty', () => ({ - openSsh2Pty: vi.fn(async () => ({ - success: true, - data: { - write: vi.fn(), - resize: vi.fn(), - kill: vi.fn(), - onData: vi.fn(), - onExit: vi.fn((handler: (info: PtyExitInfo) => void) => { - ptyMock.exitHandlers.push(handler); - }), - }, - })), + openSsh2Pty: openSsh2PtyMock, })); vi.mock('@main/core/pty/pty-session-registry', () => ({ ptySessionRegistry: { register: vi.fn(), unregister: vi.fn(), + getLastSize: vi.fn(), }, })); vi.mock('@main/core/ssh/lifecycle/production-ssh-connection-manager', () => ({ sshConnectionManager: { - on: vi.fn(), + on: vi.fn( + (_event: string, handler: (event: { type: string; connectionId: string }) => void) => { + sshConnectionManagerMock.handlers.push(handler); + } + ), off: vi.fn(), }, })); @@ -61,6 +82,9 @@ vi.mock('@main/core/pty/terminal-color-scheme', () => ({ getTerminalColorEnv: vi.fn().mockResolvedValue({}), })); +const { sshConnectionManager } = + await import('@main/core/ssh/lifecycle/production-ssh-connection-manager'); + const terminal: Terminal = { id: 'terminal-1', projectId: 'project-1', @@ -86,11 +110,17 @@ const proxy = { describe('SshTerminalProvider', () => { beforeEach(() => { ptyMock.exitHandlers.length = 0; + ptyMock.ptys.length = 0; + sshConnectionManagerMock.handlers.length = 0; + openSsh2PtyMock.mockClear(); + vi.mocked(sshConnectionManager.off).mockClear(); terminalUrlDetectorMock.wireTerminalUrlDetector.mockClear(); + vi.mocked(ptySessionRegistry.unregister).mockClear(); previewServerServiceMock.registerDetectedTarget.mockClear(); previewServerServiceMock.registerDetectedTarget.mockResolvedValue(undefined); previewServerServiceMock.handleTerminalSourceClosed.mockClear(); vi.mocked(ptySessionRegistry.register).mockClear(); + vi.mocked(ptySessionRegistry.getLastSize).mockReturnValue(undefined); proxy.getRemoteShellProfile = vi.fn(async () => ({ shell: '/bin/bash', env: { PATH: '/usr/bin', HOME: '/home/me' }, @@ -179,4 +209,161 @@ describe('SshTerminalProvider', () => { urlPath: '/', }); }); + + it('detaches stale sessions on disconnect so reconnect can rehydrate them', async () => { + const provider = new SshTerminalProvider({ + projectId: terminal.projectId, + scopeId: terminal.taskId, + taskPath: '/repo', + ctx, + proxy, + connectionId: 'ssh-1', + }); + + await provider.spawnTerminal(terminal, { cols: 120, rows: 40 }); + const sessionId = makePtySessionId(terminal.projectId, terminal.taskId, terminal.id); + expect( + (provider as unknown as { sessions: Map }).sessions.has(sessionId) + ).toBe(true); + + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'disconnected', connectionId: 'ssh-1' }); + } + + expect( + (provider as unknown as { sessions: Map }).sessions.has(sessionId) + ).toBe(false); + expect(ptySessionRegistry.unregister).toHaveBeenCalledWith(sessionId, { pty: ptyMock.ptys[0] }); + expect(ptyMock.ptys[0]!.kill).toHaveBeenCalledOnce(); + + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'reconnected', connectionId: 'ssh-1' }); + } + await new Promise((resolve) => setImmediate(resolve)); + + expect(openSsh2PtyMock).toHaveBeenCalledTimes(2); + expect( + (provider as unknown as { sessions: Map }).sessions.has(sessionId) + ).toBe(true); + }); + + it('rehydrates detached sessions when manually connected', async () => { + const provider = new SshTerminalProvider({ + projectId: terminal.projectId, + scopeId: terminal.taskId, + taskPath: '/repo', + ctx, + proxy, + connectionId: 'ssh-1', + }); + + await provider.spawnTerminal(terminal); + + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'disconnected', connectionId: 'ssh-1' }); + } + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'connected', connectionId: 'ssh-1' }); + } + await new Promise((resolve) => setImmediate(resolve)); + + expect(openSsh2PtyMock).toHaveBeenCalledTimes(2); + }); + + it('clears reconnect sizes when killing a terminal detached for reconnect', async () => { + const provider = new SshTerminalProvider({ + projectId: terminal.projectId, + scopeId: terminal.taskId, + taskPath: '/repo', + ctx, + proxy, + connectionId: 'ssh-1', + }); + + await provider.spawnTerminal(terminal, { cols: 120, rows: 40 }); + const sessionId = makePtySessionId(terminal.projectId, terminal.taskId, terminal.id); + vi.mocked(ptySessionRegistry.getLastSize).mockReturnValue({ cols: 120, rows: 40 }); + + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'disconnected', connectionId: 'ssh-1' }); + } + + expect( + (provider as unknown as { reconnectSizes: Map }).reconnectSizes.has( + sessionId + ) + ).toBe(true); + + await provider.killTerminal(terminal.id); + + expect( + (provider as unknown as { reconnectSizes: Map }).reconnectSizes.has( + sessionId + ) + ).toBe(false); + }); + + it('cancels in-flight rehydrate starts after detachAll', async () => { + const provider = new SshTerminalProvider({ + projectId: terminal.projectId, + scopeId: terminal.taskId, + taskPath: '/repo', + ctx, + proxy, + connectionId: 'ssh-1', + }); + let resolveRehydrateOpen: ((value: { success: true; data: Pty }) => void) | undefined; + + await provider.spawnTerminal(terminal); + const firstPty = ptyMock.ptys[0]; + openSsh2PtyMock.mockImplementationOnce( + (() => + new Promise<{ success: true; data: Pty }>((resolve) => { + resolveRehydrateOpen = resolve; + })) as never + ); + + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'disconnected', connectionId: 'ssh-1' }); + } + for (const handler of sshConnectionManagerMock.handlers) { + handler({ type: 'reconnected', connectionId: 'ssh-1' }); + } + await new Promise((resolve) => setImmediate(resolve)); + + await provider.detachAll(); + const rehydratedPty = { + write: vi.fn(), + resize: vi.fn(), + kill: vi.fn(), + onData: vi.fn(), + onExit: vi.fn(), + } satisfies Pty; + resolveRehydrateOpen?.({ success: true, data: rehydratedPty }); + await new Promise((resolve) => setImmediate(resolve)); + + const sessionId = makePtySessionId(terminal.projectId, terminal.taskId, terminal.id); + expect(openSsh2PtyMock).toHaveBeenCalledTimes(2); + expect(firstPty?.kill).toHaveBeenCalledOnce(); + expect(rehydratedPty.kill).toHaveBeenCalledOnce(); + expect( + (provider as unknown as { sessions: Map }).sessions.has(sessionId) + ).toBe(false); + expect(ptySessionRegistry.register).toHaveBeenCalledTimes(1); + }); + + it('unsubscribes SSH connection listeners when detached', async () => { + const provider = new SshTerminalProvider({ + projectId: terminal.projectId, + scopeId: terminal.taskId, + taskPath: '/repo', + ctx, + proxy, + connectionId: 'ssh-1', + }); + + await provider.detachAll(); + + expect(sshConnectionManager.off).toHaveBeenCalledWith('connection-event', expect.any(Function)); + }); }); diff --git a/apps/emdash-desktop/src/main/core/terminals/impl/ssh-terminal-provider.ts b/apps/emdash-desktop/src/main/core/terminals/impl/ssh-terminal-provider.ts index 2f29d3c4d5..7fbd392c2f 100644 --- a/apps/emdash-desktop/src/main/core/terminals/impl/ssh-terminal-provider.ts +++ b/apps/emdash-desktop/src/main/core/terminals/impl/ssh-terminal-provider.ts @@ -43,6 +43,8 @@ export class SshTerminalProvider implements TerminalProvider { private shellProfiles = new Map(); private respawnCounts = new Map(); private terminals = new Map(); + private reconnectSizes = new Map(); + private detached = false; private readonly projectId: string; private readonly workspaceId: string; private readonly scopeId: string; @@ -89,7 +91,12 @@ export class SshTerminalProvider implements TerminalProvider { this.proxy = proxy; this.connectionId = connectionId; this._handleReconnect = (evt: SshConnectionManagerEvent) => { - if (evt.type === 'reconnected' && evt.connectionId === this.connectionId) { + if (evt.connectionId !== this.connectionId) return; + if (evt.type === 'disconnected') { + this.detachStaleSessionsForReconnect(); + return; + } + if (evt.type === 'connected' || evt.type === 'reconnected') { this.rehydrate().catch((e: unknown) => { log.error('SshTerminalProvider: rehydrate failed after reconnect', { scopeId: this.scopeId, @@ -157,6 +164,7 @@ export class SshTerminalProvider implements TerminalProvider { metadata: PtySessionMetadata | undefined, policy: SpawnPolicy ): Promise { + if (this.detached) return; const sessionId = makePtySessionId(terminal.projectId, terminal.taskId, terminal.id); this.knownSessionIds.add(sessionId); if (this.sessions.has(sessionId)) return; @@ -200,6 +208,13 @@ export class SshTerminalProvider implements TerminalProvider { } const pty = result.data; + if (this.detached) { + try { + pty.kill(); + } catch {} + return; + } + if (policy.watchDevServer) { wireTerminalUrlDetector({ pty, @@ -288,6 +303,7 @@ export class SshTerminalProvider implements TerminalProvider { metadata, }); this.sessions.set(sessionId, pty); + this.reconnectSizes.delete(sessionId); } private async getSessionShellProfile( @@ -317,12 +333,14 @@ export class SshTerminalProvider implements TerminalProvider { * already running. */ async rehydrate(): Promise { + if (this.detached) return; const terminals = Array.from(this.terminals.values()); await Promise.all( terminals.map(async (terminal) => { + if (this.detached) return; const sessionId = makePtySessionId(terminal.projectId, terminal.taskId, terminal.id); if (this.sessions.has(sessionId)) return; - await this.spawnTerminal(terminal).catch((e) => { + await this.spawnTerminal(terminal, this.reconnectSizes.get(sessionId)).catch((e) => { log.error('SshTerminalProvider: rehydrate failed', { terminalId: terminal.id, error: String(e), @@ -345,13 +363,13 @@ export class SshTerminalProvider implements TerminalProvider { } this.terminals.delete(terminalId); this.shellProfiles.delete(sessionId); + this.reconnectSizes.delete(sessionId); if (this.tmux) { await killTmuxSession(this.ctx, makeTmuxSessionName(sessionId)); } } async destroyAll(): Promise { - sshConnectionManager.off('connection-event', this._handleReconnect); const sessionIds = Array.from(this.knownSessionIds); await this.detachAll(); if (this.tmux) { @@ -360,16 +378,41 @@ export class SshTerminalProvider implements TerminalProvider { this.knownSessionIds.clear(); this.terminals.clear(); this.shellProfiles.clear(); + this.reconnectSizes.clear(); } async detachAll(): Promise { + this.detached = true; + sshConnectionManager.off('connection-event', this._handleReconnect); + for (const sessionId of this.knownSessionIds) { + this.shellProfiles.delete(sessionId); + this.respawnCounts.delete(sessionId); + } for (const [sessionId, pty] of this.sessions) { try { pty.kill(); } catch {} ptySessionRegistry.unregister(sessionId); - this.shellProfiles.delete(sessionId); } this.sessions.clear(); } + + private detachStaleSessionsForReconnect(): void { + for (const [sessionId, pty] of this.sessions) { + const lastSize = ptySessionRegistry.getLastSize(sessionId); + if (lastSize) this.reconnectSizes.set(sessionId, lastSize); + this.sessions.delete(sessionId); + this.shellProfiles.delete(sessionId); + this.respawnCounts.delete(sessionId); + ptySessionRegistry.unregister(sessionId, { pty }); + try { + pty.kill(); + } catch (e) { + log.warn('SshTerminalProvider: error detaching stale PTY after disconnect', { + sessionId, + error: String(e), + }); + } + } + } } diff --git a/apps/emdash-desktop/src/main/core/workspaces/workspace-factory.ts b/apps/emdash-desktop/src/main/core/workspaces/workspace-factory.ts index 62229c07ba..df9ca226d7 100644 --- a/apps/emdash-desktop/src/main/core/workspaces/workspace-factory.ts +++ b/apps/emdash-desktop/src/main/core/workspaces/workspace-factory.ts @@ -316,6 +316,7 @@ export async function buildTaskProviders( shellSetup: opts.shellSetup, ctx, proxy: type.proxy, + connectionId: type.connectionId, taskEnvVars: opts.taskEnvVars, }), terminals: new SshTerminalProvider({