Skip to content

Commit 03c6290

Browse files
committed
fix(webapp): harden the realtime session routes
Scope session stream waitpoint delivery to the environment so two environments using the same session externalId can never complete each other's waitpoints. Add the missing authorization checks to the session snapshot-url routes and restrict out-channel appends to secret key auth, so a session-scoped token cannot read other sessions' snapshots or forge assistant output. Appends that carry an X-Part-Id header are now deduplicated on retry, session creation rejects expired sessions, externalId is immutable after creation, and the sessions list endpoint returns friendly run ids.
1 parent 459dce2 commit 03c6290

9 files changed

Lines changed: 274 additions & 38 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Hardening fixes for realtime sessions: stricter authorization on snapshot URLs and out-channel appends, environment-scoped message delivery for waiting runs, and idempotent appends via the X-Part-Id header. Session creation now rejects expired sessions, externalId can no longer be changed after creation, and the sessions list returns friendly run ids.

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,14 @@ const { action, loader } = createActionApiRoute(
106106
});
107107

108108
// Step 2: Register the waitpoint on the session channel so the next
109-
// append fires it. Keyed by (addressingKey, io) — the canonical
110-
// string for the row. The append handler drains by the same
111-
// canonical key, so writers and readers converge regardless of
112-
// which URL form the agent vs. the appending caller used.
109+
// append fires it. Keyed by (environmentId, addressingKey, io) — the
110+
// canonical string for the row, scoped to the environment because
111+
// externalIds are only unique per environment. The append handler
112+
// drains by the same key, so writers and readers converge regardless
113+
// of which URL form the agent vs. the appending caller used.
113114
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
114115
await addSessionStreamWaitpoint(
116+
authentication.environment.id,
115117
addressingKey,
116118
body.io,
117119
result.waitpoint.id,
@@ -152,6 +154,7 @@ const { action, loader } = createActionApiRoute(
152154
});
153155

154156
await removeSessionStreamWaitpoint(
157+
authentication.environment.id,
155158
addressingKey,
156159
body.io,
157160
result.waitpoint.id

apps/webapp/app/routes/api.v1.sessions.$session.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,24 @@ const { action } = createActionApiRoute(
7474
return json({ error: "Session not found" }, { status: 404 });
7575
}
7676

77+
// The externalId is the canonical addressing key once set: the S2
78+
// stream names, the waitpoint cache key, and the minted session PAT
79+
// scope all derive from it. Re-keying a session would orphan its
80+
// streams (the chat goes silent) and invalidate the PAT's scope, so
81+
// reject any change. Same-value PATCHes stay idempotent.
82+
if (
83+
body.externalId !== undefined &&
84+
body.externalId !== existing.externalId
85+
) {
86+
return json(
87+
{
88+
error:
89+
"externalId cannot be changed after creation; close this session and create a new one with the desired externalId",
90+
},
91+
{ status: 422 }
92+
);
93+
}
94+
7795
try {
7896
const updated = await prisma.session.update({
7997
where: { id: existing.id },

apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { $replica } from "~/db.server";
44
import { chatSnapshotStorageKey } from "~/services/realtime/chatSnapshot.server";
55
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
66
import {
7+
anyResource,
78
createActionApiRoute,
89
createLoaderApiRoute,
910
} from "~/services/routeBuilders/apiBuilder.server";
@@ -21,8 +22,31 @@ const routeConfig = {
2122
resolveSessionByIdOrExternalId($replica, auth.environment.id, params.sessionId),
2223
};
2324

25+
// Authorize against the union of the URL form, friendlyId, and externalId —
26+
// same shape as the sibling session routes. Without an authorization block
27+
// the route builder skips scope checks entirely, so any session-scoped JWT
28+
// in the environment could presign URLs for any other session's snapshot.
29+
function sessionResource(
30+
paramId: string,
31+
session: { friendlyId: string; externalId: string | null } | null | undefined
32+
) {
33+
const ids = new Set<string>([paramId]);
34+
if (session) {
35+
ids.add(session.friendlyId);
36+
if (session.externalId) ids.add(session.externalId);
37+
}
38+
return anyResource([...ids].map((id) => ({ type: "sessions" as const, id })));
39+
}
40+
2441
export const { action } = createActionApiRoute(
25-
{ ...routeConfig, method: "PUT" },
42+
{
43+
...routeConfig,
44+
method: "PUT",
45+
authorization: {
46+
action: "write",
47+
resource: (params, _, __, ___, session) => sessionResource(params.sessionId, session),
48+
},
49+
},
2650
async ({ authentication, resource: session }) => {
2751
if (!session) {
2852
return json({ error: "Session not found" }, { status: 404 });
@@ -42,7 +66,15 @@ export const { action } = createActionApiRoute(
4266
}
4367
);
4468

45-
export const loader = createLoaderApiRoute(routeConfig, async ({ authentication, resource: session }) => {
69+
export const loader = createLoaderApiRoute(
70+
{
71+
...routeConfig,
72+
authorization: {
73+
action: "read",
74+
resource: (session, params) => sessionResource(params.sessionId, session),
75+
},
76+
},
77+
async ({ authentication, resource: session }) => {
4678
if (!session) {
4779
return json({ error: "Session not found" }, { status: 404 });
4880
}

apps/webapp/app/routes/api.v1.sessions.ts

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ import {
1818
type SessionTriggerConfig,
1919
} from "~/services/realtime/sessionRunManager.server";
2020
import { chatSnapshotStoragePathForSession } from "~/services/realtime/chatSnapshot.server";
21-
import { serializeSession } from "~/services/realtime/sessions.server";
21+
import {
22+
serializeSession,
23+
serializeSessionsWithFriendlyRunIds,
24+
} from "~/services/realtime/sessions.server";
2225
import { SessionsRepository } from "~/services/sessionsRepository/sessionsRepository.server";
2326
import {
2427
anyResource,
@@ -91,17 +94,25 @@ export const loader = createLoaderApiRoute(
9194
},
9295
});
9396

97+
// Batched friendlyId translation: `currentRunId` on the wire is the
98+
// public `run_*` form, matching the single-session routes. One `IN`
99+
// lookup per page.
100+
const data = await serializeSessionsWithFriendlyRunIds(
101+
rows.map(
102+
(row) =>
103+
({
104+
...row,
105+
// Columns the list query doesn't select — filled so the
106+
// serializer can operate on a narrowed payload without type errors.
107+
projectId: authentication.environment.projectId,
108+
environmentType: authentication.environment.type,
109+
organizationId: authentication.environment.organizationId,
110+
}) as Session
111+
)
112+
);
113+
94114
return json<ListSessionsResponseBody>({
95-
data: rows.map((row) =>
96-
serializeSession({
97-
...row,
98-
// Columns the list query doesn't select — filled so `serializeSession`
99-
// can operate on a narrowed payload without type errors.
100-
projectId: authentication.environment.projectId,
101-
environmentType: authentication.environment.type,
102-
organizationId: authentication.environment.organizationId,
103-
} as Session)
104-
),
115+
data,
105116
pagination: {
106117
...(pagination.nextCursor ? { next: pagination.nextCursor } : {}),
107118
...(pagination.previousCursor ? { previous: pagination.previousCursor } : {}),
@@ -225,6 +236,17 @@ const { action } = createActionApiRoute(
225236
);
226237
}
227238

239+
// Same guard as the append / end-and-continue handlers: an expired
240+
// row must not spawn a run, because every subsequent `.in/append`
241+
// would 400 on the expiry check — a run boots but the chat can
242+
// never receive input.
243+
if (session.expiresAt && session.expiresAt.getTime() < Date.now()) {
244+
return json(
245+
{ error: "Session is expired; use a different externalId to create a new session" },
246+
{ status: 409 }
247+
);
248+
}
249+
228250
// Session is task-bound — every session has a live run by
229251
// construction. `ensureRunForSession` is idempotent: on the
230252
// cached path it sees `currentRunId` is alive and returns it

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ import {
1111
resolveSessionByIdOrExternalId,
1212
} from "~/services/realtime/sessions.server";
1313
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
14-
import { drainSessionStreamWaitpoints } from "~/services/sessionStreamWaitpointCache.server";
14+
import {
15+
drainSessionStreamWaitpoints,
16+
markSessionStreamPartAppended,
17+
wasSessionStreamPartAppended,
18+
} from "~/services/sessionStreamWaitpointCache.server";
1519
import {
1620
anyResource,
1721
createActionApiRoute,
@@ -91,6 +95,17 @@ const { action, loader } = createActionApiRoute(
9195
);
9296
}
9397

98+
// `.out` is the agent→client channel. Only PRIVATE (secret key) auth —
99+
// i.e. the agent run itself — may write to it. Session-scoped JWTs carry
100+
// `write:sessions:<key>` for `.in`; without this gate they could forge
101+
// assistant chunks and complete `.out` waitpoints on their own session.
102+
if (params.io === "out" && authentication.type !== "PRIVATE") {
103+
return json(
104+
{ ok: false, error: "Appending to the out channel requires secret key authentication" },
105+
{ status: 403 }
106+
);
107+
}
108+
94109
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
95110
session,
96111
});
@@ -132,7 +147,26 @@ const { action, loader } = createActionApiRoute(
132147
const addressingKey = canonicalSessionAddressingKey(session, params.session);
133148

134149
const part = await request.text();
135-
const partId = request.headers.get("X-Part-Id") ?? nanoid(7);
150+
const clientPartId = request.headers.get("X-Part-Id");
151+
const partId = clientPartId ?? nanoid(7);
152+
153+
// Idempotency on client-supplied part ids: a retried POST whose first
154+
// attempt committed is acknowledged without a second append (which
155+
// would duplicate the record and double-fire the waitpoint drain).
156+
// The marker is only written after a successful append, so retries of
157+
// genuinely failed appends still go through. Server-generated ids are
158+
// per-request and carry no dedupe meaning.
159+
if (
160+
clientPartId &&
161+
(await wasSessionStreamPartAppended(
162+
authentication.environment.id,
163+
addressingKey,
164+
params.io,
165+
clientPartId
166+
))
167+
) {
168+
return json({ ok: true }, { status: 200 });
169+
}
136170

137171
const [appendError] = await tryCatch(
138172
realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io)
@@ -153,14 +187,23 @@ const { action, loader } = createActionApiRoute(
153187
return json({ ok: false, error: "Something went wrong, please try again." }, { status: 500 });
154188
}
155189

190+
if (clientPartId) {
191+
await markSessionStreamPartAppended(
192+
authentication.environment.id,
193+
addressingKey,
194+
params.io,
195+
clientPartId
196+
);
197+
}
198+
156199
// Fire any run-scoped waitpoints registered against this channel. Best
157200
// effort — a failure here must not fail the append (the record is
158201
// durable in S2; the SSE tail will still deliver it). Waitpoints are
159202
// keyed on the canonical addressing key the agent registered with via
160203
// `sessions.open(...).in.wait()`, so writers and readers converge
161204
// regardless of which URL form they used.
162205
const [drainError, waitpointIds] = await tryCatch(
163-
drainSessionStreamWaitpoints(addressingKey, params.io)
206+
drainSessionStreamWaitpoints(authentication.environment.id, addressingKey, params.io)
164207
);
165208
if (drainError) {
166209
logger.error("Failed to drain session stream waitpoints", {

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
128128
// Drain any waitpoints registered for this channel — same as the
129129
// public append. Best-effort; failure doesn't fail the append.
130130
const [drainError, waitpointIds] = await tryCatch(
131-
drainSessionStreamWaitpoints(addressingKey, io)
131+
drainSessionStreamWaitpoints(environment.id, addressingKey, io)
132132
);
133133
if (drainError) {
134134
logger.error("Failed to drain session stream waitpoints (playground)", {

apps/webapp/app/services/realtime/sessions.server.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ export function canonicalSessionAddressingKey(
7575
*
7676
* Note: `currentRunId` is left as-is — Prisma stores the internal run id
7777
* (cuid), but `SessionItem.currentRunId` is the *friendly* form. Routes
78-
* that emit a single `SessionItem` should use
79-
* {@link serializeSessionWithFriendlyRunId} instead, which resolves the
80-
* friendlyId via a TaskRun lookup. List endpoints stay on this raw form
81-
* to avoid N+1 lookups when paginating.
78+
* that emit `SessionItem`s must translate: single-row endpoints via
79+
* {@link serializeSessionWithFriendlyRunId}, list endpoints via the
80+
* batched {@link serializeSessionsWithFriendlyRunIds}. Never put this
81+
* raw form on the wire directly.
8282
*/
8383
export function serializeSession(session: Session): SessionItem {
8484
return {
@@ -125,3 +125,31 @@ export async function serializeSessionWithFriendlyRunId(
125125
currentRunId: run?.friendlyId ?? null,
126126
};
127127
}
128+
129+
/**
130+
* Batched form of {@link serializeSessionWithFriendlyRunId} for list
131+
* endpoints: one `IN` lookup per page instead of N+1. `currentRunId` on
132+
* the wire is always the public `run_*` friendlyId — the raw
133+
* {@link serializeSession} form leaks the internal cuid, which customers
134+
* can't use with `runs.retrieve(...)`.
135+
*/
136+
export async function serializeSessionsWithFriendlyRunIds(
137+
sessions: Session[]
138+
): Promise<SessionItem[]> {
139+
const runIds = [...new Set(sessions.map((s) => s.currentRunId).filter((id): id is string => !!id))];
140+
141+
const runs = runIds.length
142+
? await $replica.taskRun.findMany({
143+
where: { id: { in: runIds } },
144+
select: { id: true, friendlyId: true },
145+
})
146+
: [];
147+
const friendlyIdByRunId = new Map(runs.map((run) => [run.id, run.friendlyId]));
148+
149+
return sessions.map((session) => ({
150+
...serializeSession(session),
151+
currentRunId: session.currentRunId
152+
? friendlyIdByRunId.get(session.currentRunId) ?? null
153+
: null,
154+
}));
155+
}

0 commit comments

Comments
 (0)