Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/restore-archived-sessions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@moonshot-ai/kimi-code": minor
---

Add server APIs to restore archived sessions and list only archived sessions.
109 changes: 109 additions & 0 deletions packages/server/src/lib/sessionArchive.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { readFile, writeFile } from 'node:fs/promises';
import { basename, isAbsolute, join, relative, resolve } from 'node:path';

import { SessionNotFoundError } from '@moonshot-ai/agent-core';

/**
* Temporary server-side session restore.
*
* TODO: remove once `@moonshot-ai/agent-core` exposes `ISessionService.restore`
* natively. At that point the `:restore` route should delegate to the service
* instead of rewriting `state.json` here.
*
* Archive is a boolean flag (`archived`) persisted in each session's
* `<sessionDir>/state.json`. agent-core's `SessionStore` can set it to `true`
* (`archive`) but has no inverse; while agent-core is being refactored we flip
* it back from the server by:
* 1. reading `<homeDir>/session_index.jsonl` to resolve `sessionId -> sessionDir`;
* 2. validating the resolved dir is inside `<homeDir>/sessions` (defense
* against a tampered index);
* 3. read-modify-write `state.json` with `archived: false`.
*
* This mirrors `SessionStore.archive` and publishes no event (same as archive).
* The query read-model rebuilds from the store on every call, so a restored
* session shows up in subsequent lists with no extra invalidation.
*/

interface SessionIndexEntry {
readonly sessionId: string;
readonly sessionDir: string;
readonly workDir: string;
}

export async function restoreArchivedSession(homeDir: string, sessionId: string): Promise<void> {
const sessionDir = await findSessionDir(homeDir, sessionId);
if (sessionDir === undefined) {
throw new SessionNotFoundError(sessionId);
}

const statePath = join(sessionDir, 'state.json');
let parsed: unknown;
try {
parsed = JSON.parse(await readFile(statePath, 'utf-8')) as unknown;
} catch {
throw new SessionNotFoundError(sessionId);
}
if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
throw new SessionNotFoundError(sessionId);
}

const next: Record<string, unknown> = {
...(parsed as Record<string, unknown>),
archived: false,
updatedAt: new Date().toISOString(),
};
await writeFile(statePath, `${JSON.stringify(next, null, 2)}\n`, 'utf-8');
}

async function findSessionDir(homeDir: string, sessionId: string): Promise<string | undefined> {
const indexPath = join(homeDir, 'session_index.jsonl');
let raw: string;
try {
raw = await readFile(indexPath, 'utf-8');
} catch {
return undefined;
}

const sessionsDir = join(homeDir, 'sessions');
let found: string | undefined;
for (const line of raw.split(/\r?\n/)) {
const trimmed = line.trim();
if (trimmed === '') continue;
const entry = parseIndexLine(trimmed);
if (entry === undefined || entry.sessionId !== sessionId) continue;
const sessionDir = resolve(entry.sessionDir);
if (!isAbsolute(entry.sessionDir)) continue;
if (!isPathInside(sessionsDir, sessionDir)) continue;
if (basename(sessionDir) !== entry.sessionId) continue;
// Last valid line wins, matching `readSessionIndex`'s Map semantics.
found = sessionDir;
}
return found;
}

function parseIndexLine(line: string): SessionIndexEntry | undefined {
try {
const parsed = JSON.parse(line) as unknown;
if (typeof parsed !== 'object' || parsed === null) return undefined;
const entry = parsed as Partial<SessionIndexEntry>;
if (
typeof entry.sessionId !== 'string' ||
typeof entry.sessionDir !== 'string' ||
typeof entry.workDir !== 'string'
) {
return undefined;
}
return {
sessionId: entry.sessionId,
sessionDir: entry.sessionDir,
workDir: entry.workDir,
};
} catch {
return undefined;
}
}

function isPathInside(parent: string, child: string): boolean {
const rel = relative(resolve(parent), resolve(child));
return rel !== '' && !rel.startsWith('..') && !isAbsolute(rel);
}
112 changes: 99 additions & 13 deletions packages/server/src/routes/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import {
undoSessionResponseSchema,
workspaceIdSchema,
} from '@moonshot-ai/protocol';
import { IPromptService, ISessionService, SessionNotFoundError, SessionUndoUnavailableError, ErrorCodes, KimiError, IWorkspaceRegistry, WorkspaceNotFoundError, type IInstantiationService, type SessionClientTelemetry } from '@moonshot-ai/agent-core';
import { IPromptService, ISessionService, SessionNotFoundError, SessionUndoUnavailableError, ErrorCodes, KimiError, IEnvironmentService, IWorkspaceRegistry, WorkspaceNotFoundError, type IInstantiationService, type SessionClientTelemetry } from '@moonshot-ai/agent-core';
import { z } from 'zod';


import { errEnvelope, okEnvelope } from '../envelope';
import { restoreArchivedSession } from '../lib/sessionArchive';
import { defineRoute } from '../middleware/defineRoute';
import { parseActionSuffix } from './action-suffix';

Expand Down Expand Up @@ -82,6 +83,7 @@ const sessionsListQueryCoercion = z
page_size: z.coerce.number().int().min(1).max(100).optional(),
status: sessionStatusSchema.optional(),
include_archive: booleanQueryParam,
archived_only: booleanQueryParam,

workspace_id: workspaceIdSchema.optional(),
})
Expand All @@ -94,6 +96,14 @@ const sessionsListQueryCoercion = z
params: { code: ErrorCode.VALIDATION_FAILED },
});
}
if (value.archived_only === true && value.include_archive === true) {
ctx.addIssue({
code: 'custom',
message: 'archived_only and include_archive are mutually exclusive',
path: ['archived_only'],
params: { code: ErrorCode.VALIDATION_FAILED },
});
}
});

const sessionChildrenListQueryCoercion = z
Expand Down Expand Up @@ -155,6 +165,56 @@ function headerString(headers: Record<string, unknown>, key: string): string | u
return trimmed.length === 0 ? undefined : trimmed;
}

const DEFAULT_SESSION_LIST_PAGE_SIZE = 20;
const MAX_SESSION_LIST_PAGE_SIZE = 100;

type SessionListRequest = Parameters<ISessionService['list']>[0];
type SessionListPage = Awaited<ReturnType<ISessionService['list']>>;
type SessionListItem = SessionListPage['items'][number];
type SessionListBaseQuery = Omit<SessionListRequest, 'before_id' | 'after_id' | 'page_size' | 'status'>;
type SessionListCursor = Pick<SessionListRequest, 'before_id' | 'after_id' | 'page_size'>;

function normalizeSessionListPageSize(cursor: SessionListCursor): number {
const requested = cursor.page_size ?? DEFAULT_SESSION_LIST_PAGE_SIZE;
return Math.min(Math.max(requested, 1), MAX_SESSION_LIST_PAGE_SIZE);
}

async function listSessionsWithRouteFilter(
fetchPage: (query: SessionListRequest) => Promise<SessionListPage>,
baseQuery: SessionListBaseQuery,
cursor: SessionListCursor,
predicate: (session: SessionListItem) => boolean,
): Promise<SessionListPage> {
const targetSize = normalizeSessionListPageSize(cursor);
const matches: SessionListItem[] = [];
let beforeId = cursor.before_id;
let afterId = cursor.after_id;
let coreHasMore = true;

while (matches.length <= targetSize && coreHasMore) {
const page = await fetchPage({
...baseQuery,
before_id: beforeId,
after_id: afterId,
page_size: MAX_SESSION_LIST_PAGE_SIZE,
});
if (page.items.length === 0) break;

matches.push(...page.items.filter(predicate));
coreHasMore = page.has_more;

const nextBeforeId = page.items[page.items.length - 1]?.id;
if (!coreHasMore || nextBeforeId === undefined || nextBeforeId === beforeId) break;
beforeId = nextBeforeId;
afterId = undefined;
}

return {
items: matches.slice(0, targetSize),
has_more: matches.length > targetSize,
};
}

export function registerSessionsRoutes(
app: SessionRouteHost,
ix: IInstantiationService,
Expand Down Expand Up @@ -265,14 +325,11 @@ export function registerSessionsRoutes(
async (req, reply) => {
try {
const raw = req.query;
const baseQuery = {
before_id: raw.before_id,
after_id: raw.after_id,
page_size: raw.page_size,
status: raw.status,
includeArchive: raw.include_archive,
const archivedOnly = raw.archived_only === true;
const status = raw.status;
let baseQuery: SessionListBaseQuery = {
includeArchive: archivedOnly ? true : raw.include_archive,
};
let query;
if (raw.workspace_id !== undefined) {
const registry = ix.invokeFunction((a) => a.get(IWorkspaceRegistry));
let root: string;
Expand All @@ -287,11 +344,30 @@ export function registerSessionsRoutes(
}
throw err;
}
query = { ...baseQuery, workDir: root };
} else {
query = baseQuery;
baseQuery = { ...baseQuery, workDir: root };
}

if (archivedOnly) {
const page = await listSessionsWithRouteFilter(
(query) => ix.invokeFunction((a) => a.get(ISessionService).list(query)),
baseQuery,
raw,
(session) =>
session.archived === true && (status === undefined || session.status === status),
);
reply.send(okEnvelope(page, req.id));
return;
}
const page = await ix.invokeFunction((a) => a.get(ISessionService).list(query));

const page = await ix.invokeFunction((a) =>
a.get(ISessionService).list({
...baseQuery,
before_id: raw.before_id,
after_id: raw.after_id,
page_size: raw.page_size,
status,
}),
);
reply.send(okEnvelope(page, req.id));
} catch (err) {
sendMappedError(reply, req.id, err);
Expand Down Expand Up @@ -410,7 +486,7 @@ export function registerSessionsRoutes(
const { tail } = req.params;
const parsed = parseActionSuffix({
tail,
allowedActions: ['fork', 'compact', 'undo', 'abort', 'btw', 'archive'] as const,
allowedActions: ['fork', 'compact', 'undo', 'abort', 'btw', 'archive', 'restore'] as const,
resourceLabel: 'session',
});
if (parsed.kind !== 'action') {
Expand Down Expand Up @@ -468,6 +544,16 @@ export function registerSessionsRoutes(
return;
}

if (parsed.action === 'restore') {
const homeDir = ix.invokeFunction((a) => a.get(IEnvironmentService)).homeDir;
await restoreArchivedSession(homeDir, parsed.id);
const session = await ix.invokeFunction((a) =>
a.get(ISessionService).get(parsed.id),
);
reply.send(okEnvelope(session, req.id));
return;
}

const body = undoSessionRequestSchema.parse(req.body);
const result = await ix.invokeFunction((a) =>
a.get(ISessionService).undo(parsed.id, body),
Expand Down
4 changes: 2 additions & 2 deletions packages/server/test/question.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ afterEach(async () => {
// ignore
}
server = undefined;
rmSync(tmpDir, { recursive: true, force: true });
rmSync(bridgeHome, { recursive: true, force: true });
rmSync(tmpDir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 });
rmSync(bridgeHome, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 });
});

async function bootDaemon(): Promise<RunningServer> {
Expand Down
Loading
Loading