Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src-tauri/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ keepawake = "0.6"
portable-pty = "0.9"
git2 = { version = "0.20", features = ["vendored-libgit2", "vendored-openssl"] }
similar = "2"
tiycore = "0.2.9"
tiycore = "0.2.10-rc.2"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[LOW] Cargo.toml depends on a release-candidate (rc) version of tiycore

The tiycore dependency now points to a pre-release (rc) version, which may be acceptable during development but should be stabilised before merging to master.

Suggestion: Change to a stable release version before promoting the branch, or justify the rc dependency in the PR description.

Risk: Using an rc in the mainline can cause unexpected churn when the crate is yanked or bumped, and may violate internal publishing policies.

Confidence: 0.85

[From SubAgent: general]


# Gateway (IM channel support)
async-trait = "0.1"
Expand Down
62 changes: 61 additions & 1 deletion src-tauri/src/core/agent_run_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::ipc::app_events::{
use crate::ipc::frontend_channels::ThreadStreamEvent;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated review completed for this PR diff. No concrete inline issue was selected after aggregation.

use crate::model::errors::{AppError, ErrorSource};
use crate::model::thread::{MessageRecord, RunStatus, ThreadStatus};
use crate::persistence::repo::{message_repo, run_repo, thread_repo};
use crate::persistence::repo::{message_repo, run_helper_repo, run_repo, thread_repo};

use super::agent_run_manager::{AgentRunManager, StartRunOptions};

Expand Down Expand Up @@ -415,6 +415,17 @@ impl AgentRunManager {
}
}
ThreadStreamEvent::ThreadUsageUpdated { usage, .. } => {
// `usage` here is a `RunUsageDto` populated by tiycore via
// `From<tiycore::types::Usage>`. Its `context_size` field is
// the cross-protocol unified "context occupancy" value
// (`Usage::context_size()` = input + output + cache_read +
// cache_write), NOT the wire-level `total_tokens` (which is
// per-response and provider-dependent). We round-trip the
// whole struct (input/output/cache fields + total_tokens)
// through `tiycore::types::Usage` because the persistence
// layer is typed against `Usage`; `context_size` is
// re-derived inside the `From` impl on read, so storing only
// the raw fields is sufficient and forward-compatible.
let usage = tiycore::types::Usage {
input: usage.input_tokens,
output: usage.output_tokens,
Expand Down Expand Up @@ -913,6 +924,55 @@ impl AgentRunManager {
});
}

// Backstop: converge any helper that is still non-terminal in the DB now
// that the run is finishing. A subagent (e.g. a Judge) whose own cleanup
// future was dropped during an interrupt/cancel can otherwise linger at
// `running` forever, since the frontend only flips helper status from
// live SubagentCompleted/Failed events. We mark them `interrupted` and
// emit a matching SubagentFailed so both the DB snapshot and the live
// event stream converge.
let helper_interrupt_reason = match status {
RunStatus::Cancelled => "Subagent interrupted because the run was cancelled.",
RunStatus::Interrupted => "Subagent interrupted because the run was interrupted.",
_ => "Subagent interrupted because the run finished while it was still active.",
};
match run_helper_repo::interrupt_non_terminal_by_run(
&self.pool,
run_id,
helper_interrupt_reason,
)
.await
{
Ok(orphaned) => {
if !orphaned.is_empty() {
tracing::warn!(
run_id = %run_id,
count = orphaned.len(),
"converged orphaned non-terminal run helpers to interrupted"
);
}
for helper in orphaned {
let _ = frontend_tx.send(ThreadStreamEvent::SubagentFailed {
run_id: run_id.to_string(),
subtask_id: helper.id,
helper_kind: helper.helper_kind,
started_at: helper.started_at,
error: helper_interrupt_reason.to_string(),
snapshot:
crate::core::subagent::orchestrator::SubagentProgressSnapshot::default(
),
});
}
}
Err(error) => {
tracing::warn!(
run_id = %run_id,
%error,
"failed to converge orphaned run helpers while finishing run"
);
}
}

finalize_run(
&self.pool,
self.app_events.as_ref(),
Expand Down
173 changes: 89 additions & 84 deletions src-tauri/src/core/agent_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,7 @@ fn append_runtime_queue_message(
}

use crate::core::agent_session_compression::{
build_initial_context_token_calibration, current_context_token_calibration,
persist_compression_markers_to_pool, record_pending_prompt_estimate, run_auto_compression,
ContextCompressionRuntimeState,
persist_compression_markers_to_pool, run_auto_compression,
};
use crate::core::agent_session_events::handle_agent_event;
pub(crate) use crate::core::agent_session_history::*;
Expand Down Expand Up @@ -562,7 +560,12 @@ pub async fn build_session_spec(
.collect();
let history_tool_calls =
tool_call_repo::list_parent_visible_by_run_ids(pool, &history_run_ids).await?;
let latest_historical_run =
// No longer read after the calibration seed was removed; the
// `find_latest_with_prompt_usage_by_thread_excluding_run` call
// below is kept as documentation that historical run usage is
// intentionally not used by the new `Usage::context_size()`
// trigger (the first LLM response provides the source of truth).
let _latest_historical_run =
run_repo::find_latest_with_prompt_usage_by_thread_excluding_run(pool, thread_id, run_id)
.await?;

Expand Down Expand Up @@ -629,13 +632,11 @@ pub async fn build_session_spec(
}
}

let initial_context_calibration = build_initial_context_token_calibration(
latest_historical_run.as_ref(),
&history_messages,
&history_tool_calls,
&resolved_plan.primary,
&system_prompt,
);
// The previous initial-context calibration seed was removed when we
// switched auto-compression to the unified `Usage::context_size()`
// trigger (see `should_compress_via_context_size`). The trigger no
// longer needs a per-history-run calibration — it compares the most
// recent LLM usage against the budget directly.

Ok(AgentSessionSpec {
run_id: run_id.to_string(),
Expand All @@ -649,7 +650,6 @@ pub async fn build_session_spec(
history_tool_calls,
model_plan: resolved_plan,
initial_prompt: None,
initial_context_calibration,
cache_arbiter,
})
}
Expand All @@ -664,7 +664,13 @@ pub struct AgentSession {
cancel_requested: Arc<AtomicBool>,
pub(crate) checkpoint_requested: AtomicBool,
pub(crate) abort_signal: tiycore::agent::AbortSignal,
context_compression_state: Arc<StdMutex<ContextCompressionRuntimeState>>,
/// Most recent `tiycore::types::Usage` observed from the last LLM
/// call, shared with the `set_transform_context` closure. The
/// closure compares `usage.context_size()` against the configured
/// `CompressionSettings::budget()` to decide whether the next turn
/// needs auto-compression. Shared with the event handler so the
/// trigger stays in lockstep with the stream of usage updates.
last_observed_usage: Arc<StdMutex<Option<Usage>>>,
runtime_queue_state: Arc<StdMutex<RuntimeQueueState>>,
/// Shared goal runtime state for tool call recording across command invocations.
pub(crate) goal_runtime: Arc<std::sync::Mutex<GoalRuntimeState>>,
Expand All @@ -682,17 +688,18 @@ impl AgentSession {
) -> Arc<Self> {
Arc::new_cyclic(|weak_self| {
let agent = Arc::new(Agent::with_model(spec.model_plan.primary.model.clone()));
let context_compression_state = Arc::new(StdMutex::new(
ContextCompressionRuntimeState::new(spec.initial_context_calibration),
));
// Unified trigger: start with no observed usage; the
// `set_transform_context` closure defers its first decision
// until the first LLM response reports `context_size`.
let last_observed_usage = Arc::new(StdMutex::new(None::<Usage>));
let runtime_queue_state = Arc::new(StdMutex::new(RuntimeQueueState::default()));
agent.set_max_turns(max_turns);
agent.set_max_retries(Some(TIYCORE_REQUEST_MAX_RETRIES));
configure_agent(
&agent,
&spec,
weak_self.clone(),
Arc::clone(&context_compression_state),
Arc::clone(&last_observed_usage),
Arc::clone(&runtime_queue_state),
event_tx.clone(),
);
Expand All @@ -707,7 +714,7 @@ impl AgentSession {
cancel_requested: Arc::new(AtomicBool::new(false)),
checkpoint_requested: AtomicBool::new(false),
abort_signal: tiycore::agent::AbortSignal::new(),
context_compression_state,
last_observed_usage,
runtime_queue_state,
goal_runtime,
}
Expand Down Expand Up @@ -934,8 +941,8 @@ impl AgentSession {
let last_completed_message_id_ref = Arc::clone(&last_completed_message_id);
let reasoning_message_id_ref = Arc::clone(&current_reasoning_message_id);
let last_usage_ref = Arc::clone(&last_usage);
let last_observed_usage_ref = Arc::clone(&self.last_observed_usage);
let reasoning_ref = Arc::clone(&reasoning_buffer);
let context_compression_state_ref = Arc::clone(&self.context_compression_state);
let current_turn_index: Arc<StdMutex<Option<usize>>> = Arc::new(StdMutex::new(None));
let turn_index_ref = Arc::clone(&current_turn_index);
let last_text_delta = Arc::new(StdMutex::new(None::<String>));
Expand All @@ -948,7 +955,7 @@ impl AgentSession {
&last_completed_message_id_ref,
&reasoning_message_id_ref,
&last_usage_ref,
&context_compression_state_ref,
&last_observed_usage_ref,
&reasoning_ref,
&turn_index_ref,
&last_text_delta_ref,
Expand Down Expand Up @@ -1084,7 +1091,7 @@ fn configure_agent(
agent: &Arc<Agent>,
spec: &AgentSessionSpec,
weak_self: Weak<AgentSession>,
context_compression_state: Arc<StdMutex<ContextCompressionRuntimeState>>,
last_observed_usage: Arc<StdMutex<Option<Usage>>>,
runtime_queue_state: Arc<StdMutex<RuntimeQueueState>>,
event_tx: mpsc::UnboundedSender<ThreadStreamEvent>,
) {
Expand All @@ -1100,27 +1107,39 @@ fn configure_agent(
agent.set_transport(spec.model_plan.transport);
agent.set_security_config(main_agent_security_config());

// Context compression: when messages exceed the token budget, generate
// a summary with the primary model, persist markers to DB, and keep only
// recent messages. Falls back to pure truncation if the LLM call fails.
// Context compression: when the most recent LLM call reported a
// unified context size above the input budget, summarise the older
// messages with the primary model, persist markers to DB, and keep
// only recent messages. Falls back to pure truncation if the LLM
// call fails.
//
// The trigger is `Usage::context_size()` from tiycore 0.2.10-rc.2 —
// the cross-protocol unified "context occupancy" value
// (input + output + cache_read + cache_write). The previous heuristic
// (`estimate_total_tokens(messages) + system_prompt_estimated_tokens`
// scaled by an observed `ContextTokenCalibration` ratio) was removed
// because it could disagree with the actual provider usage and let an
// over-budget call slip through, and because the heuristic + CJK
// weight drift made the trigger point unpredictable.
//
// Two correctness hazards addressed here:
// Two correctness hazards addressed in [`run_auto_compression`]:
//
// 1. UUID v7 timing. The reset marker we write to DB uses `now_v7()`, but
// `cut_point` in-memory points at a slice that includes messages the
// current run persisted EARLIER than this call. A naive
// `list_since_last_reset WHERE id >= reset_id` would exclude those
// earlier messages and effectively "lose" the current user prompt on
// the next reload. We therefore resolve a DB-backed boundary id
// conservatively covering all recent messages and attach it to the
// reset marker's metadata.
// 1. UUID v7 timing. The reset marker we write to DB uses `now_v7()`,
// but `cut_point` in-memory points at a slice that includes
// messages the current run persisted EARLIER than this call. A
// naive `list_since_last_reset WHERE id >= reset_id` would exclude
// those earlier messages and effectively "lose" the current user
// prompt on the next reload. We therefore resolve a DB-backed
// boundary id conservatively covering all recent messages and
// attach it to the reset marker's metadata.
//
// 2. Summary-of-summary decay. If a previous auto-compression already
// injected a `<context_summary>` as the head of `messages`, naively
// re-summarising `old_messages` would re-summarise an already-
// summarised prefix, losing detail each pass. Instead we detect the
// prior summary, treat it as a pinned prefix, and ask the model to
// **merge** the prior summary with the delta of messages since then.
// summarised prefix, losing detail each pass. Instead we detect
// the prior summary, treat it as a pinned prefix, and ask the
// model to **merge** the prior summary with the delta of messages
// since then.
let compression_settings = crate::core::context_compression::CompressionSettings::new(
spec.model_plan.primary.model.context_window,
);
Expand All @@ -1129,33 +1148,26 @@ fn configure_agent(
let compression_thread_id = spec.thread_id.clone();
let compression_run_id = spec.run_id.clone();
let compression_response_language = spec.model_plan.raw.response_language.clone();
let compression_state = Arc::clone(&context_compression_state);
// Pre-compute the system prompt's estimated token count so the
// compression check includes fixed overhead that the provider counts
// against the context window but `estimate_total_tokens(messages)`
// does not see. This narrows the gap the calibration ratio has to
// cover, making the trigger point more predictable.
let system_prompt_estimated_tokens =
crate::core::context_compression::estimate_tokens(&spec.system_prompt);
let last_observed_usage_for_trigger = Arc::clone(&last_observed_usage);
agent.set_transform_context(move |messages| {
// Cheap pass-through check first: only clone the heavy captured state
// (ResolvedModelRole, String ids, Weak) when compression will actually
// run. For long sessions with many turns this avoids per-turn heap
// allocations when the thread is still well under budget.
// Cheap pass-through check first: only clone the heavy captured
// state (ResolvedModelRole, String ids, Weak) when compression
// will actually run. For long sessions with many turns this
// avoids per-turn heap allocations when the thread is still
// well under budget.
let settings = compression_settings.clone();
let raw_estimated_tokens =
crate::core::context_compression::estimate_total_tokens(&messages)
.saturating_add(system_prompt_estimated_tokens);
let calibration = current_context_token_calibration(&compression_state);
let calibrated_total_tokens = crate::core::context_compression::calibrate_total_tokens(
raw_estimated_tokens,
Some(calibration),
);
let needs_compression = !messages.is_empty()
&& crate::core::context_compression::should_compress_total_tokens(
calibrated_total_tokens,
&settings,
);
let snapshot: Option<Usage> = last_observed_usage_for_trigger
.lock()
.ok()
.and_then(|guard| *guard);
// `should_compress_via_context_size` returns `false` for
// `None` (no observed usage yet) — we defer the first decision
// until the first LLM response reports its `context_size`,
// matching the design note in `context_compression`.
let needs_compression = crate::core::context_compression::should_compress_via_context_size(
snapshot.as_ref(),
&settings,
) && !messages.is_empty();

let model_role = if needs_compression {
Some(primary_model_role.clone())
Expand All @@ -1182,31 +1194,24 @@ fn configure_agent(
} else {
None
};
let compression_state = Arc::clone(&compression_state);

async move {
let transformed_messages = if !needs_compression {
messages
} else {
// Unwraps are sound: all `Some(_)` are populated together under
// `needs_compression`, so either all four are `Some` (compression
// path) or we returned above.
run_auto_compression(
messages,
settings,
model_role.expect("model_role populated when compressing"),
weak.expect("weak populated when compressing"),
thread_id.expect("thread_id populated when compressing"),
run_id.expect("run_id populated when compressing"),
response_language.expect("response_language populated when compressing"),
)
.await
};
let sent_estimated_tokens =
crate::core::context_compression::estimate_total_tokens(&transformed_messages)
.saturating_add(system_prompt_estimated_tokens);
record_pending_prompt_estimate(&compression_state, sent_estimated_tokens);
transformed_messages
if !needs_compression {
return messages;
}
// Unwraps are sound: all `Some(_)` are populated together
// under `needs_compression`, so either all five values unwrapped
// below are `Some` (compression path) or we returned above.
run_auto_compression(
messages,
settings,
model_role.expect("model_role populated when compressing"),
weak.expect("weak populated when compressing"),
thread_id.expect("thread_id populated when compressing"),
run_id.expect("run_id populated when compressing"),
response_language.expect("response_language populated when compressing"),
)
.await
}
});

Expand Down
Loading
Loading