diff --git a/apps/api/src/server.ts b/apps/api/src/server.ts index d7319e59..60e4abef 100644 --- a/apps/api/src/server.ts +++ b/apps/api/src/server.ts @@ -11,6 +11,7 @@ import { createContext } from './context.js' import { appRouter } from './index.js' import { logger as appLogger } from './logger.js' import * as routes from './routes/index.js' +import { sseRouter } from './sse/index.js' import { makeApp } from './util/make-app.js' const staticRoot = resolve('./static') @@ -52,6 +53,7 @@ const app = makeApp() .route('/api/import', routes.importRouter) .route('/api/file', routes.fileRouter) .route('/api/connect', routes.oidcRouter) + .route('/api/events', sseRouter) .use( serveStatic({ root: staticRoot, diff --git a/apps/api/src/services/anmeldung/anmeldungPublicCreate.ts b/apps/api/src/services/anmeldung/anmeldungPublicCreate.ts index 857e69a7..d4a847f5 100644 --- a/apps/api/src/services/anmeldung/anmeldungPublicCreate.ts +++ b/apps/api/src/services/anmeldung/anmeldungPublicCreate.ts @@ -7,6 +7,7 @@ import prisma from '../../prisma.js' import { customFieldValuesCreateMany, defineCustomFieldValues } from '../../types/defineCustomFieldValues.js' import { definePublicMutateProcedure } from '../../types/defineProcedure.js' import logActivity from '../../util/activity.js' +import { emitTableUpdate } from '../../sse/index.js' import { sendMail } from '../../util/mail.js' import { getPersonCreateData, personSchema } from '../person/schema/person.schema.js' import { updateMeiliPerson } from '../../meilisearch/person.js' @@ -216,5 +217,7 @@ export const anmeldungPublicCreateProcedure = definePublicMutateProcedure({ accessToken, }, }) + + emitTableUpdate('anmeldung') }, }) diff --git a/apps/api/src/services/anmeldung/anmeldungVerwaltungPatch.ts b/apps/api/src/services/anmeldung/anmeldungVerwaltungPatch.ts index c69a1acb..86a61551 100644 --- a/apps/api/src/services/anmeldung/anmeldungVerwaltungPatch.ts +++ b/apps/api/src/services/anmeldung/anmeldungVerwaltungPatch.ts @@ -2,6 +2,7 @@ import { Role } from '@prisma/client' import z from 'zod' import prisma from '../../prisma.js' +import { emitTableUpdate } from '../../sse/index.js' import { defineProtectedMutateProcedure } from '../../types/defineProcedure.js' export const anmeldungVerwaltungPatchProcedure = defineProtectedMutateProcedure({ @@ -15,7 +16,7 @@ export const anmeldungVerwaltungPatchProcedure = defineProtectedMutateProcedure( }), }), async handler(options) { - return prisma.anmeldung.update({ + const result = await prisma.anmeldung.update({ where: { id: options.input.id, }, @@ -24,5 +25,7 @@ export const anmeldungVerwaltungPatchProcedure = defineProtectedMutateProcedure( id: true, }, }) + emitTableUpdate('anmeldung') + return result }, }) diff --git a/apps/api/src/services/veranstaltung/veranstaltungVerwaltungCreate.ts b/apps/api/src/services/veranstaltung/veranstaltungVerwaltungCreate.ts index 6d116ff5..dd12815a 100644 --- a/apps/api/src/services/veranstaltung/veranstaltungVerwaltungCreate.ts +++ b/apps/api/src/services/veranstaltung/veranstaltungVerwaltungCreate.ts @@ -2,6 +2,7 @@ import { Role } from '@prisma/client' import z from 'zod' import prisma from '../../prisma.js' +import { emitTableUpdate } from '../../sse/index.js' import { defineProtectedMutateProcedure } from '../../types/defineProcedure.js' export const veranstaltungVerwaltungCreateProcedure = defineProtectedMutateProcedure({ @@ -26,11 +27,13 @@ export const veranstaltungVerwaltungCreateProcedure = defineProtectedMutateProce }), }), async handler(options) { - return prisma.veranstaltung.create({ + const result = await prisma.veranstaltung.create({ data: options.input.data, select: { id: true, }, }) + emitTableUpdate('veranstaltung') + return result }, }) diff --git a/apps/api/src/services/veranstaltung/veranstaltungVerwaltungPatch.ts b/apps/api/src/services/veranstaltung/veranstaltungVerwaltungPatch.ts index e2e8111d..033c1523 100644 --- a/apps/api/src/services/veranstaltung/veranstaltungVerwaltungPatch.ts +++ b/apps/api/src/services/veranstaltung/veranstaltungVerwaltungPatch.ts @@ -2,6 +2,7 @@ import { Role } from '@prisma/client' import z from 'zod' import prisma from '../../prisma.js' +import { emitTableUpdate } from '../../sse/index.js' import { defineProtectedMutateProcedure } from '../../types/defineProcedure.js' export const veranstaltungVerwaltungPatchProcedure = defineProtectedMutateProcedure({ @@ -27,7 +28,7 @@ export const veranstaltungVerwaltungPatchProcedure = defineProtectedMutateProced }), }), async handler(options) { - return prisma.veranstaltung.update({ + const result = await prisma.veranstaltung.update({ where: { id: options.input.id, }, @@ -51,5 +52,7 @@ export const veranstaltungVerwaltungPatchProcedure = defineProtectedMutateProced id: true, }, }) + emitTableUpdate('veranstaltung') + return result }, }) diff --git a/apps/api/src/sse/SSEManager.ts b/apps/api/src/sse/SSEManager.ts new file mode 100644 index 00000000..a5f17fdc --- /dev/null +++ b/apps/api/src/sse/SSEManager.ts @@ -0,0 +1,103 @@ +import { logger } from '../logger.js' + +export type SSEClient = { + id: string + accountId: string + controller: ReadableStreamDefaultController + subscriptions: Set +} + +export type SSEEvent = { + type: 'table-update' + resource: string + data?: Record +} + +class SSEManager { + private clients: Map = new Map() + + addClient(clientId: string, accountId: string, controller: ReadableStreamDefaultController) { + const client: SSEClient = { + id: clientId, + accountId, + controller, + subscriptions: new Set(), + } + this.clients.set(clientId, client) + logger.info(`SSE client connected: ${clientId} (account: ${accountId})`) + return client + } + + removeClient(clientId: string) { + const client = this.clients.get(clientId) + if (client) { + this.clients.delete(clientId) + logger.info(`SSE client disconnected: ${clientId}`) + } + } + + subscribe(clientId: string, resource: string) { + const client = this.clients.get(clientId) + if (client) { + client.subscriptions.add(resource) + logger.debug(`Client ${clientId} subscribed to ${resource}`) + } + } + + unsubscribe(clientId: string, resource: string) { + const client = this.clients.get(clientId) + if (client) { + client.subscriptions.delete(resource) + logger.debug(`Client ${clientId} unsubscribed from ${resource}`) + } + } + + broadcast(event: SSEEvent) { + const { type, resource, data } = event + let sentCount = 0 + + for (const client of this.clients.values()) { + if (client.subscriptions.has(resource) || client.subscriptions.has('*')) { + try { + const message = `event: ${type}\ndata: ${JSON.stringify({ resource, ...data })}\n\n` + client.controller.enqueue(new TextEncoder().encode(message)) + sentCount++ + } catch (error) { + logger.error(`Failed to send SSE to client ${client.id}:`, error) + this.removeClient(client.id) + } + } + } + + if (sentCount > 0) { + logger.debug(`Broadcasted ${type} event for ${resource} to ${sentCount} clients`) + } + } + + sendToClient(clientId: string, event: SSEEvent) { + const client = this.clients.get(clientId) + if (!client) { + logger.warn(`Attempted to send to non-existent client: ${clientId}`) + return + } + + try { + const { type, resource, data } = event + const message = `event: ${type}\ndata: ${JSON.stringify({ resource, ...data })}\n\n` + client.controller.enqueue(new TextEncoder().encode(message)) + } catch (error) { + logger.error(`Failed to send SSE to client ${clientId}:`, error) + this.removeClient(clientId) + } + } + + getClientCount(): number { + return this.clients.size + } + + getClients(): SSEClient[] { + return Array.from(this.clients.values()) + } +} + +export const sseManager = new SSEManager() diff --git a/apps/api/src/sse/helpers.ts b/apps/api/src/sse/helpers.ts new file mode 100644 index 00000000..c9c52272 --- /dev/null +++ b/apps/api/src/sse/helpers.ts @@ -0,0 +1,30 @@ +import { sseManager, type SSEEvent } from './SSEManager.js' + +/** + * Emit a table update event to all subscribed SSE clients + * @param resource - The resource type (e.g., 'veranstaltung', 'anmeldung', 'person') + * @param data - Optional additional data to send with the event + */ +export function emitTableUpdate(resource: string, data?: Record) { + const event: SSEEvent = { + type: 'table-update', + resource, + data, + } + sseManager.broadcast(event) +} + +/** + * Helper to wrap mutations and automatically emit SSE events + * @param resource - The resource type + * @param mutationFn - The mutation function to execute + * @returns The result of the mutation + */ +export async function withSSENotification( + resource: string, + mutationFn: () => Promise +): Promise { + const result = await mutationFn() + emitTableUpdate(resource) + return result +} diff --git a/apps/api/src/sse/index.ts b/apps/api/src/sse/index.ts new file mode 100644 index 00000000..9502f85d --- /dev/null +++ b/apps/api/src/sse/index.ts @@ -0,0 +1,4 @@ +export { sseRouter } from './routes.js' +export { sseManager } from './SSEManager.js' +export { emitTableUpdate, withSSENotification } from './helpers.js' +export type { SSEClient, SSEEvent } from './SSEManager.js' diff --git a/apps/api/src/sse/routes.ts b/apps/api/src/sse/routes.ts new file mode 100644 index 00000000..79898b59 --- /dev/null +++ b/apps/api/src/sse/routes.ts @@ -0,0 +1,110 @@ +import { makeApp } from '../util/make-app.js' +import { getEntityIdFromHeader } from '../authentication.js' +import { sseManager } from './SSEManager.js' +import { logger } from '../logger.js' +import { randomUUID } from 'crypto' + +export const sseRouter = makeApp() + +sseRouter.get('/', (c) => { + // Authenticate the connection - support both header and query parameter + let authorization = c.req.header('authorization') + const tokenParam = c.req.query('token') + + if (tokenParam) { + authorization = `Bearer ${tokenParam}` + } + + if (!authorization) { + return c.json({ error: 'Unauthorized' }, 401) + } + + let accountId: string | undefined + try { + accountId = getEntityIdFromHeader(authorization) + } catch (err) { + logger.error('SSE authentication failed:', err) + return c.json({ error: 'Invalid token' }, 401) + } + + if (!accountId) { + return c.json({ error: 'Unauthorized' }, 401) + } + + // Generate unique client ID + const clientId = randomUUID() + + // Create SSE stream + const stream = new ReadableStream({ + start(controller) { + // Add client to manager + sseManager.addClient(clientId, accountId, controller) + + // Send initial connection message + const message = `event: connected\ndata: ${JSON.stringify({ clientId })}\n\n` + controller.enqueue(new TextEncoder().encode(message)) + + // Send periodic heartbeat to keep connection alive + const heartbeatInterval = setInterval(() => { + try { + const heartbeat = `: heartbeat\n\n` + controller.enqueue(new TextEncoder().encode(heartbeat)) + } catch { + clearInterval(heartbeatInterval) + sseManager.removeClient(clientId) + } + }, 30000) // Every 30 seconds + + // Handle client disconnect + c.req.raw.signal.addEventListener('abort', () => { + clearInterval(heartbeatInterval) + sseManager.removeClient(clientId) + controller.close() + }) + }, + }) + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, + }) +}) + +// Subscribe endpoint +sseRouter.post('/subscribe', async (c) => { + const authorization = c.req.header('authorization') + if (!authorization) { + return c.json({ error: 'Unauthorized' }, 401) + } + + const body = await c.req.json<{ clientId: string; resource: string }>() + const { clientId, resource } = body + + if (!clientId || !resource) { + return c.json({ error: 'Missing clientId or resource' }, 400) + } + + sseManager.subscribe(clientId, resource) + return c.json({ success: true }) +}) + +// Unsubscribe endpoint +sseRouter.post('/unsubscribe', async (c) => { + const authorization = c.req.header('authorization') + if (!authorization) { + return c.json({ error: 'Unauthorized' }, 401) + } + + const body = await c.req.json<{ clientId: string; resource: string }>() + const { clientId, resource } = body + + if (!clientId || !resource) { + return c.json({ error: 'Missing clientId or resource' }, 400) + } + + sseManager.unsubscribe(clientId, resource) + return c.json({ success: true }) +}) diff --git a/apps/frontend/src/components/data/AnmeldungenTable.vue b/apps/frontend/src/components/data/AnmeldungenTable.vue index e73332e3..441fcf10 100644 --- a/apps/frontend/src/components/data/AnmeldungenTable.vue +++ b/apps/frontend/src/components/data/AnmeldungenTable.vue @@ -13,6 +13,7 @@ import initialData from '@/components/Table/initialData' import Tab from '@/components/UIComponents/components/Tab.vue' import Tabs from '@/components/UIComponents/Tabs.vue' import { loggedInAccount } from '@/composables/useAuthentication' +import { useSSE } from '@/composables/useSSE' import { getAnmeldungStatusColor } from '@/helpers/getAnmeldungStatusColors' import { AnmeldungStatusMapping, @@ -270,6 +271,12 @@ const query: Query = (pagination, filter) => initialData, placeholderData: keepPreviousData, }) + +// Set up SSE to auto-refresh table when data changes +useSSE('anmeldung', () => { + console.log('Anmeldung table updated, refreshing...') + dataTable.value?.query.refetch() +})