diff --git a/src/kimi_cli/acp/AGENTS.md b/src/kimi_cli/acp/AGENTS.md index 24efeacaa..6ebd4c37b 100644 --- a/src/kimi_cli/acp/AGENTS.md +++ b/src/kimi_cli/acp/AGENTS.md @@ -20,7 +20,8 @@ - `prompt_capabilities`: `embedded_context=False`, `image=True`, `audio=False`. - `mcp_capabilities`: `http=True`, `sse=False`. - Single-session: `load_session=False`, no session list capabilities. -- Multi-session: `load_session=True`, `session_capabilities.list` supported. +- Multi-session: `load_session=True`, `session_capabilities.list` supported, and + `session_capabilities._meta.kimi.sessionHistoryReplay=True`. - `auth_methods=[]` (no authentication methods advertised). ## Session lifecycle (implemented behavior) @@ -31,8 +32,14 @@ - MCP servers passed by ACP are converted via `acp_mcp_servers_to_mcp_config`. - `session/load` - Multi-session only: loads by `Session.find`, then builds `KimiCLI` and `ACPSession`. - - No history replay yet (TODO). + - Replays persisted `wire.jsonl` history as ACP `session/update` notifications, falls back to + context text history for older sessions without wire history, and returns initial modes/models + state plus title metadata in `_meta.kimi.session`. + - Sends `SessionInfoUpdate` with title/updatedAt before replaying history. - Single-session: not implemented. +- `session/resume` + - Multi-session only: returns initial modes/models state plus title metadata in + `_meta.kimi.session`, and sends `SessionInfoUpdate` with title/updatedAt. - `session/list` - Multi-session only: lists sessions via `Session.list`, no pagination. - Single-session: not implemented. @@ -54,6 +61,9 @@ - `TodoDisplayBlock` is converted into `AgentPlanUpdate`. - Available commands: - `AvailableCommandsUpdate` is sent right after session creation. +- Session metadata: + - `SessionInfoUpdate` is sent after prompt completion when Kimi auto-generates a title, and + during `session/load`/`session/resume` so clients can hydrate title/updatedAt. ## Prompt/content conversion - Incoming prompt blocks: diff --git a/src/kimi_cli/acp/server.py b/src/kimi_cli/acp/server.py index 84b5f4bfa..9711670b0 100644 --- a/src/kimi_cli/acp/server.py +++ b/src/kimi_cli/acp/server.py @@ -91,6 +91,7 @@ async def initialize( ), mcp_capabilities=acp.schema.McpCapabilities(http=True, sse=False), session_capabilities=acp.schema.SessionCapabilities( + field_meta={"kimi": {"sessionHistoryReplay": True}}, list=acp.schema.SessionListCapabilities(), resume=acp.schema.SessionResumeCapabilities(), ), @@ -231,18 +232,44 @@ async def _setup_session( async def load_session( self, cwd: str, session_id: str, mcp_servers: list[MCPServer] | None = None, **kwargs: Any - ) -> None: + ) -> acp.schema.LoadSessionResponse: logger.info("Loading session: {id} for working directory: {cwd}", id=session_id, cwd=cwd) if session_id in self.sessions: logger.warning("Session already loaded: {id}", id=session_id) - return + acp_session, model_id_conv = self.sessions[session_id] + else: + # Check authentication before loading session + self._check_auth() - # Check authentication before loading session - self._check_auth() + acp_session, model_id_conv = await self._setup_session(cwd, session_id, mcp_servers) - await self._setup_session(cwd, session_id, mcp_servers) - # TODO: replay session history? + await acp_session.send_session_info_update() + replayed_updates = await acp_session.replay_history() + logger.info( + "Replayed {count} ACP history updates for session: {id}", + count=replayed_updates, + id=session_id, + ) + + config = acp_session.cli.soul.runtime.config + return acp.schema.LoadSessionResponse( + field_meta=_session_response_meta(acp_session), + modes=acp.schema.SessionModeState( + available_modes=[ + acp.schema.SessionMode( + id="default", + name="Default", + description="The default mode.", + ), + ], + current_mode_id="default", + ), + models=acp.schema.SessionModelState( + available_models=_expand_llm_models(config.models), + current_model_id=model_id_conv.to_acp_model_id(), + ), + ) async def resume_session( self, cwd: str, session_id: str, mcp_servers: list[MCPServer] | None = None, **kwargs: Any @@ -253,8 +280,10 @@ async def resume_session( await self._setup_session(cwd, session_id, mcp_servers) acp_session, model_id_conv = self.sessions[session_id] + await acp_session.send_session_info_update() config = acp_session.cli.soul.runtime.config return acp.schema.ResumeSessionResponse( + field_meta=_session_response_meta(acp_session), modes=acp.schema.SessionModeState( available_modes=[ acp.schema.SessionMode( @@ -445,3 +474,19 @@ def _expand_llm_models(models: dict[str, LLMModel]) -> list[acp.schema.ModelInfo ) ) return expanded_models + + +def _session_response_meta(acp_session: ACPSession) -> dict[str, Any]: + session = acp_session.cli.soul.runtime.session + title = session.state.custom_title or session.title + updated_at = ( + datetime.fromtimestamp(session.context_file.stat().st_mtime).astimezone().isoformat() + if session.context_file.exists() + else None + ) + meta: dict[str, Any] = {"sessionId": session.id} + if title != "Untitled": + meta["title"] = title + if updated_at is not None: + meta["updatedAt"] = updated_at + return {"kimi": {"session": meta}} diff --git a/src/kimi_cli/acp/session.py b/src/kimi_cli/acp/session.py index 7c984db32..7c0295f47 100644 --- a/src/kimi_cli/acp/session.py +++ b/src/kimi_cli/acp/session.py @@ -2,7 +2,8 @@ import asyncio import uuid -from contextvars import ContextVar +from contextvars import ContextVar, Token +from datetime import datetime import acp import streamingjson # type: ignore[reportMissingTypeStubs] @@ -22,9 +23,11 @@ from kimi_cli.wire.types import ( ApprovalRequest, ApprovalResponse, + AudioURLPart, CompactionBegin, CompactionEnd, ContentPart, + ImageURLPart, MCPLoadingBegin, MCPLoadingEnd, Notification, @@ -75,6 +78,44 @@ def should_hide_terminal_output(tool_call_id: str) -> bool: return calls is not None and tool_call_id in calls +def _content_part_to_acp_block(part: ContentPart) -> ACPContentBlock: + if isinstance(part, TextPart): + return acp.schema.TextContentBlock(type="text", text=part.text) + if isinstance(part, ImageURLPart): + mime_type, data = _split_data_url(part.image_url.url) + if data is not None: + return acp.schema.ImageContentBlock(type="image", mime_type=mime_type, data=data) + return acp.schema.ResourceContentBlock( + type="resource_link", + uri=part.image_url.url, + name="image", + mime_type=mime_type, + ) + if isinstance(part, AudioURLPart): + mime_type, data = _split_data_url(part.audio_url.url) + if data is not None: + return acp.schema.AudioContentBlock(type="audio", mime_type=mime_type, data=data) + return acp.schema.ResourceContentBlock( + type="resource_link", + uri=part.audio_url.url, + name="audio", + mime_type=mime_type, + ) + + logger.warning("Unsupported replay user content part: {part}", part=part) + return acp.schema.TextContentBlock(type="text", text=f"[{part.__class__.__name__}]") + + +def _split_data_url(url: str) -> tuple[str, str | None]: + if not url.startswith("data:"): + return "application/octet-stream", None + header, sep, data = url.partition(",") + if not sep or ";base64" not in header: + return "application/octet-stream", None + mime_type = header.removeprefix("data:").removesuffix(";base64") + return mime_type or "application/octet-stream", data + + class _ToolCallState: """Manages the state of a single tool call for streaming updates.""" @@ -261,8 +302,129 @@ async def prompt(self, prompt: list[ACPContentBlock]) -> acp.PromptResponse: reset_current_kaos(kaos_token) _terminal_tool_call_ids.reset(terminal_tool_calls_token) _current_turn_id.reset(token) + await self.send_session_info_update() return acp.PromptResponse(stop_reason="end_turn") + async def replay_history(self) -> int: + """Replay persisted wire history as ACP session updates.""" + old_turn_state = self._turn_state + turn_token: Token[str | None] | None = None + replayed_updates = 0 + self._turn_state = None + try: + async for record in self._cli.soul.runtime.session.wire_file.iter_records(): + msg = record.to_wire_message() + match msg: + case TurnBegin(user_input=user_input): + if turn_token is not None: + _current_turn_id.reset(turn_token) + self._turn_state = _TurnState() + turn_token = _current_turn_id.set(self._turn_state.id) + replayed_updates += await self._send_user_input(user_input) + case SteerInput(user_input=user_input): + self._reset_content_run() + replayed_updates += await self._send_user_input(user_input) + case TurnEnd() | StepInterrupted(): + if turn_token is not None: + _current_turn_id.reset(turn_token) + turn_token = None + self._turn_state = None + case StepBegin(): + if turn_token is None: + turn_token = self._begin_replay_turn() + case ThinkPart(think=think): + await self._send_thinking(think) + replayed_updates += 1 + case TextPart(text=text): + await self._send_text(text) + replayed_updates += 1 + case ContentPart(): + logger.warning("Unsupported replay content part: {part}", part=msg) + await self._send_text(f"[{msg.__class__.__name__}]") + replayed_updates += 1 + case ToolCall(): + if turn_token is None: + turn_token = self._begin_replay_turn() + await self._send_tool_call(msg) + replayed_updates += 1 + case ToolCallPart(): + if self._turn_state is not None: + await self._send_tool_call_part(msg) + replayed_updates += 1 + case ToolResult(): + if self._turn_state is not None: + await self._send_tool_result(msg) + replayed_updates += 1 + case Notification(): + await self._send_notification(msg) + replayed_updates += 1 + case _: + pass + finally: + if turn_token is not None: + _current_turn_id.reset(turn_token) + self._turn_state = old_turn_state + if replayed_updates == 0: + replayed_updates = await self._replay_context_history() + return replayed_updates + + def _begin_replay_turn(self) -> Token[str | None]: + self._turn_state = _TurnState() + return _current_turn_id.set(self._turn_state.id) + + async def _replay_context_history(self) -> int: + old_turn_state = self._turn_state + turn_token: Token[str | None] | None = None + replayed_updates = 0 + self._turn_state = None + try: + for message in self._cli.soul.context.history: + if turn_token is not None: + _current_turn_id.reset(turn_token) + turn_token = self._begin_replay_turn() + + if message.role == "user": + replayed_updates += await self._send_user_input(list(message.content)) + elif message.role == "assistant": + for part in message.content: + if isinstance(part, ThinkPart): + await self._send_thinking(part.think) + elif isinstance(part, TextPart): + await self._send_text(part.text) + else: + logger.warning("Unsupported context replay part: {part}", part=part) + await self._send_text(f"[{part.__class__.__name__}]") + replayed_updates += 1 + finally: + if turn_token is not None: + _current_turn_id.reset(turn_token) + self._turn_state = old_turn_state + return replayed_updates + + async def _send_user_input(self, user_input: str | list[ContentPart]) -> int: + blocks: list[ACPContentBlock] + if isinstance(user_input, str): + blocks = [acp.schema.TextContentBlock(type="text", text=user_input)] + else: + blocks = [_content_part_to_acp_block(part) for part in user_input] + + for block in blocks: + await self._send_user_block(block) + return len(blocks) + + async def _send_user_block(self, block: ACPContentBlock) -> None: + if not self._id or not self._conn: + return + + await self._conn.session_update( + session_id=self._id, + update=acp.schema.UserMessageChunk( + content=block, + message_id=self._content_run_id("user"), + session_update="user_message_chunk", + ), + ) + async def cancel(self) -> None: if self._turn_state is None: logger.warning("Cancel requested but no prompt is running") @@ -270,6 +432,33 @@ async def cancel(self) -> None: self._turn_state.cancel_event.set() + async def send_session_info_update(self) -> None: + """Send current session metadata, including title, if available.""" + if not self._id or not self._conn: + return + + try: + session = self._cli.soul.runtime.session + except AttributeError: + return + title = session.state.custom_title or session.title + updated_at = ( + datetime.fromtimestamp(session.context_file.stat().st_mtime).astimezone().isoformat() + if session.context_file.exists() + else None + ) + if title == "Untitled" and updated_at is None: + return + + await self._conn.session_update( + session_id=self._id, + update=acp.schema.SessionInfoUpdate( + session_update="session_info_update", + title=title if title != "Untitled" else None, + updated_at=updated_at, + ), + ) + def _reset_content_run(self) -> None: if self._turn_state is not None: self._turn_state.reset_content_run() diff --git a/src/kimi_cli/app.py b/src/kimi_cli/app.py index fb3b6ab31..66bdf1dd1 100644 --- a/src/kimi_cli/app.py +++ b/src/kimi_cli/app.py @@ -652,6 +652,7 @@ async def _mirror_external_cancel() -> None: user_input, _ui_loop_fn, run_cancel_event, + wire_file=self._runtime.session.wire_file, runtime=self._runtime, ) ) diff --git a/tests/acp/test_protocol_v1.py b/tests/acp/test_protocol_v1.py index 70e413caa..53fbfcf1d 100644 --- a/tests/acp/test_protocol_v1.py +++ b/tests/acp/test_protocol_v1.py @@ -2,12 +2,14 @@ from __future__ import annotations +import os + import acp import pytest from kimi_cli.acp.version import CURRENT_VERSION -from .conftest import ACPTestClient +from .conftest import ACPTestClient, _kimi_bin, _repo_root pytestmark = pytest.mark.asyncio @@ -74,6 +76,10 @@ async def test_prompt_with_scripted_echo( assert resp.stop_reason in ("end_turn", "max_tokens", "max_turn_requests") # The scripted echo provider should have sent session updates assert len(test_client.updates) > 0 + assert any( + update.session_update == "session_info_update" and update.title == "Say hello" + for update in test_client.updates + ) async def test_list_sessions( @@ -128,6 +134,8 @@ async def test_resume_session( assert resume_resp.models is not None assert isinstance(resume_resp.models.current_model_id, str) assert len(resume_resp.models.available_models) > 0 + assert resume_resp.field_meta is not None + assert resume_resp.field_meta["kimi"]["session"]["title"] == "Hello" async def test_resume_session_not_found( @@ -148,6 +156,133 @@ async def test_resume_session_not_found( ) +async def test_load_session_replays_history(acp_share_dir, tmp_path): + """session/load rebinds the session and replays persisted transcript updates.""" + env = { + **os.environ, + "KIMI_SHARE_DIR": str(acp_share_dir), + } + work_dir = tmp_path / "workdir" + work_dir.mkdir(exist_ok=True) + + first_client = ACPTestClient() + async with acp.spawn_agent_process( + first_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + session_resp = await conn.new_session(cwd=str(work_dir)) + await conn.prompt( + prompt=[acp.text_block("Replay this please")], + session_id=session_resp.session_id, + ) + session_id = session_resp.session_id + + second_client = ACPTestClient() + async with acp.spawn_agent_process( + second_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + load_resp = await conn.load_session(cwd=str(work_dir), session_id=session_id) + + assert load_resp.modes is not None + assert load_resp.models is not None + assert load_resp.field_meta is not None + assert load_resp.field_meta["kimi"]["session"]["title"] == "Replay this please" + assert any( + update.session_update == "session_info_update" and update.title == "Replay this please" + for update in second_client.updates + ) + + replayed = [ + (update.session_update, getattr(update.content, "text", None), update.message_id) + for update in second_client.updates + if hasattr(update, "content") + ] + assert any( + update_type == "user_message_chunk" and text == "Replay this please" and message_id + for update_type, text, message_id in replayed + ) + assert any( + update_type == "agent_message_chunk" and text == "Hello from scripted echo!" and message_id + for update_type, text, message_id in replayed + ) + + +async def test_load_session_replays_context_when_wire_history_missing(acp_share_dir, tmp_path): + """Older ACP sessions can still replay text history if no wire log was recorded.""" + env = { + **os.environ, + "KIMI_SHARE_DIR": str(acp_share_dir), + } + work_dir = tmp_path / "workdir" + work_dir.mkdir(exist_ok=True) + + first_client = ACPTestClient() + async with acp.spawn_agent_process( + first_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + session_resp = await conn.new_session(cwd=str(work_dir)) + await conn.prompt( + prompt=[acp.text_block("Replay from context")], + session_id=session_resp.session_id, + ) + session_id = session_resp.session_id + + wire_files = list(acp_share_dir.rglob("wire.jsonl")) + assert wire_files + for wire_file in wire_files: + wire_file.unlink() + + second_client = ACPTestClient() + async with acp.spawn_agent_process( + second_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + load_resp = await conn.load_session(cwd=str(work_dir), session_id=session_id) + + assert load_resp.field_meta is not None + assert load_resp.field_meta["kimi"]["session"]["title"] == "Replay from context" + assert any( + update.session_update == "session_info_update" and update.title == "Replay from context" + for update in second_client.updates + ) + + replayed = [ + (update.session_update, getattr(update.content, "text", None), update.message_id) + for update in second_client.updates + if hasattr(update, "content") + ] + assert any( + update_type == "user_message_chunk" and text == "Replay from context" and message_id + for update_type, text, message_id in replayed + ) + assert any( + update_type == "agent_message_chunk" and text == "Hello from scripted echo!" and message_id + for update_type, text, message_id in replayed + ) + + async def test_cancel_session( acp_client: tuple[acp.ClientSideConnection, ACPTestClient], tmp_path, diff --git a/tests/acp/test_server_initialize.py b/tests/acp/test_server_initialize.py index 1810c7ce3..31db83dcc 100644 --- a/tests/acp/test_server_initialize.py +++ b/tests/acp/test_server_initialize.py @@ -23,3 +23,17 @@ async def test_initialize_advertises_terminal_auth_method(): assert auth_method.type == "terminal" assert auth_method.args == ["login"] assert auth_method.env == {} + + +async def test_initialize_advertises_session_history_replay(): + """loadSession only means bind/resume; Kimi separately advertises transcript replay.""" + server = ACPServer() + + resp = await server.initialize(protocol_version=1) + + assert resp.agent_capabilities is not None + assert resp.agent_capabilities.load_session is True + assert resp.agent_capabilities.session_capabilities is not None + assert resp.agent_capabilities.session_capabilities.field_meta == { + "kimi": {"sessionHistoryReplay": True} + }