Skip to content

Commit 70729e7

Browse files
committed
fix: gateway extracts thread_id from runtime stream, adds message fallback
The gateway now: - Extracts thread_id from the first streamed line (2:[thread_info]) - Returns thread_id in webhook response so callers can fetch full history - Falls back to GET /agent-threads/{id}/messages if stream parsing fails - The runtime handles all execution, threads, RAG — gateway stays thin
1 parent a32f359 commit 70729e7

File tree

1 file changed

+71
-32
lines changed

1 file changed

+71
-32
lines changed

packages/cli/src/cli/gateway/server.ts

Lines changed: 71 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
/**
22
* Thin webhook gateway.
33
*
4-
* Routes incoming platform webhooks to the AgenticFlow runtime API.
5-
* The runtime does ALL the work — execution, threads, tools, RAG.
6-
* This server is just a protocol translator.
4+
* Routes platform webhooks to the AgenticFlow runtime API.
5+
* The runtime does ALL the work. This is just a protocol translator.
6+
*
7+
* Flow:
8+
* 1. Connector parses platform webhook → { afAgentId, message }
9+
* 2. Gateway POSTs to /v1/agents/{id}/stream
10+
* 3. Reads thread_id from first streamed line
11+
* 4. Waits for stream to finish, then GETs /v1/agent-threads/{thread_id}/messages
12+
* 5. Connector posts result back to platform
713
*/
814

915
import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
@@ -20,23 +26,16 @@ function log(config: GatewayConfig, ...args: unknown[]) {
2026
if (config.verbose) console.error("[gateway]", ...args);
2127
}
2228

23-
/** Extract text from Vercel AI SDK v1 stream format. */
24-
function extractStreamText(raw: string): string {
25-
const parts: string[] = [];
26-
for (const line of raw.split("\n")) {
27-
if (line.startsWith("0:")) {
28-
try {
29-
const text = JSON.parse(line.slice(2));
30-
if (typeof text === "string") parts.push(text);
31-
} catch { /* skip */ }
32-
}
33-
}
34-
return parts.join("");
29+
function authHeaders(config: GatewayConfig): Record<string, string> {
30+
return {
31+
"Content-Type": "application/json",
32+
...(config.afApiKey ? { Authorization: `Bearer ${config.afApiKey}` } : {}),
33+
};
3534
}
3635

3736
/**
38-
* Call the AgenticFlow runtime streaming endpoint.
39-
* The runtime handles: execution, tools, RAG, thread persistence — everything.
37+
* Call the AF runtime: stream → extract thread_id → wait → fetch messages.
38+
* Returns the assistant's last message text.
4039
*/
4140
async function callRuntime(
4241
config: GatewayConfig,
@@ -46,18 +45,18 @@ async function callRuntime(
4645
const streamUrl = task.afStreamUrl ?? `${baseUrl}/v1/agents/${task.afAgentId}/stream`;
4746

4847
const uuidRe = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
49-
const threadId = task.threadId && uuidRe.test(task.threadId)
48+
const requestThreadId = task.threadId && uuidRe.test(task.threadId)
5049
? task.threadId
5150
: crypto.randomUUID();
5251

52+
// 1. POST to stream endpoint
53+
log(config, `Streaming to ${streamUrl} (thread: ${requestThreadId})`);
54+
5355
const resp = await fetch(streamUrl, {
5456
method: "POST",
55-
headers: {
56-
"Content-Type": "application/json",
57-
...(config.afApiKey ? { Authorization: `Bearer ${config.afApiKey}` } : {}),
58-
},
57+
headers: authHeaders(config),
5958
body: JSON.stringify({
60-
id: threadId,
59+
id: requestThreadId,
6160
messages: [{ role: "user", content: task.message }],
6261
}),
6362
});
@@ -67,12 +66,58 @@ async function callRuntime(
6766
throw new Error(`Runtime ${resp.status}: ${err.slice(0, 300)}`);
6867
}
6968

69+
// 2. Read stream — extract thread_id from first data line, collect text
7070
const raw = await resp.text();
71-
return { text: extractStreamText(raw), threadId };
71+
let threadId = requestThreadId;
72+
const textParts: string[] = [];
73+
74+
for (const line of raw.split("\n")) {
75+
// Thread info: 2:[{"type":"thread_info","data":{"thread_id":"..."}}]
76+
if (line.startsWith("2:")) {
77+
try {
78+
const arr = JSON.parse(line.slice(2)) as Array<{ type: string; data: Record<string, unknown> }>;
79+
const info = arr.find((e) => e.type === "thread_info");
80+
if (info?.data?.thread_id) {
81+
threadId = info.data.thread_id as string;
82+
}
83+
} catch { /* skip */ }
84+
}
85+
// Text tokens: 0:text
86+
if (line.startsWith("0:")) {
87+
try {
88+
const text = JSON.parse(line.slice(2));
89+
if (typeof text === "string") textParts.push(text);
90+
} catch { /* skip */ }
91+
}
92+
}
93+
94+
// 3. If we got text from the stream, use it directly (faster)
95+
if (textParts.length > 0) {
96+
return { text: textParts.join(""), threadId };
97+
}
98+
99+
// 4. Fallback: fetch messages from thread API (works even if stream parsing fails)
100+
log(config, `Fetching messages from thread ${threadId}`);
101+
try {
102+
const msgResp = await fetch(`${baseUrl}/v1/agent-threads/${threadId}/messages`, {
103+
headers: authHeaders(config),
104+
});
105+
if (msgResp.ok) {
106+
const history = (await msgResp.json()) as {
107+
messages: Array<{ role: string; content: string }>;
108+
};
109+
const assistantMsgs = history.messages.filter((m) => m.role === "assistant");
110+
if (assistantMsgs.length > 0) {
111+
return { text: assistantMsgs[assistantMsgs.length - 1].content, threadId };
112+
}
113+
}
114+
} catch { /* thread fetch failed — return empty */ }
115+
116+
return { text: "", threadId };
72117
}
73118

74119
/**
75-
* Create a Web-standard Request → Response handler.
120+
* Web-standard Request → Response handler.
76121
* Deploy to: Vercel, Lambda, Cloudflare Workers, or `af gateway serve`.
77122
*/
78123
export function createGatewayHandler(
@@ -90,7 +135,6 @@ export function createGatewayHandler(
90135
return async (req: Request): Promise<Response> => {
91136
const url = new URL(req.url, "http://localhost");
92137

93-
// Health
94138
if (req.method === "GET" && url.pathname === "/health") {
95139
return json({
96140
status: "ok",
@@ -100,7 +144,6 @@ export function createGatewayHandler(
100144
});
101145
}
102146

103-
// Webhook dispatch
104147
const match = url.pathname.match(/^\/webhook\/([a-z0-9_-]+)/);
105148
if (req.method === "POST" && match) {
106149
const name = match[1];
@@ -114,18 +157,15 @@ export function createGatewayHandler(
114157
const headers: Record<string, string | string[] | undefined> = {};
115158
req.headers.forEach((v, k) => { headers[k] = v; });
116159

117-
// 1. Connector parses platform webhook (thin)
118160
const task = await connector.parseWebhook(headers, body);
119161
if (!task) return json({ status: "skipped", channel: name });
120162

121163
log(config, `[${name}] ${task.label} → agent ${task.afAgentId}`);
122164

123-
// 2. Runtime does all the work
124165
const result = await callRuntime(config, task);
125166

126-
log(config, `[${name}] Response: ${result.text.length} chars`);
167+
log(config, `[${name}] ${result.text.length} chars, thread ${result.threadId}`);
127168

128-
// 3. Connector posts result back to platform (thin)
129169
if (result.text) {
130170
await connector.postResult(task, result.text);
131171
}
@@ -158,7 +198,6 @@ async function readBody(req: IncomingMessage): Promise<string> {
158198
return Buffer.concat(chunks).toString("utf-8");
159199
}
160200

161-
/** Start a long-running gateway server. */
162201
export function startGateway(
163202
config: GatewayConfig,
164203
connectors: ChannelConnector[],

0 commit comments

Comments
 (0)