diff --git a/app/api/inngest/route.ts b/app/api/inngest/route.ts index 8c6a607..9cf9a9d 100644 --- a/app/api/inngest/route.ts +++ b/app/api/inngest/route.ts @@ -6,11 +6,12 @@ import { runApiTests } from "@/lib/test-functions/api-tests"; import { runPerformanceTests } from "@/lib/test-functions/performance-tests"; import { runVibetest } from "@/lib/test-functions/vibetest-run"; import { runSecurityAgent } from "@/lib/security-agent/functions"; +import { indexDeploymentLogs, cronReindexLogs } from "@/lib/vector-indexer"; // ← add this // Allow Inngest handler up to 5 minutes (max for Vercel hobby plan) export const maxDuration = 300; export const { GET, POST, PUT } = serve({ client: inngest, - functions: [runTestSuite, runSecurityScan, runApiTests, runPerformanceTests, runVibetest, runSecurityAgent], -}); + functions: [runTestSuite, runSecurityScan, runApiTests, runPerformanceTests, runVibetest, runSecurityAgent, indexDeploymentLogs, cronReindexLogs], +}); \ No newline at end of file diff --git a/app/api/logs/search/route.ts b/app/api/logs/search/route.ts new file mode 100644 index 0000000..664c86d --- /dev/null +++ b/app/api/logs/search/route.ts @@ -0,0 +1,160 @@ +/** + * app/api/logs/search/route.ts + * + * GET /api/logs/search?q=[&sandboxId=][&limit=] + * Returns semantically similar log chunks for the authenticated user. + * + * POST /api/logs/search + * Body: { q: string; sandboxId?: string; limit?: number } + * Same as GET but accepts a JSON body (useful for longer queries). + * + * POST /api/logs/search?action=trigger + * Body: { sandboxId: string; ttlDays?: number } + * Manually trigger indexing for a sandbox (fires the Inngest event). + */ + +import { NextRequest, NextResponse } from "next/server"; +import { auth } from "@/lib/auth"; +import { searchLogVectors } from "@/lib/db"; +import { inngest } from "@/lib/inngest"; +import { embedQuery } from "@/lib/vector-indexer"; + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +function sanitiseLimit(raw: string | null | undefined): number { + const n = parseInt(raw ?? "10", 10); + if (isNaN(n) || n < 1) return 10; + if (n > 50) return 50; + return n; +} + +// ── Shared search handler ───────────────────────────────────────────────────── + +async function handleSearch(opts: { + userId: string; + query: string; + sandboxId?: string; + limit: number; +}) { + const { userId, query, sandboxId, limit } = opts; + + if (!query || query.trim().length === 0) { + return NextResponse.json( + { error: "Query parameter 'q' is required" }, + { status: 400 } + ); + } + + // Embed the user's natural-language query using Cohere + const queryEmbedding = await embedQuery(query.trim()); + + // Run the cosine-similarity search + const results = await searchLogVectors({ + userId, + queryEmbedding, + sandboxId, + limit, + }); + + return NextResponse.json({ + query, + sandboxId: sandboxId ?? null, + count: results.length, + results: results.map((r) => ({ + id: r.id, + sandboxId: r.sandboxId, + logIdRange: { start: r.logIdStart, end: r.logIdEnd }, + chunkText: r.chunkText, + createdAt: r.createdAt, + })), + }); +} + +// ── GET /api/logs/search ────────────────────────────────────────────────────── + +export async function GET(req: NextRequest) { + const session = await auth(); + if (!session?.user?.id) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + const { searchParams } = new URL(req.url); + const query = searchParams.get("q") ?? ""; + const sandboxId = searchParams.get("sandboxId") ?? undefined; + const limit = sanitiseLimit(searchParams.get("limit")); + + try { + return await handleSearch({ userId: session.user.id, query, sandboxId, limit }); + } catch (err) { + console.error("[logs/search] GET error:", err); + return NextResponse.json( + { error: "Search failed", detail: String(err) }, + { status: 500 } + ); + } +} + +// ── POST /api/logs/search ───────────────────────────────────────────────────── + +export async function POST(req: NextRequest) { + const session = await auth(); + if (!session?.user?.id) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + // Handle manual index trigger via ?action=trigger + const { searchParams } = new URL(req.url); + if (searchParams.get("action") === "trigger") { + return handleTrigger(req, session.user.id); + } + + let body: { q?: string; sandboxId?: string; limit?: number }; + try { + body = await req.json(); + } catch { + return NextResponse.json({ error: "Invalid JSON body" }, { status: 400 }); + } + + const query = body.q ?? ""; + const sandboxId = body.sandboxId ?? undefined; + const limit = sanitiseLimit(String(body.limit ?? 10)); + + try { + return await handleSearch({ userId: session.user.id, query, sandboxId, limit }); + } catch (err) { + console.error("[logs/search] POST error:", err); + return NextResponse.json( + { error: "Search failed", detail: String(err) }, + { status: 500 } + ); + } +} + +// ── Manual index trigger ────────────────────────────────────────────────────── + +async function handleTrigger(req: NextRequest, userId: string) { + let body: { sandboxId?: string; ttlDays?: number }; + try { + body = await req.json(); + } catch { + return NextResponse.json({ error: "Invalid JSON body" }, { status: 400 }); + } + + const { sandboxId, ttlDays = 30 } = body; + if (!sandboxId) { + return NextResponse.json( + { error: "sandboxId is required" }, + { status: 400 } + ); + } + + await inngest.send({ + name: "log/index.requested", + data: { sandboxId, userId, ttlDays }, + }); + + return NextResponse.json({ + ok: true, + message: `Indexing triggered for sandbox ${sandboxId}`, + }); +} \ No newline at end of file diff --git a/lib/db.ts b/lib/db.ts index 4951f2a..bd868c3 100644 --- a/lib/db.ts +++ b/lib/db.ts @@ -259,6 +259,44 @@ export async function ensureTables(): Promise { ON notifications (user_id, created_at DESC) `; + // ── pgvector extension ──────────────────────────────────────────────────── + // Requires pgvector installed on your Neon project (available by default). + await sql`CREATE EXTENSION IF NOT EXISTS vector`; + + // ── Log vector index table ──────────────────────────────────────────────── + // Stores chunked log text + 1536-dim OpenAI embeddings for semantic search. + await sql` + CREATE TABLE IF NOT EXISTS deployment_log_vectors ( + id BIGSERIAL PRIMARY KEY, + sandbox_id TEXT NOT NULL, + user_id TEXT NOT NULL DEFAULT '', + log_id_start BIGINT NOT NULL, + log_id_end BIGINT NOT NULL, + chunk_text TEXT NOT NULL, + embedding vector(1024), + created_at BIGINT NOT NULL, + expires_at BIGINT NOT NULL + ) + `; + + await sql` + CREATE INDEX IF NOT EXISTS idx_log_vectors_sandbox + ON deployment_log_vectors (sandbox_id) + `; + + // HNSW index for fast approximate nearest-neighbour search + await sql` + CREATE INDEX IF NOT EXISTS idx_log_vectors_embedding + ON deployment_log_vectors + USING hnsw (embedding vector_cosine_ops) + `; + + // Index to speed up the retention-pruning query + await sql` + CREATE INDEX IF NOT EXISTS idx_log_vectors_expires + ON deployment_log_vectors (expires_at) + `; + tablesReady = true; } @@ -309,3 +347,125 @@ export async function createNotification(opts: { `; } catch { /* non-blocking */ } } + +// ── pgvector helpers ────────────────────────────────────────────────────────── + +export interface LogVector { + id: number; + sandboxId: string; + userId: string; + logIdStart: number; + logIdEnd: number; + chunkText: string; + createdAt: number; +} + +/** + * Insert a pre-computed embedding chunk into deployment_log_vectors. + * Called by the background indexer — not by API routes directly. + */ +export async function insertLogVector(opts: { + sandboxId: string; + userId: string; + logIdStart: number; + logIdEnd: number; + chunkText: string; + embedding: number[]; + /** TTL in milliseconds from now — defaults to 30 days */ + ttlMs?: number; +}): Promise { + const sql = getDb(); + const expiresAt = Date.now() + (opts.ttlMs ?? 30 * 24 * 60 * 60 * 1000); + await sql` + INSERT INTO deployment_log_vectors + (sandbox_id, user_id, log_id_start, log_id_end, + chunk_text, embedding, created_at, expires_at) + VALUES ( + ${opts.sandboxId}, + ${opts.userId}, + ${opts.logIdStart}, + ${opts.logIdEnd}, + ${opts.chunkText}, + ${JSON.stringify(opts.embedding)}::vector, + ${Date.now()}, + ${expiresAt} + ) + `; +} + +/** + * Semantic nearest-neighbour search over deployment_log_vectors. + * Returns up to `limit` chunks ranked by cosine similarity. + */ +export async function searchLogVectors(opts: { + userId: string; + queryEmbedding: number[]; + sandboxId?: string; + limit?: number; +}): Promise { + const sql = getDb(); + const limit = opts.limit ?? 10; + const vec = JSON.stringify(opts.queryEmbedding); + + let rows; + if (opts.sandboxId) { + rows = await sql` + SELECT id, sandbox_id, user_id, log_id_start, log_id_end, + chunk_text, created_at + FROM deployment_log_vectors + WHERE user_id = ${opts.userId} + AND sandbox_id = ${opts.sandboxId} + AND expires_at > ${Date.now()} + ORDER BY embedding <=> ${vec}::vector + LIMIT ${limit} + `; + } else { + rows = await sql` + SELECT id, sandbox_id, user_id, log_id_start, log_id_end, + chunk_text, created_at + FROM deployment_log_vectors + WHERE user_id = ${opts.userId} + AND expires_at > ${Date.now()} + ORDER BY embedding <=> ${vec}::vector + LIMIT ${limit} + `; + } + + return rows.map((r) => ({ + id: Number(r.id), + sandboxId: r.sandbox_id, + userId: r.user_id, + logIdStart: Number(r.log_id_start), + logIdEnd: Number(r.log_id_end), + chunkText: r.chunk_text, + createdAt: Number(r.created_at), + })); +} + +/** + * Delete expired log vectors (retention policy). + * Run periodically from the Inngest indexer or a cron route. + */ +export async function pruneExpiredLogVectors(): Promise { + const sql = getDb(); + const result = await sql` + DELETE FROM deployment_log_vectors + WHERE expires_at < ${Date.now()} + RETURNING id + `; + return result.length; +} + +/** + * Returns the highest log id already indexed for a given sandbox, + * so the indexer knows where to resume. + */ +export async function getLastIndexedLogId(sandboxId: string): Promise { + const sql = getDb(); + const rows = await sql` + SELECT COALESCE(MAX(log_id_end), 0) AS last_id + FROM deployment_log_vectors + WHERE sandbox_id = ${sandboxId} + `; + return Number(rows[0]?.last_id ?? 0); +} \ No newline at end of file diff --git a/lib/deployer.ts b/lib/deployer.ts index 5ea6287..27b840e 100644 --- a/lib/deployer.ts +++ b/lib/deployer.ts @@ -1,5 +1,6 @@ import { Sandbox } from "e2b"; import { getDb, ensureTables } from "./db"; +import { inngest } from "./inngest"; // Custom template: Node 20 + git + pnpm + serve pre-installed (qg1v6gyvxew6q52r04lp) const E2B_TEMPLATE = process.env.E2B_TEMPLATE ?? "secdev-web-runtime"; @@ -177,7 +178,7 @@ export async function startDeployment( `; // Fire-and-forget background deployment pipeline - runDeploymentPipeline(sandbox, sandboxId, repoName, publicUrl, repoUrl, branch, options?.envVars) + runDeploymentPipeline(sandbox, sandboxId, repoName, publicUrl, repoUrl, branch, userId, options?.envVars) .catch(async (err: unknown) => { const msg = err instanceof Error ? err.message : String(err); const ts = Date.now(); @@ -198,6 +199,7 @@ async function runDeploymentPipeline( publicUrl: string, repoUrl: string, branch: string, + userId: string, envVars?: Record ): Promise { const sql = getDb(); @@ -395,6 +397,12 @@ async function runDeploymentPipeline( await setStatus("live"); log(`🎉 Deployment LIVE → ${publicUrl}`); + + // Trigger background log indexing for semantic search + inngest.send({ + name: "log/index.requested", + data: { sandboxId, userId, ttlDays: 30 }, + }).catch(() => null); // fire-and-forget, never block the deployment } // ── Sandbox management ───────────────────────────────────────────────────────── @@ -443,5 +451,4 @@ export async function refreshSandboxStatus( await sql`UPDATE deployments SET status = 'failed' WHERE sandbox_id = ${sandboxId}`; return "failed"; } -} - +} \ No newline at end of file diff --git a/lib/vector-indexer.ts b/lib/vector-indexer.ts new file mode 100644 index 0000000..e1e89f6 --- /dev/null +++ b/lib/vector-indexer.ts @@ -0,0 +1,302 @@ +/** + * lib/vector-indexer.ts + * + * Inngest background function that: + * 1. Fetches new deployment_logs rows since the last indexed id + * 2. Chunks them into ~40 line windows + * 3. Optionally AI-summarises very long chunks (>1500 chars) via Groq + * 4. Creates Cohere embed-english-v3.0 embeddings (1024-dim, free tier) + * 5. Stores vectors in deployment_log_vectors + * 6. Prunes expired rows (retention policy) + * + * Trigger events + * ────────────── + * "log/index.requested" + * { sandboxId: string; userId: string; ttlDays?: number } + * Fire this after a deployment goes live. + * + * "log/index.cron" + * {} — re-indexes all recently active sandboxes + runs pruning. + * Schedule as a daily cron in the Inngest dashboard: 0 3 * * * + */ + +import { inngest } from "@/lib/inngest"; +import { + getDb, + insertLogVector, + getLastIndexedLogId, + pruneExpiredLogVectors, +} from "@/lib/db"; +import Groq from "groq-sdk"; + +// ── Config ──────────────────────────────────────────────────────────────────── + +/** How many log lines to lump into one chunk before embedding */ +const CHUNK_SIZE_LINES = 40; + +/** If a chunk's text exceeds this length, summarise with Groq before embedding */ +const SUMMARISE_THRESHOLD_CHARS = 1_500; + +/** Default retention: 30 days */ +const DEFAULT_TTL_DAYS = 30; + +/** Max chunks processed per function invocation (rate-limit guard) */ +const MAX_CHUNKS_PER_RUN = 50; + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +function getCohereApiKey(): string { + const key = process.env.COHERE_API_KEY; + if (!key) throw new Error("COHERE_API_KEY env var is not set"); + return key; +} + +function getGroq(): Groq { + const apiKey = process.env.GROQ_API_KEY; + if (!apiKey) throw new Error("GROQ_API_KEY env var is not set"); + return new Groq({ apiKey }); +} + +/** + * Embed an array of texts using Cohere embed-english-v3.0. + * Returns a 1024-dimensional float array per input text. + * Uses the fetch API directly — no extra SDK needed. + */ +async function embedTexts(texts: string[]): Promise { + const response = await fetch("https://api.cohere.com/v1/embed", { + method: "POST", + headers: { + Authorization: `Bearer ${getCohereApiKey()}`, + "Content-Type": "application/json", + "X-Client-Name": "secdev", + }, + body: JSON.stringify({ + model: "embed-english-v3.0", + texts, + input_type: "search_document", // use "search_query" when embedding queries + truncate: "END", + }), + }); + + if (!response.ok) { + const err = await response.text(); + throw new Error(`Cohere embed error ${response.status}: ${err}`); + } + + const data = await response.json(); + return data.embeddings as number[][]; +} + +/** + * Embed a single search query using Cohere. + * Uses input_type: "search_query" for better retrieval quality. + */ +export async function embedQuery(query: string): Promise { + const response = await fetch("https://api.cohere.com/v1/embed", { + method: "POST", + headers: { + Authorization: `Bearer ${getCohereApiKey()}`, + "Content-Type": "application/json", + "X-Client-Name": "secdev", + }, + body: JSON.stringify({ + model: "embed-english-v3.0", + texts: [query.slice(0, 2_000)], + input_type: "search_query", + truncate: "END", + }), + }); + + if (!response.ok) { + const err = await response.text(); + throw new Error(`Cohere embed error ${response.status}: ${err}`); + } + + const data = await response.json(); + return data.embeddings[0] as number[]; +} + +/** + * Summarise a long log chunk with Groq llama-3.1-8b-instant so the + * embedding captures semantics rather than noise. + */ +async function summariseChunk(text: string): Promise { + try { + const groq = getGroq(); + const chat = await groq.chat.completions.create({ + model: "llama-3.1-8b-instant", + max_tokens: 256, + messages: [ + { + role: "system", + content: + "You are a concise log analyst. Summarise the following deployment log chunk in 3–5 sentences, focusing on key events, errors, and status changes.", + }, + { role: "user", content: text }, + ], + }); + return chat.choices[0]?.message?.content?.trim() ?? text; + } catch { + // If summarisation fails, fall back to the raw text + return text; + } +} + +// ── Row type returned by the DB query ───────────────────────────────────────── + +interface LogRow { + id: string | number; + msg: string; + level: string; + ts: string | number; +} + +// ── Core indexing logic ─────────────────────────────────────────────────────── + +async function indexSandbox(opts: { + sandboxId: string; + userId: string; + ttlDays: number; +}): Promise<{ chunksIndexed: number; pruned: number }> { + const { sandboxId, userId, ttlDays } = opts; + const sql = getDb(); + + // 1. Find where we left off + const lastId = await getLastIndexedLogId(sandboxId); + + // 2. Fetch new log lines + const rows = (await sql` + SELECT id, msg, level, ts + FROM deployment_logs + WHERE sandbox_id = ${sandboxId} + AND id > ${lastId} + ORDER BY id ASC + LIMIT ${CHUNK_SIZE_LINES * MAX_CHUNKS_PER_RUN} + `) as LogRow[]; + + if (rows.length === 0) { + return { chunksIndexed: 0, pruned: 0 }; + } + + // 3. Slice into chunks of CHUNK_SIZE_LINES + const chunks: LogRow[][] = []; + for (let i = 0; i < rows.length; i += CHUNK_SIZE_LINES) { + chunks.push(rows.slice(i, i + CHUNK_SIZE_LINES)); + } + + // 4. Build text for each chunk (optionally summarise long ones via Groq) + const chunkTexts: string[] = []; + const rawTexts: string[] = []; + + for (const chunk of chunks) { + const rawText = chunk + .map((r) => `[${r.level.toUpperCase()}] ${r.msg}`) + .join("\n"); + rawTexts.push(rawText); + + const textToEmbed = + rawText.length > SUMMARISE_THRESHOLD_CHARS + ? await summariseChunk(rawText) + : rawText; + chunkTexts.push(textToEmbed); + } + + // 5. Batch embed all chunks in one Cohere API call (more efficient) + const embeddings = await embedTexts(chunkTexts); + + // 6. Persist each chunk + its embedding + for (let i = 0; i < chunks.length; i++) { + const chunk = chunks[i]; + await insertLogVector({ + sandboxId, + userId, + logIdStart: Number(chunk[0].id), + logIdEnd: Number(chunk[chunk.length - 1].id), + chunkText: rawTexts[i], // always store raw text + embedding: embeddings[i], + ttlMs: ttlDays * 24 * 60 * 60 * 1_000, + }); + } + + // 7. Prune expired rows + const pruned = await pruneExpiredLogVectors(); + + return { chunksIndexed: chunks.length, pruned }; +} + +// ── Inngest function: on-demand (triggered per deployment) ──────────────────── + +export const indexDeploymentLogs = inngest.createFunction( + { + id: "index-deployment-logs", + name: "Index deployment logs into pgvector", + concurrency: { limit: 5 }, + retries: 2, + }, + { event: "log/index.requested" }, + async ({ event, step }) => { + const { + sandboxId, + userId, + ttlDays = DEFAULT_TTL_DAYS, + } = event.data as { + sandboxId: string; + userId: string; + ttlDays?: number; + }; + + const result = await step.run("embed-and-store", () => + indexSandbox({ sandboxId, userId, ttlDays }) + ); + + return { + sandboxId, + chunksIndexed: result.chunksIndexed, + pruned: result.pruned, + }; + } +); + +// ── Inngest function: scheduled cron re-indexer ─────────────────────────────── + +export const cronReindexLogs = inngest.createFunction( + { + id: "cron-reindex-logs", + name: "Daily log re-index + retention pruning", + retries: 1, + }, + { event: "log/index.cron" }, + async ({ step }) => { + const sql = getDb(); + + // Find all sandboxes with logs updated in the last 7 days + const activeSandboxes = (await step.run("find-active-sandboxes", () => + sql` + SELECT DISTINCT d.sandbox_id, d.user_id + FROM deployments d + JOIN deployment_logs l ON l.sandbox_id = d.sandbox_id + WHERE l.ts > ${Date.now() - 7 * 24 * 60 * 60 * 1_000} + ` + )) as { sandbox_id: string; user_id: string }[]; + + const results = []; + + for (const { sandbox_id, user_id } of activeSandboxes) { + const r = await step.run(`index-${sandbox_id}`, () => + indexSandbox({ + sandboxId: sandbox_id, + userId: user_id, + ttlDays: DEFAULT_TTL_DAYS, + }) + ); + results.push({ sandboxId: sandbox_id, ...r }); + } + + // Final global prune pass + const pruned = await step.run("global-prune", () => + pruneExpiredLogVectors() + ); + + return { sandboxesProcessed: results.length, globalPruned: pruned, results }; + } +); \ No newline at end of file