From 21888c4f45fd810a54a0f883b65f80b3f5d2e2f7 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 12 Jun 2026 12:56:13 +0100 Subject: [PATCH 1/2] feat(wallet-cli): add daemon JSON-RPC socket transport MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the daemon's IPC transport layer for `@metamask/wallet-cli`. This is purely additive plumbing — no `mm` subcommand consumes it yet (commands land in a later slice), so there is no user-visible behavior change. - socket-line: newline-delimited read/write framing over a `net.Socket`. - daemon-client: `sendCommand` (one request per connection, id-correlated, retries once on transient connection errors) and `pingDaemon` (a `getStatus` health probe that classifies unreachable daemons by failure mode: refused / timeout / permission / protocol / other). - rpc-socket-server: `startRpcSocketServer` listens on a Unix socket and dispatches one JSON-RPC request per connection to a handler map, with a built-in `shutdown` method and idle-connection timeout. - daemon-spawn: `ensureDaemon` spawns a detached daemon and polls until the socket is responsive, refusing to take over a wedged or foreign socket. - stop-daemon: `stopDaemon` escalates from a graceful `shutdown` RPC through SIGTERM to SIGKILL, then cleans up the PID and socket files. - prompts: `confirmPurge` wraps the ESM-only `@inquirer/confirm`. - types: add `RpcHandler`, `RpcHandlerMap`, `DaemonStatusInfo`, and `DaemonSpawnConfig`. Adds `@inquirer/confirm` and `@metamask/rpc-errors` dependencies. All daemon modules are covered to the package's 100% thresholds, plus an end-to-end socket integration test exercising both halves over a real Unix socket. Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/wallet-cli/CHANGELOG.md | 1 + packages/wallet-cli/package.json | 2 + .../src/daemon/daemon-client.test.ts | 341 +++++++++ .../wallet-cli/src/daemon/daemon-client.ts | 203 ++++++ .../src/daemon/daemon-spawn.test.ts | 282 ++++++++ .../wallet-cli/src/daemon/daemon-spawn.ts | 134 ++++ .../wallet-cli/src/daemon/prompts.test.ts | 38 + packages/wallet-cli/src/daemon/prompts.ts | 16 + .../src/daemon/rpc-socket-server.test.ts | 669 ++++++++++++++++++ .../src/daemon/rpc-socket-server.ts | 326 +++++++++ .../src/daemon/socket-integration.test.ts | 224 ++++++ .../wallet-cli/src/daemon/socket-line.test.ts | 120 ++++ packages/wallet-cli/src/daemon/socket-line.ts | 86 +++ .../wallet-cli/src/daemon/stop-daemon.test.ts | 306 ++++++++ packages/wallet-cli/src/daemon/stop-daemon.ts | 114 +++ packages/wallet-cli/src/daemon/types.ts | 33 + packages/wallet-cli/src/daemon/utils.test.ts | 185 +++++ packages/wallet-cli/src/daemon/utils.ts | 115 +++ yarn.lock | 63 ++ 19 files changed, 3258 insertions(+) create mode 100644 packages/wallet-cli/src/daemon/daemon-client.test.ts create mode 100644 packages/wallet-cli/src/daemon/daemon-client.ts create mode 100644 packages/wallet-cli/src/daemon/daemon-spawn.test.ts create mode 100644 packages/wallet-cli/src/daemon/daemon-spawn.ts create mode 100644 packages/wallet-cli/src/daemon/prompts.test.ts create mode 100644 packages/wallet-cli/src/daemon/prompts.ts create mode 100644 packages/wallet-cli/src/daemon/rpc-socket-server.test.ts create mode 100644 packages/wallet-cli/src/daemon/rpc-socket-server.ts create mode 100644 packages/wallet-cli/src/daemon/socket-integration.test.ts create mode 100644 packages/wallet-cli/src/daemon/socket-line.test.ts create mode 100644 packages/wallet-cli/src/daemon/socket-line.ts create mode 100644 packages/wallet-cli/src/daemon/stop-daemon.test.ts create mode 100644 packages/wallet-cli/src/daemon/stop-daemon.ts create mode 100644 packages/wallet-cli/src/daemon/utils.test.ts create mode 100644 packages/wallet-cli/src/daemon/utils.ts diff --git a/packages/wallet-cli/CHANGELOG.md b/packages/wallet-cli/CHANGELOG.md index 587b49c08f..4c86127e79 100644 --- a/packages/wallet-cli/CHANGELOG.md +++ b/packages/wallet-cli/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Add the daemon transport layer: a newline-delimited JSON-RPC client and server over a Unix socket, plus daemon spawn/stop lifecycle helpers - Add SQLite-backed persistence for wallet controller state ([#9067](https://github.com/MetaMask/core/pull/9067)) - A `KeyValueStore` backed by `better-sqlite3` for synchronous reads and writes. - `loadState` to rehydrate persist-flagged controller state from the store and `subscribeToChanges` to write persist-flagged controller state through to disk on every `stateChanged` event. diff --git a/packages/wallet-cli/package.json b/packages/wallet-cli/package.json index aa5d527127..0db71443aa 100644 --- a/packages/wallet-cli/package.json +++ b/packages/wallet-cli/package.json @@ -43,7 +43,9 @@ "test:watch": "NODE_OPTIONS=--experimental-vm-modules jest --watch" }, "dependencies": { + "@inquirer/confirm": "^6.0.11", "@metamask/base-controller": "^9.1.0", + "@metamask/rpc-errors": "^7.0.2", "@metamask/utils": "^11.11.0", "@metamask/wallet": "^3.0.0", "@oclif/core": "^4.10.5", diff --git a/packages/wallet-cli/src/daemon/daemon-client.test.ts b/packages/wallet-cli/src/daemon/daemon-client.test.ts new file mode 100644 index 0000000000..783413213b --- /dev/null +++ b/packages/wallet-cli/src/daemon/daemon-client.test.ts @@ -0,0 +1,341 @@ +import type { JsonRpcResponse } from '@metamask/utils'; +import { EventEmitter } from 'node:events'; +import { createConnection } from 'node:net'; +import type { Socket } from 'node:net'; + +import { sendCommand, pingDaemon } from './daemon-client'; +import { readLine, writeLine } from './socket-line'; + +jest.mock('node:net'); +jest.mock('./socket-line'); + +const mockCreateConnection = jest.mocked(createConnection); +const mockReadLine = jest.mocked(readLine); +const mockWriteLine = jest.mocked(writeLine); + +/** + * Create a mock Socket and wire up createConnection to return it. + * The connection callback is deferred via process.nextTick to match + * real behavior (the `socket` const must be assigned before the callback + * references it). + * + * @returns The mock socket. + */ +function setupMockSocket(): Socket { + const emitter = new EventEmitter(); + const socket = Object.assign(emitter, { + destroy: jest.fn(), + write: jest.fn(), + removeListener: emitter.removeListener.bind(emitter), + }) as unknown as Socket; + + mockCreateConnection.mockImplementation( + (_path: unknown, callback: unknown) => { + process.nextTick(() => (callback as () => void)()); + return socket; + }, + ); + + return socket; +} + +/** + * Build a JSON-RPC response that mirrors back the request id from the most + * recent `mockWriteLine` call. `sendCommand` now verifies id correlation, so + * static fixtures no longer work — the response must echo the generated id. + * + * @param overrides - Optional fields to override on the response. + * @returns A function suitable for `mockReadLine.mockImplementation`. + */ +function respondWithMatchingId( + overrides: Partial = {}, +): () => Promise { + return async () => { + const lastWrite = mockWriteLine.mock.calls.at(-1)?.[1]; + const sentId = + typeof lastWrite === 'string' + ? (JSON.parse(lastWrite).id as string) + : 'test-id'; + return JSON.stringify({ + jsonrpc: '2.0', + id: sentId, + result: { status: 'ok' }, + ...overrides, + }); + }; +} + +describe('sendCommand', () => { + it('sends a JSON-RPC request and returns the response', async () => { + const socket = setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine.mockImplementation(respondWithMatchingId()); + + const response = await sendCommand({ + socketPath: '/tmp/test.sock', + method: 'getStatus', + }); + + expect(mockCreateConnection).toHaveBeenCalledWith( + '/tmp/test.sock', + expect.any(Function), + ); + expect(mockWriteLine).toHaveBeenCalledWith( + socket, + expect.stringContaining('"method":"getStatus"'), + ); + expect(response.result).toStrictEqual({ status: 'ok' }); + expect(socket.destroy).toHaveBeenCalled(); + }); + + it('includes params when provided', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine.mockImplementation(respondWithMatchingId()); + + await sendCommand({ + socketPath: '/tmp/test.sock', + method: 'test', + params: { key: 'value' }, + }); + + const written = mockWriteLine.mock.calls[0][1]; + expect(JSON.parse(written)).toHaveProperty('params', { key: 'value' }); + }); + + it('omits params when undefined', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine.mockImplementation(respondWithMatchingId()); + + await sendCommand({ + socketPath: '/tmp/test.sock', + method: 'test', + }); + + const written = mockWriteLine.mock.calls[0][1]; + expect(JSON.parse(written)).not.toHaveProperty('params'); + }); + + it('passes timeoutMs to readLine', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine.mockImplementation(respondWithMatchingId()); + + await sendCommand({ + socketPath: '/tmp/test.sock', + method: 'test', + timeoutMs: 5000, + }); + + expect(mockReadLine).toHaveBeenCalledWith(expect.anything(), 5000); + }); + + it('throws when the response id does not match the request id', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine.mockResolvedValue( + JSON.stringify({ + jsonrpc: '2.0', + id: 'unrelated-id', + result: { status: 'ok' }, + }), + ); + + await expect( + sendCommand({ socketPath: '/tmp/test.sock', method: 'test' }), + ).rejects.toThrow(/does not match request id/u); + }); + + it('retries once on ECONNREFUSED', async () => { + const socket = setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine + .mockRejectedValueOnce( + Object.assign(new Error('refused'), { code: 'ECONNREFUSED' }), + ) + .mockImplementationOnce(respondWithMatchingId()); + + const response = await sendCommand({ + socketPath: '/tmp/test.sock', + method: 'test', + }); + + expect(response.result).toStrictEqual({ status: 'ok' }); + expect(socket.destroy).toHaveBeenCalledTimes(2); + }); + + it('retries once on ECONNRESET', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine + .mockRejectedValueOnce( + Object.assign(new Error('reset'), { code: 'ECONNRESET' }), + ) + .mockImplementationOnce(respondWithMatchingId()); + + const response = await sendCommand({ + socketPath: '/tmp/test.sock', + method: 'test', + }); + + expect(response).toHaveProperty('result'); + }); + + it('does not retry on other errors', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine.mockRejectedValue(new Error('parse error')); + + await expect( + sendCommand({ socketPath: '/tmp/test.sock', method: 'test' }), + ).rejects.toThrow('parse error'); + + expect(mockReadLine).toHaveBeenCalledTimes(1); + }); + + it('destroys socket even when attempt throws', async () => { + const socket = setupMockSocket(); + mockWriteLine.mockRejectedValue(new Error('write error')); + + await expect( + sendCommand({ socketPath: '/tmp/test.sock', method: 'test' }), + ).rejects.toThrow('write error'); + + expect(socket.destroy).toHaveBeenCalled(); + }); +}); + +describe('pingDaemon', () => { + /** + * Configure `createConnection` to emit a connection error synchronously. + * + * @param code - The Node errno code (e.g. ENOENT, ECONNREFUSED) the mock + * socket should emit on the next attempt. + */ + function mockConnectionError(code: string): void { + mockCreateConnection.mockImplementation((_path: unknown) => { + const emitter = new EventEmitter(); + const socket = Object.assign(emitter, { + destroy: jest.fn(), + write: jest.fn(), + removeListener: emitter.removeListener.bind(emitter), + }) as unknown as Socket; + process.nextTick(() => + socket.emit('error', Object.assign(new Error(code), { code })), + ); + return socket; + }); + } + + it('returns responsive when daemon responds', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine.mockImplementation(respondWithMatchingId()); + + expect(await pingDaemon('/tmp/test.sock')).toStrictEqual({ + status: 'responsive', + }); + }); + + it('returns absent when the socket file does not exist', async () => { + mockConnectionError('ENOENT'); + + expect(await pingDaemon('/tmp/test.sock')).toStrictEqual({ + status: 'absent', + }); + }); + + it('returns unreachable with reason=refused when the socket refuses connection', async () => { + // ECONNREFUSED is retried once; both attempts will reject with the same + // mock implementation. + mockConnectionError('ECONNREFUSED'); + + const result = await pingDaemon('/tmp/test.sock'); + expect(result).toStrictEqual({ + status: 'unreachable', + reason: 'refused', + error: expect.any(Error), + }); + }); + + it('returns unreachable with reason=permission on EACCES', async () => { + mockConnectionError('EACCES'); + + const result = await pingDaemon('/tmp/test.sock'); + expect(result).toMatchObject({ + status: 'unreachable', + reason: 'permission', + }); + }); + + it('returns unreachable with reason=timeout when the socket read times out', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine.mockRejectedValue(new Error('Socket read timed out')); + + const result = await pingDaemon('/tmp/test.sock'); + expect(result).toMatchObject({ + status: 'unreachable', + reason: 'timeout', + }); + }); + + it('returns unreachable with reason=protocol on a JSON-RPC id mismatch', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine.mockResolvedValue( + JSON.stringify({ + jsonrpc: '2.0', + id: 'unrelated-id', + result: { status: 'ok' }, + }), + ); + + const result = await pingDaemon('/tmp/test.sock'); + expect(result).toMatchObject({ + status: 'unreachable', + reason: 'protocol', + }); + }); + + it('returns unreachable with reason=protocol on a JSON parse error', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine.mockResolvedValue('not json'); + + const result = await pingDaemon('/tmp/test.sock'); + expect(result).toMatchObject({ + status: 'unreachable', + reason: 'protocol', + }); + }); + + it('returns unreachable with reason=other for unclassified errors', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + mockReadLine.mockRejectedValue(new Error('something weird')); + + const result = await pingDaemon('/tmp/test.sock'); + expect(result).toMatchObject({ + status: 'unreachable', + reason: 'other', + }); + }); + + it('normalizes non-Error throws into an Error instance', async () => { + setupMockSocket(); + mockWriteLine.mockResolvedValue(undefined); + // Simulate a non-Error throw; the producer must normalize it. + mockReadLine.mockImplementation(async () => + Promise.reject('string-throw' as unknown as Error), + ); + + const result = await pingDaemon('/tmp/test.sock'); + expect(result).toStrictEqual({ + status: 'unreachable', + reason: 'other', + error: expect.objectContaining({ message: 'string-throw' }), + }); + }); +}); diff --git a/packages/wallet-cli/src/daemon/daemon-client.ts b/packages/wallet-cli/src/daemon/daemon-client.ts new file mode 100644 index 0000000000..932e8aa8f4 --- /dev/null +++ b/packages/wallet-cli/src/daemon/daemon-client.ts @@ -0,0 +1,203 @@ +import type { JsonRpcParams, JsonRpcResponse } from '@metamask/utils'; +import { assertIsJsonRpcResponse } from '@metamask/utils'; +import { randomUUID } from 'node:crypto'; +import { createConnection } from 'node:net'; +import type { Socket } from 'node:net'; + +import { readLine, writeLine } from './socket-line'; +import { isErrorWithCode } from './utils'; + +const DEFAULT_TIMEOUT_MS = 30_000; + +/** + * Options for {@link sendCommand}. + */ +type SendCommandOptions = { + /** The Unix socket path. */ + socketPath: string; + /** The RPC method name. */ + method: string; + /** Optional method parameters (object or positional array). */ + params?: JsonRpcParams | undefined; + /** Response read timeout in milliseconds (default: 30 000). */ + timeoutMs?: number | undefined; +}; + +async function connectSocket(socketPath: string): Promise { + return new Promise((resolve, reject) => { + const socket = createConnection(socketPath, () => { + socket.removeListener('error', reject); + resolve(socket); + }); + socket.on('error', reject); + }); +} + +/** + * Send a JSON-RPC request to the daemon over a Unix socket and return the + * response. + * + * Opens a connection, writes one JSON-RPC request line, reads one JSON-RPC + * response line, then closes the connection. Retries once after a short delay + * on transient connection errors (ECONNREFUSED, ECONNRESET). Verifies that the + * response `id` matches the outgoing request `id`. + * + * @param options - Command options. + * @param options.socketPath - The Unix socket path. + * @param options.method - The RPC method name. + * @param options.params - Optional method parameters. + * @param options.timeoutMs - Read timeout in milliseconds. + * @returns The parsed JSON-RPC response. + */ +export async function sendCommand({ + socketPath, + method, + params, + timeoutMs, +}: SendCommandOptions): Promise { + const id = randomUUID(); + const request = { + jsonrpc: '2.0', + id, + method, + ...(params === undefined ? {} : { params }), + }; + + const effectiveTimeout = timeoutMs ?? DEFAULT_TIMEOUT_MS; + + const attempt = async (): Promise => { + const socket = await connectSocket(socketPath); + try { + await writeLine(socket, JSON.stringify(request)); + const responseLine = await readLine(socket, effectiveTimeout); + const parsed: unknown = JSON.parse(responseLine); + assertIsJsonRpcResponse(parsed); + if (parsed.id !== id) { + throw new Error( + `JSON-RPC response id ${JSON.stringify(parsed.id)} does not match request id ${JSON.stringify(id)}`, + ); + } + return parsed; + } finally { + socket.destroy(); + } + }; + + try { + return await attempt(); + } catch (error: unknown) { + if ( + !isErrorWithCode(error, 'ECONNREFUSED') && + !isErrorWithCode(error, 'ECONNRESET') + ) { + throw error; + } + await new Promise((resolve) => setTimeout(resolve, 100)); + return attempt(); + } +} + +/** + * Why an unreachable daemon cannot be queried. + * + * - `'refused'`: connection refused after retry (`ECONNREFUSED` / `ECONNRESET`). + * Typical of a daemon that has crashed or is mid-restart. + * - `'timeout'`: the daemon accepted the connection but did not respond within + * the read timeout — most likely wedged on a long-running operation. + * - `'permission'`: the socket exists but cannot be opened (`EACCES` / `EPERM`). + * The daemon almost certainly belongs to another user. + * - `'protocol'`: the daemon responded but the response did not parse as a + * valid JSON-RPC response, or the response id did not match. + * - `'other'`: anything else. + */ +export type PingUnreachableReason = + | 'refused' + | 'timeout' + | 'permission' + | 'protocol' + | 'other'; + +/** + * Outcome of a daemon health check. + * + * - `'responsive'`: the daemon answered a `getStatus` RPC. + * - `'absent'`: the socket connect attempt failed with `ENOENT`, i.e. no + * socket file exists at the path. No daemon present. + * - `'unreachable'`: any other non-success outcome. The daemon may still be + * alive but is not responding. Callers should not silently take over the + * slot or assume the daemon is dead. The `reason` field categorises the + * failure so callers can distinguish a wedged sibling daemon from a + * foreign-user daemon from a transient crash. User-initiated stop / purge + * flows may still escalate to signals against the recorded PID. + */ +export type PingResult = + | { status: 'responsive' } + | { status: 'absent' } + | { status: 'unreachable'; reason: PingUnreachableReason; error: Error }; + +/** + * Normalize an unknown throw into a real Error instance so that downstream + * consumers (which read `.message`) cannot crash on string/object throws. + * + * @param error - The caught value. + * @returns An Error mirroring the caught value. + */ +function toError(error: unknown): Error { + return error instanceof Error ? error : new Error(String(error)); +} + +/** + * Categorise an unreachable error by Node errno / message shape so callers can + * make decisions per failure mode rather than parsing message strings. + * + * @param error - The caught value. + * @returns A {@link PingUnreachableReason} label. + */ +function classifyUnreachable(error: unknown): PingUnreachableReason { + if ( + isErrorWithCode(error, 'ECONNREFUSED') || + isErrorWithCode(error, 'ECONNRESET') + ) { + return 'refused'; + } + if (isErrorWithCode(error, 'EACCES') || isErrorWithCode(error, 'EPERM')) { + return 'permission'; + } + if (error instanceof Error && error.message === 'Socket read timed out') { + return 'timeout'; + } + if ( + error instanceof Error && + (error.message.includes('JSON-RPC response id') || + /Expected .* JSON-RPC/u.test(error.message) || + error.name === 'SyntaxError') + ) { + return 'protocol'; + } + return 'other'; +} + +/** + * Check whether the daemon is running by sending a lightweight `getStatus` + * RPC call. Distinguishes "no daemon present" (socket file missing) from + * "daemon present but unreachable" (socket file exists but the daemon is + * wedged, mid-shutdown, or owned by a different user). + * + * @param socketPath - The Unix socket path. + * @returns A {@link PingResult} describing the daemon's reachability. + */ +export async function pingDaemon(socketPath: string): Promise { + try { + await sendCommand({ socketPath, method: 'getStatus', timeoutMs: 3_000 }); + return { status: 'responsive' }; + } catch (error: unknown) { + if (isErrorWithCode(error, 'ENOENT')) { + return { status: 'absent' }; + } + return { + status: 'unreachable', + reason: classifyUnreachable(error), + error: toError(error), + }; + } +} diff --git a/packages/wallet-cli/src/daemon/daemon-spawn.test.ts b/packages/wallet-cli/src/daemon/daemon-spawn.test.ts new file mode 100644 index 0000000000..4828f4d33b --- /dev/null +++ b/packages/wallet-cli/src/daemon/daemon-spawn.test.ts @@ -0,0 +1,282 @@ +import { spawn } from 'node:child_process'; +import { existsSync } from 'node:fs'; + +import { pingDaemon } from './daemon-client'; +import { ensureDaemon } from './daemon-spawn'; +import { getDaemonPaths } from './paths'; +import type { DaemonSpawnConfig } from './types'; + +jest.mock('node:child_process'); +jest.mock('node:fs'); +jest.mock('./daemon-client'); +jest.mock('./paths'); + +const mockSpawn = jest.mocked(spawn); +const mockExistsSync = jest.mocked(existsSync); +const mockPingDaemon = jest.mocked(pingDaemon); +const mockGetDaemonPaths = jest.mocked(getDaemonPaths); + +const CONFIG: DaemonSpawnConfig = { + dataDir: '/tmp/data', + infuraProjectId: 'test-key', + password: 'test-pass', + srp: 'test test test test test test test test test test test ball', + packageRoot: '/pkg', +}; + +const ABSENT = { status: 'absent' as const }; +const RESPONSIVE = { status: 'responsive' as const }; +const UNREACHABLE = { + status: 'unreachable' as const, + reason: 'refused' as const, + error: new Error('wedged'), +}; + +const UNREACHABLE_PERMISSION = { + status: 'unreachable' as const, + reason: 'permission' as const, + error: new Error('EACCES'), +}; + +/** + * Build a minimal mock for the `ChildProcess` returned by `spawn`. The `on` + * handler captures `'exit'`/`'error'` listeners so tests can fire them. + */ +type SpawnMock = { + unref: jest.Mock; + on: jest.Mock; + fireExit: (code: number | null, signal: NodeJS.Signals | null) => void; +}; + +/** + * Build a fresh spawn mock and wire it as the return value of `mockSpawn`. + * + * @returns The captured handles so tests can fire lifecycle events. + */ +function setupSpawnMock(): SpawnMock { + const listeners = new Map void>(); + const on = jest.fn((event: string, handler: (...args: unknown[]) => void) => { + listeners.set(event, handler); + }); + const result: SpawnMock = { + unref: jest.fn(), + on, + fireExit: (code, signal) => { + listeners.get('exit')?.(code, signal); + }, + }; + mockSpawn.mockReturnValue(result as never); + return result; +} + +describe('ensureDaemon', () => { + beforeEach(() => { + jest.resetAllMocks(); + jest.spyOn(process.stderr, 'write').mockImplementation(() => true); + mockGetDaemonPaths.mockReturnValue({ + socketPath: '/tmp/test.sock', + pidPath: '/tmp/test.pid', + logPath: '/tmp/test.log', + dbPath: '/tmp/wallet.db', + }); + setupSpawnMock(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it('returns already-running when a responsive daemon already exists', async () => { + mockPingDaemon.mockResolvedValue(RESPONSIVE); + + const result = await ensureDaemon(CONFIG); + expect(result).toStrictEqual({ + state: 'already-running', + socketPath: '/tmp/test.sock', + }); + expect(mockSpawn).not.toHaveBeenCalled(); + }); + + it('refuses to start when the socket exists but is unreachable', async () => { + mockPingDaemon.mockResolvedValue(UNREACHABLE); + + await expect(ensureDaemon(CONFIG)).rejects.toThrow( + /a daemon socket already exists.*unresponsive/u, + ); + expect(mockSpawn).not.toHaveBeenCalled(); + }); + + it('reports a foreign-user daemon distinctly when the ping reason is permission', async () => { + mockPingDaemon.mockResolvedValue(UNREACHABLE_PERMISSION); + + await expect(ensureDaemon(CONFIG)).rejects.toThrow( + /owned by another user/u, + ); + expect(mockSpawn).not.toHaveBeenCalled(); + }); + + it('spawns daemon as detached child with correct env vars', async () => { + mockPingDaemon + .mockResolvedValueOnce(ABSENT) + .mockResolvedValueOnce(RESPONSIVE); + mockExistsSync.mockReturnValue(true); + + await ensureDaemon(CONFIG); + + expect(mockSpawn).toHaveBeenCalledWith( + process.execPath, + ['/pkg/dist/daemon/daemon-entry.mjs'], + expect.objectContaining({ + detached: true, + stdio: 'ignore', + env: expect.objectContaining({ + MM_DAEMON_DATA_DIR: '/tmp/data', + MM_DAEMON_SOCKET_PATH: '/tmp/test.sock', + INFURA_PROJECT_ID: 'test-key', + MM_WALLET_PASSWORD: 'test-pass', + MM_WALLET_SRP: + 'test test test test test test test test test test test ball', + }), + }), + ); + }); + + it('returns started when the spawned daemon becomes responsive', async () => { + mockPingDaemon + .mockResolvedValueOnce(ABSENT) + .mockResolvedValueOnce(RESPONSIVE); + mockExistsSync.mockReturnValue(true); + + const result = await ensureDaemon(CONFIG); + + expect(result).toStrictEqual({ + state: 'started', + socketPath: '/tmp/test.sock', + }); + }); + + it('uses dist entry when it exists', async () => { + mockPingDaemon + .mockResolvedValueOnce(ABSENT) + .mockResolvedValueOnce(RESPONSIVE); + mockExistsSync.mockReturnValue(true); + + await ensureDaemon(CONFIG); + + const spawnArgs = mockSpawn.mock.calls[0][1] as string[]; + expect(spawnArgs).toStrictEqual(['/pkg/dist/daemon/daemon-entry.mjs']); + }); + + it('falls back to src entry with tsx when dist missing', async () => { + mockPingDaemon + .mockResolvedValueOnce(ABSENT) + .mockResolvedValueOnce(RESPONSIVE); + mockExistsSync.mockReturnValue(false); + + await ensureDaemon(CONFIG); + + const spawnArgs = mockSpawn.mock.calls[0][1] as string[]; + expect(spawnArgs).toStrictEqual([ + '--import', + 'tsx', + '/pkg/src/daemon/daemon-entry.ts', + ]); + }); + + it('polls until daemon is ready', async () => { + mockPingDaemon + .mockResolvedValueOnce(ABSENT) // initial check + .mockResolvedValueOnce(ABSENT) // poll 1 + .mockResolvedValueOnce(ABSENT) // poll 2 + .mockResolvedValueOnce(RESPONSIVE); // poll 3 + mockExistsSync.mockReturnValue(true); + + await ensureDaemon(CONFIG); + + expect(mockPingDaemon).toHaveBeenCalledTimes(4); + expect(process.stderr.write).toHaveBeenCalledWith('Daemon ready.\n'); + }); + + it('throws after timeout when daemon never responds', async () => { + jest.useFakeTimers(); + mockPingDaemon.mockResolvedValue(ABSENT); + mockExistsSync.mockReturnValue(true); + + const promise = ensureDaemon(CONFIG); + const rejection = promise.catch((thrown: unknown) => thrown); + + await jest.advanceTimersByTimeAsync(30_100); + const thrownError = await rejection; + expect(thrownError).toBeInstanceOf(Error); + expect((thrownError as Error).message).toBe( + 'Daemon did not start within 30s', + ); + jest.useRealTimers(); + }); + + it('throws early when the child process exits during the readiness poll', async () => { + mockPingDaemon.mockResolvedValue(ABSENT); + mockExistsSync.mockReturnValue(true); + // Fire exit at the moment the daemon-spawn code registers the listener, + // so the very first poll iteration sees exitInfo set. + const on = jest.fn( + (event: string, handler: (...args: unknown[]) => void) => { + if (event === 'exit') { + handler(1, null); + } + }, + ); + mockSpawn.mockReturnValue({ unref: jest.fn(), on } as never); + + jest.useFakeTimers(); + const promise = ensureDaemon(CONFIG); + const rejection = promise.catch((thrown: unknown) => thrown); + await jest.advanceTimersByTimeAsync(200); + + const thrownError = await rejection; + expect(thrownError).toBeInstanceOf(Error); + expect((thrownError as Error).message).toContain( + 'Daemon process exited during startup', + ); + expect((thrownError as Error).message).toContain('code=1'); + expect((thrownError as Error).message).toContain('/tmp/test.log'); + }); + + it('calls unref on spawned child and registers error + exit handlers', async () => { + mockPingDaemon + .mockResolvedValueOnce(ABSENT) + .mockResolvedValueOnce(RESPONSIVE); + mockExistsSync.mockReturnValue(true); + const spawnMock = setupSpawnMock(); + + await ensureDaemon(CONFIG); + + expect(spawnMock.unref).toHaveBeenCalled(); + expect(spawnMock.on).toHaveBeenCalledWith('error', expect.any(Function)); + expect(spawnMock.on).toHaveBeenCalledWith('exit', expect.any(Function)); + }); + + it('writes spawn errors to stderr', async () => { + mockPingDaemon + .mockResolvedValueOnce(ABSENT) + .mockResolvedValueOnce(RESPONSIVE); + mockExistsSync.mockReturnValue(true); + + let errorHandler: ((error: Error) => void) | undefined; + const on = jest.fn( + (event: string, handler: (error: Error) => void): void => { + if (event === 'error') { + errorHandler = handler; + } + }, + ); + mockSpawn.mockReturnValue({ unref: jest.fn(), on } as never); + + await ensureDaemon(CONFIG); + errorHandler?.(new Error('spawn ENOENT')); + + expect(process.stderr.write).toHaveBeenCalledWith( + expect.stringContaining('Failed to spawn daemon process'), + ); + }); +}); diff --git a/packages/wallet-cli/src/daemon/daemon-spawn.ts b/packages/wallet-cli/src/daemon/daemon-spawn.ts new file mode 100644 index 0000000000..834ed8b267 --- /dev/null +++ b/packages/wallet-cli/src/daemon/daemon-spawn.ts @@ -0,0 +1,134 @@ +import { spawn } from 'node:child_process'; +import { existsSync } from 'node:fs'; +import { join } from 'node:path'; + +import { pingDaemon } from './daemon-client'; +import { getDaemonPaths } from './paths'; +import type { DaemonSpawnConfig } from './types'; + +const POLL_INTERVAL_MS = 100; +const MAX_POLLS = 300; // 30 seconds + +/** + * Outcome of {@link ensureDaemon}. + * + * - `'already-running'`: a responsive daemon was found at the configured + * socket path. The supplied flags (`infuraProjectId`, `password`, `srp`) + * were NOT applied to that daemon; the caller should surface this so a + * user who is trying to change them isn't silently ignored. + * - `'started'`: a new daemon was spawned and is now responsive. + */ +export type EnsureDaemonResult = { + state: 'already-running' | 'started'; + socketPath: string; +}; + +/** + * Ensure the daemon is running. If a responsive daemon already exists, return + * `'already-running'` (caller decides how to surface that). Otherwise spawn + * one as a detached process and wait until the socket becomes responsive. + * + * Refuses to spawn when pinging the existing socket fails with anything other + * than `ENOENT` (wedged or foreign daemon) — taking over could orphan the + * existing process and corrupt its PID file. + * + * @param config - Spawn configuration. + * @returns The state of the daemon and the socket path it's listening on. + */ +export async function ensureDaemon( + config: DaemonSpawnConfig, +): Promise { + const { socketPath } = getDaemonPaths(config.dataDir); + + const initialPing = await pingDaemon(socketPath); + if (initialPing.status === 'responsive') { + return { state: 'already-running', socketPath }; + } + if (initialPing.status === 'unreachable') { + if (initialPing.reason === 'permission') { + throw new Error( + `Refusing to start: the socket at ${socketPath} is owned by another user. ` + + `Choose a different data directory (MM_DAEMON_DATA_DIR) or remove the socket manually. ` + + `(${initialPing.error.message})`, + ); + } + throw new Error( + `Refusing to start: a daemon socket already exists at ${socketPath} but is unresponsive. ` + + `Run \`mm daemon stop\` (or \`mm daemon purge\`) before starting a new daemon. ` + + `(${initialPing.error.message})`, + ); + } + + process.stderr.write('Starting daemon...\n'); + + const { entryPath, args } = resolveEntryPoint(config.packageRoot); + + const child = spawn(process.execPath, [...args, entryPath], { + detached: true, + stdio: 'ignore', + env: { + ...process.env, + MM_DAEMON_DATA_DIR: config.dataDir, + MM_DAEMON_SOCKET_PATH: socketPath, + INFURA_PROJECT_ID: config.infuraProjectId, + MM_WALLET_PASSWORD: config.password, + MM_WALLET_SRP: config.srp, + }, + }); + + type ExitInfo = { code: number | null; signal: NodeJS.Signals | null }; + const exitInfo: { value: ExitInfo | null } = { value: null }; + + child.on('error', (error) => { + process.stderr.write(`Failed to spawn daemon process: ${String(error)}\n`); + }); + child.on('exit', (code, signal) => { + exitInfo.value = { code, signal }; + }); + child.unref(); + + for (let i = 0; i < MAX_POLLS; i++) { + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)); + if (exitInfo.value !== null) { + const { code, signal } = exitInfo.value; + throw new Error( + `Daemon process exited during startup (code=${String(code)}, signal=${String(signal)}). ` + + `Check the daemon log at ${getDaemonPaths(config.dataDir).logPath}.`, + ); + } + const ping = await pingDaemon(socketPath); + if (ping.status === 'responsive') { + process.stderr.write('Daemon ready.\n'); + return { state: 'started', socketPath }; + } + } + + throw new Error( + `Daemon did not start within ${(MAX_POLLS * POLL_INTERVAL_MS) / 1000}s`, + ); +} + +/** + * Resolve the daemon entry point path and any extra Node.js args needed. + * + * In production, uses the compiled dist output. In development, uses tsx + * to run TypeScript source directly. + * + * @param packageRoot - The root directory of the wallet-cli package. + * @returns The entry path and any extra node args. + */ +function resolveEntryPoint(packageRoot: string): { + entryPath: string; + args: string[]; +} { + const distEntry = join(packageRoot, 'dist', 'daemon', 'daemon-entry.mjs'); + if (existsSync(distEntry)) { + return { entryPath: distEntry, args: [] }; + } + + const srcEntry = join(packageRoot, 'src', 'daemon', 'daemon-entry.ts'); + return { + entryPath: srcEntry, + args: ['--import', 'tsx'], + }; +} diff --git a/packages/wallet-cli/src/daemon/prompts.test.ts b/packages/wallet-cli/src/daemon/prompts.test.ts new file mode 100644 index 0000000000..6ec63c50d2 --- /dev/null +++ b/packages/wallet-cli/src/daemon/prompts.test.ts @@ -0,0 +1,38 @@ +// `@inquirer/confirm` is ESM-only and `prompts.ts` reaches it via a dynamic +// `import()`. Use jest's ESM mock API and dynamic imports to mirror that. +// The import statement below is what tags this file as a module for the +// `import-x/unambiguous` lint rule, even though it imports only the type. +import type Confirm from '@inquirer/confirm'; + +jest.unstable_mockModule('@inquirer/confirm', () => ({ + __esModule: true, + default: jest.fn(), +})); + +type ConfirmMock = jest.MockedFunction; + +describe('confirmPurge', () => { + it('invokes @inquirer/confirm with the purge prompt and returns its result', async () => { + const confirm = (await import('@inquirer/confirm')) + .default as unknown as ConfirmMock; + confirm.mockResolvedValue(true); + const { confirmPurge } = await import('./prompts'); + + const result = await confirmPurge(); + + expect(result).toBe(true); + expect(confirm).toHaveBeenCalledWith({ + message: 'This will stop the daemon and delete all state. Continue?', + default: false, + }); + }); + + it('returns false when the user declines', async () => { + const confirm = (await import('@inquirer/confirm')) + .default as unknown as ConfirmMock; + confirm.mockResolvedValue(false); + const { confirmPurge } = await import('./prompts'); + + expect(await confirmPurge()).toBe(false); + }); +}); diff --git a/packages/wallet-cli/src/daemon/prompts.ts b/packages/wallet-cli/src/daemon/prompts.ts new file mode 100644 index 0000000000..4245b68050 --- /dev/null +++ b/packages/wallet-cli/src/daemon/prompts.ts @@ -0,0 +1,16 @@ +/** + * Ask the user to confirm the destructive `daemon purge` operation. + * + * Wraps `@inquirer/confirm` in a dynamic import so this CommonJS-compiled + * package can interop with that ESM-only dependency, and so tests can mock + * the prompt without going through jest's ESM mock machinery. + * + * @returns True if the user confirmed. + */ +export async function confirmPurge(): Promise { + const { default: confirm } = await import('@inquirer/confirm'); + return confirm({ + message: 'This will stop the daemon and delete all state. Continue?', + default: false, + }); +} diff --git a/packages/wallet-cli/src/daemon/rpc-socket-server.test.ts b/packages/wallet-cli/src/daemon/rpc-socket-server.test.ts new file mode 100644 index 0000000000..2fcea6ff95 --- /dev/null +++ b/packages/wallet-cli/src/daemon/rpc-socket-server.test.ts @@ -0,0 +1,669 @@ +import { EventEmitter } from 'node:events'; +import { unlink } from 'node:fs/promises'; +import { createServer } from 'node:net'; +import type { Server, Socket } from 'node:net'; + +import { startRpcSocketServer } from './rpc-socket-server'; +import type { RpcHandlerMap } from './types'; + +jest.mock('node:fs/promises'); +jest.mock('node:net'); + +const mockUnlink = jest.mocked(unlink); +const mockCreateServer = jest.mocked(createServer); + +type ConnectionCallback = (socket: Socket) => void; + +/** + * Flush pending microtasks/promises by awaiting multiple ticks. + */ +async function flushPromises(): Promise { + for (let i = 0; i < 10; i++) { + await new Promise((resolve) => process.nextTick(resolve)); + } +} + +/** + * Create a mock net.Server. + * + * @returns The mock server and a function to simulate incoming connections. + */ +function createMockServer(): { + server: Server; + simulateConnection: (socket: Socket) => void; +} { + const emitter = new EventEmitter(); + let connectionCallback: ConnectionCallback | undefined; + + const server = Object.assign(emitter, { + listen: jest.fn((_path: string, onListening: () => void) => { + onListening(); + }), + close: jest.fn((onClose: (closeError?: Error) => void) => { + onClose(); + }), + removeListener: emitter.removeListener.bind(emitter), + }) as unknown as Server; + + mockCreateServer.mockImplementation((handler: unknown) => { + connectionCallback = handler as ConnectionCallback; + return server; + }); + + return { + server, + simulateConnection: (socket: Socket): void => { + connectionCallback?.(socket); + }, + }; +} + +/** + * Create a mock Socket. + * + * @returns A mock socket. + */ +function createMockSocket(): Socket { + const emitter = new EventEmitter(); + return Object.assign(emitter, { + end: jest.fn(), + destroy: jest.fn(), + write: jest.fn(), + removeListener: emitter.removeListener.bind(emitter), + }) as unknown as Socket; +} + +/** + * Parse the JSON-RPC response written to socket.end(). + * + * @param socket - The mock socket. + * @returns The parsed response. + */ +function getResponse(socket: Socket): Record { + const endCall = (socket.end as jest.Mock).mock.calls[0][0] as string; + return JSON.parse(endCall.trim()) as Record; +} + +/** + * Send a JSON-RPC request to a mock socket by emitting data. + * + * @param socket - The mock socket. + * @param request - The request object. + */ +function sendRequest(socket: Socket, request: Record): void { + socket.emit('data', Buffer.from(`${JSON.stringify(request)}\n`)); +} + +describe('startRpcSocketServer', () => { + beforeEach(() => { + mockUnlink.mockResolvedValue(undefined); + }); + + it('removes stale socket file before listening', async () => { + createMockServer(); + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + expect(mockUnlink).toHaveBeenCalledWith('/tmp/test.sock'); + }); + + it('ignores ENOENT unlink errors for missing files', async () => { + mockUnlink.mockRejectedValue( + Object.assign(new Error('ENOENT'), { code: 'ENOENT' }), + ); + createMockServer(); + + const handle = await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + expect(handle).toBeDefined(); + }); + + it('propagates non-ENOENT unlink errors', async () => { + mockUnlink.mockRejectedValue( + Object.assign(new Error('EACCES'), { code: 'EACCES' }), + ); + createMockServer(); + + await expect( + startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }), + ).rejects.toThrow('EACCES'); + }); + + it('returns a handle with close()', async () => { + const { server } = createMockServer(); + const handle = await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + await handle.close(); + expect(server.close).toHaveBeenCalled(); + }); + + it('rejects close() when server.close errors', async () => { + createMockServer(); + const handle = await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const { server } = createMockServer(); + (server.close as jest.Mock).mockImplementation( + (onClose: (closeError?: Error) => void) => { + onClose(new Error('close failed')); + }, + ); + + const handle2 = await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + await expect(handle2.close()).rejects.toThrow('close failed'); + await handle.close(); + }); + + describe('request handling', () => { + it('dispatches valid request to handler and returns result', async () => { + const { simulateConnection } = createMockServer(); + const handlers: RpcHandlerMap = { + getStatus: jest.fn().mockResolvedValue({ status: 'ok' }), + }; + + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { + jsonrpc: '2.0', + id: '1', + method: 'getStatus', + }); + + await flushPromises(); + + expect(getResponse(socket)).toStrictEqual({ + jsonrpc: '2.0', + id: '1', + result: { status: 'ok' }, + }); + }); + + it('returns null result when handler returns undefined', async () => { + const { simulateConnection } = createMockServer(); + const handlers: RpcHandlerMap = { + noop: jest.fn().mockResolvedValue(undefined), + }; + + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', id: '1', method: 'noop' }); + + await flushPromises(); + + expect(getResponse(socket).result).toBeNull(); + }); + + it('returns -32600 for missing method', async () => { + const { simulateConnection } = createMockServer(); + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', id: '1' }); + + await flushPromises(); + + expect(getResponse(socket).error).toStrictEqual( + expect.objectContaining({ + code: -32600, + message: 'Invalid JSON-RPC request', + }), + ); + }); + + it('returns -32600 for a request whose id is an object', async () => { + const { simulateConnection } = createMockServer(); + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { + jsonrpc: '2.0', + id: { nested: 'bad' }, + method: 'getStatus', + }); + + await flushPromises(); + + const response = getResponse(socket); + // Per JSON-RPC 2.0, id must be string/number/null. We cannot echo the + // object back, so respond with id: null. + expect(response.id).toBeNull(); + expect(response.error).toStrictEqual( + expect.objectContaining({ code: -32600 }), + ); + }); + + it('returns -32601 for unknown method', async () => { + const { simulateConnection } = createMockServer(); + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { + jsonrpc: '2.0', + id: '1', + method: 'nonexistent', + }); + + await flushPromises(); + + expect(getResponse(socket).error).toStrictEqual( + expect.objectContaining({ + code: -32601, + message: 'Method not found: nonexistent', + }), + ); + }); + + it.each(['toString', 'constructor', 'hasOwnProperty', '__proto__'])( + 'returns -32601 for the Object.prototype name %p instead of invoking the inherited member', + async (method) => { + const { simulateConnection } = createMockServer(); + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', id: '1', method }); + + await flushPromises(); + + expect(getResponse(socket).error).toStrictEqual( + expect.objectContaining({ + code: -32601, + }), + ); + }, + ); + + it('returns -32603 when handler throws an Error', async () => { + const { simulateConnection } = createMockServer(); + const handlers: RpcHandlerMap = { + failing: jest.fn().mockRejectedValue(new Error('handler failed')), + }; + + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', id: '1', method: 'failing' }); + + await flushPromises(); + + expect(getResponse(socket).error).toStrictEqual( + expect.objectContaining({ + code: -32603, + message: 'handler failed', + }), + ); + }); + + it('passes through RPC error objects when handler throws one', async () => { + const { simulateConnection } = createMockServer(); + const rpcError = { code: -32001, message: 'custom rpc' }; + const handlers: RpcHandlerMap = { + failing: jest.fn().mockRejectedValue(rpcError), + }; + + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', id: '1', method: 'failing' }); + + await flushPromises(); + + expect(getResponse(socket).error).toStrictEqual({ + code: -32001, + message: 'custom rpc', + }); + }); + + it('returns Internal error when handler throws a non-Error value', async () => { + const { simulateConnection } = createMockServer(); + const handlers: RpcHandlerMap = { + failing: jest.fn().mockRejectedValue('string error'), + }; + + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', id: '1', method: 'failing' }); + + await flushPromises(); + + expect(getResponse(socket).error).toStrictEqual( + expect.objectContaining({ + code: -32603, + message: 'Internal error', + }), + ); + }); + + it('intercepts shutdown method and calls onShutdown', async () => { + jest.useFakeTimers(); + const { simulateConnection } = createMockServer(); + const onShutdown = jest.fn().mockResolvedValue(undefined); + + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + onShutdown, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', id: '1', method: 'shutdown' }); + + await jest.advanceTimersByTimeAsync(0); + + expect(getResponse(socket).result).toStrictEqual({ + status: 'shutting down', + }); + expect(onShutdown).toHaveBeenCalled(); + jest.useRealTimers(); + }); + + it('handles onShutdown rejection and logs to stderr', async () => { + jest.useFakeTimers(); + const stderrSpy = jest + .spyOn(process.stderr, 'write') + .mockImplementation(() => true); + const { simulateConnection } = createMockServer(); + const onShutdown = jest.fn().mockRejectedValue(new Error('shutdown err')); + + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + onShutdown, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', id: '1', method: 'shutdown' }); + + await jest.advanceTimersByTimeAsync(0); + + expect(getResponse(socket).result).toStrictEqual({ + status: 'shutting down', + }); + expect(stderrSpy).toHaveBeenCalledWith( + expect.stringContaining('onShutdown callback failed'), + ); + jest.useRealTimers(); + stderrSpy.mockRestore(); + }); + + it('responds to shutdown even without onShutdown callback', async () => { + const { simulateConnection } = createMockServer(); + + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', id: '1', method: 'shutdown' }); + + await flushPromises(); + + expect(getResponse(socket).result).toStrictEqual({ + status: 'shutting down', + }); + }); + + it('rejects multiple requests per connection', async () => { + const { simulateConnection } = createMockServer(); + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + + // Send a valid request followed by extra data after the newline. + socket.emit( + 'data', + Buffer.from( + `${JSON.stringify({ jsonrpc: '2.0', id: '1', method: 'a' })}\nextra`, + ), + ); + + const response = getResponse(socket); + expect(response.error).toStrictEqual( + expect.objectContaining({ + code: -32600, + message: 'Only one request per connection is allowed', + }), + ); + }); + + it('accumulates partial data across multiple events', async () => { + const { simulateConnection } = createMockServer(); + const handlers: RpcHandlerMap = { + test: jest.fn().mockResolvedValue('ok'), + }; + + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + + const full = JSON.stringify({ + jsonrpc: '2.0', + id: '1', + method: 'test', + }); + socket.emit('data', Buffer.from(full.slice(0, 10))); + socket.emit('data', Buffer.from(`${full.slice(10)}\n`)); + + await flushPromises(); + + expect(getResponse(socket).result).toBe('ok'); + }); + + it('silently ignores EPIPE and ECONNRESET socket errors', async () => { + const stderrSpy = jest + .spyOn(process.stderr, 'write') + .mockImplementation(() => true); + const { simulateConnection } = createMockServer(); + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + + socket.emit( + 'error', + Object.assign(new Error('broken pipe'), { code: 'EPIPE' }), + ); + socket.emit( + 'error', + Object.assign(new Error('reset'), { code: 'ECONNRESET' }), + ); + + expect(stderrSpy).not.toHaveBeenCalled(); + stderrSpy.mockRestore(); + }); + + it('logs unexpected socket errors to stderr', async () => { + const stderrSpy = jest + .spyOn(process.stderr, 'write') + .mockImplementation(() => true); + const { simulateConnection } = createMockServer(); + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + + socket.emit( + 'error', + Object.assign(new Error('unexpected'), { code: 'ENOMEM' }), + ); + + expect(stderrSpy).toHaveBeenCalledWith( + expect.stringContaining('Unexpected socket error'), + ); + stderrSpy.mockRestore(); + }); + + it('sends internal error when response serialization fails', async () => { + const { simulateConnection } = createMockServer(); + const circular: Record = {}; + circular.self = circular; + const handlers: RpcHandlerMap = { + bad: jest.fn().mockResolvedValue(circular), + }; + + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', id: '1', method: 'bad' }); + + await flushPromises(); + + const endCall = (socket.end as jest.Mock).mock.calls[0][0] as string; + const response = JSON.parse(endCall.trim()) as Record; + expect(response.error).toBeDefined(); + }); + + it('handles invalid JSON gracefully', async () => { + const { simulateConnection } = createMockServer(); + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + socket.emit('data', Buffer.from('not-json\n')); + + await flushPromises(); + + expect((getResponse(socket).error as { code: number }).code).toBe(-32700); + }); + + it('uses null id when request has no id', async () => { + const { simulateConnection } = createMockServer(); + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', method: 'unknown' }); + + await flushPromises(); + + expect(getResponse(socket).id).toBeNull(); + }); + + it('destroys socket when no complete request arrives within timeout', async () => { + jest.useFakeTimers(); + const { simulateConnection } = createMockServer(); + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers: {}, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + + // Send partial data (no newline). + socket.emit('data', Buffer.from('partial')); + + expect(socket.destroy).not.toHaveBeenCalled(); + + await jest.advanceTimersByTimeAsync(30_000); + + expect(socket.destroy).toHaveBeenCalled(); + jest.useRealTimers(); + }); + + it('wraps thrown object with code but no message as internal error', async () => { + const { simulateConnection } = createMockServer(); + const handlers: RpcHandlerMap = { + failing: jest.fn().mockRejectedValue({ code: 42 }), + }; + + await startRpcSocketServer({ + socketPath: '/tmp/test.sock', + handlers, + }); + + const socket = createMockSocket(); + simulateConnection(socket); + sendRequest(socket, { jsonrpc: '2.0', id: '1', method: 'failing' }); + + await flushPromises(); + + expect(getResponse(socket).error).toStrictEqual( + expect.objectContaining({ + code: -32603, + message: 'Internal error', + }), + ); + }); + }); +}); diff --git a/packages/wallet-cli/src/daemon/rpc-socket-server.ts b/packages/wallet-cli/src/daemon/rpc-socket-server.ts new file mode 100644 index 0000000000..07c8a3ed7b --- /dev/null +++ b/packages/wallet-cli/src/daemon/rpc-socket-server.ts @@ -0,0 +1,326 @@ +import { rpcErrors } from '@metamask/rpc-errors'; +import type { + JsonRpcId, + JsonRpcParams, + JsonRpcResponse, +} from '@metamask/utils'; +import { hasProperty, isJsonRpcRequest } from '@metamask/utils'; +import { unlink } from 'node:fs/promises'; +import { createServer } from 'node:net'; +import type { Server } from 'node:net'; + +import type { RpcHandlerMap } from './types'; +import { isErrorWithCode } from './utils'; + +const CONNECTION_TIMEOUT_MS = 30_000; + +/** + * Handle returned by {@link startRpcSocketServer}. + */ +export type RpcSocketServerHandle = { + close: () => Promise; +}; + +/** + * Options for {@link startRpcSocketServer}. + */ +export type StartRpcSocketServerOptions = { + /** The Unix socket path to listen on. */ + socketPath: string; + /** Map of RPC method names to handler functions. */ + handlers: RpcHandlerMap; + /** Callback invoked when a `shutdown` RPC is received. */ + onShutdown?: (() => Promise) | undefined; + /** + * Optional logger for server-side diagnostics (unexpected socket errors, + * unhandled handler rejections, `onShutdown` callback failures). Without + * this, failures fall back to `process.stderr.write`, which is discarded + * when the daemon is spawned with `stdio: 'ignore'`. + */ + log?: ((message: string) => void) | undefined; +}; + +/** + * Start a Unix socket server that processes JSON-RPC requests. + * + * Each connection reads one newline-delimited JSON-RPC request, processes it + * via the provided handler map, writes a JSON-RPC response, and closes. + * + * The special `shutdown` method is intercepted before handler dispatch and + * triggers the provided {@link StartRpcSocketServerOptions.onShutdown} callback + * after responding. + * + * @param options - Server options. + * @param options.socketPath - The Unix socket path to listen on. + * @param options.handlers - Map of RPC method names to handler functions. + * @param options.onShutdown - Optional callback invoked when a `shutdown` RPC is received. + * @param options.log - Optional logger for server-side diagnostics. + * @returns A handle with a `close()` function for cleanup. + */ +export async function startRpcSocketServer({ + socketPath, + handlers, + onShutdown, + log, +}: StartRpcSocketServerOptions): Promise { + const logFn = log ?? defaultLog; + + const server = createServer((socket) => { + let buffer = ''; + + // Destroy connections that never send a complete request line. `unref` so + // the timer alone cannot keep the event loop alive at shutdown. + const timer = setTimeout(() => { + socket.destroy(); + }, CONNECTION_TIMEOUT_MS); + timer.unref(); + + /** + * Clear the idle-connection timer. Called from data, close, and error + * paths so the timer never outlives the connection itself. + */ + const clearIdleTimer = (): void => { + clearTimeout(timer); + }; + + const onData = (data: Buffer): void => { + buffer += data.toString(); + const idx = buffer.indexOf('\n'); + if (idx === -1) { + return; + } + + clearIdleTimer(); + + // One request per connection. + socket.removeListener('data', onData); + + const line = buffer.slice(0, idx); + const remaining = buffer.slice(idx + 1); + buffer = ''; + + if (remaining.length > 0) { + socket.end( + `${JSON.stringify({ + jsonrpc: '2.0', + error: rpcErrors + .invalidRequest({ + message: 'Only one request per connection is allowed', + }) + .serialize(), + })}\n`, + ); + return; + } + + handleRequest(handlers, line, onShutdown, logFn) + .then((response) => { + socket.end(`${JSON.stringify(response)}\n`); + return undefined; + }) + .catch((dispatchError: unknown) => { + logFn(`Unhandled RPC dispatch error: ${String(dispatchError)}`); + socket.end( + `${JSON.stringify({ + jsonrpc: '2.0', + error: rpcErrors + .internal({ message: 'Internal error' }) + .serialize(), + })}\n`, + ); + }); + }; + socket.on('data', onData); + socket.once('close', clearIdleTimer); + socket.on('error', (socketError: NodeJS.ErrnoException) => { + clearIdleTimer(); + const { code } = socketError; + if (code === 'EPIPE' || code === 'ECONNRESET') { + return; // Expected during probe/disconnect. + } + logFn(`Unexpected socket error: ${String(socketError)}`); + }); + }); + + await listen(server, socketPath); + + return { + close: async (): Promise => { + await new Promise((resolve, reject) => { + server.close((error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); + }, + }; +} + +/** + * Default fallback logger: writes to stderr. Daemons spawned with + * `stdio: 'ignore'` should always pass an explicit `log`. + * + * @param message - The message to log. + */ +function defaultLog(message: string): void { + process.stderr.write(`${message}\n`); +} + +/** + * Handle a single JSON-RPC request line, intercepting the `shutdown` method. + * + * @param handlers - The RPC handler map. + * @param line - The raw JSON line from the socket. + * @param onShutdown - Optional shutdown callback. + * @param log - Logger for diagnostic messages. + * @returns A JSON-RPC response object. + */ +async function handleRequest( + handlers: RpcHandlerMap, + line: string, + onShutdown: (() => Promise) | undefined, + log: (message: string) => void, +): Promise { + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + return { + jsonrpc: '2.0', + id: null, + error: rpcErrors.parse({ message: 'Parse error' }).serialize(), + }; + } + + if (!isJsonRpcRequest(parsed)) { + const id: JsonRpcId = + typeof parsed === 'object' && + parsed !== null && + hasProperty(parsed, 'id') && + isValidJsonRpcId(parsed.id) + ? parsed.id + : null; + return { + jsonrpc: '2.0', + id, + error: rpcErrors + .invalidRequest({ message: 'Invalid JSON-RPC request' }) + .serialize(), + }; + } + + const { id, method, params } = parsed; + + try { + // Intercept shutdown before handler dispatch. + if (method === 'shutdown') { + if (onShutdown) { + setTimeout(() => { + onShutdown().catch((error: unknown) => { + log(`onShutdown callback failed: ${String(error)}`); + }); + }, 0); + } + return { jsonrpc: '2.0', id, result: { status: 'shutting down' } }; + } + + const handler = Object.prototype.hasOwnProperty.call(handlers, method) + ? handlers[method] + : undefined; + if (!handler) { + return { + jsonrpc: '2.0', + id, + error: rpcErrors + .methodNotFound({ message: `Method not found: ${method}` }) + .serialize(), + }; + } + + const result = await handler(coerceHandlerParams(params)); + return { jsonrpc: '2.0', id, result: result ?? null }; + } catch (error) { + log(`RPC handler "${method}" failed: ${String(error)}`); + if (isRpcError(error)) { + return { jsonrpc: '2.0', id, error }; + } + const message = error instanceof Error ? error.message : 'Internal error'; + return { + jsonrpc: '2.0', + id, + error: rpcErrors.internal({ message }).serialize(), + }; + } +} + +/** + * Narrow `params` to the shape handlers expect. JSON-RPC 2.0 requires + * `params`, when present, to be an array or object; both are valid `Json`. + * + * @param params - The validated `params` field from a JSON-RPC request. + * @returns The same value, or `null` when absent. + */ +function coerceHandlerParams( + params: JsonRpcParams | undefined, +): JsonRpcParams | null { + return params ?? null; +} + +/** + * Per JSON-RPC 2.0, `id` must be a string, number, or null. Used when + * salvaging an `id` from a parse-success-but-not-valid-request payload. + * + * @param value - The candidate id. + * @returns True if the value is an acceptable JSON-RPC id. + */ +function isValidJsonRpcId(value: unknown): value is JsonRpcId { + return ( + value === null || typeof value === 'string' || typeof value === 'number' + ); +} + +/** + * Check if an error is an RPC error with a numeric code. + * + * @param error - The error to check. + * @returns True if the error has a numeric code property. + */ +function isRpcError( + error: unknown, +): error is { code: number; message: string } { + return ( + typeof error === 'object' && + error !== null && + hasProperty(error, 'code') && + typeof error.code === 'number' && + hasProperty(error, 'message') && + typeof error.message === 'string' + ); +} + +/** + * Start listening on a Unix socket path, removing any stale socket file. + * + * @param server - The net.Server instance. + * @param socketPath - The Unix socket path. + */ +async function listen(server: Server, socketPath: string): Promise { + try { + await unlink(socketPath); + } catch (error) { + if (!isErrorWithCode(error, 'ENOENT')) { + throw error; + } + } + + return new Promise((resolve, reject) => { + server.on('error', reject); + server.listen(socketPath, () => { + server.removeListener('error', reject); + resolve(); + }); + }); +} diff --git a/packages/wallet-cli/src/daemon/socket-integration.test.ts b/packages/wallet-cli/src/daemon/socket-integration.test.ts new file mode 100644 index 0000000000..0327ed918d --- /dev/null +++ b/packages/wallet-cli/src/daemon/socket-integration.test.ts @@ -0,0 +1,224 @@ +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; + +import { pingDaemon, sendCommand } from './daemon-client'; +import { startRpcSocketServer } from './rpc-socket-server'; +import type { RpcSocketServerHandle } from './rpc-socket-server'; + +/** + * End-to-end integration tests for the daemon's IPC layer: real + * `startRpcSocketServer` listening on a real Unix socket, real `sendCommand` + * speaking newline-delimited JSON-RPC over `net.createConnection`. Every + * other test in this package mocks one side of this boundary; these tests + * guard against bugs that only surface when both halves run together + * (framing, response-id correlation, the "one request per connection" + * invariant, real shutdown timing). + */ +describe('socket integration', () => { + const openHandles: RpcSocketServerHandle[] = []; + let socketPath: string; + + beforeEach(() => { + socketPath = join( + tmpdir(), + `mm-cli-it-${process.pid}-${Date.now()}-${Math.random()}.sock`, + ); + }); + + afterEach(async () => { + while (openHandles.length > 0) { + const handle = openHandles.pop(); + await handle?.close().catch(() => undefined); + } + }); + + /** + * Start an RPC server, register its handle for afterEach cleanup, and + * return it. Avoids the `require-atomic-updates` shape lint complains + * about when assigning to a let-bound variable across awaits. + * + * @param options - Options forwarded to `startRpcSocketServer`. + * @returns The started server's handle. + */ + async function startServer( + options: Parameters[0], + ): Promise { + const handle = await startRpcSocketServer(options); + openHandles.push(handle); + return handle; + } + + it('round-trips a JSON-RPC request between sendCommand and startRpcSocketServer', async () => { + await startServer({ + socketPath, + handlers: { + getStatus: async () => ({ pid: 42, uptime: 7 }), + }, + }); + + const response = await sendCommand({ + socketPath, + method: 'getStatus', + timeoutMs: 2_000, + }); + + expect(response).toMatchObject({ + jsonrpc: '2.0', + result: { pid: 42, uptime: 7 }, + }); + }); + + it('returns responsive from pingDaemon when the server is up', async () => { + await startServer({ + socketPath, + handlers: { + getStatus: async () => ({ pid: 1, uptime: 0 }), + }, + }); + + expect(await pingDaemon(socketPath)).toStrictEqual({ + status: 'responsive', + }); + }); + + it('returns absent from pingDaemon when no socket exists', async () => { + expect(await pingDaemon(socketPath)).toStrictEqual({ status: 'absent' }); + }); + + it('surfaces handler errors to the client as JSON-RPC errors', async () => { + await startServer({ + socketPath, + handlers: { + boom: async () => { + throw new Error('handler exploded'); + }, + }, + }); + + const response = await sendCommand({ + socketPath, + method: 'boom', + timeoutMs: 2_000, + }); + + expect(response).toMatchObject({ + jsonrpc: '2.0', + error: expect.objectContaining({ + code: -32603, + message: 'handler exploded', + }), + }); + }); + + it('returns methodNotFound for unknown methods', async () => { + await startServer({ + socketPath, + handlers: { + getStatus: async () => ({ pid: 1, uptime: 0 }), + }, + }); + + const response = await sendCommand({ + socketPath, + method: 'doesNotExist', + timeoutMs: 2_000, + }); + + expect(response).toMatchObject({ + jsonrpc: '2.0', + error: expect.objectContaining({ code: -32601 }), + }); + }); + + it('handles concurrent in-flight requests without bleeding buffers across connections', async () => { + await startServer({ + socketPath, + handlers: { + echo: async (params) => ({ params }), + }, + }); + + const responses = await Promise.all( + Array.from({ length: 8 }, async (_value, index) => + sendCommand({ + socketPath, + method: 'echo', + params: [`request-${index}`], + timeoutMs: 2_000, + }), + ), + ); + + for (const [index, response] of responses.entries()) { + expect(response).toMatchObject({ + result: { params: [`request-${index}`] }, + }); + } + }); + + it('intercepts the `shutdown` method and fires the onShutdown callback', async () => { + const onShutdown = jest.fn().mockResolvedValue(undefined); + await startServer({ + socketPath, + handlers: {}, + onShutdown, + }); + + const response = await sendCommand({ + socketPath, + method: 'shutdown', + timeoutMs: 2_000, + }); + + expect(response).toMatchObject({ + result: { status: 'shutting down' }, + }); + + // onShutdown fires via setTimeout(..., 0) after the response is written. + await new Promise((resolve) => setTimeout(resolve, 50)); + expect(onShutdown).toHaveBeenCalledTimes(1); + }); + + it('rejects pipelined requests with an invalidRequest error', async () => { + await startServer({ + socketPath, + handlers: { + getStatus: async () => ({ pid: 1, uptime: 0 }), + }, + }); + + // Open a raw connection and write two requests at once. + const { createConnection } = await import('node:net'); + const socket = createConnection(socketPath); + await new Promise((resolve, reject) => { + socket.once('connect', () => resolve()); + socket.once('error', reject); + }); + + socket.write( + `${JSON.stringify({ jsonrpc: '2.0', id: '1', method: 'getStatus' })}\n` + + `${JSON.stringify({ jsonrpc: '2.0', id: '2', method: 'getStatus' })}\n`, + ); + + const responseLine = await new Promise((resolve, reject) => { + let buffer = ''; + const onData = (chunk: Buffer): void => { + buffer += chunk.toString(); + const idx = buffer.indexOf('\n'); + if (idx !== -1) { + socket.removeListener('data', onData); + resolve(buffer.slice(0, idx)); + } + }; + socket.on('data', onData); + socket.once('error', reject); + }); + socket.destroy(); + + const response: unknown = JSON.parse(responseLine); + expect(response).toMatchObject({ + jsonrpc: '2.0', + error: expect.objectContaining({ code: -32600 }), + }); + }); +}); diff --git a/packages/wallet-cli/src/daemon/socket-line.test.ts b/packages/wallet-cli/src/daemon/socket-line.test.ts new file mode 100644 index 0000000000..b91e4b9724 --- /dev/null +++ b/packages/wallet-cli/src/daemon/socket-line.test.ts @@ -0,0 +1,120 @@ +import { EventEmitter } from 'node:events'; +import type { Socket } from 'node:net'; + +import { readLine, writeLine } from './socket-line'; + +/** + * Create a mock Socket backed by EventEmitter. + * + * @returns A mock socket. + */ +function createMockSocket(): Socket { + const emitter = new EventEmitter(); + const socket = Object.assign(emitter, { + write: jest.fn(), + destroy: jest.fn(), + }); + return socket as unknown as Socket; +} + +describe('writeLine', () => { + it('writes the line with a trailing newline', async () => { + const socket = createMockSocket(); + (socket.write as jest.Mock).mockImplementation( + (_data: string, callback: (writeError?: Error) => void) => callback(), + ); + + await writeLine(socket, 'hello'); + expect(socket.write).toHaveBeenCalledWith('hello\n', expect.any(Function)); + }); + + it('rejects when socket.write returns an error', async () => { + const socket = createMockSocket(); + const writeError = new Error('write failed'); + (socket.write as jest.Mock).mockImplementation( + (_data: string, callback: (e?: Error) => void) => callback(writeError), + ); + + await expect(writeLine(socket, 'hello')).rejects.toThrow('write failed'); + }); +}); + +describe('readLine', () => { + it('resolves with the line when data contains a newline', async () => { + const socket = createMockSocket(); + const promise = readLine(socket); + + socket.emit('data', Buffer.from('hello\n')); + expect(await promise).toBe('hello'); + }); + + it('accumulates data across multiple events', async () => { + const socket = createMockSocket(); + const promise = readLine(socket); + + socket.emit('data', Buffer.from('hel')); + socket.emit('data', Buffer.from('lo\n')); + expect(await promise).toBe('hello'); + }); + + it('rejects on socket error', async () => { + const socket = createMockSocket(); + const promise = readLine(socket); + + socket.emit('error', new Error('socket error')); + await expect(promise).rejects.toThrow('socket error'); + }); + + it('rejects on socket end', async () => { + const socket = createMockSocket(); + const promise = readLine(socket); + + socket.emit('end'); + await expect(promise).rejects.toThrow( + 'Socket closed before response received', + ); + }); + + it('rejects on socket close', async () => { + const socket = createMockSocket(); + const promise = readLine(socket); + + socket.emit('close'); + await expect(promise).rejects.toThrow( + 'Socket closed before response received', + ); + }); + + it('rejects after timeout when no complete line received', async () => { + jest.useFakeTimers(); + const socket = createMockSocket(); + const promise = readLine(socket, 500); + + jest.advanceTimersByTime(500); + await expect(promise).rejects.toThrow('Socket read timed out'); + jest.useRealTimers(); + }); + + it('resolves before timeout when data arrives in time', async () => { + jest.useFakeTimers(); + const socket = createMockSocket(); + const promise = readLine(socket, 5000); + + socket.emit('data', Buffer.from('hello\n')); + expect(await promise).toBe('hello'); + jest.useRealTimers(); + }); + + it('cleans up listeners after resolving', async () => { + const socket = createMockSocket(); + const promise = readLine(socket); + + socket.emit('data', Buffer.from('hello\n')); + await promise; + + expect(socket.listenerCount('data')).toBe(0); + expect(socket.listenerCount('error')).toBe(0); + expect(socket.listenerCount('end')).toBe(0); + expect(socket.listenerCount('close')).toBe(0); + }); +}); diff --git a/packages/wallet-cli/src/daemon/socket-line.ts b/packages/wallet-cli/src/daemon/socket-line.ts new file mode 100644 index 0000000000..c061174fb8 --- /dev/null +++ b/packages/wallet-cli/src/daemon/socket-line.ts @@ -0,0 +1,86 @@ +import type { Socket } from 'node:net'; + +/** + * Write a newline-delimited line to a socket. + * + * @param socket - The socket to write to. + * @param line - The line to write (without trailing newline). + */ +export async function writeLine(socket: Socket, line: string): Promise { + return new Promise((resolve, reject) => { + socket.write(`${line}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +/** + * Read a single newline-delimited line from a socket. + * + * @param socket - The socket to read from. + * @param timeoutMs - Optional timeout in milliseconds. Rejects with a timeout + * error if no complete line is received within the limit. + * @returns The line read (without trailing newline). + */ +export async function readLine( + socket: Socket, + timeoutMs?: number, +): Promise { + return new Promise((resolve, reject) => { + let buffer = ''; + let timer: ReturnType | undefined; + + if (timeoutMs !== undefined) { + timer = setTimeout(() => { + cleanup(); + reject(new Error('Socket read timed out')); + }, timeoutMs); + } + + const onData = (data: Buffer): void => { + buffer += data.toString(); + const idx = buffer.indexOf('\n'); + if (idx !== -1) { + cleanup(); + resolve(buffer.slice(0, idx)); + } + }; + + const onError = (error: Error): void => { + cleanup(); + reject(error); + }; + + const onEnd = (): void => { + cleanup(); + reject(new Error('Socket closed before response received')); + }; + + const onClose = (): void => { + cleanup(); + reject(new Error('Socket closed before response received')); + }; + + /** + * Remove listeners registered by this call and clear the timeout. + */ + function cleanup(): void { + if (timer !== undefined) { + clearTimeout(timer); + } + socket.removeListener('data', onData); + socket.removeListener('error', onError); + socket.removeListener('end', onEnd); + socket.removeListener('close', onClose); + } + + socket.on('data', onData); + socket.once('error', onError); + socket.once('end', onEnd); + socket.once('close', onClose); + }); +} diff --git a/packages/wallet-cli/src/daemon/stop-daemon.test.ts b/packages/wallet-cli/src/daemon/stop-daemon.test.ts new file mode 100644 index 0000000000..f96c9257a8 --- /dev/null +++ b/packages/wallet-cli/src/daemon/stop-daemon.test.ts @@ -0,0 +1,306 @@ +import { rm } from 'node:fs/promises'; + +import { pingDaemon, sendCommand } from './daemon-client'; +import { stopDaemon } from './stop-daemon'; +import { isProcessAlive, readPidFile, sendSignal, waitFor } from './utils'; + +jest.mock('node:fs/promises'); +jest.mock('./daemon-client'); +jest.mock('./utils'); + +const mockRm = jest.mocked(rm); +const mockPingDaemon = jest.mocked(pingDaemon); +const mockSendCommand = jest.mocked(sendCommand); +const mockReadPidFile = jest.mocked(readPidFile); +const mockIsProcessAlive = jest.mocked(isProcessAlive); +const mockSendSignal = jest.mocked(sendSignal); +const mockWaitFor = jest.mocked(waitFor); + +const ABSENT = { status: 'absent' as const }; +const RESPONSIVE = { status: 'responsive' as const }; +const UNREACHABLE = { + status: 'unreachable' as const, + reason: 'refused' as const, + error: new Error('refused'), +}; + +describe('stopDaemon', () => { + beforeEach(() => { + mockRm.mockResolvedValue(undefined); + mockIsProcessAlive.mockReturnValue(false); + }); + + it('returns true when daemon is not running (no PID file)', async () => { + mockReadPidFile.mockResolvedValue(undefined); + mockPingDaemon.mockResolvedValue(ABSENT); + + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid'); + expect(result).toBe(true); + }); + + it('cleans up stale PID file when daemon is not running', async () => { + mockReadPidFile.mockResolvedValue(123); + mockPingDaemon.mockResolvedValue(ABSENT); + + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid'); + expect(result).toBe(true); + expect(mockRm).toHaveBeenCalledWith('/tmp/test.pid', { force: true }); + // Critically: do NOT signal the recorded PID when the socket is absent + // (PID may have been recycled to an unrelated process). + expect(mockSendSignal).not.toHaveBeenCalled(); + }); + + it('signals the recorded PID when the socket is absent but the process is still alive', async () => { + mockReadPidFile.mockResolvedValue(123); + mockPingDaemon.mockResolvedValue(ABSENT); + mockIsProcessAlive.mockReturnValue(true); + mockSendSignal.mockReturnValue(true); + mockWaitFor.mockResolvedValueOnce(true); + + const log = jest.fn(); + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid', log); + + expect(result).toBe(true); + expect(mockSendSignal).toHaveBeenCalledWith(123, 'SIGTERM'); + expect(log).toHaveBeenCalledWith( + expect.stringContaining('Socket at /tmp/test.sock is absent'), + ); + }); + + it('stops daemon via graceful RPC shutdown', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(RESPONSIVE); + mockSendCommand.mockResolvedValue({ + jsonrpc: '2.0', + id: '1', + result: { status: 'shutting down' }, + }); + mockWaitFor.mockImplementation(async (check) => { + await check(); + return true; + }); + + const log = jest.fn(); + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid', log); + + expect(result).toBe(true); + expect(mockSendCommand).toHaveBeenCalledWith({ + socketPath: '/tmp/test.sock', + method: 'shutdown', + }); + expect(log).toHaveBeenCalledWith('Stopping daemon...'); + expect(log).toHaveBeenCalledWith('Daemon stopped.'); + expect(mockRm).toHaveBeenCalledWith('/tmp/test.pid', { force: true }); + expect(mockRm).toHaveBeenCalledWith('/tmp/test.sock', { force: true }); + }); + + it('falls through to SIGTERM when graceful shutdown times out', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(RESPONSIVE); + mockSendCommand.mockResolvedValue({ + jsonrpc: '2.0', + id: '1', + result: null, + }); + mockSendSignal.mockReturnValue(true); + mockWaitFor + .mockImplementationOnce(async (check) => { + await check(); + return false; + }) + .mockImplementationOnce(async (check) => { + await check(); + return true; + }); + + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid'); + expect(result).toBe(true); + expect(mockSendSignal).toHaveBeenCalledWith(123, 'SIGTERM'); + }); + + it('falls through to SIGKILL when SIGTERM times out', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(RESPONSIVE); + mockSendCommand.mockResolvedValue({ + jsonrpc: '2.0', + id: '1', + result: null, + }); + mockSendSignal.mockReturnValue(true); + mockWaitFor + .mockImplementationOnce(async (check) => { + await check(); + return false; + }) + .mockImplementationOnce(async (check) => { + await check(); + return false; + }) + .mockImplementationOnce(async (check) => { + await check(); + return true; + }); + + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid'); + expect(result).toBe(true); + expect(mockSendSignal).toHaveBeenCalledWith(123, 'SIGKILL'); + }); + + it('returns false when all strategies fail', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(RESPONSIVE); + mockSendCommand.mockResolvedValue({ + jsonrpc: '2.0', + id: '1', + result: null, + }); + mockSendSignal.mockReturnValue(true); + mockWaitFor.mockResolvedValue(false); + + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid'); + expect(result).toBe(false); + }); + + it('skips graceful shutdown when the socket is unreachable and signals directly', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(UNREACHABLE); + mockSendSignal.mockReturnValue(true); + mockWaitFor.mockResolvedValueOnce(true); + + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid'); + expect(result).toBe(true); + expect(mockSendCommand).not.toHaveBeenCalled(); + expect(mockSendSignal).toHaveBeenCalledWith(123, 'SIGTERM'); + }); + + it('treats ESRCH on SIGTERM as stopped', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(UNREACHABLE); + mockSendSignal.mockReturnValue(false); + + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid'); + expect(result).toBe(true); + }); + + it('treats ESRCH on SIGKILL as stopped', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(UNREACHABLE); + mockSendSignal.mockReturnValueOnce(true).mockReturnValueOnce(false); + mockWaitFor.mockResolvedValueOnce(false); + + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid'); + expect(result).toBe(true); + expect(mockSendSignal).toHaveBeenCalledWith(123, 'SIGKILL'); + }); + + it('falls through to SIGKILL when SIGTERM throws EPERM', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(UNREACHABLE); + mockSendSignal + .mockImplementationOnce(() => { + throw Object.assign(new Error('eperm'), { code: 'EPERM' }); + }) + .mockReturnValueOnce(true); + mockWaitFor.mockResolvedValueOnce(true); + + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid'); + expect(result).toBe(true); + expect(mockSendSignal).toHaveBeenCalledWith(123, 'SIGKILL'); + }); + + it('returns false when both SIGTERM and SIGKILL throw EPERM', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(UNREACHABLE); + mockSendSignal.mockImplementation(() => { + throw Object.assign(new Error('eperm'), { code: 'EPERM' }); + }); + + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid'); + expect(result).toBe(false); + }); + + it('treats sendCommand error as graceful shutdown failure and falls through', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(RESPONSIVE); + mockSendCommand.mockRejectedValue(new Error('socket error')); + mockWaitFor.mockResolvedValue(true); + + const log = jest.fn(); + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid', log); + expect(result).toBe(true); + expect(log).toHaveBeenCalledWith( + expect.stringContaining('Graceful shutdown request failed'), + ); + }); + + it('logs rather than throws when post-stop cleanup of the PID file fails', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(RESPONSIVE); + mockSendCommand.mockResolvedValue({ + jsonrpc: '2.0', + id: '1', + result: null, + }); + mockWaitFor.mockResolvedValue(true); + mockRm.mockImplementation((path) => + path === '/tmp/test.pid' + ? Promise.reject(new Error('pid rm failed')) + : Promise.resolve(undefined), + ); + + const log = jest.fn(); + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid', log); + expect(result).toBe(true); + expect(log).toHaveBeenCalledWith( + expect.stringContaining('Failed to remove PID file'), + ); + }); + + it('logs rather than throws when post-stop cleanup of the socket file fails', async () => { + mockReadPidFile.mockResolvedValue(123); + mockIsProcessAlive.mockReturnValue(true); + mockPingDaemon.mockResolvedValue(RESPONSIVE); + mockSendCommand.mockResolvedValue({ + jsonrpc: '2.0', + id: '1', + result: null, + }); + mockWaitFor.mockResolvedValue(true); + mockRm.mockImplementation((path) => + path === '/tmp/test.sock' + ? Promise.reject(new Error('socket rm failed')) + : Promise.resolve(undefined), + ); + + const log = jest.fn(); + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid', log); + expect(result).toBe(true); + expect(log).toHaveBeenCalledWith( + expect.stringContaining('Failed to remove socket file'), + ); + }); + + it('logs rather than throws when stale-PID cleanup fails', async () => { + mockReadPidFile.mockResolvedValue(123); + mockPingDaemon.mockResolvedValue(ABSENT); + mockRm.mockRejectedValue(new Error('rm denied')); + + const log = jest.fn(); + const result = await stopDaemon('/tmp/test.sock', '/tmp/test.pid', log); + expect(result).toBe(true); + expect(log).toHaveBeenCalledWith( + expect.stringContaining('Failed to remove PID file'), + ); + }); +}); diff --git a/packages/wallet-cli/src/daemon/stop-daemon.ts b/packages/wallet-cli/src/daemon/stop-daemon.ts new file mode 100644 index 0000000000..f7a50b19f6 --- /dev/null +++ b/packages/wallet-cli/src/daemon/stop-daemon.ts @@ -0,0 +1,114 @@ +import { rm } from 'node:fs/promises'; + +import { pingDaemon, sendCommand } from './daemon-client'; +import { isProcessAlive, readPidFile, sendSignal, waitFor } from './utils'; + +/** + * Stop the daemon via a `shutdown` RPC call. Falls back to PID + SIGTERM if + * the socket is unresponsive, and escalates to SIGKILL if SIGTERM is ignored. + * + * Signals are sent when EITHER the socket was observed (`responsive` or + * `unreachable`) OR the recorded PID is still alive on its own. The + * socket-absent + alive-PID branch trades a small risk of signalling a + * recycled PID for the larger risk of leaving an orphan daemon holding the + * SQLite database — which `daemon purge` would otherwise wipe out from + * under it. + * + * @param socketPath - The daemon socket path. + * @param pidPath - The daemon PID file path. + * @param log - Optional logging function for status messages. + * @returns True if the daemon was stopped (or was not running). + */ +export async function stopDaemon( + socketPath: string, + pidPath: string, + log?: (message: string) => void, +): Promise { + const pid = await readPidFile(pidPath); + const ping = await pingDaemon(socketPath); + const socketObserved = + ping.status === 'responsive' || ping.status === 'unreachable'; + const processAlive = pid !== undefined && isProcessAlive(pid); + + if (!socketObserved && !processAlive) { + // No live daemon evidence. Just remove the stale PID file if any. + await cleanupFile(pidPath, 'PID file', log); + return true; + } + + log?.('Stopping daemon...'); + + let stopped = false; + + // Strategy 1: Graceful socket-based shutdown. + if (ping.status === 'responsive') { + try { + await sendCommand({ socketPath, method: 'shutdown' }); + } catch (error) { + log?.(`Graceful shutdown request failed: ${String(error)}`); + } + stopped = await waitFor( + async () => (await pingDaemon(socketPath)).status !== 'responsive', + 5_000, + ); + } + + // Strategy 2: SIGTERM. Signal when either the socket was observed or the + // recorded PID is alive; the absent+alive case typically means someone + // removed the socket from under a live daemon. + if (!stopped && processAlive && pid !== undefined) { + if (!socketObserved) { + log?.( + `Socket at ${socketPath} is absent but recorded pid ${pid} is alive; signalling anyway.`, + ); + } + try { + if (sendSignal(pid, 'SIGTERM')) { + stopped = await waitFor(() => !isProcessAlive(pid), 5_000); + } else { + stopped = true; // Process already gone (ESRCH). + } + } catch (error) { + log?.(`SIGTERM failed: ${String(error)}`); + } + } + + // Strategy 3: SIGKILL. + if (!stopped && processAlive && pid !== undefined) { + try { + if (sendSignal(pid, 'SIGKILL')) { + stopped = await waitFor(() => !isProcessAlive(pid), 2_000); + } else { + stopped = true; // Process already gone (ESRCH). + } + } catch (error) { + log?.(`SIGKILL failed: ${String(error)}`); + } + } + + if (stopped) { + await cleanupFile(pidPath, 'PID file', log); + await cleanupFile(socketPath, 'socket file', log); + log?.('Daemon stopped.'); + } + + return stopped; +} + +/** + * Remove a file best-effort, logging any failure rather than letting it + * propagate. ENOENT is silently ignored via `force: true`. + * + * @param path - The file path to remove. + * @param label - Human-readable label for log messages. + * @param log - Optional log sink. + */ +async function cleanupFile( + path: string, + label: string, + log: ((message: string) => void) | undefined, +): Promise { + await rm(path, { force: true }).catch((error: unknown) => { + log?.(`Failed to remove ${label}: ${String(error)}`); + }); +} diff --git a/packages/wallet-cli/src/daemon/types.ts b/packages/wallet-cli/src/daemon/types.ts index 3e0e246c95..8e9e8be50e 100644 --- a/packages/wallet-cli/src/daemon/types.ts +++ b/packages/wallet-cli/src/daemon/types.ts @@ -1,3 +1,17 @@ +import type { Json } from '@metamask/utils'; + +/** + * A function that handles a JSON-RPC method call. + * + * The `params` argument will be `null` if the client did not provide params. + */ +export type RpcHandler = (params: Json) => Promise; + +/** + * A map of RPC method names to their handler functions. + */ +export type RpcHandlerMap = Record; + /** * Resolved paths for daemon state files. */ @@ -7,3 +21,22 @@ export type DaemonPaths = { logPath: string; dbPath: string; }; + +/** + * Status information returned by the daemon's `getStatus` RPC method. + */ +export type DaemonStatusInfo = { + pid: number; + uptime: number; +}; + +/** + * Configuration passed to the daemon spawner. + */ +export type DaemonSpawnConfig = { + dataDir: string; + infuraProjectId: string; + password: string; + srp: string; + packageRoot: string; +}; diff --git a/packages/wallet-cli/src/daemon/utils.test.ts b/packages/wallet-cli/src/daemon/utils.test.ts new file mode 100644 index 0000000000..08efdc4f69 --- /dev/null +++ b/packages/wallet-cli/src/daemon/utils.test.ts @@ -0,0 +1,185 @@ +import { readFile } from 'node:fs/promises'; + +import { + isErrorWithCode, + isProcessAlive, + readPidFile, + sendSignal, + waitFor, +} from './utils'; + +jest.mock('node:fs/promises'); + +const mockReadFile = jest.mocked(readFile); + +describe('isErrorWithCode', () => { + it('returns true for an Error with a matching code', () => { + const error = Object.assign(new Error('fail'), { code: 'ENOENT' }); + expect(isErrorWithCode(error, 'ENOENT')).toBe(true); + }); + + it('returns false for an Error with a different code', () => { + const error = Object.assign(new Error('fail'), { code: 'EPERM' }); + expect(isErrorWithCode(error, 'ENOENT')).toBe(false); + }); + + it('returns false for an Error without a code', () => { + expect(isErrorWithCode(new Error('fail'), 'ENOENT')).toBe(false); + }); + + it('returns false for non-Error values', () => { + expect(isErrorWithCode('not an error', 'ENOENT')).toBe(false); + expect(isErrorWithCode(null, 'ENOENT')).toBe(false); + expect(isErrorWithCode(undefined, 'ENOENT')).toBe(false); + }); +}); + +describe('readPidFile', () => { + it('returns the PID number from a single-line file', async () => { + mockReadFile.mockResolvedValue('12345'); + expect(await readPidFile('/tmp/test.pid')).toBe(12345); + }); + + it('returns the PID from the first line when the file contains daemon metadata', async () => { + // The daemon writes `${pid}\n${startTime}\n` so it can verify ownership + // on cleanup; only the first line is the PID. + mockReadFile.mockResolvedValue('12345\n1715553908123\n'); + expect(await readPidFile('/tmp/test.pid')).toBe(12345); + }); + + it('returns undefined for ENOENT', async () => { + mockReadFile.mockRejectedValue( + Object.assign(new Error('not found'), { code: 'ENOENT' }), + ); + expect(await readPidFile('/tmp/test.pid')).toBeUndefined(); + }); + + it('returns undefined for NaN content', async () => { + mockReadFile.mockResolvedValue('not-a-number'); + expect(await readPidFile('/tmp/test.pid')).toBeUndefined(); + }); + + it('returns undefined for zero', async () => { + mockReadFile.mockResolvedValue('0'); + expect(await readPidFile('/tmp/test.pid')).toBeUndefined(); + }); + + it('returns undefined for negative numbers', async () => { + mockReadFile.mockResolvedValue('-1'); + expect(await readPidFile('/tmp/test.pid')).toBeUndefined(); + }); + + it('returns undefined for an empty file', async () => { + mockReadFile.mockResolvedValue(''); + expect(await readPidFile('/tmp/test.pid')).toBeUndefined(); + }); + + it('rethrows non-ENOENT errors', async () => { + mockReadFile.mockRejectedValue( + Object.assign(new Error('permission denied'), { code: 'EACCES' }), + ); + await expect(readPidFile('/tmp/test.pid')).rejects.toThrow( + 'permission denied', + ); + }); +}); + +describe('isProcessAlive', () => { + it('returns true when process.kill(pid, 0) succeeds', () => { + jest.spyOn(process, 'kill').mockImplementation(() => true); + expect(isProcessAlive(123)).toBe(true); + }); + + it('returns true on EPERM (process exists but no permission)', () => { + jest.spyOn(process, 'kill').mockImplementation(() => { + throw Object.assign(new Error('eperm'), { code: 'EPERM' }); + }); + expect(isProcessAlive(123)).toBe(true); + }); + + it('returns false on ESRCH (process gone)', () => { + jest.spyOn(process, 'kill').mockImplementation(() => { + throw Object.assign(new Error('esrch'), { code: 'ESRCH' }); + }); + expect(isProcessAlive(123)).toBe(false); + }); + + it('rethrows unknown errors instead of guessing the process is dead', () => { + jest.spyOn(process, 'kill').mockImplementation(() => { + throw Object.assign(new Error('einval'), { code: 'EINVAL' }); + }); + expect(() => isProcessAlive(123)).toThrow('einval'); + }); + + it('rethrows non-system errors', () => { + jest.spyOn(process, 'kill').mockImplementation(() => { + throw new Error('unexpected'); + }); + expect(() => isProcessAlive(123)).toThrow('unexpected'); + }); +}); + +describe('sendSignal', () => { + it('returns true when signal is delivered', () => { + jest.spyOn(process, 'kill').mockImplementation(() => true); + expect(sendSignal(123, 'SIGTERM')).toBe(true); + }); + + it('returns false on ESRCH (process gone)', () => { + jest.spyOn(process, 'kill').mockImplementation(() => { + throw Object.assign(new Error('esrch'), { code: 'ESRCH' }); + }); + expect(sendSignal(123, 'SIGTERM')).toBe(false); + }); + + it('rethrows other errors', () => { + jest.spyOn(process, 'kill').mockImplementation(() => { + throw Object.assign(new Error('eperm'), { code: 'EPERM' }); + }); + expect(() => sendSignal(123, 'SIGTERM')).toThrow('eperm'); + }); +}); + +describe('waitFor', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it('returns true when check passes immediately', async () => { + expect(await waitFor(() => true, 1000)).toBe(true); + }); + + it('returns true when check passes after polling', async () => { + let calls = 0; + const check = (): boolean => { + calls += 1; + return calls >= 3; + }; + + const promise = waitFor(check, 5000); + await jest.advanceTimersByTimeAsync(500); + expect(await promise).toBe(true); + }); + + it('returns false on timeout', async () => { + const promise = waitFor(() => false, 500); + await jest.advanceTimersByTimeAsync(750); + expect(await promise).toBe(false); + }); + + it('works with async check functions', async () => { + let calls = 0; + const check = async (): Promise => { + calls += 1; + return calls >= 2; + }; + + const promise = waitFor(check, 5000); + await jest.advanceTimersByTimeAsync(500); + expect(await promise).toBe(true); + }); +}); diff --git a/packages/wallet-cli/src/daemon/utils.ts b/packages/wallet-cli/src/daemon/utils.ts new file mode 100644 index 0000000000..a4cdefb78e --- /dev/null +++ b/packages/wallet-cli/src/daemon/utils.ts @@ -0,0 +1,115 @@ +import { hasProperty } from '@metamask/utils'; +import { readFile } from 'node:fs/promises'; + +/** + * Check whether an unknown error is a Node.js system error with the given code. + * + * @param error - The error to check. + * @param code - The expected error code (e.g. 'ENOENT', 'EPERM'). + * @returns True if the error matches the code. + */ +export function isErrorWithCode(error: unknown, code: string): boolean { + // Duck-typed rather than `instanceof Error` so error values that crossed a + // realm boundary (e.g. from Node built-ins under jest's + // `--experimental-vm-modules`) still pass. + return ( + typeof error === 'object' && + error !== null && + hasProperty(error, 'code') && + error.code === code + ); +} + +/** + * Read a PID from a file. The file may contain just the PID, or the PID on + * the first line followed by additional metadata (e.g. start time written by + * the daemon). + * + * @param pidPath - The PID file path. + * @returns The PID, or undefined if the file is missing or its first line is + * not a positive integer. + */ +export async function readPidFile( + pidPath: string, +): Promise { + let contents: string; + try { + contents = await readFile(pidPath, 'utf-8'); + } catch (error: unknown) { + if (isErrorWithCode(error, 'ENOENT')) { + return undefined; + } + throw error; + } + // String.prototype.split always returns at least one element, so [0] is safe. + const pid = Number(contents.split('\n')[0].trim()); + return Number.isInteger(pid) && pid > 0 ? pid : undefined; +} + +/** + * Check whether a process is alive by sending signal 0. + * + * Treats `ESRCH` as "process is gone", `EPERM` as "process exists but we + * cannot signal it" (still alive from our perspective), and rethrows + * anything else so the caller can surface unexpected failures rather than + * silently assuming the process is dead. + * + * @param pid - The process ID to check. + * @returns True if the process exists. + */ +export function isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch (error: unknown) { + if (isErrorWithCode(error, 'ESRCH')) { + return false; + } + if (isErrorWithCode(error, 'EPERM')) { + return true; + } + throw error; + } +} + +/** + * Send a signal to a process. Returns true if the signal was sent, false if + * the process does not exist (ESRCH). Re-throws on permission errors and + * other failures. + * + * @param pid - The process ID. + * @param signal - The signal to send. + * @returns True if the signal was delivered, false if the process is gone. + */ +export function sendSignal(pid: number, signal: NodeJS.Signals): boolean { + try { + process.kill(pid, signal); + return true; + } catch (error: unknown) { + if (isErrorWithCode(error, 'ESRCH')) { + return false; + } + throw error; + } +} + +/** + * Poll until a condition is met or the timeout elapses. + * + * @param check - A function that returns true when the condition is met. + * @param timeoutMs - Maximum time to wait in milliseconds. + * @returns True if the condition was met, false on timeout. + */ +export async function waitFor( + check: () => boolean | Promise, + timeoutMs: number, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (await check()) { + return true; + } + await new Promise((resolve) => setTimeout(resolve, 250)); + } + return false; +} diff --git a/yarn.lock b/yarn.lock index ad04761e4c..aac17a66d0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4405,6 +4405,13 @@ __metadata: languageName: node linkType: hard +"@inquirer/ansi@npm:^2.0.7": + version: 2.0.7 + resolution: "@inquirer/ansi@npm:2.0.7" + checksum: 10/ae4ff228412f1f67d78aa9a7410e07e692eeb7c9b75034825f1039898821b02505dfe934be53d6ee4ce5bde9e7ff6b4ce7547bacc1c9aa441396cb9b99717e0f + languageName: node + linkType: hard + "@inquirer/checkbox@npm:^5.0.5": version: 5.0.7 resolution: "@inquirer/checkbox@npm:5.0.7" @@ -4422,6 +4429,21 @@ __metadata: languageName: node linkType: hard +"@inquirer/confirm@npm:^6.0.11": + version: 6.1.1 + resolution: "@inquirer/confirm@npm:6.1.1" + dependencies: + "@inquirer/core": "npm:^11.2.1" + "@inquirer/type": "npm:^4.0.7" + peerDependencies: + "@types/node": ">=18" + peerDependenciesMeta: + "@types/node": + optional: true + checksum: 10/631d35403438949abe073ba6b7823682d4bd995a582226df0c8d18fd2b069a42e415bc40988fd10b138984701d585da32086da6d90814ec50eaa8a2778dc6bfb + languageName: node + linkType: hard + "@inquirer/confirm@npm:^6.0.5": version: 6.0.7 resolution: "@inquirer/confirm@npm:6.0.7" @@ -4457,6 +4479,26 @@ __metadata: languageName: node linkType: hard +"@inquirer/core@npm:^11.2.1": + version: 11.2.1 + resolution: "@inquirer/core@npm:11.2.1" + dependencies: + "@inquirer/ansi": "npm:^2.0.7" + "@inquirer/figures": "npm:^2.0.7" + "@inquirer/type": "npm:^4.0.7" + cli-width: "npm:^4.1.0" + fast-wrap-ansi: "npm:^0.2.0" + mute-stream: "npm:^3.0.0" + signal-exit: "npm:^4.1.0" + peerDependencies: + "@types/node": ">=18" + peerDependenciesMeta: + "@types/node": + optional: true + checksum: 10/687c2ee50985a8eb3142c902e99dd888176e66af2eed0781238e7b4b1f327737d4db053345a20b74c7fefa47da539238eea0afcd1e59e577409ca5ead94aa6d1 + languageName: node + linkType: hard + "@inquirer/editor@npm:^5.0.5": version: 5.0.7 resolution: "@inquirer/editor@npm:5.0.7" @@ -4510,6 +4552,13 @@ __metadata: languageName: node linkType: hard +"@inquirer/figures@npm:^2.0.7": + version: 2.0.7 + resolution: "@inquirer/figures@npm:2.0.7" + checksum: 10/145d74307b17712807ccd561787ecd1a72d574165b4d07a146e0d815f8e0320ffd39fb07f81a33910a4d2f0e098862b35b87614c4d3da740a29fa42c38dcb768 + languageName: node + linkType: hard + "@inquirer/input@npm:^5.0.5": version: 5.0.7 resolution: "@inquirer/input@npm:5.0.7" @@ -4639,6 +4688,18 @@ __metadata: languageName: node linkType: hard +"@inquirer/type@npm:^4.0.7": + version: 4.0.7 + resolution: "@inquirer/type@npm:4.0.7" + peerDependencies: + "@types/node": ">=18" + peerDependenciesMeta: + "@types/node": + optional: true + checksum: 10/97769b74264a2575c12c231e3d4acf98b4ae237fc987cec6b459f88b907b1729623403b8d9fe0cf2ca21743114777c64e3031fbeff72e9da37fbf4c8985f587f + languageName: node + linkType: hard + "@isaacs/cliui@npm:^8.0.2": version: 8.0.2 resolution: "@isaacs/cliui@npm:8.0.2" @@ -8763,8 +8824,10 @@ __metadata: version: 0.0.0-use.local resolution: "@metamask/wallet-cli@workspace:packages/wallet-cli" dependencies: + "@inquirer/confirm": "npm:^6.0.11" "@metamask/auto-changelog": "npm:^6.1.0" "@metamask/base-controller": "npm:^9.1.0" + "@metamask/rpc-errors": "npm:^7.0.2" "@metamask/utils": "npm:^11.11.0" "@metamask/wallet": "npm:^3.0.0" "@oclif/core": "npm:^4.10.5" From 646dc7ab8e565b765082506ebe9a9496c78ad8f8 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 12 Jun 2026 12:56:57 +0100 Subject: [PATCH 2/2] docs(wallet-cli): link PR #9108 in daemon transport changelog entry Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/wallet-cli/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/wallet-cli/CHANGELOG.md b/packages/wallet-cli/CHANGELOG.md index 4c86127e79..19cc9a1c8c 100644 --- a/packages/wallet-cli/CHANGELOG.md +++ b/packages/wallet-cli/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Add the daemon transport layer: a newline-delimited JSON-RPC client and server over a Unix socket, plus daemon spawn/stop lifecycle helpers +- Add the daemon transport layer: a newline-delimited JSON-RPC client and server over a Unix socket, plus daemon spawn/stop lifecycle helpers ([#9108](https://github.com/MetaMask/core/pull/9108)) - Add SQLite-backed persistence for wallet controller state ([#9067](https://github.com/MetaMask/core/pull/9067)) - A `KeyValueStore` backed by `better-sqlite3` for synchronous reads and writes. - `loadState` to rehydrate persist-flagged controller state from the store and `subscribeToChanges` to write persist-flagged controller state through to disk on every `stateChanged` event.