Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions .cursor/skills/dag-task-runner/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Always use the link text `Open Canvas`. Use the absolute path in both the `file:
Ensure `CURSOR_API_KEY` is set (the runner fails fast if missing), then launch:

```bash
[ -n "$CURSOR_API_KEY" ] || { [ -f .env ] && set -a && source .env && set +a; }
: "${CURSOR_API_KEY:?Set CURSOR_API_KEY in the environment before running the DAG runner.}"

"$RUNNER_DIR/node_modules/.bin/tsx" "$RUNNER_DIR/run_dag.ts" \
--dag /tmp/dag-<slug>.json \
Expand Down Expand Up @@ -179,18 +179,12 @@ Override any subset inline with top-level DAG `models`, or pass a reusable profi

## Auth

The runner reads `CURSOR_API_KEY` from the environment. Set it however you usually manage secrets:
The runner reads `CURSOR_API_KEY` from the environment. Set it with your usual secrets manager or export it explicitly:

```bash
export CURSOR_API_KEY=crsr_...
```

If the current workspace has a `.env` containing it, source that first:

```bash
set -a && source .env && set +a
```

## CLI options

| Flag | Default | Notes |
Expand Down
76 changes: 63 additions & 13 deletions .cursor/skills/dag-task-runner/scripts/run_dag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ interface RunnerTaskRun {
stream: () => AsyncIterable<unknown>;
wait: () => Promise<{
status: string;
result?: string;
durationMs?: number;
usage?: { inputTokens?: number; outputTokens?: number };
}>;
Expand All @@ -63,6 +64,8 @@ interface RunnerTaskRun {
durationMs?: number;
}

type RunnerAgent = Awaited<ReturnType<typeof Agent.create>>;

function parseArgs(argv: string[]): CliArgs {
const args: Record<string, string> = {};
for (let i = 0; i < argv.length; i++) {
Expand Down Expand Up @@ -157,7 +160,7 @@ async function main(): Promise<void> {

if (!args.initOnly && !process.env.CURSOR_API_KEY) {
throw new Error(
"CURSOR_API_KEY is not set. Export it or `set -a && source .env && set +a` first.",
"CURSOR_API_KEY is not set. Export it before launching the runner.",
);
}

Expand Down Expand Up @@ -337,12 +340,8 @@ async function runTask(
? `${upstreamContext}\n\n---\n\n${task.subtask_prompt}`
: task.subtask_prompt;

const agent = await Agent.create({
apiKey: process.env.CURSOR_API_KEY!,
model: { id: ts.model },
local: { cwd },
});

let agent: RunnerAgent | undefined;
let disposeAgentOnCreate = false;
let run: RunnerTaskRun | undefined;
const buffer = new BoundedTextBuffer(STREAM_CAP);
let lastPublishAt = 0;
Expand All @@ -357,7 +356,27 @@ async function runTask(
const deadline = Date.now() + taskTimeoutMs;

try {
run = (await agent.send(stitched)) as RunnerTaskRun;
const agentPromise = Agent.create({
apiKey: process.env.CURSOR_API_KEY!,
model: { id: ts.model },
local: { cwd },
});
void agentPromise.then(
(created) => {
if (disposeAgentOnCreate) void disposeAgent(created);
},
() => {},
);
agent = await withTaskDeadline(
agentPromise,
deadline,
`Task ${task.id} exceeded deadline while creating SDK agent`,
);
run = (await withTaskDeadline(
agent.send(stitched),
deadline,
`Task ${task.id} exceeded deadline while sending prompt`,
)) as RunnerTaskRun;
const iterator = run.stream()[Symbol.asyncIterator]();
while (true) {
const timeoutForNext = Math.min(deadline - Date.now(), streamIdleTimeoutMs);
Expand Down Expand Up @@ -395,6 +414,7 @@ async function runTask(
let result:
| {
status: string;
result?: string;
durationMs?: number;
usage?: { inputTokens?: number; outputTokens?: number };
}
Expand Down Expand Up @@ -432,7 +452,7 @@ async function runTask(
ts.durationMs = result.durationMs ?? ts.finishedAt - (ts.startedAt ?? ts.finishedAt);
ts.inputTokens = result.usage?.inputTokens;
ts.outputTokens = result.usage?.outputTokens;
const rendered = buffer.render().trim();
const rendered = buffer.render().trim() || renderCappedText(result.result);
if (rendered) ts.resultText = rendered;

if (result.status === "finished") {
Expand All @@ -442,6 +462,9 @@ async function runTask(
ts.errorMessage = `Run ${result.status}`;
}
} catch (err) {
if (!agent) {
disposeAgentOnCreate = true;
}
if (run && isTimeoutError(err)) {
await bestEffortCancel(run, task.id);
}
Expand All @@ -456,10 +479,10 @@ async function runTask(
await bestEffortCancel(run, task.id);
}
publishIfDue(true);
try {
await (agent as unknown as AsyncDisposable)[Symbol.asyncDispose]();
} catch {
// ignore dispose errors
if (agent) {
await disposeAgent(agent);
} else {
disposeAgentOnCreate = true;
}
writer.schedule(structuredCloneState(state));
}
Expand Down Expand Up @@ -533,6 +556,26 @@ async function withTimeout<T>(
}
}

async function withTaskDeadline<T>(
promise: Promise<T>,
deadline: number,
timeoutMessage: string,
): Promise<T> {
const timeoutMs = deadline - Date.now();
if (timeoutMs <= 0) {
throw new TimeoutError(timeoutMessage);
}
return withTimeout(promise, timeoutMs, timeoutMessage);
}

async function disposeAgent(agent: RunnerAgent): Promise<void> {
try {
await (agent as unknown as AsyncDisposable)[Symbol.asyncDispose]();
} catch {
// ignore dispose errors
}
}

async function bestEffortCancel(
run: { cancel?: () => Promise<void> | void },
taskId: string,
Expand Down Expand Up @@ -567,6 +610,13 @@ class BoundedTextBuffer {
}
}

function renderCappedText(text: string | undefined): string {
if (!text?.trim()) return "";
const buffer = new BoundedTextBuffer(STREAM_CAP);
buffer.append(text);
return buffer.render().trim();
}

function skipTask(
task: RawTask,
stateById: Map<string, TaskState>,
Expand Down
10 changes: 2 additions & 8 deletions sdk/dag-task-runner/skill/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Always use the link text `Open Canvas`. Use the absolute path in both the `file:
Ensure `CURSOR_API_KEY` is set (the runner fails fast if missing), then launch:

```bash
[ -n "$CURSOR_API_KEY" ] || { [ -f .env ] && set -a && source .env && set +a; }
: "${CURSOR_API_KEY:?Set CURSOR_API_KEY in the environment before running the DAG runner.}"

"$RUNNER_DIR/node_modules/.bin/tsx" "$RUNNER_DIR/run_dag.ts" \
--dag /tmp/dag-<slug>.json \
Expand Down Expand Up @@ -179,18 +179,12 @@ Override any subset inline with top-level DAG `models`, or pass a reusable profi

## Auth

The runner reads `CURSOR_API_KEY` from the environment. Set it however you usually manage secrets:
The runner reads `CURSOR_API_KEY` from the environment. Set it with your usual secrets manager or export it explicitly:

```bash
export CURSOR_API_KEY=crsr_...
```

If the current workspace has a `.env` containing it, source that first:

```bash
set -a && source .env && set +a
```

## CLI options

| Flag | Default | Notes |
Expand Down
76 changes: 63 additions & 13 deletions sdk/dag-task-runner/src/run_dag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ interface RunnerTaskRun {
stream: () => AsyncIterable<unknown>;
wait: () => Promise<{
status: string;
result?: string;
durationMs?: number;
usage?: { inputTokens?: number; outputTokens?: number };
}>;
Expand All @@ -63,6 +64,8 @@ interface RunnerTaskRun {
durationMs?: number;
}

type RunnerAgent = Awaited<ReturnType<typeof Agent.create>>;

function parseArgs(argv: string[]): CliArgs {
const args: Record<string, string> = {};
for (let i = 0; i < argv.length; i++) {
Expand Down Expand Up @@ -157,7 +160,7 @@ async function main(): Promise<void> {

if (!args.initOnly && !process.env.CURSOR_API_KEY) {
throw new Error(
"CURSOR_API_KEY is not set. Export it or `set -a && source .env && set +a` first.",
"CURSOR_API_KEY is not set. Export it before launching the runner.",
);
}

Expand Down Expand Up @@ -337,12 +340,8 @@ async function runTask(
? `${upstreamContext}\n\n---\n\n${task.subtask_prompt}`
: task.subtask_prompt;

const agent = await Agent.create({
apiKey: process.env.CURSOR_API_KEY!,
model: { id: ts.model },
local: { cwd },
});

let agent: RunnerAgent | undefined;
let disposeAgentOnCreate = false;
let run: RunnerTaskRun | undefined;
const buffer = new BoundedTextBuffer(STREAM_CAP);
let lastPublishAt = 0;
Expand All @@ -357,7 +356,27 @@ async function runTask(
const deadline = Date.now() + taskTimeoutMs;

try {
run = (await agent.send(stitched)) as RunnerTaskRun;
const agentPromise = Agent.create({
apiKey: process.env.CURSOR_API_KEY!,
model: { id: ts.model },
local: { cwd },
});
void agentPromise.then(
(created) => {
if (disposeAgentOnCreate) void disposeAgent(created);
},
() => {},
);
agent = await withTaskDeadline(
agentPromise,
deadline,
`Task ${task.id} exceeded deadline while creating SDK agent`,
);
run = (await withTaskDeadline(
agent.send(stitched),
deadline,
`Task ${task.id} exceeded deadline while sending prompt`,
)) as RunnerTaskRun;
const iterator = run.stream()[Symbol.asyncIterator]();
while (true) {
const timeoutForNext = Math.min(deadline - Date.now(), streamIdleTimeoutMs);
Expand Down Expand Up @@ -395,6 +414,7 @@ async function runTask(
let result:
| {
status: string;
result?: string;
durationMs?: number;
usage?: { inputTokens?: number; outputTokens?: number };
}
Expand Down Expand Up @@ -432,7 +452,7 @@ async function runTask(
ts.durationMs = result.durationMs ?? ts.finishedAt - (ts.startedAt ?? ts.finishedAt);
ts.inputTokens = result.usage?.inputTokens;
ts.outputTokens = result.usage?.outputTokens;
const rendered = buffer.render().trim();
const rendered = buffer.render().trim() || renderCappedText(result.result);
if (rendered) ts.resultText = rendered;

if (result.status === "finished") {
Expand All @@ -442,6 +462,9 @@ async function runTask(
ts.errorMessage = `Run ${result.status}`;
}
} catch (err) {
if (!agent) {
disposeAgentOnCreate = true;
}
if (run && isTimeoutError(err)) {
await bestEffortCancel(run, task.id);
}
Expand All @@ -456,10 +479,10 @@ async function runTask(
await bestEffortCancel(run, task.id);
}
publishIfDue(true);
try {
await (agent as unknown as AsyncDisposable)[Symbol.asyncDispose]();
} catch {
// ignore dispose errors
if (agent) {
await disposeAgent(agent);
} else {
disposeAgentOnCreate = true;
}
writer.schedule(structuredCloneState(state));
}
Expand Down Expand Up @@ -533,6 +556,26 @@ async function withTimeout<T>(
}
}

async function withTaskDeadline<T>(
promise: Promise<T>,
deadline: number,
timeoutMessage: string,
): Promise<T> {
const timeoutMs = deadline - Date.now();
if (timeoutMs <= 0) {
throw new TimeoutError(timeoutMessage);
}
return withTimeout(promise, timeoutMs, timeoutMessage);
}

async function disposeAgent(agent: RunnerAgent): Promise<void> {
try {
await (agent as unknown as AsyncDisposable)[Symbol.asyncDispose]();
} catch {
// ignore dispose errors
}
}

async function bestEffortCancel(
run: { cancel?: () => Promise<void> | void },
taskId: string,
Expand Down Expand Up @@ -567,6 +610,13 @@ class BoundedTextBuffer {
}
}

function renderCappedText(text: string | undefined): string {
if (!text?.trim()) return "";
const buffer = new BoundedTextBuffer(STREAM_CAP);
buffer.append(text);
return buffer.render().trim();
}

function skipTask(
task: RawTask,
stateById: Map<string, TaskState>,
Expand Down