11/**
22 * Thin webhook gateway.
33 *
4- * Routes incoming platform webhooks to the AgenticFlow runtime API.
5- * The runtime does ALL the work — execution, threads, tools, RAG.
6- * This server is just a protocol translator.
4+ * Routes platform webhooks to the AgenticFlow runtime API.
5+ * The runtime does ALL the work. This is just a protocol translator.
6+ *
7+ * Flow:
8+ * 1. Connector parses platform webhook → { afAgentId, message }
9+ * 2. Gateway POSTs to /v1/agents/{id}/stream
10+ * 3. Reads thread_id from first streamed line
11+ * 4. Waits for stream to finish, then GETs /v1/agent-threads/{thread_id}/messages
12+ * 5. Connector posts result back to platform
713 */
814
915import { createServer , type IncomingMessage , type ServerResponse } from "node:http" ;
@@ -20,23 +26,16 @@ function log(config: GatewayConfig, ...args: unknown[]) {
2026 if ( config . verbose ) console . error ( "[gateway]" , ...args ) ;
2127}
2228
23- /** Extract text from Vercel AI SDK v1 stream format. */
24- function extractStreamText ( raw : string ) : string {
25- const parts : string [ ] = [ ] ;
26- for ( const line of raw . split ( "\n" ) ) {
27- if ( line . startsWith ( "0:" ) ) {
28- try {
29- const text = JSON . parse ( line . slice ( 2 ) ) ;
30- if ( typeof text === "string" ) parts . push ( text ) ;
31- } catch { /* skip */ }
32- }
33- }
34- return parts . join ( "" ) ;
29+ function authHeaders ( config : GatewayConfig ) : Record < string , string > {
30+ return {
31+ "Content-Type" : "application/json" ,
32+ ...( config . afApiKey ? { Authorization : `Bearer ${ config . afApiKey } ` } : { } ) ,
33+ } ;
3534}
3635
3736/**
38- * Call the AgenticFlow runtime streaming endpoint .
39- * The runtime handles: execution, tools, RAG, thread persistence — everything .
37+ * Call the AF runtime: stream → extract thread_id → wait → fetch messages .
38+ * Returns the assistant's last message text .
4039 */
4140async function callRuntime (
4241 config : GatewayConfig ,
@@ -46,18 +45,18 @@ async function callRuntime(
4645 const streamUrl = task . afStreamUrl ?? `${ baseUrl } /v1/agents/${ task . afAgentId } /stream` ;
4746
4847 const uuidRe = / ^ [ 0 - 9 a - f ] { 8 } - [ 0 - 9 a - f ] { 4 } - [ 0 - 9 a - f ] { 4 } - [ 0 - 9 a - f ] { 4 } - [ 0 - 9 a - f ] { 12 } $ / i;
49- const threadId = task . threadId && uuidRe . test ( task . threadId )
48+ const requestThreadId = task . threadId && uuidRe . test ( task . threadId )
5049 ? task . threadId
5150 : crypto . randomUUID ( ) ;
5251
52+ // 1. POST to stream endpoint
53+ log ( config , `Streaming to ${ streamUrl } (thread: ${ requestThreadId } )` ) ;
54+
5355 const resp = await fetch ( streamUrl , {
5456 method : "POST" ,
55- headers : {
56- "Content-Type" : "application/json" ,
57- ...( config . afApiKey ? { Authorization : `Bearer ${ config . afApiKey } ` } : { } ) ,
58- } ,
57+ headers : authHeaders ( config ) ,
5958 body : JSON . stringify ( {
60- id : threadId ,
59+ id : requestThreadId ,
6160 messages : [ { role : "user" , content : task . message } ] ,
6261 } ) ,
6362 } ) ;
@@ -67,12 +66,58 @@ async function callRuntime(
6766 throw new Error ( `Runtime ${ resp . status } : ${ err . slice ( 0 , 300 ) } ` ) ;
6867 }
6968
69+ // 2. Read stream — extract thread_id from first data line, collect text
7070 const raw = await resp . text ( ) ;
71- return { text : extractStreamText ( raw ) , threadId } ;
71+ let threadId = requestThreadId ;
72+ const textParts : string [ ] = [ ] ;
73+
74+ for ( const line of raw . split ( "\n" ) ) {
75+ // Thread info: 2:[{"type":"thread_info","data":{"thread_id":"..."}}]
76+ if ( line . startsWith ( "2:" ) ) {
77+ try {
78+ const arr = JSON . parse ( line . slice ( 2 ) ) as Array < { type : string ; data : Record < string , unknown > } > ;
79+ const info = arr . find ( ( e ) => e . type === "thread_info" ) ;
80+ if ( info ?. data ?. thread_id ) {
81+ threadId = info . data . thread_id as string ;
82+ }
83+ } catch { /* skip */ }
84+ }
85+ // Text tokens: 0:text
86+ if ( line . startsWith ( "0:" ) ) {
87+ try {
88+ const text = JSON . parse ( line . slice ( 2 ) ) ;
89+ if ( typeof text === "string" ) textParts . push ( text ) ;
90+ } catch { /* skip */ }
91+ }
92+ }
93+
94+ // 3. If we got text from the stream, use it directly (faster)
95+ if ( textParts . length > 0 ) {
96+ return { text : textParts . join ( "" ) , threadId } ;
97+ }
98+
99+ // 4. Fallback: fetch messages from thread API (works even if stream parsing fails)
100+ log ( config , `Fetching messages from thread ${ threadId } ` ) ;
101+ try {
102+ const msgResp = await fetch ( `${ baseUrl } /v1/agent-threads/${ threadId } /messages` , {
103+ headers : authHeaders ( config ) ,
104+ } ) ;
105+ if ( msgResp . ok ) {
106+ const history = ( await msgResp . json ( ) ) as {
107+ messages : Array < { role : string ; content : string } > ;
108+ } ;
109+ const assistantMsgs = history . messages . filter ( ( m ) => m . role === "assistant" ) ;
110+ if ( assistantMsgs . length > 0 ) {
111+ return { text : assistantMsgs [ assistantMsgs . length - 1 ] . content , threadId } ;
112+ }
113+ }
114+ } catch { /* thread fetch failed — return empty */ }
115+
116+ return { text : "" , threadId } ;
72117}
73118
74119/**
75- * Create a Web-standard Request → Response handler.
120+ * Web-standard Request → Response handler.
76121 * Deploy to: Vercel, Lambda, Cloudflare Workers, or `af gateway serve`.
77122 */
78123export function createGatewayHandler (
@@ -90,7 +135,6 @@ export function createGatewayHandler(
90135 return async ( req : Request ) : Promise < Response > => {
91136 const url = new URL ( req . url , "http://localhost" ) ;
92137
93- // Health
94138 if ( req . method === "GET" && url . pathname === "/health" ) {
95139 return json ( {
96140 status : "ok" ,
@@ -100,7 +144,6 @@ export function createGatewayHandler(
100144 } ) ;
101145 }
102146
103- // Webhook dispatch
104147 const match = url . pathname . match ( / ^ \/ w e b h o o k \/ ( [ a - z 0 - 9 _ - ] + ) / ) ;
105148 if ( req . method === "POST" && match ) {
106149 const name = match [ 1 ] ;
@@ -114,18 +157,15 @@ export function createGatewayHandler(
114157 const headers : Record < string , string | string [ ] | undefined > = { } ;
115158 req . headers . forEach ( ( v , k ) => { headers [ k ] = v ; } ) ;
116159
117- // 1. Connector parses platform webhook (thin)
118160 const task = await connector . parseWebhook ( headers , body ) ;
119161 if ( ! task ) return json ( { status : "skipped" , channel : name } ) ;
120162
121163 log ( config , `[${ name } ] ${ task . label } → agent ${ task . afAgentId } ` ) ;
122164
123- // 2. Runtime does all the work
124165 const result = await callRuntime ( config , task ) ;
125166
126- log ( config , `[${ name } ] Response: ${ result . text . length } chars` ) ;
167+ log ( config , `[${ name } ] ${ result . text . length } chars, thread ${ result . threadId } ` ) ;
127168
128- // 3. Connector posts result back to platform (thin)
129169 if ( result . text ) {
130170 await connector . postResult ( task , result . text ) ;
131171 }
@@ -158,7 +198,6 @@ async function readBody(req: IncomingMessage): Promise<string> {
158198 return Buffer . concat ( chunks ) . toString ( "utf-8" ) ;
159199}
160200
161- /** Start a long-running gateway server. */
162201export function startGateway (
163202 config : GatewayConfig ,
164203 connectors : ChannelConnector [ ] ,
0 commit comments