diff --git a/src/__tests__/integration/api/workflow-editor.test.ts b/src/__tests__/integration/api/workflow-editor.test.ts index eedb557e67..56a2ff4cfc 100644 --- a/src/__tests__/integration/api/workflow-editor.test.ts +++ b/src/__tests__/integration/api/workflow-editor.test.ts @@ -52,9 +52,11 @@ vi.mock("@/lib/pusher", () => ({ // Import mocked functions import { getGithubUsernameAndPAT } from "@/lib/auth/nextauth"; import { config } from "@/config/env"; +import { pusherServer, getTaskChannelName } from "@/lib/pusher"; const mockGetGithubUsernameAndPAT = vi.mocked(getGithubUsernameAndPAT); const mockFetch = global.fetch as vi.MockedFunction; +const mockPusherTrigger = vi.mocked(pusherServer.trigger); describe("POST /api/workflow-editor Integration Tests", () => { // Helper to create complete test data with all required relationships @@ -773,6 +775,87 @@ describe("POST /api/workflow-editor Integration Tests", () => { }); }); + describe("Pusher NEW_MESSAGE Tests", () => { + test("fires Pusher NEW_MESSAGE for user message after saving to DB", async () => { + const { user, task } = await createTestDataWithStakworkWorkspace(); + getMockedSession().mockResolvedValue(createAuthenticatedSession(user)); + + const request = createPostRequest("http://localhost:3000/api/workflow-editor", { + taskId: task.id, + message: "Test pusher trigger", + workflowId: 100, + workflowRefId: "workflow-ref-pusher", + }); + + await POST(request); + + // Should be called twice: once for user message, once for WORKFLOW artifact + expect(mockPusherTrigger).toHaveBeenCalledTimes(2); + + const firstCall = mockPusherTrigger.mock.calls[0]; + expect(firstCall[0]).toBe(`task-${task.id}`); + expect(firstCall[1]).toBe("new-message"); + // chatMessage.id — a string + expect(typeof firstCall[2]).toBe("string"); + // No sourceWebsocketID — fourth arg should be empty object + expect(firstCall[3]).toEqual({}); + }); + + test("passes socket_id in Pusher trigger when sourceWebsocketID is provided", async () => { + const { user, task } = await createTestDataWithStakworkWorkspace(); + getMockedSession().mockResolvedValue(createAuthenticatedSession(user)); + + const socketId = "test-socket-id-123"; + + const request = createPostRequest("http://localhost:3000/api/workflow-editor", { + taskId: task.id, + message: "Test pusher with socket id", + workflowId: 100, + workflowRefId: "workflow-ref-socket", + sourceWebsocketID: socketId, + }); + + await POST(request); + + expect(mockPusherTrigger).toHaveBeenCalledTimes(2); + + const firstCall = mockPusherTrigger.mock.calls[0]; + expect(firstCall[0]).toBe(`task-${task.id}`); + expect(firstCall[1]).toBe("new-message"); + // Fourth arg should include socket_id to exclude sender + expect(firstCall[3]).toEqual({ socket_id: socketId }); + }); + + test("second Pusher call fires for WORKFLOW artifact message", async () => { + const { user, task } = await createTestDataWithStakworkWorkspace(); + getMockedSession().mockResolvedValue(createAuthenticatedSession(user)); + + mockFetch.mockResolvedValue({ + ok: true, + json: async () => ({ success: true, data: { project_id: 55555 } }), + statusText: "OK", + } as Response); + + const request = createPostRequest("http://localhost:3000/api/workflow-editor", { + taskId: task.id, + message: "Test second pusher call", + workflowId: 100, + }); + + await POST(request); + + expect(mockPusherTrigger).toHaveBeenCalledTimes(2); + + // Second call is for the WORKFLOW artifact assistant message — no socket exclusion + const secondCall = mockPusherTrigger.mock.calls[1]; + expect(secondCall[0]).toBe(`task-${task.id}`); + expect(secondCall[1]).toBe("new-message"); + expect(typeof secondCall[2]).toBe("string"); + // Workflow artifact trigger has no fourth arg (socket exclusion) + expect(secondCall[3]).toBeUndefined(); + }); + }); + describe("Edge Cases and Integration Tests", () => { test("handles workflow with all optional fields populated", async () => { const { user, task } = await createTestDataWithStakworkWorkspace(); @@ -887,6 +970,7 @@ describe("POST /api/workflow-editor Integration Tests", () => { // Verify database state (1 USER + 1 ASSISTANT with WORKFLOW artifact) const messages = await db.chatMessage.findMany({ where: { taskId: task.id }, + orderBy: { createdAt: "asc" }, }); expect(messages).toHaveLength(2); @@ -896,6 +980,22 @@ describe("POST /api/workflow-editor Integration Tests", () => { stakworkProjectId: stakworkProjectId, }); expect(updatedTask?.workflowStartedAt).toBeDefined(); + + // Verify Pusher triggered twice: once for user message, once for WORKFLOW artifact + expect(mockPusherTrigger).toHaveBeenCalledTimes(2); + expect(mockPusherTrigger).toHaveBeenNthCalledWith( + 1, + getTaskChannelName(task.id), + "new-message", + messages[0].id, + {}, + ); + expect(mockPusherTrigger).toHaveBeenNthCalledWith( + 2, + getTaskChannelName(task.id), + "new-message", + messages[1].id, + ); }); }); }); diff --git a/src/__tests__/integration/services/task-assignment-notification.test.ts b/src/__tests__/integration/services/task-assignment-notification.test.ts index 13c7db48a7..3d055b5446 100644 --- a/src/__tests__/integration/services/task-assignment-notification.test.ts +++ b/src/__tests__/integration/services/task-assignment-notification.test.ts @@ -83,15 +83,22 @@ describe("TASK_ASSIGNED notification", () => { await updateTicket(task.id, owner.id, { assigneeId: assignee.id }); - await new Promise((r) => setTimeout(r, 100)); - - const record = await db.notificationTrigger.findFirst({ - where: { - targetUserId: assignee.id, - notificationType: NotificationTriggerType.TASK_ASSIGNED, - taskId: task.id, - }, - }); + // Poll for the notification record since fire-and-forget timing is non-deterministic + let record = null; + const maxWaitMs = 5000; + const pollIntervalMs = 100; + const startTime = Date.now(); + + while (!record && Date.now() - startTime < maxWaitMs) { + await new Promise((r) => setTimeout(r, pollIntervalMs)); + record = await db.notificationTrigger.findFirst({ + where: { + targetUserId: assignee.id, + notificationType: NotificationTriggerType.TASK_ASSIGNED, + taskId: task.id, + }, + }); + } expect(record).not.toBeNull(); expect(record!.targetUserId).toBe(assignee.id); @@ -100,7 +107,7 @@ describe("TASK_ASSIGNED notification", () => { expect(record!.sendAfter!.getTime()).toBeGreaterThan(Date.now() + 4 * 60 * 1000); expect(record!.message).toBeTruthy(); expect(sendDirectMessage).not.toHaveBeenCalled(); - }); + }, 10000); it("creates a SKIPPED notification_trigger row when workspace has Sphinx disabled", async () => { const plainOwner = await db.user.create({ diff --git a/src/app/api/workflow-editor/route.ts b/src/app/api/workflow-editor/route.ts index 997793feb9..f2c846df49 100644 --- a/src/app/api/workflow-editor/route.ts +++ b/src/app/api/workflow-editor/route.ts @@ -31,6 +31,7 @@ interface WorkflowEditorRequest { webhook?: string; workflowJson?: string; // Current workflow JSON to store as original for diff comparison workflowVersionId?: string; + sourceWebsocketID?: string; } export async function POST(request: NextRequest) { @@ -64,6 +65,7 @@ export async function POST(request: NextRequest) { webhook, workflowJson, workflowVersionId, + sourceWebsocketID, } = body; // Validate required fields @@ -152,6 +154,18 @@ export async function POST(request: NextRequest) { }, }); + // Broadcast user message to other connected clients (exclude sender to prevent duplicates) + try { + await pusherServer.trigger( + getTaskChannelName(taskId), + PUSHER_EVENTS.NEW_MESSAGE, + chatMessage.id, + sourceWebsocketID ? { socket_id: sourceWebsocketID } : {}, + ); + } catch (error) { + console.error("Error broadcasting user message to Pusher:", error); + } + // Fetch chat history (excluding the message just created) const history = await fetchChatHistory(taskId, chatMessage.id); diff --git a/src/app/w/[slug]/task/[...taskParams]/page.tsx b/src/app/w/[slug]/task/[...taskParams]/page.tsx index 101df05041..c5a4bb3492 100644 --- a/src/app/w/[slug]/task/[...taskParams]/page.tsx +++ b/src/app/w/[slug]/task/[...taskParams]/page.tsx @@ -1081,6 +1081,7 @@ export default function TaskChatPage() { workflowId: currentWorkflowContext.workflowId, workflowName: currentWorkflowContext.workflowName, workflowRefId: currentWorkflowContext.workflowRefId, + sourceWebsocketID: getPusherClient().connection.socket_id, // Include latest workflow version ID if tracked ...(currentWorkflowContext.workflowVersionId && { workflowVersionId: currentWorkflowContext.workflowVersionId }), // Include webhook if available for continuing existing workflow @@ -1660,6 +1661,7 @@ Plan and implement the real feature from this branch.`; workflowId: currentWorkflowContext.workflowId, workflowName: currentWorkflowContext.workflowName, workflowRefId: currentWorkflowContext.workflowRefId, + sourceWebsocketID: getPusherClient().connection.socket_id, ...(currentWorkflowContext.workflowVersionId && { workflowVersionId: currentWorkflowContext.workflowVersionId, }),