Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cloud-agent-next/src/persistence/CloudAgentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
createLeaseQueries,
type ExecutionQueries,
type EventQueries,
type EventQueryFilters,
type LeaseQueries,
type LeaseAcquireError,
} from '../session/queries/index.js';
Expand Down Expand Up @@ -1129,6 +1130,15 @@ export class CloudAgentSession extends DurableObject {
return this.executionQueries.getActiveExecutionId();
}

/**
* Query stored events by filters.
* Used by server-side consumers (e.g. callback handlers) to retrieve
* execution events without a WebSocket connection.
*/
queryEvents(filters: EventQueryFilters): StoredEvent[] {
return this.eventQueries.findByFilters(filters);
}

/**
* Check if interrupt was requested for the current execution.
* Note: This is different from the legacy isInterrupted() method which uses 'interrupted' key.
Expand Down
207 changes: 207 additions & 0 deletions cloud-agent-next/src/router.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -836,5 +836,212 @@ describe('router sessionId validation', () => {
});
});
});

describe('getSessionEvents procedure', () => {
let mockContext: TRPCContext;
let caller: ReturnType<typeof appRouter.createCaller>;
let cloudAgentSession: MockCAS;
let mockQueryEvents: ReturnType<typeof vi.fn>;

beforeEach(() => {
vi.clearAllMocks();

mockQueryEvents = vi.fn();

mockContext = {
userId: 'test-user-123',
authToken: 'test-token',
botId: undefined,
request: {} as Request,
env: {
Sandbox: {} as TRPCContext['env']['Sandbox'],
CLOUD_AGENT_SESSION: {
idFromName: vi.fn(id => ({ id })),
get: vi.fn(() => ({
queryEvents: mockQueryEvents,
})),
} as unknown as TRPCContext['env']['CLOUD_AGENT_SESSION'],
SESSION_INGEST: {
fetch: vi.fn(),
} as unknown as TRPCContext['env']['SESSION_INGEST'],
NEXTAUTH_SECRET: 'test-secret',
},
};
cloudAgentSession = mockContext.env.CLOUD_AGENT_SESSION as unknown as MockCAS;

caller = appRouter.createCaller(mockContext);
});

describe('successful retrieval', () => {
it('should return events for a valid session', async () => {
const sessionId: SessionId = 'agent_12345678-1234-1234-1234-123456789abc';
const mockEvents = [
{
id: 1,
execution_id: 'exc_001',
session_id: sessionId,
stream_event_type: 'kilocode',
payload: JSON.stringify({ type: 'session.created' }),
timestamp: 1700000000000,
},
{
id: 2,
execution_id: 'exc_001',
session_id: sessionId,
stream_event_type: 'output',
payload: JSON.stringify({ text: 'hello' }),
timestamp: 1700000001000,
},
];

mockQueryEvents.mockResolvedValue(mockEvents);

const result = await caller.getSessionEvents({
cloudAgentSessionId: sessionId,
});

expect(result).toHaveLength(2);
expect(result[0].id).toBe(1);
expect(result[0].stream_event_type).toBe('kilocode');
expect(result[1].stream_event_type).toBe('output');

expect(cloudAgentSession.idFromName).toHaveBeenCalledWith(`test-user-123:${sessionId}`);
});

it('should return empty array when no events exist', async () => {
const sessionId: SessionId = 'agent_12345678-1234-1234-1234-123456789abc';

mockQueryEvents.mockResolvedValue([]);

const result = await caller.getSessionEvents({
cloudAgentSessionId: sessionId,
});

expect(result).toHaveLength(0);
});
});

describe('filter forwarding', () => {
it('should pass eventTypes filter to DO', async () => {
const sessionId: SessionId = 'agent_12345678-1234-1234-1234-123456789abc';
mockQueryEvents.mockResolvedValue([]);

await caller.getSessionEvents({
cloudAgentSessionId: sessionId,
eventTypes: ['kilocode', 'error'],
});

expect(mockQueryEvents).toHaveBeenCalledWith(
expect.objectContaining({
eventTypes: ['kilocode', 'error'],
})
);
});

it('should convert executionId to executionIds array', async () => {
const sessionId: SessionId = 'agent_12345678-1234-1234-1234-123456789abc';
mockQueryEvents.mockResolvedValue([]);

await caller.getSessionEvents({
cloudAgentSessionId: sessionId,
executionId: 'exc_123',
});

expect(mockQueryEvents).toHaveBeenCalledWith(
expect.objectContaining({
executionIds: ['exc_123'],
})
);
});

it('should pass fromId filter to DO', async () => {
const sessionId: SessionId = 'agent_12345678-1234-1234-1234-123456789abc';
mockQueryEvents.mockResolvedValue([]);

await caller.getSessionEvents({
cloudAgentSessionId: sessionId,
fromId: 42,
});

expect(mockQueryEvents).toHaveBeenCalledWith(
expect.objectContaining({
fromId: 42,
})
);
});

it('should use default limit of 500', async () => {
const sessionId: SessionId = 'agent_12345678-1234-1234-1234-123456789abc';
mockQueryEvents.mockResolvedValue([]);

await caller.getSessionEvents({
cloudAgentSessionId: sessionId,
});

expect(mockQueryEvents).toHaveBeenCalledWith(
expect.objectContaining({
limit: 500,
})
);
});

it('should respect custom limit', async () => {
const sessionId: SessionId = 'agent_12345678-1234-1234-1234-123456789abc';
mockQueryEvents.mockResolvedValue([]);

await caller.getSessionEvents({
cloudAgentSessionId: sessionId,
limit: 100,
});

expect(mockQueryEvents).toHaveBeenCalledWith(
expect.objectContaining({
limit: 100,
})
);
});
});

describe('authorization', () => {
it('should require authentication', async () => {
const unauthenticatedContext: TRPCContext = {
userId: undefined,
authToken: undefined,
botId: undefined,
env: mockContext.env,
} as unknown as TRPCContext;

const unauthenticatedCaller = appRouter.createCaller(unauthenticatedContext);

await expect(
unauthenticatedCaller.getSessionEvents({
cloudAgentSessionId: 'agent_12345678-1234-1234-1234-123456789abc',
})
).rejects.toThrow('Authentication required');
});
});

describe('error handling', () => {
it('should propagate DO errors as tRPC errors', async () => {
const sessionId: SessionId = 'agent_12345678-1234-1234-1234-123456789abc';
mockQueryEvents.mockRejectedValue(new Error('DO unavailable'));

await expect(
caller.getSessionEvents({ cloudAgentSessionId: sessionId })
).rejects.toThrow();
});
});

describe('user isolation', () => {
it('should key DO lookup by userId for user isolation', async () => {
const sessionId: SessionId = 'agent_22222222-2222-2222-2222-222222222222';
mockQueryEvents.mockResolvedValue([]);

await caller.getSessionEvents({ cloudAgentSessionId: sessionId });

expect(cloudAgentSession.idFromName).toHaveBeenCalledWith(`test-user-123:${sessionId}`);
});
});
});
});
});
47 changes: 46 additions & 1 deletion cloud-agent-next/src/router/handlers/session-management.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ import {
} from '../../workspace.js';
import { withDORetry } from '../../utils/do-retry.js';
import { protectedProcedure, publicProcedure } from '../auth.js';
import { sessionIdSchema, GetSessionInput, GetSessionOutput } from '../schemas.js';
import {
sessionIdSchema,
GetSessionInput,
GetSessionOutput,
GetSessionEventsInput,
GetSessionEventsOutput,
} from '../schemas.js';
import { computeExecutionHealth } from '../../core/execution.js';

/**
Expand Down Expand Up @@ -542,6 +548,45 @@ export function createSessionManagementHandlers() {
});
}),

/**
* Get stored execution events for a session.
*
* Used by server-side consumers (e.g. callback handlers) to retrieve
* execution events without requiring a WebSocket connection.
*/
getSessionEvents: protectedProcedure
.input(GetSessionEventsInput)
.output(GetSessionEventsOutput)
.query(async ({ input, ctx }) => {
return withLogTags({ source: 'getSessionEvents' }, async () => {
const sessionId = input.cloudAgentSessionId as SessionId;
const { userId, env } = ctx;

logger.setTags({ userId, sessionId });
logger.info('Fetching session events');

const doKey = `${userId}:${sessionId}`;
const getStub = () =>
env.CLOUD_AGENT_SESSION.get(env.CLOUD_AGENT_SESSION.idFromName(doKey));

const events = await withDORetry(
getStub,
stub =>
stub.queryEvents({
fromId: input.fromId,
executionIds: input.executionId ? [input.executionId] : undefined,
eventTypes: input.eventTypes,
limit: input.limit,
}),
'queryEvents'
);

logger.withFields({ eventCount: events.length }).info('Session events retrieved');

return events;
});
}),

/**
* Health check endpoint
*/
Expand Down
37 changes: 37 additions & 0 deletions cloud-agent-next/src/router/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,43 @@ export const GetSessionOutput = z.object({

export type GetSessionResponse = z.infer<typeof GetSessionOutput>;

/**
* Input schema for getSessionEvents endpoint.
* Retrieves stored execution events for server-side consumers
* (e.g. callback handlers) without requiring a WebSocket connection.
*/
export const GetSessionEventsInput = z.object({
cloudAgentSessionId: sessionIdSchema.describe('Cloud-agent session ID'),
eventTypes: z
.array(z.string().min(1))
.optional()
.describe('Filter by event types (e.g. kilocode, output, error)'),
executionId: z.string().min(1).optional().describe('Filter by execution ID'),
fromId: z.number().int().min(0).optional().describe('Return events with ID > fromId (exclusive)'),
limit: z
.number()
.int()
.min(1)
.max(1000)
.default(500)
.describe('Max events to return (default 500, max 1000)'),
});

/**
* Output schema for getSessionEvents endpoint.
* Returns an array of stored events matching the SQLite row structure.
*/
export const StoredEventSchema = z.object({
id: z.number(),
execution_id: z.string(),
session_id: z.string(),
stream_event_type: z.string(),
payload: z.string(),
timestamp: z.number(),
});

export const GetSessionEventsOutput = z.array(StoredEventSchema);

/**
* Response schema for V2 execution endpoints.
* Returns acknowledgment when execution has started.
Expand Down
Loading