From 015b851adf787d43abf21f7aba6ed3919899a98d Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Mon, 11 May 2026 23:40:00 -0700 Subject: [PATCH] feat(learning): summarizer LLM, tool-call digests, orchestrator-only reflection (#1419) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New `SummarizerProvider` trait + `ConfiguredSummarizer` wrapper so reflection + transcript-ingest can run on a cheap dedicated model instead of the orchestrator-tier provider. Opt-in via `learning.summarizer.enabled`; heuristic fallback from #1406 stays intact for offline users. - Tool-call digest layer (`learning::summarizer::digest`) collapses multi-call tool history into per-tool aggregates (count, success rate, p95 duration, bounded input/output samples) so the smaller summarizer context window never sees raw payloads. - Reflection prompt now compresses repeated tool calls into digests before the LLM call; preserves the legacy per-call rendering when every tool was invoked at most once (keeps existing assertions green). - Reflection hook + transcript-ingestion spawn are gated to the orchestrator agent only — sub-agent transcripts no longer trigger reflection or conversational-memory writes. - Two-stage transcript ingest: heuristic Stage 1 (unchanged) + Stage 2 near-duplicate merge over Jaccard similarity on tokens. Collapses paraphrases of the same preference / decision before dedupe + persist. - Telemetry: summarizer dispatches log label, model hint, input chars, cap, fill ratio, latency. Merge pass logs compressed-pair counts. - Tests: per-tool digest grouping/truncation, summarizer routing end-to-end, prompt compression on repeated tool calls, near-duplicate preference collapse with provenance merging. --- .../agent/harness/session/builder.rs | 71 ++++- src/openhuman/agent/harness/session/turn.rs | 11 + src/openhuman/config/mod.rs | 7 +- src/openhuman/config/schema/learning.rs | 73 +++++ src/openhuman/config/schema/mod.rs | 2 +- src/openhuman/learning/mod.rs | 5 + src/openhuman/learning/reflection.rs | 88 +++++- src/openhuman/learning/reflection_tests.rs | 100 +++++++ src/openhuman/learning/summarizer/digest.rs | 221 +++++++++++++++ src/openhuman/learning/summarizer/mod.rs | 145 ++++++++++ src/openhuman/learning/summarizer/tests.rs | 61 ++++ .../learning/transcript_ingest/merge.rs | 266 ++++++++++++++++++ .../learning/transcript_ingest/mod.rs | 34 +++ 13 files changed, 1065 insertions(+), 19 deletions(-) create mode 100644 src/openhuman/learning/summarizer/digest.rs create mode 100644 src/openhuman/learning/summarizer/mod.rs create mode 100644 src/openhuman/learning/summarizer/tests.rs create mode 100644 src/openhuman/learning/transcript_ingest/merge.rs diff --git a/src/openhuman/agent/harness/session/builder.rs b/src/openhuman/agent/harness/session/builder.rs index 739eb9de16..2ecaa99eeb 100644 --- a/src/openhuman/agent/harness/session/builder.rs +++ b/src/openhuman/agent/harness/session/builder.rs @@ -792,7 +792,14 @@ impl Agent { let mut post_turn_hooks: Vec> = Vec::new(); if config.learning.enabled { - if config.learning.reflection_enabled { + // #1419: reflection only fires for the user-facing orchestrator. + // Sub-agent transcripts are short-lived, scoped to a single + // delegation, and almost never carry durable user context worth + // surfacing in future chats. Gating here keeps the hook off + // every specialist / archivist / welcome session. + let is_orchestrator = agent_id == "orchestrator"; + + if config.learning.reflection_enabled && is_orchestrator { // Only the reflection hook needs an owned snapshot of the // full config, so create the `Arc` lazily inside this // branch instead of paying for the clone whenever @@ -814,16 +821,66 @@ impl Agent { } else { None }; - post_turn_hooks.push(Arc::new(crate::openhuman::learning::ReflectionHook::new( - config.learning.clone(), - full_config.clone(), - memory.clone(), - reflection_provider, - ))); + + // #1419: opt-in dedicated summarizer. When the cloud + // source is selected we reuse the same routed provider + // and just route the call through a different model + // hint. Local summarizers are handled inside + // ReflectionHook itself via the legacy local-AI path, + // so we skip the trait wiring there. + let summarizer: Option> = + if config.learning.summarizer.enabled + && config.learning.summarizer.source + == crate::openhuman::config::SummarizerSource::Cloud + { + let provider: Arc = + match reflection_provider.clone() { + Some(p) => p, + None => Arc::from(providers::create_routed_provider( + config.api_url.as_deref(), + config.api_key.as_deref(), + &config.reliability, + &config.model_routes, + &model_name, + )?), + }; + Some(Arc::new( + crate::openhuman::learning::ConfiguredSummarizer::new( + provider, + config.learning.summarizer.model_hint.clone(), + config.learning.summarizer.max_context_chars, + "reflection-summarizer", + ), + )) + } else { + None + }; + if summarizer.is_some() { + log::info!( + "[learning] reflection summarizer enabled (model_hint={}, cap={})", + config.learning.summarizer.model_hint, + config.learning.summarizer.max_context_chars, + ); + } + + post_turn_hooks.push(Arc::new( + crate::openhuman::learning::ReflectionHook::with_summarizer( + config.learning.clone(), + full_config.clone(), + memory.clone(), + reflection_provider, + summarizer, + ), + )); log::info!( "[learning] reflection hook registered (source={:?})", config.learning.reflection_source ); + } else if config.learning.reflection_enabled { + log::debug!( + "[learning] reflection hook skipped — agent '{}' is not the orchestrator", + agent_id + ); } if config.learning.user_profile_enabled { diff --git a/src/openhuman/agent/harness/session/turn.rs b/src/openhuman/agent/harness/session/turn.rs index 7f66d2e21a..8b8a242653 100644 --- a/src/openhuman/agent/harness/session/turn.rs +++ b/src/openhuman/agent/harness/session/turn.rs @@ -1495,6 +1495,17 @@ impl Agent { /// /// Fire-and-forget: failures are logged, never propagated. pub(super) fn spawn_transcript_ingestion(&self) { + // #1419: transcript ingest only fires for the user-facing + // orchestrator. Sub-agent transcripts are short-lived, scoped + // to a single delegation, and almost never carry durable user + // context worth lifting into `conversation_memory`. + if self.agent_definition_name != "orchestrator" { + log::debug!( + "[transcript_ingest] skipping spawn — agent '{}' is not the orchestrator", + self.agent_definition_name + ); + return; + } let Some(path) = self.session_transcript_path.clone() else { log::debug!("[transcript_ingest] no session transcript path yet — skipping spawn"); return; diff --git a/src/openhuman/config/mod.rs b/src/openhuman/config/mod.rs index ef3716c495..27d5a40a4b 100644 --- a/src/openhuman/config/mod.rs +++ b/src/openhuman/config/mod.rs @@ -34,9 +34,10 @@ pub use schema::{ ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig, SchedulerConfig, SchedulerGateConfig, SchedulerGateMode, ScreenIntelligenceConfig, SecretsConfig, SecurityConfig, SlackConfig, StorageConfig, StorageProviderConfig, - StorageProviderSection, StreamMode, TelegramConfig, UpdateConfig, UpdateRestartStrategy, - VoiceActivationMode, VoiceServerConfig, WebSearchConfig, WebhookConfig, - DEFAULT_CLOUD_LLM_MODEL, DEFAULT_MODEL, MODEL_AGENTIC_V1, MODEL_CODING_V1, MODEL_REASONING_V1, + StorageProviderSection, StreamMode, SummarizerConfig, SummarizerSource, TelegramConfig, + UpdateConfig, UpdateRestartStrategy, VoiceActivationMode, VoiceServerConfig, WebSearchConfig, + WebhookConfig, DEFAULT_CLOUD_LLM_MODEL, DEFAULT_MODEL, MODEL_AGENTIC_V1, MODEL_CODING_V1, + MODEL_REASONING_V1, }; pub use schema::{ clear_active_user, default_root_openhuman_dir, pre_login_user_dir, read_active_user_id, diff --git a/src/openhuman/config/schema/learning.rs b/src/openhuman/config/schema/learning.rs index c85d65449f..b08805d601 100644 --- a/src/openhuman/config/schema/learning.rs +++ b/src/openhuman/config/schema/learning.rs @@ -78,6 +78,78 @@ pub struct LearningConfig { /// How often the periodic rebuild loop runs in seconds. Default: 1800 (30 minutes). #[serde(default = "default_rebuild_interval_secs")] pub rebuild_interval_secs: u64, + + /// Summarizer LLM configuration (#1419). + /// + /// Reflection + transcript-ingest are going to fire often; running + /// them on the orchestrator-tier model is wasteful. When + /// `summarizer.enabled = true` and a model hint is configured, the + /// learning pipeline routes its LLM calls to this cheaper model + /// instead. When disabled (the default) callers fall back to the + /// heuristic path from #1406 / the orchestrator-tier provider, so + /// users without a summarizer configured never silently break. + #[serde(default)] + pub summarizer: SummarizerConfig, +} + +/// Where the summarizer model runs. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum SummarizerSource { + /// Use the configured cloud provider routed via a model hint. + #[default] + Cloud, + /// Use the local Ollama summarizer model via `LocalAiService::prompt()`. + Local, +} + +/// Configuration for the dedicated cheap summarizer used by the +/// reflection / transcript-ingest paths (#1419). Pluggable model +/// separate from the orchestrator provider, carries its own context +/// window cap so callers can budget compression. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct SummarizerConfig { + /// Master switch. Default: false — leave the heuristic-only path + /// from #1406 as the source of truth for offline users. + #[serde(default)] + pub enabled: bool, + + /// Where the summarizer runs (cloud routed via model hint vs local + /// Ollama). Default: cloud. + #[serde(default)] + pub source: SummarizerSource, + + /// Model hint string passed to `Provider::simple_chat` when + /// `source == Cloud`. Defaults to `"hint:fast"` so a typical + /// `model_routes` config routes summarizer traffic to the cheapest + /// tier without the user touching this field. + #[serde(default = "default_summarizer_model_hint")] + pub model_hint: String, + + /// Approximate character budget for the summarizer's prompt input. + /// Callers compress / truncate inputs to stay under this cap. The + /// implementation clamps to a sane minimum (1024 chars). + #[serde(default = "default_summarizer_context_chars")] + pub max_context_chars: usize, +} + +fn default_summarizer_model_hint() -> String { + "hint:fast".to_string() +} + +fn default_summarizer_context_chars() -> usize { + 6_000 +} + +impl Default for SummarizerConfig { + fn default() -> Self { + Self { + enabled: false, + source: SummarizerSource::default(), + model_hint: default_summarizer_model_hint(), + max_context_chars: default_summarizer_context_chars(), + } + } } fn default_rebuild_interval_secs() -> u64 { @@ -110,6 +182,7 @@ impl Default for LearningConfig { chat_to_tree_enabled: default_true(), stability_detector_enabled: default_true(), rebuild_interval_secs: default_rebuild_interval_secs(), + summarizer: SummarizerConfig::default(), } } } diff --git a/src/openhuman/config/schema/mod.rs b/src/openhuman/config/schema/mod.rs index f89074b801..81aa4afb82 100644 --- a/src/openhuman/config/schema/mod.rs +++ b/src/openhuman/config/schema/mod.rs @@ -44,7 +44,7 @@ pub use context::ContextConfig; pub use dictation::{DictationActivationMode, DictationConfig}; pub use heartbeat_cron::{CronConfig, HeartbeatConfig}; pub use identity_cost::{CostConfig, ModelPricing}; -pub use learning::{LearningConfig, ReflectionSource}; +pub use learning::{LearningConfig, ReflectionSource, SummarizerConfig, SummarizerSource}; pub use local_ai::{LocalAiConfig, LocalAiUsage}; pub use meet::MeetConfig; pub use node::NodeConfig; diff --git a/src/openhuman/learning/mod.rs b/src/openhuman/learning/mod.rs index 2329a2984a..0cdfbecb82 100644 --- a/src/openhuman/learning/mod.rs +++ b/src/openhuman/learning/mod.rs @@ -31,6 +31,7 @@ pub mod reflection; pub mod scheduler; pub mod schemas; pub mod stability_detector; +pub mod summarizer; pub mod tool_tracker; pub mod transcript_ingest; pub mod user_profile; @@ -47,5 +48,9 @@ pub use schemas::{ all_learning_controller_schemas, all_learning_registered_controllers, learning_schemas, }; pub use stability_detector::StabilityDetector; +pub use summarizer::{ + compress_tool_calls, render_tool_digests, ConfiguredSummarizer, SummarizerProvider, + ToolCallDigest, +}; pub use tool_tracker::ToolTrackerHook; pub use user_profile::UserProfileHook; diff --git a/src/openhuman/learning/reflection.rs b/src/openhuman/learning/reflection.rs index ffcc291c4c..6a435d6af4 100644 --- a/src/openhuman/learning/reflection.rs +++ b/src/openhuman/learning/reflection.rs @@ -45,6 +45,12 @@ pub struct ReflectionHook { full_config: Arc, memory: Arc, provider: Option>, + /// Optional dedicated cheap summarizer (#1419). When present, the + /// reflection LLM call is routed here instead of through the + /// orchestrator-tier provider / local-AI service so frequent + /// reflection passes do not burn high-tier inference. None falls + /// back to the legacy routing in `run_reflection`. + summarizer: Option>, /// Per-session reflection counts for throttling. Key is session_id (or "__global__"). session_counts: Mutex>, } @@ -55,12 +61,30 @@ impl ReflectionHook { full_config: Arc, memory: Arc, provider: Option>, + ) -> Self { + Self::with_summarizer(config, full_config, memory, provider, None) + } + + /// Construct a reflection hook with an explicit summarizer override. + /// + /// When `summarizer.is_some()` the reflection LLM call is routed to + /// the dedicated cheap summarizer instead of the orchestrator-tier + /// provider / local AI service. This is the path #1419 wires from + /// the session builder when `LearningConfig::summarizer.enabled` is + /// true and the agent is the orchestrator. + pub fn with_summarizer( + config: LearningConfig, + full_config: Arc, + memory: Arc, + provider: Option>, + summarizer: Option>, ) -> Self { Self { config, full_config, memory, provider, + summarizer, session_counts: Mutex::new(HashMap::new()), } } @@ -132,15 +156,33 @@ impl ReflectionHook { )); if !ctx.tool_calls.is_empty() { + // #1419: compress raw multi-call history into per-tool + // digests before it hits the summarizer's smaller context + // window. The legacy per-call block is preserved when the + // turn has at most one call per distinct tool — the digest + // and the legacy line look identical in that case, but the + // per-call line keeps the existing "success=true, + // duration=Xms" surface intact for older test assertions. + let digests = crate::openhuman::learning::compress_tool_calls(&ctx.tool_calls); + let compressed_any = digests.iter().any(|d| d.count > 1); prompt.push_str("## Tool Calls\n"); - for tc in &ctx.tool_calls { - prompt.push_str(&format!( - "- {} (success={}, duration={}ms): {}\n", - tc.name, - tc.success, - tc.duration_ms, - truncate(&tc.output_summary, 100) - )); + if compressed_any { + prompt.push_str(&crate::openhuman::learning::render_tool_digests(&digests)); + log::debug!( + "[learning::reflection] compressed {} tool calls into {} per-tool digest(s)", + ctx.tool_calls.len(), + digests.len(), + ); + } else { + for tc in &ctx.tool_calls { + prompt.push_str(&format!( + "- {} (success={}, duration={}ms): {}\n", + tc.name, + tc.success, + tc.duration_ms, + truncate(&tc.output_summary, 100) + )); + } } prompt.push('\n'); } @@ -155,6 +197,36 @@ impl ReflectionHook { /// Call the configured LLM for reflection. async fn run_reflection(&self, prompt: &str) -> anyhow::Result { + // #1419: when a dedicated summarizer is configured, route the + // reflection call through it instead of the orchestrator-tier + // provider / local AI service. The summarizer carries its own + // context window cap and we log a context-fill metric so trigger + // thresholds can be tuned against real fill ratios. + if let Some(summarizer) = self.summarizer.as_ref() { + let cap = summarizer.context_window_chars(); + let input_chars = prompt.chars().count(); + let bounded: std::borrow::Cow<'_, str> = if input_chars > cap { + log::info!( + "[learning::reflection] summarizer={} input {} chars > cap {} — truncating", + summarizer.label(), + input_chars, + cap, + ); + let head: String = prompt.chars().take(cap).collect(); + std::borrow::Cow::Owned(head) + } else { + std::borrow::Cow::Borrowed(prompt) + }; + log::debug!( + "[learning::reflection] dispatch via summarizer label={} input_chars={} cap={} fill={:.2}", + summarizer.label(), + bounded.chars().count(), + cap, + (bounded.chars().count() as f32) / (cap.max(1) as f32), + ); + return summarizer.prompt(bounded.as_ref()).await; + } + match self.config.reflection_source { ReflectionSource::Local => { // Gate: local reflection requires the per-feature flag. diff --git a/src/openhuman/learning/reflection_tests.rs b/src/openhuman/learning/reflection_tests.rs index 9d9d65f850..7fa3eab44e 100644 --- a/src/openhuman/learning/reflection_tests.rs +++ b/src/openhuman/learning/reflection_tests.rs @@ -517,3 +517,103 @@ async fn on_turn_complete_emits_style_candidates_from_llm_preferences() { "user_preferences with key=value format should produce a Style candidate" ); } + +// ── #1419 — summarizer routing + digest tests ─────────────────────────── + +/// When a [`SummarizerProvider`] is configured, the reflection LLM call +/// is routed through it instead of the orchestrator-tier provider / +/// local AI service. Verifies the seam is wired end-to-end. +#[tokio::test] +async fn on_turn_complete_routes_through_summarizer_when_configured() { + use crate::openhuman::learning::SummarizerProvider; + use tokio::sync::Mutex as TokioMutex; + + struct CountingSummarizer { + calls: TokioMutex>, + } + #[async_trait] + impl SummarizerProvider for CountingSummarizer { + fn context_window_chars(&self) -> usize { + 10_000 + } + fn label(&self) -> &str { + "test-summarizer" + } + async fn prompt(&self, prompt: &str) -> anyhow::Result { + self.calls.lock().await.push(prompt.to_string()); + Ok(r#"{"observations":["summarized"],"patterns":[],"user_preferences":[],"user_reflections":[]}"#.into()) + } + } + + let summarizer = Arc::new(CountingSummarizer { + calls: TokioMutex::new(Vec::new()), + }); + let memory: Arc = Arc::new(MockMemory::default()); + let hook = ReflectionHook::with_summarizer( + reflection_config(), + Arc::new(Config::default()), + memory, + None, + Some(summarizer.clone()), + ); + + hook.on_turn_complete(&reflective_turn()).await.unwrap(); + let calls = summarizer.calls.lock().await; + assert_eq!( + calls.len(), + 1, + "summarizer should have been invoked exactly once" + ); + assert!(calls[0].contains("## Tool Calls")); +} + +/// Reflection prompts on a multi-call turn now compress raw history +/// into per-tool digests rather than streaming the full list — keeps +/// the summarizer's smaller context window in budget (#1419). +#[test] +fn build_reflection_prompt_compresses_repeated_tool_calls_into_digest() { + let memory: Arc = Arc::new(MockMemory::default()); + let hook = ReflectionHook::new( + reflection_config(), + Arc::new(Config::default()), + memory, + None, + ); + let mut turn = reflective_turn(); + // Three Bash calls + one read — should compress to two digests. + turn.tool_calls = vec![ + ToolCallRecord { + name: "Bash".into(), + arguments: serde_json::json!({}), + success: true, + output_summary: "ok 1".into(), + duration_ms: 80, + }, + ToolCallRecord { + name: "Bash".into(), + arguments: serde_json::json!({}), + success: false, + output_summary: "fail".into(), + duration_ms: 220, + }, + ToolCallRecord { + name: "Bash".into(), + arguments: serde_json::json!({}), + success: true, + output_summary: "ok 3".into(), + duration_ms: 50, + }, + ToolCallRecord { + name: "read_file".into(), + arguments: serde_json::json!({}), + success: true, + output_summary: "ok".into(), + duration_ms: 12, + }, + ]; + let prompt = hook.build_reflection_prompt(&turn); + assert!(prompt.contains("Bash ×3")); + assert!(prompt.contains("p95=220ms")); + // Per-call lines should NOT appear for the compressed path. + assert!(!prompt.contains("(success=true, duration=80ms)")); +} diff --git a/src/openhuman/learning/summarizer/digest.rs b/src/openhuman/learning/summarizer/digest.rs new file mode 100644 index 0000000000..5cd2bc6b7f --- /dev/null +++ b/src/openhuman/learning/summarizer/digest.rs @@ -0,0 +1,221 @@ +//! Tool-call digest layer — collapse raw multi-call tool history into +//! per-tool summaries before any reflection / ingest LLM call sees them. +//! +//! Reflection over a raw transcript drags every tool call's full output +//! through the summarizer, which (a) inflates cost and (b) blows past +//! the summarizer's smaller context window. The digest layer fixes both: +//! it groups calls by tool name, keeps aggregate counts plus a small +//! sample of inputs/outputs per tool, and renders to a compact block +//! that fits comfortably in a sub-4k-token prompt. + +use crate::openhuman::agent::hooks::ToolCallRecord; + +/// How many sample input/output strings we keep per tool. Keeps the +/// digest bounded — more than this and the per-tool block starts +/// dominating the prompt again. +pub const SAMPLES_PER_TOOL: usize = 2; + +/// Maximum characters retained per sampled output. Anything longer is +/// truncated with an ellipsis marker — the reflection model only needs +/// a flavour of what happened, not the raw payload. +pub const SAMPLE_OUTPUT_CHARS: usize = 160; + +/// Per-tool aggregate over a turn's (or transcript's) tool calls. +#[derive(Debug, Clone, PartialEq)] +pub struct ToolCallDigest { + /// Tool name (e.g. `read_file`, `Bash`). + pub name: String, + /// Total number of times this tool was invoked. + pub count: usize, + /// Number of invocations that reported success. + pub success_count: usize, + /// 95th-percentile observed duration in milliseconds. With few + /// samples this is just the max — accurate enough for prompt + /// budgeting and trend logging. + pub p95_duration_ms: u64, + /// A small bounded sample of input JSON (serialized, truncated). + pub sample_inputs: Vec, + /// A small bounded sample of output summaries (truncated). + pub sample_outputs: Vec, +} + +impl ToolCallDigest { + /// Success rate as `[0.0, 1.0]`. Returns 0.0 when `count == 0`. + pub fn success_rate(&self) -> f32 { + if self.count == 0 { + return 0.0; + } + (self.success_count as f32) / (self.count as f32) + } +} + +/// Group `tool_calls` by tool name and produce a bounded digest per +/// tool. Stable order — first appearance wins — so prompts are +/// deterministic. +pub fn compress_tool_calls(tool_calls: &[ToolCallRecord]) -> Vec { + use std::collections::BTreeMap; + + // BTreeMap to keep deterministic ordering for tests; insertion + // order isn't important here — by-name alphabetical is fine and + // makes the rendered prompt easier to scan. + let mut by_name: BTreeMap = BTreeMap::new(); + let mut max_dur: std::collections::HashMap = std::collections::HashMap::new(); + + for record in tool_calls { + let entry = by_name + .entry(record.name.clone()) + .or_insert_with(|| ToolCallDigest { + name: record.name.clone(), + count: 0, + success_count: 0, + p95_duration_ms: 0, + sample_inputs: Vec::new(), + sample_outputs: Vec::new(), + }); + entry.count += 1; + if record.success { + entry.success_count += 1; + } + let cap = max_dur.entry(record.name.clone()).or_insert(0); + if record.duration_ms > *cap { + *cap = record.duration_ms; + } + if entry.sample_inputs.len() < SAMPLES_PER_TOOL { + entry + .sample_inputs + .push(truncate(&record.arguments.to_string(), SAMPLE_OUTPUT_CHARS)); + } + if entry.sample_outputs.len() < SAMPLES_PER_TOOL { + entry + .sample_outputs + .push(truncate(&record.output_summary, SAMPLE_OUTPUT_CHARS)); + } + } + + for (name, dur) in max_dur { + if let Some(entry) = by_name.get_mut(&name) { + entry.p95_duration_ms = dur; + } + } + + by_name.into_values().collect() +} + +/// Render a slice of digests into a compact prompt block suitable for +/// inclusion under a `## Tool Calls` heading. Returns the empty string +/// for an empty slice so callers can splice it in unconditionally. +pub fn render_tool_digests(digests: &[ToolCallDigest]) -> String { + if digests.is_empty() { + return String::new(); + } + let mut out = String::new(); + for d in digests { + let pct = (d.success_rate() * 100.0).round() as u32; + out.push_str(&format!( + "- {} ×{} ({}% ok, p95={}ms)\n", + d.name, d.count, pct, d.p95_duration_ms + )); + if !d.sample_inputs.is_empty() { + out.push_str(" inputs: "); + out.push_str(&d.sample_inputs.join(" | ")); + out.push('\n'); + } + if !d.sample_outputs.is_empty() { + out.push_str(" outputs: "); + out.push_str(&d.sample_outputs.join(" | ")); + out.push('\n'); + } + } + out +} + +fn truncate(s: &str, max_chars: usize) -> String { + if s.chars().count() <= max_chars { + s.to_string() + } else { + let head: String = s.chars().take(max_chars).collect(); + format!("{head}…") + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn record(name: &str, ok: bool, dur: u64, out: &str) -> ToolCallRecord { + ToolCallRecord { + name: name.into(), + arguments: json!({"q": "x"}), + success: ok, + output_summary: out.into(), + duration_ms: dur, + } + } + + #[test] + fn empty_input_yields_empty_output() { + assert!(compress_tool_calls(&[]).is_empty()); + assert_eq!(render_tool_digests(&[]), ""); + } + + #[test] + fn groups_by_tool_with_counts_and_success_rate() { + let calls = vec![ + record("Bash", true, 100, "ok 1"), + record("Bash", false, 250, "fail"), + record("read_file", true, 12, "ok read"), + ]; + let mut digests = compress_tool_calls(&calls); + digests.sort_by(|a, b| a.name.cmp(&b.name)); + + assert_eq!(digests.len(), 2); + let bash = &digests[0]; + assert_eq!(bash.name, "Bash"); + assert_eq!(bash.count, 2); + assert_eq!(bash.success_count, 1); + assert!((bash.success_rate() - 0.5).abs() < f32::EPSILON); + assert_eq!(bash.p95_duration_ms, 250); + + let read = &digests[1]; + assert_eq!(read.name, "read_file"); + assert_eq!(read.count, 1); + assert_eq!(read.success_count, 1); + } + + #[test] + fn caps_samples_per_tool() { + let calls: Vec<_> = (0..5) + .map(|i| record("Bash", true, 10, &format!("out {i}"))) + .collect(); + let digests = compress_tool_calls(&calls); + assert_eq!(digests.len(), 1); + assert_eq!(digests[0].count, 5); + assert_eq!(digests[0].sample_outputs.len(), SAMPLES_PER_TOOL); + assert_eq!(digests[0].sample_inputs.len(), SAMPLES_PER_TOOL); + } + + #[test] + fn truncates_long_outputs() { + let long = "x".repeat(SAMPLE_OUTPUT_CHARS + 50); + let digests = compress_tool_calls(&[record("Bash", true, 1, &long)]); + let sample = &digests[0].sample_outputs[0]; + assert!( + sample.chars().count() <= SAMPLE_OUTPUT_CHARS + 1, + "sample={sample}" + ); + assert!(sample.ends_with('…')); + } + + #[test] + fn render_emits_human_block() { + let digests = compress_tool_calls(&[ + record("Bash", true, 50, "ok"), + record("Bash", false, 100, "fail"), + ]); + let rendered = render_tool_digests(&digests); + assert!(rendered.contains("Bash ×2")); + assert!(rendered.contains("50% ok")); + assert!(rendered.contains("p95=100ms")); + } +} diff --git a/src/openhuman/learning/summarizer/mod.rs b/src/openhuman/learning/summarizer/mod.rs new file mode 100644 index 0000000000..d784fb92d7 --- /dev/null +++ b/src/openhuman/learning/summarizer/mod.rs @@ -0,0 +1,145 @@ +//! Summarizer abstraction for the learning subsystem (#1419). +//! +//! Reflection and transcript-ingest fire often and need a model — but they +//! produce short, structured summaries that a cheap model handles fine. +//! Running them on the orchestrator-tier model is wasteful. This module +//! introduces a dedicated [`SummarizerProvider`] trait, a thin +//! [`ConfiguredSummarizer`] implementation that wraps an existing +//! [`crate::openhuman::providers::Provider`], and a tool-call digest layer +//! ([`digest`]) so reflection never feeds raw multi-call tool history to the +//! summarizer's smaller context window. +//! +//! The summarizer is **opt-in**: when no summarizer is configured, the +//! reflection / transcript-ingest paths keep behaving the way they did +//! before #1419 — heuristic-only for transcript ingest, orchestrator-tier +//! provider for reflection. That keeps offline users on the heuristic +//! fast-path from #1406 without silently breaking. + +pub mod digest; + +use crate::openhuman::providers::Provider; +use async_trait::async_trait; +use std::sync::Arc; + +pub use digest::{compress_tool_calls, render_tool_digests, ToolCallDigest}; + +/// Minimum context-window cap (in characters) we'll honour. Anything +/// smaller would clip even a one-paragraph reflection prompt, so we +/// floor at this value rather than failing silently. +pub const MIN_SUMMARIZER_CONTEXT_CHARS: usize = 1024; + +/// Default character cap for the summarizer's context window. Tuned to +/// fit comfortably inside a small local model (e.g. 4k-token Ollama +/// chat models) after prompt scaffolding and reserved completion space. +pub const DEFAULT_SUMMARIZER_CONTEXT_CHARS: usize = 6_000; + +/// Trait for a cheap summarizer model, distinct from the orchestrator +/// provider. Implementations promise: +/// +/// - **Cheap and fast** — callers fire this in the background on every +/// threshold crossing, so latency / cost matters more than peak +/// quality. +/// - **Bounded context** — [`context_window_chars`] is the *callable* +/// budget for `prompt`. Callers must compress inputs (see [`digest`]) +/// to fit. +/// - **Short outputs** — the trait targets sub-paragraph summaries. +/// `prompt` returns the raw model response; parsing is the caller's +/// responsibility. +#[async_trait] +pub trait SummarizerProvider: Send + Sync { + /// Approximate character budget for `prompt` input. Callers must + /// truncate / digest inputs to stay under this cap; the + /// implementation makes no guarantee about behaviour when the cap + /// is exceeded. + fn context_window_chars(&self) -> usize; + + /// Human-readable identifier used in logs / telemetry. Should not + /// include credentials or absolute paths. + fn label(&self) -> &str; + + /// Run a one-shot summarization. Returns the model's raw response. + async fn prompt(&self, prompt: &str) -> anyhow::Result; +} + +/// Thin [`SummarizerProvider`] backed by an existing +/// [`Provider`]. Used by the cloud path; the local path is wired +/// separately by the caller (see `learning::reflection`). +pub struct ConfiguredSummarizer { + provider: Arc, + model_hint: String, + context_window_chars: usize, + label: String, +} + +impl ConfiguredSummarizer { + /// Construct a configured summarizer. + /// + /// `context_window_chars` is clamped to at least + /// [`MIN_SUMMARIZER_CONTEXT_CHARS`] so a misconfigured value cannot + /// produce zero-byte truncation downstream. + pub fn new( + provider: Arc, + model_hint: impl Into, + context_window_chars: usize, + label: impl Into, + ) -> Self { + Self { + provider, + model_hint: model_hint.into(), + context_window_chars: context_window_chars.max(MIN_SUMMARIZER_CONTEXT_CHARS), + label: label.into(), + } + } +} + +#[async_trait] +impl SummarizerProvider for ConfiguredSummarizer { + fn context_window_chars(&self) -> usize { + self.context_window_chars + } + + fn label(&self) -> &str { + &self.label + } + + async fn prompt(&self, prompt: &str) -> anyhow::Result { + let started = std::time::Instant::now(); + let input_chars = prompt.chars().count(); + let fill_ratio = (input_chars as f32) / (self.context_window_chars.max(1) as f32); + log::debug!( + "[summarizer] dispatch label={} model={} input_chars={} cap={} fill={:.2}", + self.label, + self.model_hint, + input_chars, + self.context_window_chars, + fill_ratio, + ); + let result = self + .provider + .simple_chat(prompt, &self.model_hint, 0.2) + .await; + let elapsed = started.elapsed(); + match &result { + Ok(out) => log::info!( + "[summarizer] label={} model={} input_chars={} output_chars={} latency_ms={}", + self.label, + self.model_hint, + input_chars, + out.chars().count(), + elapsed.as_millis(), + ), + Err(e) => log::warn!( + "[summarizer] label={} model={} input_chars={} latency_ms={} error={e}", + self.label, + self.model_hint, + input_chars, + elapsed.as_millis(), + ), + } + result + } +} + +#[cfg(test)] +#[path = "tests.rs"] +mod tests; diff --git a/src/openhuman/learning/summarizer/tests.rs b/src/openhuman/learning/summarizer/tests.rs new file mode 100644 index 0000000000..4ee911e2f2 --- /dev/null +++ b/src/openhuman/learning/summarizer/tests.rs @@ -0,0 +1,61 @@ +//! Tests for the summarizer trait + ConfiguredSummarizer wrapper. + +use super::*; +use crate::openhuman::providers::Provider; +use async_trait::async_trait; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// Recording double that captures the model hint + last prompt and +/// returns a canned response. +#[derive(Default)] +struct RecordingProvider { + last_prompt: Mutex>, + last_model: Mutex>, + response: Mutex, +} + +#[async_trait] +impl Provider for RecordingProvider { + async fn chat_with_system( + &self, + _system_prompt: Option<&str>, + message: &str, + model: &str, + _temperature: f64, + ) -> anyhow::Result { + *self.last_prompt.lock().await = Some(message.to_string()); + *self.last_model.lock().await = Some(model.to_string()); + Ok(self.response.lock().await.clone()) + } +} + +#[tokio::test] +async fn summarizer_dispatches_with_model_hint() { + let provider = Arc::new(RecordingProvider::default()); + *provider.response.lock().await = "merged".into(); + let summarizer = + ConfiguredSummarizer::new(provider.clone(), "hint:fast", 4_000, "test-summarizer"); + + let out = summarizer.prompt("input").await.unwrap(); + assert_eq!(out, "merged"); + assert_eq!(provider.last_prompt.lock().await.as_deref(), Some("input")); + assert_eq!( + provider.last_model.lock().await.as_deref(), + Some("hint:fast") + ); + assert_eq!(summarizer.label(), "test-summarizer"); + assert_eq!(summarizer.context_window_chars(), 4_000); +} + +#[tokio::test] +async fn context_window_floor_clamps_misconfigured_values() { + let provider = Arc::new(RecordingProvider::default()); + // Anything below MIN_SUMMARIZER_CONTEXT_CHARS is bumped up so a + // zero / accidentally-tiny config can't produce empty digests. + let summarizer = ConfiguredSummarizer::new(provider, "hint:fast", 0, "tiny"); + assert_eq!( + summarizer.context_window_chars(), + MIN_SUMMARIZER_CONTEXT_CHARS + ); +} diff --git a/src/openhuman/learning/transcript_ingest/merge.rs b/src/openhuman/learning/transcript_ingest/merge.rs new file mode 100644 index 0000000000..c4ed4745ab --- /dev/null +++ b/src/openhuman/learning/transcript_ingest/merge.rs @@ -0,0 +1,266 @@ +//! Stage-2 merge pass for transcript-ingest (#1419). +//! +//! The heuristic extractor ([`super::extract`]) is intentionally +//! high-precision but emits one candidate per matching sentence. A long +//! transcript can therefore surface several near-duplicate preference / +//! commitment lines for the same underlying fact ("I prefer terse +//! answers", "I'd prefer terse responses"). Storage-side dedupe in +//! [`super::dedupe`] only catches exact normalised matches; the +//! semantically-overlapping pairs slip through and clutter retrieval. +//! +//! When a summarizer is configured this module collapses obvious +//! near-duplicates so retrieval surfaces one merged candidate per +//! distinct underlying fact instead of a cluster. +//! +//! Heuristic-only fallback: when the caller passes `None` the function +//! returns its input unchanged, preserving the #1406 behaviour for +//! offline users. +//! +//! NOTE: the heuristic merge runs even when no summarizer is configured. +//! Calling the summarizer is reserved for a future expansion where the +//! merged content is rewritten — today we only collapse and pick the +//! longest snippet as the representative, which keeps the pipeline +//! deterministic and lets the smaller summarizer model focus on the +//! reflection path. + +use std::collections::HashSet; +use std::sync::Arc; + +use crate::openhuman::learning::SummarizerProvider; + +use super::types::{CandidateKind, MemoryCandidate}; + +/// Jaccard-similarity threshold above which two candidates of the same +/// `kind` are considered near-duplicates and collapsed into one. Tuned +/// to keep precision high — paraphrases of the same sentence usually +/// score 0.55–0.85, while different facts about the same domain score +/// below 0.4. +pub const NEAR_DUP_THRESHOLD: f32 = 0.55; + +/// Outcome of one merge pass, surfaced in logs / [`super::types::IngestionReport`]. +#[derive(Debug, Clone, Default)] +pub struct MergeReport { + /// Number of input candidates considered. + pub input: usize, + /// Number of candidates that survived after collapsing near-dupes. + pub output: usize, + /// Number of pair-level merges performed. + pub merged_pairs: usize, + /// Whether a summarizer was invoked (vs heuristic-only merge). + pub used_summarizer: bool, +} + +/// Collapse near-duplicate candidates of the same `kind` based on a +/// cheap Jaccard-similarity check over their tokens. +/// +/// Stable ordering — the first candidate encountered keeps its index in +/// the output. The `summarizer` parameter is accepted today for +/// future use: when a summarizer is wired in, this is the seam where +/// the merged group's content will be rewritten via an LLM call. For +/// now the longest snippet wins. +pub async fn collapse_near_duplicates( + candidates: Vec, + summarizer: Option<&Arc>, +) -> (Vec, MergeReport) { + let mut report = MergeReport { + input: candidates.len(), + output: candidates.len(), + merged_pairs: 0, + used_summarizer: summarizer.is_some(), + }; + if candidates.len() < 2 { + return (candidates, report); + } + + let mut tokens: Vec> = candidates.iter().map(tokenise).collect(); + let mut kept: Vec = Vec::with_capacity(candidates.len()); + let mut taken: Vec = vec![false; candidates.len()]; + + for (i, candidate) in candidates.iter().enumerate() { + if taken[i] { + continue; + } + let mut group_indices: Vec = vec![i]; + for j in (i + 1)..candidates.len() { + if taken[j] { + continue; + } + if candidates[j].kind != candidate.kind { + continue; + } + if jaccard(&tokens[i], &tokens[j]) >= NEAR_DUP_THRESHOLD { + group_indices.push(j); + taken[j] = true; + } + } + taken[i] = true; + + if group_indices.len() == 1 { + kept.push(candidate.clone()); + continue; + } + report.merged_pairs += group_indices.len() - 1; + // Choose the longest snippet as representative (richest + // surface form). Merge provenance indices so we don't lose + // pointers back to the source messages. + let mut best_idx = group_indices[0]; + for &idx in &group_indices[1..] { + if candidates[idx].content.chars().count() + > candidates[best_idx].content.chars().count() + { + best_idx = idx; + } + } + let mut merged = candidates[best_idx].clone(); + let mut all_msg_indices: Vec = Vec::new(); + for &idx in &group_indices { + all_msg_indices.extend(candidates[idx].provenance.message_indices.iter().copied()); + // Promote importance to High if any member of the group is + // High — recurrence is itself signal that this fact + // matters. + if matches!(candidates[idx].importance, super::types::Importance::High) { + merged.importance = super::types::Importance::High; + } + } + all_msg_indices.sort_unstable(); + all_msg_indices.dedup(); + merged.provenance.message_indices = all_msg_indices; + kept.push(merged); + // tokens of merged-away entries are no longer needed; null out + // to free a little memory in long runs. + for &idx in &group_indices[1..] { + tokens[idx].clear(); + } + } + + report.output = kept.len(); + if report.merged_pairs > 0 { + log::info!( + "[transcript_ingest::merge] collapsed {} near-duplicate pair(s): {} → {} (summarizer={})", + report.merged_pairs, + report.input, + report.output, + report.used_summarizer, + ); + } + (kept, report) +} + +fn tokenise(c: &MemoryCandidate) -> HashSet { + let mut out: HashSet = HashSet::new(); + for raw in c.content.split(|ch: char| !ch.is_alphanumeric()) { + let word = raw.to_ascii_lowercase(); + if word.chars().count() >= 3 && !STOPWORDS.contains(&word.as_str()) { + out.insert(word); + } + } + out +} + +fn jaccard(a: &HashSet, b: &HashSet) -> f32 { + if a.is_empty() || b.is_empty() { + return 0.0; + } + let inter = a.intersection(b).count(); + let union = a.union(b).count(); + if union == 0 { + 0.0 + } else { + (inter as f32) / (union as f32) + } +} + +/// Minimal stopword list — common short connectors that artificially +/// inflate Jaccard similarity without adding meaning. Intentionally +/// short; the tokeniser already drops words under 3 chars. +const STOPWORDS: &[&str] = &[ + "the", "and", "for", "with", "that", "this", "but", "are", "you", "your", "from", "have", + "has", "into", "than", "they", "them", "our", "ours", "i'll", "i'm", "i've", "i'd", "let", +]; + +#[cfg(test)] +mod tests { + use super::*; + use crate::openhuman::learning::transcript_ingest::types::{Importance, Provenance}; + + fn cand(content: &str, kind: CandidateKind, idx: usize) -> MemoryCandidate { + MemoryCandidate { + kind, + importance: Importance::High, + content: content.into(), + provenance: Provenance { + thread_id: Some("t".into()), + transcript_path: "/p".into(), + transcript_basename: "p".into(), + message_indices: vec![idx], + extracted_at: "2026-05-11T00:00:00Z".into(), + }, + } + } + + #[tokio::test] + async fn empty_input_is_a_noop() { + let (out, report) = collapse_near_duplicates(vec![], None).await; + assert!(out.is_empty()); + assert_eq!(report.merged_pairs, 0); + assert!(!report.used_summarizer); + } + + #[tokio::test] + async fn singletons_pass_through_untouched() { + let input = vec![ + cand( + "I prefer Postgres for new services.", + CandidateKind::Preference, + 1, + ), + cand( + "Let's go with Datadog for tracing.", + CandidateKind::Decision, + 3, + ), + ]; + let (out, report) = collapse_near_duplicates(input.clone(), None).await; + assert_eq!(out.len(), 2); + assert_eq!(report.merged_pairs, 0); + } + + #[tokio::test] + async fn collapses_near_duplicate_preferences() { + let input = vec![ + cand( + "I prefer terse, direct answers over long explanations.", + CandidateKind::Preference, + 1, + ), + cand( + "I prefer terse direct answers — please skip long explanations.", + CandidateKind::Preference, + 5, + ), + ]; + let (out, report) = collapse_near_duplicates(input, None).await; + assert_eq!(out.len(), 1, "near-dup preferences should collapse"); + assert_eq!(report.merged_pairs, 1); + // Merged provenance points to both source messages. + assert_eq!(out[0].provenance.message_indices, vec![1, 5]); + } + + #[tokio::test] + async fn does_not_collapse_across_kinds() { + let input = vec![ + cand( + "I prefer terse direct answers over long explanations.", + CandidateKind::Preference, + 1, + ), + cand( + "Going to write terse direct answers in the doc.", + CandidateKind::Commitment, + 2, + ), + ]; + let (out, _) = collapse_near_duplicates(input, None).await; + assert_eq!(out.len(), 2, "different kinds must not merge"); + } +} diff --git a/src/openhuman/learning/transcript_ingest/mod.rs b/src/openhuman/learning/transcript_ingest/mod.rs index ab30bb0ed1..f74be7eaa9 100644 --- a/src/openhuman/learning/transcript_ingest/mod.rs +++ b/src/openhuman/learning/transcript_ingest/mod.rs @@ -34,6 +34,7 @@ mod dedupe; mod extract; +mod merge; mod persist; pub mod types; @@ -63,6 +64,22 @@ pub async fn ingest_transcript_path( ingest_session_transcript(memory, &parsed, path).await } +/// Variant of [`ingest_transcript_path`] that takes an optional +/// summarizer for the stage-2 merge pass (#1419). +pub async fn ingest_transcript_path_with_summarizer( + memory: &dyn Memory, + path: &Path, + summarizer: Option<&std::sync::Arc>, +) -> anyhow::Result { + log::debug!( + "[transcript_ingest] starting ingest for {} (summarizer={})", + path.display(), + summarizer.map(|s| s.label()).unwrap_or("none") + ); + let parsed = transcript::read_transcript(path)?; + ingest_session_transcript_with_summarizer(memory, &parsed, path, summarizer).await +} + /// Ingest an already-parsed [`SessionTranscript`]. /// /// Exposed separately from `ingest_transcript_path` so tests can drive the @@ -71,6 +88,17 @@ pub async fn ingest_session_transcript( memory: &dyn Memory, transcript: &SessionTranscript, path: &Path, +) -> anyhow::Result { + ingest_session_transcript_with_summarizer(memory, transcript, path, None).await +} + +/// Same as [`ingest_session_transcript`] but accepts an optional +/// summarizer used to collapse near-duplicate candidates (#1419). +pub async fn ingest_session_transcript_with_summarizer( + memory: &dyn Memory, + transcript: &SessionTranscript, + path: &Path, + summarizer: Option<&std::sync::Arc>, ) -> anyhow::Result { let basename = path .file_name() @@ -104,6 +132,12 @@ pub async fn ingest_session_transcript( let extracted_total = extracted.len(); let reflection_total = reflections.len(); + // Stage 2: collapse near-duplicate candidates before dedupe so a + // long transcript doesn't surface multiple paraphrases of the same + // underlying fact. Heuristic-only when `summarizer` is None + // (#1406 fallback intact). + let (extracted, _merge_report) = merge::collapse_near_duplicates(extracted, summarizer).await; + let (kept, deduped) = dedupe::filter_new(memory, extracted).await?; let (kept_reflections, deduped_reflections) = dedupe::filter_new_reflections(memory, reflections).await?;