Skip to content

Commit 5196fb9

Browse files
committed
feat(sdk): add agents.run() and AgentStream.threadId
SDK: - AgentStream now extracts threadId and userMessageId from the thread_info data event during streaming - New agents.run(agentId, { message, threadId?, timeoutMs? }) method: sends a task, waits for result, returns { response, threadId, status } - Internally streams to collect text + threadId, falls back to polling GET /agent-threads/{id}/messages if stream returns empty - Export AgentRunResult, AgentRunOptions types CLI: - `af agent run` now delegates to sdk agents.run() (was 80 lines inline, now 20 lines) - Returns structured JSON: { schema, status, agent_id, thread_id, response } Usage (AI agent calling via Bash): af agent run --agent-id <id> --message "Analyze X" --json → {"status":"completed","thread_id":"...","response":"..."}
1 parent 0847324 commit 5196fb9

File tree

5 files changed

+143
-79
lines changed

5 files changed

+143
-79
lines changed

packages/cli/src/cli/main.ts

Lines changed: 23 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -3623,90 +3623,35 @@ export function createProgram(): Command {
36233623
.option("--poll-interval <seconds>", "Seconds between status checks", "2")
36243624
.action(async (opts) => {
36253625
const client = buildClient(program.opts());
3626-
const timeoutMs = Number.parseInt(opts.timeout, 10) * 1000;
3627-
const pollMs = Number.parseInt(opts.pollInterval, 10) * 1000;
3628-
const threadId = opts.threadId ?? crypto.randomUUID();
36293626

3630-
try {
3631-
// 1. Fire: POST to stream, read thread_id from first line, don't block
3632-
if (!isJsonFlagEnabled()) {
3633-
console.error(`Sending task to agent ${opts.agentId}...`);
3634-
}
3635-
3636-
const streamBody = {
3637-
messages: [{ role: "user" as const, content: opts.message }],
3638-
};
3639-
3640-
const token = resolveToken(program.opts());
3641-
const stream = token
3642-
? await client.agents.stream(opts.agentId, streamBody)
3643-
: await client.agents.streamAnonymous(opts.agentId, streamBody);
3644-
3645-
// Consume stream to get thread info and full text
3646-
const text = await stream.text();
3647-
3648-
// Try to get thread_id from stream metadata
3649-
let resolvedThreadId = threadId;
3650-
try {
3651-
const rawParts = (stream as unknown as { _raw?: string })._raw;
3652-
if (typeof rawParts === "string") {
3653-
for (const line of rawParts.split("\n")) {
3654-
if (line.startsWith("2:")) {
3655-
const arr = JSON.parse(line.slice(2)) as Array<{ type: string; data: Record<string, unknown> }>;
3656-
const info = arr.find((e) => e.type === "thread_info");
3657-
if (info?.data?.thread_id) {
3658-
resolvedThreadId = info.data.thread_id as string;
3659-
}
3660-
}
3661-
}
3662-
}
3663-
} catch { /* ignore — use default threadId */ }
3627+
if (!isJsonFlagEnabled()) {
3628+
console.error(`Sending task to agent ${opts.agentId}...`);
3629+
}
36643630

3665-
// 2. If we got text from stream, return it immediately
3666-
if (text && text.trim()) {
3667-
printResult({
3668-
schema: "agenticflow.agent.run.v1",
3669-
status: "completed",
3670-
agent_id: opts.agentId,
3671-
thread_id: resolvedThreadId,
3672-
response: text,
3673-
});
3674-
return;
3675-
}
3631+
try {
3632+
const result = await client.agents.run(opts.agentId, {
3633+
message: opts.message,
3634+
threadId: opts.threadId,
3635+
timeoutMs: Number.parseInt(opts.timeout, 10) * 1000,
3636+
pollIntervalMs: Number.parseInt(opts.pollInterval, 10) * 1000,
3637+
});
36763638

3677-
// 3. Fallback: poll thread status then fetch messages
3678-
if (!isJsonFlagEnabled()) {
3679-
console.error(`Waiting for response (thread: ${resolvedThreadId})...`);
3639+
if (result.status === "failed") {
3640+
fail("agent_run_failed", `Agent run failed (thread: ${result.threadId})`);
36803641
}
3681-
3682-
const start = Date.now();
3683-
while (Date.now() - start < timeoutMs) {
3684-
try {
3685-
const thread = (await client.agentThreads.get(resolvedThreadId)) as Record<string, unknown>;
3686-
const status = thread.status as string;
3687-
if (status === "processed" || status === "idle") {
3688-
const history = (await client.agentThreads.getMessages(resolvedThreadId)) as {
3689-
messages: Array<{ role: string; content: string }>;
3690-
};
3691-
const msgs = history.messages?.filter((m) => m.role === "assistant") ?? [];
3692-
const lastMsg = msgs.length > 0 ? msgs[msgs.length - 1].content : "";
3693-
printResult({
3694-
schema: "agenticflow.agent.run.v1",
3695-
status: "completed",
3696-
agent_id: opts.agentId,
3697-
thread_id: resolvedThreadId,
3698-
response: lastMsg,
3699-
});
3700-
return;
3701-
}
3702-
if (status === "failed") {
3703-
fail("agent_run_failed", `Agent run failed (thread: ${resolvedThreadId})`);
3704-
}
3705-
} catch { /* thread not ready yet */ }
3706-
await new Promise((r) => setTimeout(r, pollMs));
3642+
if (result.status === "timeout") {
3643+
fail("agent_run_timeout",
3644+
`Agent did not respond within ${opts.timeout}s`,
3645+
`Thread: ${result.threadId}. Check with: af agent-threads messages --thread-id ${result.threadId}`);
37073646
}
37083647

3709-
fail("agent_run_timeout", `Agent did not respond within ${opts.timeout}s`, `Thread: ${resolvedThreadId}. Check with: af agent-threads messages --thread-id ${resolvedThreadId}`);
3648+
printResult({
3649+
schema: "agenticflow.agent.run.v1",
3650+
status: result.status,
3651+
agent_id: opts.agentId,
3652+
thread_id: result.threadId,
3653+
response: result.response,
3654+
});
37103655
} catch (err) {
37113656
const message = err instanceof Error ? err.message : String(err);
37123657
fail("agent_run_failed", message);

packages/sdk/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ export {
2323
} from "./exceptions.js";
2424
export {
2525
AgentsResource,
26+
type AgentRunResult,
27+
type AgentRunOptions,
2628
WorkflowsResource,
2729
ConnectionsResource,
2830
NodeTypesResource,

packages/sdk/src/resources/agents.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,28 @@
11
import type { AgenticFlowSDK } from "../core.js";
22
import { AgentStream, type StreamRequest } from "../streaming.js";
33

4+
/** Result of a non-streaming agent run. */
5+
export interface AgentRunResult {
6+
/** Agent response text. */
7+
response: string;
8+
/** Thread ID for conversation continuity. */
9+
threadId: string;
10+
/** "completed" | "timeout" | "failed" */
11+
status: string;
12+
}
13+
14+
/** Options for `agents.run()`. */
15+
export interface AgentRunOptions {
16+
/** Message to send. */
17+
message: string;
18+
/** Thread ID to continue a conversation. Auto-generated if omitted. */
19+
threadId?: string;
20+
/** Max milliseconds to wait. Default 120000 (2 min). */
21+
timeoutMs?: number;
22+
/** Milliseconds between poll attempts. Default 2000. */
23+
pollIntervalMs?: number;
24+
}
25+
426
export class AgentsResource {
527
constructor(private client: AgenticFlowSDK) { }
628

@@ -63,6 +85,70 @@ export class AgentsResource {
6385
return new AgentStream(response);
6486
}
6587

88+
// ── Run (non-streaming, fire → collect → return) ───────────────────
89+
90+
/**
91+
* Send a message to an agent and return the full response.
92+
* Non-streaming — blocks until the agent finishes, then returns text.
93+
*
94+
* Ideal for AI agents calling the SDK programmatically:
95+
* ```ts
96+
* const result = await client.agents.run("agent-id", { message: "Analyze this" });
97+
* console.log(result.response); // agent's answer
98+
* console.log(result.threadId); // for follow-up
99+
* ```
100+
*
101+
* Internally: streams to get thread_id + text. If stream returns empty,
102+
* falls back to polling GET /agent-threads/{id}/messages.
103+
*/
104+
async run(agentId: string, options: AgentRunOptions): Promise<AgentRunResult> {
105+
const threadId = options.threadId ?? crypto.randomUUID();
106+
const timeoutMs = options.timeoutMs ?? 120_000;
107+
const pollIntervalMs = options.pollIntervalMs ?? 2_000;
108+
109+
const streamReq: StreamRequest = {
110+
id: threadId,
111+
messages: [{ role: "user", content: options.message }],
112+
};
113+
114+
// 1. Stream to get response text + thread_id
115+
const stream = await this.stream(agentId, streamReq);
116+
const text = await stream.text();
117+
const resolvedThreadId = stream.threadId ?? threadId;
118+
119+
// 2. If we got text, return immediately
120+
if (text && text.trim()) {
121+
return { response: text, threadId: resolvedThreadId, status: "completed" };
122+
}
123+
124+
// 3. Fallback: poll thread until processed, then fetch messages
125+
const start = Date.now();
126+
while (Date.now() - start < timeoutMs) {
127+
try {
128+
const thread = (await this.client.get(`/v1/agent-threads/${resolvedThreadId}`)).data as Record<string, unknown> | null;
129+
const status = thread?.status as string | undefined;
130+
131+
if (status === "processed" || status === "idle") {
132+
const history = (await this.client.get(`/v1/agent-threads/${resolvedThreadId}/messages`)).data as {
133+
messages?: Array<{ role: string; content: string }>;
134+
};
135+
const assistantMsgs = history.messages?.filter((m) => m.role === "assistant") ?? [];
136+
const lastMsg = assistantMsgs.length > 0 ? assistantMsgs[assistantMsgs.length - 1].content : "";
137+
return { response: lastMsg, threadId: resolvedThreadId, status: "completed" };
138+
}
139+
140+
if (status === "failed") {
141+
return { response: "", threadId: resolvedThreadId, status: "failed" };
142+
}
143+
} catch {
144+
// Thread not ready yet
145+
}
146+
await new Promise((r) => setTimeout(r, pollIntervalMs));
147+
}
148+
149+
return { response: "", threadId: resolvedThreadId, status: "timeout" };
150+
}
151+
66152
// ── Upload File (authenticated) ────────────────────────────────────
67153
async uploadFile(agentId: string, payload: unknown): Promise<unknown> {
68154
return (await this.client.post(`/v1/agents/${agentId}/upload-file`, { json: payload })).data;

packages/sdk/src/resources/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
export { AgentsResource } from "./agents.js";
1+
export { AgentsResource, type AgentRunResult, type AgentRunOptions } from "./agents.js";
22
export { WorkflowsResource } from "./workflows.js";
33
export { ConnectionsResource } from "./connections.js";
44
export { NodeTypesResource } from "./node-types.js";

packages/sdk/src/streaming.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@ export class AgentStream implements AsyncIterable<StreamPart> {
122122
private _textChunks: string[] = [];
123123
private _processingPromise: Promise<void> | null = null;
124124

125+
/** Thread ID extracted from the first `data` event (type: thread_info). */
126+
threadId: string | null = null;
127+
128+
/** User message ID extracted from the first `data` event. */
129+
userMessageId: string | null = null;
130+
125131
constructor(response: Response) {
126132
this.response = response;
127133
}
@@ -160,6 +166,27 @@ export class AgentStream implements AsyncIterable<StreamPart> {
160166
}
161167
}
162168

169+
/** Extract thread_id and user_message_id from data events. */
170+
private _extractMeta(part: StreamPart): void {
171+
if (part.type === "data" && Array.isArray(part.value)) {
172+
for (const item of part.value) {
173+
if (
174+
item &&
175+
typeof item === "object" &&
176+
(item as Record<string, unknown>).type === "thread_info"
177+
) {
178+
const data = (item as Record<string, unknown>).data as Record<string, unknown> | undefined;
179+
if (data?.thread_id && !this.threadId) {
180+
this.threadId = data.thread_id as string;
181+
}
182+
if (data?.user_message_id && !this.userMessageId) {
183+
this.userMessageId = data.user_message_id as string;
184+
}
185+
}
186+
}
187+
}
188+
}
189+
163190
/**
164191
* Process the stream, dispatching events to listeners.
165192
* Automatically called by text(), parts(), or async iteration.
@@ -201,6 +228,7 @@ export class AgentStream implements AsyncIterable<StreamPart> {
201228
if (!part) continue;
202229

203230
this._parts.push(part);
231+
this._extractMeta(part);
204232

205233
if (part.type === "textDelta") {
206234
this._textChunks.push(part.value as string);
@@ -215,6 +243,7 @@ export class AgentStream implements AsyncIterable<StreamPart> {
215243
const part = parseStreamLine(buffer);
216244
if (part) {
217245
this._parts.push(part);
246+
this._extractMeta(part);
218247
if (part.type === "textDelta") {
219248
this._textChunks.push(part.value as string);
220249
}
@@ -272,6 +301,7 @@ export class AgentStream implements AsyncIterable<StreamPart> {
272301
if (!part) continue;
273302

274303
this._parts.push(part);
304+
this._extractMeta(part);
275305
if (part.type === "textDelta") {
276306
this._textChunks.push(part.value as string);
277307
}
@@ -285,6 +315,7 @@ export class AgentStream implements AsyncIterable<StreamPart> {
285315
const part = parseStreamLine(buffer);
286316
if (part) {
287317
this._parts.push(part);
318+
this._extractMeta(part);
288319
if (part.type === "textDelta") {
289320
this._textChunks.push(part.value as string);
290321
}

0 commit comments

Comments
 (0)