diff --git a/pkgs/website/astro.config.mjs b/pkgs/website/astro.config.mjs index 8f85d3bf7..15865e873 100644 --- a/pkgs/website/astro.config.mjs +++ b/pkgs/website/astro.config.mjs @@ -447,6 +447,10 @@ export default defineConfig({ link: '/tutorials/', id: 'tutorials', items: [ + { + label: 'Chatbot with pgflow', + link: '/tutorials/chatbot-pgflow/', + }, { label: 'RAG Pipeline', autogenerate: { diff --git a/pkgs/website/src/content/docs/tutorials/chatbot-pgflow.mdx b/pkgs/website/src/content/docs/tutorials/chatbot-pgflow.mdx new file mode 100644 index 000000000..076353e1c --- /dev/null +++ b/pkgs/website/src/content/docs/tutorials/chatbot-pgflow.mdx @@ -0,0 +1,1157 @@ +--- +title: Advanced Chatbot with pgflow and AI SDK +description: Build a production-ready chatbot using pgflow for context gathering and AI SDK for streaming responses +sidebar: + order: 5 +--- + +import { Steps, Aside, FileTree, CardGrid, LinkCard } from '@astrojs/starlight/components'; + +Build a sophisticated chatbot that uses pgflow to orchestrate parallel context gathering from multiple sources, then streams responses using the AI SDK's `useChat` hook. + +## Architecture Overview + +This tutorial demonstrates a two-phase architecture: + +1. **Context Gathering Phase** (pgflow): Multi-step workflow that gathers context from conversation history, knowledge base, web search, and memory - with real-time progress updates +2. **Response Streaming Phase** (AI SDK): Edge function that loads the prepared context and streams the AI response + +```d2 +direction: right + +browser: Browser { + style.multiple: true +} + +pgflow_client: pgflow Client { + start_run: Start Run + subscribe: Subscribe to Progress + wait: Wait for Completion +} + +pgflow_flow: pgflow Flow { + history: Load History + parallel: Parallel Context Gathering { + semantic: Semantic Search + web: Web Search + memory: Memory Search + } + rerank: Merge & Rerank +} + +edge_chat: Edge Function\n(SSE Chat) { + load_context: Load Run Output + stream: Stream AI Response +} + +browser -> pgflow_client: 1. Start context gathering +pgflow_client -> pgflow_flow: Execute workflow +pgflow_flow -> pgflow_client: Progress updates +pgflow_client -> browser: 2. Show progress UI +pgflow_flow -> edge_chat: 3. Run completed +browser -> edge_chat: 4. Start chat stream (runId) +edge_chat -> browser: 5. Stream response +``` + +## What You'll Build + + + +1. A **multi-step pgflow workflow** that: + - Loads conversation history + - Performs parallel context gathering from 3 sources + - Merges and reranks results for optimal relevance + +2. A **browser client** that: + - Starts the pgflow run + - Shows real-time progress + - Triggers chat streaming after context is ready + +3. 
An **Edge Function** that: + - Loads prepared context by run ID + - Streams AI responses using AI SDK + + + +## Prerequisites + +| Tool | Version | +|------|---------| +| Node.js | 20+ | +| Supabase CLI | Latest | +| PostgreSQL | 15+ with pgvector | + + + +## Database Schema + +First, set up the database schema for conversations, messages, knowledge base, and memory: + +```sql +-- Enable vector extension +create extension if not exists vector; + +-- Conversations table +create table conversations ( + id bigserial primary key, + user_id text, + created_at timestamptz default now() +); + +-- Messages table with history +create table messages ( + id bigserial primary key, + conversation_id bigint references conversations(id) on delete cascade, + role text not null check (role in ('user', 'assistant', 'system')), + content text not null, + created_at timestamptz default now() +); + +create index messages_conversation_id_idx on messages(conversation_id, created_at desc); + +-- Knowledge base with vector embeddings +create table knowledge_base ( + id bigserial primary key, + title text, + content text not null, + embedding vector(1536), + metadata jsonb default '{}'::jsonb, + created_at timestamptz default now() +); + +create index knowledge_base_embedding_idx on knowledge_base + using ivfflat (embedding vector_cosine_ops) with (lists = 100); + +-- Memory/context table for user-specific information +create table user_memory ( + id bigserial primary key, + user_id text not null, + memory_type text not null, -- 'preference', 'fact', 'context' + content text not null, + embedding vector(1536), + created_at timestamptz default now() +); + +create index user_memory_embedding_idx on user_memory + using ivfflat (embedding vector_cosine_ops) with (lists = 100); +create index user_memory_user_id_idx on user_memory(user_id); + +-- Web search cache (optional, for caching search results) +create table web_search_cache ( + id bigserial primary key, + query text not null, + results jsonb not null, + created_at timestamptz default now() +); + +create index web_search_cache_query_idx on web_search_cache(query); +``` + +Add the vector similarity search functions: + +```sql +-- Match knowledge base documents +create or replace function match_knowledge( + query_embedding vector(1536), + match_threshold float default 0.7, + match_count int default 5 +) +returns table ( + id bigint, + content text, + title text, + similarity float +) +language sql stable +as $$ + select + knowledge_base.id, + knowledge_base.content, + knowledge_base.title, + 1 - (knowledge_base.embedding <=> query_embedding) as similarity + from knowledge_base + where 1 - (knowledge_base.embedding <=> query_embedding) > match_threshold + order by knowledge_base.embedding <=> query_embedding + limit match_count; +$$; + +-- Match user memories +create or replace function match_user_memory( + p_user_id text, + query_embedding vector(1536), + match_threshold float default 0.7, + match_count int default 3 +) +returns table ( + id bigint, + content text, + memory_type text, + similarity float +) +language sql stable +as $$ + select + user_memory.id, + user_memory.content, + user_memory.memory_type, + 1 - (user_memory.embedding <=> query_embedding) as similarity + from user_memory + where user_memory.user_id = p_user_id + and 1 - (user_memory.embedding <=> query_embedding) > match_threshold + order by user_memory.embedding <=> query_embedding + limit match_count; +$$; +``` + +## Project Structure + + +- supabase/ + - functions/ + - _tasks/ + - 
loadHistory.ts
+      - formulateSemanticQuery.ts
+      - semanticSearch.ts
+      - extractSearchQueries.ts
+      - webSearch.ts
+      - formulateMemoryQueries.ts
+      - searchMemory.ts
+      - mergeAndRerank.ts
+    - _flows/
+      - chatbotContext.ts
+    - chatbot-stream/
+      - index.ts (Edge Function for SSE chat)
+    - pgflow_worker/
+      - index.ts
+  - migrations/
+    - [timestamp]_chatbot_schema.sql
+
+## Task Implementations
+
+### Step 1: Load Conversation History
+
+```typescript
+// supabase/functions/_tasks/loadHistory.ts
+import { createClient } from 'jsr:@supabase/supabase-js';
+
+export default async function loadHistory(input: { conversationId: number }) {
+  const supabase = createClient(
+    Deno.env.get('SUPABASE_URL')!,
+    Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
+  );
+
+  // Fetch the 20 most recent messages (newest first), then flip them
+  // back into chronological order for the prompt
+  const { data, error } = await supabase
+    .from('messages')
+    .select('role, content, created_at')
+    .eq('conversation_id', input.conversationId)
+    .order('created_at', { ascending: false })
+    .limit(20);
+
+  if (error) throw error;
+
+  return (data || []).reverse();
+}
+```
+
+### Step 2a: Semantic Search (Knowledge Base)
+
+**Formulate semantic query:**
+
+```typescript
+// supabase/functions/_tasks/formulateSemanticQuery.ts
+import { openai } from 'npm:@ai-sdk/openai';
+import { generateText } from 'npm:ai';
+
+export default async function formulateSemanticQuery(input: {
+  history: Array<{ role: string; content: string }>;
+  message: string;
+}) {
+  const { text } = await generateText({
+    model: openai('gpt-4o-mini'),
+    prompt: `Given this conversation history and the latest message, formulate a concise semantic search query to find relevant knowledge.
+
+History:
+${input.history.slice(-5).map(m => `${m.role}: ${m.content}`).join('\n')}
+
+Latest message: ${input.message}
+
+Generate a focused search query (1-2 sentences):`,
+  });
+
+  return { query: text.trim() };
+}
+```
+
+**Execute semantic search:**
+
+```typescript
+// supabase/functions/_tasks/semanticSearch.ts
+import { createClient } from 'jsr:@supabase/supabase-js';
+import { openai } from 'npm:@ai-sdk/openai';
+import { embed } from 'npm:ai';
+
+export default async function semanticSearch(input: { query: string }) {
+  const supabase = createClient(
+    Deno.env.get('SUPABASE_URL')!,
+    Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
+ ); + + // Generate embedding for the query + const { embedding } = await embed({ + model: openai.embedding('text-embedding-3-small'), + value: input.query, + }); + + // Search knowledge base + const { data, error } = await supabase.rpc('match_knowledge', { + query_embedding: embedding, + match_threshold: 0.7, + match_count: 5, + }); + + if (error) throw error; + + return { + source: 'knowledge_base', + results: data || [], + }; +} +``` + +### Step 2b: Web Search + +**Extract search queries:** + +```typescript +// supabase/functions/_tasks/extractSearchQueries.ts +import { openai } from 'npm:@ai-sdk/openai'; +import { generateObject } from 'npm:ai'; +import { z } from 'npm:zod'; + +export default async function extractSearchQueries(input: { message: string }) { + const { object } = await generateObject({ + model: openai('gpt-4o-mini'), + schema: z.object({ + queries: z.array(z.string()).describe('Search queries to find relevant information'), + }), + prompt: `Extract 1-3 web search queries from this message that would help answer it: + +Message: ${input.message} + +Generate specific, focused search queries.`, + }); + + return { queries: object.queries.slice(0, 2) }; // Limit to 2 queries +} +``` + +**Execute web search:** + +```typescript +// supabase/functions/_tasks/webSearch.ts +import { createClient } from 'jsr:@supabase/supabase-js'; + +export default async function webSearch(input: { queries: string[] }) { + const supabase = createClient( + Deno.env.get('SUPABASE_URL')!, + Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')! + ); + + const results = []; + + for (const query of input.queries) { + // Check cache first + const { data: cached } = await supabase + .from('web_search_cache') + .select('results') + .eq('query', query) + .gte('created_at', new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString()) + .single(); + + if (cached) { + results.push(...cached.results); + continue; + } + + // Perform actual search (example using a hypothetical API) + // In production, use services like Brave Search API, Serper, or Tavily + const searchResults = await performWebSearch(query); + + // Cache results + await supabase + .from('web_search_cache') + .insert({ query, results: searchResults }); + + results.push(...searchResults); + } + + return { + source: 'web_search', + results, + }; +} + +async function performWebSearch(query: string) { + // Example placeholder - integrate with your preferred search API + // For demo purposes, returning mock data + const BRAVE_API_KEY = Deno.env.get('BRAVE_API_KEY'); + + if (!BRAVE_API_KEY) { + return [{ title: 'Search unavailable', snippet: 'Configure BRAVE_API_KEY', url: '#' }]; + } + + const response = await fetch( + `https://api.search.brave.com/res/v1/web/search?q=${encodeURIComponent(query)}`, + { + headers: { + 'Accept': 'application/json', + 'X-Subscription-Token': BRAVE_API_KEY, + }, + } + ); + + const data = await response.json(); + + return (data.web?.results || []).slice(0, 3).map((r: any) => ({ + title: r.title, + snippet: r.description, + url: r.url, + })); +} +``` + +### Step 2c: Memory Search + +**Formulate memory queries:** + +```typescript +// supabase/functions/_tasks/formulateMemoryQueries.ts +import { openai } from 'npm:@ai-sdk/openai'; +import { generateObject } from 'npm:ai'; +import { z } from 'npm:zod'; + +export default async function formulateMemoryQueries(input: { + message: string; + userId: string; +}) { + const { object } = await generateObject({ + model: openai('gpt-4o-mini'), + schema: z.object({ + query: z.string().describe('A query 
to find relevant user memories and context'), + }), + prompt: `Formulate a query to search user memories and preferences that would be relevant to this message: + +Message: ${input.message} + +Generate a focused memory search query:`, + }); + + return { query: object.query }; +} +``` + +**Search memory:** + +```typescript +// supabase/functions/_tasks/searchMemory.ts +import { createClient } from 'jsr:@supabase/supabase-js'; +import { openai } from 'npm:@ai-sdk/openai'; +import { embed } from 'npm:ai'; + +export default async function searchMemory(input: { + query: string; + userId: string; +}) { + const supabase = createClient( + Deno.env.get('SUPABASE_URL')!, + Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')! + ); + + // Generate embedding for the query + const { embedding } = await embed({ + model: openai.embedding('text-embedding-3-small'), + value: input.query, + }); + + // Search user memories + const { data, error } = await supabase.rpc('match_user_memory', { + p_user_id: input.userId, + query_embedding: embedding, + match_threshold: 0.7, + match_count: 3, + }); + + if (error) throw error; + + return { + source: 'user_memory', + results: data || [], + }; +} +``` + +### Step 3: Merge and Rerank + +```typescript +// supabase/functions/_tasks/mergeAndRerank.ts +import { openai } from 'npm:@ai-sdk/openai'; +import { generateObject } from 'npm:ai'; +import { z } from 'npm:zod'; + +type SourceResult = { + source: string; + results: any[]; +}; + +export default async function mergeAndRerank(input: { + semanticResults: SourceResult; + webResults: SourceResult; + memoryResults: SourceResult; + message: string; +}) { + // Combine all results + const allResults = [ + ...input.semanticResults.results.map((r, i) => ({ + id: `kb_${i}`, + source: 'knowledge_base', + content: r.content || r.snippet, + title: r.title, + score: r.similarity || 0, + })), + ...input.webResults.results.map((r, i) => ({ + id: `web_${i}`, + source: 'web_search', + content: r.snippet, + title: r.title, + url: r.url, + score: 0.8, // Default score for web results + })), + ...input.memoryResults.results.map((r, i) => ({ + id: `mem_${i}`, + source: 'user_memory', + content: r.content, + type: r.memory_type, + score: r.similarity || 0, + })), + ]; + + if (allResults.length === 0) { + return { rankedResults: [] }; + } + + // Use LLM to rerank based on relevance to the query + const { object } = await generateObject({ + model: openai('gpt-4o-mini'), + schema: z.object({ + rankedIds: z.array(z.string()).describe('Result IDs in order of relevance'), + reasoning: z.string().describe('Brief explanation of ranking decisions'), + }), + prompt: `Rerank these search results by relevance to the user's query. + +User Query: ${input.message} + +Results: +${allResults.map(r => `ID: ${r.id} +Source: ${r.source} +Title: ${r.title || 'N/A'} +Content: ${r.content.substring(0, 200)}... +Current Score: ${r.score} +`).join('\n---\n')} + +Return the IDs in order of relevance (most relevant first). Consider: +1. Direct relevance to the query +2. Source reliability (user_memory > knowledge_base > web_search) +3. 
Content quality and specificity
+
+Select the top 5-8 most relevant results.`,
+  });
+
+  // Reorder based on LLM ranking
+  const rankedResults = object.rankedIds
+    .map(id => allResults.find(r => r.id === id))
+    .filter(Boolean)
+    .slice(0, 8); // Top 8 results
+
+  return {
+    rankedResults,
+    reasoning: object.reasoning,
+  };
+}
+```
+
+## Flow Definition
+
+Now create the complete flow that orchestrates all steps. Note that the slugs use underscores rather than hyphens: pgflow slugs may contain only letters, numbers, and underscores, because each step's output is exposed to its dependents as a property named after the slug:
+
+```typescript
+// supabase/functions/_flows/chatbotContext.ts
+import { Flow } from 'npm:@pgflow/dsl';
+import loadHistory from '../_tasks/loadHistory.ts';
+import formulateSemanticQuery from '../_tasks/formulateSemanticQuery.ts';
+import semanticSearch from '../_tasks/semanticSearch.ts';
+import extractSearchQueries from '../_tasks/extractSearchQueries.ts';
+import webSearch from '../_tasks/webSearch.ts';
+import formulateMemoryQueries from '../_tasks/formulateMemoryQueries.ts';
+import searchMemory from '../_tasks/searchMemory.ts';
+import mergeAndRerank from '../_tasks/mergeAndRerank.ts';
+
+type FlowInput = {
+  conversationId: number;
+  message: string;
+  userId: string;
+};
+
+export const ChatbotContextFlow = new Flow<FlowInput>({
+  slug: 'chatbot_context',
+  maxAttempts: 2,
+  baseDelay: 1,
+})
+  // Step 1: Load conversation history.
+  // Each handler receives a single input object: `input.run` holds the
+  // flow input, and dependency outputs appear under their step slugs.
+  .step({ slug: 'history' }, (input) =>
+    loadHistory({ conversationId: input.run.conversationId })
+  )
+
+  // Step 2a: Semantic search pipeline
+  .step({ slug: 'semantic_query', dependsOn: ['history'] }, (input) =>
+    formulateSemanticQuery({
+      history: input.history,
+      message: input.run.message,
+    })
+  )
+  .step({ slug: 'semantic_search', dependsOn: ['semantic_query'] }, (input) =>
+    semanticSearch({ query: input.semantic_query.query })
+  )
+
+  // Step 2b: Web search pipeline
+  .step({ slug: 'web_queries' }, (input) =>
+    extractSearchQueries({ message: input.run.message })
+  )
+  .step({ slug: 'web_search', dependsOn: ['web_queries'] }, (input) =>
+    webSearch({ queries: input.web_queries.queries })
+  )
+
+  // Step 2c: Memory search pipeline
+  .step({ slug: 'memory_queries' }, (input) =>
+    formulateMemoryQueries({
+      message: input.run.message,
+      userId: input.run.userId,
+    })
+  )
+  .step({ slug: 'memory_search', dependsOn: ['memory_queries'] }, (input) =>
+    searchMemory({
+      query: input.memory_queries.query,
+      userId: input.run.userId,
+    })
+  )
+
+  // Step 3: Merge and rerank all results
+  .step(
+    {
+      slug: 'rerank',
+      dependsOn: ['semantic_search', 'web_search', 'memory_search'],
+    },
+    (input) =>
+      mergeAndRerank({
+        semanticResults: input.semantic_search,
+        webResults: input.web_search,
+        memoryResults: input.memory_search,
+        message: input.run.message,
+      })
+  );
+
+export default ChatbotContextFlow;
+```
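+
+The flow needs a worker to execute its steps. The project structure above includes `pgflow_worker/index.ts` for this; here is a minimal sketch, assuming the `EdgeWorker.start()` entrypoint from pgflow's edge worker package:
+
+```typescript
+// supabase/functions/pgflow_worker/index.ts
+import { EdgeWorker } from 'jsr:@pgflow/edge-worker';
+import ChatbotContextFlow from '../_flows/chatbotContext.ts';
+
+// Polls the pgflow queue and executes this flow's steps as they become ready
+EdgeWorker.start(ChatbotContextFlow);
+```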
+
+## Edge Function for Chat Streaming
+
+Create an edge function that loads the prepared context and streams the AI response:
+
+```typescript
+// supabase/functions/chatbot-stream/index.ts
+import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
+import { createClient } from 'jsr:@supabase/supabase-js';
+import { openai } from 'npm:@ai-sdk/openai';
+import { streamText } from 'npm:ai';
+
+const corsHeaders = {
+  'Access-Control-Allow-Origin': '*',
+  'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type',
+};
+
+serve(async (req) => {
+  // Handle CORS preflight
+  if (req.method === 'OPTIONS') {
+    return new Response('ok', { headers: corsHeaders });
+  }
+
+  try {
+    const { runId, conversationId, message } = await req.json();
+
+    if (!runId || !conversationId || !message) {
+      return new Response(
+        JSON.stringify({ error: 'Missing required fields' }),
+        { status: 400, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
+      );
+    }
+
+    // Initialize Supabase client
+    const supabase = createClient(
+      Deno.env.get('SUPABASE_URL')!,
+      Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
+    );
+
+    // Load the completed run to get the prepared context. pgflow's tables
+    // live in the `pgflow` schema, so select it explicitly (the schema must
+    // be in your API's exposed schemas list)
+    const { data: run, error: runError } = await supabase
+      .schema('pgflow')
+      .from('runs')
+      .select('output, status')
+      .eq('run_id', runId)
+      .single();
+
+    if (runError || !run) {
+      return new Response(
+        JSON.stringify({ error: 'Run not found' }),
+        { status: 404, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
+      );
+    }
+
+    if (run.status !== 'completed') {
+      return new Response(
+        JSON.stringify({ error: 'Run not completed yet' }),
+        { status: 400, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
+      );
+    }
+
+    // The run output holds the outputs of the flow's final steps, so the
+    // reranked context is available under the `rerank` slug
+    const { rankedResults, reasoning } = run.output.rerank;
+
+    // Load recent history directly from the messages table
+    const { data: historyRows } = await supabase
+      .from('messages')
+      .select('role, content')
+      .eq('conversation_id', conversationId)
+      .order('created_at', { ascending: false })
+      .limit(20);
+    const history = (historyRows || []).reverse();
+
+    // Format context for the LLM
+    const contextText = rankedResults
+      .map((r: any) => {
+        const source = r.source === 'knowledge_base' ? '📚 Knowledge'
+          : r.source === 'web_search' ? '🌐 Web'
+          : '🧠 Memory';
+        return `${source}: ${r.title || r.type || ''}\n${r.content}`;
+      })
+      .join('\n\n---\n\n');
+
+    // Save user message
+    await supabase
+      .from('messages')
+      .insert({
+        conversation_id: conversationId,
+        role: 'user',
+        content: message,
+      });
+
+    // Stream AI response
+    const result = await streamText({
+      model: openai('gpt-4o'),
+      messages: [
+        {
+          role: 'system',
+          content: `You are a helpful AI assistant. Use the provided context to answer questions accurately.
+ +Context from multiple sources: +${contextText} + +Ranking reasoning: ${reasoning}`, + }, + ...history.map((m: any) => ({ + role: m.role, + content: m.content, + })), + { role: 'user', content: message }, + ], + temperature: 0.7, + maxTokens: 1000, + }); + + // Save assistant response after streaming completes + let fullResponse = ''; + + const stream = result.toTextStreamResponse(); + + // Transform stream to also collect the response + const transformStream = new TransformStream({ + transform(chunk, controller) { + const text = new TextDecoder().decode(chunk); + fullResponse += text; + controller.enqueue(chunk); + }, + flush: async () => { + // Save the complete response + await supabase + .from('messages') + .insert({ + conversation_id: conversationId, + role: 'assistant', + content: fullResponse, + }); + }, + }); + + const responseStream = stream.body!.pipeThrough(transformStream); + + return new Response(responseStream, { + headers: { + ...corsHeaders, + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, + }); + } catch (error) { + console.error('Error in chatbot-stream:', error); + return new Response( + JSON.stringify({ error: error.message }), + { status: 500, headers: { ...corsHeaders, 'Content-Type': 'application/json' } } + ); + } +}); +``` + +## Browser Client Implementation + +Create a React component that uses pgflow client to gather context, then streams the chat response: + +```tsx +// app/components/ChatbotUI.tsx +'use client'; + +import { useState, useEffect, useRef } from 'react'; +import { PgflowClient } from '@pgflow/client'; +import { createClient } from '@supabase/supabase-js'; +import { useChat } from 'ai/react'; + +const supabase = createClient( + process.env.NEXT_PUBLIC_SUPABASE_URL!, + process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY! 
+);
+
+export default function ChatbotUI({ conversationId, userId }: {
+  conversationId: number;
+  userId: string;
+}) {
+  const [pgflow] = useState(() => new PgflowClient(supabase));
+  const [contextStatus, setContextStatus] = useState<{
+    phase: 'idle' | 'gathering' | 'ready' | 'error';
+    progress: Record<string, 'started' | 'completed' | 'failed'>;
+    runId?: string;
+  }>({
+    phase: 'idle',
+    progress: {},
+  });
+
+  const { messages, input, setInput, append, isLoading } = useChat({
+    api: '/api/chatbot-stream',
+    // The edge function returns a plain text stream, not the AI SDK
+    // data-stream protocol, so tell useChat to expect raw text
+    streamProtocol: 'text',
+    onError: (error) => {
+      console.error('Chat error:', error);
+    },
+  });
+
+  const handleSendMessage = async () => {
+    if (!input.trim()) return;
+
+    const userMessage = input;
+    setInput('');
+
+    try {
+      // Phase 1: Start context gathering with pgflow
+      setContextStatus({ phase: 'gathering', progress: {} });
+
+      const run = await pgflow.startFlow('chatbot_context', {
+        conversationId,
+        message: userMessage,
+        userId,
+      });
+
+      // Subscribe to progress updates
+      const stepSlugs = [
+        'history',
+        'semantic_query',
+        'semantic_search',
+        'web_queries',
+        'web_search',
+        'memory_queries',
+        'memory_search',
+        'rerank',
+      ];
+
+      stepSlugs.forEach((slug) => {
+        const step = run.step(slug);
+
+        step.on('started', () => {
+          setContextStatus((prev) => ({
+            ...prev,
+            progress: { ...prev.progress, [slug]: 'started' },
+          }));
+        });
+
+        step.on('completed', () => {
+          setContextStatus((prev) => ({
+            ...prev,
+            progress: { ...prev.progress, [slug]: 'completed' },
+          }));
+        });
+
+        step.on('failed', () => {
+          setContextStatus((prev) => ({
+            ...prev,
+            progress: { ...prev.progress, [slug]: 'failed' },
+          }));
+        });
+      });
+
+      // Wait for context gathering to complete
+      await run.waitForStatus('completed', { timeoutMs: 60000 });
+
+      setContextStatus({
+        phase: 'ready',
+        progress: {},
+        runId: run.run_id,
+      });
+
+      // Phase 2: Start chat streaming with the prepared context.
+      // Pass runId/conversationId as extra body fields on the request
+      // rather than on the message itself, so the API route receives them
+      await append(
+        { role: 'user', content: userMessage },
+        {
+          body: {
+            runId: run.run_id,
+            conversationId,
+          },
+        }
+      );
+
+      // Cleanup
+      pgflow.dispose(run.run_id);
+
+      setContextStatus({ phase: 'idle', progress: {} });
+    } catch (error) {
+      console.error('Error:', error);
+      setContextStatus({ phase: 'error', progress: {} });
+    }
+  };
+
+  return (
+    <div className="flex h-full flex-col">
+      {/* Messages */}
+      <div className="flex-1 space-y-4 overflow-y-auto p-4">
+        {messages.map((m) => (
+          <div
+            key={m.id}
+            className={m.role === 'user' ? 'flex justify-end' : 'flex justify-start'}
+          >
+            <div
+              className={`max-w-[80%] whitespace-pre-wrap rounded-lg px-4 py-2 ${
+                m.role === 'user' ? 'bg-blue-500 text-white' : 'bg-gray-100 text-gray-900'
+              }`}
+            >
+              {m.content}
+            </div>
+          </div>
+        ))}
+
+        {/* Context gathering progress */}
+        {contextStatus.phase === 'gathering' && (
+          <div className="rounded-lg border border-gray-200 bg-gray-50 p-3">
+            <div className="text-sm font-medium text-gray-700">
+              Gathering context...
+            </div>
+            <div className="mt-2 space-y-1">
+              {Object.entries(contextStatus.progress).map(([step, status]) => (
+                <div key={step} className="flex items-center gap-2 text-sm text-gray-600">
+                  <span>
+                    {status === 'completed' ? '✓' : status === 'started' ? '⋯' : '✗'}
+                  </span>
+                  <span>{step.replace(/_/g, ' ')}</span>
+                </div>
+              ))}
+            </div>
+          </div>
+        )}
+
+        {isLoading && (
+          <div className="flex justify-start">
+            <div className="rounded-lg bg-gray-100 px-4 py-2 text-gray-500">
+              Thinking...
+            </div>
+          </div>
+        )}
+      </div>
+
+      {/* Input */}
+      <div className="flex gap-2 border-t border-gray-200 p-4">
+        <input
+          value={input}
+          onChange={(e) => setInput(e.target.value)}
+          onKeyDown={(e) => {
+            if (e.key === 'Enter' && !e.shiftKey) {
+              e.preventDefault();
+              handleSendMessage();
+            }
+          }}
+          placeholder="Ask a question..."
+          disabled={contextStatus.phase === 'gathering' || isLoading}
+          className="flex-1 px-4 py-2 border border-gray-300 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500"
+        />
+        <button
+          onClick={handleSendMessage}
+          disabled={contextStatus.phase === 'gathering' || isLoading || !input.trim()}
+          className="rounded-lg bg-blue-500 px-4 py-2 text-white disabled:opacity-50"
+        >
+          Send
+        </button>
+      </div>
+
+      {/* Status indicator */}
+      {contextStatus.phase === 'error' && (
+        <div className="p-2 text-sm text-red-600">
+          Error gathering context. Please try again.
+        </div>
+      )}
+    </div>
+  );
+}
+```
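+
+To try the component, mount it from a page with a conversation id and user id. A minimal, hypothetical `app/chat/page.tsx` (the path and literal ids are placeholders — in a real app, create the conversation row and read the user from your auth session):
+
+```tsx
+// app/chat/page.tsx — hypothetical wrapper page
+import ChatbotUI from '../components/ChatbotUI';
+
+export default function ChatPage() {
+  // Placeholder values: load a real conversation id and user id instead
+  return (
+    <main className="h-screen">
+      <ChatbotUI conversationId={1} userId="user_123" />
+    </main>
+  );
+}
+```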
+
+Update your chat API route to forward the extra body fields to the edge function:
+
+```typescript
+// app/api/chatbot-stream/route.ts
+export async function POST(req: Request) {
+  // useChat merges the `body` option from append() into the request JSON,
+  // so runId and conversationId arrive at the top level alongside messages
+  const { messages, runId, conversationId } = await req.json();
+
+  // Forward to your Supabase edge function
+  const response = await fetch(
+    `${process.env.NEXT_PUBLIC_SUPABASE_URL}/functions/v1/chatbot-stream`,
+    {
+      method: 'POST',
+      headers: {
+        'Content-Type': 'application/json',
+        'Authorization': `Bearer ${process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY}`,
+      },
+      body: JSON.stringify({
+        runId,
+        conversationId,
+        message: messages[messages.length - 1].content,
+      }),
+    }
+  );
+
+  return new Response(response.body, {
+    headers: {
+      'Content-Type': 'text/event-stream',
+      'Cache-Control': 'no-cache',
+    },
+  });
+}
+```
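+
+Before deploying, seed the knowledge base — with empty tables, the semantic search step has nothing to match. Below is a minimal, hypothetical seeding script (the file name and sample documents are illustrative); it embeds content with the same `text-embedding-3-small` model that `semanticSearch` uses at query time, so the vectors are comparable:
+
+```typescript
+// scripts/seedKnowledge.ts — hypothetical one-off script, not part of the app
+import { createClient } from 'npm:@supabase/supabase-js';
+import { openai } from 'npm:@ai-sdk/openai';
+import { embed } from 'npm:ai';
+
+const supabase = createClient(
+  Deno.env.get('SUPABASE_URL')!,
+  Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
+);
+
+const docs = [
+  { title: 'Refund policy', content: 'Refunds are available within 30 days of purchase.' },
+  { title: 'Support hours', content: 'Support is available Monday-Friday, 9am-5pm UTC.' },
+];
+
+for (const doc of docs) {
+  // Embed with the same model the flow uses at query time
+  const { embedding } = await embed({
+    model: openai.embedding('text-embedding-3-small'),
+    value: doc.content,
+  });
+
+  await supabase.from('knowledge_base').insert({ ...doc, embedding });
+}
+```
+
+Run it with `deno run -A scripts/seedKnowledge.ts` (with `SUPABASE_URL`, `SUPABASE_SERVICE_ROLE_KEY`, and `OPENAI_API_KEY` set). You can also smoke-test the workflow without the UI by calling pgflow's `pgflow.start_flow(flow_slug, input)` SQL function and watching the worker logs.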
+
+## Deploy and Test
+
+<Steps>
+
+1. **Compile the flow**
+
+   ```bash
+   npx pgflow@latest compile supabase/functions/_flows/chatbotContext.ts
+   ```
+
+2. **Apply migrations**
+
+   ```bash
+   npx supabase db push
+   ```
+
+3. **Deploy edge functions**
+
+   ```bash
+   npx supabase functions deploy chatbot-stream
+   npx supabase functions deploy pgflow_worker
+   ```
+
+4. **Set environment variables**
+
+   ```bash
+   npx supabase secrets set \
+     OPENAI_API_KEY=your_key \
+     BRAVE_API_KEY=your_key
+   ```
+
+5. **Test the chatbot**
+
+   Start your application and send a message. You should see:
+   - Real-time progress as context is gathered from all sources
+   - A streaming AI response once the context is ready
+   - Messages saved to the database
+
+</Steps>
+
+## How It Works
+
+The chatbot uses a two-phase architecture:
+
+### Phase 1: Context Gathering (pgflow)
+
+1. **Load History**: Retrieves the 20 most recent messages from the conversation
+2. **Parallel Processing** (3 simultaneous pipelines):
+   - **Semantic Search**: Formulates a semantic query → searches the knowledge base with pgvector
+   - **Web Search**: Extracts search queries → performs web searches
+   - **Memory Search**: Formulates memory queries → searches user-specific memories
+3. **Merge & Rerank**: Uses an LLM to rank all results by relevance
+
+The browser client subscribes to real-time updates via the pgflow client (Supabase Realtime) and shows progress for each step.
+
+### Phase 2: Response Streaming (AI SDK)
+
+1. The browser waits for the run to complete
+2. It calls the edge function with the `runId`
+3. The edge function loads the prepared context from the run output
+4. The AI response is streamed using the AI SDK's `streamText`
+5. The response is displayed in real time using `useChat`
+
+## Next Steps
+
+Explore the other guides in the [tutorials section](/tutorials/), such as the RAG Pipeline tutorial, to keep building on pgflow.