diff --git a/apps/server/src/diagnostics/ProcessDiagnostics.test.ts b/apps/server/src/diagnostics/ProcessDiagnostics.test.ts index 7d16a11c829..54f486faee8 100644 --- a/apps/server/src/diagnostics/ProcessDiagnostics.test.ts +++ b/apps/server/src/diagnostics/ProcessDiagnostics.test.ts @@ -1,8 +1,9 @@ -import { describe, expect, it } from "@effect/vitest"; +import { assert, describe, it } from "@effect/vitest"; import * as DateTime from "effect/DateTime"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; +import * as Schema from "effect/Schema"; import * as Sink from "effect/Sink"; import * as Stream from "effect/Stream"; import { ChildProcessSpawner } from "effect/unstable/process"; @@ -11,6 +12,7 @@ import { HostProcessPlatform } from "@t3tools/shared/hostProcess"; import * as ProcessDiagnostics from "./ProcessDiagnostics.ts"; const encoder = new TextEncoder(); +const encodeUnknownJson = Schema.encodeUnknownSync(Schema.UnknownFromJsonString); function mockHandle(result: { readonly stdout?: string; @@ -42,7 +44,7 @@ describe("ProcessDiagnostics", () => { ].join("\n"), ); - expect(rows).toEqual([ + assert.deepStrictEqual(rows, [ { pid: 10, ppid: 1, @@ -126,15 +128,21 @@ describe("ProcessDiagnostics", () => { ], }); - expect(diagnostics.serverPid).toBe(100); - expect(DateTime.formatIso(diagnostics.readAt)).toBe("2026-05-05T10:00:00.000Z"); - expect(diagnostics.processCount).toBe(2); - expect(diagnostics.totalRssBytes).toBe(6_000); - expect(diagnostics.totalCpuPercent).toBe(4.75); - expect(diagnostics.processes.map((process) => process.pid)).toEqual([101, 102]); - expect(diagnostics.processes.map((process) => process.depth)).toEqual([0, 1]); - expect(Option.getOrNull(diagnostics.processes[0]!.pgid)).toBe(100); - expect(diagnostics.processes[0]?.childPids).toEqual([102]); + assert.equal(diagnostics.serverPid, 100); + assert.equal(DateTime.formatIso(diagnostics.readAt), "2026-05-05T10:00:00.000Z"); + assert.equal(diagnostics.processCount, 2); + assert.equal(diagnostics.totalRssBytes, 6_000); + assert.equal(diagnostics.totalCpuPercent, 4.75); + assert.deepStrictEqual( + diagnostics.processes.map((process) => process.pid), + [101, 102], + ); + assert.deepStrictEqual( + diagnostics.processes.map((process) => process.depth), + [0, 1], + ); + assert.equal(Option.getOrNull(diagnostics.processes[0]!.pgid), 100); + assert.deepStrictEqual(diagnostics.processes[0]?.childPids, [102]); }), ); @@ -177,7 +185,10 @@ describe("ProcessDiagnostics", () => { ], }); - expect(diagnostics.processes.map((process) => process.pid)).toEqual([101, 102, 103]); + assert.deepStrictEqual( + diagnostics.processes.map((process) => process.pid), + [101, 102, 103], + ); }), ); @@ -210,8 +221,11 @@ describe("ProcessDiagnostics", () => { Effect.provide(layer), ); - expect(diagnostics.processes.map((process) => process.pid)).toEqual([4242]); - expect(commands).toEqual([ + assert.deepStrictEqual( + diagnostics.processes.map((process) => process.pid), + [4242], + ); + assert.deepStrictEqual(commands, [ { command: "ps", args: ["-axo", "pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command="], @@ -241,23 +255,129 @@ describe("ProcessDiagnostics", () => { Effect.flip, ); - expect(error).toMatchObject({ - _tag: "ProcessDiagnosticsQueryFailedError", - command: "ps", - argCount: 2, - cwd: process.cwd(), - exitCode: 17, - stdoutBytes: 22, - stderrBytes: 21, - stdoutTruncated: false, - stderrTruncated: false, - }); - expect(error.message).toBe( + if (error._tag !== "ProcessDiagnosticsQueryFailedError") { + assert.fail(`Expected ProcessDiagnosticsQueryFailedError, got ${error._tag}`); + } + assert.deepStrictEqual( + { + command: error.command, + argCount: error.argCount, + cwd: error.cwd, + exitCode: error.exitCode, + stdoutBytes: error.stdoutBytes, + stderrBytes: error.stderrBytes, + stdoutTruncated: error.stdoutTruncated, + stderrTruncated: error.stderrTruncated, + }, + { + command: "ps", + argCount: 2, + cwd: process.cwd(), + exitCode: 17, + stdoutBytes: 22, + stderrBytes: 21, + stdoutTruncated: false, + stderrTruncated: false, + }, + ); + assert.equal( + error.message, `Process diagnostics query 'ps' failed with exit code 17 in '${process.cwd()}'.`, ); }), ); + it.effect("decodes Windows process JSON through Schema and skips invalid records", () => + Effect.gen(function* () { + const commands: Array<{ readonly command: string; readonly args: ReadonlyArray }> = + []; + const spawnerLayer = Layer.succeed( + ChildProcessSpawner.ChildProcessSpawner, + ChildProcessSpawner.make((command) => { + const childProcess = command as unknown as { + readonly command: string; + readonly args: ReadonlyArray; + }; + commands.push({ command: childProcess.command, args: childProcess.args }); + return Effect.succeed( + mockHandle({ + stdout: encodeUnknownJson([ + { + ProcessId: process.pid, + ParentProcessId: 1, + Name: "node.exe", + CommandLine: "t3 server", + Status: "", + WorkingSetSize: 1024, + PercentProcessorTime: 0, + }, + { + ProcessId: 4242, + ParentProcessId: process.pid, + Name: "agent.exe", + CommandLine: null, + Status: "Running", + WorkingSetSize: 2048.6, + PercentProcessorTime: 1.25, + }, + { + ProcessId: "invalid", + ParentProcessId: process.pid, + Name: "bad.exe", + }, + ]), + }), + ); + }), + ); + + const rows = yield* ProcessDiagnostics.readProcessRows.pipe( + Effect.provide(spawnerLayer), + Effect.provideService(HostProcessPlatform, "win32"), + ); + + assert.deepStrictEqual(commands, [ + { + command: "powershell.exe", + args: [ + "-NoProfile", + "-NonInteractive", + "-Command", + [ + "$processes = Get-CimInstance Win32_Process | ForEach-Object {", + '$perf = Get-CimInstance Win32_PerfFormattedData_PerfProc_Process -Filter "IDProcess = $($_.ProcessId)" -ErrorAction SilentlyContinue;', + "[pscustomobject]@{ ProcessId = $_.ProcessId; ParentProcessId = $_.ParentProcessId; Name = $_.Name; CommandLine = $_.CommandLine; Status = $_.Status; WorkingSetSize = $_.WorkingSetSize; PercentProcessorTime = if ($perf) { $perf.PercentProcessorTime } else { 0 } }", + "};", + "$processes | ConvertTo-Json -Compress -Depth 3", + ].join(" "), + ], + }, + ]); + assert.deepStrictEqual(rows, [ + { + pid: process.pid, + ppid: 1, + pgid: null, + status: "Live", + cpuPercent: 0, + rssBytes: 1024, + elapsed: "", + command: "t3 server", + }, + { + pid: 4242, + ppid: process.pid, + pgid: null, + status: "Running", + cpuPercent: 1.25, + rssBytes: 2049, + elapsed: "", + command: "agent.exe", + }, + ]); + }), + ); + it.effect("does not allow signaling the diagnostics query process", () => Effect.gen(function* () { const spawnerLayer = Layer.succeed( @@ -280,7 +400,7 @@ describe("ProcessDiagnostics", () => { Effect.provide(layer), ); - expect(result).toEqual({ + assert.deepStrictEqual(result, { pid: 4242, signal: "SIGINT", signaled: false, diff --git a/apps/server/src/diagnostics/ProcessDiagnostics.ts b/apps/server/src/diagnostics/ProcessDiagnostics.ts index b39d560a228..b0a91eb3034 100644 --- a/apps/server/src/diagnostics/ProcessDiagnostics.ts +++ b/apps/server/src/diagnostics/ProcessDiagnostics.ts @@ -28,9 +28,22 @@ export interface ProcessRow { readonly command: string; } -const PROCESS_QUERY_TIMEOUT_MS = 1_000; +const PROCESS_QUERY_TIMEOUT = Duration.seconds(1); const POSIX_PROCESS_QUERY_COMMAND = "pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command="; const PROCESS_QUERY_MAX_OUTPUT_BYTES = 2 * 1024 * 1024; +const WindowsProcessJson = Schema.fromJsonString(Schema.Unknown); +const WindowsProcessRecord = Schema.Struct({ + ProcessId: Schema.Number, + ParentProcessId: Schema.Number, + CommandLine: Schema.optional(Schema.OptionFromNullOr(Schema.String)), + Name: Schema.optional(Schema.OptionFromNullOr(Schema.String)), + Status: Schema.optional(Schema.OptionFromNullOr(Schema.String)), + WorkingSetSize: Schema.optional(Schema.OptionFromNullOr(Schema.Number)), + PercentProcessorTime: Schema.optional(Schema.OptionFromNullOr(Schema.Number)), +}); +type WindowsProcessRecord = typeof WindowsProcessRecord.Type; +const decodeWindowsProcessJson = Schema.decodeUnknownOption(WindowsProcessJson); +const decodeWindowsProcessRecord = Schema.decodeUnknownOption(WindowsProcessRecord); export class ProcessDiagnostics extends Context.Service< ProcessDiagnostics, @@ -201,51 +214,72 @@ export function parsePosixProcessRows(output: string): ReadonlyArray return rows; } -function normalizeWindowsProcessRow(value: unknown): ProcessRow | null { - if (typeof value !== "object" || value === null) return null; - const record = value as Record; - const pid = typeof record.ProcessId === "number" ? record.ProcessId : null; - const ppid = typeof record.ParentProcessId === "number" ? record.ParentProcessId : null; - const commandLine = - typeof record.CommandLine === "string" && record.CommandLine.trim().length > 0 - ? record.CommandLine - : typeof record.Name === "string" - ? record.Name - : null; - const workingSet = - typeof record.WorkingSetSize === "number" && Number.isFinite(record.WorkingSetSize) - ? Math.max(0, Math.round(record.WorkingSetSize)) - : 0; - const cpuPercent = - typeof record.PercentProcessorTime === "number" && Number.isFinite(record.PercentProcessorTime) - ? Math.max(0, record.PercentProcessorTime) - : 0; - - if (!pid || pid <= 0 || ppid === null || ppid < 0 || !commandLine) return null; - return { - pid, - ppid, +function optionFromOptional(value: Option.Option | undefined): Option.Option { + return value ?? Option.none(); +} + +function nonEmptyString(value: Option.Option | undefined): Option.Option { + return optionFromOptional(value).pipe(Option.filter((text) => text.trim().length > 0)); +} + +function optionalNumberOrElse( + value: Option.Option | undefined, + onSome: (value: number) => number, +): number { + return Option.match(optionFromOptional(value), { + onNone: () => 0, + onSome, + }); +} + +function normalizeWindowsProcessRecord(record: WindowsProcessRecord): Option.Option { + if ( + !Number.isInteger(record.ProcessId) || + record.ProcessId <= 0 || + !Number.isInteger(record.ParentProcessId) || + record.ParentProcessId < 0 + ) { + return Option.none(); + } + + const commandLine = nonEmptyString(record.CommandLine).pipe( + Option.orElse(() => nonEmptyString(record.Name)), + ); + if (Option.isNone(commandLine)) return Option.none(); + + return Option.some({ + pid: record.ProcessId, + ppid: record.ParentProcessId, pgid: null, - status: typeof record.Status === "string" && record.Status.length > 0 ? record.Status : "Live", - cpuPercent, - rssBytes: workingSet, + status: Option.getOrElse(nonEmptyString(record.Status), () => "Live"), + cpuPercent: optionalNumberOrElse(record.PercentProcessorTime, (value) => Math.max(0, value)), + rssBytes: optionalNumberOrElse(record.WorkingSetSize, (value) => + Math.max(0, Math.round(value)), + ), elapsed: "", - command: commandLine, - }; + command: commandLine.value, + }); +} + +function normalizeWindowsProcessRow(value: unknown): Option.Option { + return Option.match(decodeWindowsProcessRecord(value), { + onNone: () => Option.none(), + onSome: normalizeWindowsProcessRecord, + }); } function parseWindowsProcessRows(output: string): ReadonlyArray { if (output.trim().length === 0) return []; - try { - const parsed = JSON.parse(output) as unknown; - const records = Array.isArray(parsed) ? parsed : [parsed]; - return records.flatMap((record) => { - const row = normalizeWindowsProcessRow(record); - return row ? [row] : []; - }); - } catch { - return []; - } + return Option.match(decodeWindowsProcessJson(output), { + onNone: () => [], + onSome: (parsed) => + (Array.isArray(parsed) ? parsed : [parsed]).flatMap((record) => + Option.match(normalizeWindowsProcessRow(record), { + onNone: () => [], + onSome: (row) => [row], + }), + ), + }); } export function buildDescendantEntries( @@ -381,7 +415,7 @@ const runProcess = Effect.fn("runProcess")(function* (input: { } satisfies ProcessOutput; }).pipe( Effect.scoped, - Effect.timeoutOption(Duration.millis(PROCESS_QUERY_TIMEOUT_MS)), + Effect.timeoutOption(PROCESS_QUERY_TIMEOUT), Effect.flatMap((result) => Option.match(result, { onNone: () => @@ -390,7 +424,7 @@ const runProcess = Effect.fn("runProcess")(function* (input: { command: input.command, argCount: input.args.length, cwd, - timeoutMillis: PROCESS_QUERY_TIMEOUT_MS, + timeoutMillis: Duration.toMillis(PROCESS_QUERY_TIMEOUT), }), ), onSome: Effect.succeed,