Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 184 additions & 12 deletions app/src/ai/blocklist/controller/shared_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ use warp_core::features::FeatureFlag;
use warp_multi_agent_api::client_action::Action;
use warp_multi_agent_api::message::Message;
use warp_multi_agent_api::response_event::{stream_finished, ClientActions};
use warpui::{AppContext, ModelContext, SingletonEntity};
use warpui::{AppContext, EntityId, ModelContext, SingletonEntity};

use super::response_stream::ResponseStreamId;
use super::{BlocklistAIController, RequestInput, SessionContext};
use crate::ai::agent::conversation::{AIConversationId, ConversationStatus};
use crate::ai::agent::{AIAgentActionId, AIAgentAttachment, EntrypointType};
use crate::ai::agent::{AIAgentActionId, AIAgentAttachment, AIAgentExchangeId, EntrypointType};
use crate::ai::attachment_utils::{
build_file_attachment_map, download_file, sanitize_filename, DownloadedAttachment,
};
use crate::ai::blocklist::agent_view::AgentViewEntryOrigin;
use crate::ai::blocklist::history_model::BlocklistAIHistoryModel;
use crate::ai::blocklist::history_model::{BlocklistAIHistoryEvent, BlocklistAIHistoryModel};
use crate::server::server_api::ServerApiProvider;
use crate::terminal::model::block::BlockId;

Expand All @@ -38,6 +38,86 @@ pub(super) struct SharedSessionState {
current_response_initiator: Option<ParticipantId>,
// The sharer's participant ID (set when session sharing starts)
sharer_participant_id: Option<ParticipantId>,
/// `true` while the viewer's event loop is catching up on the backlog of ordered events that
/// existed when the session was joined.
is_catching_up_on_event_backlog: bool,
/// `true` while the sharer is replaying an existing agent conversation (bracketed by the
/// `AgentConversationReplayStarted`/`AgentConversationReplayEnded` ordered events).
is_receiving_conversation_replay: bool,
/// History-model events captured while coalescing is active. High-volume events are deduped
/// on insert (see [`coalescing_key`]); the buffer is flushed as a single batch once no
/// coalescing window remains active.
coalesced_history_events: Vec<BlocklistAIHistoryEvent>,
}

/// Identity of a buffered history event used for deduplication while coalescing. A new event
/// that maps to the same key as an earlier buffered event supersedes it: the stale entry is
/// removed and the new one is appended, so the latest payload wins in both content and
/// ordering. Events with no key are always appended and delivered in emission order.
#[derive(PartialEq, Eq)]
enum CoalescedHistoryEventKey {
UpdatedStreamingExchange { exchange_id: AIAgentExchangeId },
UpdatedTodoList { terminal_view_id: EntityId },
UpdatedConversationStatus { conversation_id: AIConversationId },
UpdatedConversationMetadata { conversation_id: AIConversationId },
ConversationUsageMetadataUpdated { conversation_id: AIConversationId },
SetActiveConversation { terminal_view_id: EntityId },
}

/// Returns the deduplication key for a history event, or `None` for events that should never
/// be coalesced. Only events whose subscribers re-read the latest model state (rather than
/// depending on each intermediate payload) are safe to dedupe.
fn coalescing_key(event: &BlocklistAIHistoryEvent) -> Option<CoalescedHistoryEventKey> {
match event {
BlocklistAIHistoryEvent::UpdatedStreamingExchange { exchange_id, .. } => {
Some(CoalescedHistoryEventKey::UpdatedStreamingExchange {
exchange_id: *exchange_id,
})
}
BlocklistAIHistoryEvent::UpdatedTodoList { terminal_view_id } => {
Some(CoalescedHistoryEventKey::UpdatedTodoList {
terminal_view_id: *terminal_view_id,
})
}
BlocklistAIHistoryEvent::UpdatedConversationStatus {
conversation_id, ..
} => Some(CoalescedHistoryEventKey::UpdatedConversationStatus {
conversation_id: *conversation_id,
}),
BlocklistAIHistoryEvent::UpdatedConversationMetadata {
conversation_id, ..
} => Some(CoalescedHistoryEventKey::UpdatedConversationMetadata {
conversation_id: *conversation_id,
}),
BlocklistAIHistoryEvent::ConversationUsageMetadataUpdated { conversation_id } => {
Some(CoalescedHistoryEventKey::ConversationUsageMetadataUpdated {
conversation_id: *conversation_id,
})
}
BlocklistAIHistoryEvent::SetActiveConversation {
terminal_view_id, ..
} => Some(CoalescedHistoryEventKey::SetActiveConversation {
terminal_view_id: *terminal_view_id,
}),
BlocklistAIHistoryEvent::StartedNewConversation { .. }
| BlocklistAIHistoryEvent::CreatedSubtask { .. }
| BlocklistAIHistoryEvent::UpgradedTask { .. }
| BlocklistAIHistoryEvent::AppendedExchange { .. }
| BlocklistAIHistoryEvent::ReassignedExchange { .. }
| BlocklistAIHistoryEvent::ClearedActiveConversation { .. }
| BlocklistAIHistoryEvent::ClearedConversationsInTerminalView { .. }
| BlocklistAIHistoryEvent::UpdatedAutoexecuteOverride { .. }
| BlocklistAIHistoryEvent::SplitConversation { .. }
| BlocklistAIHistoryEvent::RemoveConversation { .. }
| BlocklistAIHistoryEvent::DeletedConversation { .. }
| BlocklistAIHistoryEvent::RestoredConversations { .. }
| BlocklistAIHistoryEvent::UpdatedConversationArtifacts { .. }
| BlocklistAIHistoryEvent::ConversationServerTokenAssigned { .. }
| BlocklistAIHistoryEvent::ConversationOwnershipTransferred { .. }
| BlocklistAIHistoryEvent::NewConversationRequestComplete { .. }
| BlocklistAIHistoryEvent::OrchestrationConfigUpdated { .. }
| BlocklistAIHistoryEvent::LocalSharedSessionEstablished { .. } => None,
}
}

impl BlocklistAIController {
Expand Down Expand Up @@ -95,18 +175,110 @@ impl BlocklistAIController {
return;
};
match kind {
warp_multi_agent_api::response_event::Type::Init(init) => {
self.on_shared_init(init, ctx)
}
warp_multi_agent_api::response_event::Type::ClientActions(actions) => {
self.on_shared_client_actions(actions, ctx)
}
warp_multi_agent_api::response_event::Type::Finished(finished) => {
self.on_shared_finished(finished, ctx);
}
warp_multi_agent_api::response_event::Type::Init(init) => self
.capture_history_events_if_coalescing(ctx, |controller, ctx| {
controller.on_shared_init(init, ctx)
}),
warp_multi_agent_api::response_event::Type::ClientActions(actions) => self
.capture_history_events_if_coalescing(ctx, |controller, ctx| {
controller.on_shared_client_actions(actions, ctx)
}),
warp_multi_agent_api::response_event::Type::Finished(finished) => self
.capture_history_events_if_coalescing(ctx, |controller, ctx| {
controller.on_shared_finished(finished, ctx);
}),
}
}

/// Returns whether history-model events emitted while processing shared-session response
/// events should currently be buffered instead of delivered immediately. Coalescing is
/// active while the viewer is replaying a backlog of events (joining mid-session or
/// receiving an existing-conversation replay), where delivering each event individually
/// triggers a full subscriber fanout per replayed delta and can freeze the UI.
fn is_coalescing_history_events(&self) -> bool {
FeatureFlag::CoalesceSharedSessionCatchUpEvents.is_enabled()
&& (self.shared_session_state.is_catching_up_on_event_backlog
|| self.shared_session_state.is_receiving_conversation_replay)
}

/// Marks the start of the viewer's catch-up on the session's ordered event backlog.
pub fn begin_shared_session_event_backlog_catch_up(&mut self) {
self.shared_session_state.is_catching_up_on_event_backlog = true;
}

/// Marks the end of the viewer's catch-up on the session's ordered event backlog, flushing
/// any coalesced history events.
pub fn end_shared_session_event_backlog_catch_up(&mut self, ctx: &mut ModelContext<Self>) {
self.shared_session_state.is_catching_up_on_event_backlog = false;
self.maybe_flush_coalesced_history_events(ctx);
}

/// Records whether the sharer is currently replaying an existing agent conversation
/// (bracketed by the `AgentConversationReplayStarted`/`AgentConversationReplayEnded`
/// ordered events), flushing any coalesced history events when the replay ends.
pub fn set_is_receiving_conversation_replay(
&mut self,
value: bool,
ctx: &mut ModelContext<Self>,
) {
self.shared_session_state.is_receiving_conversation_replay = value;
if !value {
self.maybe_flush_coalesced_history_events(ctx);
}
}

/// Runs `f`, capturing any history-model events it emits into the coalescing buffer when a
/// coalescing window is active. When no window is active, `f` runs unchanged and its events
/// are delivered normally.
fn capture_history_events_if_coalescing<R>(
&mut self,
ctx: &mut ModelContext<Self>,
f: impl FnOnce(&mut Self, &mut ModelContext<Self>) -> R,
) -> R {
if !self.is_coalescing_history_events() {
return f(self, ctx);
}
let history = BlocklistAIHistoryModel::handle(ctx);
let (result, events) = ctx.capture_emitted_events_for_model(&history, |ctx| f(self, ctx));
for event in events {
self.buffer_coalesced_history_event(event);
}
result
}

/// Inserts a captured event into the coalescing buffer, superseding any earlier buffered
/// event with the same coalescing key.
fn buffer_coalesced_history_event(&mut self, event: BlocklistAIHistoryEvent) {
if let Some(key) = coalescing_key(&event) {
self.shared_session_state
.coalesced_history_events
.retain(|existing| coalescing_key(existing).as_ref() != Some(&key));
}
self.shared_session_state
.coalesced_history_events
.push(event);
}

/// Emits all buffered history events as a single batch, in buffer order, if no coalescing
/// window remains active. Subscribers then see one `flush_effects` fanout for the whole
/// backlog instead of one per replayed event.
fn maybe_flush_coalesced_history_events(&mut self, ctx: &mut ModelContext<Self>) {
if self.is_coalescing_history_events()
|| self
.shared_session_state
.coalesced_history_events
.is_empty()
{
return;
}
let events = std::mem::take(&mut self.shared_session_state.coalesced_history_events);
BlocklistAIHistoryModel::handle(ctx).update(ctx, |_history, ctx| {
for event in events {
ctx.emit(event);
}
});
}

fn on_shared_init(
&mut self,
init_event: warp_multi_agent_api::response_event::StreamInit,
Expand Down
23 changes: 21 additions & 2 deletions app/src/terminal/shared_session/viewer/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ impl EventLoop {
.set_shared_session_status(SharedSessionStatus::ActiveViewer {
role: Default::default(),
});
} else if let Some(terminal_view) = terminal_view.upgrade(ctx) {
// Coalesce AI history events while replaying the join-time event backlog so each
// replayed delta does not trigger a full subscriber fanout, which can freeze the UI
// for large conversations.
terminal_view.update(ctx, |terminal_view, ctx| {
terminal_view.ai_controller().update(ctx, |controller, _| {
controller.begin_shared_session_event_backlog_catch_up();
});
});
}

let mut event_loop = Self {
Expand Down Expand Up @@ -350,10 +359,11 @@ impl EventLoop {
let should_suppress_existing_replay =
self.should_suppress_existing_agent_conversation_replay;
view.update(ctx, |view, ctx| {
view.ai_controller().update(ctx, |controller, _| {
view.ai_controller().update(ctx, |controller, ctx| {
controller.set_should_suppress_existing_agent_conversation_replay(
should_suppress_existing_replay,
);
controller.set_is_receiving_conversation_replay(true, ctx);
});
});
}
Expand All @@ -364,9 +374,10 @@ impl EventLoop {
.set_is_receiving_agent_conversation_replay(false);
if let Some(view) = self.terminal_view.upgrade(ctx) {
view.update(ctx, |view, ctx| {
view.ai_controller().update(ctx, |controller, _| {
view.ai_controller().update(ctx, |controller, ctx| {
controller
.set_should_suppress_existing_agent_conversation_replay(false);
controller.set_is_receiving_conversation_replay(false, ctx);
});
});
}
Expand Down Expand Up @@ -402,6 +413,14 @@ impl EventLoop {
);
}
}

// Catch-up is complete; deliver any coalesced AI history events as a
// single batch.
view.update(ctx, |view, ctx| {
view.ai_controller().update(ctx, |controller, ctx| {
controller.end_shared_session_event_backlog_catch_up(ctx);
});
});
}
}

Expand Down
8 changes: 8 additions & 0 deletions crates/warp_features/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,13 @@ pub enum FeatureFlag {
/// Gates the SuperGrok feature, which lets users
/// connect a Grok subscription instead of pasting an API key.
SuperGrok,

/// Coalesces `BlocklistAIHistoryModel` events while a shared-session
/// viewer catches up on a session's event backlog, flushing one deduped
/// batch when caught up. This avoids a per-delta subscriber fanout that
/// can freeze the UI for tens of seconds when opening large cloud agent
/// conversations.
CoalesceSharedSessionCatchUpEvents,
}

static FLAG_STATES: [AtomicBool; cardinality::<FeatureFlag>()] =
Expand Down Expand Up @@ -955,6 +962,7 @@ pub const DOGFOOD_FLAGS: &[FeatureFlag] = &[
FeatureFlag::GPTConfigurableContextWindow,
FeatureFlag::RestorePromptOnInlineModelSelectorSearch,
FeatureFlag::SuperGrok,
FeatureFlag::CoalesceSharedSessionCatchUpEvents,
];

/// Features enabled for feature preview build users (e.g.: Friends of Warp).
Expand Down
49 changes: 49 additions & 0 deletions crates/warpui_core/src/core/model/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,55 @@ impl<'a, T: Entity> ModelContext<'a, T> {
});
}

/// Runs `f` and captures any events emitted for `handle`'s model during it,
/// instead of leaving them queued for delivery to subscribers.
///
/// Effects emitted for other entities and non-event effects (notifications,
/// actions, etc.) are left in the queue untouched. The captured events are
/// returned in emission order; the caller decides whether and when to
/// re-emit them. This is useful for deferring or coalescing high-volume
/// event streams (e.g. while replaying a backlog) without changing the
/// emitting code paths.
pub fn capture_emitted_events_for_model<E, R>(
&mut self,
handle: &ModelHandle<E>,
f: impl FnOnce(&mut Self) -> R,
) -> (R, Vec<E::Event>)
where
E: Entity,
E::Event: 'static,
{
let target_entity_id = handle.id();
let start_index = self.app.pending_effects.len();
let result = f(self);

let mut captured = Vec::new();
let mut index = start_index;
while index < self.app.pending_effects.len() {
let is_captured_event = matches!(
&self.app.pending_effects[index],
Effect::Event { entity_id, .. } if *entity_id == target_entity_id
);
if is_captured_event {
// `remove` preserves the relative order of the remaining
// effects, so interleaved effects for other entities are
// delivered in their original order.
let Some(Effect::Event { payload, .. }) = self.app.pending_effects.remove(index)
else {
unreachable!("the effect at this index was just matched as an event");
};
let payload = payload
.downcast::<E::Event>()
.expect("events emitted with a model's entity id have its event type");
captured.push(*payload);
} else {
index += 1;
}
}

(result, captured)
}

/// Global actions are being phased out. Prefer dispatching typed actions instead of global actions.
/// Dispatch a global action to be handled by the registered handler
///
Expand Down