Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/api/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { createContext } from './context.js'
import { appRouter } from './index.js'
import { logger as appLogger } from './logger.js'
import * as routes from './routes/index.js'
import { sseRouter } from './sse/index.js'
import { makeApp } from './util/make-app.js'

const staticRoot = resolve('./static')
Expand Down Expand Up @@ -52,6 +53,7 @@ const app = makeApp()
.route('/api/import', routes.importRouter)
.route('/api/file', routes.fileRouter)
.route('/api/connect', routes.oidcRouter)
.route('/api/events', sseRouter)
.use(
serveStatic({
root: staticRoot,
Expand Down
3 changes: 3 additions & 0 deletions apps/api/src/services/anmeldung/anmeldungPublicCreate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import prisma from '../../prisma.js'
import { customFieldValuesCreateMany, defineCustomFieldValues } from '../../types/defineCustomFieldValues.js'
import { definePublicMutateProcedure } from '../../types/defineProcedure.js'
import logActivity from '../../util/activity.js'
import { emitTableUpdate } from '../../sse/index.js'
import { sendMail } from '../../util/mail.js'
import { getPersonCreateData, personSchema } from '../person/schema/person.schema.js'
import { updateMeiliPerson } from '../../meilisearch/person.js'
Expand Down Expand Up @@ -216,5 +217,7 @@ export const anmeldungPublicCreateProcedure = definePublicMutateProcedure({
accessToken,
},
})

emitTableUpdate('anmeldung')
},
})
5 changes: 4 additions & 1 deletion apps/api/src/services/anmeldung/anmeldungVerwaltungPatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Role } from '@prisma/client'
import z from 'zod'

import prisma from '../../prisma.js'
import { emitTableUpdate } from '../../sse/index.js'
import { defineProtectedMutateProcedure } from '../../types/defineProcedure.js'

export const anmeldungVerwaltungPatchProcedure = defineProtectedMutateProcedure({
Expand All @@ -15,7 +16,7 @@ export const anmeldungVerwaltungPatchProcedure = defineProtectedMutateProcedure(
}),
}),
async handler(options) {
return prisma.anmeldung.update({
const result = await prisma.anmeldung.update({
where: {
id: options.input.id,
},
Expand All @@ -24,5 +25,7 @@ export const anmeldungVerwaltungPatchProcedure = defineProtectedMutateProcedure(
id: true,
},
})
emitTableUpdate('anmeldung')
return result
},
})
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Role } from '@prisma/client'
import z from 'zod'

import prisma from '../../prisma.js'
import { emitTableUpdate } from '../../sse/index.js'
import { defineProtectedMutateProcedure } from '../../types/defineProcedure.js'

export const veranstaltungVerwaltungCreateProcedure = defineProtectedMutateProcedure({
Expand All @@ -26,11 +27,13 @@ export const veranstaltungVerwaltungCreateProcedure = defineProtectedMutateProce
}),
}),
async handler(options) {
return prisma.veranstaltung.create({
const result = await prisma.veranstaltung.create({
data: options.input.data,
select: {
id: true,
},
})
emitTableUpdate('veranstaltung')
return result
},
})
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Role } from '@prisma/client'
import z from 'zod'

import prisma from '../../prisma.js'
import { emitTableUpdate } from '../../sse/index.js'
import { defineProtectedMutateProcedure } from '../../types/defineProcedure.js'

export const veranstaltungVerwaltungPatchProcedure = defineProtectedMutateProcedure({
Expand All @@ -27,7 +28,7 @@ export const veranstaltungVerwaltungPatchProcedure = defineProtectedMutateProced
}),
}),
async handler(options) {
return prisma.veranstaltung.update({
const result = await prisma.veranstaltung.update({
where: {
id: options.input.id,
},
Expand All @@ -51,5 +52,7 @@ export const veranstaltungVerwaltungPatchProcedure = defineProtectedMutateProced
id: true,
},
})
emitTableUpdate('veranstaltung')
return result
},
})
103 changes: 103 additions & 0 deletions apps/api/src/sse/SSEManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { logger } from '../logger.js'

export type SSEClient = {
id: string
accountId: string
controller: ReadableStreamDefaultController
subscriptions: Set<string>
}

export type SSEEvent = {
type: 'table-update'
resource: string
data?: Record<string, unknown>
}

class SSEManager {
private clients: Map<string, SSEClient> = new Map()

addClient(clientId: string, accountId: string, controller: ReadableStreamDefaultController) {
const client: SSEClient = {
id: clientId,
accountId,
controller,
subscriptions: new Set(),
}
this.clients.set(clientId, client)
logger.info(`SSE client connected: ${clientId} (account: ${accountId})`)
return client
}

removeClient(clientId: string) {
const client = this.clients.get(clientId)
if (client) {
this.clients.delete(clientId)
logger.info(`SSE client disconnected: ${clientId}`)
}
}

subscribe(clientId: string, resource: string) {
const client = this.clients.get(clientId)
if (client) {
client.subscriptions.add(resource)
logger.debug(`Client ${clientId} subscribed to ${resource}`)
}
}

unsubscribe(clientId: string, resource: string) {
const client = this.clients.get(clientId)
if (client) {
client.subscriptions.delete(resource)
logger.debug(`Client ${clientId} unsubscribed from ${resource}`)
}
}

broadcast(event: SSEEvent) {
const { type, resource, data } = event
let sentCount = 0

for (const client of this.clients.values()) {
if (client.subscriptions.has(resource) || client.subscriptions.has('*')) {
try {
const message = `event: ${type}\ndata: ${JSON.stringify({ resource, ...data })}\n\n`
client.controller.enqueue(new TextEncoder().encode(message))
sentCount++
} catch (error) {
logger.error(`Failed to send SSE to client ${client.id}:`, error)
this.removeClient(client.id)
}
}
}

if (sentCount > 0) {
logger.debug(`Broadcasted ${type} event for ${resource} to ${sentCount} clients`)
}
}

sendToClient(clientId: string, event: SSEEvent) {
const client = this.clients.get(clientId)
if (!client) {
logger.warn(`Attempted to send to non-existent client: ${clientId}`)
return
}

try {
const { type, resource, data } = event
const message = `event: ${type}\ndata: ${JSON.stringify({ resource, ...data })}\n\n`
client.controller.enqueue(new TextEncoder().encode(message))
} catch (error) {
logger.error(`Failed to send SSE to client ${clientId}:`, error)
this.removeClient(clientId)
}
}

getClientCount(): number {
return this.clients.size
}

getClients(): SSEClient[] {
return Array.from(this.clients.values())
}
}

export const sseManager = new SSEManager()
30 changes: 30 additions & 0 deletions apps/api/src/sse/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { sseManager, type SSEEvent } from './SSEManager.js'

/**
* Emit a table update event to all subscribed SSE clients
* @param resource - The resource type (e.g., 'veranstaltung', 'anmeldung', 'person')
* @param data - Optional additional data to send with the event
*/
export function emitTableUpdate(resource: string, data?: Record<string, unknown>) {
const event: SSEEvent = {
type: 'table-update',
resource,
data,
}
sseManager.broadcast(event)
}

/**
* Helper to wrap mutations and automatically emit SSE events
* @param resource - The resource type
* @param mutationFn - The mutation function to execute
* @returns The result of the mutation
*/
export async function withSSENotification<T>(
resource: string,
mutationFn: () => Promise<T>
): Promise<T> {
const result = await mutationFn()
emitTableUpdate(resource)
return result
}
4 changes: 4 additions & 0 deletions apps/api/src/sse/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export { sseRouter } from './routes.js'
export { sseManager } from './SSEManager.js'
export { emitTableUpdate, withSSENotification } from './helpers.js'
export type { SSEClient, SSEEvent } from './SSEManager.js'
110 changes: 110 additions & 0 deletions apps/api/src/sse/routes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { makeApp } from '../util/make-app.js'
import { getEntityIdFromHeader } from '../authentication.js'
import { sseManager } from './SSEManager.js'
import { logger } from '../logger.js'
import { randomUUID } from 'crypto'

export const sseRouter = makeApp()

sseRouter.get('/', (c) => {
// Authenticate the connection - support both header and query parameter
let authorization = c.req.header('authorization')
const tokenParam = c.req.query('token')

if (tokenParam) {
authorization = `Bearer ${tokenParam}`
}

if (!authorization) {
return c.json({ error: 'Unauthorized' }, 401)
}

let accountId: string | undefined
try {
accountId = getEntityIdFromHeader(authorization)
} catch (err) {
logger.error('SSE authentication failed:', err)
return c.json({ error: 'Invalid token' }, 401)
}

if (!accountId) {
return c.json({ error: 'Unauthorized' }, 401)
}

// Generate unique client ID
const clientId = randomUUID()

// Create SSE stream
const stream = new ReadableStream({
start(controller) {
// Add client to manager
sseManager.addClient(clientId, accountId, controller)

// Send initial connection message
const message = `event: connected\ndata: ${JSON.stringify({ clientId })}\n\n`
controller.enqueue(new TextEncoder().encode(message))

// Send periodic heartbeat to keep connection alive
const heartbeatInterval = setInterval(() => {
try {
const heartbeat = `: heartbeat\n\n`
controller.enqueue(new TextEncoder().encode(heartbeat))
} catch {
clearInterval(heartbeatInterval)
sseManager.removeClient(clientId)
}
}, 30000) // Every 30 seconds

// Handle client disconnect
c.req.raw.signal.addEventListener('abort', () => {
clearInterval(heartbeatInterval)
sseManager.removeClient(clientId)
controller.close()
})
},
})

return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
})
})

// Subscribe endpoint
sseRouter.post('/subscribe', async (c) => {
const authorization = c.req.header('authorization')
if (!authorization) {
return c.json({ error: 'Unauthorized' }, 401)
}

const body = await c.req.json<{ clientId: string; resource: string }>()
const { clientId, resource } = body

if (!clientId || !resource) {
return c.json({ error: 'Missing clientId or resource' }, 400)
}

sseManager.subscribe(clientId, resource)
return c.json({ success: true })
})

// Unsubscribe endpoint
sseRouter.post('/unsubscribe', async (c) => {
const authorization = c.req.header('authorization')
if (!authorization) {
return c.json({ error: 'Unauthorized' }, 401)
}

const body = await c.req.json<{ clientId: string; resource: string }>()
const { clientId, resource } = body

if (!clientId || !resource) {
return c.json({ error: 'Missing clientId or resource' }, 400)
}

sseManager.unsubscribe(clientId, resource)
return c.json({ success: true })
})
7 changes: 7 additions & 0 deletions apps/frontend/src/components/data/AnmeldungenTable.vue
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import initialData from '@/components/Table/initialData'
import Tab from '@/components/UIComponents/components/Tab.vue'
import Tabs from '@/components/UIComponents/Tabs.vue'
import { loggedInAccount } from '@/composables/useAuthentication'
import { useSSE } from '@/composables/useSSE'
import { getAnmeldungStatusColor } from '@/helpers/getAnmeldungStatusColors'
import {
AnmeldungStatusMapping,
Expand Down Expand Up @@ -270,6 +271,12 @@ const query: Query<Anmeldung> = (pagination, filter) =>
initialData,
placeholderData: keepPreviousData,
})

// Set up SSE to auto-refresh table when data changes
useSSE('anmeldung', () => {
console.log('Anmeldung table updated, refreshing...')
dataTable.value?.query.refetch()
})
</script>

<template>
Expand Down
Loading