diff --git a/src/__tests__/integration/api/agent.test.ts b/src/__tests__/integration/api/agent.test.ts index bb4602f1f1..80db3f5dbb 100644 --- a/src/__tests__/integration/api/agent.test.ts +++ b/src/__tests__/integration/api/agent.test.ts @@ -10,7 +10,10 @@ import { import { createTestUser } from "@/__tests__/support/factories/user.factory"; import { createTestTask } from "@/__tests__/support/factories/task.factory"; import { createTestWorkspace } from "@/__tests__/support/factories/workspace.factory"; +import { createTestSwarm } from "@/__tests__/support/factories/swarm.factory"; +import { createTestPod } from "@/__tests__/support/factories/pod.factory"; import { db } from "@/lib/db"; +import { PodUsageStatus } from "@prisma/client"; // Mock the gooseWeb provider const mockStreamText = vi.fn(); @@ -182,6 +185,85 @@ describe("POST /api/agent Integration Tests", () => { }); }); + describe("Pod claim rollback", () => { + test("releases the pod and clears task state when claim setup fails after reservation", async () => { + const user = await createTestUser(); + const workspace = await createTestWorkspace({ ownerId: user.id }); + const swarm = await createTestSwarm({ + workspaceId: workspace.id, + status: "ACTIVE", + }); + + await db.swarm.update({ + where: { id: swarm.id }, + data: { + poolApiKey: "test-pool-api-key", + }, + }); + + const pod = await createTestPod({ + swarmId: swarm.id, + portMappings: [3000], + password: "plain-password", + }); + + const task = await createTestTask({ + workspaceId: workspace.id, + createdById: user.id, + title: "Rollback task", + }); + + await db.task.update({ + where: { id: task.id }, + data: { mode: "agent" }, + }); + + getMockedSession().mockResolvedValue(createAuthenticatedSession(user)); + + const request = createPostRequest("http://localhost/api/agent", { + message: "Start the agent", + taskId: task.id, + }); + + const response = await POST(request); + const data = await response.json(); + + expect(response.status).toBe(503); + expect(data.error).toBe("No pods available"); + expect(mockFetch).not.toHaveBeenCalled(); + + const [updatedTask, updatedPod] = await Promise.all([ + db.task.findUnique({ + where: { id: task.id }, + select: { + podId: true, + agentUrl: true, + agentPassword: true, + }, + }), + db.pod.findUnique({ + where: { id: pod.id }, + select: { + usageStatus: true, + usageStatusMarkedAt: true, + usageStatusMarkedBy: true, + }, + }), + ]); + + expect(updatedTask).toEqual({ + podId: null, + agentUrl: null, + agentPassword: null, + }); + expect(updatedPod).toEqual({ + usageStatus: PodUsageStatus.UNUSED, + usageStatusMarkedAt: null, + usageStatusMarkedBy: null, + }); + }); + }); + // NOTE: Most tests commented out due to significant implementation gaps: // 1. Production code does NOT validate message field (empty, missing, whitespace) // 2. Task/ChatMessage persistence only works with taskId (no standalone messages saved without task) diff --git a/src/__tests__/integration/api/chat-response-notifications.test.ts b/src/__tests__/integration/api/chat-response-notifications.test.ts index 683f6c5bb9..7ad6915dfb 100644 --- a/src/__tests__/integration/api/chat-response-notifications.test.ts +++ b/src/__tests__/integration/api/chat-response-notifications.test.ts @@ -9,7 +9,8 @@ import { db } from "@/lib/db"; import { POST } from "@/app/api/chat/response/route"; import { NextRequest } from "next/server"; import { resetDatabase } from "@/__tests__/support/utilities/database"; -import { NotificationTriggerType, NotificationTriggerStatus } from "@prisma/client"; +import { NotificationTriggerType, NotificationTriggerStatus, PodUsageStatus } from "@prisma/client"; +import { EncryptionService } from "@/lib/encryption"; // Mock Sphinx delivery vi.mock("@/lib/sphinx/direct-message", () => ({ @@ -201,4 +202,168 @@ describe("POST /api/chat/response — plan artifact notifications", () => { expect(record!.sendAfter!.getTime()).toBeGreaterThan(Date.now() + 4 * 60 * 1000); expect(record!.message).toBeTruthy(); }); + + it("does not mutate task pod fields when an artifact references a nonexistent pod", async () => { + const task = await db.task.create({ + data: { + title: "Missing pod task", + workspaceId: workspace.id, + createdById: owner.id, + updatedById: owner.id, + }, + }); + + const req = makeRequest({ + taskId: task.id, + message: "IDE opened", + artifacts: [ + { + type: "IDE", + content: { + url: "https://ide.test", + podId: "missing-pod-id", + }, + }, + ], + }); + + const res = await POST(req); + expect(res.status).toBe(201); + + const [updatedTask, claimedPodCount] = await Promise.all([ + db.task.findUnique({ + where: { id: task.id }, + select: { + podId: true, + agentUrl: true, + agentPassword: true, + }, + }), + db.pod.count({ + where: { + usageStatusMarkedBy: task.id, + }, + }), + ]); + + expect(updatedTask).toEqual({ + podId: null, + agentUrl: null, + agentPassword: null, + }); + expect(claimedPodCount).toBe(0); + }); + + it("attaches a valid artifact pod to the task and mirrors pod usage state", async () => { + const { createTestSwarm } = await import("@/__tests__/support/factories/swarm.factory"); + const { createTestPod } = await import("@/__tests__/support/factories/pod.factory"); + const encryptionService = EncryptionService.getInstance(); + + const swarm = await createTestSwarm({ + workspaceId: workspace.id, + status: "ACTIVE", + }); + + const pod = await createTestPod({ + swarmId: swarm.id, + password: "plain-pod-password", + portMappings: [3000, 15552], + }); + + const task = await db.task.create({ + data: { + title: "Attach artifact pod task", + workspaceId: workspace.id, + createdById: owner.id, + updatedById: owner.id, + }, + }); + + const req = makeRequest({ + taskId: task.id, + message: "IDE opened", + artifacts: [ + { + type: "IDE", + content: { + url: "https://ide.test", + podId: pod.podId, + agentPassword: "artifact-secret", + }, + }, + ], + }); + + const res = await POST(req); + expect(res.status).toBe(201); + + const [updatedTask, updatedPod] = await Promise.all([ + db.task.findUnique({ + where: { id: task.id }, + select: { + podId: true, + agentUrl: true, + agentPassword: true, + }, + }), + db.pod.findUnique({ + where: { id: pod.id }, + select: { + usageStatus: true, + usageStatusMarkedBy: true, + }, + }), + ]); + + expect(updatedTask?.podId).toBe(pod.podId); + expect(updatedTask?.agentUrl).toBeNull(); + expect(updatedTask?.agentPassword).toBeTruthy(); + expect(encryptionService.decryptField("agentPassword", updatedTask!.agentPassword!)).toBe("artifact-secret"); + expect(updatedPod).toEqual({ + usageStatus: PodUsageStatus.USED, + usageStatusMarkedBy: task.id, + }); + }); + + it("does not store agentPassword when the task podId points to a missing pod", async () => { + const task = await db.task.create({ + data: { + title: "Stale pod task", + workspaceId: workspace.id, + createdById: owner.id, + updatedById: owner.id, + podId: "stale-pod-id", + }, + }); + + const req = makeRequest({ + taskId: task.id, + message: "IDE reopened", + artifacts: [ + { + type: "IDE", + content: { + url: "https://ide.test", + agentPassword: "artifact-secret", + }, + }, + ], + }); + + const res = await POST(req); + expect(res.status).toBe(201); + + const updatedTask = await db.task.findUnique({ + where: { id: task.id }, + select: { + podId: true, + agentPassword: true, + }, + }); + + expect(updatedTask).toEqual({ + podId: "stale-pod-id", + agentPassword: null, + }); + }); }); diff --git a/src/__tests__/integration/api/pool-manager-claim-pod.test.ts b/src/__tests__/integration/api/pool-manager-claim-pod.test.ts index 1d92872931..52d1400b5d 100644 --- a/src/__tests__/integration/api/pool-manager-claim-pod.test.ts +++ b/src/__tests__/integration/api/pool-manager-claim-pod.test.ts @@ -18,8 +18,8 @@ import { createTestSwarm, createTestTask, } from "@/__tests__/support/fixtures"; -import { EncryptionService } from "@/lib/encryption"; import { db } from "@/lib/db"; +import { PodStatus, PodUsageStatus } from "@prisma/client"; // Mock environment config vi.mock("@/config/env", () => ({ @@ -357,5 +357,171 @@ describe("POST /api/pool-manager/claim-pod/[workspaceId] - Integration Tests", ( await expectError(response, "Task already has a pod assigned", 409); }); + + test("releases the pod and clears task fields when task claim setup fails after reservation", async () => { + const { owner, workspace, swarm } = await createTestWorkspaceScenario({ + withSwarm: true, + }); + + const pod = await db.pod.create({ + data: { + podId: `test-pod-rollback-${Date.now()}`, + swarmId: swarm!.id, + status: PodStatus.RUNNING, + usageStatus: PodUsageStatus.UNUSED, + password: "plain-password", + portMappings: [3000], + }, + }); + + const task = await createTestTask({ + workspaceId: workspace.id, + createdById: owner.id, + }); + + const request = createAuthenticatedPostRequest( + `http://localhost:3000/api/pool-manager/claim-pod/${workspace.id}?taskId=${task.id}`, + owner, + {}, + ); + + const response = await POST(request, { + params: Promise.resolve({ workspaceId: workspace.id }), + }); + + await expectError(response, "Failed to claim pod", 500); + + const [updatedTask, updatedPod] = await Promise.all([ + db.task.findUnique({ + where: { id: task.id }, + select: { + podId: true, + agentUrl: true, + agentPassword: true, + }, + }), + db.pod.findUnique({ + where: { id: pod.id }, + select: { + usageStatus: true, + usageStatusMarkedAt: true, + usageStatusMarkedBy: true, + }, + }), + ]); + + expect(updatedTask).toEqual({ + podId: null, + agentUrl: null, + agentPassword: null, + }); + expect(updatedPod).toEqual({ + usageStatus: PodUsageStatus.UNUSED, + usageStatusMarkedAt: null, + usageStatusMarkedBy: null, + }); + }); + + test("retries a failed task claim without consuming additional pods", async () => { + const { owner, workspace, swarm } = await createTestWorkspaceScenario({ + withSwarm: true, + }); + + const suffix = Date.now(); + const [badPod, goodPod] = await Promise.all([ + db.pod.create({ + data: { + podId: `test-pod-bad-${suffix}`, + swarmId: swarm!.id, + status: PodStatus.RUNNING, + usageStatus: PodUsageStatus.UNUSED, + password: "plain-password", + portMappings: [3000], + createdAt: new Date("2024-01-01T00:00:00Z"), + }, + }), + db.pod.create({ + data: { + podId: `test-pod-good-${suffix}`, + swarmId: swarm!.id, + status: PodStatus.RUNNING, + usageStatus: PodUsageStatus.UNUSED, + password: "plain-password", + portMappings: [3000, 15552], + createdAt: new Date("2024-01-02T00:00:00Z"), + }, + }), + ]); + + const task = await createTestTask({ + workspaceId: workspace.id, + createdById: owner.id, + }); + + const requestUrl = `http://localhost:3000/api/pool-manager/claim-pod/${workspace.id}?taskId=${task.id}`; + const firstRequest = createAuthenticatedPostRequest(requestUrl, owner, {}); + const firstResponse = await POST(firstRequest, { + params: Promise.resolve({ workspaceId: workspace.id }), + }); + await expectError(firstResponse, "Failed to claim pod", 500); + + const secondRequest = createAuthenticatedPostRequest(requestUrl, owner, {}); + const secondResponse = await POST(secondRequest, { + params: Promise.resolve({ workspaceId: workspace.id }), + }); + await expectError(secondResponse, "Failed to claim pod", 500); + + const [updatedTask, pods, usedPods] = await Promise.all([ + db.task.findUnique({ + where: { id: task.id }, + select: { + podId: true, + agentUrl: true, + agentPassword: true, + }, + }), + db.pod.findMany({ + where: { + id: { + in: [badPod.id, goodPod.id], + }, + }, + orderBy: { createdAt: "asc" }, + select: { + podId: true, + usageStatus: true, + usageStatusMarkedAt: true, + usageStatusMarkedBy: true, + }, + }), + db.pod.count({ + where: { + swarmId: swarm!.id, + usageStatus: PodUsageStatus.USED, + }, + }), + ]); + + expect(updatedTask).toEqual({ + podId: null, + agentUrl: null, + agentPassword: null, + }); + expect(usedPods).toBe(0); + expect(pods).toEqual([ + { + podId: badPod.podId, + usageStatus: PodUsageStatus.UNUSED, + usageStatusMarkedAt: null, + usageStatusMarkedBy: null, + }, + { + podId: goodPod.podId, + usageStatus: PodUsageStatus.UNUSED, + usageStatusMarkedAt: null, + usageStatusMarkedBy: null, + }, + ]); + }); }); }); diff --git a/src/__tests__/integration/lib/pods/queries.test.ts b/src/__tests__/integration/lib/pods/queries.test.ts index 7448255b74..c8a7d5a5f9 100644 --- a/src/__tests__/integration/lib/pods/queries.test.ts +++ b/src/__tests__/integration/lib/pods/queries.test.ts @@ -3,11 +3,13 @@ * Tests atomic operations, race conditions, and soft-delete filtering */ -import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; +import { describe, it, expect, beforeEach, afterEach } from "vitest"; import { db } from "@/lib/db"; import { PodStatus, PodUsageStatus } from "@prisma/client"; import { claimAvailablePod, + attachPodToTaskAtomically, + claimPodForTaskAtomically, getPodDetails, releasePodById, getPodUsageStatus, @@ -361,6 +363,196 @@ describe("Pod Queries", () => { }); }); + describe("claimPodForTaskAtomically", () => { + it("should only reserve one pod for concurrent claims on the same task", async () => { + const suffix = Date.now(); + + await Promise.all( + Array.from({ length: 5 }, (_, index) => + db.pod.create({ + data: { + podId: `test-pod-task-race-${suffix}-${index}`, + swarmId: testSwarmId, + status: PodStatus.RUNNING, + usageStatus: PodUsageStatus.UNUSED, + password: `password-${index}`, + portMappings: [3000, 15552], + }, + }), + ), + ); + + const task = await db.task.create({ + data: { + title: `Concurrent Claim Task ${suffix}`, + workspace: { connect: { id: testWorkspaceId } }, + createdBy: { connect: { id: testUserId } }, + updatedBy: { connect: { id: testUserId } }, + }, + }); + + const results = await Promise.all( + Array.from({ length: 5 }, () => claimPodForTaskAtomically(testSwarmId, task.id)), + ); + + const successfulClaims = results.filter((result): result is Pod => result !== null); + const failedClaims = results.filter((result) => result === null); + + expect(successfulClaims).toHaveLength(1); + expect(failedClaims).toHaveLength(4); + + const updatedTask = await db.task.findUnique({ + where: { id: task.id }, + }); + expect(updatedTask?.podId).toBe(successfulClaims[0].podId); + + const usedPods = await db.pod.findMany({ + where: { + swarmId: testSwarmId, + usageStatus: PodUsageStatus.USED, + }, + select: { + podId: true, + usageStatusMarkedBy: true, + }, + }); + + expect(usedPods).toHaveLength(1); + expect(usedPods[0]).toEqual({ + podId: successfulClaims[0].podId, + usageStatusMarkedBy: task.id, + }); + }); + }); + + describe("attachPodToTaskAtomically", () => { + it("should attach an existing pod to a task and mirror usage state", async () => { + const suffix = Date.now(); + const pod = await db.pod.create({ + data: { + podId: `test-pod-attach-${suffix}`, + swarmId: testSwarmId, + status: PodStatus.RUNNING, + usageStatus: PodUsageStatus.UNUSED, + password: "plain-password", + portMappings: [3000, 15552], + }, + }); + + const task = await db.task.create({ + data: { + title: `Attach Pod Task ${suffix}`, + workspace: { connect: { id: testWorkspaceId } }, + createdBy: { connect: { id: testUserId } }, + updatedBy: { connect: { id: testUserId } }, + }, + }); + + const result = await attachPodToTaskAtomically({ + taskId: task.id, + podId: pod.podId, + agentUrl: "https://control.test", + agentPassword: "encrypted-agent-password", + }); + + expect(result).toEqual({ + podId: pod.podId, + taskId: task.id, + workspaceSlug: expect.any(String), + }); + + const [updatedTask, updatedPod] = await Promise.all([ + db.task.findUnique({ where: { id: task.id } }), + db.pod.findUnique({ where: { id: pod.id } }), + ]); + + expect(updatedTask?.podId).toBe(pod.podId); + expect(updatedTask?.agentUrl).toBe("https://control.test"); + expect(updatedTask?.agentPassword).toBe("encrypted-agent-password"); + expect(updatedPod?.usageStatus).toBe(PodUsageStatus.USED); + expect(updatedPod?.usageStatusMarkedBy).toBe(task.id); + expect(updatedPod?.usageStatusMarkedAt).toBeInstanceOf(Date); + }); + + it("should reject nonexistent pods and leave the task unchanged", async () => { + const suffix = Date.now(); + const task = await db.task.create({ + data: { + title: `Attach Missing Pod Task ${suffix}`, + workspace: { connect: { id: testWorkspaceId } }, + createdBy: { connect: { id: testUserId } }, + updatedBy: { connect: { id: testUserId } }, + }, + }); + + await expect( + attachPodToTaskAtomically({ + taskId: task.id, + podId: `missing-pod-${suffix}`, + }), + ).rejects.toThrow(`Pod missing-pod-${suffix} not found`); + + const updatedTask = await db.task.findUnique({ + where: { id: task.id }, + }); + + expect(updatedTask?.podId).toBeNull(); + expect(updatedTask?.agentUrl).toBeNull(); + expect(updatedTask?.agentPassword).toBeNull(); + }); + + it("should reject attaching a pod already assigned to another task", async () => { + const suffix = Date.now(); + const pod = await db.pod.create({ + data: { + podId: `test-pod-shared-${suffix}`, + swarmId: testSwarmId, + status: PodStatus.RUNNING, + usageStatus: PodUsageStatus.USED, + usageStatusMarkedAt: new Date(), + usageStatusMarkedBy: `task-owner-${suffix}`, + password: "plain-password", + portMappings: [3000, 15552], + }, + }); + + const [existingTask, competingTask] = await Promise.all([ + db.task.create({ + data: { + title: `Existing Pod Task ${suffix}`, + workspace: { connect: { id: testWorkspaceId } }, + createdBy: { connect: { id: testUserId } }, + updatedBy: { connect: { id: testUserId } }, + podId: pod.podId, + }, + }), + db.task.create({ + data: { + title: `Competing Pod Task ${suffix}`, + workspace: { connect: { id: testWorkspaceId } }, + createdBy: { connect: { id: testUserId } }, + updatedBy: { connect: { id: testUserId } }, + }, + }), + ]); + + await expect( + attachPodToTaskAtomically({ + taskId: competingTask.id, + podId: pod.podId, + }), + ).rejects.toThrow(`Pod ${pod.podId} is already assigned to task ${existingTask.id}`); + + const [reloadedExistingTask, reloadedCompetingTask] = await Promise.all([ + db.task.findUnique({ where: { id: existingTask.id } }), + db.task.findUnique({ where: { id: competingTask.id } }), + ]); + + expect(reloadedExistingTask?.podId).toBe(pod.podId); + expect(reloadedCompetingTask?.podId).toBeNull(); + }); + }); + describe("getPodDetails", () => { it("should return password and portMappings", async () => { const pod = await db.pod.create({ @@ -453,6 +645,8 @@ describe("Pod Queries", () => { connect: { id: testUserId }, }, podId: pod.podId, + agentUrl: "https://control.test", + agentPassword: "encrypted-password", }, }); @@ -464,12 +658,13 @@ describe("Pod Queries", () => { expect(releasedPod?.usageStatusMarkedBy).toBeNull(); expect(releasedPod?.usageStatusReason).toBeNull(); - // Verify task's podId and agentPassword are cleared + // Verify task pod association and credentials are cleared const updatedTask = await db.task.findUnique({ where: { id: task.id }, }); expect(updatedTask?.podId).toBeNull(); + expect(updatedTask?.agentUrl).toBeNull(); expect(updatedTask?.agentPassword).toBeNull(); }); @@ -494,6 +689,7 @@ describe("Pod Queries", () => { createdBy: { connect: { id: testUserId } }, updatedBy: { connect: { id: testUserId } }, podId: pod.podId, + agentUrl: "https://control-1.test", agentPassword: "encrypted-password-1", }, }), @@ -504,6 +700,7 @@ describe("Pod Queries", () => { createdBy: { connect: { id: testUserId } }, updatedBy: { connect: { id: testUserId } }, podId: pod.podId, + agentUrl: "https://control-2.test", agentPassword: "encrypted-password-2", }, }), @@ -525,6 +722,8 @@ describe("Pod Queries", () => { // Verify all tasks also have agentPassword cleared const updatedTask1 = await db.task.findUnique({ where: { id: task1.id } }); const updatedTask2 = await db.task.findUnique({ where: { id: task2.id } }); + expect(updatedTask1?.agentUrl).toBeNull(); + expect(updatedTask2?.agentUrl).toBeNull(); expect(updatedTask1?.agentPassword).toBeNull(); expect(updatedTask2?.agentPassword).toBeNull(); }); diff --git a/src/app/api/agent/route.ts b/src/app/api/agent/route.ts index 9522acf7b7..fe5d07cbf3 100644 --- a/src/app/api/agent/route.ts +++ b/src/app/api/agent/route.ts @@ -65,7 +65,7 @@ import { EncryptionService } from "@/lib/encryption"; import { ChatRole, ChatStatus, ArtifactType } from "@prisma/client"; import { createWebhookToken, generateWebhookSecret } from "@/lib/auth/agent-jwt"; import { isValidModel, getApiKeyForModel, type ModelName } from "@/lib/ai/models"; -import { claimPodAndGetFrontend, updatePodRepositories, POD_PORTS } from "@/lib/pods"; +import { claimTaskPodAndSetup, type ServiceInfo } from "@/lib/pods"; const encryptionService = EncryptionService.getInstance(); @@ -100,12 +100,6 @@ interface PodClaimResult { credentials: AgentCredentials; } -interface ServiceInfo { - name: string; - port: number; - scripts?: Record; -} - // ============================================================================ // Helper Functions // ============================================================================ @@ -204,49 +198,28 @@ async function claimPodForTask(taskId: string, workspaceId: string): Promise ({ url: repo.repositoryUrl })), + refreshRepositories: true, + requireControlPort: true, + }); - if (!controlUrl) { - throw new Error("Pod control port not available"); + if (!claimResult) { + throw new Error("Task already has a pod assigned"); } - // Update repositories on new pod (non-fatal if fails) - if (workspace.repositories.length > 0) { - try { - await updatePodRepositories( - controlUrl, - podWorkspace.password, - workspace.repositories.map((r) => ({ url: r.repositoryUrl })), - ); - console.log("[Agent] Updated repositories on pod"); - } catch (repoError) { - console.error("[Agent] Error updating repositories (non-fatal):", repoError); - } - } - - // Store pod credentials on task - const encryptedPassword = encryptionService.encryptField("agentPassword", podWorkspace.password); - await db.task.update({ - where: { id: taskId }, - data: { - podId: podWorkspace.id, - agentUrl: controlUrl, - agentPassword: JSON.stringify(encryptedPassword), - }, - }); - - console.log("[Agent] Claimed pod:", podWorkspace.id, "for task:", taskId); + console.log("[Agent] Claimed pod:", claimResult.podId, "for task:", taskId); return { - podId: podWorkspace.id, - frontend, - ide: podWorkspace.url || podWorkspace.portMappings["8080"] || "", + podId: claimResult.podId, + frontend: claimResult.frontend, + ide: claimResult.ide || claimResult.workspace.portMappings["8080"] || "", credentials: { - agentUrl: controlUrl, - agentPassword: podWorkspace.password, + agentUrl: claimResult.control!, + agentPassword: claimResult.password, }, }; } @@ -466,6 +439,9 @@ export async function POST(request: NextRequest) { } } catch (claimError) { console.error("[Agent] Failed to claim pod:", claimError); + if (claimError instanceof Error && claimError.message === "Task already has a pod assigned") { + return NextResponse.json({ error: claimError.message }, { status: 409 }); + } return NextResponse.json({ error: "No pods available" }, { status: 503 }); } } else if (!task.podId && isUsingCustomUrl) { @@ -473,12 +449,12 @@ export async function POST(request: NextRequest) { const mockPodId = "local-dev"; const mockFrontend = process.env.MOCK_BROWSER_URL || "http://localhost:3000"; - // Store mock podId on task + // Store custom agent details without creating a fake pod/task link await db.task.update({ where: { id: taskId }, data: { - podId: mockPodId, agentUrl: process.env.CUSTOM_GOOSE_URL, + agentPassword: null, }, }); diff --git a/src/app/api/chat/response/route.ts b/src/app/api/chat/response/route.ts index cc72caa252..282a259df4 100644 --- a/src/app/api/chat/response/route.ts +++ b/src/app/api/chat/response/route.ts @@ -1,6 +1,6 @@ import { NextRequest, NextResponse } from "next/server"; import { db } from "@/lib/db"; -import { Prisma, PodUsageStatus, WorkflowStatus, NotificationTriggerType } from "@prisma/client"; +import { Prisma, NotificationTriggerType } from "@prisma/client"; import { ChatRole, ChatStatus, @@ -14,6 +14,7 @@ import { } from "@/lib/chat"; import { pusherServer, getTaskChannelName, getFeatureChannelName, getWorkspaceChannelName, PUSHER_EVENTS } from "@/lib/pusher"; import { EncryptionService } from "@/lib/encryption"; +import { attachPodToTaskAtomically } from "@/lib/pods/queries"; import { processScreenshotUpload, processRecordingUpload } from "@/lib/screenshot-upload"; import { parsePlanXml } from "@/lib/utils/plan-xml"; import { createAndSendNotification } from "@/services/notifications"; @@ -390,53 +391,69 @@ export async function POST(request: NextRequest) { if (podId || agentPassword) { try { - const updateData: { podId?: string; agentPassword?: string } = {}; + let workspaceSlug: string | null = null; + let effectivePodId: string | undefined = podId; + const encryptedAgentPassword = agentPassword + ? JSON.stringify(EncryptionService.getInstance().encryptField("agentPassword", agentPassword)) + : undefined; if (podId) { - updateData.podId = podId; - } + const attachedTask = await attachPodToTaskAtomically({ + taskId, + podId, + ...(encryptedAgentPassword !== undefined ? { agentPassword: encryptedAgentPassword } : {}), + }); + workspaceSlug = attachedTask.workspaceSlug; + } else if (encryptedAgentPassword !== undefined) { + const currentTask = await db.task.findUnique({ + where: { id: taskId }, + select: { + podId: true, + workspace: { + select: { slug: true }, + }, + }, + }); - if (agentPassword) { - const encryptionService = EncryptionService.getInstance(); - const encrypted = encryptionService.encryptField("agentPassword", agentPassword); - updateData.agentPassword = JSON.stringify(encrypted); - } + if (!currentTask?.podId) { + throw new Error(`Cannot store agentPassword for task ${taskId} without a valid podId`); + } - const updatedTask = await db.task.update({ - where: { id: taskId }, - data: updateData, - include: { - workspace: { - select: { slug: true }, + const existingPod = await db.pod.findFirst({ + where: { + podId: currentTask.podId, + deletedAt: null, }, - }, - }); - console.log( - `✅ Stored podId=${podId}, agentPassword=${agentPassword ? "[encrypted]" : "undefined"} from artifact for task ${taskId}`, - ); + select: { podId: true }, + }); - // Sync pods table so pool status stays accurate - // Stakwork claims pods via Pool Manager directly, so we need to mark the pod as USED here - if (podId) { - try { - await db.pod.updateMany({ - where: { podId, deletedAt: null }, - data: { - usageStatus: PodUsageStatus.USED, - usageStatusMarkedAt: new Date(), - usageStatusMarkedBy: taskId, - }, - }); - console.log(`✅ Synced pods table: marked ${podId} as USED for task ${taskId}`); - } catch (podSyncError) { - console.error("Failed to sync pods table:", podSyncError); + if (!existingPod) { + throw new Error(`Cannot store agentPassword for task ${taskId} because pod ${currentTask.podId} does not exist`); } + + effectivePodId = currentTask.podId; + + const updatedTask = await db.task.update({ + where: { id: taskId }, + data: { + agentPassword: encryptedAgentPassword, + }, + include: { + workspace: { + select: { slug: true }, + }, + }, + }); + workspaceSlug = updatedTask.workspace?.slug ?? null; } + console.log( + `✅ Stored podId=${effectivePodId}, agentPassword=${agentPassword ? "[encrypted]" : "undefined"} from artifact for task ${taskId}`, + ); // Broadcast podId update to both channels for real-time UI updates const podUpdatePayload = { taskId, - podId, + podId: effectivePodId, timestamp: new Date(), }; @@ -449,9 +466,9 @@ export async function POST(request: NextRequest) { } // Send to workspace channel (for task list) - if (updatedTask.workspace?.slug) { + if (workspaceSlug) { try { - const workspaceChannelName = getWorkspaceChannelName(updatedTask.workspace.slug); + const workspaceChannelName = getWorkspaceChannelName(workspaceSlug); await pusherServer.trigger( workspaceChannelName, PUSHER_EVENTS.WORKSPACE_TASK_TITLE_UPDATE, diff --git a/src/app/api/pool-manager/claim-pod/[workspaceId]/route.ts b/src/app/api/pool-manager/claim-pod/[workspaceId]/route.ts index 74f4913ea0..1ff7444404 100644 --- a/src/app/api/pool-manager/claim-pod/[workspaceId]/route.ts +++ b/src/app/api/pool-manager/claim-pod/[workspaceId]/route.ts @@ -1,13 +1,15 @@ import { NextRequest, NextResponse } from "next/server"; import { db } from "@/lib/db"; -import { EncryptionService } from "@/lib/encryption"; import { type ApiError } from "@/types"; -import { claimPodAndGetFrontend, updatePodRepositories, POD_PORTS } from "@/lib/pods"; +import { + claimAvailablePodAndSetup, + claimTaskPodAndSetup, + updatePodRepositories, + POD_PORTS, +} from "@/lib/pods"; import { POD_BASE_DOMAIN } from "@/lib/pods/queries"; import { requireAuthOrApiToken, validateApiToken } from "@/lib/auth/api-token"; -const encryptionService: EncryptionService = EncryptionService.getInstance(); - export async function POST(request: NextRequest, { params }: { params: Promise<{ workspaceId: string }> }) { try { const { workspaceId } = await params; @@ -58,17 +60,17 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ const mockFrontend = process.env.MOCK_BROWSER_URL || "http://localhost:3000"; const mockPodId = "local-dev"; - // Still save podId and agentUrl to task in dev mode + // Store custom agent details without creating a fake pod/task link. if (taskId) { try { await db.task.update({ where: { id: taskId }, data: { - podId: mockPodId, agentUrl: process.env.CUSTOM_GOOSE_URL, + agentPassword: null, }, }); - console.log(`✅ Stored mock podId ${mockPodId} for task ${taskId}`); + console.log(`✅ Stored custom agent URL for task ${taskId}`); } catch (error) { console.error("Failed to store mock pod info:", error); } @@ -126,38 +128,51 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ return NextResponse.json({ success: false, error: "Workspace has no swarm configured" }, { status: 400 }); } - // Guard: reject if the task already has a pod assigned + // Get services from swarm + const services = workspace.swarm.services as + | Array<{ name: string; port: number; scripts?: Record }> + | null + | undefined; + + let frontend: string; + let control: string | null; + let ide: string | null; + let podWorkspace: Awaited>["workspace"]; if (taskId) { - const existingTask = await db.task.findUnique({ - where: { id: taskId }, - select: { podId: true }, + const result = await claimTaskPodAndSetup({ + taskId, + swarmId, + services: services || undefined, + repositories: workspace.repositories.map((repo) => ({ url: repo.repositoryUrl })), + refreshRepositories: shouldUpdateToLatest, + requireControlPort: true, }); - if (existingTask?.podId) { + if (!result) { return NextResponse.json( { success: false, error: "Task already has a pod assigned" }, { status: 409 }, ); } - } - // Get services from swarm - const services = workspace.swarm.services as - | Array<{ name: string; port: number; scripts?: Record }> - | null - | undefined; - - const userInfo = taskId || undefined; - - const { frontend, workspace: podWorkspace } = await claimPodAndGetFrontend( - swarmId, - userInfo, - services || undefined, - ); + frontend = result.frontend; + podWorkspace = result.workspace; + control = result.control; + ide = result.ide; + } else { + const result = await claimAvailablePodAndSetup({ + swarmId, + services: services || undefined, + }); + frontend = result.frontend; + podWorkspace = result.workspace; + control = result.control; + ide = result.ide; + } // If "latest" parameter is provided, update the pod repositories - if (shouldUpdateToLatest) { - const controlPortUrl = podWorkspace.portMappings[POD_PORTS.CONTROL]; + if (shouldUpdateToLatest && !taskId) { + const controlPortUrl = control; if (!controlPortUrl) { console.error(`Control port (${POD_PORTS.CONTROL}) not found in port mappings, skipping repository update`); @@ -176,34 +191,10 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ } } - // Extract control and pod URLs - const control = podWorkspace.portMappings[POD_PORTS.CONTROL] || null; const pod_url = `https://${podWorkspace.id}.${POD_BASE_DOMAIN}`; - const ide = podWorkspace.url || null; console.log(">>> control", control); - // If taskId is provided, store agent credentials and podId on the task - // Use control URL (staklink on port 15552) for agentUrl since /session endpoint is there - if (taskId && control) { - try { - const encryptedPassword = encryptionService.encryptField("agentPassword", podWorkspace.password); - - await db.task.update({ - where: { id: taskId }, - data: { - podId: podWorkspace.id, - agentUrl: control, - agentPassword: JSON.stringify(encryptedPassword), - }, - }); - - console.log(`✅ Stored podId ${podWorkspace.id} and agent credentials for task ${taskId}`); - } catch (error) { - console.error("Failed to store pod info:", error); - } - } - return NextResponse.json( { success: true, diff --git a/src/app/api/user-journeys/[taskId]/execute/route.ts b/src/app/api/user-journeys/[taskId]/execute/route.ts index 06af0164d5..4223da6df1 100644 --- a/src/app/api/user-journeys/[taskId]/execute/route.ts +++ b/src/app/api/user-journeys/[taskId]/execute/route.ts @@ -26,7 +26,7 @@ import { NextRequest, NextResponse } from "next/server"; import { getServerSession } from "next-auth"; import { authOptions } from "@/lib/auth/nextauth"; import { db } from "@/lib/db"; -import { claimPodAndGetFrontend, POD_PORTS } from "@/lib/pods"; +import { claimAvailablePodAndSetup, releasePodById } from "@/lib/pods"; import { EncryptionService } from "@/lib/encryption"; import { validateWorkspaceAccessById } from "@/services/workspace"; import crypto from "crypto"; @@ -36,6 +36,7 @@ export const fetchCache = "force-no-store"; const encryptionService = EncryptionService.getInstance(); export async function POST(request: NextRequest, { params }: { params: Promise<{ taskId: string }> }) { + let claimedPodId: string | null = null; try { // Step 1: Request Validation & Authentication const { taskId } = await params; @@ -110,7 +111,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ // Step 2: Claim Pod (Reuse Existing Logic) const poolId = task.workspace.swarm.id || task.workspace.swarm.poolName; - const poolApiKeyPlain = encryptionService.decryptField("poolApiKey", task.workspace.swarm.poolApiKey); // Get services from swarm const services = task.workspace.swarm.services as @@ -124,7 +124,13 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ let podResult; try { - podResult = await claimPodAndGetFrontend(poolId as string, poolApiKeyPlain, services || undefined); + podResult = await claimAvailablePodAndSetup({ + swarmId: poolId as string, + userInfo: taskId, + services: services || undefined, + requireControlPort: true, + }); + claimedPodId = podResult.podId; } catch (error) { console.error("Failed to claim pod:", error); return NextResponse.json( @@ -137,13 +143,9 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ } // Extract control URL and password from pod result - controlUrl = podResult.workspace.portMappings[POD_PORTS.CONTROL]; - podPassword = podResult.workspace.password; + controlUrl = podResult.control!; + podPassword = podResult.password; frontendUrl = podResult.frontend; - - if (!controlUrl) { - return NextResponse.json({ error: "Control port not available on claimed pod" }, { status: 500 }); - } } // Step 3: Generate One-Time API Key @@ -200,6 +202,15 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ } catch (error) { console.error("Failed to start test execution:", error); + if (claimedPodId) { + try { + await releasePodById(claimedPodId); + claimedPodId = null; + } catch (releaseError) { + console.error("Failed to release claimed pod after test start failure:", releaseError); + } + } + // Revert task status on failure try { await db.task.update({ @@ -243,6 +254,13 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ ); } catch (error) { console.error("Unexpected error in test execution endpoint:", error); + if (claimedPodId) { + try { + await releasePodById(claimedPodId); + } catch (releaseError) { + console.error("Failed to release claimed pod after unexpected error:", releaseError); + } + } return NextResponse.json({ error: "Internal error" }, { status: 500 }); } } diff --git a/src/lib/pods/capacity-queries.ts b/src/lib/pods/capacity-queries.ts index cb2d311c60..76cc6a51fc 100644 --- a/src/lib/pods/capacity-queries.ts +++ b/src/lib/pods/capacity-queries.ts @@ -19,6 +19,7 @@ export async function getBasicVMDataFromPods( podId: true, status: true, usageStatus: true, + usageStatusMarkedAt: true, usageStatusMarkedBy: true, password: true, createdAt: true, @@ -28,6 +29,24 @@ export async function getBasicVMDataFromPods( }, }); + const taskIdsByPodId = new Map( + ( + await db.task.findMany({ + where: { + deleted: false, + archived: false, + podId: { + in: pods.map((pod) => pod.podId), + }, + }, + select: { + id: true, + podId: true, + }, + }) + ).map((task) => [task.podId!, task.id]), + ); + return pods.map((pod) => { const url = buildPodUrl(pod.podId, POD_PORTS.CONTROL); const subdomain = pod.podId; @@ -51,9 +70,12 @@ export async function getBasicVMDataFromPods( // Map database usageStatus to pool-manager format const usage_status = pod.usageStatus === "USED" ? "used" : "unused"; - // Use usageStatusMarkedBy as user_info if VM is in use + // Task.podId is the authoritative task-to-pod link during Phase 1. + // Fall back to usageStatusMarkedBy for non-task/manual claims. const user_info = - usage_status === "used" ? pod.usageStatusMarkedBy ?? undefined : undefined; + usage_status === "used" + ? taskIdsByPodId.get(pod.podId) ?? pod.usageStatusMarkedBy ?? undefined + : undefined; return { id: pod.podId, @@ -62,7 +84,7 @@ export async function getBasicVMDataFromPods( internal_state: state, // Use same value as state for basic query usage_status, user_info: user_info ?? null, - marked_at: pod.usageStatusMarkedBy ? pod.createdAt.toISOString() : null, + marked_at: pod.usageStatusMarkedAt?.toISOString() ?? null, password: pod.password || undefined, url, repository: undefined, // Not available in basic query diff --git a/src/lib/pods/queries.ts b/src/lib/pods/queries.ts index 4f89eb89fd..b82f573cae 100644 --- a/src/lib/pods/queries.ts +++ b/src/lib/pods/queries.ts @@ -136,6 +136,207 @@ export async function findDeletedPods(swarmId: string): Promise { }); } +/** + * Atomically claim a pod for a specific task, preventing race conditions. + * Uses a transaction with SELECT FOR UPDATE on the task row to serialize + * concurrent requests for the same taskId, preventing multiple pods from + * being claimed for one task. The task's podId is persisted inside the same + * transaction so later claims immediately observe the assignment. + * + * @param swarmId - The swarm ID to claim a pod from + * @param taskId - The task ID to claim a pod for + * @returns The claimed pod, or null if the task already has a pod assigned + * @throws Error if no pods are available + */ +export async function claimPodForTaskAtomically(swarmId: string, taskId: string): Promise { + return db.$transaction(async (tx) => { + // Lock the task row to serialize concurrent claims for the same task + const taskRows = await tx.$queryRaw<{ pod_id: string | null }[]>` + SELECT pod_id FROM tasks WHERE id = ${taskId} FOR UPDATE + `; + + if (taskRows.length === 0) { + throw new Error(`Task ${taskId} not found`); + } + + // If task already has a pod, another request won the race + if (taskRows[0].pod_id) { + return null; + } + + // Now claim a pod — safe because we hold the task lock + const rawPods = await tx.$queryRaw` + UPDATE pods + SET + usage_status = 'USED'::"PodUsageStatus", + usage_status_marked_at = NOW(), + usage_status_marked_by = ${taskId} + WHERE id = ( + SELECT p.id FROM pods p + WHERE + p.swarm_id = ${swarmId} + AND p.status = 'RUNNING'::"PodStatus" + AND p.usage_status = 'UNUSED'::"PodUsageStatus" + AND p.deleted_at IS NULL + AND NOT EXISTS ( + SELECT 1 FROM tasks t + WHERE t.pod_id = p.pod_id + ) + ORDER BY p.created_at ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + RETURNING * + `; + + if (rawPods.length === 0) { + throw new Error(`No available pods for swarm: ${swarmId}`); + } + + await tx.task.update({ + where: { id: taskId }, + data: { podId: rawPods[0].pod_id }, + }); + + return mapRawPodResult(rawPods[0]); + }); +} + +export interface AttachPodToTaskOptions { + taskId: string; + podId: string; + agentUrl?: string | null; + agentPassword?: string | null; +} + +export interface AttachPodToTaskResult { + podId: string; + taskId: string; + workspaceSlug: string | null; +} + +/** + * Attach an existing pod row to a task while keeping task/pod state in sync. + * This is the single Phase 1 path for task-side pod assignments outside the + * initial claim transaction. + */ +export async function attachPodToTaskAtomically( + options: AttachPodToTaskOptions, +): Promise { + const { taskId, podId, agentUrl, agentPassword } = options; + + return db.$transaction(async (tx) => { + const taskRows = await tx.$queryRaw<{ pod_id: string | null }[]>` + SELECT pod_id FROM tasks WHERE id = ${taskId} FOR UPDATE + `; + + if (taskRows.length === 0) { + throw new Error(`Task ${taskId} not found`); + } + + const podRows = await tx.$queryRaw<{ id: string }[]>` + SELECT id + FROM pods + WHERE pod_id = ${podId} AND deleted_at IS NULL + FOR UPDATE + `; + + if (podRows.length === 0) { + throw new Error(`Pod ${podId} not found`); + } + + const conflictingTaskRows = await tx.$queryRaw<{ id: string }[]>` + SELECT id + FROM tasks + WHERE pod_id = ${podId} AND id <> ${taskId} + LIMIT 1 + FOR UPDATE + `; + + if (conflictingTaskRows.length > 0) { + throw new Error(`Pod ${podId} is already assigned to task ${conflictingTaskRows[0].id}`); + } + + const currentPodId = taskRows[0].pod_id; + if (currentPodId && currentPodId !== podId) { + throw new Error(`Task ${taskId} already has a different pod assigned`); + } + + const updatedTask = await tx.task.update({ + where: { id: taskId }, + data: { + podId, + ...(agentUrl !== undefined ? { agentUrl } : {}), + ...(agentPassword !== undefined ? { agentPassword } : {}), + }, + include: { + workspace: { + select: { slug: true }, + }, + }, + }); + + await tx.pod.update({ + where: { id: podRows[0].id }, + data: { + usageStatus: PodUsageStatus.USED, + usageStatusMarkedAt: new Date(), + usageStatusMarkedBy: taskId, + }, + }); + + return { + podId, + taskId, + workspaceSlug: updatedTask.workspace?.slug ?? null, + }; + }); +} + +interface RawPodResult { + id: string; + pod_id: string; + swarm_id: string; + status: PodStatus; + usage_status: PodUsageStatus; + usage_status_marked_at: Date | null; + usage_status_marked_by: string | null; + usage_status_reason: string | null; + password: string | null; + port_mappings: any; + flagged_for_recreation: boolean; + flagged_at: Date | null; + flagged_reason: string | null; + last_health_check: Date | null; + health_status: string | null; + created_at: Date; + updated_at: Date; + deleted_at: Date | null; +} + +function mapRawPodResult(rawPod: RawPodResult): Pod { + return { + id: rawPod.id, + podId: rawPod.pod_id, + swarmId: rawPod.swarm_id, + status: rawPod.status, + usageStatus: rawPod.usage_status, + usageStatusMarkedAt: rawPod.usage_status_marked_at, + usageStatusMarkedBy: rawPod.usage_status_marked_by, + usageStatusReason: rawPod.usage_status_reason, + password: rawPod.password, + portMappings: rawPod.port_mappings, + flaggedForRecreation: rawPod.flagged_for_recreation, + flaggedAt: rawPod.flagged_at, + flaggedReason: rawPod.flagged_reason, + lastHealthCheck: rawPod.last_health_check, + healthStatus: rawPod.health_status, + createdAt: rawPod.created_at, + updatedAt: rawPod.updated_at, + deletedAt: rawPod.deleted_at, + } as Pod; +} + /** * Atomically claim an available pod for a user * Uses SELECT FOR UPDATE SKIP LOCKED to prevent race conditions @@ -145,37 +346,15 @@ export async function findDeletedPods(swarmId: string): Promise { * @returns The claimed pod or null if none available */ export async function claimAvailablePod(swarmId: string, userInfo?: string): Promise { - // Use raw SQL for atomic SELECT FOR UPDATE SKIP LOCKED - interface RawPodResult { - id: string; - pod_id: string; - swarm_id: string; - status: PodStatus; - usage_status: PodUsageStatus; - usage_status_marked_at: Date | null; - usage_status_marked_by: string | null; - usage_status_reason: string | null; - password: string | null; - port_mappings: any; - flagged_for_recreation: boolean; - flagged_at: Date | null; - flagged_reason: string | null; - last_health_check: Date | null; - health_status: string | null; - created_at: Date; - updated_at: Date; - deleted_at: Date | null; - } - const rawPods = await db.$queryRaw` UPDATE pods - SET + SET usage_status = 'USED'::"PodUsageStatus", usage_status_marked_at = NOW(), usage_status_marked_by = ${userInfo} WHERE id = ( SELECT id FROM pods - WHERE + WHERE swarm_id = ${swarmId} AND status = 'RUNNING'::"PodStatus" AND usage_status = 'UNUSED'::"PodUsageStatus" @@ -191,28 +370,7 @@ export async function claimAvailablePod(swarmId: string, userInfo?: string): Pro return null; } - // Map snake_case to camelCase - const rawPod = rawPods[0]; - return { - id: rawPod.id, - podId: rawPod.pod_id, - swarmId: rawPod.swarm_id, - status: rawPod.status, - usageStatus: rawPod.usage_status, - usageStatusMarkedAt: rawPod.usage_status_marked_at, - usageStatusMarkedBy: rawPod.usage_status_marked_by, - usageStatusReason: rawPod.usage_status_reason, - password: rawPod.password, - portMappings: rawPod.port_mappings, - flaggedForRecreation: rawPod.flagged_for_recreation, - flaggedAt: rawPod.flagged_at, - flaggedReason: rawPod.flagged_reason, - lastHealthCheck: rawPod.last_health_check, - healthStatus: rawPod.health_status, - createdAt: rawPod.created_at, - updatedAt: rawPod.updated_at, - deletedAt: rawPod.deleted_at, - } as Pod; + return mapRawPodResult(rawPods[0]); } /** Base domain for pod URLs */ @@ -289,6 +447,7 @@ export async function releasePodById(podId: string): Promise { }, data: { podId: null, + agentUrl: null, agentPassword: null, }, }); diff --git a/src/lib/pods/utils.ts b/src/lib/pods/utils.ts index e25287bc19..4974690002 100644 --- a/src/lib/pods/utils.ts +++ b/src/lib/pods/utils.ts @@ -1,7 +1,17 @@ import { parsePM2Content } from "@/utils/devContainerUtils"; import { db } from "@/lib/db"; +import { EncryptionService } from "@/lib/encryption"; import { JlistProcess } from "@/types/pod-repair"; -import { claimAvailablePod, getPodDetails, releasePodById, getPodUsageStatus, buildPodUrl } from "./queries"; +import type { Pod } from "@prisma/client"; +import { + attachPodToTaskAtomically, + buildPodUrl, + claimAvailablePod, + claimPodForTaskAtomically, + getPodDetails, + getPodUsageStatus, + releasePodById, +} from "./queries"; // Re-export constants for external use export { POD_PORTS, PROCESS_NAMES } from "./constants"; @@ -95,7 +105,7 @@ function getFrontendUrl(processList: ProcessInfo[], portMappings: Record { - // Claim a pod from the database atomically - const pod = await claimAvailablePod(swarmId, userInfo); - - if (!pod) { - throw new Error(`No available pods for swarm: ${swarmId}`); - } - - console.log(">>> claimed pod", pod.podId); +const encryptionService = EncryptionService.getInstance(); - // Password is already decrypted from the database +export function buildPodWorkspace(pod: Pod): PodWorkspace { if (!pod.password) { throw new Error("Pod password not found"); } - const password = pod.password; - // Get port mappings as number array const portArray = (pod.portMappings as number[] | null) || []; - - // Convert port array to URL dictionary for PodWorkspace compatibility const portMappings: Record = {}; for (const port of portArray) { portMappings[port.toString()] = buildPodUrl(pod.podId, port); } - // Convert database pod to PodWorkspace format for compatibility - const workspace: PodWorkspace = { + return { id: pod.podId, - password, + password: pod.password, portMappings, state: pod.status, usage_status: pod.usageStatus, marked_at: pod.usageStatusMarkedAt?.toISOString() || "", - // Legacy fields - not used but kept for type compatibility branches: [], created: pod.createdAt.toISOString(), customImage: false, @@ -158,15 +150,17 @@ export async function claimPodAndGetFrontend( url: "", useDevContainer: false, }; +} - console.log(">>> workspace data", workspace); - +export async function discoverPodFrontend( + workspace: PodWorkspace, + services?: ServiceInfo[], +): Promise<{ frontend: string; processList?: ProcessInfo[] }> { let frontend: string | undefined; let processList: ProcessInfo[] | undefined; const controlPortUrl = workspace.portMappings[POD_PORTS.CONTROL]; - // Always try to fetch process list if control port exists if (controlPortUrl) { try { processList = await getProcessList(controlPortUrl, workspace.password); @@ -176,23 +170,20 @@ export async function claimPodAndGetFrontend( } } - // FIRST: Try to get frontend port from services array if provided if (services && services.length > 0) { try { const frontendService = services.find((svc) => svc.name === "frontend"); if (frontendService?.port) { console.log(`>>> Found frontend port ${frontendService.port} from services array`); - - // Try to find the port in port mappings frontend = workspace.portMappings[frontendService.port.toString()]; if (frontend) { console.log(`>>> Using frontend from services array on port ${frontendService.port}:`, frontend); - return { frontend, workspace, processList }; - } else { - console.log(`>>> Port ${frontendService.port} from services not found in port mappings, trying fallbacks`); + return { frontend, processList }; } + + console.log(`>>> Port ${frontendService.port} from services not found in port mappings, trying fallbacks`); } else { console.log(">>> No frontend service found in services array, trying fallbacks"); } @@ -201,33 +192,26 @@ export async function claimPodAndGetFrontend( } } - // SECOND: Try to get frontend from process discovery if we have process list if (processList) { try { - // Get the frontend URL from port mappings frontend = getFrontendUrl(processList, workspace.portMappings); } catch (error) { console.error( `>>> Failed to get frontend from process list, falling back to port ${POD_PORTS.FRONTEND_FALLBACK}:`, error, ); - // frontend remains undefined, will try fallback below } } else if (!controlPortUrl) { - // Control port not available, will try fallback console.error( `>>> Control port (${POD_PORTS.CONTROL}) not found in port mappings, falling back to port ${POD_PORTS.FRONTEND_FALLBACK}`, ); } - // If frontend not found via process discovery, use fallback if (!frontend) { - // Fallback to port 3000 if process discovery failed or control port was missing frontend = workspace.portMappings[POD_PORTS.FRONTEND_FALLBACK]; if (!frontend && controlPortUrl) { - // Final fallback: try to find frontend port from process list if we have it - let frontendPort = POD_PORTS.FRONTEND_FALLBACK as string; // default to 3000 if we can't find it + let frontendPort = POD_PORTS.FRONTEND_FALLBACK as string; if (processList) { const frontendProcess = processList.find((proc) => proc.name === PROCESS_NAMES.FRONTEND); @@ -237,7 +221,6 @@ export async function claimPodAndGetFrontend( } } - // Replace control port with dynamically discovered frontend port in controlPortUrl frontend = controlPortUrl.replace(POD_PORTS.CONTROL, frontendPort); console.log( `>>> Using final fallback - replacing port ${POD_PORTS.CONTROL} with ${frontendPort} in controlPortUrl:`, @@ -252,6 +235,159 @@ export async function claimPodAndGetFrontend( } } + return { frontend, processList }; +} + +export interface ClaimAvailablePodSetupOptions { + swarmId: string; + userInfo?: string; + services?: ServiceInfo[]; + requireControlPort?: boolean; +} + +export interface ClaimAvailablePodSetupResult { + podId: string; + workspace: PodWorkspace; + frontend: string; + control: string | null; + ide: string | null; + password: string; + processList?: ProcessInfo[]; +} + +export async function claimAvailablePodAndSetup( + options: ClaimAvailablePodSetupOptions, +): Promise { + const { swarmId, userInfo, services, requireControlPort = false } = options; + const pod = await claimAvailablePod(swarmId, userInfo); + + if (!pod) { + throw new Error(`No available pods for swarm: ${swarmId}`); + } + + try { + const workspace = buildPodWorkspace(pod); + const { frontend, processList } = await discoverPodFrontend(workspace, services); + const control = workspace.portMappings[POD_PORTS.CONTROL] || null; + + if (requireControlPort && !control) { + throw new Error("Pod control port not available"); + } + + return { + podId: workspace.id, + workspace, + frontend, + control, + ide: workspace.url || null, + password: workspace.password, + processList, + }; + } catch (error) { + try { + await releasePodById(pod.podId); + } catch (releaseError) { + console.error(`Failed to release pod ${pod.podId} after setup failure:`, releaseError); + } + throw error; + } +} + +export interface ClaimTaskPodSetupOptions { + taskId: string; + swarmId: string; + services?: ServiceInfo[]; + repositories?: Array<{ url: string }>; + refreshRepositories?: boolean; + requireControlPort?: boolean; +} + +export interface ClaimTaskPodSetupResult extends ClaimAvailablePodSetupResult {} + +export async function claimTaskPodAndSetup( + options: ClaimTaskPodSetupOptions, +): Promise { + const { + taskId, + swarmId, + services, + repositories = [], + refreshRepositories = false, + requireControlPort = true, + } = options; + + const claimedPod = await claimPodForTaskAtomically(swarmId, taskId); + if (!claimedPod) { + return null; + } + + try { + const workspace = buildPodWorkspace(claimedPod); + const { frontend, processList } = await discoverPodFrontend(workspace, services); + const control = workspace.portMappings[POD_PORTS.CONTROL] || null; + + if (requireControlPort && !control) { + throw new Error("Pod control port not available"); + } + + if (refreshRepositories && control && repositories.length > 0) { + try { + await updatePodRepositories(control, workspace.password, repositories); + } catch (repoError) { + console.error("[pods] Error updating repositories after claim (non-fatal):", repoError); + } + } + + const encryptedPassword = JSON.stringify( + encryptionService.encryptField("agentPassword", workspace.password), + ); + + await attachPodToTaskAtomically({ + taskId, + podId: workspace.id, + agentUrl: control, + agentPassword: encryptedPassword, + }); + + return { + podId: workspace.id, + workspace, + frontend, + control, + ide: workspace.url || null, + password: workspace.password, + processList, + }; + } catch (error) { + try { + await releasePodById(claimedPod.podId); + } catch (releaseError) { + console.error(`Failed to release pod ${claimedPod.podId} after task setup failure:`, releaseError); + } + throw error; + } +} + +export async function claimPodAndGetFrontend( + swarmId: string, + userInfo?: string, + services?: ServiceInfo[], + preClaimedPod?: import("@prisma/client").Pod, +): Promise<{ frontend: string; workspace: PodWorkspace; processList?: ProcessInfo[] }> { + // Use pre-claimed pod if provided, otherwise claim one + const pod = preClaimedPod ?? await claimAvailablePod(swarmId, userInfo); + + if (!pod) { + throw new Error(`No available pods for swarm: ${swarmId}`); + } + + console.log(">>> claimed pod", pod.podId); + + const workspace = buildPodWorkspace(pod); + console.log(">>> workspace data", workspace); + + const { frontend, processList } = await discoverPodFrontend(workspace, services); + return { frontend, workspace, processList }; } @@ -452,10 +588,10 @@ export async function releaseTaskPod(options: ReleaseTaskPodOptions): Promise