From eca01a925341c8dc5148c1178ebd1adb17dcc566 Mon Sep 17 00:00:00 2001 From: "zhoujiahui.01" Date: Sat, 20 Jun 2026 22:19:53 +0800 Subject: [PATCH] feat(server): add session auto commit scheduling feat(server): add session auto commit scheduling fix: harden session auto commit e2e coverage refactor: shorten session auto commit index path docs: sync session auto commit design and api docs feat: refine session auto commit idle indexing feat: expose session auto commit in sdk and cli fix: pin setuptools-scm for editable builds fix: harden session auto commit index sync --- crates/ov_cli/README.md | 2 + crates/ov_cli/src/commands/session.rs | 109 +++- crates/ov_cli/src/handlers.rs | 16 + crates/ov_cli/src/main.rs | 24 + docs/en/api/05-sessions.md | 22 +- docs/en/guides/01-configuration.md | 29 + docs/zh/api/05-sessions.md | 22 +- docs/zh/guides/01-configuration.md | 29 + openviking/server/config.py | 21 +- openviking/server/routers/sessions.py | 88 ++- openviking/service/core.py | 30 + openviking/service/session_auto_commit.py | 518 ++++++++++++++++++ openviking/service/session_service.py | 200 ++++++- openviking/session/session.py | 22 + openviking_cli/client/base.py | 4 + pyproject.toml | 6 +- sdk/python/README.md | 17 + sdk/python/README_CN.md | 17 + sdk/python/openviking_sdk/client.py | 54 +- .../tests/test_async_client_behaviors.py | 126 ++++- .../misc/test_root_docker_image_packaging.py | 9 + tests/server/test_api_sessions.py | 263 +++++++++ .../service/test_core_encryption_startup.py | 146 +++++ .../unit/service/test_session_auto_commit.py | 100 ++++ 24 files changed, 1851 insertions(+), 23 deletions(-) create mode 100644 openviking/service/session_auto_commit.py create mode 100644 tests/unit/service/test_session_auto_commit.py diff --git a/crates/ov_cli/README.md b/crates/ov_cli/README.md index 5d8b4a0bdd..6b78417c1e 100644 --- a/crates/ov_cli/README.md +++ b/crates/ov_cli/README.md @@ -142,6 +142,8 @@ ov glob "**/*.md" --uri viking://resources # Session workflow SESSION=$(ov -o json session new | jq -r '.result.session_id') ov session add-message --session-id $SESSION --role user --content "Hello" +ov session add-message --session-id $SESSION --role user --content "remember this" \ + --auto-commit-enabled true --token-threshold 512 --idle-timeout-seconds 60 --keep-recent-count 2 ov session commit --session-id $SESSION ``` diff --git a/crates/ov_cli/src/commands/session.rs b/crates/ov_cli/src/commands/session.rs index 6746d67653..36da7bda08 100644 --- a/crates/ov_cli/src/commands/session.rs +++ b/crates/ov_cli/src/commands/session.rs @@ -301,12 +301,49 @@ fn message_body(client: &HttpClient, role: &str, content: &str) -> serde_json::V body } +fn apply_auto_commit_policy( + body: &mut serde_json::Value, + enabled: Option, + token_threshold: Option, + idle_timeout_seconds: Option, + keep_recent_count: Option, +) { + if enabled.is_none() + && token_threshold.is_none() + && idle_timeout_seconds.is_none() + && keep_recent_count.is_none() + { + return; + } + let mut policy = serde_json::Map::new(); + if let Some(enabled) = enabled { + policy.insert("enabled".to_string(), json!(enabled)); + } + if let Some(token_threshold) = token_threshold { + policy.insert("token_threshold".to_string(), json!(token_threshold)); + } + if let Some(idle_timeout_seconds) = idle_timeout_seconds { + policy.insert( + "idle_timeout_seconds".to_string(), + json!(idle_timeout_seconds), + ); + } + if let Some(keep_recent_count) = keep_recent_count { + policy.insert("keep_recent_count".to_string(), json!(keep_recent_count)); + } + body["auto_commit_policy"] = serde_json::Value::Object(policy); +} + pub async fn add_message( client: &HttpClient, session_id: &str, role: &str, content: &str, peer_id: Option<&str>, + auto_commit_enabled: Option, + token_threshold: Option, + idle_timeout_seconds: Option, + keep_recent_count: Option, output_format: OutputFormat, compact: bool, ) -> Result<()> { @@ -327,6 +364,13 @@ pub async fn add_message( body["agent_id"] = json!(agent_id); } } + apply_auto_commit_policy( + &mut body, + auto_commit_enabled, + token_threshold, + idle_timeout_seconds, + keep_recent_count, + ); let response: serde_json::Value = client.post(&path, &body).await?; output_success(&response, output_format, compact); @@ -337,6 +381,10 @@ pub async fn add_messages( client: &HttpClient, session_id: &str, input: &str, + auto_commit_enabled: Option, + token_threshold: Option, + idle_timeout_seconds: Option, + keep_recent_count: Option, output_format: OutputFormat, compact: bool, ) -> Result<()> { @@ -346,7 +394,14 @@ pub async fn add_messages( .iter() .map(|(role, content)| message_body(client, role, content)) .collect(); - let body = json!({"messages": messages_json}); + let mut body = json!({"messages": messages_json}); + apply_auto_commit_policy( + &mut body, + auto_commit_enabled, + token_threshold, + idle_timeout_seconds, + keep_recent_count, + ); let response: serde_json::Value = client.post(&path, &body).await?; output_success(&response, output_format, compact); Ok(()) @@ -429,7 +484,8 @@ fn url_encode(s: &str) -> String { #[cfg(test)] mod tests { - use super::{parse_messages, render_session_get_for_table}; + use super::{apply_auto_commit_policy, message_body, parse_messages, render_session_get_for_table}; + use crate::client::HttpClient; use crate::error::Error; use serde_json::json; @@ -529,6 +585,55 @@ mod tests { assert!(render_session_get_for_table(&result).is_none()); } + #[test] + fn apply_auto_commit_policy_skips_empty_policy() { + let mut body = json!({"role": "user", "content": "hello"}); + + apply_auto_commit_policy(&mut body, None, None, None, None); + + assert!(body.get("auto_commit_policy").is_none()); + } + + #[test] + fn apply_auto_commit_policy_adds_top_level_policy() { + let mut body = json!({"messages": [{"role": "user", "content": "hello"}]}); + + apply_auto_commit_policy(&mut body, Some(true), Some(128), Some(30), Some(2)); + + assert_eq!( + body, + json!({ + "messages": [{"role": "user", "content": "hello"}], + "auto_commit_policy": { + "enabled": true, + "token_threshold": 128, + "idle_timeout_seconds": 30, + "keep_recent_count": 2 + } + }) + ); + } + + #[test] + fn message_body_preserves_message_shape_without_auto_commit_policy() { + let client = HttpClient::new( + "http://localhost:1933".to_string(), + None, + None, + None, + None, + None, + 5.0, + false, + None, + ); + + let body = message_body(&client, "user", "hello"); + + assert_eq!(body, json!({"role": "user", "content": "hello"})); + assert!(body.get("auto_commit_policy").is_none()); + } + fn strip_ansi(input: &str) -> String { let mut output = String::with_capacity(input.len()); let mut chars = input.chars().peekable(); diff --git a/crates/ov_cli/src/handlers.rs b/crates/ov_cli/src/handlers.rs index e71f0ee2fe..2f611d3ef0 100644 --- a/crates/ov_cli/src/handlers.rs +++ b/crates/ov_cli/src/handlers.rs @@ -464,6 +464,10 @@ pub async fn handle_session(cmd: SessionCommands, ctx: CliContext) -> Result<()> role, content, peer_id, + auto_commit_enabled, + token_threshold, + idle_timeout_seconds, + keep_recent_count, } => { commands::session::add_message( &client, @@ -471,6 +475,10 @@ pub async fn handle_session(cmd: SessionCommands, ctx: CliContext) -> Result<()> &role, &content, peer_id.as_deref(), + auto_commit_enabled, + token_threshold, + idle_timeout_seconds, + keep_recent_count, ctx.output_format, ctx.compact, ) @@ -479,11 +487,19 @@ pub async fn handle_session(cmd: SessionCommands, ctx: CliContext) -> Result<()> SessionCommands::AddMessages { session_id, messages, + auto_commit_enabled, + token_threshold, + idle_timeout_seconds, + keep_recent_count, } => { commands::session::add_messages( &client, &session_id, &messages, + auto_commit_enabled, + token_threshold, + idle_timeout_seconds, + keep_recent_count, ctx.output_format, ctx.compact, ) diff --git a/crates/ov_cli/src/main.rs b/crates/ov_cli/src/main.rs index 0a453c384f..d28ab69c74 100644 --- a/crates/ov_cli/src/main.rs +++ b/crates/ov_cli/src/main.rs @@ -1132,6 +1132,18 @@ enum SessionCommands { /// Stable interaction peer id. Omit for self memory. #[arg(long = "peer-id", value_name = "peer-id")] peer_id: Option, + /// Enable or disable session auto commit. + #[arg(long = "auto-commit-enabled", value_name = "bool")] + auto_commit_enabled: Option, + /// Trigger auto commit when pending tokens reach this threshold. + #[arg(long = "token-threshold", value_name = "tokens")] + token_threshold: Option, + /// Trigger auto commit after this idle timeout in seconds. + #[arg(long = "idle-timeout-seconds", value_name = "seconds")] + idle_timeout_seconds: Option, + /// Keep this many recent live messages after commit. + #[arg(long = "keep-recent-count", value_name = "count")] + keep_recent_count: Option, }, /// Add multiple messages to a session AddMessages { @@ -1141,6 +1153,18 @@ enum SessionCommands { /// Messages as JSON array of {role, content} objects #[arg(value_name = "messages-json")] messages: String, + /// Enable or disable session auto commit. + #[arg(long = "auto-commit-enabled", value_name = "bool")] + auto_commit_enabled: Option, + /// Trigger auto commit when pending tokens reach this threshold. + #[arg(long = "token-threshold", value_name = "tokens")] + token_threshold: Option, + /// Trigger auto commit after this idle timeout in seconds. + #[arg(long = "idle-timeout-seconds", value_name = "seconds")] + idle_timeout_seconds: Option, + /// Keep this many recent live messages after commit. + #[arg(long = "keep-recent-count", value_name = "count")] + keep_recent_count: Option, }, /// Commit a session (archive messages and extract memories) Commit { diff --git a/docs/en/api/05-sessions.md b/docs/en/api/05-sessions.md index 7837639dad..d6763e77f6 100644 --- a/docs/en/api/05-sessions.md +++ b/docs/en/api/05-sessions.md @@ -654,6 +654,7 @@ Add a message to the session. Supports two modes: simple text mode and Parts mod | content | str | Conditional | - | Message text content (HTTP API simple mode, mutually exclusive with parts) | | created_at | str | No | None | Optional ISO 8601 timestamp to persist on the message | | peer_id | str | No | None | Optional stable interaction peer identity | +| auto_commit_policy | object | No | None | Optional session-level auto-commit policy. When provided, it is persisted into session metadata and reused by later server-side automatic triggers | > **Note**: HTTP API supports two modes: > 1. **Simple mode**: Use `content` string (backward compatible) @@ -661,6 +662,21 @@ Add a message to the session. Supports two modes: simple text mode and Parts mod > > If both `content` and `parts` are provided, `parts` takes precedence. +`auto_commit_policy` fields: + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `enabled` | bool | No | Enables or disables automatic commit for the session. Setting it to `false` turns off auto-triggering for that session | +| `token_threshold` | int | No | Token threshold. After a message write, OpenViking tries an immediate auto commit when `pending_tokens` reaches this value | +| `idle_timeout_seconds` | int | No | Idle timeout in seconds. When configured, the session becomes eligible for the server-side idle scheduler | +| `keep_recent_count` | int | No | Number of recent live messages to keep after commit. Updating it also updates session metadata and rebuilds `pending_tokens` | + +Additional notes: + +- `auto_commit_policy` is a session-level policy, not a per-message policy. +- Once written, later server-side automatic triggering uses the persisted value from session metadata. +- `idle_timeout_seconds` only takes effect when the server-wide `server.session_auto_commit.idle_enabled` switch is enabled. + **Part Types (Python SDK)** ```python @@ -849,9 +865,13 @@ Add multiple messages to a session in a single request. Suitable for scenarios t |------|------|------|--------|------| | session_id | str | Yes | - | Session ID | | messages | List[AddMessageRequest] | Yes | - | List of messages, each following the same format as `add_message()`, max 100 | +| auto_commit_policy | object | No | None | Optional session-level auto-commit policy. It may only be provided once at the batch top level | | telemetry | bool | No | False | Whether to attach operation telemetry data | -> **Note**: Each message follows the exact same format as `add_message()`, supporting both `content` (simple mode) and `parts` (Parts mode). If you need to add more than 100 messages, call in batches. +> **Note**: +> 1. Each message supports both `content` (simple mode) and `parts` (Parts mode). If you need to add more than 100 messages, call in batches. +> 2. `auto_commit_policy` is only allowed at the batch top level. +> 3. `messages[*].auto_commit_policy` is rejected by the server. #### 3. Usage Examples diff --git a/docs/en/guides/01-configuration.md b/docs/en/guides/01-configuration.md index 59a83e69de..ec578f8c64 100644 --- a/docs/en/guides/01-configuration.md +++ b/docs/en/guides/01-configuration.md @@ -1069,6 +1069,35 @@ Legacy compatibility example: } ``` +##### Session Auto Commit Configuration + +`server.session_auto_commit` controls server-wide automatic session commit behavior. + +```json +{ + "server": { + "session_auto_commit": { + "idle_enabled": true, + "check_interval_seconds": 60.0 + } + } +} +``` + +| Parameter | Type | Description | Default | +|-----------|------|-------------|---------| +| `idle_enabled` | bool | Enables the server-side idle-timeout auto-commit scheduler. When disabled, the idle scheduler is not started and the idle index is not maintained. Token-threshold immediate triggering still works | `true` | +| `check_interval_seconds` | float | Poll interval for the idle scheduler in seconds. Must be greater than `0` | `60.0` | + +Notes: + +- `server.session_auto_commit` is a server-wide control surface, not a per-session business policy. +- Per-session auto-commit behavior is configured through `auto_commit_policy` on message write APIs and persisted into session metadata. +- When `idle_enabled=false`: + - `SessionAutoCommitScheduler` is not started + - `/local/_system/session_auto_commit/index.json` is not maintained +- Token-threshold auto commit does not depend on the scheduler and is unaffected by this switch. + ##### S3 Backend Configuration diff --git a/docs/zh/api/05-sessions.md b/docs/zh/api/05-sessions.md index ea54e3af4d..f4e53a3bbd 100644 --- a/docs/zh/api/05-sessions.md +++ b/docs/zh/api/05-sessions.md @@ -653,6 +653,7 @@ ov session delete a1b2c3d4 | content | str | 条件必填 | - | 消息文本内容(HTTP API 简单模式,与 parts 二选一) | | created_at | str | 否 | None | 可选的 ISO 8601 时间戳,会原样保存到消息中 | | peer_id | str | 否 | None | 可选的稳定交互对象 ID | +| auto_commit_policy | object | 否 | None | 可选的 session 级自动 commit 策略。传入后会持久化到 session meta,并用于后续服务端自动触发 | > **注意**:HTTP API 支持两种模式: > 1. **简单模式**:使用 `content` 字符串(向后兼容) @@ -660,6 +661,21 @@ ov session delete a1b2c3d4 > > 如果同时提供 `content` 和 `parts`,`parts` 优先。 +`auto_commit_policy` 字段: + +| 字段 | 类型 | 必填 | 说明 | +|------|------|------|------| +| `enabled` | bool | 否 | 是否启用该 session 的自动 commit。设为 `false` 时会关闭该 session 的自动触发 | +| `token_threshold` | int | 否 | token 阈值。消息写入后若 `pending_tokens` 达到该值,会尝试立即触发自动 commit | +| `idle_timeout_seconds` | int | 否 | idle 超时时间,单位秒。配置后该 session 会进入服务端 idle 调度范围 | +| `keep_recent_count` | int | 否 | commit 后保留最近多少条 live message,不参与归档删除。修改后会同步更新 session meta,并触发 `pending_tokens` 重建 | + +补充说明: + +- `auto_commit_policy` 是 session 级配置,不是单条 message 级配置。 +- 一旦写入,后续服务端自动触发以 session meta 中的持久化值为准。 +- `idle_timeout_seconds` 是否生效,还取决于服务端全局配置 `server.session_auto_commit.idle_enabled` 是否开启。 + **Part 类型(Python SDK)** ```python @@ -823,9 +839,13 @@ ov session add-message a1b2c3d4 --role user --content "How do I authenticate use |------|------|------|--------|------| | session_id | str | 是 | - | 会话 ID | | messages | List[AddMessageRequest] | 是 | - | 消息列表,每条消息格式与 `add_message()` 相同,最多 100 条 | +| auto_commit_policy | object | 否 | None | 可选的 session 级自动 commit 策略。只允许在 batch 顶层传一次 | | telemetry | bool | 否 | False | 是否附加操作遥测数据 | -> **注意**:每条消息的格式与 `add_message()` 完全一致,支持 `content`(简单模式)和 `parts`(Parts 模式)。超过 100 条需分批调用。 +> **注意**: +> 1. 每条消息支持 `content`(简单模式)和 `parts`(Parts 模式),超过 100 条需分批调用。 +> 2. `auto_commit_policy` 只允许出现在 batch 顶层。 +> 3. `messages[*].auto_commit_policy` 不允许,服务端会直接拒绝请求。 #### 3. 使用示例 diff --git a/docs/zh/guides/01-configuration.md b/docs/zh/guides/01-configuration.md index 93b32e9ed1..0e6531d608 100644 --- a/docs/zh/guides/01-configuration.md +++ b/docs/zh/guides/01-configuration.md @@ -1041,6 +1041,35 @@ RAGFS 默认使用 Rust binding 模式,通过 Rust 实现直接访问文件系 } ``` +##### Session Auto Commit 配置 + +`server.session_auto_commit` 用于控制服务端 session 自动 commit 的全局行为。 + +```json +{ + "server": { + "session_auto_commit": { + "idle_enabled": true, + "check_interval_seconds": 60.0 + } + } +} +``` + +| 参数 | 类型 | 说明 | 默认值 | +|------|------|------|--------| +| `idle_enabled` | bool | 是否启用服务端 idle timeout 自动 commit 调度器。关闭后,不会启动 idle scheduler,也不会维护 idle 索引;但 token threshold 的即时触发仍然生效 | `true` | +| `check_interval_seconds` | float | idle scheduler 的检查周期,单位秒,必须大于 `0` | `60.0` | + +说明: + +- `server.session_auto_commit` 是服务端全局配置,不是单个 session 的业务 policy。 +- session 级别的自动触发参数通过消息写入接口中的 `auto_commit_policy` 设置,并持久化到 session meta。 +- `idle_enabled=false` 时: + - 不会启动 `SessionAutoCommitScheduler` + - 不会维护 `/local/_system/session_auto_commit/index.json` +- token threshold 自动触发不依赖 scheduler,所以不受这个开关影响。 + ##### S3 后端配置 diff --git a/openviking/server/config.py b/openviking/server/config.py index 27fc87423c..5cf790f6d2 100644 --- a/openviking/server/config.py +++ b/openviking/server/config.py @@ -7,11 +7,10 @@ from pydantic import BaseModel, Field, ValidationError -from openviking.server.identity import AuthMode -from openviking_cli.utils import get_logger - # Import auth plugin registry for config validation from openviking.server.auth.registry import get_registry +from openviking.server.identity import AuthMode +from openviking_cli.utils import get_logger from openviking_cli.utils.config.config_loader import ( load_json_config, resolve_config_path, @@ -159,6 +158,15 @@ class ToolOutputExternalizationConfig(BaseModel): model_config = {"extra": "forbid"} +class SessionAutoCommitConfig(BaseModel): + """Server-wide controls for automatic session commits.""" + + idle_enabled: bool = True + check_interval_seconds: float = Field(default=60.0, gt=0) + + model_config = {"extra": "forbid"} + + class ServerConfig(BaseModel): host: str = "127.0.0.1" port: int = 1933 @@ -180,6 +188,7 @@ class ServerConfig(BaseModel): public_base_url: Optional[str] = None upload_signed_ttl_seconds: int = 600 temp_upload: TempUploadConfig = Field(default_factory=TempUploadConfig) + session_auto_commit: SessionAutoCommitConfig = Field(default_factory=SessionAutoCommitConfig) tool_output_externalization: ToolOutputExternalizationConfig = Field( default_factory=ToolOutputExternalizationConfig ) @@ -351,7 +360,8 @@ def validate_server_config(config: ServerConfig) -> None: # Ensure built-in plugins are registered before validation. # If a non-built-in plugin has already claimed a built-in mode name, # log a security warning and forcefully override it. - from openviking.server.auth.plugins import DevAuthPlugin, ApiKeyAuthPlugin, TrustedAuthPlugin + from openviking.server.auth.plugins import ApiKeyAuthPlugin, DevAuthPlugin, TrustedAuthPlugin + registry = get_registry() _BUILTIN_PLUGINS = { "dev": DevAuthPlugin, @@ -375,8 +385,7 @@ def validate_server_config(config: ServerConfig) -> None: plugin_cls = registry.get(effective_auth_mode) if plugin_cls is None: logger.error( - "Unknown auth_mode: %r. No auth plugin registered for this mode. " - "Registered modes: %s.", + "Unknown auth_mode: %r. No auth plugin registered for this mode. Registered modes: %s.", effective_auth_mode, ", ".join(registry.list_modes()), ) diff --git a/openviking/server/routers/sessions.py b/openviking/server/routers/sessions.py index bb86b4e568..cfb3944d46 100644 --- a/openviking/server/routers/sessions.py +++ b/openviking/server/routers/sessions.py @@ -73,6 +73,17 @@ class ToolPartRequest(BaseModel): PartRequest = TextPartRequest | ContextPartRequest | ToolPartRequest +class AutoCommitPolicyRequest(BaseModel): + """Session-level server-side auto-commit policy.""" + + enabled: bool + token_threshold: Optional[int] = Field(default=None, ge=0) + idle_timeout_seconds: Optional[int] = Field(default=None, gt=0) + keep_recent_count: int = Field(default=0, ge=0) + + model_config = {"extra": "forbid"} + + class AddMessageRequest(BaseModel): """Request model for adding a message. @@ -89,6 +100,7 @@ class AddMessageRequest(BaseModel): agent_uri: Optional[str] = None content: Optional[str] = None parts: Optional[List[Dict[str, Any]]] = None + auto_commit_policy: Optional[AutoCommitPolicyRequest] = None created_at: Optional[str] = None telemetry: TelemetryRequest = False @@ -104,12 +116,54 @@ def validate_content_or_parts(self) -> "AddMessageRequest": return self +class BatchMessageRequest(BaseModel): + """Batch message item model without session-level policy fields.""" + + model_config = {"extra": "forbid"} + + role: str + peer_id: Optional[str] = None + agent_id: Optional[str] = None + agent_uri: Optional[str] = None + content: Optional[str] = None + parts: Optional[List[Dict[str, Any]]] = None + created_at: Optional[str] = None + + @model_validator(mode="after") + def validate_content_or_parts(self) -> "BatchMessageRequest": + if self.content is None and self.parts is None: + raise ValueError("Either 'content' or 'parts' must be provided") + self.peer_id = normalize_peer_selector( + self.peer_id, + agent_id=self.agent_id, + agent_uri=self.agent_uri, + ) + return self + + class BatchAddMessageRequest(BaseModel): """Request model for adding multiple messages in a single request.""" - messages: List[AddMessageRequest] = Field(..., max_length=100) + messages: List[BatchMessageRequest] = Field(..., max_length=100) + auto_commit_policy: Optional[AutoCommitPolicyRequest] = None telemetry: TelemetryRequest = False + @model_validator(mode="before") + @classmethod + def reject_per_message_auto_commit_policy(cls, data: Any) -> Any: + if not isinstance(data, dict): + return data + messages = data.get("messages") + if not isinstance(messages, list): + return data + for message in messages: + if isinstance(message, dict) and "auto_commit_policy" in message: + raise ValueError( + "Per-message auto_commit_policy is not allowed in batch requests; " + "use the top-level auto_commit_policy field instead" + ) + return data + class UsedRequest(BaseModel): """Request model for recording usage.""" @@ -445,6 +499,12 @@ async def add_message( async def _add() -> dict[str, Any]: session = await service.sessions.get(session_id, _ctx, auto_create=True) parts = _resolve_message_parts(request) + policy_payload = ( + request.auto_commit_policy.model_dump() + if request.auto_commit_policy is not None + else None + ) + policy_provided = "auto_commit_policy" in request.model_fields_set session.add_messages( [ @@ -456,6 +516,16 @@ async def _add() -> dict[str, Any]: } ] ) + await service.sessions.persist_auto_commit_policy_and_schedule( + session, + auto_commit_policy=policy_payload, + policy_provided=policy_provided, + ) + await service.sessions.maybe_schedule_auto_commit( + session_id, + _ctx, + reason_hint="token_threshold", + ) return { "session_id": session_id, "message_count": len(session.messages), @@ -485,6 +555,12 @@ async def batch_add_messages( async def _batch_add() -> dict[str, Any]: session = await service.sessions.get(session_id, _ctx, auto_create=True) specs = [] + policy_payload = ( + request.auto_commit_policy.model_dump() + if request.auto_commit_policy is not None + else None + ) + policy_provided = "auto_commit_policy" in request.model_fields_set for msg_request in request.messages: parts = _resolve_message_parts(msg_request) specs.append( @@ -496,6 +572,16 @@ async def _batch_add() -> dict[str, Any]: } ) msgs = session.add_messages(specs) + await service.sessions.persist_auto_commit_policy_and_schedule( + session, + auto_commit_policy=policy_payload, + policy_provided=policy_provided, + ) + await service.sessions.maybe_schedule_auto_commit( + session_id, + _ctx, + reason_hint="token_threshold", + ) return { "session_id": session_id, "message_count": len(session.messages), diff --git a/openviking/service/core.py b/openviking/service/core.py index b170675b70..2bcc84454d 100644 --- a/openviking/service/core.py +++ b/openviking/service/core.py @@ -12,6 +12,7 @@ from openviking.core.directories import DirectoryInitializer from openviking.privacy import UserPrivacyConfigService from openviking.resource.watch_scheduler import WatchScheduler +from openviking.server.config import SessionAutoCommitConfig from openviking.server.identity import RequestContext, Role from openviking.service.debug_service import DebugService from openviking.service.fs_service import FSService @@ -20,6 +21,7 @@ from openviking.service.resource_memory_link_service import ResourceMemoryLinkService from openviking.service.resource_service import ResourceService from openviking.service.search_service import SearchService +from openviking.service.session_auto_commit import SessionAutoCommitScheduler from openviking.service.session_service import SessionService from openviking.service.task_tracker import set_task_tracker from openviking.session import create_session_compressor @@ -86,6 +88,7 @@ def __init__( self._lock_manager: Optional[LockManager] = None self._directory_initializer: Optional[DirectoryInitializer] = None self._watch_scheduler: Optional[WatchScheduler] = None + self._session_auto_commit_scheduler: Optional[SessionAutoCommitScheduler] = None self._encryptor: Optional[Any] = None self._privacy_config_service: Optional[UserPrivacyConfigService] = None self._data_dir_lock_acquired = False @@ -405,6 +408,28 @@ async def initialize(self) -> None: viking_fs=self._viking_fs, session_compressor=self._session_compressor, ) + from openviking.server.dependencies import get_server_config + + server_config = get_server_config() + session_auto_commit_config = ( + server_config.session_auto_commit + if server_config is not None + else SessionAutoCommitConfig() + ) + self._session_service.set_session_auto_commit_config(session_auto_commit_config) + if session_auto_commit_config.idle_enabled: + self._session_auto_commit_scheduler = SessionAutoCommitScheduler( + self._session_service, + session_auto_commit_config, + check_interval=session_auto_commit_config.check_interval_seconds, + ) + await self._session_auto_commit_scheduler.start() + if self._session_auto_commit_scheduler.index is not None: + self._session_service.set_auto_commit_index( + self._session_auto_commit_scheduler.index + ) + else: + self._session_auto_commit_scheduler = None self._debug_service.set_dependencies( vikingdb=self._vikingdb_manager, config=self._config, @@ -423,6 +448,11 @@ async def close(self) -> None: self._watch_scheduler = None logger.info("WatchScheduler stopped") + if self._session_auto_commit_scheduler: + await self._session_auto_commit_scheduler.stop() + self._session_auto_commit_scheduler = None + logger.info("SessionAutoCommitScheduler stopped") + if self._lock_manager: await self._lock_manager.stop() self._lock_manager = None diff --git a/openviking/service/session_auto_commit.py b/openviking/service/session_auto_commit.py new file mode 100644 index 0000000000..3eec26bfaf --- /dev/null +++ b/openviking/service/session_auto_commit.py @@ -0,0 +1,518 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 +"""Runtime helpers for server-side automatic session commits.""" + +from __future__ import annotations + +import asyncio +import json +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional, Tuple + +from openviking.pyagfs import AsyncAGFSClient +from openviking.server.identity import RequestContext, Role +from openviking.session import Session +from openviking.utils.time_utils import get_current_timestamp +from openviking_cli.session.user_id import UserIdentifier +from openviking_cli.utils import get_logger + +logger = get_logger(__name__) + +SESSION_AUTO_COMMIT_INDEX_URI = "/local/_system/session_auto_commit/index.json" + + +@dataclass(frozen=True) +class IndexedSession: + account_id: str + user_id: str + session_id: str + next_check_at: str = "" + + +class SessionAutoCommitIndex: + """Persistent membership index of active idle auto-commit candidates.""" + + def __init__(self, viking_fs: Any): + self._viking_fs = viking_fs + self._agfs = AsyncAGFSClient(viking_fs.agfs) + self._lock = asyncio.Lock() + self._ctx = RequestContext(user=UserIdentifier.the_default_user(), role=Role.ROOT) + self._initialized = False + self._runtime_next_check_at: Dict[Tuple[str, str, str], str] = {} + + async def initialize(self) -> None: + if self._initialized: + return + async with self._lock: + if self._initialized: + return + logger.info("SessionAutoCommitIndex ready at %s", SESSION_AUTO_COMMIT_INDEX_URI) + self._initialized = True + + async def list_sessions(self) -> List[IndexedSession]: + await self.initialize() + async with self._lock: + data = await self._read_index_file(SESSION_AUTO_COMMIT_INDEX_URI) + normalized = _normalize_index_data(data) + results: List[IndexedSession] = [] + for item in _iter_indexed_sessions(normalized): + results.append( + IndexedSession( + account_id=item.account_id, + user_id=item.user_id, + session_id=item.session_id, + next_check_at=self._runtime_next_check_at.get(_session_key(item), ""), + ) + ) + return results + + async def get_next_due_sessions(self, now: datetime) -> List[IndexedSession]: + await self.initialize() + due: List[IndexedSession] = [] + async with self._lock: + for item in _iter_runtime_sessions(self._runtime_next_check_at): + is_due = is_next_check_due(self._runtime_next_check_at[_session_key(item)], now) + if is_due is None: + continue + if is_due: + due.append(item) + return due + + async def upsert_session( + self, + account_id: str, + user_id: str, + session_id: str, + *, + next_check_at: str, + ) -> None: + await self.initialize() + async with self._lock: + data = await self._read_current_index_locked() + session_node = ( + data.setdefault("data", {}).setdefault(account_id, {}).setdefault(user_id, {}) + ) + key = _session_key_from_parts(account_id, user_id, session_id) + self._runtime_next_check_at[key] = next_check_at + if session_id in session_node: + return + session_node[session_id] = {} + data.setdefault("meta", {})["updated_at"] = get_current_timestamp() + await self._persist_locked(data) + + async def remove_session(self, account_id: str, user_id: str, session_id: str) -> None: + await self.initialize() + async with self._lock: + self._runtime_next_check_at.pop( + _session_key_from_parts(account_id, user_id, session_id), None + ) + data = await self._read_current_index_locked() + users = data.get("data", {}).get(account_id) + if not isinstance(users, dict): + return + sessions = users.get(user_id) + if not isinstance(sessions, dict) or session_id not in sessions: + return + sessions.pop(session_id, None) + if not sessions: + users.pop(user_id, None) + if not users: + data.get("data", {}).pop(account_id, None) + data.setdefault("meta", {})["updated_at"] = get_current_timestamp() + await self._persist_locked(data) + + async def refresh_runtime_session(self, session: Session) -> None: + await self.initialize() + async with self._lock: + next_check_at = _compute_runtime_next_check_at(session) + key = _session_key_from_parts( + session.ctx.account_id, session.ctx.user.user_id, session.session_id + ) + if next_check_at: + self._runtime_next_check_at[key] = next_check_at + else: + self._runtime_next_check_at.pop(key, None) + + async def sync_runtime_state(self, load_session: Any) -> List[IndexedSession]: + await self.initialize() + async with self._lock: + data = await self._read_current_index_locked() + indexed_items = list(_iter_indexed_sessions(data)) + current_keys = {_session_key(item) for item in indexed_items} + stale_keys = [key for key in self._runtime_next_check_at if key not in current_keys] + for key in stale_keys: + self._runtime_next_check_at.pop(key, None) + missing_items = [ + item + for item in indexed_items + if _session_key(item) not in self._runtime_next_check_at + ] + + resolved_next_check_at: Dict[Tuple[str, str, str], str] = {} + removable_keys: set[Tuple[str, str, str]] = set() + for item in missing_items: + try: + session = await load_session(item.account_id, item.user_id, item.session_id) + except Exception: + logger.debug( + "SessionAutoCommitIndex failed to sync runtime session %s/%s/%s", + item.account_id, + item.user_id, + item.session_id, + exc_info=True, + ) + continue + if session is None: + removable_keys.add(_session_key(item)) + continue + next_check_at = _compute_runtime_next_check_at(session) + if not next_check_at: + removable_keys.add(_session_key(item)) + continue + resolved_next_check_at[_session_key(item)] = next_check_at + + async with self._lock: + data = await self._read_current_index_locked() + indexed_items = list(_iter_indexed_sessions(data)) + changed = False + + current_keys = {_session_key(item) for item in indexed_items} + stale_keys = [key for key in self._runtime_next_check_at if key not in current_keys] + for key in stale_keys: + self._runtime_next_check_at.pop(key, None) + + for key, next_check_at in resolved_next_check_at.items(): + if key in current_keys: + self._runtime_next_check_at[key] = next_check_at + + for item in indexed_items: + key = _session_key(item) + if key in removable_keys: + changed = _remove_membership_from_data(data, item) or changed + self._runtime_next_check_at.pop(key, None) + + if changed: + data.setdefault("meta", {})["updated_at"] = get_current_timestamp() + await self._persist_locked(data) + indexed_items = list(_iter_indexed_sessions(data)) + + results: List[IndexedSession] = [] + for item in indexed_items: + results.append( + IndexedSession( + account_id=item.account_id, + user_id=item.user_id, + session_id=item.session_id, + next_check_at=self._runtime_next_check_at.get(_session_key(item), ""), + ) + ) + return results + + async def _read_current_index_locked(self) -> Dict[str, Any]: + data = await self._read_index_file(SESSION_AUTO_COMMIT_INDEX_URI) + return _normalize_index_data(data) + + async def _read_index_file(self, path: str) -> Optional[Dict[str, Any]]: + try: + raw = await self._agfs.read(path) + except Exception as exc: + if _is_index_file_missing(exc, path): + logger.debug("SessionAutoCommitIndex index file missing: %s", path) + return None + logger.warning("SessionAutoCommitIndex failed to read %s: %s", path, exc) + return None + content = raw.decode("utf-8") if isinstance(raw, bytes) else str(raw) + if not content or not content.strip(): + logger.debug("SessionAutoCommitIndex read empty content from %s", path) + return None + try: + return json.loads(content) + except json.JSONDecodeError as exc: + logger.warning("Invalid session auto-commit index JSON in %s: %s", path, exc) + return None + + async def _persist_locked(self, data: Dict[str, Any]) -> None: + content = json.dumps(data, ensure_ascii=False, indent=2) + json.loads(content) + + await self._ensure_parent_dirs(SESSION_AUTO_COMMIT_INDEX_URI) + await self._agfs.write(SESSION_AUTO_COMMIT_INDEX_URI, content.encode("utf-8")) + logger.debug( + "SessionAutoCommitIndex persisted sessions=%d updated_at=%s", + len(_iter_indexed_sessions(data)), + data.get("meta", {}).get("updated_at", ""), + ) + + async def _ensure_parent_dirs(self, path: str) -> None: + parent = path.rsplit("/", 1)[0] + try: + await self._agfs.ensure_parent_dirs(path) + return + except AttributeError: + if parent: + await self._agfs.mkdir(parent) + return + except Exception as exc: + logger.warning("Failed to ensure session auto-commit index parent dirs: %s", exc) + raise + + +class SessionAutoCommitScheduler: + """Scheduler for idle-based automatic session commits.""" + + DEFAULT_CHECK_INTERVAL = 60.0 + + def __init__( + self, + session_service: Any, + config: Any, + *, + check_interval: Optional[float] = None, + ): + self._session_service = session_service + self._config = config + self._check_interval = ( + self.DEFAULT_CHECK_INTERVAL if check_interval is None else float(check_interval) + ) + self._index: Optional[SessionAutoCommitIndex] = None + self._running = False + self._task: Optional[asyncio.Task] = None + + @property + def index(self) -> Optional[SessionAutoCommitIndex]: + return self._index + + async def start(self) -> None: + if self._running: + return + self._index = SessionAutoCommitIndex(self._session_service.viking_fs) + await self._index.initialize() + self._running = True + logger.info( + "SessionAutoCommitScheduler started with check interval %.3fs", self._check_interval + ) + self._task = asyncio.create_task(self._run_loop()) + + async def stop(self) -> None: + self._running = False + if self._task is not None: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + + async def _run_loop(self) -> None: + while self._running: + try: + if self._config.idle_enabled and self._index is not None: + indexed = await self._index.sync_runtime_state(self._load_session_for_runtime) + due = await self._index.get_next_due_sessions(datetime.now()) + if due: + due_details = [ + f"{item.account_id}/{item.user_id}/{item.session_id}@{item.next_check_at}" + for item in due + ] + logger.info( + "SessionAutoCommitScheduler indexed=%d due=%d sessions=%s", + len(indexed), + len(due), + due_details, + ) + for item in due: + ctx = RequestContext( + user=UserIdentifier(account_id=item.account_id, user_id=item.user_id), + role=Role.USER, + ) + await self._session_service.maybe_schedule_auto_commit( + item.session_id, + ctx, + reason_hint="idle_timeout", + ) + except Exception as exc: + logger.error("Session auto-commit scheduler loop failed: %s", exc, exc_info=True) + try: + await asyncio.sleep(self._check_interval) + except asyncio.CancelledError: + break + + async def _load_session_for_runtime( + self, account_id: str, user_id: str, session_id: str + ) -> Optional[Session]: + ctx = RequestContext( + user=UserIdentifier(account_id=account_id, user_id=user_id), + role=Role.USER, + ) + try: + return await self._session_service.get(session_id, ctx, auto_create=False) + except Exception: + logger.debug( + "SessionAutoCommitScheduler failed to load session for runtime sync: %s/%s/%s", + account_id, + user_id, + session_id, + exc_info=True, + ) + return None + + +def should_enable_auto_commit(policy: Optional[Dict[str, Any]]) -> bool: + return bool(isinstance(policy, dict) and policy.get("enabled") is True) + + +def get_idle_timeout_seconds(policy: Optional[Dict[str, Any]]) -> Optional[int]: + if not isinstance(policy, dict): + return None + value = policy.get("idle_timeout_seconds") + if value is None: + return None + try: + seconds = int(value) + except (TypeError, ValueError): + return None + return seconds if seconds > 0 else None + + +def get_token_threshold(policy: Optional[Dict[str, Any]]) -> Optional[int]: + if not isinstance(policy, dict): + return None + value = policy.get("token_threshold") + if value is None: + return None + try: + threshold = int(value) + except (TypeError, ValueError): + return None + return threshold if threshold >= 0 else None + + +def compute_next_check_at(last_message_at: str, idle_timeout_seconds: int) -> Optional[str]: + if not last_message_at: + return None + try: + base = datetime.fromisoformat(last_message_at) + except Exception: + return None + return (base + timedelta(seconds=idle_timeout_seconds)).isoformat() + + +def is_next_check_due(next_check_at: str, now: datetime) -> Optional[bool]: + try: + next_dt = datetime.fromisoformat(next_check_at) + except Exception: + return None + + compare_now = now + if next_dt.tzinfo is not None: + if compare_now.tzinfo is None: + compare_now = datetime.fromtimestamp(compare_now.timestamp(), tz=next_dt.tzinfo) + else: + compare_now = compare_now.astimezone(next_dt.tzinfo) + elif compare_now.tzinfo is not None: + compare_now = compare_now.replace(tzinfo=None) + + return next_dt <= compare_now + + +def _normalize_index_data(data: Optional[Dict[str, Any]]) -> Dict[str, Any]: + if not isinstance(data, dict): + return {"meta": {"updated_at": ""}, "data": {}} + normalized = json.loads(json.dumps(data, ensure_ascii=False)) + if not isinstance(normalized.get("meta"), dict): + normalized["meta"] = {"updated_at": ""} + if not isinstance(normalized.get("data"), dict): + normalized["data"] = {} + return normalized + + +def _session_key(item: IndexedSession) -> Tuple[str, str, str]: + return _session_key_from_parts(item.account_id, item.user_id, item.session_id) + + +def _session_key_from_parts(account_id: str, user_id: str, session_id: str) -> Tuple[str, str, str]: + return (account_id, user_id, session_id) + + +def _iter_runtime_sessions( + runtime_next_check_at: Dict[Tuple[str, str, str], str], +) -> List[IndexedSession]: + results: List[IndexedSession] = [] + for key in runtime_next_check_at: + account_id, user_id, session_id = key + results.append( + IndexedSession( + account_id=account_id, + user_id=user_id, + session_id=session_id, + next_check_at=runtime_next_check_at[key], + ) + ) + return results + + +def _compute_runtime_next_check_at(session: Session) -> Optional[str]: + policy = session.meta.auto_commit_policy + if not should_enable_auto_commit(policy): + return None + idle_timeout = get_idle_timeout_seconds(policy) + if idle_timeout is None: + return None + keep_recent_count = int(session.meta.keep_recent_count or 0) + has_uncommitted = bool( + int(session.meta.pending_tokens or 0) > 0 + or int(session.meta.message_count or 0) > keep_recent_count + ) + if not has_uncommitted: + return None + return compute_next_check_at(session.meta.last_message_at, idle_timeout) + + +def _remove_membership_from_data(data: Dict[str, Any], item: IndexedSession) -> bool: + users = data.get("data", {}).get(item.account_id) + if not isinstance(users, dict): + return False + sessions = users.get(item.user_id) + if not isinstance(sessions, dict) or item.session_id not in sessions: + return False + sessions.pop(item.session_id, None) + if not sessions: + users.pop(item.user_id, None) + if not users: + data.get("data", {}).pop(item.account_id, None) + return True + + +def _is_index_file_missing(exc: Exception, path: str) -> bool: + text = str(exc).strip() + if text == path: + return True + lower = text.lower() + return path in text and ( + "not found" in lower or "no such file" in lower or "does not exist" in lower + ) + + +def _iter_indexed_sessions(index_data: Dict[str, Any]) -> List[IndexedSession]: + results: List[IndexedSession] = [] + data = index_data.get("data", {}) + if not isinstance(data, dict): + return results + for account_id, users in data.items(): + if not isinstance(users, dict): + continue + for user_id, sessions in users.items(): + if not isinstance(sessions, dict): + continue + for session_id, payload in sessions.items(): + if not isinstance(payload, dict): + continue + results.append( + IndexedSession( + account_id=account_id, + user_id=user_id, + session_id=session_id, + ) + ) + return results diff --git a/openviking/service/session_service.py b/openviking/service/session_service.py index 806af8ffdd..c939381b6e 100644 --- a/openviking/service/session_service.py +++ b/openviking/service/session_service.py @@ -6,17 +6,28 @@ Provides session management operations: session, sessions, add_message, commit, delete. """ +import asyncio +from datetime import datetime from typing import TYPE_CHECKING, Any, Dict, List, Optional from openviking.core.namespace import canonical_session_uri -from openviking.server.config import ToolOutputExternalizationConfig +from openviking.server.config import SessionAutoCommitConfig, ToolOutputExternalizationConfig from openviking.server.identity import RequestContext +from openviking.service.session_auto_commit import ( + SessionAutoCommitIndex, + compute_next_check_at, + get_idle_timeout_seconds, + get_token_threshold, + is_next_check_due, + should_enable_auto_commit, +) from openviking.service.task_tracker import get_task_tracker from openviking.session import Session from openviking.session.memory.memory_type_registry import MemoryTypeRegistry from openviking.session.memory_policy import MemoryPolicy from openviking.storage import VikingDBManager from openviking.storage.viking_fs import VikingFS +from openviking.utils.time_utils import get_current_timestamp from openviking_cli.exceptions import ( AlreadyExistsError, NotFoundError, @@ -43,6 +54,10 @@ def __init__( self._viking_fs = viking_fs self._session_compressor = session_compressor self._tool_output_externalization_config = ToolOutputExternalizationConfig() + self._session_auto_commit_config = SessionAutoCommitConfig() + self._auto_commit_index: Optional[SessionAutoCommitIndex] = None + self._auto_commit_claims: set[tuple[str, str, str]] = set() + self._auto_commit_claims_lock = asyncio.Lock() def set_dependencies( self, @@ -61,11 +76,24 @@ def set_tool_output_externalization_config( """Set tool output externalization controls for newly created sessions.""" self._tool_output_externalization_config = config.model_copy(deep=True) + def set_session_auto_commit_config(self, config: SessionAutoCommitConfig) -> None: + """Set server-wide controls for automatic session commits.""" + self._session_auto_commit_config = config.model_copy(deep=True) + + def set_auto_commit_index(self, index: SessionAutoCommitIndex) -> None: + """Set shared idle auto-commit index.""" + self._auto_commit_index = index + def _ensure_initialized(self) -> None: """Ensure all dependencies are initialized.""" if not self._viking_fs: raise NotInitializedError("VikingFS") + @property + def viking_fs(self) -> VikingFS: + self._ensure_initialized() + return self._viking_fs + @staticmethod def _record_lifecycle_metric(action: str, status: str) -> None: """Best-effort session lifecycle metrics should never break the main flow.""" @@ -240,6 +268,7 @@ async def delete(self, session_id: str, ctx: RequestContext) -> bool: raise NotFoundError(session_id, "session") await self._viking_fs.rm(session_uri, recursive=True, ctx=ctx) + await self._remove_from_auto_commit_index(ctx.account_id, ctx.user.user_id, session_id) logger.info(f"Deleted session: {session_id}") self._record_lifecycle_metric("delete", "ok") return True @@ -292,6 +321,8 @@ async def commit_async( result = await session.commit_async(keep_recent_count=keep_recent_count) self._record_lifecycle_metric("commit", "ok" if result.get("status") else "error") self._record_archive_metric("ok" if result.get("archived") else "skip") + if result.get("archived"): + await self._reconcile_auto_commit_index(session) return result async def get_commit_task(self, task_id: str, ctx: RequestContext) -> Optional[Dict[str, Any]]: @@ -328,3 +359,170 @@ async def extract(self, session_id: str, ctx: RequestContext) -> List[Any]: ) self._record_lifecycle_metric("extract", "ok") return memories + + async def persist_auto_commit_policy_and_schedule( + self, + session: Session, + *, + auto_commit_policy: Any, + policy_provided: bool, + ) -> None: + """Persist message-time auto-commit policy/meta updates and maintain idle index.""" + if policy_provided: + session.meta.auto_commit_policy = auto_commit_policy + next_keep_recent_count = 0 + if isinstance(auto_commit_policy, dict): + try: + next_keep_recent_count = max( + 0, + int(auto_commit_policy.get("keep_recent_count", 0) or 0), + ) + except (TypeError, ValueError): + next_keep_recent_count = 0 + current_keep_recent_count = max(0, int(session.meta.keep_recent_count or 0)) + session.meta.keep_recent_count = next_keep_recent_count + if current_keep_recent_count != next_keep_recent_count: + session._rebuild_pending_tokens() + session.meta.last_message_at = get_current_timestamp() + await session._save_meta() + await self._reconcile_auto_commit_index(session) + + async def maybe_schedule_auto_commit( + self, + session_id: str, + ctx: RequestContext, + *, + reason_hint: str, + ) -> bool: + """Best-effort automatic commit scheduler entrypoint.""" + session = await self.get(session_id, ctx, auto_create=False) + policy = session.meta.auto_commit_policy + if not should_enable_auto_commit(policy): + return False + + if reason_hint == "token_threshold": + threshold = get_token_threshold(policy) + if threshold is None or int(session.meta.pending_tokens or 0) < threshold: + return False + elif reason_hint == "idle_timeout": + if not self._session_auto_commit_config.idle_enabled: + return False + idle_timeout = get_idle_timeout_seconds(policy) + if idle_timeout is None or not self._has_uncommitted_content(session): + await self._remove_from_auto_commit_index( + ctx.account_id, ctx.user.user_id, session_id + ) + return False + next_check_at = compute_next_check_at(session.meta.last_message_at, idle_timeout) + if not next_check_at: + return False + is_due = is_next_check_due(next_check_at, datetime.now()) + if is_due is None: + return False + if not is_due: + return False + else: + return False + + claim = (ctx.account_id, ctx.user.user_id, session_id) + async with self._auto_commit_claims_lock: + if claim in self._auto_commit_claims: + return False + tracker = get_task_tracker() + if await tracker.has_running( + "session_commit", + session_id, + account_id=ctx.account_id, + user_id=ctx.user.user_id, + ): + return False + self._auto_commit_claims.add(claim) + + asyncio.create_task(self.run_auto_commit(session_id, ctx, reason=reason_hint)) + return True + + async def run_auto_commit(self, session_id: str, ctx: RequestContext, *, reason: str) -> None: + """Run one best-effort automatic commit and release process-local claim.""" + claim = (ctx.account_id, ctx.user.user_id, session_id) + try: + tracker = get_task_tracker() + if await tracker.has_running( + "session_commit", + session_id, + account_id=ctx.account_id, + user_id=ctx.user.user_id, + ): + return + + session = await self.get(session_id, ctx, auto_create=False) + policy = session.meta.auto_commit_policy + if not should_enable_auto_commit(policy): + return + + keep_recent_count = int((policy or {}).get("keep_recent_count", 0) or 0) + result = await session.commit_async(keep_recent_count=keep_recent_count) + if result.get("archived"): + session.meta.auto_commit_last_error = "" + session.meta.auto_commit_last_error_at = "" + await session._save_meta() + await self._reconcile_auto_commit_index(session) + except Exception as exc: + logger.warning("Automatic session commit failed for %s: %s", session_id, exc) + try: + session = await self.get(session_id, ctx, auto_create=False) + session.meta.auto_commit_last_error = str(exc) + session.meta.auto_commit_last_error_at = get_current_timestamp() + await session._save_meta() + except Exception: + logger.debug( + "Failed to persist auto-commit error for %s", session_id, exc_info=True + ) + finally: + async with self._auto_commit_claims_lock: + self._auto_commit_claims.discard(claim) + + async def _reconcile_auto_commit_index(self, session: Session) -> None: + if self._auto_commit_index is None: + return + policy = session.meta.auto_commit_policy + enabled = should_enable_auto_commit(policy) + idle_timeout = get_idle_timeout_seconds(policy) + account_id = session.ctx.account_id + user_id = session.ctx.user.user_id + session_id = session.session_id + + if ( + not enabled + or idle_timeout is None + or not self._session_auto_commit_config.idle_enabled + or not self._has_uncommitted_content(session) + ): + await self._remove_from_auto_commit_index(account_id, user_id, session_id) + return + + next_check_at = compute_next_check_at(session.meta.last_message_at, idle_timeout) + if not next_check_at: + await self._remove_from_auto_commit_index(account_id, user_id, session_id) + return + await self._auto_commit_index.upsert_session( + account_id, + user_id, + session_id, + next_check_at=next_check_at, + ) + await self._auto_commit_index.refresh_runtime_session(session) + + async def _remove_from_auto_commit_index( + self, account_id: str, user_id: str, session_id: str + ) -> None: + if self._auto_commit_index is None: + return + await self._auto_commit_index.remove_session(account_id, user_id, session_id) + + @staticmethod + def _has_uncommitted_content(session: Session) -> bool: + keep_recent_count = int(session.meta.keep_recent_count or 0) + return bool( + int(session.meta.pending_tokens or 0) > 0 + or int(session.meta.message_count or 0) > keep_recent_count + ) diff --git a/openviking/session/session.py b/openviking/session/session.py index 9585e7393f..5d8097657a 100644 --- a/openviking/session/session.py +++ b/openviking/session/session.py @@ -271,6 +271,10 @@ class SessionMeta: # process restarts. keep_recent_count: int = 0 memory_policy: Optional[Dict[str, Any]] = None + auto_commit_policy: Optional[Dict[str, Any]] = None + last_message_at: str = "" + auto_commit_last_error: str = "" + auto_commit_last_error_at: str = "" def to_dict(self) -> Dict[str, Any]: data = { @@ -288,6 +292,12 @@ def to_dict(self) -> Dict[str, Any]: "pending_tokens": self.pending_tokens, "keep_recent_count": self.keep_recent_count, "memory_policy": dict(self.memory_policy) if self.memory_policy is not None else None, + "auto_commit_policy": ( + dict(self.auto_commit_policy) if self.auto_commit_policy is not None else None + ), + "last_message_at": self.last_message_at, + "auto_commit_last_error": self.auto_commit_last_error, + "auto_commit_last_error_at": self.auto_commit_last_error_at, } if self.total_message_count is not None: data["total_message_count"] = self.total_message_count @@ -331,6 +341,10 @@ def from_dict(cls, data: Dict[str, Any]) -> "SessionMeta": pending_tokens=max(0, int(data.get("pending_tokens", 0) or 0)), keep_recent_count=max(0, int(data.get("keep_recent_count", 0) or 0)), memory_policy=data.get("memory_policy"), + auto_commit_policy=data.get("auto_commit_policy"), + last_message_at=data.get("last_message_at", ""), + auto_commit_last_error=data.get("auto_commit_last_error", ""), + auto_commit_last_error_at=data.get("auto_commit_last_error_at", ""), ) @@ -439,6 +453,14 @@ async def load(self): self._meta.created_by_account_id = self.ctx.account_id if not self._meta.created_by_user_id: self._meta.created_by_user_id = self.ctx.user.user_id + # Always reconcile live message counters from messages.jsonl so restart + # recovery does not trust stale .meta.json values. + self._meta.message_count = len(self._messages) + if self._meta.total_message_count is not None: + self._meta.total_message_count = max( + int(self._meta.total_message_count or 0), + self._meta.commit_count + len(self._messages), + ) # WM v2: always rebuild pending_tokens from current messages so the # counter stays consistent across restarts and is also backfilled for # legacy sessions whose .meta.json predates these fields. O(n) once, diff --git a/openviking_cli/client/base.py b/openviking_cli/client/base.py index d23a30de73..549863f775 100644 --- a/openviking_cli/client/base.py +++ b/openviking_cli/client/base.py @@ -316,6 +316,7 @@ async def add_message( parts: list[dict] | None = None, created_at: str | None = None, peer_id: str | None = None, + auto_commit_policy: dict[str, Any] | None = None, telemetry: TelemetryRequest = False, ) -> Dict[str, Any]: """Add a message to a session. @@ -327,6 +328,7 @@ async def add_message( parts: Parts array (full Part support: TextPart, ContextPart, ImagePart, ToolPart) created_at: Message creation time (ISO format string) peer_id: Optional stable interaction peer identity. + auto_commit_policy: Optional session-level auto-commit policy persisted by the server. telemetry: Whether to attach operation telemetry data to the result. If both content and parts are provided, parts takes precedence. @@ -338,6 +340,7 @@ async def batch_add_messages( self, session_id: str, messages: list[dict], + auto_commit_policy: dict[str, Any] | None = None, telemetry: TelemetryRequest = False, ) -> Dict[str, Any]: """Add multiple messages to a session in a single request. @@ -346,6 +349,7 @@ async def batch_add_messages( session_id: Session ID messages: List of message dicts, each with "role" and optionally "content", "parts", "created_at", "peer_id". + auto_commit_policy: Optional session-level auto-commit policy sent once at batch top level. telemetry: Whether to attach operation telemetry data to the result. Returns: diff --git a/pyproject.toml b/pyproject.toml index b36c306e22..5a79a0fde1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [build-system] requires = [ "setuptools>=61.0", - "setuptools-scm>=8.0", + "setuptools-scm>=8.0,<10.0", "cmake>=3.15", "maturin>=1.0,<2.0", "wheel", @@ -106,7 +106,7 @@ opengauss = [ dev = [ "mypy>=1.0.0", "ruff>=0.1.0", - "setuptools_scm>=10.0.0", + "setuptools_scm>=8.0,<10.0", ] doc = [ "sphinx>=7.0.0", @@ -130,7 +130,7 @@ ocr = [ ] build = [ "setuptools>=61.0", - "setuptools-scm>=8.0", + "setuptools-scm>=8.0,<10.0", "cmake>=3.15", "wheel", "build", diff --git a/sdk/python/README.md b/sdk/python/README.md index 4344a999c0..d96d7bdd5f 100644 --- a/sdk/python/README.md +++ b/sdk/python/README.md @@ -98,6 +98,23 @@ context = client.session("demo-session").get_session_context(token_budget=4096) print("context:", context) ``` +Enable session auto commit from the SDK: + +```python +policy = { + "enabled": True, + "token_threshold": 512, + "idle_timeout_seconds": 60, + "keep_recent_count": 2, +} + +client.session("demo-session").add_message( + "user", + "remember this automatically", + auto_commit_policy=policy, +) +``` + ## Quick Start: Async Client ```python diff --git a/sdk/python/README_CN.md b/sdk/python/README_CN.md index 3c84adac16..dcde43f2fb 100644 --- a/sdk/python/README_CN.md +++ b/sdk/python/README_CN.md @@ -98,6 +98,23 @@ context = client.session("demo-session").get_session_context(token_budget=4096) print("context:", context) ``` +也可以通过 SDK 直接开启 session 自动 commit: + +```python +policy = { + "enabled": True, + "token_threshold": 512, + "idle_timeout_seconds": 60, + "keep_recent_count": 2, +} + +client.session("demo-session").add_message( + "user", + "让它自动提交", + auto_commit_policy=policy, +) +``` + ## 快速开始:异步客户端 ```python diff --git a/sdk/python/openviking_sdk/client.py b/sdk/python/openviking_sdk/client.py index fa907092c2..c2ee9d6c29 100644 --- a/sdk/python/openviking_sdk/client.py +++ b/sdk/python/openviking_sdk/client.py @@ -86,6 +86,7 @@ async def add_message( parts: list[dict] | None = None, created_at: str | None = None, peer_id: str | None = None, + auto_commit_policy: dict | None = None, ) -> Dict[str, Any]: return await self._client.add_message( self.session_id, @@ -94,10 +95,19 @@ async def add_message( parts=parts, created_at=created_at, peer_id=peer_id, + auto_commit_policy=auto_commit_policy, ) - async def batch_add_messages(self, messages: list[dict]) -> Dict[str, Any]: - return await self._client.batch_add_messages(self.session_id, messages) + async def batch_add_messages( + self, + messages: list[dict], + auto_commit_policy: dict | None = None, + ) -> Dict[str, Any]: + return await self._client.batch_add_messages( + self.session_id, + messages, + auto_commit_policy=auto_commit_policy, + ) async def commit(self, keep_recent_count: int = 0) -> Dict[str, Any]: return await self._client.commit_session( @@ -129,6 +139,7 @@ def add_message( parts: list[dict] | None = None, created_at: str | None = None, peer_id: str | None = None, + auto_commit_policy: dict | None = None, ) -> Dict[str, Any]: return self._client.add_message( self.session_id, @@ -137,10 +148,19 @@ def add_message( parts=parts, created_at=created_at, peer_id=peer_id, + auto_commit_policy=auto_commit_policy, ) - def batch_add_messages(self, messages: list[dict]) -> Dict[str, Any]: - return self._client.batch_add_messages(self.session_id, messages) + def batch_add_messages( + self, + messages: list[dict], + auto_commit_policy: dict | None = None, + ) -> Dict[str, Any]: + return self._client.batch_add_messages( + self.session_id, + messages, + auto_commit_policy=auto_commit_policy, + ) def commit( self, @@ -444,10 +464,13 @@ async def batch_add_messages( self, session_id: str, messages: list[dict], + auto_commit_policy: dict | None = None, telemetry: Any = False, ) -> Dict[str, Any]: session_path = self._path_segment(session_id) payload: Dict[str, Any] = {"messages": messages} + if auto_commit_policy is not None: + payload["auto_commit_policy"] = auto_commit_policy if telemetry is not False: payload["telemetry"] = telemetry response = await self._http.post( @@ -1021,6 +1044,7 @@ async def add_message( parts: list[dict] | None = None, created_at: str | None = None, peer_id: str | None = None, + auto_commit_policy: dict | None = None, telemetry: Any = False, ) -> Dict[str, Any]: payload: Dict[str, Any] = {"role": role} @@ -1034,6 +1058,8 @@ async def add_message( payload["created_at"] = created_at if peer_id is not None: payload["peer_id"] = peer_id + if auto_commit_policy is not None: + payload["auto_commit_policy"] = auto_commit_policy if telemetry is not False: payload["telemetry"] = telemetry session_path = self._path_segment(session_id) @@ -1291,11 +1317,25 @@ def batch_add_messages( self, session_id: str, messages: list[dict], + auto_commit_policy: dict | None = None, telemetry: Any = False, ) -> Dict[str, Any]: if telemetry is False: - return run_async(self._async_client.batch_add_messages(session_id, messages)) - return run_async(self._async_client.batch_add_messages(session_id, messages, telemetry)) + return run_async( + self._async_client.batch_add_messages( + session_id, + messages, + auto_commit_policy=auto_commit_policy, + ) + ) + return run_async( + self._async_client.batch_add_messages( + session_id, + messages, + auto_commit_policy=auto_commit_policy, + telemetry=telemetry, + ) + ) def add_skill( self, @@ -1711,6 +1751,7 @@ def add_message( parts: list[dict] | None = None, created_at: str | None = None, peer_id: str | None = None, + auto_commit_policy: dict | None = None, telemetry: Any = False, ) -> Dict[str, Any]: kwargs = { @@ -1719,6 +1760,7 @@ def add_message( "parts": parts, "created_at": created_at, "peer_id": peer_id, + "auto_commit_policy": auto_commit_policy, } if telemetry is not False: kwargs["telemetry"] = telemetry diff --git a/sdk/python/tests/test_async_client_behaviors.py b/sdk/python/tests/test_async_client_behaviors.py index a78751c3b2..825cdba250 100644 --- a/sdk/python/tests/test_async_client_behaviors.py +++ b/sdk/python/tests/test_async_client_behaviors.py @@ -1,6 +1,6 @@ from pathlib import Path from types import SimpleNamespace -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, Mock, patch import pytest from openviking_sdk import AsyncHTTPClient, SyncHTTPClient @@ -36,6 +36,38 @@ async def test_async_http_client_batch_add_messages_posts_batch_payload(): ) +@pytest.mark.asyncio +async def test_async_http_client_add_message_posts_auto_commit_policy(): + client = AsyncHTTPClient(url="http://localhost:1933") + fake_http = SimpleNamespace(post=AsyncMock(return_value=object())) + client._http = fake_http + client._handle_response_data = lambda _response: {"result": {"message_id": "msg-1"}} + + policy = { + "enabled": True, + "token_threshold": 128, + "idle_timeout_seconds": 30, + "keep_recent_count": 2, + } + + result = await client.add_message( + "demo-session", + role="user", + content="hello", + auto_commit_policy=policy, + ) + + assert result == {"message_id": "msg-1"} + fake_http.post.assert_awaited_once_with( + "/api/v1/sessions/demo-session/messages", + json={ + "role": "user", + "content": "hello", + "auto_commit_policy": policy, + }, + ) + + @pytest.mark.asyncio async def test_async_http_client_batch_add_messages_url_encodes_session_id(): client = AsyncHTTPClient(url="http://localhost:1933") @@ -61,6 +93,35 @@ async def test_async_http_client_batch_add_messages_url_encodes_session_id(): ) +@pytest.mark.asyncio +async def test_async_http_client_batch_add_messages_posts_top_level_auto_commit_policy(): + client = AsyncHTTPClient(url="http://localhost:1933") + fake_http = SimpleNamespace(post=AsyncMock(return_value=object())) + client._http = fake_http + client._handle_response_data = lambda _response: { + "result": {"session_id": "batch-session", "message_count": 2, "added": 2} + } + + messages = [{"role": "user", "content": "hello"}] + policy = { + "enabled": True, + "idle_timeout_seconds": 45, + "keep_recent_count": 1, + } + + result = await client.batch_add_messages( + "batch-session", + messages, + auto_commit_policy=policy, + ) + + assert result == {"session_id": "batch-session", "message_count": 2, "added": 2} + fake_http.post.assert_awaited_once_with( + "/api/v1/sessions/batch-session/messages/batch", + json={"messages": messages, "auto_commit_policy": policy}, + ) + + @pytest.mark.asyncio async def test_async_http_client_reindex_posts_content_reindex(): client = AsyncHTTPClient(url="http://localhost:1933") @@ -132,7 +193,40 @@ def test_sync_http_client_batch_add_messages_forwards_to_async_client(): assert result == {"session_id": "batch-session", "message_count": 2, "added": 2} assert mock_run.called - mock_batch.assert_called_once_with("batch-session", messages) + mock_batch.assert_called_once_with( + "batch-session", + messages, + auto_commit_policy=None, + ) + + +def test_sync_http_client_batch_add_messages_forwards_auto_commit_policy(): + client = SyncHTTPClient(url="http://localhost:1933") + messages = [{"role": "user", "content": "hello"}] + policy = {"enabled": True, "idle_timeout_seconds": 20} + + with patch.object( + client._async_client, + "batch_add_messages", + return_value={"session_id": "batch-session", "message_count": 1, "added": 1}, + ) as mock_batch: + with patch( + "openviking_sdk.client.run_async", + return_value={"session_id": "batch-session", "message_count": 1, "added": 1}, + ) as mock_run: + result = client.batch_add_messages( + "batch-session", + messages, + auto_commit_policy=policy, + ) + + assert result == {"session_id": "batch-session", "message_count": 1, "added": 1} + assert mock_run.called + mock_batch.assert_called_once_with( + "batch-session", + messages, + auto_commit_policy=policy, + ) def test_sync_http_client_session_returns_sync_session_wrapper(): @@ -168,6 +262,34 @@ def test_sync_session_add_message_wraps_async_client(): parts=None, created_at=None, peer_id=None, + auto_commit_policy=None, + ) + + +def test_sync_session_add_message_wraps_auto_commit_policy(): + client = SyncHTTPClient(url="http://localhost:1933") + session = client.session("demo-session") + policy = {"enabled": True, "token_threshold": 64} + + with patch.object( + client._async_client, "add_message", Mock(return_value=object()) + ) as mock_add_message: + with patch( + "openviking_sdk.client.run_async", + return_value={"message_id": "msg-1"}, + ) as mock_run: + result = session.add_message("user", content="hello", auto_commit_policy=policy) + + assert result == {"message_id": "msg-1"} + assert mock_run.called + mock_add_message.assert_called_once_with( + "demo-session", + role="user", + content="hello", + parts=None, + created_at=None, + peer_id=None, + auto_commit_policy=policy, ) diff --git a/tests/misc/test_root_docker_image_packaging.py b/tests/misc/test_root_docker_image_packaging.py index f45aef729e..7230906467 100644 --- a/tests/misc/test_root_docker_image_packaging.py +++ b/tests/misc/test_root_docker_image_packaging.py @@ -85,6 +85,15 @@ def test_root_build_system_includes_maturin_for_isolated_builds(): assert 'shutil.which("maturin")' not in setup_py +def test_root_build_system_pins_setuptools_scm_below_v10_for_isolated_builds(): + pyproject = _read_text("pyproject.toml") + + assert '"setuptools-scm>=8.0,<10.0",' in pyproject + assert '"setuptools_scm>=8.0,<10.0",' in pyproject + assert '"setuptools-scm>=8.0",' not in pyproject + assert '"setuptools_scm>=10.0.0",' not in pyproject + + def test_root_build_system_honors_ci_compiler_overrides_and_requires_ragfs_for_wheels(): setup_py = _read_text("setup.py") build_workflow = _read_text(".github/workflows/_build.yml") diff --git a/tests/server/test_api_sessions.py b/tests/server/test_api_sessions.py index 5aaed5880c..74fc9fb353 100644 --- a/tests/server/test_api_sessions.py +++ b/tests/server/test_api_sessions.py @@ -13,11 +13,14 @@ from starlette.requests import Request from openviking.message import ImagePart, Message, TextPart +from openviking.pyagfs import AsyncAGFSClient +from openviking.pyagfs.exceptions import AGFSNotFoundError from openviking.server.app import create_app from openviking.server.config import ServerConfig, ToolOutputExternalizationConfig from openviking.server.dependencies import set_service from openviking.server.identity import RequestContext, Role from openviking.server.routers import sessions as sessions_router +from openviking.service import session_auto_commit from openviking_cli.session.user_id import UserIdentifier from openviking_cli.utils.config import OPENVIKING_CONFIG_ENV from openviking_cli.utils.config.open_viking_config import OpenVikingConfigSingleton @@ -424,6 +427,241 @@ async def test_add_message(client: httpx.AsyncClient): assert body["result"]["message_count"] == 1 +async def test_add_message_persists_auto_commit_policy_and_last_message_at( + client: httpx.AsyncClient, +): + create_resp = await client.post("/api/v1/sessions", json={}) + session_id = create_resp.json()["result"]["session_id"] + + resp = await client.post( + f"/api/v1/sessions/{session_id}/messages", + json={ + "role": "user", + "content": "Hello, world!", + "auto_commit_policy": { + "enabled": True, + "token_threshold": 123, + "idle_timeout_seconds": 60, + "keep_recent_count": 5, + }, + }, + ) + assert resp.status_code == 200 + + session_resp = await client.get(f"/api/v1/sessions/{session_id}") + assert session_resp.status_code == 200 + result = session_resp.json()["result"] + assert result["auto_commit_policy"] == { + "enabled": True, + "token_threshold": 123, + "idle_timeout_seconds": 60, + "keep_recent_count": 5, + } + assert result["last_message_at"] + assert result["keep_recent_count"] == 5 + + +async def test_add_message_policy_updates_keep_recent_count_for_pending_tokens( + client: httpx.AsyncClient, +): + create_resp = await client.post("/api/v1/sessions", json={}) + session_id = create_resp.json()["result"]["session_id"] + + first_resp = await client.post( + f"/api/v1/sessions/{session_id}/messages", + json={ + "role": "user", + "content": "one two three four five six seven eight nine ten", + "auto_commit_policy": { + "enabled": True, + "token_threshold": 999999, + "idle_timeout_seconds": None, + "keep_recent_count": 0, + }, + }, + ) + assert first_resp.status_code == 200 + + before_resp = await client.get(f"/api/v1/sessions/{session_id}") + before_result = before_resp.json()["result"] + assert before_result["pending_tokens"] > 0 + + second_resp = await client.post( + f"/api/v1/sessions/{session_id}/messages", + json={ + "role": "user", + "content": "eleven twelve thirteen fourteen fifteen sixteen", + "auto_commit_policy": { + "enabled": True, + "token_threshold": 999999, + "idle_timeout_seconds": None, + "keep_recent_count": 2, + }, + }, + ) + assert second_resp.status_code == 200 + + after_resp = await client.get(f"/api/v1/sessions/{session_id}") + after_result = after_resp.json()["result"] + assert after_result["keep_recent_count"] == 2 + assert after_result["message_count"] == 2 + assert after_result["pending_tokens"] == 0 + + +async def test_token_only_auto_commit_policy_does_not_enter_idle_index( + client: httpx.AsyncClient, + service, +): + create_resp = await client.post("/api/v1/sessions", json={}) + session_id = create_resp.json()["result"]["session_id"] + + resp = await client.post( + f"/api/v1/sessions/{session_id}/messages", + json={ + "role": "user", + "content": "Hello, world!", + "auto_commit_policy": { + "enabled": True, + "token_threshold": 1, + "idle_timeout_seconds": None, + "keep_recent_count": 0, + }, + }, + ) + assert resp.status_code == 200 + + indexed = await service.sessions._auto_commit_index.list_sessions() + assert [item.session_id for item in indexed] == [] + + +async def test_idle_auto_commit_policy_enters_idle_index(client: httpx.AsyncClient, service): + create_resp = await client.post("/api/v1/sessions", json={}) + session_id = create_resp.json()["result"]["session_id"] + + resp = await client.post( + f"/api/v1/sessions/{session_id}/messages", + json={ + "role": "user", + "content": "Hello, world!", + "auto_commit_policy": { + "enabled": True, + "token_threshold": None, + "idle_timeout_seconds": 60, + "keep_recent_count": 0, + }, + }, + ) + assert resp.status_code == 200 + + indexed = await service.sessions._auto_commit_index.list_sessions() + assert len(indexed) == 1 + assert indexed[0].session_id == session_id + assert indexed[0].next_check_at + + +async def test_idle_global_switch_disables_idle_indexing(client: httpx.AsyncClient, service): + service.sessions._session_auto_commit_config.idle_enabled = False + + create_resp = await client.post("/api/v1/sessions", json={}) + session_id = create_resp.json()["result"]["session_id"] + + resp = await client.post( + f"/api/v1/sessions/{session_id}/messages", + json={ + "role": "user", + "content": "Hello, world!", + "auto_commit_policy": { + "enabled": True, + "token_threshold": None, + "idle_timeout_seconds": 60, + "keep_recent_count": 0, + }, + }, + ) + assert resp.status_code == 200 + + indexed = await service.sessions._auto_commit_index.list_sessions() + assert [item.session_id for item in indexed] == [] + + +async def test_idle_index_persists_to_global_system_path(service): + index = session_auto_commit.SessionAutoCommitIndex(service.viking_fs) + await index.initialize() + await index.upsert_session( + "acct_a", + "user_b", + "session_c", + next_check_at="2026-06-22T12:00:00+08:00", + ) + + agfs = AsyncAGFSClient(service._agfs) + raw = await agfs.read("/local/_system/session_auto_commit/index.json") + content = raw.decode("utf-8") if isinstance(raw, bytes) else str(raw) + + assert '"session_c"' in content + + with pytest.raises(AGFSNotFoundError): + await agfs.read("/local/default/_system/session_auto_commit/index.json") + + +async def test_idle_index_does_not_rewrite_existing_session_membership(service): + index = session_auto_commit.SessionAutoCommitIndex(service.viking_fs) + await index.initialize() + await index.upsert_session( + "acct_a", + "user_b", + "session_c", + next_check_at="2026-06-22T12:00:00+08:00", + ) + + agfs = AsyncAGFSClient(service._agfs) + first_raw = await agfs.read("/local/_system/session_auto_commit/index.json") + first_content = first_raw.decode("utf-8") if isinstance(first_raw, bytes) else str(first_raw) + + await index.upsert_session( + "acct_a", + "user_b", + "session_c", + next_check_at="2026-06-22T13:00:00+08:00", + ) + + second_raw = await agfs.read("/local/_system/session_auto_commit/index.json") + second_content = ( + second_raw.decode("utf-8") if isinstance(second_raw, bytes) else str(second_raw) + ) + + assert second_content == first_content + + +async def test_session_load_recovers_message_count_from_live_messages(service): + ctx = RequestContext(user=UserIdentifier("acct_a", "user_b"), role=Role.ADMIN) + await service.initialize_account_directories(ctx) + await service.initialize_user_directories(ctx) + + session = await service.sessions.create(ctx) + session.add_message("user", [TextPart("我爱吃西瓜")]) + + session = await service.sessions.get(session.session_id, ctx, auto_create=False) + session.meta.message_count = 0 + session.meta.pending_tokens = 0 + await session._save_meta() + + reloaded = service.sessions.session(ctx, session.session_id) + await reloaded.load() + + assert len(reloaded.messages) == 1 + assert reloaded.meta.message_count == 1 + + +def test_auto_commit_index_uses_internal_control_path(): + assert ( + session_auto_commit.SESSION_AUTO_COMMIT_INDEX_URI + == "/local/_system/session_auto_commit/index.json" + ) + assert "/resources/" not in session_auto_commit.SESSION_AUTO_COMMIT_INDEX_URI + assert "/default/" not in session_auto_commit.SESSION_AUTO_COMMIT_INDEX_URI + + async def test_add_message_accepts_image_part(client: httpx.AsyncClient, service): create_resp = await client.post("/api/v1/sessions", json={}) session_id = create_resp.json()["result"]["session_id"] @@ -587,6 +825,31 @@ async def test_batch_add_message_accepts_mixed_parts(client: httpx.AsyncClient, assert isinstance(session.messages[0].parts[1], ImagePart) +async def test_batch_add_message_rejects_per_message_auto_commit_policy(client: httpx.AsyncClient): + create_resp = await client.post("/api/v1/sessions", json={}) + session_id = create_resp.json()["result"]["session_id"] + + resp = await client.post( + f"/api/v1/sessions/{session_id}/messages/batch", + json={ + "messages": [ + { + "role": "user", + "content": "hello", + "auto_commit_policy": { + "enabled": True, + "token_threshold": 1, + "idle_timeout_seconds": None, + "keep_recent_count": 0, + }, + } + ] + }, + ) + + assert resp.status_code == 400 + + async def test_add_message_splits_tool_result_aggregate(client: httpx.AsyncClient): create_resp = await client.post("/api/v1/sessions", json={}) session_id = create_resp.json()["result"]["session_id"] diff --git a/tests/unit/service/test_core_encryption_startup.py b/tests/unit/service/test_core_encryption_startup.py index c6ac2bb9c1..86db9ab9aa 100644 --- a/tests/unit/service/test_core_encryption_startup.py +++ b/tests/unit/service/test_core_encryption_startup.py @@ -8,6 +8,7 @@ import pytest +from openviking.server.config import SessionAutoCommitConfig from openviking.service.core import OpenVikingService from openviking.utils.agfs_utils import RagfsBindingConfig @@ -122,3 +123,148 @@ def _acquire(path: str) -> str: assert calls == [] assert service._data_dir_lock_acquired is True + + +def test_session_auto_commit_config_defaults_to_idle_enabled(): + config = SessionAutoCommitConfig() + + assert config.idle_enabled is True + assert config.check_interval_seconds == 60.0 + + +def test_session_auto_commit_config_accepts_check_interval_override(): + config = SessionAutoCommitConfig(idle_enabled=True, check_interval_seconds=3.5) + + assert config.idle_enabled is True + assert config.check_interval_seconds == 3.5 + + +@pytest.mark.asyncio +async def test_initialize_skips_session_auto_commit_scheduler_when_idle_disabled(monkeypatch): + """Do not create or start the idle scheduler when idle auto-commit is globally disabled.""" + + scheduler_events: list[str] = [] + auto_commit_index_events: list[object] = [] + + async def _fake_init_context_collection(*_args, **_kwargs): + return None + + class _FakeWatchScheduler: + def __init__(self, resource_service, viking_fs): + self.resource_service = resource_service + self.viking_fs = viking_fs + + async def start(self): + return None + + class _FakeSessionAutoCommitScheduler: + def __init__(self, *args, **kwargs): + scheduler_events.append("init") + self.index = object() + + async def start(self): + scheduler_events.append("start") + + class _FakeDirectoryInitializer: + def __init__(self, vikingdb, viking_fs): + self.vikingdb = vikingdb + self.viking_fs = viking_fs + + async def initialize_account_directories(self, _ctx): + return 0 + + async def initialize_user_directories(self, _ctx): + return 0 + + class _FakeQueueManager: + def start(self): + return None + + class _FakeLockManager: + async def start(self): + return None + + class _FakeVikingDBManager: + def mark_closing(self): + return None + + monkeypatch.setattr( + "openviking.service.core.init_context_collection", + _fake_init_context_collection, + ) + monkeypatch.setattr("openviking.service.core.init_viking_fs", lambda **_kwargs: object()) + monkeypatch.setattr("openviking.service.core.DirectoryInitializer", _FakeDirectoryInitializer) + monkeypatch.setattr("openviking.service.core.WatchScheduler", _FakeWatchScheduler) + monkeypatch.setattr( + "openviking.service.core.SessionAutoCommitScheduler", + _FakeSessionAutoCommitScheduler, + ) + monkeypatch.setattr( + "openviking.service.core.create_session_compressor", + lambda **_kwargs: object(), + ) + monkeypatch.setattr("openviking.service.core.ResourceProcessor", lambda **_kwargs: object()) + monkeypatch.setattr("openviking.service.core.SkillProcessor", lambda **_kwargs: object()) + monkeypatch.setattr( + "openviking.service.core.get_openviking_config", + lambda: SimpleNamespace(rerank=object(), retrieval=object()), + ) + monkeypatch.setattr( + "openviking.server.dependencies.get_server_config", + lambda: SimpleNamespace(session_auto_commit=SessionAutoCommitConfig(idle_enabled=False)), + ) + + service = OpenVikingService.__new__(OpenVikingService) + service._initialized = False + service._data_dir_lock_acquired = True + service._config = SimpleNamespace( + embedding=SimpleNamespace( + max_concurrent=1, + dimension=1024, + get_embedder=lambda: SimpleNamespace(is_sparse=False), + ), + vlm=SimpleNamespace(max_concurrent=1), + storage=SimpleNamespace(skip_process_lock=True), + ) + service._user = SimpleNamespace() + service._encryptor = None + service._agfs_client = object() + service._queue_manager = _FakeQueueManager() + service._vikingdb_manager = _FakeVikingDBManager() + service._viking_fs = None + service._embedder = object() + service._resource_processor = None + service._skill_processor = None + service._session_compressor = None + service._lock_manager = _FakeLockManager() + service._directory_initializer = None + service._watch_scheduler = None + service._session_auto_commit_scheduler = None + service._privacy_config_service = None + service._fs_service = SimpleNamespace(set_dependencies=lambda **_kwargs: None) + service._relation_service = SimpleNamespace(set_viking_fs=lambda _fs: None) + service._pack_service = SimpleNamespace(set_dependencies=lambda **_kwargs: None) + service._search_service = SimpleNamespace(set_viking_fs=lambda _fs: None) + service._resource_memory_link_service = SimpleNamespace(set_dependencies=lambda **_kwargs: None) + service._resource_service = SimpleNamespace( + set_dependencies=lambda **_kwargs: None, + close_background_tasks=lambda: None, + ) + service._session_service = SimpleNamespace( + set_dependencies=lambda **_kwargs: None, + set_session_auto_commit_config=lambda config: setattr( + service, "_captured_session_auto_commit_config", config + ), + set_auto_commit_index=lambda index: auto_commit_index_events.append(index), + ) + service._debug_service = SimpleNamespace(set_dependencies=lambda **_kwargs: None) + service._init_storage = lambda *_args, **_kwargs: None + service._build_ragfs_binding_config = lambda: None + service._ensure_data_dir_lock_acquired = lambda: None + + await service.initialize() + + assert scheduler_events == [] + assert service._session_auto_commit_scheduler is None + assert auto_commit_index_events == [] + assert service._captured_session_auto_commit_config.idle_enabled is False diff --git a/tests/unit/service/test_session_auto_commit.py b/tests/unit/service/test_session_auto_commit.py new file mode 100644 index 0000000000..da438dfd5b --- /dev/null +++ b/tests/unit/service/test_session_auto_commit.py @@ -0,0 +1,100 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 + +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from openviking.server.identity import RequestContext, Role +from openviking.service.session_auto_commit import SessionAutoCommitIndex +from openviking_cli.session.user_id import UserIdentifier + + +class _FakeAGFS: + def __init__(self) -> None: + self._files: dict[str, bytes] = {} + + def read(self, path: str, **_kwargs) -> bytes: + if path not in self._files: + raise FileNotFoundError(path) + return self._files[path] + + def write(self, path: str, content: bytes, **_kwargs) -> None: + self._files[path] = content + + def ensure_parent_dirs(self, _path: str, **_kwargs) -> None: + return None + + +class _FakeVikingFS: + def __init__(self) -> None: + self.agfs = _FakeAGFS() + + +def _fake_session(account_id: str, user_id: str, session_id: str, *, next_check_at: str): + return SimpleNamespace( + ctx=RequestContext( + user=UserIdentifier(account_id=account_id, user_id=user_id), + role=Role.USER, + ), + session_id=session_id, + meta=SimpleNamespace( + auto_commit_policy={ + "enabled": True, + "idle_timeout_seconds": 60, + "keep_recent_count": 0, + }, + keep_recent_count=0, + pending_tokens=1, + message_count=1, + last_message_at="2026-06-22T12:00:00+08:00", + ), + ) + + +@pytest.mark.asyncio +async def test_runtime_index_handles_ids_containing_colons(): + index = SessionAutoCommitIndex(_FakeVikingFS()) + await index.initialize() + + await index.upsert_session( + "acct:west", + "user:red", + "session:42", + next_check_at="2026-06-22T12:01:00+08:00", + ) + + listed = await index.list_sessions() + + assert len(listed) == 1 + assert listed[0].account_id == "acct:west" + assert listed[0].user_id == "user:red" + assert listed[0].session_id == "session:42" + assert listed[0].next_check_at == "2026-06-22T12:01:00+08:00" + + +@pytest.mark.asyncio +async def test_sync_runtime_state_loads_sessions_outside_index_lock(): + index = SessionAutoCommitIndex(_FakeVikingFS()) + await index.initialize() + await index.upsert_session( + "acct_a", + "user_b", + "session_c", + next_check_at="2026-06-22T12:01:00+08:00", + ) + + async def slow_load_session(_account_id: str, _user_id: str, _session_id: str): + assert not index._lock.locked() + return _fake_session( + "acct_a", + "user_b", + "session_c", + next_check_at="2026-06-22T12:01:00+08:00", + ) + + synced = await index.sync_runtime_state(slow_load_session) + + assert [item.session_id for item in synced] == ["session_c"]