Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 147 additions & 27 deletions apps/server/src/diagnostics/ProcessDiagnostics.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { describe, expect, it } from "@effect/vitest";
import { assert, describe, it } from "@effect/vitest";
import * as DateTime from "effect/DateTime";
import * as Effect from "effect/Effect";
import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Schema from "effect/Schema";
import * as Sink from "effect/Sink";
import * as Stream from "effect/Stream";
import { ChildProcessSpawner } from "effect/unstable/process";
Expand All @@ -11,6 +12,7 @@ import { HostProcessPlatform } from "@t3tools/shared/hostProcess";
import * as ProcessDiagnostics from "./ProcessDiagnostics.ts";

const encoder = new TextEncoder();
const encodeUnknownJson = Schema.encodeUnknownSync(Schema.UnknownFromJsonString);

function mockHandle(result: {
readonly stdout?: string;
Expand Down Expand Up @@ -42,7 +44,7 @@ describe("ProcessDiagnostics", () => {
].join("\n"),
);

expect(rows).toEqual([
assert.deepStrictEqual(rows, [
{
pid: 10,
ppid: 1,
Expand Down Expand Up @@ -126,15 +128,21 @@ describe("ProcessDiagnostics", () => {
],
});

expect(diagnostics.serverPid).toBe(100);
expect(DateTime.formatIso(diagnostics.readAt)).toBe("2026-05-05T10:00:00.000Z");
expect(diagnostics.processCount).toBe(2);
expect(diagnostics.totalRssBytes).toBe(6_000);
expect(diagnostics.totalCpuPercent).toBe(4.75);
expect(diagnostics.processes.map((process) => process.pid)).toEqual([101, 102]);
expect(diagnostics.processes.map((process) => process.depth)).toEqual([0, 1]);
expect(Option.getOrNull(diagnostics.processes[0]!.pgid)).toBe(100);
expect(diagnostics.processes[0]?.childPids).toEqual([102]);
assert.equal(diagnostics.serverPid, 100);
assert.equal(DateTime.formatIso(diagnostics.readAt), "2026-05-05T10:00:00.000Z");
assert.equal(diagnostics.processCount, 2);
assert.equal(diagnostics.totalRssBytes, 6_000);
assert.equal(diagnostics.totalCpuPercent, 4.75);
assert.deepStrictEqual(
diagnostics.processes.map((process) => process.pid),
[101, 102],
);
assert.deepStrictEqual(
diagnostics.processes.map((process) => process.depth),
[0, 1],
);
assert.equal(Option.getOrNull(diagnostics.processes[0]!.pgid), 100);
assert.deepStrictEqual(diagnostics.processes[0]?.childPids, [102]);
}),
);

Expand Down Expand Up @@ -177,7 +185,10 @@ describe("ProcessDiagnostics", () => {
],
});

expect(diagnostics.processes.map((process) => process.pid)).toEqual([101, 102, 103]);
assert.deepStrictEqual(
diagnostics.processes.map((process) => process.pid),
[101, 102, 103],
);
}),
);

Expand Down Expand Up @@ -210,8 +221,11 @@ describe("ProcessDiagnostics", () => {
Effect.provide(layer),
);

expect(diagnostics.processes.map((process) => process.pid)).toEqual([4242]);
expect(commands).toEqual([
assert.deepStrictEqual(
diagnostics.processes.map((process) => process.pid),
[4242],
);
assert.deepStrictEqual(commands, [
{
command: "ps",
args: ["-axo", "pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command="],
Expand Down Expand Up @@ -241,23 +255,129 @@ describe("ProcessDiagnostics", () => {
Effect.flip,
);

expect(error).toMatchObject({
_tag: "ProcessDiagnosticsQueryFailedError",
command: "ps",
argCount: 2,
cwd: process.cwd(),
exitCode: 17,
stdoutBytes: 22,
stderrBytes: 21,
stdoutTruncated: false,
stderrTruncated: false,
});
expect(error.message).toBe(
if (error._tag !== "ProcessDiagnosticsQueryFailedError") {
assert.fail(`Expected ProcessDiagnosticsQueryFailedError, got ${error._tag}`);
}
assert.deepStrictEqual(
{
command: error.command,
argCount: error.argCount,
cwd: error.cwd,
exitCode: error.exitCode,
stdoutBytes: error.stdoutBytes,
stderrBytes: error.stderrBytes,
stdoutTruncated: error.stdoutTruncated,
stderrTruncated: error.stderrTruncated,
},
{
command: "ps",
argCount: 2,
cwd: process.cwd(),
exitCode: 17,
stdoutBytes: 22,
stderrBytes: 21,
stdoutTruncated: false,
stderrTruncated: false,
},
);
assert.equal(
error.message,
`Process diagnostics query 'ps' failed with exit code 17 in '${process.cwd()}'.`,
);
}),
);

it.effect("decodes Windows process JSON through Schema and skips invalid records", () =>
Effect.gen(function* () {
const commands: Array<{ readonly command: string; readonly args: ReadonlyArray<string> }> =
[];
const spawnerLayer = Layer.succeed(
ChildProcessSpawner.ChildProcessSpawner,
ChildProcessSpawner.make((command) => {
const childProcess = command as unknown as {
readonly command: string;
readonly args: ReadonlyArray<string>;
};
commands.push({ command: childProcess.command, args: childProcess.args });
return Effect.succeed(
mockHandle({
stdout: encodeUnknownJson([
{
ProcessId: process.pid,
ParentProcessId: 1,
Name: "node.exe",
CommandLine: "t3 server",
Status: "",
WorkingSetSize: 1024,
PercentProcessorTime: 0,
},
{
ProcessId: 4242,
ParentProcessId: process.pid,
Name: "agent.exe",
CommandLine: null,
Status: "Running",
WorkingSetSize: 2048.6,
PercentProcessorTime: 1.25,
},
{
ProcessId: "invalid",
ParentProcessId: process.pid,
Name: "bad.exe",
},
]),
}),
);
}),
);

const rows = yield* ProcessDiagnostics.readProcessRows.pipe(
Effect.provide(spawnerLayer),
Effect.provideService(HostProcessPlatform, "win32"),
);

assert.deepStrictEqual(commands, [
{
command: "powershell.exe",
args: [
"-NoProfile",
"-NonInteractive",
"-Command",
[
"$processes = Get-CimInstance Win32_Process | ForEach-Object {",
'$perf = Get-CimInstance Win32_PerfFormattedData_PerfProc_Process -Filter "IDProcess = $($_.ProcessId)" -ErrorAction SilentlyContinue;',
"[pscustomobject]@{ ProcessId = $_.ProcessId; ParentProcessId = $_.ParentProcessId; Name = $_.Name; CommandLine = $_.CommandLine; Status = $_.Status; WorkingSetSize = $_.WorkingSetSize; PercentProcessorTime = if ($perf) { $perf.PercentProcessorTime } else { 0 } }",
"};",
"$processes | ConvertTo-Json -Compress -Depth 3",
].join(" "),
],
},
]);
assert.deepStrictEqual(rows, [
{
pid: process.pid,
ppid: 1,
pgid: null,
status: "Live",
cpuPercent: 0,
rssBytes: 1024,
elapsed: "",
command: "t3 server",
},
{
pid: 4242,
ppid: process.pid,
pgid: null,
status: "Running",
cpuPercent: 1.25,
rssBytes: 2049,
elapsed: "",
command: "agent.exe",
},
]);
}),
);

it.effect("does not allow signaling the diagnostics query process", () =>
Effect.gen(function* () {
const spawnerLayer = Layer.succeed(
Expand All @@ -280,7 +400,7 @@ describe("ProcessDiagnostics", () => {
Effect.provide(layer),
);

expect(result).toEqual({
assert.deepStrictEqual(result, {
pid: 4242,
signal: "SIGINT",
signaled: false,
Expand Down
118 changes: 76 additions & 42 deletions apps/server/src/diagnostics/ProcessDiagnostics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,22 @@ export interface ProcessRow {
readonly command: string;
}

const PROCESS_QUERY_TIMEOUT_MS = 1_000;
const PROCESS_QUERY_TIMEOUT = Duration.seconds(1);
const POSIX_PROCESS_QUERY_COMMAND = "pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command=";
const PROCESS_QUERY_MAX_OUTPUT_BYTES = 2 * 1024 * 1024;
const WindowsProcessJson = Schema.fromJsonString(Schema.Unknown);
const WindowsProcessRecord = Schema.Struct({
ProcessId: Schema.Number,
ParentProcessId: Schema.Number,
CommandLine: Schema.optional(Schema.OptionFromNullOr(Schema.String)),
Name: Schema.optional(Schema.OptionFromNullOr(Schema.String)),
Status: Schema.optional(Schema.OptionFromNullOr(Schema.String)),
WorkingSetSize: Schema.optional(Schema.OptionFromNullOr(Schema.Number)),
PercentProcessorTime: Schema.optional(Schema.OptionFromNullOr(Schema.Number)),
});
type WindowsProcessRecord = typeof WindowsProcessRecord.Type;
const decodeWindowsProcessJson = Schema.decodeUnknownOption(WindowsProcessJson);
const decodeWindowsProcessRecord = Schema.decodeUnknownOption(WindowsProcessRecord);

export class ProcessDiagnostics extends Context.Service<
ProcessDiagnostics,
Expand Down Expand Up @@ -201,51 +214,72 @@ export function parsePosixProcessRows(output: string): ReadonlyArray<ProcessRow>
return rows;
}

function normalizeWindowsProcessRow(value: unknown): ProcessRow | null {
if (typeof value !== "object" || value === null) return null;
const record = value as Record<string, unknown>;
const pid = typeof record.ProcessId === "number" ? record.ProcessId : null;
const ppid = typeof record.ParentProcessId === "number" ? record.ParentProcessId : null;
const commandLine =
typeof record.CommandLine === "string" && record.CommandLine.trim().length > 0
? record.CommandLine
: typeof record.Name === "string"
? record.Name
: null;
const workingSet =
typeof record.WorkingSetSize === "number" && Number.isFinite(record.WorkingSetSize)
? Math.max(0, Math.round(record.WorkingSetSize))
: 0;
const cpuPercent =
typeof record.PercentProcessorTime === "number" && Number.isFinite(record.PercentProcessorTime)
? Math.max(0, record.PercentProcessorTime)
: 0;

if (!pid || pid <= 0 || ppid === null || ppid < 0 || !commandLine) return null;
return {
pid,
ppid,
function optionFromOptional<T>(value: Option.Option<T> | undefined): Option.Option<T> {
return value ?? Option.none();
}

function nonEmptyString(value: Option.Option<string> | undefined): Option.Option<string> {
return optionFromOptional(value).pipe(Option.filter((text) => text.trim().length > 0));
}

function optionalNumberOrElse(
value: Option.Option<number> | undefined,
onSome: (value: number) => number,
): number {
return Option.match(optionFromOptional(value), {
onNone: () => 0,
onSome,
});
}

function normalizeWindowsProcessRecord(record: WindowsProcessRecord): Option.Option<ProcessRow> {
if (
!Number.isInteger(record.ProcessId) ||
record.ProcessId <= 0 ||
!Number.isInteger(record.ParentProcessId) ||
record.ParentProcessId < 0
) {
return Option.none();
}

const commandLine = nonEmptyString(record.CommandLine).pipe(
Option.orElse(() => nonEmptyString(record.Name)),
);
if (Option.isNone(commandLine)) return Option.none();

return Option.some({
pid: record.ProcessId,
ppid: record.ParentProcessId,
pgid: null,
status: typeof record.Status === "string" && record.Status.length > 0 ? record.Status : "Live",
cpuPercent,
rssBytes: workingSet,
status: Option.getOrElse(nonEmptyString(record.Status), () => "Live"),
cpuPercent: optionalNumberOrElse(record.PercentProcessorTime, (value) => Math.max(0, value)),
rssBytes: optionalNumberOrElse(record.WorkingSetSize, (value) =>
Math.max(0, Math.round(value)),
),
elapsed: "",
command: commandLine,
};
command: commandLine.value,
});
}

function normalizeWindowsProcessRow(value: unknown): Option.Option<ProcessRow> {
return Option.match(decodeWindowsProcessRecord(value), {
onNone: () => Option.none(),
onSome: normalizeWindowsProcessRecord,
});
}

function parseWindowsProcessRows(output: string): ReadonlyArray<ProcessRow> {
if (output.trim().length === 0) return [];
try {
const parsed = JSON.parse(output) as unknown;
const records = Array.isArray(parsed) ? parsed : [parsed];
return records.flatMap((record) => {
const row = normalizeWindowsProcessRow(record);
return row ? [row] : [];
});
} catch {
return [];
}
return Option.match(decodeWindowsProcessJson(output), {
onNone: () => [],
onSome: (parsed) =>
(Array.isArray(parsed) ? parsed : [parsed]).flatMap((record) =>
Option.match(normalizeWindowsProcessRow(record), {
onNone: () => [],
onSome: (row) => [row],
}),
),
});
}

export function buildDescendantEntries(
Expand Down Expand Up @@ -381,7 +415,7 @@ const runProcess = Effect.fn("runProcess")(function* (input: {
} satisfies ProcessOutput;
}).pipe(
Effect.scoped,
Effect.timeoutOption(Duration.millis(PROCESS_QUERY_TIMEOUT_MS)),
Effect.timeoutOption(PROCESS_QUERY_TIMEOUT),
Effect.flatMap((result) =>
Option.match(result, {
onNone: () =>
Expand All @@ -390,7 +424,7 @@ const runProcess = Effect.fn("runProcess")(function* (input: {
command: input.command,
argCount: input.args.length,
cwd,
timeoutMillis: PROCESS_QUERY_TIMEOUT_MS,
timeoutMillis: Duration.toMillis(PROCESS_QUERY_TIMEOUT),
}),
),
onSome: Effect.succeed,
Expand Down
Loading