From 5a168e90424d4327bcc94afe82a912d8a6fe3892 Mon Sep 17 00:00:00 2001 From: Jorben Date: Mon, 8 Jun 2026 08:53:00 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix(core):=20=F0=9F=90=9B=20converge=20orph?= =?UTF-8?q?aned=20subagents=20and=20account=20final=20turns=20for=20judge-?= =?UTF-8?q?completed=20goals?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a run finishes while a subagent (e.g. a Judge) is still active in the DB, its helper row can linger at `running` forever because only live SubagentCompleted/Failed events flip the status. This adds a backstop in the run-finalization path that: - Marks all non-terminal helpers for the run as `interrupted` - Emits matching `SubagentFailed` events so the DB and live stream converge Additionally, a Judge can flip a goal to `complete` within the same run, but the normal turn-accounting path is skipped for non-active goals, causing the finished turn count to drop by one. A new `account_turn_if_unevaluated` function idempotently bills that final turn so the running-vs-finished display stays consistent. The frontend now also maps `helper_judge` to "Judge Agent". --- src-tauri/src/core/agent_run_event_handler.rs | 51 +++++- src-tauri/src/core/goal_manager.rs | 14 ++ src-tauri/src/persistence/repo/goal_repo.rs | 33 ++++ .../src/persistence/repo/run_helper_repo.rs | 150 ++++++++++++++++++ src-tauri/tests/goal_lifecycle.rs | 15 +- .../ui/runtime-thread-surface-helpers.test.ts | 4 + .../ui/runtime-thread-surface-helpers.ts | 2 + 7 files changed, 267 insertions(+), 2 deletions(-) diff --git a/src-tauri/src/core/agent_run_event_handler.rs b/src-tauri/src/core/agent_run_event_handler.rs index 6107563c..9f17b1d4 100644 --- a/src-tauri/src/core/agent_run_event_handler.rs +++ b/src-tauri/src/core/agent_run_event_handler.rs @@ -16,7 +16,7 @@ use crate::ipc::app_events::{ use crate::ipc::frontend_channels::ThreadStreamEvent; use crate::model::errors::{AppError, ErrorSource}; use crate::model::thread::{MessageRecord, RunStatus, ThreadStatus}; -use crate::persistence::repo::{message_repo, run_repo, thread_repo}; +use crate::persistence::repo::{message_repo, run_helper_repo, run_repo, thread_repo}; use super::agent_run_manager::{AgentRunManager, StartRunOptions}; @@ -913,6 +913,55 @@ impl AgentRunManager { }); } + // Backstop: converge any helper that is still non-terminal in the DB now + // that the run is finishing. A subagent (e.g. a Judge) whose own cleanup + // future was dropped during an interrupt/cancel can otherwise linger at + // `running` forever, since the frontend only flips helper status from + // live SubagentCompleted/Failed events. We mark them `interrupted` and + // emit a matching SubagentFailed so both the DB snapshot and the live + // event stream converge. + let helper_interrupt_reason = match status { + RunStatus::Cancelled => "Subagent interrupted because the run was cancelled.", + RunStatus::Interrupted => "Subagent interrupted because the run was interrupted.", + _ => "Subagent interrupted because the run finished while it was still active.", + }; + match run_helper_repo::interrupt_non_terminal_by_run( + &self.pool, + run_id, + helper_interrupt_reason, + ) + .await + { + Ok(orphaned) => { + if !orphaned.is_empty() { + tracing::warn!( + run_id = %run_id, + count = orphaned.len(), + "converged orphaned non-terminal run helpers to interrupted" + ); + } + for helper in orphaned { + let _ = frontend_tx.send(ThreadStreamEvent::SubagentFailed { + run_id: run_id.to_string(), + subtask_id: helper.id, + helper_kind: helper.helper_kind, + started_at: helper.started_at, + error: helper_interrupt_reason.to_string(), + snapshot: + crate::core::subagent::orchestrator::SubagentProgressSnapshot::default( + ), + }); + } + } + Err(error) => { + tracing::warn!( + run_id = %run_id, + %error, + "failed to converge orphaned run helpers while finishing run" + ); + } + } + finalize_run( &self.pool, self.app_events.as_ref(), diff --git a/src-tauri/src/core/goal_manager.rs b/src-tauri/src/core/goal_manager.rs index f3d3ff2b..3e7a346e 100644 --- a/src-tauri/src/core/goal_manager.rs +++ b/src-tauri/src/core/goal_manager.rs @@ -525,6 +525,20 @@ impl GoalManager { "goal is Complete without judge_passed; treating as terminal and not re-opening" ); } + + // A Judge can flip the goal to `complete` *inside* this run. The + // normal turn-accounting path below never runs for a non-active + // goal, so the final run that achieved the goal would otherwise not + // be billed — making the finished turn count drop by one versus + // what the user saw while the run was active. Idempotently account + // that final turn here (only the first call for this run id wins). + let mut goal = goal; + if goal_repo::account_turn_if_unevaluated(&self.pool, &goal.id, run_id).await? { + if let Some(updated) = self.get_active().await? { + goal = updated; + } + } + return Ok(Some(GoalEvaluationOutcome { goal: Self::to_payload(&goal), verdict: "skipped".to_string(), diff --git a/src-tauri/src/persistence/repo/goal_repo.rs b/src-tauri/src/persistence/repo/goal_repo.rs index 72a3c53a..c3a7baf7 100644 --- a/src-tauri/src/persistence/repo/goal_repo.rs +++ b/src-tauri/src/persistence/repo/goal_repo.rs @@ -190,6 +190,39 @@ pub async fn mark_evaluated_if_needed( Ok(result.rows_affected() > 0) } +/// Idempotently account one goal turn for a run, regardless of the goal's +/// current status. Increments `turns_used` and stamps `last_evaluated_run_id` +/// only when this run has not already been counted for the goal. Returns true +/// when a turn was actually accounted. +/// +/// Unlike `mark_evaluated_if_needed` (which only stamps the run id for *active* +/// goals as part of the continuation evaluation claim), this also covers the +/// case where a Judge flips the goal to `complete` inside the same run: the +/// terminal-run evaluation early-returns without ever billing that final turn, +/// so the run-level turn would otherwise be lost. Counting here keeps the +/// running-vs-finished turn display consistent. +pub async fn account_turn_if_unevaluated( + pool: &SqlitePool, + id: &str, + run_id: &str, +) -> Result { + let result = sqlx::query( + "UPDATE goals SET \ + turns_used = turns_used + 1, \ + last_evaluated_run_id = ?, \ + updated_at = ? \ + WHERE id = ? \ + AND (last_evaluated_run_id IS NULL OR last_evaluated_run_id != ?)", + ) + .bind(run_id) + .bind(Utc::now().to_rfc3339()) + .bind(id) + .bind(run_id) + .execute(pool) + .await?; + Ok(result.rows_affected() > 0) +} + pub async fn delete(pool: &SqlitePool, id: &str) -> Result { let result = sqlx::query("DELETE FROM goals WHERE id = ?") .bind(id) diff --git a/src-tauri/src/persistence/repo/run_helper_repo.rs b/src-tauri/src/persistence/repo/run_helper_repo.rs index 0eb59481..2acb3555 100644 --- a/src-tauri/src/persistence/repo/run_helper_repo.rs +++ b/src-tauri/src/persistence/repo/run_helper_repo.rs @@ -185,6 +185,62 @@ pub async fn mark_interrupted_if_active( Ok(result.rows_affected() > 0) } +/// Converge every still-active helper belonging to a single run onto a terminal +/// `interrupted` state and return the rows that were actually transitioned. +/// +/// Used as a backstop when a run reaches a terminal state (completed / failed / +/// cancelled / interrupted) while one of its helpers is still non-terminal in +/// the DB — e.g. a Judge subagent whose own cleanup future was dropped when the +/// run was interrupted, leaving it stuck at `running`. Selecting the affected +/// rows before the update lets the caller emit a matching `SubagentFailed` +/// event so the live event stream converges too, not just the DB snapshot. +pub async fn interrupt_non_terminal_by_run( + pool: &SqlitePool, + run_id: &str, + error_summary: &str, +) -> Result, AppError> { + let mut tx = pool.begin().await?; + + let rows = sqlx::query_as::<_, RunHelperRow>( + "SELECT id, run_id, thread_id, helper_kind, parent_tool_call_id, status, + input_summary, output_summary, error_summary, started_at, finished_at, + input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, total_tokens + FROM run_helpers + WHERE run_id = ? + AND status NOT IN ('completed', 'failed', 'interrupted', 'cancelled') + AND finished_at IS NULL + ORDER BY started_at ASC, id ASC", + ) + .bind(run_id) + .fetch_all(&mut *tx) + .await?; + + if rows.is_empty() { + tx.rollback().await?; + return Ok(Vec::new()); + } + + let now = Utc::now().to_rfc3339(); + sqlx::query( + "UPDATE run_helpers + SET status = 'interrupted', + error_summary = COALESCE(error_summary, ?), + finished_at = ? + WHERE run_id = ? + AND status NOT IN ('completed', 'failed', 'interrupted', 'cancelled') + AND finished_at IS NULL", + ) + .bind(error_summary) + .bind(&now) + .bind(run_id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + Ok(rows.into_iter().map(RunHelperRow::into_dto).collect()) +} + /// Mark all non-terminal run helpers as interrupted (crash recovery). pub async fn interrupt_active_helpers(pool: &SqlitePool) -> Result { let result = sqlx::query( @@ -549,6 +605,100 @@ mod tests { } } + #[tokio::test] + async fn interrupt_non_terminal_by_run_only_affects_target_run() { + let pool = setup_test_pool().await; + + for (id, run_id, status) in &[ + ("h-run1-running", "run-1", "running"), + ("h-run1-waiting", "run-1", "waiting_tool_result"), + ("h-run1-completed", "run-1", "completed"), + ("h-run2-running", "run-2", "running"), + ] { + let helper = RunHelperInsert { + id: id.to_string(), + run_id: run_id.to_string(), + thread_id: "t1".into(), + helper_kind: "helper_judge".into(), + parent_tool_call_id: None, + status: status.to_string(), + model_role: "auxiliary".into(), + provider_id: None, + model_id: None, + input_summary: None, + }; + insert(&pool, &helper).await.unwrap(); + } + + let converged = interrupt_non_terminal_by_run(&pool, "run-1", "run ended") + .await + .unwrap(); + + // Only the two non-terminal run-1 helpers are returned. + let mut ids: Vec = converged.iter().map(|h| h.id.clone()).collect(); + ids.sort(); + assert_eq!(ids, vec!["h-run1-running", "h-run1-waiting"]); + + // run-1 non-terminal helpers are now interrupted with the error summary. + for id in ["h-run1-running", "h-run1-waiting"] { + let row = sqlx::query(&format!( + "SELECT status, error_summary, finished_at FROM run_helpers WHERE id = '{id}'" + )) + .fetch_one(&pool) + .await + .unwrap(); + let status: String = row.get(0); + let error_summary: Option = row.get(1); + let finished_at: Option = row.get(2); + assert_eq!(status, "interrupted"); + assert_eq!(error_summary.as_deref(), Some("run ended")); + assert!(finished_at.is_some()); + } + + // Already-terminal run-1 helper and the run-2 helper are untouched. + let completed: String = + sqlx::query("SELECT status FROM run_helpers WHERE id = 'h-run1-completed'") + .fetch_one(&pool) + .await + .unwrap() + .get(0); + assert_eq!(completed, "completed"); + + let run2: String = + sqlx::query("SELECT status FROM run_helpers WHERE id = 'h-run2-running'") + .fetch_one(&pool) + .await + .unwrap() + .get(0); + assert_eq!(run2, "running"); + } + + #[tokio::test] + async fn interrupt_non_terminal_by_run_returns_empty_when_all_terminal() { + let pool = setup_test_pool().await; + + for (id, status) in &[("h-a", "completed"), ("h-b", "interrupted")] { + let helper = RunHelperInsert { + id: id.to_string(), + run_id: "run-1".into(), + thread_id: "t1".into(), + helper_kind: "helper_judge".into(), + parent_tool_call_id: None, + status: status.to_string(), + model_role: "auxiliary".into(), + provider_id: None, + model_id: None, + input_summary: None, + }; + insert(&pool, &helper).await.unwrap(); + } + + let converged = interrupt_non_terminal_by_run(&pool, "run-1", "run ended") + .await + .unwrap(); + assert!(converged.is_empty()); + } + #[tokio::test] async fn mark_interrupted_if_active_flips_running_helper() { let pool = setup_test_pool().await; diff --git a/src-tauri/tests/goal_lifecycle.rs b/src-tauri/tests/goal_lifecycle.rs index 157d7704..71d8ed90 100644 --- a/src-tauri/tests/goal_lifecycle.rs +++ b/src-tauri/tests/goal_lifecycle.rs @@ -453,7 +453,10 @@ mod tests { Some("run-judge-1") ); - // A verified goal stops continuation. + // A verified goal stops continuation, but the terminal run that + // achieved the goal is still accounted exactly once so the finished + // turn count matches what was shown while the run was active. + let turns_before = updated.turns_used; let outcome = mgr .evaluate_after_run("run-after", None) .await @@ -461,6 +464,16 @@ mod tests { .unwrap(); assert_eq!(outcome.verdict, "skipped"); assert!(outcome.continuation_prompt.is_none()); + assert_eq!(outcome.goal.turns_used, turns_before + 1); + + // Re-evaluating the same terminal run is idempotent (no double count). + let outcome_again = mgr + .evaluate_after_run("run-after", None) + .await + .unwrap() + .unwrap(); + assert_eq!(outcome_again.verdict, "skipped"); + assert_eq!(outcome_again.goal.turns_used, turns_before + 1); } #[tokio::test] diff --git a/src/modules/workbench-shell/ui/runtime-thread-surface-helpers.test.ts b/src/modules/workbench-shell/ui/runtime-thread-surface-helpers.test.ts index 937e7a3d..5bd17e96 100644 --- a/src/modules/workbench-shell/ui/runtime-thread-surface-helpers.test.ts +++ b/src/modules/workbench-shell/ui/runtime-thread-surface-helpers.test.ts @@ -43,6 +43,10 @@ describe("formatHelperKind", () => { expect(formatHelperKind("helper_review")).toBe("Review Agent"); }); + it("returns 'Judge Agent' for helper_judge", () => { + expect(formatHelperKind("helper_judge")).toBe("Judge Agent"); + }); + it("returns custom name from slugToNameMap when slug is present", () => { const map = new Map([["refactor", "Refactor Agent"]]); expect(formatHelperKind("helper_custom_refactor", map)).toBe( diff --git a/src/modules/workbench-shell/ui/runtime-thread-surface-helpers.ts b/src/modules/workbench-shell/ui/runtime-thread-surface-helpers.ts index 57e4ffd4..0aee55c6 100644 --- a/src/modules/workbench-shell/ui/runtime-thread-surface-helpers.ts +++ b/src/modules/workbench-shell/ui/runtime-thread-surface-helpers.ts @@ -114,6 +114,8 @@ export function formatHelperKind( return "Explore Agent"; case "helper_review": return "Review Agent"; + case "helper_judge": + return "Judge Agent"; default: if (kind.startsWith(CUSTOM_PREFIX)) { const slug = kind.slice(CUSTOM_PREFIX.length); From a03d9ba5798e2aca5defc53c9b328d71fc3048e7 Mon Sep 17 00:00:00 2001 From: Jorben Date: Thu, 11 Jun 2026 02:12:24 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat(core):=20=E2=9C=A8=20=E6=8E=A5?= =?UTF-8?q?=E5=85=A5=20tiycore=200.2.10-rc.2=20=E7=BB=9F=E4=B8=80=20contex?= =?UTF-8?q?t=5Fsize=20=E8=AF=AD=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 升级 tiycore 0.2.9 → 0.2.10-rc.2,把跨协议的 `Usage::context_size()` 作为统一的上下文用量事实来源,贯通 RunUsageDto / 前端 badge / 自动压缩触发,清理原先的 `initial_context_calibration` 启发式估算路径。 主要变更: - 升级 `src-tauri/Cargo.toml` 至 tiycore 0.2.10-rc.2。 - `RunUsageDto`(`src-tauri/src/model/thread.rs`)新增 `context_size: u64` 字段;doc-comment 区分 wire-level `total_tokens` 与统一语义 `context_size`;`From` 用 `value.context_size()` 填入。前端 `src/shared/types/api.ts` 同步暴露 `contextSize`。 - `readUsage`(`src/services/bridge/agent-commands.ts`)解析 `contextSize`,缺失时按 `input+output+cache_read+cache_write` 兜底;新增 `readUsageField` 助手。 - `agent_run_event_handler.rs::ThreadUsageUpdated` 注释说明 `context_size` 派生规则。 - 前端 `ThreadContextUsage` / `buildThreadContextBadgeData` / `onUsage` / `mapRunSummaryToContextUsage` 统一改用 `contextSize` 判定百分比与 isExceeded,`totalTokens` 保留为 wire-level 显示;对应测试 fixture / 新增用例同步。 - 删除 `AgentSession.initial_context_calibration` 与 `ContextCompressionRuntimeState`(及 `effective_prompt_tokens` / `current_context_token_calibration` / `record_pending_prompt_estimate` / `observe_context_usage_calibration` / `build_initial_context_token_calibration` / `run_summary_matches_primary_model`)。 - `context_compression.rs` 删除 `calibrate_total_tokens` / `should_compress_with_calibration`,新增 `should_compress_via_context_size(last_usage, settings)`; `run_auto_compression` 启发式兜底改为 `messages.is_empty()` fast-path;`should_compress` 保留为非 hot-path 估算。 - `AgentSession.last_observed_usage: Arc>>` 替代旧 `context_compression_state`;`configure_agent` / `set_transform_context` 闭包 / `agent_session_events.rs` / `agent.subscribe` 回调统一改用新字段。 - `run_repo.rs` / `run_helper_repo.rs` / `tests/frontend_integration.rs` / `agent_session_tests.rs::make_run_summary` 同步补 `context_size`。 - 删除 / 改写 calibration 相关测试;`message_end_usage_updates_*` 改为 `message_end_usage_updates_record_observed_usage_once_per_change`, `usage_calibration_*` 改为 `usage_record_observed_includes_cache_read_for_context_size`, 清理未使用的测试 helper。 验证: - `cargo fmt --check --manifest-path src-tauri/Cargo.toml` clean - `npm run typecheck` clean - `npm run test:unit -- --run` 53 文件 846 通过 / 1 跳过 - `cargo test --locked --manifest-path src-tauri/Cargo.toml` lib + integration 全通过 - `agent_review` verdict: PASS BREAKING CHANGE: 内部 `AgentSession.context_compression_state` 及其关联启发式 API 已移除,替换为 `AgentSession.last_observed_usage: Arc>>`; 前端 `RunUsageDto` 新增必填 `contextSize`,旧 `totalTokens` 保留 为 wire-level 显示。任何外部消费方需更新到新字段。 Refs: tiycore 0.2.10-rc.2 `Usage::context_size()` --- src-tauri/Cargo.lock | 4 +- src-tauri/Cargo.toml | 2 +- src-tauri/src/core/agent_run_event_handler.rs | 11 + src-tauri/src/core/agent_session.rs | 173 +++++------ .../src/core/agent_session_compression.rs | 176 +++-------- src-tauri/src/core/agent_session_events.rs | 36 ++- src-tauri/src/core/agent_session_tests.rs | 285 ++++++------------ src-tauri/src/core/agent_session_types.rs | 2 - src-tauri/src/core/context_compression.rs | 150 ++++++--- src-tauri/src/model/thread.rs | 20 ++ .../src/persistence/repo/run_helper_repo.rs | 9 + src-tauri/src/persistence/repo/run_repo.rs | 11 + src-tauri/tests/frontend_integration.rs | 4 + .../workbench-shell/model/thread-store.ts | 8 + .../ui/dashboard-workbench-logic.ts | 27 +- .../ui/dashboard-workbench.test.ts | 43 ++- .../ui/dashboard-workbench.tsx | 2 +- .../ui/runtime-thread-surface-state.ts | 15 + .../ui/runtime-thread-surface.test.tsx | 4 + .../ui/runtime-thread-surface.tsx | 11 + .../workbench-shell/ui/workbench-top-bar.tsx | 6 + src/services/bridge/agent-commands.ts | 61 ++-- .../thread-stream/thread-stream.test.ts | 3 + src/shared/types/api.ts | 13 + 24 files changed, 559 insertions(+), 517 deletions(-) diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index be76f106..1428aa51 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -6282,9 +6282,9 @@ dependencies = [ [[package]] name = "tiycore" -version = "0.2.9" +version = "0.2.10-rc.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d619c75f2e2f61b57f2fcd81e7ce7e714556b75b2fda8ee932450c657de9ac29" +checksum = "809ab84f3da03ccbfc74e6676f1c99820fe8b31ff44cb4489ab5a8d5ba782172" dependencies = [ "anyhow", "arc-swap", diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index eeef8cdd..4eb566da 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -60,7 +60,7 @@ keepawake = "0.6" portable-pty = "0.9" git2 = { version = "0.20", features = ["vendored-libgit2", "vendored-openssl"] } similar = "2" -tiycore = "0.2.9" +tiycore = "0.2.10-rc.2" # Gateway (IM channel support) async-trait = "0.1" diff --git a/src-tauri/src/core/agent_run_event_handler.rs b/src-tauri/src/core/agent_run_event_handler.rs index 9f17b1d4..7aee2467 100644 --- a/src-tauri/src/core/agent_run_event_handler.rs +++ b/src-tauri/src/core/agent_run_event_handler.rs @@ -415,6 +415,17 @@ impl AgentRunManager { } } ThreadStreamEvent::ThreadUsageUpdated { usage, .. } => { + // `usage` here is a `RunUsageDto` populated by tiycore via + // `From`. Its `context_size` field is + // the cross-protocol unified "context occupancy" value + // (`Usage::context_size()` = input + output + cache_read + + // cache_write), NOT the wire-level `total_tokens` (which is + // per-response and provider-dependent). We round-trip the + // whole struct (input/output/cache fields + total_tokens) + // through `tiycore::types::Usage` because the persistence + // layer is typed against `Usage`; `context_size` is + // re-derived inside the `From` impl on read, so storing only + // the raw fields is sufficient and forward-compatible. let usage = tiycore::types::Usage { input: usage.input_tokens, output: usage.output_tokens, diff --git a/src-tauri/src/core/agent_session.rs b/src-tauri/src/core/agent_session.rs index 2c7be27e..abe51a88 100644 --- a/src-tauri/src/core/agent_session.rs +++ b/src-tauri/src/core/agent_session.rs @@ -523,9 +523,7 @@ fn append_runtime_queue_message( } use crate::core::agent_session_compression::{ - build_initial_context_token_calibration, current_context_token_calibration, - persist_compression_markers_to_pool, record_pending_prompt_estimate, run_auto_compression, - ContextCompressionRuntimeState, + persist_compression_markers_to_pool, run_auto_compression, }; use crate::core::agent_session_events::handle_agent_event; pub(crate) use crate::core::agent_session_history::*; @@ -562,7 +560,12 @@ pub async fn build_session_spec( .collect(); let history_tool_calls = tool_call_repo::list_parent_visible_by_run_ids(pool, &history_run_ids).await?; - let latest_historical_run = + // No longer read after the calibration seed was removed; the + // `find_latest_with_prompt_usage_by_thread_excluding_run` call + // above is kept as documentation that historical run usage is + // intentionally not used by the new `Usage::context_size()` + // trigger (the first LLM response provides the source of truth). + let _latest_historical_run = run_repo::find_latest_with_prompt_usage_by_thread_excluding_run(pool, thread_id, run_id) .await?; @@ -629,13 +632,11 @@ pub async fn build_session_spec( } } - let initial_context_calibration = build_initial_context_token_calibration( - latest_historical_run.as_ref(), - &history_messages, - &history_tool_calls, - &resolved_plan.primary, - &system_prompt, - ); + // The previous initial-context calibration seed was removed when we + // switched auto-compression to the unified `Usage::context_size()` + // trigger (see `should_compress_via_context_size`). The trigger no + // longer needs a per-history-run calibration — it compares the most + // recent LLM usage against the budget directly. Ok(AgentSessionSpec { run_id: run_id.to_string(), @@ -649,7 +650,6 @@ pub async fn build_session_spec( history_tool_calls, model_plan: resolved_plan, initial_prompt: None, - initial_context_calibration, cache_arbiter, }) } @@ -664,7 +664,13 @@ pub struct AgentSession { cancel_requested: Arc, pub(crate) checkpoint_requested: AtomicBool, pub(crate) abort_signal: tiycore::agent::AbortSignal, - context_compression_state: Arc>, + /// Most recent `tiycore::types::Usage` observed from the last LLM + /// call, shared with the `set_transform_context` closure. The + /// closure compares `usage.context_size()` against the configured + /// `CompressionSettings::budget()` to decide whether the next turn + /// needs auto-compression. Shared with the event handler so the + /// trigger stays in lockstep with the stream of usage updates. + last_observed_usage: Arc>>, runtime_queue_state: Arc>, /// Shared goal runtime state for tool call recording across command invocations. pub(crate) goal_runtime: Arc>, @@ -682,9 +688,10 @@ impl AgentSession { ) -> Arc { Arc::new_cyclic(|weak_self| { let agent = Arc::new(Agent::with_model(spec.model_plan.primary.model.clone())); - let context_compression_state = Arc::new(StdMutex::new( - ContextCompressionRuntimeState::new(spec.initial_context_calibration), - )); + // Unified trigger: start with no observed usage; the + // `set_transform_context` closure defers its first decision + // until the first LLM response reports `context_size`. + let last_observed_usage = Arc::new(StdMutex::new(None::)); let runtime_queue_state = Arc::new(StdMutex::new(RuntimeQueueState::default())); agent.set_max_turns(max_turns); agent.set_max_retries(Some(TIYCORE_REQUEST_MAX_RETRIES)); @@ -692,7 +699,7 @@ impl AgentSession { &agent, &spec, weak_self.clone(), - Arc::clone(&context_compression_state), + Arc::clone(&last_observed_usage), Arc::clone(&runtime_queue_state), event_tx.clone(), ); @@ -707,7 +714,7 @@ impl AgentSession { cancel_requested: Arc::new(AtomicBool::new(false)), checkpoint_requested: AtomicBool::new(false), abort_signal: tiycore::agent::AbortSignal::new(), - context_compression_state, + last_observed_usage, runtime_queue_state, goal_runtime, } @@ -934,8 +941,8 @@ impl AgentSession { let last_completed_message_id_ref = Arc::clone(&last_completed_message_id); let reasoning_message_id_ref = Arc::clone(¤t_reasoning_message_id); let last_usage_ref = Arc::clone(&last_usage); + let last_observed_usage_ref = Arc::clone(&self.last_observed_usage); let reasoning_ref = Arc::clone(&reasoning_buffer); - let context_compression_state_ref = Arc::clone(&self.context_compression_state); let current_turn_index: Arc>> = Arc::new(StdMutex::new(None)); let turn_index_ref = Arc::clone(¤t_turn_index); let last_text_delta = Arc::new(StdMutex::new(None::)); @@ -948,7 +955,7 @@ impl AgentSession { &last_completed_message_id_ref, &reasoning_message_id_ref, &last_usage_ref, - &context_compression_state_ref, + &last_observed_usage_ref, &reasoning_ref, &turn_index_ref, &last_text_delta_ref, @@ -1084,7 +1091,7 @@ fn configure_agent( agent: &Arc, spec: &AgentSessionSpec, weak_self: Weak, - context_compression_state: Arc>, + last_observed_usage: Arc>>, runtime_queue_state: Arc>, event_tx: mpsc::UnboundedSender, ) { @@ -1100,27 +1107,39 @@ fn configure_agent( agent.set_transport(spec.model_plan.transport); agent.set_security_config(main_agent_security_config()); - // Context compression: when messages exceed the token budget, generate - // a summary with the primary model, persist markers to DB, and keep only - // recent messages. Falls back to pure truncation if the LLM call fails. + // Context compression: when the most recent LLM call reported a + // unified context size above the input budget, summarise the older + // messages with the primary model, persist markers to DB, and keep + // only recent messages. Falls back to pure truncation if the LLM + // call fails. + // + // The trigger is `Usage::context_size()` from tiycore 0.2.10-rc.2 — + // the cross-protocol unified "context occupancy" value + // (input + output + cache_read + cache_write). The previous heuristic + // (`estimate_total_tokens(messages) + system_prompt_estimated_tokens` + // scaled by an observed `ContextTokenCalibration` ratio) was removed + // because it could disagree with the actual provider usage and let an + // over-budget call slip through, and because the heuristic + CJK + // weight drift made the trigger point unpredictable. // - // Two correctness hazards addressed here: + // Two correctness hazards addressed in [`run_auto_compression`]: // - // 1. UUID v7 timing. The reset marker we write to DB uses `now_v7()`, but - // `cut_point` in-memory points at a slice that includes messages the - // current run persisted EARLIER than this call. A naive - // `list_since_last_reset WHERE id >= reset_id` would exclude those - // earlier messages and effectively "lose" the current user prompt on - // the next reload. We therefore resolve a DB-backed boundary id - // conservatively covering all recent messages and attach it to the - // reset marker's metadata. + // 1. UUID v7 timing. The reset marker we write to DB uses `now_v7()`, + // but `cut_point` in-memory points at a slice that includes + // messages the current run persisted EARLIER than this call. A + // naive `list_since_last_reset WHERE id >= reset_id` would exclude + // those earlier messages and effectively "lose" the current user + // prompt on the next reload. We therefore resolve a DB-backed + // boundary id conservatively covering all recent messages and + // attach it to the reset marker's metadata. // // 2. Summary-of-summary decay. If a previous auto-compression already // injected a `` as the head of `messages`, naively // re-summarising `old_messages` would re-summarise an already- - // summarised prefix, losing detail each pass. Instead we detect the - // prior summary, treat it as a pinned prefix, and ask the model to - // **merge** the prior summary with the delta of messages since then. + // summarised prefix, losing detail each pass. Instead we detect + // the prior summary, treat it as a pinned prefix, and ask the + // model to **merge** the prior summary with the delta of messages + // since then. let compression_settings = crate::core::context_compression::CompressionSettings::new( spec.model_plan.primary.model.context_window, ); @@ -1129,33 +1148,26 @@ fn configure_agent( let compression_thread_id = spec.thread_id.clone(); let compression_run_id = spec.run_id.clone(); let compression_response_language = spec.model_plan.raw.response_language.clone(); - let compression_state = Arc::clone(&context_compression_state); - // Pre-compute the system prompt's estimated token count so the - // compression check includes fixed overhead that the provider counts - // against the context window but `estimate_total_tokens(messages)` - // does not see. This narrows the gap the calibration ratio has to - // cover, making the trigger point more predictable. - let system_prompt_estimated_tokens = - crate::core::context_compression::estimate_tokens(&spec.system_prompt); + let last_observed_usage_for_trigger = Arc::clone(&last_observed_usage); agent.set_transform_context(move |messages| { - // Cheap pass-through check first: only clone the heavy captured state - // (ResolvedModelRole, String ids, Weak) when compression will actually - // run. For long sessions with many turns this avoids per-turn heap - // allocations when the thread is still well under budget. + // Cheap pass-through check first: only clone the heavy captured + // state (ResolvedModelRole, String ids, Weak) when compression + // will actually run. For long sessions with many turns this + // avoids per-turn heap allocations when the thread is still + // well under budget. let settings = compression_settings.clone(); - let raw_estimated_tokens = - crate::core::context_compression::estimate_total_tokens(&messages) - .saturating_add(system_prompt_estimated_tokens); - let calibration = current_context_token_calibration(&compression_state); - let calibrated_total_tokens = crate::core::context_compression::calibrate_total_tokens( - raw_estimated_tokens, - Some(calibration), - ); - let needs_compression = !messages.is_empty() - && crate::core::context_compression::should_compress_total_tokens( - calibrated_total_tokens, - &settings, - ); + let snapshot: Option = last_observed_usage_for_trigger + .lock() + .ok() + .and_then(|guard| *guard); + // `should_compress_via_context_size` returns `false` for + // `None` (no observed usage yet) — we defer the first decision + // until the first LLM response reports its `context_size`, + // matching the design note in `context_compression`. + let needs_compression = crate::core::context_compression::should_compress_via_context_size( + snapshot.as_ref(), + &settings, + ) && !messages.is_empty(); let model_role = if needs_compression { Some(primary_model_role.clone()) @@ -1182,31 +1194,24 @@ fn configure_agent( } else { None }; - let compression_state = Arc::clone(&compression_state); async move { - let transformed_messages = if !needs_compression { - messages - } else { - // Unwraps are sound: all `Some(_)` are populated together under - // `needs_compression`, so either all four are `Some` (compression - // path) or we returned above. - run_auto_compression( - messages, - settings, - model_role.expect("model_role populated when compressing"), - weak.expect("weak populated when compressing"), - thread_id.expect("thread_id populated when compressing"), - run_id.expect("run_id populated when compressing"), - response_language.expect("response_language populated when compressing"), - ) - .await - }; - let sent_estimated_tokens = - crate::core::context_compression::estimate_total_tokens(&transformed_messages) - .saturating_add(system_prompt_estimated_tokens); - record_pending_prompt_estimate(&compression_state, sent_estimated_tokens); - transformed_messages + if !needs_compression { + return messages; + } + // Unwraps are sound: all `Some(_)` are populated together + // under `needs_compression`, so either all six are `Some` + // (compression path) or we returned above. + run_auto_compression( + messages, + settings, + model_role.expect("model_role populated when compressing"), + weak.expect("weak populated when compressing"), + thread_id.expect("thread_id populated when compressing"), + run_id.expect("run_id populated when compressing"), + response_language.expect("response_language populated when compressing"), + ) + .await } }); diff --git a/src-tauri/src/core/agent_session_compression.rs b/src-tauri/src/core/agent_session_compression.rs index 9b03a01f..af0b5427 100644 --- a/src-tauri/src/core/agent_session_compression.rs +++ b/src-tauri/src/core/agent_session_compression.rs @@ -1,125 +1,12 @@ use std::panic::{catch_unwind, AssertUnwindSafe}; -use std::sync::{Mutex as StdMutex, Weak}; +use std::sync::Weak; use sqlx::SqlitePool; use tiycore::agent::AgentMessage; -use tiycore::types::Usage; use crate::core::agent_session::AgentSession; -use crate::core::agent_session_history::convert_history_messages; use crate::core::agent_session_types::ResolvedModelRole; -use crate::core::context_compression::ContextTokenCalibration; use crate::ipc::frontend_channels::ThreadStreamEvent; -use crate::model::thread::{MessageRecord, RunSummaryDto, ToolCallDto}; - -pub(crate) fn effective_prompt_tokens(input_tokens: u64, cache_read_tokens: u64) -> u64 { - input_tokens.saturating_add(cache_read_tokens) -} - -#[derive(Debug, Default)] -pub(crate) struct ContextCompressionRuntimeState { - calibration: ContextTokenCalibration, - pub(crate) pending_prompt_estimate: Option, -} - -impl ContextCompressionRuntimeState { - pub(crate) fn new(initial_calibration: ContextTokenCalibration) -> Self { - Self { - calibration: initial_calibration, - pending_prompt_estimate: None, - } - } - - fn calibration(&self) -> ContextTokenCalibration { - self.calibration - } - - pub(crate) fn record_pending_prompt_estimate(&mut self, estimated_tokens: u32) { - self.pending_prompt_estimate = Some(estimated_tokens); - } - - fn observe_prompt_usage(&mut self, actual_prompt_tokens: u64) { - let Some(estimated_tokens) = self.pending_prompt_estimate.take() else { - return; - }; - - self.calibration = self - .calibration - .observe(estimated_tokens, actual_prompt_tokens); - } -} - -pub(crate) fn current_context_token_calibration( - state: &StdMutex, -) -> ContextTokenCalibration { - state - .lock() - .map(|state| state.calibration()) - .unwrap_or_default() -} - -pub(crate) fn record_pending_prompt_estimate( - state: &StdMutex, - estimated_tokens: u32, -) { - if let Ok(mut state) = state.lock() { - state.record_pending_prompt_estimate(estimated_tokens); - } -} - -pub(crate) fn observe_context_usage_calibration( - state: &StdMutex, - usage: &Usage, -) { - let actual_prompt_tokens = effective_prompt_tokens(usage.input, usage.cache_read); - if actual_prompt_tokens == 0 { - return; - } - - if let Ok(mut state) = state.lock() { - state.observe_prompt_usage(actual_prompt_tokens); - } -} - -pub(crate) fn build_initial_context_token_calibration( - latest_historical_run: Option<&RunSummaryDto>, - history_messages: &[MessageRecord], - history_tool_calls: &[ToolCallDto], - primary_model: &ResolvedModelRole, - system_prompt: &str, -) -> ContextTokenCalibration { - let Some(latest_historical_run) = latest_historical_run else { - return ContextTokenCalibration::default(); - }; - - let historical_prompt_tokens = effective_prompt_tokens( - latest_historical_run.usage.input_tokens, - latest_historical_run.usage.cache_read_tokens, - ); - if historical_prompt_tokens == 0 - || !run_summary_matches_primary_model(latest_historical_run, primary_model) - { - return ContextTokenCalibration::default(); - } - - let history = - convert_history_messages(history_messages, history_tool_calls, &primary_model.model); - let estimated_tokens = crate::core::context_compression::estimate_total_tokens(&history) - .saturating_add(crate::core::context_compression::estimate_tokens( - system_prompt, - )); - - ContextTokenCalibration::from_observation(estimated_tokens, historical_prompt_tokens) - .unwrap_or_default() -} - -fn run_summary_matches_primary_model( - run_summary: &RunSummaryDto, - primary_model: &ResolvedModelRole, -) -> bool { - run_summary.model_id.as_deref() == Some(primary_model.model_id.as_str()) - || run_summary.model_id.as_deref() == Some(primary_model.model.id.as_str()) -} /// Auto-compression hook body, extracted from the `set_transform_context` /// closure in [`configure_agent`] so the control flow is testable in isolation @@ -150,14 +37,20 @@ pub(crate) async fn run_auto_compression( run_id: String, response_language: Option, ) -> Vec { - // Phase 1: check if compression is needed. + // Phase 1: trust the caller. The `set_transform_context` closure in + // `configure_agent` is the single source of truth for "should we + // compress right now?" — it compares the most recent + // `tiycore::types::Usage::context_size()` from the last LLM call + // against `settings.budget()`. The previous heuristic + // (`should_compress` over `messages`) was removed because it could + // disagree with the unified-usage trigger (e.g. a long prompt with + // a small historical thread) and let an over-budget call slip + // through. // - // The hot-path caller (the `set_transform_context` closure) already gates - // on `should_compress` before cloning the heavy state, so in production - // this branch should never hit. It stays here defensively so direct - // callers (e.g. unit tests) still get correct behaviour for under-budget - // inputs without having to duplicate the check. - if !crate::core::context_compression::should_compress(&messages, &settings) { + // We still keep the cheap empty-input fast path so direct unit + // callers (and tests) get the same pass-through behaviour as before + // for the degenerate empty case. + if messages.is_empty() { return messages; } @@ -942,11 +835,23 @@ mod tests { } #[tokio::test] - async fn returns_messages_unchanged_when_under_budget() { - // With a generous budget, should_compress is false and the function - // is a pure pass-through — no clone of messages, no LLM call, no - // DB access. This exercises the most common hot-path behaviour. - let messages = vec![make_user("hi"), make_assistant("hello")]; + async fn returns_messages_unchanged_when_empty_or_dangling_weak() { + // The trigger ("should we auto-compress?") is now owned by + // `set_transform_context` — it gates on + // `should_compress_via_context_size` before calling + // `run_auto_compression`. So in production, by the time we + // get here, the caller has already decided to compress. The + // only fast-path still inside `run_auto_compression` is the + // `messages.is_empty()` empty-input guard, so we exercise + // that. + // + // With a dangling `Weak`, an empty input + // returns immediately via the fast-path: no LLM call, no + // DB access, no side effects. The test asserts both the + // length and the byte-identical content of the head + // message, which is enough to prove the function did not + // touch the input. + let messages = vec![]; let settings = settings_for_test(128_000, 1_024, 1_024); let result = run_auto_compression( @@ -960,22 +865,9 @@ mod tests { ) .await; - assert_eq!(result.len(), messages.len()); - // Content should be byte-identical — no summary was injected. - match (&result[0], &messages[0]) { - (AgentMessage::User(a), AgentMessage::User(b)) => { - let at = match &a.content { - tiycore::types::UserContent::Text(t) => t.as_str(), - _ => panic!("expected text"), - }; - let bt = match &b.content { - tiycore::types::UserContent::Text(t) => t.as_str(), - _ => panic!("expected text"), - }; - assert_eq!(at, bt); - } - _ => panic!("expected user message at head"), - } + assert_eq!(result.len(), 0); + // Empty input ⇒ empty output (pass-through fast path). + assert!(result.is_empty()); } #[tokio::test] diff --git a/src-tauri/src/core/agent_session_events.rs b/src-tauri/src/core/agent_session_events.rs index 17d88b06..7cae0c58 100644 --- a/src-tauri/src/core/agent_session_events.rs +++ b/src-tauri/src/core/agent_session_events.rs @@ -4,9 +4,6 @@ use tiycore::agent::AgentMessage; use tiycore::types::{AssistantMessageEvent, Usage}; use tokio::sync::mpsc; -use crate::core::agent_session_compression::{ - observe_context_usage_calibration, ContextCompressionRuntimeState, -}; use crate::ipc::frontend_channels::ThreadStreamEvent; use crate::model::thread::RunUsageDto; @@ -17,7 +14,7 @@ pub(crate) fn handle_agent_event( last_completed_message_id: &StdMutex>, current_reasoning_message_id: &StdMutex>, last_usage: &StdMutex>, - context_compression_state: &StdMutex, + last_observed_usage: &StdMutex>, reasoning_buffer: &StdMutex, current_turn_index: &StdMutex>, last_text_delta: &StdMutex>, @@ -177,7 +174,7 @@ pub(crate) fn handle_agent_event( run_id, event_tx, last_usage, - context_compression_state, + last_observed_usage, &partial.usage, context_window, model_display_name, @@ -208,7 +205,7 @@ pub(crate) fn handle_agent_event( run_id, event_tx, last_usage, - context_compression_state, + last_observed_usage, &assistant.usage, context_window, model_display_name, @@ -223,7 +220,7 @@ pub(crate) fn handle_agent_event( run_id, event_tx, last_usage, - context_compression_state, + last_observed_usage, &assistant.usage, context_window, model_display_name, @@ -259,7 +256,7 @@ fn emit_usage_update_if_changed( run_id: &str, event_tx: &mpsc::UnboundedSender, last_usage: &StdMutex>, - context_compression_state: &StdMutex, + last_observed_usage: &StdMutex>, usage: &Usage, context_window: &str, model_display_name: &str, @@ -287,7 +284,12 @@ fn emit_usage_update_if_changed( return; } - observe_context_usage_calibration(context_compression_state, usage); + // Record the freshly-observed usage into the shared + // `last_observed_usage` slot. The `set_transform_context` closure + // (configured by `configure_agent`) reads this same slot to decide + // whether the next turn needs auto-compression — see + // `should_compress_via_context_size` in `context_compression`. + record_observed_usage(last_observed_usage, usage); let _ = event_tx.send(ThreadStreamEvent::ThreadUsageUpdated { run_id: run_id.to_string(), @@ -297,6 +299,22 @@ fn emit_usage_update_if_changed( }); } +/// Record the most recent `tiycore::types::Usage` into the shared +/// compression-trigger slot. +/// +/// The slot is shared with `set_transform_context`: the closure reads it +/// to compare the latest unified `context_size()` against the configured +/// `CompressionSettings::budget()`. Writes are guarded by the standard +/// `lock_or_recover` poison-recovery helper so a panic in any consumer +/// cannot corrupt the trigger state. +fn record_observed_usage(last_observed_usage: &StdMutex>, usage: &Usage) { + if let Ok(mut guard) = last_observed_usage.lock() { + *guard = Some(*usage); + } else { + tracing::warn!("record_observed_usage: mutex poisoned, recovering"); + } +} + /// Helper to recover a poisoned `StdMutex`. All mutex helpers below use this /// pattern so that a panic in an unrelated thread never silently corrupts /// message-tracking state. diff --git a/src-tauri/src/core/agent_session_tests.rs b/src-tauri/src/core/agent_session_tests.rs index 4500c5b3..93c33137 100644 --- a/src-tauri/src/core/agent_session_tests.rs +++ b/src-tauri/src/core/agent_session_tests.rs @@ -1,23 +1,20 @@ #[cfg(test)] pub(super) mod tests { use super::super::{ - build_initial_context_token_calibration, build_profile_response_prompt_parts, - build_system_prompt, convert_history_messages, current_context_token_calibration, + build_profile_response_prompt_parts, build_system_prompt, convert_history_messages, handle_agent_event, main_agent_security_config, mark_runtime_queue_message_by_id, normalize_profile_response_language, normalize_profile_response_style, - plan_mode_missing_checkpoint_error, record_pending_prompt_estimate, - resolve_helper_model_role, resolve_helper_profile, resolve_model_plan, - resolve_runtime_model_role, response_style_system_instruction, + plan_mode_missing_checkpoint_error, resolve_helper_model_role, resolve_helper_profile, + resolve_model_plan, resolve_runtime_model_role, response_style_system_instruction, runtime_queue_message_display_content, runtime_security_config, runtime_tools_for_profile, runtime_tools_for_profile_with_extensions, standard_tool_timeout, trim_history_to_current_context, trim_runtime_queue_state, update_runtime_queue_state_for_event, AgentQueueMessageKind, AgentSession, - AgentSessionSpec, ContextCompressionRuntimeState, ProfileResponseStyle, ResolvedModelRole, - ResolvedRuntimeModelPlan, RuntimeModelPlan, RuntimeQueueEventAction, RuntimeQueueEventDto, - RuntimeQueueMessageDto, RuntimeQueueMessageStatus, RuntimeQueueState, SortKey, - DEFAULT_FULL_TOOL_PROFILE, MAIN_AGENT_TOOL_TIMEOUT_SECS, - PLAN_MODE_MISSING_CHECKPOINT_ERROR, PLAN_READ_ONLY_TOOL_PROFILE, - STANDARD_TOOL_TIMEOUT_SECS, SUBAGENT_TOOL_TIMEOUT_SECS, + AgentSessionSpec, ProfileResponseStyle, ResolvedModelRole, ResolvedRuntimeModelPlan, + RuntimeModelPlan, RuntimeQueueEventAction, RuntimeQueueEventDto, RuntimeQueueMessageDto, + RuntimeQueueMessageStatus, RuntimeQueueState, SortKey, DEFAULT_FULL_TOOL_PROFILE, + MAIN_AGENT_TOOL_TIMEOUT_SECS, PLAN_MODE_MISSING_CHECKPOINT_ERROR, + PLAN_READ_ONLY_TOOL_PROFILE, STANDARD_TOOL_TIMEOUT_SECS, SUBAGENT_TOOL_TIMEOUT_SECS, }; use std::fs; use std::sync::{Arc, Mutex as StdMutex}; @@ -43,7 +40,7 @@ pub(super) mod tests { use crate::ipc::frontend_channels::ThreadStreamEvent; use crate::model::provider::{AgentProfileRecord, ProviderKind, ProviderRecord}; use crate::model::subagent::CustomSubagentModelRole; - use crate::model::thread::{MessageRecord, RunSummaryDto, RunUsageDto, ToolCallDto}; + use crate::model::thread::{MessageRecord, ToolCallDto}; use crate::persistence::init_database; use crate::persistence::repo::provider_repo; @@ -196,22 +193,6 @@ pub(super) mod tests { } } - fn make_history_message(id: &str, run_id: &str, role: &str, content: &str) -> MessageRecord { - MessageRecord { - id: id.to_string(), - thread_id: "thread-1".to_string(), - run_id: Some(run_id.to_string()), - role: role.to_string(), - content_markdown: content.to_string(), - parts_json: None, - message_type: "plain_message".to_string(), - status: "completed".to_string(), - metadata_json: None, - attachments_json: None, - created_at: "2026-01-01T00:00:00.000Z".to_string(), - } - } - #[test] fn update_runtime_queue_state_for_consumed_returns_pending_messages() { let mut state = RuntimeQueueState::default(); @@ -322,7 +303,6 @@ pub(super) mod tests { history_tool_calls: Vec::new(), model_plan: sample_resolved_runtime_model_plan(None), initial_prompt: None, - initial_context_calibration: Default::default(), cache_arbiter: None, }; @@ -397,7 +377,6 @@ pub(super) mod tests { history_tool_calls: Vec::new(), model_plan: sample_resolved_runtime_model_plan(None), initial_prompt: None, - initial_context_calibration: Default::default(), cache_arbiter: None, }; let session = AgentSession::new( @@ -678,35 +657,6 @@ pub(super) mod tests { assert_eq!(state.messages[1].updated_at, now); } - fn make_run_summary(model_id: &str, input_tokens: u64) -> RunSummaryDto { - make_run_summary_with_cache(model_id, input_tokens, 0) - } - - fn make_run_summary_with_cache( - model_id: &str, - input_tokens: u64, - cache_read_tokens: u64, - ) -> RunSummaryDto { - RunSummaryDto { - id: "run-prev".to_string(), - thread_id: "thread-1".to_string(), - run_mode: "default".to_string(), - status: "completed".to_string(), - model_id: Some(model_id.to_string()), - model_display_name: Some(model_id.to_string()), - context_window: Some(TEST_CONTEXT_WINDOW.to_string()), - error_message: None, - started_at: "2026-01-01T00:00:00.000Z".to_string(), - usage: RunUsageDto { - input_tokens, - output_tokens: 128, - cache_read_tokens, - cache_write_tokens: 0, - total_tokens: input_tokens + cache_read_tokens + 128, - }, - } - } - fn message_text(message: &AgentMessage) -> String { match message { AgentMessage::User(user) => match &user.content { @@ -744,26 +694,29 @@ pub(super) mod tests { reasoning_buffer: &StdMutex, event: &AgentEvent, ) { - let context_compression_state = StdMutex::new(ContextCompressionRuntimeState::default()); - handle_test_agent_event_with_context_state( + // The unified-trigger slot is created fresh per test; tests that + // care about compression-trigger state should pass their own + // shared mutex via `handle_test_agent_event_with_observed_usage`. + let last_observed_usage = StdMutex::new(None::); + handle_test_agent_event_with_observed_usage( run_id, event_tx, current_message_id, current_reasoning_message_id, last_usage, - &context_compression_state, + &last_observed_usage, reasoning_buffer, event, ); } - fn handle_test_agent_event_with_context_state( + fn handle_test_agent_event_with_observed_usage( run_id: &str, event_tx: &mpsc::UnboundedSender, current_message_id: &StdMutex>, current_reasoning_message_id: &StdMutex>, last_usage: &StdMutex>, - context_compression_state: &StdMutex, + last_observed_usage: &StdMutex>, reasoning_buffer: &StdMutex, event: &AgentEvent, ) { @@ -777,7 +730,7 @@ pub(super) mod tests { &last_completed_message_id, current_reasoning_message_id, last_usage, - context_compression_state, + last_observed_usage, reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -1300,12 +1253,20 @@ Used for prompt assembly coverage. } #[test] - fn message_end_usage_updates_consume_pending_prompt_estimate_once() { + fn message_end_usage_updates_record_observed_usage_once_per_change() { + // Replaces the legacy `message_end_usage_updates_consume_pending_prompt_estimate_once` + // test. The new compression trigger (`Usage::context_size()`) does + // NOT depend on a pending prompt estimate — it just records the + // latest `Usage` into the shared `last_observed_usage` slot on + // every changed `MessageEnd` event. Sending the same event twice + // should still emit the `ThreadUsageUpdated` exactly once (the + // dedup happens via the `last_usage` comparison in + // `emit_usage_update_if_changed`). let (event_tx, mut event_rx) = mpsc::unbounded_channel(); let current_message_id = StdMutex::new(None::); let current_reasoning_message_id = StdMutex::new(None::); let last_usage = StdMutex::new(None::); - let context_compression_state = StdMutex::new(ContextCompressionRuntimeState::default()); + let last_observed_usage = StdMutex::new(None::); let reasoning_buffer = StdMutex::new(String::new()); let assistant = AssistantMessage::builder() .api(Api::OpenAICompletions) @@ -1315,14 +1276,13 @@ Used for prompt assembly coverage. .build() .expect("assistant message with usage"); - record_pending_prompt_estimate(&context_compression_state, 1_000); - handle_test_agent_event_with_context_state( - "run-usage-calibration", + handle_test_agent_event_with_observed_usage( + "run-usage-record", &event_tx, ¤t_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, &AgentEvent::MessageEnd { turn_index: 0, @@ -1330,13 +1290,13 @@ Used for prompt assembly coverage. message: AgentMessage::Assistant(assistant.clone()), }, ); - handle_test_agent_event_with_context_state( - "run-usage-calibration", + handle_test_agent_event_with_observed_usage( + "run-usage-record", &event_tx, ¤t_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, &AgentEvent::MessageEnd { turn_index: 0, @@ -1348,25 +1308,43 @@ Used for prompt assembly coverage. let usage_events = std::iter::from_fn(|| event_rx.try_recv().ok()) .filter(|event| matches!(event, ThreadStreamEvent::ThreadUsageUpdated { .. })) .count(); - let calibration = current_context_token_calibration(&context_compression_state); + let observed = last_observed_usage + .lock() + .expect("last_observed_usage mutex") + .clone(); + // The dedup still fires: identical MessageEnd ⇒ 1 emit, even + // though we wrote the same Usage into the trigger slot twice. assert_eq!(usage_events, 1); - assert_eq!(calibration.ratio_basis_points(), 15_000); - assert!(context_compression_state - .lock() - .expect("context compression state") - .pending_prompt_estimate - .is_none()); + let observed = observed.expect("observed usage should be recorded"); + assert_eq!(observed.input, 1_500); + assert_eq!(observed.output, 32); + // The unified context_size sums input + output = 1532. + assert_eq!(observed.context_size(), 1_532); } #[test] - fn usage_calibration_counts_cache_read_when_input_is_zero() { + fn usage_record_observed_includes_cache_read_for_context_size() { + // Replaces the legacy `usage_calibration_counts_cache_read_when_input_is_zero` + // test. The new unified `Usage::context_size()` already adds + // `cache_read` (and `cache_write`) to the context footprint, so + // the trigger sees the true "tokens in the context window" figure + // even when the wire-level `input` is 0 — a configuration that + // happens e.g. on Anthropic when the entire prompt was served + // from the prompt cache. We assert both: + // + // 1. The `last_observed_usage` slot is populated with the full + // `Usage` (including cache_read), so the next + // `set_transform_context` call can compare the unified + // `context_size()` against the budget. + // 2. `Usage::context_size()` adds `cache_read` to the total — + // input=0, output=32, cache_read=1500 ⇒ context_size=1532. let (event_tx, mut event_rx) = mpsc::unbounded_channel(); let current_message_id = StdMutex::new(None::); let last_completed_message_id = StdMutex::new(None::); let current_reasoning_message_id = StdMutex::new(None::); let last_usage = StdMutex::new(None::); - let context_compression_state = StdMutex::new(ContextCompressionRuntimeState::default()); + let last_observed_usage = StdMutex::new(None::); let reasoning_buffer = StdMutex::new(String::new()); let current_turn_index = StdMutex::new(None::); let last_text_delta = StdMutex::new(None::); @@ -1385,15 +1363,14 @@ Used for prompt assembly coverage. .build() .expect("assistant message with cache-read usage"); - record_pending_prompt_estimate(&context_compression_state, 1_000); handle_agent_event( - "run-cache-read-calibration", + "run-cache-read-record", &event_tx, ¤t_message_id, &last_completed_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -1409,100 +1386,18 @@ Used for prompt assembly coverage. let usage_events = std::iter::from_fn(|| event_rx.try_recv().ok()) .filter(|event| matches!(event, ThreadStreamEvent::ThreadUsageUpdated { .. })) .count(); - let calibration = current_context_token_calibration(&context_compression_state); - - assert_eq!(usage_events, 1); - assert_eq!(calibration.ratio_basis_points(), 15_000); - assert!(context_compression_state + let observed = last_observed_usage .lock() - .expect("context compression state") - .pending_prompt_estimate - .is_none()); - } + .expect("last_observed_usage mutex") + .clone(); - #[test] - fn build_initial_context_token_calibration_seeds_from_matching_historical_run() { - let primary_model = sample_resolved_model_role("primary-model"); - let history_messages = vec![ - make_history_message("msg-1", "run-prev", "user", &"x".repeat(600)), - make_history_message("msg-2", "run-prev", "assistant", &"y".repeat(600)), - ]; - let history = convert_history_messages(&history_messages, &[], &primary_model.model); - let estimated_tokens = crate::core::context_compression::estimate_total_tokens(&history); - let run_summary = make_run_summary("primary-model", (estimated_tokens as u64) * 2); - - let calibration = build_initial_context_token_calibration( - Some(&run_summary), - &history_messages, - &[], - &primary_model, - "", - ); - - assert_eq!(calibration.ratio_basis_points(), 20_000); - assert_eq!( - calibration.apply_to_estimate(estimated_tokens), - estimated_tokens * 2 - ); - } - - #[test] - fn build_initial_context_token_calibration_counts_cache_read_tokens() { - let primary_model = sample_resolved_model_role("primary-model"); - let history_messages = vec![ - make_history_message("msg-1", "run-prev", "user", &"x".repeat(600)), - make_history_message("msg-2", "run-prev", "assistant", &"y".repeat(600)), - ]; - let history = convert_history_messages(&history_messages, &[], &primary_model.model); - let estimated_tokens = crate::core::context_compression::estimate_total_tokens(&history); - let run_summary = make_run_summary_with_cache( - "primary-model", - estimated_tokens as u64 / 2, - estimated_tokens as u64 * 3 / 2, - ); - - let calibration = build_initial_context_token_calibration( - Some(&run_summary), - &history_messages, - &[], - &primary_model, - "", - ); - - assert_eq!(calibration.ratio_basis_points(), 20_000); - assert_eq!( - calibration.apply_to_estimate(estimated_tokens), - estimated_tokens * 2 - ); - } - - #[test] - fn build_initial_context_token_calibration_ignores_mismatched_models_and_zero_usage() { - let primary_model = sample_resolved_model_role("primary-model"); - let history_messages = vec![make_history_message( - "msg-1", - "run-prev", - "user", - &"x".repeat(400), - )]; - - let mismatched = build_initial_context_token_calibration( - Some(&make_run_summary("other-model", 4_096)), - &history_messages, - &[], - &primary_model, - "", - ); - let zero_usage = build_initial_context_token_calibration( - Some(&make_run_summary("primary-model", 0)), - &history_messages, - &[], - &primary_model, - "", - ); - - assert_eq!(mismatched.ratio_basis_points(), 10_000); - assert_eq!(zero_usage.ratio_basis_points(), 10_000); + assert_eq!(usage_events, 1); + let observed = observed.expect("observed usage should be recorded"); + assert_eq!(observed.input, 0); + assert_eq!(observed.output, 32); + assert_eq!(observed.cache_read, 1_500); + // Unified context_size adds cache_read even when input is 0. + assert_eq!(observed.context_size(), 1_532); } #[test] @@ -1512,7 +1407,7 @@ Used for prompt assembly coverage. let last_completed_message_id = StdMutex::new(None::); let current_reasoning_message_id = StdMutex::new(None::); let last_usage = StdMutex::new(None::); - let context_compression_state = StdMutex::new(ContextCompressionRuntimeState::default()); + let last_observed_usage = StdMutex::new(None::); let reasoning_buffer = StdMutex::new(String::new()); let current_turn_index = StdMutex::new(None::); let last_text_delta = StdMutex::new(None::); @@ -1524,7 +1419,7 @@ Used for prompt assembly coverage. &last_completed_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -1558,7 +1453,7 @@ Used for prompt assembly coverage. let last_completed_message_id = StdMutex::new(None::); let current_reasoning_message_id = StdMutex::new(None::); let last_usage = StdMutex::new(None::); - let context_compression_state = StdMutex::new(ContextCompressionRuntimeState::default()); + let last_observed_usage = StdMutex::new(None::); let reasoning_buffer = StdMutex::new(String::new()); let current_turn_index = StdMutex::new(None::); let last_text_delta = StdMutex::new(None::); @@ -1570,7 +1465,7 @@ Used for prompt assembly coverage. &last_completed_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -1617,7 +1512,7 @@ Used for prompt assembly coverage. let last_completed_message_id = StdMutex::new(None::); let current_reasoning_message_id = StdMutex::new(None::); let last_usage = StdMutex::new(None::); - let context_compression_state = StdMutex::new(ContextCompressionRuntimeState::default()); + let last_observed_usage = StdMutex::new(None::); let reasoning_buffer = StdMutex::new(String::new()); let current_turn_index = StdMutex::new(None::); let last_text_delta = StdMutex::new(None::); @@ -1640,7 +1535,7 @@ Used for prompt assembly coverage. &last_completed_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -1657,7 +1552,7 @@ Used for prompt assembly coverage. &last_completed_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -1674,7 +1569,7 @@ Used for prompt assembly coverage. &last_completed_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -1706,7 +1601,7 @@ Used for prompt assembly coverage. let last_completed_message_id = StdMutex::new(None::); let current_reasoning_message_id = StdMutex::new(None::); let last_usage = StdMutex::new(None::); - let context_compression_state = StdMutex::new(ContextCompressionRuntimeState::default()); + let last_observed_usage = StdMutex::new(None::); let reasoning_buffer = StdMutex::new(String::new()); let current_turn_index = StdMutex::new(None::); let last_text_delta = StdMutex::new(None::); @@ -1719,7 +1614,7 @@ Used for prompt assembly coverage. &last_completed_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -1750,7 +1645,7 @@ Used for prompt assembly coverage. &last_completed_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -1814,7 +1709,7 @@ Used for prompt assembly coverage. let last_completed_message_id = StdMutex::new(None::); let current_reasoning_message_id = StdMutex::new(None::); let last_usage = StdMutex::new(None::); - let context_compression_state = StdMutex::new(ContextCompressionRuntimeState::default()); + let last_observed_usage = StdMutex::new(None::); let reasoning_buffer = StdMutex::new(String::new()); let current_turn_index = StdMutex::new(None::); let last_text_delta = StdMutex::new(None::); @@ -1829,7 +1724,7 @@ Used for prompt assembly coverage. &last_completed_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -1861,7 +1756,7 @@ Used for prompt assembly coverage. let last_completed_message_id = StdMutex::new(None::); let current_reasoning_message_id = StdMutex::new(None::); let last_usage = StdMutex::new(None::); - let context_compression_state = StdMutex::new(ContextCompressionRuntimeState::default()); + let last_observed_usage = StdMutex::new(None::); let reasoning_buffer = StdMutex::new(String::new()); let current_turn_index = StdMutex::new(None::); let last_text_delta = StdMutex::new(None::); @@ -1884,7 +1779,7 @@ Used for prompt assembly coverage. &last_completed_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -1903,7 +1798,7 @@ Used for prompt assembly coverage. &last_completed_message_id, ¤t_reasoning_message_id, &last_usage, - &context_compression_state, + &last_observed_usage, &reasoning_buffer, ¤t_turn_index, &last_text_delta, @@ -4904,7 +4799,6 @@ Used for prompt assembly coverage. history_tool_calls: Vec::new(), model_plan: sample_resolved_runtime_model_plan(None), initial_prompt: None, - initial_context_calibration: Default::default(), cache_arbiter: None, }; let session = AgentSession::new( @@ -4979,7 +4873,6 @@ Used for prompt assembly coverage. history_tool_calls: Vec::new(), model_plan: sample_resolved_runtime_model_plan(None), initial_prompt: None, - initial_context_calibration: Default::default(), cache_arbiter: None, }; let session = AgentSession::new( diff --git a/src-tauri/src/core/agent_session_types.rs b/src-tauri/src/core/agent_session_types.rs index f689cd42..56e9be10 100644 --- a/src-tauri/src/core/agent_session_types.rs +++ b/src-tauri/src/core/agent_session_types.rs @@ -5,7 +5,6 @@ use tiycore::agent::AgentTool; use tiycore::thinking::ThinkingLevel; use tiycore::types::{Model, OpenAICompletionsCompat, Transport}; -use crate::core::context_compression::ContextTokenCalibration; use crate::core::prompt::CacheMarkerArbiter; use crate::model::provider::AgentProfileRecord; use crate::model::thread::{MessageRecord, ToolCallDto}; @@ -168,7 +167,6 @@ pub struct AgentSessionSpec { pub history_tool_calls: Vec, pub model_plan: ResolvedRuntimeModelPlan, pub initial_prompt: Option, - pub initial_context_calibration: ContextTokenCalibration, /// Global cache marker arbiter for the request lifecycle. /// Records system prompt markers and allocates message-layer quota. /// Must be reset after each LLM call (§ 3.7.1). diff --git a/src-tauri/src/core/context_compression.rs b/src-tauri/src/core/context_compression.rs index d63279a1..a0f7efcc 100644 --- a/src-tauri/src/core/context_compression.rs +++ b/src-tauri/src/core/context_compression.rs @@ -253,14 +253,27 @@ pub fn estimate_total_tokens(messages: &[AgentMessage]) -> u32 { messages.iter().map(estimate_message_tokens).sum() } -/// Apply an optional conservative calibration to a heuristic token estimate. -pub(crate) fn calibrate_total_tokens( - total_tokens: u32, - calibration: Option, -) -> u32 { - calibration - .unwrap_or_default() - .apply_to_estimate(total_tokens) +/// Trigger compression when the most recently observed unified context +/// occupancy (`tiycore::types::Usage::context_size()`) is over budget. +/// +/// This is the canonical "should we auto-compress right now?" predicate +/// used by the `set_transform_context` hook in `agent_session`. It takes +/// the last observed usage (the `Some(usage)` branch) and the configured +/// `CompressionSettings`, returning `true` iff the **cross-protocol +/// unified** context size exceeds `context_window - reserve_tokens`. +/// +/// The first time we see a thread we have no observed usage yet; the +/// closure passes `None` to defer the decision until the first LLM +/// response reports its `context_size`. In that case the function +/// returns `false` — never trigger on a missing observation. +pub fn should_compress_via_context_size( + last_usage: Option<&tiycore::types::Usage>, + settings: &CompressionSettings, +) -> bool { + match last_usage { + Some(usage) => usage.context_size() > u64::from(settings.budget()), + None => false, + } } /// Check whether a token total exceeds the compression input budget. @@ -271,25 +284,20 @@ pub(crate) fn should_compress_total_tokens( total_tokens > settings.budget() } -/// Check whether compression is needed for the given messages and settings, -/// applying an optional conservative calibration derived from real provider -/// `usage.input` samples. -pub fn should_compress_with_calibration( - messages: &[AgentMessage], - settings: &CompressionSettings, - calibration: Option, -) -> bool { +/// Check whether compression is needed for the given messages and settings. +/// +/// This is a heuristic estimator kept for direct callers (e.g. fallback +/// paths, debug tooling) and for the `compress_context_fallback` safety +/// net. The hot path in `set_transform_context` deliberately does **not** +/// use this — it uses [`should_compress_via_context_size`] so the trigger +/// reflects real provider-reported context occupancy +/// (`Usage::context_size()`) rather than a chars/4 estimate. +pub fn should_compress(messages: &[AgentMessage], settings: &CompressionSettings) -> bool { if messages.is_empty() { return false; } let total_tokens = estimate_total_tokens(messages); - let calibrated_total_tokens = calibrate_total_tokens(total_tokens, calibration); - should_compress_total_tokens(calibrated_total_tokens, settings) -} - -/// Check whether compression is needed for the given messages and settings. -pub fn should_compress(messages: &[AgentMessage], settings: &CompressionSettings) -> bool { - should_compress_with_calibration(messages, settings, None) + should_compress_total_tokens(total_tokens, settings) } /// Find the cut-point index: messages before this index are "old" (to be @@ -904,33 +912,89 @@ mod tests { } #[test] - fn should_compress_with_calibration_triggers_when_raw_estimate_is_under_budget() { - let mut messages = Vec::new(); - for i in 0..4 { - messages.push(make_user(&format!("Question {}: {}", i, "x".repeat(400)))); - messages.push(make_assistant(&format!( - "Answer {}: {}", - i, - "y".repeat(400) - ))); - } - + fn should_compress_via_context_size_triggers_when_last_usage_exceeds_budget() { + // The unified `context_size` (= input + output + cache_read + + // cache_write) is the canonical "context occupancy" source of + // truth from tiycore 0.2.10-rc.2. When the most recent LLM call + // reports a `context_size` over `context_window - reserve_tokens`, + // compression must trigger on the NEXT `set_transform_context` + // invocation. let settings = CompressionSettings { context_window: 4_000, reserve_tokens: 2_000, + // budget = 2_000 keep_recent_tokens: 500, }; - let raw_total = estimate_total_tokens(&messages); - assert!(raw_total < settings.budget()); - - let calibration = ContextTokenCalibration::from_observation(raw_total, 2_500) - .expect("non-zero observation should produce calibration"); + // 2_500 > 2_000 → trigger. + let over_budget = tiycore::types::Usage { + input: 2_000, + output: 500, + cache_read: 0, + cache_write: 0, + total_tokens: 2_500, + cost: tiycore::types::UsageCost::default(), + }; + assert!(should_compress_via_context_size( + Some(&over_budget), + &settings, + )); - assert!(!should_compress(&messages, &settings)); - assert!(should_compress_with_calibration( - &messages, + // 1_500 ≤ 2_000 → pass-through. + let under_budget = tiycore::types::Usage { + input: 1_000, + output: 500, + cache_read: 0, + cache_write: 0, + total_tokens: 1_500, + cost: tiycore::types::UsageCost::default(), + }; + assert!(!should_compress_via_context_size( + Some(&under_budget), &settings, - Some(calibration), + )); + + // First request: no observed usage yet → pass-through. + assert!(!should_compress_via_context_size(None, &settings)); + } + + #[test] + fn should_compress_via_context_size_uses_cache_read_and_cache_write() { + // The unified context size includes cache_read and cache_write + // because those tokens still occupy the provider's context window. + // A wire-level `total_tokens` may exclude them on some providers + // (e.g. Anthropic) but the unified figure must add them back. + let settings = CompressionSettings { + context_window: 4_000, + reserve_tokens: 2_000, + keep_recent_tokens: 500, + }; + let usage = tiycore::types::Usage { + input: 800, + output: 200, + cache_read: 800, + cache_write: 200, + // Wire-level: input + output = 1000. With cache: 2000. + // The Anthropic wire-level total_tokens is sometimes reported + // as `input + output + cache_read + cache_write` already + // (= 2000 here), but the unified figure is always computed + // by the sum so it doesn't depend on the provider. + total_tokens: 1_000, + cost: tiycore::types::UsageCost::default(), + }; + // context_size = 800 + 200 + 800 + 200 = 2000 = budget → NOT over. + assert!(!should_compress_via_context_size(Some(&usage), &settings)); + // Just-above: 2001 > 2000. + let just_above = tiycore::types::Usage { + input: 801, + output: 200, + cache_read: 800, + cache_write: 200, + total_tokens: 1_001, + cost: tiycore::types::UsageCost::default(), + }; + assert!(should_compress_via_context_size( + Some(&just_above), + &settings )); } diff --git a/src-tauri/src/model/thread.rs b/src-tauri/src/model/thread.rs index 4137fa27..14bfd690 100644 --- a/src-tauri/src/model/thread.rs +++ b/src-tauri/src/model/thread.rs @@ -296,6 +296,20 @@ impl From for MessageDto { // RunSummary — lightweight run info for snapshots // --------------------------------------------------------------------------- +/// Per-run token usage snapshot sent to the frontend. +/// +/// `total_tokens` carries the **wire-level** value reported by the provider +/// (e.g. OpenAI/Google: `prompt + completion`; Anthropic: `input + output`). +/// It is preserved unchanged for backwards compatibility with downstream +/// billing/reporting consumers that need the wire-level total. +/// +/// `context_size` is the **cross-protocol unified** "context occupancy" +/// value, derived from `tiycore::types::Usage::context_size()` = +/// `input + output + cache_read + cache_write`. Frontend code that asks +/// "how much of the context window are we using?" MUST read `context_size`, +/// not `total_tokens` — the latter no longer represents occupancy once we +/// accumulate usage across turns (the wire-level field is per-response, not +/// cumulative). #[derive(Debug, Clone, Default, Serialize)] #[serde(rename_all = "camelCase")] pub struct RunUsageDto { @@ -303,7 +317,12 @@ pub struct RunUsageDto { pub output_tokens: u64, pub cache_read_tokens: u64, pub cache_write_tokens: u64, + /// Wire-level total reported by the provider (per-response, not cumulative). pub total_tokens: u64, + /// Cross-protocol unified context occupancy: + /// `input + output + cache_read + cache_write`. Use this for any + /// "context used" display or trigger logic. + pub context_size: u64, } impl From for RunUsageDto { @@ -314,6 +333,7 @@ impl From for RunUsageDto { cache_read_tokens: value.cache_read, cache_write_tokens: value.cache_write, total_tokens: value.total_tokens, + context_size: value.context_size(), } } } diff --git a/src-tauri/src/persistence/repo/run_helper_repo.rs b/src-tauri/src/persistence/repo/run_helper_repo.rs index 2acb3555..06eae05a 100644 --- a/src-tauri/src/persistence/repo/run_helper_repo.rs +++ b/src-tauri/src/persistence/repo/run_helper_repo.rs @@ -46,6 +46,15 @@ impl RunHelperRow { cache_read_tokens: self.cache_read_tokens.max(0) as u64, cache_write_tokens: self.cache_write_tokens.max(0) as u64, total_tokens: self.total_tokens.max(0) as u64, + // Reconstruct the cross-protocol unified context size + // from the persisted per-bucket fields. The DB schema + // doesn't store `context_size` (it would duplicate the + // four per-bucket fields); we derive it on read. + context_size: (self.input_tokens + + self.output_tokens + + self.cache_read_tokens + + self.cache_write_tokens) + .max(0) as u64, }, } } diff --git a/src-tauri/src/persistence/repo/run_repo.rs b/src-tauri/src/persistence/repo/run_repo.rs index ad783f65..52c9782b 100644 --- a/src-tauri/src/persistence/repo/run_repo.rs +++ b/src-tauri/src/persistence/repo/run_repo.rs @@ -364,6 +364,17 @@ fn map_run_summary(row: RunRow) -> RunSummaryDto { cache_read_tokens: row.cache_read_tokens.max(0) as u64, cache_write_tokens: row.cache_write_tokens.max(0) as u64, total_tokens: row.total_tokens.max(0) as u64, + // Derive the cross-protocol unified context size on read — + // see the doc-comment in the `RunUsageDto` declaration. + // The DB schema stores the four per-bucket token columns; + // `context_size` is a derived projection that we re-emit + // on every read so the frontend always has a unified + // "context occupancy" figure to display. + context_size: (row.input_tokens + + row.output_tokens + + row.cache_read_tokens + + row.cache_write_tokens) + .max(0) as u64, }, } } diff --git a/src-tauri/tests/frontend_integration.rs b/src-tauri/tests/frontend_integration.rs index f24ba175..966c9b3b 100644 --- a/src-tauri/tests/frontend_integration.rs +++ b/src-tauri/tests/frontend_integration.rs @@ -558,6 +558,10 @@ fn test_all_events_have_type_field() { cache_read_tokens: 0, cache_write_tokens: 0, total_tokens: 22, + // Mirrors `Usage::context_size()` from tiycore 0.2.10-rc.2 + // (= input + output + cache_read + cache_write = 10 + 12 + // + 0 + 0 = 22). + context_size: 22, }, }, ThreadStreamEvent::RunCheckpointed { run_id: "r".into() }, diff --git a/src/modules/workbench-shell/model/thread-store.ts b/src/modules/workbench-shell/model/thread-store.ts index f87adfc6..8089e08f 100644 --- a/src/modules/workbench-shell/model/thread-store.ts +++ b/src/modules/workbench-shell/model/thread-store.ts @@ -20,6 +20,14 @@ export type ThreadContextUsage = { outputTokens: number; cacheReadTokens: number; cacheWriteTokens: number; + /** + * Cross-protocol unified context occupancy + * (`input + output + cache_read + cache_write`). The badge uses this for + * percentage and "exceeded" detection. `totalTokens` is retained for + * wire-level reporting but is no longer used as the "used" figure. + */ + contextSize: number; + /** Wire-level total reported by the provider. Preserved for parity with the backend DTO. */ totalTokens: number; modelDisplayName: string | null; runId: string; diff --git a/src/modules/workbench-shell/ui/dashboard-workbench-logic.ts b/src/modules/workbench-shell/ui/dashboard-workbench-logic.ts index 9dfc20f1..30996970 100644 --- a/src/modules/workbench-shell/ui/dashboard-workbench-logic.ts +++ b/src/modules/workbench-shell/ui/dashboard-workbench-logic.ts @@ -138,18 +138,31 @@ export function buildThreadContextBadgeData(options: { const contextWindow = parseTokenCount(options.fallbackContextWindow) ?? parseTokenCount(options.runtimeUsage?.contextWindow); - const totalTokens = options.runtimeUsage?.totalTokens ?? 0; + // Use the cross-protocol unified `contextSize` (= input + output + + // cache_read + cache_write) as the "context occupancy" figure for the + // badge. This is what tiycore 0.2.10-rc.2 exposes via + // `Usage::context_size()` and works consistently across OpenAI / + // Anthropic / Google. `totalTokens` is intentionally NOT used here — it + // is the wire-level per-response total and is provider-dependent + // (OpenAI/Google: prompt+completion; Anthropic: input+output+cache). + const contextSize = options.runtimeUsage?.contextSize ?? 0; const inputTokens = options.runtimeUsage?.inputTokens ?? 0; const outputTokens = options.runtimeUsage?.outputTokens ?? 0; const cacheReadTokens = options.runtimeUsage?.cacheReadTokens ?? 0; const cacheWriteTokens = options.runtimeUsage?.cacheWriteTokens ?? 0; + const totalTokens = options.runtimeUsage?.totalTokens ?? 0; + // Anthropic / ZenMux(Anthropic) report cache reads as a separate bucket, but + // they still count against the prompt context window and the provider's input + // billing. Surface the combined "input" figure (raw input + cache hits) so + // the header's `In … · Out …` numbers match the `used / total` total above. + const effectiveInputTokens = inputTokens + cacheReadTokens; const rawUsedPercent = contextWindow && contextWindow > 0 - ? Math.round((totalTokens / contextWindow) * 100) + ? Math.round((contextSize / contextWindow) * 100) : 0; const usageRatio = contextWindow && contextWindow > 0 - ? Math.min(totalTokens / contextWindow, 1) + ? Math.min(contextSize / contextWindow, 1) : 0; const usedPercent = contextWindow && contextWindow > 0 @@ -157,7 +170,7 @@ export function buildThreadContextBadgeData(options: { : 0; const leftPercent = Math.max(0, 100 - rawUsedPercent); const isExceeded = Boolean( - contextWindow && contextWindow > 0 && totalTokens > contextWindow, + contextWindow && contextWindow > 0 && contextSize > contextWindow, ); return { @@ -166,6 +179,7 @@ export function buildThreadContextBadgeData(options: { outputTokens, cacheReadTokens, cacheWriteTokens, + effectiveInputTokens, isExceeded, leftPercent, modelDisplayName: @@ -174,8 +188,11 @@ export function buildThreadContextBadgeData(options: { null, rawUsedPercent, totalTokens, + // New: expose the source of truth for the percentage so consumers can + // label the figure precisely. + contextSize, usageRatio, - usedLabel: formatCompactTokenCount(totalTokens), + usedLabel: formatCompactTokenCount(contextSize), totalLabel: contextWindow ? formatCompactTokenCount(contextWindow) : "N/A", usedPercent, }; diff --git a/src/modules/workbench-shell/ui/dashboard-workbench.test.ts b/src/modules/workbench-shell/ui/dashboard-workbench.test.ts index d3a157c5..e09f9413 100644 --- a/src/modules/workbench-shell/ui/dashboard-workbench.test.ts +++ b/src/modules/workbench-shell/ui/dashboard-workbench.test.ts @@ -58,6 +58,12 @@ describe("buildThreadContextBadgeData", () => { modelDisplayName: "Old Runtime Model", outputTokens: 300, runId: "run-1", + // 1200 (input) + 300 (output) + 10 (cache_read) + 5 (cache_write) = 1515. + // Mirrors `Usage::context_size()` from tiycore 0.2.10-rc.2 + // (= input + output + cache_read + cache_write). Tests can override + // contextSize / totalTokens independently to assert the cross-protocol + // unified semantics. + contextSize: 1_515, totalTokens: 1_500, ...overrides, }; @@ -75,6 +81,8 @@ describe("buildThreadContextBadgeData", () => { expect(badge.contextWindow).toBe(16_000); expect(badge.modelDisplayName).toBe("Selected Model"); + // contextSize is the new "used" figure, distinct from totalTokens. + expect(badge.contextSize).toBe(1_515); expect(badge.totalTokens).toBe(1_500); expect(badge.isExceeded).toBe(false); }); @@ -94,11 +102,19 @@ describe("buildThreadContextBadgeData", () => { expect(badge.totalLabel).toBe("32K"); }); - it("marks usage as exceeded when used tokens are over the current context window", () => { + it("uses contextSize (not totalTokens) for the percentage when contextSize is larger", () => { + // The cross-protocol unified `contextSize` is the badge's "used" figure. + // Even when the wire-level `totalTokens` is below the context window, + // `contextSize` above the window should mark the badge as exceeded. + // This mirrors Anthropic: total_tokens (wire) excludes cache_read, but + // `context_size` adds it back. const badge = buildThreadContextBadgeData({ fallbackContextWindow: "1000", fallbackModelDisplayName: "Small Model", - runtimeUsage: makeRuntimeUsage({ totalTokens: 1_250 }), + runtimeUsage: makeRuntimeUsage({ + contextSize: 1_250, // exceeds 1000 + totalTokens: 900, // under 1000 (wire-level) + }), }); expect(badge.isExceeded).toBe(true); @@ -108,6 +124,29 @@ describe("buildThreadContextBadgeData", () => { expect(badge.usageRatio).toBe(1); }); + it("uses contextSize as the percentage source when it diverges from totalTokens", () => { + // When wire-level `totalTokens` exceeds the window but the unified + // `contextSize` does not, the badge reflects the unified value + // (the new "context occupancy" source of truth from + // `Usage::context_size()`). Wire-level `totalTokens` is retained on + // the DTO for downstream reporting; it is NOT used for the badge + // percentage anymore. + const badge = buildThreadContextBadgeData({ + fallbackContextWindow: "1000", + fallbackModelDisplayName: "Small Model", + runtimeUsage: makeRuntimeUsage({ + contextSize: 800, // under 1000 + totalTokens: 1_250, // over 1000 (wire-level) + }), + }); + + expect(badge.isExceeded).toBe(false); + expect(badge.rawUsedPercent).toBe(80); + // The DTO still carries the wire-level total for consumers that + // want it; the badge just doesn't use it for percentages. + expect(badge.totalTokens).toBe(1_250); + }); + it("does not exceed when no valid context window is available", () => { const badge = buildThreadContextBadgeData({ fallbackContextWindow: null, diff --git a/src/modules/workbench-shell/ui/dashboard-workbench.tsx b/src/modules/workbench-shell/ui/dashboard-workbench.tsx index f2dd227c..8173430e 100644 --- a/src/modules/workbench-shell/ui/dashboard-workbench.tsx +++ b/src/modules/workbench-shell/ui/dashboard-workbench.tsx @@ -1097,7 +1097,7 @@ const drawerWidth = useStore(uiLayoutStore, (s) => s.drawerWidth);

In{" "} {formatCompactTokenCount( - contextBadge.inputTokens, + contextBadge.effectiveInputTokens, )}{" "} · Out{" "} {formatCompactTokenCount( diff --git a/src/modules/workbench-shell/ui/runtime-thread-surface-state.ts b/src/modules/workbench-shell/ui/runtime-thread-surface-state.ts index dab0e8be..dc8a284b 100644 --- a/src/modules/workbench-shell/ui/runtime-thread-surface-state.ts +++ b/src/modules/workbench-shell/ui/runtime-thread-surface-state.ts @@ -517,12 +517,27 @@ export function mapRunSummaryToContextUsage(run: RunSummaryDto | null) { return null; } + // Prefer the cross-protocol unified `contextSize` from tiycore + // 0.2.10-rc.2 (set in the Rust DTO via `Usage::context_size()` = + // input + output + cache_read + cache_write). Fall back to that sum + // when the field is missing — e.g. older persisted snapshots + // written before the upgrade. + const explicitContextSize = run.usage.contextSize ?? 0; + const contextSize = + explicitContextSize > 0 + ? explicitContextSize + : run.usage.inputTokens + + run.usage.outputTokens + + run.usage.cacheReadTokens + + run.usage.cacheWriteTokens; + return { contextWindow: run.contextWindow, inputTokens: run.usage.inputTokens, outputTokens: run.usage.outputTokens, cacheReadTokens: run.usage.cacheReadTokens, cacheWriteTokens: run.usage.cacheWriteTokens, + contextSize, totalTokens: run.usage.totalTokens, modelDisplayName: run.modelDisplayName, runId: run.id, diff --git a/src/modules/workbench-shell/ui/runtime-thread-surface.test.tsx b/src/modules/workbench-shell/ui/runtime-thread-surface.test.tsx index d2370961..1fd5f8fb 100644 --- a/src/modules/workbench-shell/ui/runtime-thread-surface.test.tsx +++ b/src/modules/workbench-shell/ui/runtime-thread-surface.test.tsx @@ -57,6 +57,10 @@ function makeSnapshot(activeStatus: RunStatus | null): ThreadSnapshotDto { cacheReadTokens: 0, cacheWriteTokens: 0, totalTokens: 0, + // Cross-protocol unified context occupancy, defaulting to 0 + // for an empty/seed snapshot. tiycore 0.2.10-rc.2 derives this + // as input + output + cache_read + cache_write. + contextSize: 0, }, } : null, diff --git a/src/modules/workbench-shell/ui/runtime-thread-surface.tsx b/src/modules/workbench-shell/ui/runtime-thread-surface.tsx index cb357f46..b53b4eb9 100644 --- a/src/modules/workbench-shell/ui/runtime-thread-surface.tsx +++ b/src/modules/workbench-shell/ui/runtime-thread-surface.tsx @@ -1157,6 +1157,17 @@ export function RuntimeThreadSurface({ outputTokens: event.usage.outputTokens, cacheReadTokens: event.usage.cacheReadTokens, cacheWriteTokens: event.usage.cacheWriteTokens, + // Prefer the cross-protocol unified `contextSize` from + // tiycore 0.2.10-rc.2 (= input + output + cache_read + + // cache_write). Fall back to that sum when the field is missing + // (older payloads or hand-crafted events). + contextSize: + event.usage.contextSize > 0 + ? event.usage.contextSize + : event.usage.inputTokens + + event.usage.outputTokens + + event.usage.cacheReadTokens + + event.usage.cacheWriteTokens, totalTokens: event.usage.totalTokens, modelDisplayName: event.modelDisplayName, runId: event.runId, diff --git a/src/modules/workbench-shell/ui/workbench-top-bar.tsx b/src/modules/workbench-shell/ui/workbench-top-bar.tsx index 92a00140..b5b8c7ff 100644 --- a/src/modules/workbench-shell/ui/workbench-top-bar.tsx +++ b/src/modules/workbench-shell/ui/workbench-top-bar.tsx @@ -138,6 +138,12 @@ export function WorkbenchTopBar({ return (

+ {isMacOS ? ( +
+ ) : null}
diff --git a/src/services/bridge/agent-commands.ts b/src/services/bridge/agent-commands.ts index 9e1c17ac..9159bc83 100644 --- a/src/services/bridge/agent-commands.ts +++ b/src/services/bridge/agent-commands.ts @@ -178,40 +178,41 @@ function readActivity( function readUsage(event: RawThreadStreamEvent): RunUsageDto { const value = readValue(event, "usage", "usage") as Record | null | undefined; + const inputTokens = readUsageField(value, "inputTokens", "input_tokens"); + const outputTokens = readUsageField(value, "outputTokens", "output_tokens"); + const cacheReadTokens = readUsageField(value, "cacheReadTokens", "cache_read_tokens"); + const cacheWriteTokens = readUsageField(value, "cacheWriteTokens", "cache_write_tokens"); + const totalTokens = readUsageField(value, "totalTokens", "total_tokens"); + // Prefer the unified `context_size` field sent by tiycore >= 0.2.10-rc.2 + // (it sums input + output + cache_read + cache_write consistently across + // OpenAI / Anthropic / Google). Fall back to that sum when the field is + // missing — e.g. older persisted snapshots or partial hand-crafted events. + const explicitContextSize = readUsageField(value, "contextSize", "context_size"); + const contextSize = + explicitContextSize > 0 + ? explicitContextSize + : inputTokens + outputTokens + cacheReadTokens + cacheWriteTokens; return { - inputTokens: - typeof value?.inputTokens === "number" - ? value.inputTokens - : typeof value?.input_tokens === "number" - ? value.input_tokens - : 0, - outputTokens: - typeof value?.outputTokens === "number" - ? value.outputTokens - : typeof value?.output_tokens === "number" - ? value.output_tokens - : 0, - cacheReadTokens: - typeof value?.cacheReadTokens === "number" - ? value.cacheReadTokens - : typeof value?.cache_read_tokens === "number" - ? value.cache_read_tokens - : 0, - cacheWriteTokens: - typeof value?.cacheWriteTokens === "number" - ? value.cacheWriteTokens - : typeof value?.cache_write_tokens === "number" - ? value.cache_write_tokens - : 0, - totalTokens: - typeof value?.totalTokens === "number" - ? value.totalTokens - : typeof value?.total_tokens === "number" - ? value.total_tokens - : 0, + inputTokens, + outputTokens, + cacheReadTokens, + cacheWriteTokens, + totalTokens, + contextSize, }; } +function readUsageField( + value: Record | null | undefined, + camelKey: string, + snakeKey: string, +): number { + if (!value) return 0; + if (typeof value[camelKey] === "number") return value[camelKey]; + if (typeof value[snakeKey] === "number") return value[snakeKey]; + return 0; +} + function readRuntimeQueueSnapshot(value: unknown): RuntimeQueueSnapshotDto { const fallbackId = () => Math.random().toString(36).slice(2); const raw = value && typeof value === "object" ? value as Record : {}; diff --git a/src/services/thread-stream/thread-stream.test.ts b/src/services/thread-stream/thread-stream.test.ts index 1b9a2140..51aa3a59 100644 --- a/src/services/thread-stream/thread-stream.test.ts +++ b/src/services/thread-stream/thread-stream.test.ts @@ -40,6 +40,9 @@ const usage = { cacheReadTokens: 3, cacheWriteTokens: 4, totalTokens: 10, + // Cross-protocol unified context occupancy from tiycore 0.2.10-rc.2: + // input + output + cache_read + cache_write = 1 + 2 + 3 + 4 = 10. + contextSize: 10, }; const helperSnapshot = { diff --git a/src/shared/types/api.ts b/src/shared/types/api.ts index 754934f1..1f9056a9 100644 --- a/src/shared/types/api.ts +++ b/src/shared/types/api.ts @@ -382,7 +382,20 @@ export interface RunUsageDto { outputTokens: number; cacheReadTokens: number; cacheWriteTokens: number; + /** + * Wire-level total reported by the provider. This is **per-response** and + * is NOT a reliable "context occupancy" value (different providers sum + * different buckets). Prefer {@link contextSize} for any "context used" + * display or trigger logic. + */ totalTokens: number; + /** + * Cross-protocol unified context occupancy, derived from + * `tiycore::types::Usage::context_size()` = + * `input + output + cache_read + cache_write`. Use this for the context + * badge, percentage calculations, and compression triggers. + */ + contextSize: number; } export interface ToolCallDto {