From 46a45bcc22685ec37f7a823296353bc1d04ef674 Mon Sep 17 00:00:00 2001 From: David Stern Date: Wed, 10 Jun 2026 23:03:37 -0400 Subject: [PATCH] Batch AI events during conversation restoration. --- .../ai/blocklist/controller/shared_session.rs | 196 ++++++++++++++++-- .../shared_session/viewer/event_loop.rs | 23 +- crates/warp_features/src/lib.rs | 8 + crates/warpui_core/src/core/model/context.rs | 49 +++++ 4 files changed, 262 insertions(+), 14 deletions(-) diff --git a/app/src/ai/blocklist/controller/shared_session.rs b/app/src/ai/blocklist/controller/shared_session.rs index 8575651bb8..00f8e4821b 100644 --- a/app/src/ai/blocklist/controller/shared_session.rs +++ b/app/src/ai/blocklist/controller/shared_session.rs @@ -9,17 +9,17 @@ use warp_core::features::FeatureFlag; use warp_multi_agent_api::client_action::Action; use warp_multi_agent_api::message::Message; use warp_multi_agent_api::response_event::{stream_finished, ClientActions}; -use warpui::{AppContext, ModelContext, SingletonEntity}; +use warpui::{AppContext, EntityId, ModelContext, SingletonEntity}; use super::response_stream::ResponseStreamId; use super::{BlocklistAIController, RequestInput, SessionContext}; use crate::ai::agent::conversation::{AIConversationId, ConversationStatus}; -use crate::ai::agent::{AIAgentActionId, AIAgentAttachment, EntrypointType}; +use crate::ai::agent::{AIAgentActionId, AIAgentAttachment, AIAgentExchangeId, EntrypointType}; use crate::ai::attachment_utils::{ build_file_attachment_map, download_file, sanitize_filename, DownloadedAttachment, }; use crate::ai::blocklist::agent_view::AgentViewEntryOrigin; -use crate::ai::blocklist::history_model::BlocklistAIHistoryModel; +use crate::ai::blocklist::history_model::{BlocklistAIHistoryEvent, BlocklistAIHistoryModel}; use crate::server::server_api::ServerApiProvider; use crate::terminal::model::block::BlockId; @@ -38,6 +38,86 @@ pub(super) struct SharedSessionState { current_response_initiator: Option, // The sharer's participant ID (set when session sharing starts) sharer_participant_id: Option, + /// `true` while the viewer's event loop is catching up on the backlog of ordered events that + /// existed when the session was joined. + is_catching_up_on_event_backlog: bool, + /// `true` while the sharer is replaying an existing agent conversation (bracketed by the + /// `AgentConversationReplayStarted`/`AgentConversationReplayEnded` ordered events). + is_receiving_conversation_replay: bool, + /// History-model events captured while coalescing is active. High-volume events are deduped + /// on insert (see [`coalescing_key`]); the buffer is flushed as a single batch once no + /// coalescing window remains active. + coalesced_history_events: Vec, +} + +/// Identity of a buffered history event used for deduplication while coalescing. A new event +/// that maps to the same key as an earlier buffered event supersedes it: the stale entry is +/// removed and the new one is appended, so the latest payload wins in both content and +/// ordering. Events with no key are always appended and delivered in emission order. +#[derive(PartialEq, Eq)] +enum CoalescedHistoryEventKey { + UpdatedStreamingExchange { exchange_id: AIAgentExchangeId }, + UpdatedTodoList { terminal_view_id: EntityId }, + UpdatedConversationStatus { conversation_id: AIConversationId }, + UpdatedConversationMetadata { conversation_id: AIConversationId }, + ConversationUsageMetadataUpdated { conversation_id: AIConversationId }, + SetActiveConversation { terminal_view_id: EntityId }, +} + +/// Returns the deduplication key for a history event, or `None` for events that should never +/// be coalesced. Only events whose subscribers re-read the latest model state (rather than +/// depending on each intermediate payload) are safe to dedupe. +fn coalescing_key(event: &BlocklistAIHistoryEvent) -> Option { + match event { + BlocklistAIHistoryEvent::UpdatedStreamingExchange { exchange_id, .. } => { + Some(CoalescedHistoryEventKey::UpdatedStreamingExchange { + exchange_id: *exchange_id, + }) + } + BlocklistAIHistoryEvent::UpdatedTodoList { terminal_view_id } => { + Some(CoalescedHistoryEventKey::UpdatedTodoList { + terminal_view_id: *terminal_view_id, + }) + } + BlocklistAIHistoryEvent::UpdatedConversationStatus { + conversation_id, .. + } => Some(CoalescedHistoryEventKey::UpdatedConversationStatus { + conversation_id: *conversation_id, + }), + BlocklistAIHistoryEvent::UpdatedConversationMetadata { + conversation_id, .. + } => Some(CoalescedHistoryEventKey::UpdatedConversationMetadata { + conversation_id: *conversation_id, + }), + BlocklistAIHistoryEvent::ConversationUsageMetadataUpdated { conversation_id } => { + Some(CoalescedHistoryEventKey::ConversationUsageMetadataUpdated { + conversation_id: *conversation_id, + }) + } + BlocklistAIHistoryEvent::SetActiveConversation { + terminal_view_id, .. + } => Some(CoalescedHistoryEventKey::SetActiveConversation { + terminal_view_id: *terminal_view_id, + }), + BlocklistAIHistoryEvent::StartedNewConversation { .. } + | BlocklistAIHistoryEvent::CreatedSubtask { .. } + | BlocklistAIHistoryEvent::UpgradedTask { .. } + | BlocklistAIHistoryEvent::AppendedExchange { .. } + | BlocklistAIHistoryEvent::ReassignedExchange { .. } + | BlocklistAIHistoryEvent::ClearedActiveConversation { .. } + | BlocklistAIHistoryEvent::ClearedConversationsInTerminalView { .. } + | BlocklistAIHistoryEvent::UpdatedAutoexecuteOverride { .. } + | BlocklistAIHistoryEvent::SplitConversation { .. } + | BlocklistAIHistoryEvent::RemoveConversation { .. } + | BlocklistAIHistoryEvent::DeletedConversation { .. } + | BlocklistAIHistoryEvent::RestoredConversations { .. } + | BlocklistAIHistoryEvent::UpdatedConversationArtifacts { .. } + | BlocklistAIHistoryEvent::ConversationServerTokenAssigned { .. } + | BlocklistAIHistoryEvent::ConversationOwnershipTransferred { .. } + | BlocklistAIHistoryEvent::NewConversationRequestComplete { .. } + | BlocklistAIHistoryEvent::OrchestrationConfigUpdated { .. } + | BlocklistAIHistoryEvent::LocalSharedSessionEstablished { .. } => None, + } } impl BlocklistAIController { @@ -95,18 +175,110 @@ impl BlocklistAIController { return; }; match kind { - warp_multi_agent_api::response_event::Type::Init(init) => { - self.on_shared_init(init, ctx) - } - warp_multi_agent_api::response_event::Type::ClientActions(actions) => { - self.on_shared_client_actions(actions, ctx) - } - warp_multi_agent_api::response_event::Type::Finished(finished) => { - self.on_shared_finished(finished, ctx); - } + warp_multi_agent_api::response_event::Type::Init(init) => self + .capture_history_events_if_coalescing(ctx, |controller, ctx| { + controller.on_shared_init(init, ctx) + }), + warp_multi_agent_api::response_event::Type::ClientActions(actions) => self + .capture_history_events_if_coalescing(ctx, |controller, ctx| { + controller.on_shared_client_actions(actions, ctx) + }), + warp_multi_agent_api::response_event::Type::Finished(finished) => self + .capture_history_events_if_coalescing(ctx, |controller, ctx| { + controller.on_shared_finished(finished, ctx); + }), } } + /// Returns whether history-model events emitted while processing shared-session response + /// events should currently be buffered instead of delivered immediately. Coalescing is + /// active while the viewer is replaying a backlog of events (joining mid-session or + /// receiving an existing-conversation replay), where delivering each event individually + /// triggers a full subscriber fanout per replayed delta and can freeze the UI. + fn is_coalescing_history_events(&self) -> bool { + FeatureFlag::CoalesceSharedSessionCatchUpEvents.is_enabled() + && (self.shared_session_state.is_catching_up_on_event_backlog + || self.shared_session_state.is_receiving_conversation_replay) + } + + /// Marks the start of the viewer's catch-up on the session's ordered event backlog. + pub fn begin_shared_session_event_backlog_catch_up(&mut self) { + self.shared_session_state.is_catching_up_on_event_backlog = true; + } + + /// Marks the end of the viewer's catch-up on the session's ordered event backlog, flushing + /// any coalesced history events. + pub fn end_shared_session_event_backlog_catch_up(&mut self, ctx: &mut ModelContext) { + self.shared_session_state.is_catching_up_on_event_backlog = false; + self.maybe_flush_coalesced_history_events(ctx); + } + + /// Records whether the sharer is currently replaying an existing agent conversation + /// (bracketed by the `AgentConversationReplayStarted`/`AgentConversationReplayEnded` + /// ordered events), flushing any coalesced history events when the replay ends. + pub fn set_is_receiving_conversation_replay( + &mut self, + value: bool, + ctx: &mut ModelContext, + ) { + self.shared_session_state.is_receiving_conversation_replay = value; + if !value { + self.maybe_flush_coalesced_history_events(ctx); + } + } + + /// Runs `f`, capturing any history-model events it emits into the coalescing buffer when a + /// coalescing window is active. When no window is active, `f` runs unchanged and its events + /// are delivered normally. + fn capture_history_events_if_coalescing( + &mut self, + ctx: &mut ModelContext, + f: impl FnOnce(&mut Self, &mut ModelContext) -> R, + ) -> R { + if !self.is_coalescing_history_events() { + return f(self, ctx); + } + let history = BlocklistAIHistoryModel::handle(ctx); + let (result, events) = ctx.capture_emitted_events_for_model(&history, |ctx| f(self, ctx)); + for event in events { + self.buffer_coalesced_history_event(event); + } + result + } + + /// Inserts a captured event into the coalescing buffer, superseding any earlier buffered + /// event with the same coalescing key. + fn buffer_coalesced_history_event(&mut self, event: BlocklistAIHistoryEvent) { + if let Some(key) = coalescing_key(&event) { + self.shared_session_state + .coalesced_history_events + .retain(|existing| coalescing_key(existing).as_ref() != Some(&key)); + } + self.shared_session_state + .coalesced_history_events + .push(event); + } + + /// Emits all buffered history events as a single batch, in buffer order, if no coalescing + /// window remains active. Subscribers then see one `flush_effects` fanout for the whole + /// backlog instead of one per replayed event. + fn maybe_flush_coalesced_history_events(&mut self, ctx: &mut ModelContext) { + if self.is_coalescing_history_events() + || self + .shared_session_state + .coalesced_history_events + .is_empty() + { + return; + } + let events = std::mem::take(&mut self.shared_session_state.coalesced_history_events); + BlocklistAIHistoryModel::handle(ctx).update(ctx, |_history, ctx| { + for event in events { + ctx.emit(event); + } + }); + } + fn on_shared_init( &mut self, init_event: warp_multi_agent_api::response_event::StreamInit, diff --git a/app/src/terminal/shared_session/viewer/event_loop.rs b/app/src/terminal/shared_session/viewer/event_loop.rs index 4cf1d1685d..579d7a5721 100644 --- a/app/src/terminal/shared_session/viewer/event_loop.rs +++ b/app/src/terminal/shared_session/viewer/event_loop.rs @@ -121,6 +121,15 @@ impl EventLoop { .set_shared_session_status(SharedSessionStatus::ActiveViewer { role: Default::default(), }); + } else if let Some(terminal_view) = terminal_view.upgrade(ctx) { + // Coalesce AI history events while replaying the join-time event backlog so each + // replayed delta does not trigger a full subscriber fanout, which can freeze the UI + // for large conversations. + terminal_view.update(ctx, |terminal_view, ctx| { + terminal_view.ai_controller().update(ctx, |controller, _| { + controller.begin_shared_session_event_backlog_catch_up(); + }); + }); } let mut event_loop = Self { @@ -350,10 +359,11 @@ impl EventLoop { let should_suppress_existing_replay = self.should_suppress_existing_agent_conversation_replay; view.update(ctx, |view, ctx| { - view.ai_controller().update(ctx, |controller, _| { + view.ai_controller().update(ctx, |controller, ctx| { controller.set_should_suppress_existing_agent_conversation_replay( should_suppress_existing_replay, ); + controller.set_is_receiving_conversation_replay(true, ctx); }); }); } @@ -364,9 +374,10 @@ impl EventLoop { .set_is_receiving_agent_conversation_replay(false); if let Some(view) = self.terminal_view.upgrade(ctx) { view.update(ctx, |view, ctx| { - view.ai_controller().update(ctx, |controller, _| { + view.ai_controller().update(ctx, |controller, ctx| { controller .set_should_suppress_existing_agent_conversation_replay(false); + controller.set_is_receiving_conversation_replay(false, ctx); }); }); } @@ -402,6 +413,14 @@ impl EventLoop { ); } } + + // Catch-up is complete; deliver any coalesced AI history events as a + // single batch. + view.update(ctx, |view, ctx| { + view.ai_controller().update(ctx, |controller, ctx| { + controller.end_shared_session_event_backlog_catch_up(ctx); + }); + }); } } diff --git a/crates/warp_features/src/lib.rs b/crates/warp_features/src/lib.rs index dd507080c5..a120d5267c 100644 --- a/crates/warp_features/src/lib.rs +++ b/crates/warp_features/src/lib.rs @@ -886,6 +886,13 @@ pub enum FeatureFlag { /// Gates the SuperGrok feature, which lets users /// connect a Grok subscription instead of pasting an API key. SuperGrok, + + /// Coalesces `BlocklistAIHistoryModel` events while a shared-session + /// viewer catches up on a session's event backlog, flushing one deduped + /// batch when caught up. This avoids a per-delta subscriber fanout that + /// can freeze the UI for tens of seconds when opening large cloud agent + /// conversations. + CoalesceSharedSessionCatchUpEvents, } static FLAG_STATES: [AtomicBool; cardinality::()] = @@ -955,6 +962,7 @@ pub const DOGFOOD_FLAGS: &[FeatureFlag] = &[ FeatureFlag::GPTConfigurableContextWindow, FeatureFlag::RestorePromptOnInlineModelSelectorSearch, FeatureFlag::SuperGrok, + FeatureFlag::CoalesceSharedSessionCatchUpEvents, ]; /// Features enabled for feature preview build users (e.g.: Friends of Warp). diff --git a/crates/warpui_core/src/core/model/context.rs b/crates/warpui_core/src/core/model/context.rs index 53b9b4c3a6..4095321579 100644 --- a/crates/warpui_core/src/core/model/context.rs +++ b/crates/warpui_core/src/core/model/context.rs @@ -195,6 +195,55 @@ impl<'a, T: Entity> ModelContext<'a, T> { }); } + /// Runs `f` and captures any events emitted for `handle`'s model during it, + /// instead of leaving them queued for delivery to subscribers. + /// + /// Effects emitted for other entities and non-event effects (notifications, + /// actions, etc.) are left in the queue untouched. The captured events are + /// returned in emission order; the caller decides whether and when to + /// re-emit them. This is useful for deferring or coalescing high-volume + /// event streams (e.g. while replaying a backlog) without changing the + /// emitting code paths. + pub fn capture_emitted_events_for_model( + &mut self, + handle: &ModelHandle, + f: impl FnOnce(&mut Self) -> R, + ) -> (R, Vec) + where + E: Entity, + E::Event: 'static, + { + let target_entity_id = handle.id(); + let start_index = self.app.pending_effects.len(); + let result = f(self); + + let mut captured = Vec::new(); + let mut index = start_index; + while index < self.app.pending_effects.len() { + let is_captured_event = matches!( + &self.app.pending_effects[index], + Effect::Event { entity_id, .. } if *entity_id == target_entity_id + ); + if is_captured_event { + // `remove` preserves the relative order of the remaining + // effects, so interleaved effects for other entities are + // delivered in their original order. + let Some(Effect::Event { payload, .. }) = self.app.pending_effects.remove(index) + else { + unreachable!("the effect at this index was just matched as an event"); + }; + let payload = payload + .downcast::() + .expect("events emitted with a model's entity id have its event type"); + captured.push(*payload); + } else { + index += 1; + } + } + + (result, captured) + } + /// Global actions are being phased out. Prefer dispatching typed actions instead of global actions. /// Dispatch a global action to be handled by the registered handler ///