Skip to content
Open
9 changes: 9 additions & 0 deletions .claude/rules/10-e2e-verification.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ Before marking a feature complete:
- [ ] Any untestable gaps are documented with manual verification steps
- [ ] **Port-of-pattern coverage** — when porting a multi-step pattern (VM boot, credential rotation, agent session lifecycle) from an existing consumer to a new one, the new consumer's tests MUST mock each cross-boundary target and assert **every step of the pattern fired** with the correct payload. A test that asserts "step 1 fired" but not "step 3 fired" does not prove the port is complete. See `docs/notes/2026-04-19-trial-orchestrator-agent-boot-postmortem.md` for the class of bug this prevents.

### Compatibility Constraints in Selection Logic

When selection logic has compatibility constraints such as VM size, provider, region, credential type, workspace profile, or protocol support, tests MUST prove the constraint is enforced as a hard gate (a filter applied before preference sorting), not merely as a ranking preference:

- Include at least one incompatible candidate that would otherwise rank well and assert it is rejected.
- Include at least one compatible but non-exact candidate when the product semantics allow substitution, such as a larger VM satisfying a smaller requested size.
- Exercise the production selector or step handler with representative mocked storage/service responses. Helper-only tests and source-contract assertions are not sufficient for this class of behavior.
- If the same selection rule exists in multiple runtime paths, such as an API service path and a Durable Object path, each path needs behavioral coverage or an explicit documented reason it cannot diverge.

## Data Flow Tracing (Mandatory for Multi-Component Features)

Before marking any multi-component feature complete, you MUST trace the primary data path from user input to final output. This trace must cite **specific code paths** (file:function or file:line) at each system boundary.
Expand Down
188 changes: 116 additions & 72 deletions apps/api/src/durable-objects/task-runner/node-steps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* plus node selection helper functions (warm pool, capacity finding, health).
*/
import {
canSatisfyVmSize,
DEFAULT_MAX_WORKSPACES_PER_NODE,
DEFAULT_TASK_RUN_NODE_CPU_THRESHOLD_PERCENT,
DEFAULT_TASK_RUN_NODE_MEMORY_THRESHOLD_PERCENT,
Expand All @@ -21,7 +22,7 @@

export async function handleNodeSelection(
state: TaskRunnerState,
rc: TaskRunnerContext,
rc: TaskRunnerContext
): Promise<void> {
await rc.updateD1ExecutionStep(state.taskId, 'node_selection');

Expand All @@ -33,12 +34,19 @@
if (state.config.preferredNodeId) {
// Validate the preferred node
const node = await rc.env.DATABASE.prepare(
`SELECT id, status FROM nodes WHERE id = ? AND user_id = ?`
).bind(state.config.preferredNodeId, state.userId).first<{ id: string; status: string }>();
`SELECT id, status, vm_size FROM nodes WHERE id = ? AND user_id = ?`
)
.bind(state.config.preferredNodeId, state.userId)
.first<{ id: string; status: string; vm_size: string }>();

if (!node || node.status !== 'running') {
throw Object.assign(new Error('Specified node is not available'), { permanent: true });
}
if (!canSatisfyVmSize(node.vm_size, state.config.vmSize)) {
throw Object.assign(new Error('Specified node is smaller than the requested VM size'), {
permanent: true,
});
}

// Verify the VM agent is actually reachable before reusing
if (await verifyNodeAgentHealthy(node.id, rc)) {
Expand Down Expand Up @@ -89,15 +97,17 @@

export async function handleNodeProvisioning(
state: TaskRunnerState,
rc: TaskRunnerContext,
rc: TaskRunnerContext
): Promise<void> {
await rc.updateD1ExecutionStep(state.taskId, 'node_provisioning');

// If we already created the node (retry scenario), check its status
if (state.stepResults.nodeId) {
const node = await rc.env.DATABASE.prepare(
`SELECT id, status, error_message FROM nodes WHERE id = ?`
).bind(state.stepResults.nodeId).first<{ id: string; status: string; error_message: string | null }>();
)
.bind(state.stepResults.nodeId)
.first<{ id: string; status: string; error_message: string | null }>();

if (node?.status === 'running') {
// Already provisioned — advance
Expand All @@ -108,23 +118,22 @@
throw new Error(node.error_message || 'Node provisioning failed');
}
// Still creating — schedule another poll
await rc.ctx.storage.setAlarm(
Date.now() + rc.getProvisionPollIntervalMs()
);
await rc.ctx.storage.setAlarm(Date.now() + rc.getProvisionPollIntervalMs());
return;
}

// Check user node limit
const maxNodes = parseEnvInt(rc.env.MAX_NODES_PER_USER, 10);
const countResult = await rc.env.DATABASE.prepare(
`SELECT COUNT(*) as c FROM nodes WHERE user_id = ? AND status IN ('running', 'creating', 'recovery')`
).bind(state.userId).first<{ c: number }>();
)
.bind(state.userId)
.first<{ c: number }>();

if ((countResult?.c ?? 0) >= maxNodes) {
throw Object.assign(
new Error(`Maximum ${maxNodes} nodes allowed. Cannot auto-provision.`),
{ permanent: true },
);
throw Object.assign(new Error(`Maximum ${maxNodes} nodes allowed. Cannot auto-provision.`), {
permanent: true,
});
}

// Re-check quota before provisioning (hard gate for platform compute).
Expand All @@ -140,14 +149,14 @@
const credResult = await resolveCredentialSource(
db,
state.userId,
(state.config.cloudProvider as import('@simple-agent-manager/shared').CredentialProvider) ?? undefined,
(state.config.cloudProvider as import('@simple-agent-manager/shared').CredentialProvider) ??

Check warning on line 152 in apps/api/src/durable-objects/task-runner/node-steps.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This assertion is unnecessary since it does not change the type of the expression.

See more on https://sonarcloud.io/project/issues?id=raphaeltm_simple-agent-manager&issues=AZ3k_PxsM5CXM71L5pVC&open=AZ3k_PxsM5CXM71L5pVC&pullRequest=875
undefined
);

if (!credResult) {
throw Object.assign(
new Error('No cloud provider credentials available for provisioning.'),
{ permanent: true },
);
throw Object.assign(new Error('No cloud provider credentials available for provisioning.'), {
permanent: true,
});
}

if (credResult.credentialSource === 'platform') {
Expand All @@ -158,9 +167,9 @@
throw Object.assign(
new Error(
`Monthly compute quota exceeded: ${quotaCheck.used} of ${quotaCheck.limit} vCPU-hours used. ` +
'Add your own cloud provider credentials or contact your admin.'
'Add your own cloud provider credentials or contact your admin.'
),
{ permanent: true },
{ permanent: true }
);
}
}
Expand Down Expand Up @@ -188,7 +197,9 @@
// Store autoProvisionedNodeId on the task
await rc.env.DATABASE.prepare(
`UPDATE tasks SET auto_provisioned_node_id = ?, updated_at = ? WHERE id = ?`
).bind(createdNode.id, new Date().toISOString(), state.taskId).run();
)
.bind(createdNode.id, new Date().toISOString(), state.taskId)
.run();

await rc.ctx.storage.put('state', state);

Expand All @@ -211,7 +222,9 @@
// Verify it's running
const provisionedNode = await rc.env.DATABASE.prepare(
`SELECT status, error_message FROM nodes WHERE id = ?`
).bind(createdNode.id).first<{ status: string; error_message: string | null }>();
)
.bind(createdNode.id)
.first<{ status: string; error_message: string | null }>();

if (!provisionedNode || provisionedNode.status !== 'running') {
throw new Error(provisionedNode?.error_message || 'Node provisioning failed');
Expand All @@ -222,7 +235,7 @@

export async function handleNodeAgentReady(
state: TaskRunnerState,
rc: TaskRunnerContext,
rc: TaskRunnerContext
): Promise<void> {
await rc.updateD1ExecutionStep(state.taskId, 'node_agent_ready');

Expand All @@ -240,10 +253,9 @@
const timeoutMs = rc.getAgentReadyTimeoutMs();
const elapsed = Date.now() - state.agentReadyStartedAt;
if (elapsed > timeoutMs) {
throw Object.assign(
new Error(`Node agent not ready within ${timeoutMs}ms`),
{ permanent: true },
);
throw Object.assign(new Error(`Node agent not ready within ${timeoutMs}ms`), {
permanent: true,
});
}

// Check agent health via D1 heartbeat records.
Expand All @@ -259,16 +271,18 @@
// periodically, which update healthStatus and lastHeartbeatAt in D1.
const node = await rc.env.DATABASE.prepare(
`SELECT health_status, last_heartbeat_at, status FROM nodes WHERE id = ?`
).bind(state.stepResults.nodeId).first<{
health_status: string | null;
last_heartbeat_at: string | null;
status: string;
}>();
)
.bind(state.stepResults.nodeId)
.first<{
health_status: string | null;
last_heartbeat_at: string | null;
status: string;
}>();

if (node?.health_status === 'healthy' && node.last_heartbeat_at) {
// Verify the heartbeat is recent (not stale from a previous boot)
const heartbeatTime = new Date(node.last_heartbeat_at).getTime();
const heartbeatIsRecent = heartbeatTime > (state.agentReadyStartedAt! - 30_000);
const heartbeatIsRecent = heartbeatTime > state.agentReadyStartedAt! - 30_000;

Check warning on line 285 in apps/api/src/durable-objects/task-runner/node-steps.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This assertion is unnecessary since it does not change the type of the expression.

See more on https://sonarcloud.io/project/issues?id=raphaeltm_simple-agent-manager&issues=AZ3k_PxsM5CXM71L5pVD&open=AZ3k_PxsM5CXM71L5pVD&pullRequest=875

if (heartbeatIsRecent) {
log.info('task_runner_do.step.node_agent_ready', {
Expand Down Expand Up @@ -307,15 +321,17 @@
*/
export async function verifyNodeAgentHealthy(
nodeId: string,
rc: TaskRunnerContext,
rc: TaskRunnerContext
): Promise<boolean> {
try {
const node = await rc.env.DATABASE.prepare(
`SELECT health_status, last_heartbeat_at FROM nodes WHERE id = ?`
).bind(nodeId).first<{
health_status: string | null;
last_heartbeat_at: string | null;
}>();
)
.bind(nodeId)
.first<{
health_status: string | null;
last_heartbeat_at: string | null;
}>();

if (!node || node.health_status !== 'healthy' || !node.last_heartbeat_at) {
return false;
Expand All @@ -332,48 +348,57 @@

async function tryClaimWarmNode(
state: TaskRunnerState,
rc: TaskRunnerContext,
rc: TaskRunnerContext
): Promise<string | null> {
if (!rc.env.NODE_LIFECYCLE) return null;

const warmNodes = await rc.env.DATABASE.prepare(
`SELECT id, vm_size, vm_location FROM nodes
WHERE user_id = ? AND status = 'running' AND warm_since IS NOT NULL`
).bind(state.userId).all<{ id: string; vm_size: string; vm_location: string }>();
)
.bind(state.userId)
.all<{ id: string; vm_size: string; vm_location: string }>();

if (!warmNodes.results.length) return null;

// Sort: prefer matching size/location
const sorted = warmNodes.results.sort((a, b) => {
const aSizeMatch = a.vm_size === state.config.vmSize ? 1 : 0;
const bSizeMatch = b.vm_size === state.config.vmSize ? 1 : 0;
if (aSizeMatch !== bSizeMatch) return bSizeMatch - aSizeMatch;
const aLocMatch = a.vm_location === state.config.vmLocation ? 1 : 0;
const bLocMatch = b.vm_location === state.config.vmLocation ? 1 : 0;
return bLocMatch - aLocMatch;
});
// Sort nodes that can satisfy the requested size, preferring exact size/location.
const sorted = warmNodes.results
.filter((node) => canSatisfyVmSize(node.vm_size, state.config.vmSize))
.sort((a, b) => {
const aSizeMatch = a.vm_size === state.config.vmSize ? 1 : 0;
const bSizeMatch = b.vm_size === state.config.vmSize ? 1 : 0;
if (aSizeMatch !== bSizeMatch) return bSizeMatch - aSizeMatch;
const aLocMatch = a.vm_location === state.config.vmLocation ? 1 : 0;
const bLocMatch = b.vm_location === state.config.vmLocation ? 1 : 0;
return bLocMatch - aLocMatch;
});

for (const warmNode of sorted) {
try {
// Re-check freshness
const fresh = await rc.env.DATABASE.prepare(
`SELECT status, warm_since FROM nodes WHERE id = ? AND status = 'running' AND warm_since IS NOT NULL`
).bind(warmNode.id).first<{ status: string; warm_since: string | null }>();
)
.bind(warmNode.id)
.first<{ status: string; warm_since: string | null }>();

if (!fresh) continue;

// Try to claim via NodeLifecycle DO
const doId = rc.env.NODE_LIFECYCLE.idFromName(warmNode.id);
const stub = rc.env.NODE_LIFECYCLE.get(doId) as DurableObjectStub<NodeLifecycle>;
const result = await stub.tryClaim(state.taskId) as { claimed: boolean };
const result = (await stub.tryClaim(state.taskId)) as { claimed: boolean };

if (result.claimed) {
// Defense-in-depth: verify workspace count even for warm nodes
const wsCount = await rc.env.DATABASE.prepare(
`SELECT COUNT(*) as c FROM workspaces WHERE node_id = ? AND status IN ('running', 'creating', 'recovery')`
).bind(warmNode.id).first<{ c: number }>();
const warmMaxWs = state.config.projectScaling?.maxWorkspacesPerNode
?? parseEnvInt(rc.env.MAX_WORKSPACES_PER_NODE, DEFAULT_MAX_WORKSPACES_PER_NODE);
)
.bind(warmNode.id)
.first<{ c: number }>();
const warmMaxWs =
state.config.projectScaling?.maxWorkspacesPerNode ??
parseEnvInt(rc.env.MAX_WORKSPACES_PER_NODE, DEFAULT_MAX_WORKSPACES_PER_NODE);
if ((wsCount?.c ?? 0) >= warmMaxWs) {
continue; // At capacity despite being warm — skip
}
Expand All @@ -393,39 +418,52 @@

async function findNodeWithCapacity(
state: TaskRunnerState,
rc: TaskRunnerContext,
rc: TaskRunnerContext
): Promise<string | null> {
const scaling = state.config.projectScaling;
const cpuThreshold = scaling?.nodeCpuThresholdPercent
?? parseEnvInt(rc.env.TASK_RUN_NODE_CPU_THRESHOLD_PERCENT, DEFAULT_TASK_RUN_NODE_CPU_THRESHOLD_PERCENT);
const memThreshold = scaling?.nodeMemoryThresholdPercent
?? parseEnvInt(rc.env.TASK_RUN_NODE_MEMORY_THRESHOLD_PERCENT, DEFAULT_TASK_RUN_NODE_MEMORY_THRESHOLD_PERCENT);
const maxWorkspaces = scaling?.maxWorkspacesPerNode
?? parseEnvInt(rc.env.MAX_WORKSPACES_PER_NODE, DEFAULT_MAX_WORKSPACES_PER_NODE);
const cpuThreshold =
scaling?.nodeCpuThresholdPercent ??
parseEnvInt(
rc.env.TASK_RUN_NODE_CPU_THRESHOLD_PERCENT,
DEFAULT_TASK_RUN_NODE_CPU_THRESHOLD_PERCENT
);
const memThreshold =
scaling?.nodeMemoryThresholdPercent ??
parseEnvInt(
rc.env.TASK_RUN_NODE_MEMORY_THRESHOLD_PERCENT,
DEFAULT_TASK_RUN_NODE_MEMORY_THRESHOLD_PERCENT
);
const maxWorkspaces =
scaling?.maxWorkspacesPerNode ??
parseEnvInt(rc.env.MAX_WORKSPACES_PER_NODE, DEFAULT_MAX_WORKSPACES_PER_NODE);

const nodes = await rc.env.DATABASE.prepare(
`SELECT id, vm_size, vm_location, health_status, last_metrics FROM nodes
WHERE user_id = ? AND status = 'running' AND health_status != 'unhealthy'`
).bind(state.userId).all<{
id: string;
vm_size: string;
vm_location: string;
health_status: string;
last_metrics: string | null;
}>();
)
.bind(state.userId)
.all<{
id: string;
vm_size: string;
vm_location: string;
health_status: string;
last_metrics: string | null;
}>();

if (!nodes.results.length) return null;

// Batch workspace count query to avoid N+1 D1 round-trips
const nodeIds = nodes.results.map(n => n.id);
const nodeIds = nodes.results.map((n) => n.id);
const placeholders = nodeIds.map(() => '?').join(',');
const wsCounts = await rc.env.DATABASE.prepare(
`SELECT node_id, COUNT(*) as c FROM workspaces
WHERE node_id IN (${placeholders})
AND status IN ('running', 'creating', 'recovery')
GROUP BY node_id`
).bind(...nodeIds).all<{ node_id: string; c: number }>();
const countByNode = new Map((wsCounts.results ?? []).map(r => [r.node_id, r.c]));
)
.bind(...nodeIds)
.all<{ node_id: string; c: number }>();
const countByNode = new Map((wsCounts.results ?? []).map((r) => [r.node_id, r.c]));

type ScoredNode = {
id: string;
Expand All @@ -437,11 +475,17 @@
const candidates: ScoredNode[] = [];

for (const node of nodes.results) {
if (!canSatisfyVmSize(node.vm_size, state.config.vmSize)) continue;

// Hard workspace count limit — reject node regardless of CPU/memory metrics
if ((countByNode.get(node.id) ?? 0) >= maxWorkspaces) continue;
let metrics: { cpuLoadAvg1?: number; memoryPercent?: number } | null = null;
if (node.last_metrics) {
try { metrics = JSON.parse(node.last_metrics); } catch { /* ignore */ }
try {
metrics = JSON.parse(node.last_metrics);
} catch {
/* ignore */
}
}

if (metrics) {
Expand Down
Loading
Loading