Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion src/__tests__/integration/api/agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@ import { createTestTask } from "@/__tests__/support/factories/task.factory";
import { createTestWorkspace } from "@/__tests__/support/factories/workspace.factory";
import { db } from "@/lib/db";

const { mockAfter, pendingAfterCallbacks } = vi.hoisted(() => ({
pendingAfterCallbacks: [] as Promise<unknown>[],
mockAfter: vi.fn((callback: () => unknown) => {
pendingAfterCallbacks.push(Promise.resolve().then(callback));
}),
}));

vi.mock("next/server", async (importOriginal) => {
const actual = await importOriginal<typeof import("next/server")>();
return {
...actual,
after: mockAfter,
};
});

// Mock the gooseWeb provider
const mockStreamText = vi.fn();
vi.mock("ai-sdk-provider-goose-web", () => ({
Expand All @@ -34,6 +49,7 @@ vi.stubGlobal("fetch", mockFetch);
describe("POST /api/agent Integration Tests", () => {
beforeEach(() => {
vi.clearAllMocks();
pendingAfterCallbacks.length = 0;

// Enable agent mode feature flag for all tests
process.env.NEXT_PUBLIC_FEATURE_TASK_AGENT_MODE = "true";
Expand All @@ -44,7 +60,9 @@ describe("POST /api/agent Integration Tests", () => {
});
});

afterEach(() => {
afterEach(async () => {
await Promise.allSettled(pendingAfterCallbacks);
pendingAfterCallbacks.length = 0;
delete process.env.NEXT_PUBLIC_FEATURE_TASK_AGENT_MODE;
});

Expand Down Expand Up @@ -101,6 +119,9 @@ describe("POST /api/agent Integration Tests", () => {
json: () => Promise.resolve({ token: "mock-stream-token" }),
});
}
if (url.includes("/stream/")) {
return Promise.resolve(new Response("data: {}\n\n"));
}
return Promise.resolve({
ok: false,
text: () => Promise.resolve("Not found"),
Expand Down Expand Up @@ -139,6 +160,8 @@ describe("POST /api/agent Integration Tests", () => {

expect(response.status).toBe(200);
expect(data.success).toBe(true);
expect(data.backgroundStream).toBe(true);
expect(data.streamToken).toBeUndefined();

const sessionCall = mockFetch.mock.calls.find(([url]: [string]) =>
url.includes("/session"),
Expand All @@ -149,6 +172,47 @@ describe("POST /api/agent Integration Tests", () => {
expect(sessionBody.agent_name).toBeUndefined();
expect(sessionBody.model).toBe("sonnet");
});

test("starts the agent stream in a Next after callback", async () => {
const user = await createTestUser();
const workspace = await createTestWorkspace({ ownerId: user.id });
const task = await createTestTask({
workspaceId: workspace.id,
createdById: user.id,
title: "Background stream task",
});

await db.task.update({
where: { id: task.id },
data: { mode: "agent", model: "sonnet" },
});

getMockedSession().mockResolvedValue(createAuthenticatedSession(user));

const request = createPostRequest("http://localhost/api/agent", {
message: "Keep working if I leave the page",
taskId: task.id,
model: "sonnet",
});

const response = await POST(request);
const data = await response.json();

expect(response.status).toBe(200);
expect(data.backgroundStream).toBe(true);
expect(mockAfter).toHaveBeenCalledTimes(1);

await Promise.all(pendingAfterCallbacks);

const streamCall = mockFetch.mock.calls.find(([url]: [string]) =>
url.includes(`/stream/${task.id}`),
);
expect(streamCall).toBeDefined();
expect(streamCall?.[0]).toContain("token=mock-stream-token");
expect(JSON.parse(streamCall?.[1].body)).toEqual({
prompt: "Keep working if I leave the page",
});
});
});

// NOTE: Most tests commented out due to significant implementation gaps:
Expand Down
92 changes: 86 additions & 6 deletions src/app/api/agent/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
* - `agentWebhookSecret`: Encrypted per-task secret for JWT signing
*/

import { NextRequest, NextResponse } from "next/server";
import { after, NextRequest, NextResponse } from "next/server";
import { authOptions } from "@/lib/auth/nextauth";
import { getServerSession } from "next-auth/next";
import { db } from "@/lib/db";
Expand All @@ -70,6 +70,9 @@ import { claimPodAndGetFrontend, updatePodRepositories, POD_PORTS, releasePodByI

const encryptionService = EncryptionService.getInstance();

export const runtime = "nodejs";
export const maxDuration = 300;

// ============================================================================
// Types
// ============================================================================
Expand Down Expand Up @@ -107,6 +110,14 @@ interface ServiceInfo {
scripts?: Record<string, string>;
}

interface AgentStreamRequest {
agentUrl: string;
taskId: string;
streamToken: string;
prompt: string;
resume: boolean;
}

// ============================================================================
// Helper Functions
// ============================================================================
Expand Down Expand Up @@ -400,6 +411,68 @@ async function saveUserMessage(taskId: string, message: string, artifacts: Artif
}
}

/**
* Start and drain the remote agent stream server-side.
*
* The agent writes durable updates through /api/agent/webhook, so Hive only
* needs to keep the stream request alive. Running this from Next's after()
* decouples the agent run from the browser tab that started it.
*/
async function startAgentStreamInBackground({
agentUrl,
taskId,
streamToken,
prompt,
resume,
}: AgentStreamRequest): Promise<void> {
const streamUrl = agentUrl.replace(/\/$/, "") + `/stream/${taskId}`;
const headers: Record<string, string> = {
"Content-Type": "application/json",
};

const streamBody: Record<string, unknown> = { prompt };
if (resume) {
streamBody.resume = true;
}

try {
console.log("[Agent] Starting background stream:", { taskId, resume });
const streamResponse = await fetch(`${streamUrl}?token=${encodeURIComponent(streamToken)}`, {
method: "POST",
headers,
body: JSON.stringify(streamBody),
});

if (!streamResponse.ok) {
const errorText = await streamResponse.text().catch(() => "");
console.error("[Agent] Background stream failed:", {
taskId,
status: streamResponse.status,
error: errorText || streamResponse.statusText,
});
return;
}

if (!streamResponse.body) {
console.log("[Agent] Background stream completed without response body:", { taskId });
return;
}

const reader = streamResponse.body.getReader();
try {
while (true) {
const { done } = await reader.read();
if (done) break;
}
console.log("[Agent] Background stream completed:", { taskId });
} finally {
reader.releaseLock();
}
} catch (error) {
console.error("[Agent] Error running background stream:", { taskId, error });
}
}

// ============================================================================
// Main Handler
// ============================================================================
Expand Down Expand Up @@ -600,17 +673,24 @@ export async function POST(request: NextRequest) {
}
await saveUserMessage(taskId, message, allArtifacts);

// 9. Return connection info
const streamUrl = agentCredentials.agentUrl.replace(/\/$/, "") + `/stream/${taskId}`;
const isResume = messageCount > 0 && !chatHistoryForPrompt;
const promptWithContext = chatHistoryForPrompt ? `${chatHistoryForPrompt}\n\n${message}` : message;

after(async () => {
await startAgentStreamInBackground({
agentUrl: agentCredentials.agentUrl,
taskId,
streamToken,
prompt: promptWithContext,
resume: isResume,
});
});

return NextResponse.json({
success: true,
sessionId: taskId,
streamToken,
streamUrl,
backgroundStream: true,
resume: isResume,
...(chatHistoryForPrompt && { historyContext: chatHistoryForPrompt }),
...(podUrls && { podUrls }),
});
}
14 changes: 13 additions & 1 deletion src/app/w/[slug]/task/[...taskParams]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,14 @@ export default function TaskChatPage() {
throw new Error(errorData.error || `Failed to create session: ${sessionResponse.statusText}`);
}

const { streamToken, streamUrl, resume, historyContext, podUrls } = await sessionResponse.json();
const {
backgroundStream,
streamToken,
streamUrl,
resume,
historyContext,
podUrls,
} = await sessionResponse.json();

// If backend claimed a pod (new task or re-claim), update state and add artifacts
if (podUrls) {
Expand Down Expand Up @@ -1336,6 +1343,11 @@ export default function TaskChatPage() {
// Signal that pod is ready (for handleStart to switch views)
options?.onPodReady?.();

if (backgroundStream) {
setIsLoading(false);
return;
}

// 2. Connect directly to remote server for streaming
// If historyContext is provided, the session was not found on the pod
// so we prepend the chat history to help the agent understand the conversation
Expand Down