Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/kimi_cli/acp/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
- `prompt_capabilities`: `embedded_context=False`, `image=True`, `audio=False`.
- `mcp_capabilities`: `http=True`, `sse=False`.
- Single-session: `load_session=False`, no session list capabilities.
- Multi-session: `load_session=True`, `session_capabilities.list` supported.
- Multi-session: `load_session=True`, `session_capabilities.list` supported, and
`session_capabilities._meta.kimi.sessionHistoryReplay=True`.
- `auth_methods=[]` (no authentication methods advertised).

## Session lifecycle (implemented behavior)
Expand All @@ -31,8 +32,14 @@
- MCP servers passed by ACP are converted via `acp_mcp_servers_to_mcp_config`.
- `session/load`
- Multi-session only: loads by `Session.find`, then builds `KimiCLI` and `ACPSession`.
- No history replay yet (TODO).
- Replays persisted `wire.jsonl` history as ACP `session/update` notifications, falls back to
context text history for older sessions without wire history, and returns initial modes/models
state plus title metadata in `_meta.kimi.session`.
- Sends `SessionInfoUpdate` with title/updatedAt before replaying history.
- Single-session: not implemented.
- `session/resume`
- Multi-session only: returns initial modes/models state plus title metadata in
`_meta.kimi.session`, and sends `SessionInfoUpdate` with title/updatedAt.
- `session/list`
- Multi-session only: lists sessions via `Session.list`, no pagination.
- Single-session: not implemented.
Expand All @@ -54,6 +61,9 @@
- `TodoDisplayBlock` is converted into `AgentPlanUpdate`.
- Available commands:
- `AvailableCommandsUpdate` is sent right after session creation.
- Session metadata:
- `SessionInfoUpdate` is sent after prompt completion when Kimi auto-generates a title, and
during `session/load`/`session/resume` so clients can hydrate title/updatedAt.

## Prompt/content conversion
- Incoming prompt blocks:
Expand Down
57 changes: 51 additions & 6 deletions src/kimi_cli/acp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async def initialize(
),
mcp_capabilities=acp.schema.McpCapabilities(http=True, sse=False),
session_capabilities=acp.schema.SessionCapabilities(
field_meta={"kimi": {"sessionHistoryReplay": True}},
list=acp.schema.SessionListCapabilities(),
resume=acp.schema.SessionResumeCapabilities(),
),
Expand Down Expand Up @@ -231,18 +232,44 @@ async def _setup_session(

async def load_session(
self, cwd: str, session_id: str, mcp_servers: list[MCPServer] | None = None, **kwargs: Any
) -> None:
) -> acp.schema.LoadSessionResponse:
logger.info("Loading session: {id} for working directory: {cwd}", id=session_id, cwd=cwd)

if session_id in self.sessions:
logger.warning("Session already loaded: {id}", id=session_id)
return
acp_session, model_id_conv = self.sessions[session_id]
else:
# Check authentication before loading session
self._check_auth()

# Check authentication before loading session
self._check_auth()
acp_session, model_id_conv = await self._setup_session(cwd, session_id, mcp_servers)

await self._setup_session(cwd, session_id, mcp_servers)
# TODO: replay session history?
await acp_session.send_session_info_update()
replayed_updates = await acp_session.replay_history()
logger.info(
"Replayed {count} ACP history updates for session: {id}",
count=replayed_updates,
id=session_id,
)

config = acp_session.cli.soul.runtime.config
return acp.schema.LoadSessionResponse(
field_meta=_session_response_meta(acp_session),
modes=acp.schema.SessionModeState(
available_modes=[
acp.schema.SessionMode(
id="default",
name="Default",
description="The default mode.",
),
],
current_mode_id="default",
),
models=acp.schema.SessionModelState(
available_models=_expand_llm_models(config.models),
current_model_id=model_id_conv.to_acp_model_id(),
),
)

async def resume_session(
self, cwd: str, session_id: str, mcp_servers: list[MCPServer] | None = None, **kwargs: Any
Expand All @@ -253,8 +280,10 @@ async def resume_session(
await self._setup_session(cwd, session_id, mcp_servers)

acp_session, model_id_conv = self.sessions[session_id]
await acp_session.send_session_info_update()
config = acp_session.cli.soul.runtime.config
return acp.schema.ResumeSessionResponse(
field_meta=_session_response_meta(acp_session),
modes=acp.schema.SessionModeState(
available_modes=[
acp.schema.SessionMode(
Expand Down Expand Up @@ -445,3 +474,19 @@ def _expand_llm_models(models: dict[str, LLMModel]) -> list[acp.schema.ModelInfo
)
)
return expanded_models


def _session_response_meta(acp_session: ACPSession) -> dict[str, Any]:
session = acp_session.cli.soul.runtime.session
title = session.state.custom_title or session.title
updated_at = (
datetime.fromtimestamp(session.context_file.stat().st_mtime).astimezone().isoformat()
if session.context_file.exists()
else None
)
meta: dict[str, Any] = {"sessionId": session.id}
if title != "Untitled":
meta["title"] = title
if updated_at is not None:
meta["updatedAt"] = updated_at
return {"kimi": {"session": meta}}
191 changes: 190 additions & 1 deletion src/kimi_cli/acp/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import asyncio
import uuid
from contextvars import ContextVar
from contextvars import ContextVar, Token
from datetime import datetime

import acp
import streamingjson # type: ignore[reportMissingTypeStubs]
Expand All @@ -22,9 +23,11 @@
from kimi_cli.wire.types import (
ApprovalRequest,
ApprovalResponse,
AudioURLPart,
CompactionBegin,
CompactionEnd,
ContentPart,
ImageURLPart,
MCPLoadingBegin,
MCPLoadingEnd,
Notification,
Expand Down Expand Up @@ -75,6 +78,44 @@ def should_hide_terminal_output(tool_call_id: str) -> bool:
return calls is not None and tool_call_id in calls


def _content_part_to_acp_block(part: ContentPart) -> ACPContentBlock:
if isinstance(part, TextPart):
return acp.schema.TextContentBlock(type="text", text=part.text)
if isinstance(part, ImageURLPart):
mime_type, data = _split_data_url(part.image_url.url)
if data is not None:
return acp.schema.ImageContentBlock(type="image", mime_type=mime_type, data=data)
return acp.schema.ResourceContentBlock(
type="resource_link",
uri=part.image_url.url,
name="image",
mime_type=mime_type,
)
if isinstance(part, AudioURLPart):
mime_type, data = _split_data_url(part.audio_url.url)
if data is not None:
return acp.schema.AudioContentBlock(type="audio", mime_type=mime_type, data=data)
return acp.schema.ResourceContentBlock(
type="resource_link",
uri=part.audio_url.url,
name="audio",
mime_type=mime_type,
)

logger.warning("Unsupported replay user content part: {part}", part=part)
return acp.schema.TextContentBlock(type="text", text=f"[{part.__class__.__name__}]")


def _split_data_url(url: str) -> tuple[str, str | None]:
if not url.startswith("data:"):
return "application/octet-stream", None
header, sep, data = url.partition(",")
if not sep or ";base64" not in header:
return "application/octet-stream", None
mime_type = header.removeprefix("data:").removesuffix(";base64")
return mime_type or "application/octet-stream", data


class _ToolCallState:
"""Manages the state of a single tool call for streaming updates."""

Expand Down Expand Up @@ -261,15 +302,163 @@ async def prompt(self, prompt: list[ACPContentBlock]) -> acp.PromptResponse:
reset_current_kaos(kaos_token)
_terminal_tool_call_ids.reset(terminal_tool_calls_token)
_current_turn_id.reset(token)
await self.send_session_info_update()
return acp.PromptResponse(stop_reason="end_turn")

async def replay_history(self) -> int:
"""Replay persisted wire history as ACP session updates."""
old_turn_state = self._turn_state
turn_token: Token[str | None] | None = None
replayed_updates = 0
self._turn_state = None
try:
async for record in self._cli.soul.runtime.session.wire_file.iter_records():
msg = record.to_wire_message()
match msg:
case TurnBegin(user_input=user_input):
if turn_token is not None:
_current_turn_id.reset(turn_token)
self._turn_state = _TurnState()
turn_token = _current_turn_id.set(self._turn_state.id)
replayed_updates += await self._send_user_input(user_input)
case SteerInput(user_input=user_input):
self._reset_content_run()
replayed_updates += await self._send_user_input(user_input)
case TurnEnd() | StepInterrupted():
if turn_token is not None:
_current_turn_id.reset(turn_token)
turn_token = None
self._turn_state = None
case StepBegin():
if turn_token is None:
turn_token = self._begin_replay_turn()
case ThinkPart(think=think):
await self._send_thinking(think)
replayed_updates += 1
case TextPart(text=text):
await self._send_text(text)
replayed_updates += 1
case ContentPart():
logger.warning("Unsupported replay content part: {part}", part=msg)
await self._send_text(f"[{msg.__class__.__name__}]")
replayed_updates += 1
case ToolCall():
if turn_token is None:
turn_token = self._begin_replay_turn()
await self._send_tool_call(msg)
replayed_updates += 1
case ToolCallPart():
if self._turn_state is not None:
await self._send_tool_call_part(msg)
replayed_updates += 1
case ToolResult():
if self._turn_state is not None:
await self._send_tool_result(msg)
replayed_updates += 1
case Notification():
await self._send_notification(msg)
replayed_updates += 1
case _:
pass
finally:
if turn_token is not None:
_current_turn_id.reset(turn_token)
self._turn_state = old_turn_state
if replayed_updates == 0:
replayed_updates = await self._replay_context_history()
return replayed_updates

def _begin_replay_turn(self) -> Token[str | None]:
self._turn_state = _TurnState()
return _current_turn_id.set(self._turn_state.id)

async def _replay_context_history(self) -> int:
old_turn_state = self._turn_state
turn_token: Token[str | None] | None = None
replayed_updates = 0
self._turn_state = None
try:
for message in self._cli.soul.context.history:
if turn_token is not None:
_current_turn_id.reset(turn_token)
turn_token = self._begin_replay_turn()

if message.role == "user":
replayed_updates += await self._send_user_input(list(message.content))
elif message.role == "assistant":
for part in message.content:
if isinstance(part, ThinkPart):
await self._send_thinking(part.think)
elif isinstance(part, TextPart):
await self._send_text(part.text)
else:
logger.warning("Unsupported context replay part: {part}", part=part)
await self._send_text(f"[{part.__class__.__name__}]")
replayed_updates += 1
finally:
if turn_token is not None:
_current_turn_id.reset(turn_token)
self._turn_state = old_turn_state
return replayed_updates

async def _send_user_input(self, user_input: str | list[ContentPart]) -> int:
blocks: list[ACPContentBlock]
if isinstance(user_input, str):
blocks = [acp.schema.TextContentBlock(type="text", text=user_input)]
else:
blocks = [_content_part_to_acp_block(part) for part in user_input]

for block in blocks:
await self._send_user_block(block)
return len(blocks)

async def _send_user_block(self, block: ACPContentBlock) -> None:
if not self._id or not self._conn:
return

await self._conn.session_update(
session_id=self._id,
update=acp.schema.UserMessageChunk(
content=block,
message_id=self._content_run_id("user"),
session_update="user_message_chunk",
),
)

async def cancel(self) -> None:
if self._turn_state is None:
logger.warning("Cancel requested but no prompt is running")
return

self._turn_state.cancel_event.set()

async def send_session_info_update(self) -> None:
"""Send current session metadata, including title, if available."""
if not self._id or not self._conn:
return

try:
session = self._cli.soul.runtime.session
except AttributeError:
return
title = session.state.custom_title or session.title
updated_at = (
datetime.fromtimestamp(session.context_file.stat().st_mtime).astimezone().isoformat()
if session.context_file.exists()
else None
)
if title == "Untitled" and updated_at is None:
return

await self._conn.session_update(
session_id=self._id,
update=acp.schema.SessionInfoUpdate(
session_update="session_info_update",
title=title if title != "Untitled" else None,
updated_at=updated_at,
),
)

def _reset_content_run(self) -> None:
if self._turn_state is not None:
self._turn_state.reset_content_run()
Expand Down
1 change: 1 addition & 0 deletions src/kimi_cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ async def _mirror_external_cancel() -> None:
user_input,
_ui_loop_fn,
run_cancel_event,
wire_file=self._runtime.session.wire_file,
runtime=self._runtime,
)
)
Expand Down
Loading