From acb1205653cbfa689f4ab91c82978376e91ed389 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 10 Feb 2026 21:34:56 +0000 Subject: [PATCH 1/6] Initial plan From 62f0d167610d5599459940103a85a1108351cd79 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 10 Feb 2026 21:39:50 +0000 Subject: [PATCH 2/6] Add SSE infrastructure and integrate with backend mutations Co-authored-by: danielswiatek <9080990+danielswiatek@users.noreply.github.com> --- apps/api/src/server.ts | 2 + .../anmeldung/anmeldungPublicCreate.ts | 3 + .../anmeldung/anmeldungVerwaltungPatch.ts | 5 +- .../veranstaltungVerwaltungCreate.ts | 5 +- .../veranstaltungVerwaltungPatch.ts | 5 +- apps/api/src/sse/SSEManager.ts | 103 +++++++++++++++++ apps/api/src/sse/helpers.ts | 30 +++++ apps/api/src/sse/index.ts | 4 + apps/api/src/sse/routes.ts | 104 ++++++++++++++++++ 9 files changed, 258 insertions(+), 3 deletions(-) create mode 100644 apps/api/src/sse/SSEManager.ts create mode 100644 apps/api/src/sse/helpers.ts create mode 100644 apps/api/src/sse/index.ts create mode 100644 apps/api/src/sse/routes.ts diff --git a/apps/api/src/server.ts b/apps/api/src/server.ts index d7319e59..60e4abef 100644 --- a/apps/api/src/server.ts +++ b/apps/api/src/server.ts @@ -11,6 +11,7 @@ import { createContext } from './context.js' import { appRouter } from './index.js' import { logger as appLogger } from './logger.js' import * as routes from './routes/index.js' +import { sseRouter } from './sse/index.js' import { makeApp } from './util/make-app.js' const staticRoot = resolve('./static') @@ -52,6 +53,7 @@ const app = makeApp() .route('/api/import', routes.importRouter) .route('/api/file', routes.fileRouter) .route('/api/connect', routes.oidcRouter) + .route('/api/events', sseRouter) .use( serveStatic({ root: staticRoot, diff --git a/apps/api/src/services/anmeldung/anmeldungPublicCreate.ts b/apps/api/src/services/anmeldung/anmeldungPublicCreate.ts index 857e69a7..d4a847f5 100644 --- a/apps/api/src/services/anmeldung/anmeldungPublicCreate.ts +++ b/apps/api/src/services/anmeldung/anmeldungPublicCreate.ts @@ -7,6 +7,7 @@ import prisma from '../../prisma.js' import { customFieldValuesCreateMany, defineCustomFieldValues } from '../../types/defineCustomFieldValues.js' import { definePublicMutateProcedure } from '../../types/defineProcedure.js' import logActivity from '../../util/activity.js' +import { emitTableUpdate } from '../../sse/index.js' import { sendMail } from '../../util/mail.js' import { getPersonCreateData, personSchema } from '../person/schema/person.schema.js' import { updateMeiliPerson } from '../../meilisearch/person.js' @@ -216,5 +217,7 @@ export const anmeldungPublicCreateProcedure = definePublicMutateProcedure({ accessToken, }, }) + + emitTableUpdate('anmeldung') }, }) diff --git a/apps/api/src/services/anmeldung/anmeldungVerwaltungPatch.ts b/apps/api/src/services/anmeldung/anmeldungVerwaltungPatch.ts index c69a1acb..86a61551 100644 --- a/apps/api/src/services/anmeldung/anmeldungVerwaltungPatch.ts +++ b/apps/api/src/services/anmeldung/anmeldungVerwaltungPatch.ts @@ -2,6 +2,7 @@ import { Role } from '@prisma/client' import z from 'zod' import prisma from '../../prisma.js' +import { emitTableUpdate } from '../../sse/index.js' import { defineProtectedMutateProcedure } from '../../types/defineProcedure.js' export const anmeldungVerwaltungPatchProcedure = defineProtectedMutateProcedure({ @@ -15,7 +16,7 @@ export const anmeldungVerwaltungPatchProcedure = defineProtectedMutateProcedure( }), }), async handler(options) { - return prisma.anmeldung.update({ + const result = await prisma.anmeldung.update({ where: { id: options.input.id, }, @@ -24,5 +25,7 @@ export const anmeldungVerwaltungPatchProcedure = defineProtectedMutateProcedure( id: true, }, }) + emitTableUpdate('anmeldung') + return result }, }) diff --git a/apps/api/src/services/veranstaltung/veranstaltungVerwaltungCreate.ts b/apps/api/src/services/veranstaltung/veranstaltungVerwaltungCreate.ts index 6d116ff5..dd12815a 100644 --- a/apps/api/src/services/veranstaltung/veranstaltungVerwaltungCreate.ts +++ b/apps/api/src/services/veranstaltung/veranstaltungVerwaltungCreate.ts @@ -2,6 +2,7 @@ import { Role } from '@prisma/client' import z from 'zod' import prisma from '../../prisma.js' +import { emitTableUpdate } from '../../sse/index.js' import { defineProtectedMutateProcedure } from '../../types/defineProcedure.js' export const veranstaltungVerwaltungCreateProcedure = defineProtectedMutateProcedure({ @@ -26,11 +27,13 @@ export const veranstaltungVerwaltungCreateProcedure = defineProtectedMutateProce }), }), async handler(options) { - return prisma.veranstaltung.create({ + const result = await prisma.veranstaltung.create({ data: options.input.data, select: { id: true, }, }) + emitTableUpdate('veranstaltung') + return result }, }) diff --git a/apps/api/src/services/veranstaltung/veranstaltungVerwaltungPatch.ts b/apps/api/src/services/veranstaltung/veranstaltungVerwaltungPatch.ts index e2e8111d..033c1523 100644 --- a/apps/api/src/services/veranstaltung/veranstaltungVerwaltungPatch.ts +++ b/apps/api/src/services/veranstaltung/veranstaltungVerwaltungPatch.ts @@ -2,6 +2,7 @@ import { Role } from '@prisma/client' import z from 'zod' import prisma from '../../prisma.js' +import { emitTableUpdate } from '../../sse/index.js' import { defineProtectedMutateProcedure } from '../../types/defineProcedure.js' export const veranstaltungVerwaltungPatchProcedure = defineProtectedMutateProcedure({ @@ -27,7 +28,7 @@ export const veranstaltungVerwaltungPatchProcedure = defineProtectedMutateProced }), }), async handler(options) { - return prisma.veranstaltung.update({ + const result = await prisma.veranstaltung.update({ where: { id: options.input.id, }, @@ -51,5 +52,7 @@ export const veranstaltungVerwaltungPatchProcedure = defineProtectedMutateProced id: true, }, }) + emitTableUpdate('veranstaltung') + return result }, }) diff --git a/apps/api/src/sse/SSEManager.ts b/apps/api/src/sse/SSEManager.ts new file mode 100644 index 00000000..a5f17fdc --- /dev/null +++ b/apps/api/src/sse/SSEManager.ts @@ -0,0 +1,103 @@ +import { logger } from '../logger.js' + +export type SSEClient = { + id: string + accountId: string + controller: ReadableStreamDefaultController + subscriptions: Set +} + +export type SSEEvent = { + type: 'table-update' + resource: string + data?: Record +} + +class SSEManager { + private clients: Map = new Map() + + addClient(clientId: string, accountId: string, controller: ReadableStreamDefaultController) { + const client: SSEClient = { + id: clientId, + accountId, + controller, + subscriptions: new Set(), + } + this.clients.set(clientId, client) + logger.info(`SSE client connected: ${clientId} (account: ${accountId})`) + return client + } + + removeClient(clientId: string) { + const client = this.clients.get(clientId) + if (client) { + this.clients.delete(clientId) + logger.info(`SSE client disconnected: ${clientId}`) + } + } + + subscribe(clientId: string, resource: string) { + const client = this.clients.get(clientId) + if (client) { + client.subscriptions.add(resource) + logger.debug(`Client ${clientId} subscribed to ${resource}`) + } + } + + unsubscribe(clientId: string, resource: string) { + const client = this.clients.get(clientId) + if (client) { + client.subscriptions.delete(resource) + logger.debug(`Client ${clientId} unsubscribed from ${resource}`) + } + } + + broadcast(event: SSEEvent) { + const { type, resource, data } = event + let sentCount = 0 + + for (const client of this.clients.values()) { + if (client.subscriptions.has(resource) || client.subscriptions.has('*')) { + try { + const message = `event: ${type}\ndata: ${JSON.stringify({ resource, ...data })}\n\n` + client.controller.enqueue(new TextEncoder().encode(message)) + sentCount++ + } catch (error) { + logger.error(`Failed to send SSE to client ${client.id}:`, error) + this.removeClient(client.id) + } + } + } + + if (sentCount > 0) { + logger.debug(`Broadcasted ${type} event for ${resource} to ${sentCount} clients`) + } + } + + sendToClient(clientId: string, event: SSEEvent) { + const client = this.clients.get(clientId) + if (!client) { + logger.warn(`Attempted to send to non-existent client: ${clientId}`) + return + } + + try { + const { type, resource, data } = event + const message = `event: ${type}\ndata: ${JSON.stringify({ resource, ...data })}\n\n` + client.controller.enqueue(new TextEncoder().encode(message)) + } catch (error) { + logger.error(`Failed to send SSE to client ${clientId}:`, error) + this.removeClient(clientId) + } + } + + getClientCount(): number { + return this.clients.size + } + + getClients(): SSEClient[] { + return Array.from(this.clients.values()) + } +} + +export const sseManager = new SSEManager() diff --git a/apps/api/src/sse/helpers.ts b/apps/api/src/sse/helpers.ts new file mode 100644 index 00000000..c9c52272 --- /dev/null +++ b/apps/api/src/sse/helpers.ts @@ -0,0 +1,30 @@ +import { sseManager, type SSEEvent } from './SSEManager.js' + +/** + * Emit a table update event to all subscribed SSE clients + * @param resource - The resource type (e.g., 'veranstaltung', 'anmeldung', 'person') + * @param data - Optional additional data to send with the event + */ +export function emitTableUpdate(resource: string, data?: Record) { + const event: SSEEvent = { + type: 'table-update', + resource, + data, + } + sseManager.broadcast(event) +} + +/** + * Helper to wrap mutations and automatically emit SSE events + * @param resource - The resource type + * @param mutationFn - The mutation function to execute + * @returns The result of the mutation + */ +export async function withSSENotification( + resource: string, + mutationFn: () => Promise +): Promise { + const result = await mutationFn() + emitTableUpdate(resource) + return result +} diff --git a/apps/api/src/sse/index.ts b/apps/api/src/sse/index.ts new file mode 100644 index 00000000..9502f85d --- /dev/null +++ b/apps/api/src/sse/index.ts @@ -0,0 +1,4 @@ +export { sseRouter } from './routes.js' +export { sseManager } from './SSEManager.js' +export { emitTableUpdate, withSSENotification } from './helpers.js' +export type { SSEClient, SSEEvent } from './SSEManager.js' diff --git a/apps/api/src/sse/routes.ts b/apps/api/src/sse/routes.ts new file mode 100644 index 00000000..1ceeba83 --- /dev/null +++ b/apps/api/src/sse/routes.ts @@ -0,0 +1,104 @@ +import { makeApp } from '../util/make-app.js' +import { getEntityIdFromHeader } from '../authentication.js' +import { sseManager } from './SSEManager.js' +import { logger } from '../logger.js' +import { randomUUID } from 'crypto' + +export const sseRouter = makeApp() + +sseRouter.get('/', async (c) => { + // Authenticate the connection + const authorization = c.req.header('authorization') + if (!authorization) { + return c.json({ error: 'Unauthorized' }, 401) + } + + let accountId: string | undefined + try { + accountId = getEntityIdFromHeader(authorization) + } catch (error) { + logger.error('SSE authentication failed:', error) + return c.json({ error: 'Invalid token' }, 401) + } + + if (!accountId) { + return c.json({ error: 'Unauthorized' }, 401) + } + + // Generate unique client ID + const clientId = randomUUID() + + // Create SSE stream + const stream = new ReadableStream({ + start(controller) { + // Add client to manager + sseManager.addClient(clientId, accountId, controller) + + // Send initial connection message + const message = `event: connected\ndata: ${JSON.stringify({ clientId })}\n\n` + controller.enqueue(new TextEncoder().encode(message)) + + // Send periodic heartbeat to keep connection alive + const heartbeatInterval = setInterval(() => { + try { + const heartbeat = `: heartbeat\n\n` + controller.enqueue(new TextEncoder().encode(heartbeat)) + } catch (error) { + clearInterval(heartbeatInterval) + sseManager.removeClient(clientId) + } + }, 30000) // Every 30 seconds + + // Handle client disconnect + c.req.raw.signal.addEventListener('abort', () => { + clearInterval(heartbeatInterval) + sseManager.removeClient(clientId) + controller.close() + }) + }, + }) + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, + }) +}) + +// Subscribe endpoint +sseRouter.post('/subscribe', async (c) => { + const authorization = c.req.header('authorization') + if (!authorization) { + return c.json({ error: 'Unauthorized' }, 401) + } + + const body = await c.req.json<{ clientId: string; resource: string }>() + const { clientId, resource } = body + + if (!clientId || !resource) { + return c.json({ error: 'Missing clientId or resource' }, 400) + } + + sseManager.subscribe(clientId, resource) + return c.json({ success: true }) +}) + +// Unsubscribe endpoint +sseRouter.post('/unsubscribe', async (c) => { + const authorization = c.req.header('authorization') + if (!authorization) { + return c.json({ error: 'Unauthorized' }, 401) + } + + const body = await c.req.json<{ clientId: string; resource: string }>() + const { clientId, resource } = body + + if (!clientId || !resource) { + return c.json({ error: 'Missing clientId or resource' }, 400) + } + + sseManager.unsubscribe(clientId, resource) + return c.json({ success: true }) +}) From 3595f7bf4faf4e5ca4d24745e5caf234e8f8ba03 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 10 Feb 2026 21:41:37 +0000 Subject: [PATCH 3/6] Add SSE frontend integration with table components Co-authored-by: danielswiatek <9080990+danielswiatek@users.noreply.github.com> --- apps/api/src/sse/routes.ts | 10 +- .../src/components/data/AnmeldungenTable.vue | 7 + apps/frontend/src/composables/useSSE.ts | 245 ++++++++++++++++++ .../Veranstaltungen/VeranstaltungList.vue | 12 +- 4 files changed, 271 insertions(+), 3 deletions(-) create mode 100644 apps/frontend/src/composables/useSSE.ts diff --git a/apps/api/src/sse/routes.ts b/apps/api/src/sse/routes.ts index 1ceeba83..101ac491 100644 --- a/apps/api/src/sse/routes.ts +++ b/apps/api/src/sse/routes.ts @@ -7,8 +7,14 @@ import { randomUUID } from 'crypto' export const sseRouter = makeApp() sseRouter.get('/', async (c) => { - // Authenticate the connection - const authorization = c.req.header('authorization') + // Authenticate the connection - support both header and query parameter + let authorization = c.req.header('authorization') + const tokenParam = c.req.query('token') + + if (tokenParam) { + authorization = `Bearer ${tokenParam}` + } + if (!authorization) { return c.json({ error: 'Unauthorized' }, 401) } diff --git a/apps/frontend/src/components/data/AnmeldungenTable.vue b/apps/frontend/src/components/data/AnmeldungenTable.vue index e73332e3..441fcf10 100644 --- a/apps/frontend/src/components/data/AnmeldungenTable.vue +++ b/apps/frontend/src/components/data/AnmeldungenTable.vue @@ -13,6 +13,7 @@ import initialData from '@/components/Table/initialData' import Tab from '@/components/UIComponents/components/Tab.vue' import Tabs from '@/components/UIComponents/Tabs.vue' import { loggedInAccount } from '@/composables/useAuthentication' +import { useSSE } from '@/composables/useSSE' import { getAnmeldungStatusColor } from '@/helpers/getAnmeldungStatusColors' import { AnmeldungStatusMapping, @@ -270,6 +271,12 @@ const query: Query = (pagination, filter) => initialData, placeholderData: keepPreviousData, }) + +// Set up SSE to auto-refresh table when data changes +useSSE('anmeldung', () => { + console.log('Anmeldung table updated, refreshing...') + dataTable.value?.query.refetch() +})