diff --git a/src/__tests__/integration/api/agent.test.ts b/src/__tests__/integration/api/agent.test.ts index 1f407ec1a9..2e86fd2771 100644 --- a/src/__tests__/integration/api/agent.test.ts +++ b/src/__tests__/integration/api/agent.test.ts @@ -12,6 +12,21 @@ import { createTestTask } from "@/__tests__/support/factories/task.factory"; import { createTestWorkspace } from "@/__tests__/support/factories/workspace.factory"; import { db } from "@/lib/db"; +const { mockAfter, pendingAfterCallbacks } = vi.hoisted(() => ({ + pendingAfterCallbacks: [] as Promise[], + mockAfter: vi.fn((callback: () => unknown) => { + pendingAfterCallbacks.push(Promise.resolve().then(callback)); + }), +})); + +vi.mock("next/server", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + after: mockAfter, + }; +}); + // Mock the gooseWeb provider const mockStreamText = vi.fn(); vi.mock("ai-sdk-provider-goose-web", () => ({ @@ -34,6 +49,7 @@ vi.stubGlobal("fetch", mockFetch); describe("POST /api/agent Integration Tests", () => { beforeEach(() => { vi.clearAllMocks(); + pendingAfterCallbacks.length = 0; // Enable agent mode feature flag for all tests process.env.NEXT_PUBLIC_FEATURE_TASK_AGENT_MODE = "true"; @@ -44,7 +60,9 @@ describe("POST /api/agent Integration Tests", () => { }); }); - afterEach(() => { + afterEach(async () => { + await Promise.allSettled(pendingAfterCallbacks); + pendingAfterCallbacks.length = 0; delete process.env.NEXT_PUBLIC_FEATURE_TASK_AGENT_MODE; }); @@ -101,6 +119,9 @@ describe("POST /api/agent Integration Tests", () => { json: () => Promise.resolve({ token: "mock-stream-token" }), }); } + if (url.includes("/stream/")) { + return Promise.resolve(new Response("data: {}\n\n")); + } return Promise.resolve({ ok: false, text: () => Promise.resolve("Not found"), @@ -139,6 +160,8 @@ describe("POST /api/agent Integration Tests", () => { expect(response.status).toBe(200); expect(data.success).toBe(true); + expect(data.backgroundStream).toBe(true); + expect(data.streamToken).toBeUndefined(); const sessionCall = mockFetch.mock.calls.find(([url]: [string]) => url.includes("/session"), @@ -149,6 +172,47 @@ describe("POST /api/agent Integration Tests", () => { expect(sessionBody.agent_name).toBeUndefined(); expect(sessionBody.model).toBe("sonnet"); }); + + test("starts the agent stream in a Next after callback", async () => { + const user = await createTestUser(); + const workspace = await createTestWorkspace({ ownerId: user.id }); + const task = await createTestTask({ + workspaceId: workspace.id, + createdById: user.id, + title: "Background stream task", + }); + + await db.task.update({ + where: { id: task.id }, + data: { mode: "agent", model: "sonnet" }, + }); + + getMockedSession().mockResolvedValue(createAuthenticatedSession(user)); + + const request = createPostRequest("http://localhost/api/agent", { + message: "Keep working if I leave the page", + taskId: task.id, + model: "sonnet", + }); + + const response = await POST(request); + const data = await response.json(); + + expect(response.status).toBe(200); + expect(data.backgroundStream).toBe(true); + expect(mockAfter).toHaveBeenCalledTimes(1); + + await Promise.all(pendingAfterCallbacks); + + const streamCall = mockFetch.mock.calls.find(([url]: [string]) => + url.includes(`/stream/${task.id}`), + ); + expect(streamCall).toBeDefined(); + expect(streamCall?.[0]).toContain("token=mock-stream-token"); + expect(JSON.parse(streamCall?.[1].body)).toEqual({ + prompt: "Keep working if I leave the page", + }); + }); }); // NOTE: Most tests commented out due to significant implementation gaps: diff --git a/src/app/api/agent/route.ts b/src/app/api/agent/route.ts index 23282788d1..df2b7d665b 100644 --- a/src/app/api/agent/route.ts +++ b/src/app/api/agent/route.ts @@ -57,7 +57,7 @@ * - `agentWebhookSecret`: Encrypted per-task secret for JWT signing */ -import { NextRequest, NextResponse } from "next/server"; +import { after, NextRequest, NextResponse } from "next/server"; import { authOptions } from "@/lib/auth/nextauth"; import { getServerSession } from "next-auth/next"; import { db } from "@/lib/db"; @@ -70,6 +70,9 @@ import { claimPodAndGetFrontend, updatePodRepositories, POD_PORTS, releasePodByI const encryptionService = EncryptionService.getInstance(); +export const runtime = "nodejs"; +export const maxDuration = 300; + // ============================================================================ // Types // ============================================================================ @@ -107,6 +110,14 @@ interface ServiceInfo { scripts?: Record; } +interface AgentStreamRequest { + agentUrl: string; + taskId: string; + streamToken: string; + prompt: string; + resume: boolean; +} + // ============================================================================ // Helper Functions // ============================================================================ @@ -400,6 +411,68 @@ async function saveUserMessage(taskId: string, message: string, artifacts: Artif } } +/** + * Start and drain the remote agent stream server-side. + * + * The agent writes durable updates through /api/agent/webhook, so Hive only + * needs to keep the stream request alive. Running this from Next's after() + * decouples the agent run from the browser tab that started it. + */ +async function startAgentStreamInBackground({ + agentUrl, + taskId, + streamToken, + prompt, + resume, +}: AgentStreamRequest): Promise { + const streamUrl = agentUrl.replace(/\/$/, "") + `/stream/${taskId}`; + const headers: Record = { + "Content-Type": "application/json", + }; + + const streamBody: Record = { prompt }; + if (resume) { + streamBody.resume = true; + } + + try { + console.log("[Agent] Starting background stream:", { taskId, resume }); + const streamResponse = await fetch(`${streamUrl}?token=${encodeURIComponent(streamToken)}`, { + method: "POST", + headers, + body: JSON.stringify(streamBody), + }); + + if (!streamResponse.ok) { + const errorText = await streamResponse.text().catch(() => ""); + console.error("[Agent] Background stream failed:", { + taskId, + status: streamResponse.status, + error: errorText || streamResponse.statusText, + }); + return; + } + + if (!streamResponse.body) { + console.log("[Agent] Background stream completed without response body:", { taskId }); + return; + } + + const reader = streamResponse.body.getReader(); + try { + while (true) { + const { done } = await reader.read(); + if (done) break; + } + console.log("[Agent] Background stream completed:", { taskId }); + } finally { + reader.releaseLock(); + } + } catch (error) { + console.error("[Agent] Error running background stream:", { taskId, error }); + } +} + // ============================================================================ // Main Handler // ============================================================================ @@ -600,17 +673,24 @@ export async function POST(request: NextRequest) { } await saveUserMessage(taskId, message, allArtifacts); - // 9. Return connection info - const streamUrl = agentCredentials.agentUrl.replace(/\/$/, "") + `/stream/${taskId}`; const isResume = messageCount > 0 && !chatHistoryForPrompt; + const promptWithContext = chatHistoryForPrompt ? `${chatHistoryForPrompt}\n\n${message}` : message; + + after(async () => { + await startAgentStreamInBackground({ + agentUrl: agentCredentials.agentUrl, + taskId, + streamToken, + prompt: promptWithContext, + resume: isResume, + }); + }); return NextResponse.json({ success: true, sessionId: taskId, - streamToken, - streamUrl, + backgroundStream: true, resume: isResume, - ...(chatHistoryForPrompt && { historyContext: chatHistoryForPrompt }), ...(podUrls && { podUrls }), }); } diff --git a/src/app/w/[slug]/task/[...taskParams]/page.tsx b/src/app/w/[slug]/task/[...taskParams]/page.tsx index 826290d51d..df8f5476b4 100644 --- a/src/app/w/[slug]/task/[...taskParams]/page.tsx +++ b/src/app/w/[slug]/task/[...taskParams]/page.tsx @@ -1302,7 +1302,14 @@ export default function TaskChatPage() { throw new Error(errorData.error || `Failed to create session: ${sessionResponse.statusText}`); } - const { streamToken, streamUrl, resume, historyContext, podUrls } = await sessionResponse.json(); + const { + backgroundStream, + streamToken, + streamUrl, + resume, + historyContext, + podUrls, + } = await sessionResponse.json(); // If backend claimed a pod (new task or re-claim), update state and add artifacts if (podUrls) { @@ -1336,6 +1343,11 @@ export default function TaskChatPage() { // Signal that pod is ready (for handleStart to switch views) options?.onPodReady?.(); + if (backgroundStream) { + setIsLoading(false); + return; + } + // 2. Connect directly to remote server for streaming // If historyContext is provided, the session was not found on the pod // so we prepend the chat history to help the agent understand the conversation