diff --git a/crates/convergio-tui/src/client.rs b/crates/convergio-tui/src/client.rs
index 5d3629a..bb41eda 100644
--- a/crates/convergio-tui/src/client.rs
+++ b/crates/convergio-tui/src/client.rs
@@ -4,11 +4,12 @@
 //! `/v1/plans/{id}/messages/tail`, `/v1/audit/verify`, plus `gh pr list` (skipped when
 //! `CONVERGIO_DASH_NO_GH=1`).
 
-use crate::client_gh::fetch_prs;
+use crate::client_gh::{fetch_prs_closed, fetch_prs_open};
+use crate::client_pr_cache::{cached_fetch, PrCacheCell};
 use anyhow::Result;
 use serde::Deserialize;
 use std::sync::{Arc, Mutex};
-use std::time::{Duration, Instant};
+use std::time::Duration;
 
 pub use crate::plan_counts::PlanCounts;
 pub use crate::types::{AgentProcess, BusMessage, Plan, PrSummary, RegistryAgent, TaskSummary};
@@ -35,15 +36,6 @@ pub struct Snapshot {
     pub daemon_version: Option<String>,
 }
 
-/// PR data is cached for this long before the next `gh pr list`
-/// shell-out. The dashboard tick is 5s by default; 30s means the
-/// `gh` cost is amortised across ~6 refreshes without making the
-/// PR pane feel stale (PR state turns over much slower than tasks).
-const PR_CACHE_TTL: Duration = Duration::from_secs(30);
-
-/// Time-stamped cache entry for the PR list.
-type PrCacheCell = Arc<Mutex<Option<(Instant, Vec<PrSummary>)>>>;
-
 /// Read-only HTTP client. Cloneable.
 #[derive(Debug, Clone)]
 pub struct Client {
@@ -51,7 +43,8 @@
     inner: reqwest::Client,
     enable_gh: bool,
     github_slug: Option<String>,
-    pr_cache: PrCacheCell,
+    open_prs: PrCacheCell,
+    closed_prs: PrCacheCell,
 }
 
 impl Client {
@@ -66,10 +59,17 @@
                 .unwrap_or_else(|_| reqwest::Client::new()),
             enable_gh,
             github_slug: None,
-            pr_cache: Arc::new(Mutex::new(None)),
+            open_prs: Arc::new(Mutex::new(None)),
+            closed_prs: Arc::new(Mutex::new(None)),
         }
     }
 
+    /// `true` when the `gh pr list` shell-out is allowed. Disabled
+    /// when `CONVERGIO_DASH_NO_GH=1` is set.
+    pub fn gh_enabled(&self) -> bool {
+        self.enable_gh
+    }
+
     /// Scope `gh pr list` to `owner/repo` instead of inheriting cwd.
     /// `cvg dash` derives the slug from `origin` so the dashboard
     /// works from any directory.
@@ -78,19 +78,21 @@
         self
     }
 
-    /// One-shot fetch of every dataset. Sub-fetches fail soft —
-    /// partial data is more useful than blanking the dashboard.
+    /// Fetch every dataset *except* the PRs. Returns within ~50ms
+    /// even on a cold start because no shell-out runs here. The
+    /// dashboard event loop emits this snapshot first to populate
+    /// Plans / Tasks / Agents while `gh pr list` is still in
+    /// flight — the user no longer waits 1-3s for the slowest
+    /// part of the refresh.
     ///
-    /// Per-plan fetches (`tasks` + `messages/tail`) run in parallel
-    /// via `futures::future::join_all`; without this fan-out the
-    /// loop is N+1 and dashes with 40+ plans block for hundreds of
-    /// ms per refresh on loopback. Global fetches (registry,
-    /// processes, audit, health) run concurrently with the
-    /// per-plan fan-out via `tokio::join!`. The PRs `gh pr list`
-    /// shell-out also runs concurrently (it is the dominant cost,
-    /// ~600ms on a warm cache) and is additionally memoised for
-    /// [`PR_CACHE_TTL`] so most refreshes pay zero gh cost.
-    pub async fn snapshot(&self) -> Result<Snapshot> {
+    /// Per-plan fetches (`tasks` + `messages/tail`) run in
+    /// parallel via `futures::future::join_all`; without this
+    /// fan-out the loop is N+1 and dashes with 40+ plans block
+    /// for hundreds of ms per refresh on loopback. Global
+    /// fetches (registry, processes, audit, health) run
+    /// concurrently with the per-plan fan-out via
+    /// `tokio::join!`.
+    pub async fn snapshot_core(&self) -> Result<Snapshot> {
         let mut plans: Vec<Plan> = self
             .get_json("/v1/plans")
             .await
@@ -103,12 +105,11 @@
             .iter()
             .map(|id| self.fetch_plan_overview(id.clone())),
         );
         let global_fetches = self.fetch_globals();
-        let prs_future = self.fetch_prs_cached();
 
-        let (plan_results, (agents, agent_processes, audit_ok, daemon_version), prs) =
-            tokio::join!(plan_fetches, global_fetches, prs_future);
+        let (plan_results, (agents, agent_processes, audit_ok, daemon_version)) =
+            tokio::join!(plan_fetches, global_fetches);
 
         let mut tasks: Vec<TaskSummary> = Vec::new();
         let mut messages: Vec<BusMessage> = Vec::new();
@@ -127,51 +127,48 @@
             tasks,
             agents,
             agent_processes,
-            prs,
+            prs: Vec::new(),
             messages,
             audit_ok,
             daemon_version,
         })
     }
 
-    /// PR fetch with a [`PR_CACHE_TTL`] memo. Returns the cached
-    /// vector when fresh; otherwise shells out to `gh` (off the
-    /// blocking thread, see [`crate::client_gh::fetch_prs`]) and
-    /// updates the cache on success. A failed fetch keeps the
-    /// stale cache rather than blanking the PRs pane — partial
-    /// data beats no data.
-    async fn fetch_prs_cached(&self) -> Vec<PrSummary> {
-        if !self.enable_gh {
-            return Vec::new();
-        }
-        if let Some((stamped_at, cached)) = self.pr_cache_snapshot() {
-            if stamped_at.elapsed() < PR_CACHE_TTL {
-                return cached;
-            }
-        }
-        match fetch_prs(self.github_slug.as_deref()).await {
-            Ok(prs) => {
-                self.pr_cache_store(prs.clone());
-                prs
-            }
-            Err(_) => self.pr_cache_snapshot().map(|(_, v)| v).unwrap_or_default(),
-        }
+    /// Fetch open PRs with a 30s memo (see
+    /// [`crate::client_pr_cache::PR_CACHE_TTL`]). Fast (~0.5s
+    /// cold) — the PR pane's first paint after a cache miss.
+    pub async fn fetch_prs_open_cached(&self) -> Vec<PrSummary> {
+        cached_fetch(
+            self.enable_gh,
+            &self.open_prs,
+            fetch_prs_open(self.github_slug.as_deref()),
+        )
+        .await
     }
 
-    fn pr_cache_snapshot(&self) -> Option<(Instant, Vec<PrSummary>)> {
-        let guard = match self.pr_cache.lock() {
-            Ok(g) => g,
-            Err(p) => p.into_inner(),
-        };
-        guard.clone()
+    /// Fetch closed/merged PRs with the same memo. Slower (~1-3s
+    /// cold on busy repos) so it runs after the open list paints.
+    pub async fn fetch_prs_closed_cached(&self) -> Vec<PrSummary> {
+        cached_fetch(
+            self.enable_gh,
+            &self.closed_prs,
+            fetch_prs_closed(self.github_slug.as_deref()),
+        )
+        .await
     }
 
-    fn pr_cache_store(&self, prs: Vec<PrSummary>) {
-        let mut guard = match self.pr_cache.lock() {
-            Ok(g) => g,
-            Err(p) => p.into_inner(),
-        };
-        *guard = Some((Instant::now(), prs));
+    /// Backwards-compatible single-shot snapshot: core + open +
+    /// closed PRs in sequence. Kept for the `Client`-driven test
+    /// path and the `snapshot_bench` example; the live dashboard
+    /// uses `snapshot_core` plus the progressive PR fetch in
+    /// [`crate::run`].
+    pub async fn snapshot(&self) -> Result<Snapshot> {
+        let mut snap = self.snapshot_core().await?;
+        let (open, closed) =
+            tokio::join!(self.fetch_prs_open_cached(), self.fetch_prs_closed_cached());
+        snap.prs = open;
+        snap.prs.extend(closed);
+        Ok(snap)
     }
 
     async fn fetch_plan_overview(
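Note on the fan-out shape the `snapshot_core` doc comment describes: per-item futures are collected into one `join_all`, and that aggregate is joined against the global fetches with `tokio::join!` so neither side waits on the other. A minimal standalone sketch of the same shape; `fetch_plan`, `fetch_globals`, and the payload types are stand-ins, not the crate's real API:

    use futures::future::join_all;

    async fn fetch_plan(id: &str) -> String {
        // Stand-in for the real per-plan GETs (`/tasks`, `/messages/tail`).
        format!("plan {id}")
    }

    async fn fetch_globals() -> (usize, bool) {
        // Stand-in for the registry/processes/audit/health fetches.
        (3, true)
    }

    #[tokio::main]
    async fn main() {
        let ids = ["p1", "p2", "p3"];
        // One future per plan; join_all drives them concurrently, so a
        // refresh costs roughly max(single latency) instead of N x latency.
        let per_plan = join_all(ids.iter().map(|id| fetch_plan(id)));
        let (plans, (agents, audit_ok)) = tokio::join!(per_plan, fetch_globals());
        println!("{} plans, {agents} agents, audit_ok={audit_ok}", plans.len());
    }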
diff --git a/crates/convergio-tui/src/client_gh.rs b/crates/convergio-tui/src/client_gh.rs
index 88a6c6f..26ff66a 100644
--- a/crates/convergio-tui/src/client_gh.rs
+++ b/crates/convergio-tui/src/client_gh.rs
@@ -9,31 +9,56 @@
 use crate::client::PrSummary;
 use anyhow::Result;
 use tokio::process::Command;
 
-/// How many PRs we ask `gh` for. The dashboard previously requested
-/// 100, which routinely cost ~1s and blocked the (then-synchronous)
-/// refresh. 50 still covers the active queue with margin and keeps
-/// `gh` under ~600ms in our measurements.
-const PR_LIMIT: &str = "50";
-
-/// Run `gh pr list` and parse the JSON. When `slug` is `Some`, the
-/// query is scoped to that `owner/repo` (`gh pr list -R <slug>`) so
-/// the dashboard works from any cwd. When `None`, gh inherits cwd —
-/// original behaviour, kept for shells run inside a repo with no
-/// workspace `Cargo.toml`.
-///
-/// Async because the ~1s shell-out used to block the dashboard's
-/// event loop; with `tokio::process::Command` it cooperates with the
-/// rest of the snapshot's `tokio::join!` fan-out.
-pub async fn fetch_prs(slug: Option<&str>) -> Result<Vec<PrSummary>> {
+/// Open PRs are the dashboard's hot set: small, ever-changing, and
+/// where CI status matters in real time. We pay for
+/// `statusCheckRollup` here because the request is naturally cheap
+/// (~0.5s on a typical repo).
+const OPEN_LIMIT: &str = "30";
+
+/// Closed/merged PRs are the historical tail. We fetch them with a
+/// minimal field set — `statusCheckRollup` triggers a per-PR API
+/// round-trip on the GitHub side and pushed our shell-out from
+/// 0.5s to 4s on a busy repo. CI on a closed PR is finalised
+/// anyway, so we render `?` rather than blocking the dashboard.
+const CLOSED_LIMIT: &str = "30";
+
+const OPEN_FIELDS: &str = "number,title,headRefName,state,statusCheckRollup,additions,deletions,changedFiles,createdAt,updatedAt,closedAt,mergedAt,body";
+const CLOSED_FIELDS: &str = "number,title,headRefName,state,additions,deletions,changedFiles,createdAt,updatedAt,closedAt,mergedAt,body";
+
+/// Fetch open PRs only. Fast (~0.5s) — the dashboard's first PR
+/// paint after a cache miss. See [`OPEN_FIELDS`] for what's
+/// included.
+pub async fn fetch_prs_open(slug: Option<&str>) -> Result<Vec<PrSummary>> {
+    fetch_prs_with_args(slug, "open", OPEN_LIMIT, OPEN_FIELDS).await
+}
+
+/// Fetch merged + closed PRs. Slower (~1-3s on busy repos) and
+/// runs after the open list has already painted. Field set
+/// excludes `statusCheckRollup` — see [`CLOSED_LIMIT`] for the
+/// rationale.
+pub async fn fetch_prs_closed(slug: Option<&str>) -> Result<Vec<PrSummary>> {
+    // `gh pr list --state` accepts only one of {open, closed,
+    // merged, all}. We want merged + closed but not open; `closed`
+    // is the right single value — on `gh`, merged PRs count as
+    // closed.
+    fetch_prs_with_args(slug, "closed", CLOSED_LIMIT, CLOSED_FIELDS).await
+}
+
+async fn fetch_prs_with_args(
+    slug: Option<&str>,
+    state: &str,
+    limit: &str,
+    fields: &str,
+) -> Result<Vec<PrSummary>> {
     let mut args: Vec<String> = vec![
         "pr".into(),
         "list".into(),
         "--state".into(),
-        "all".into(),
+        state.into(),
         "--limit".into(),
-        PR_LIMIT.into(),
+        limit.into(),
         "--json".into(),
-        "number,title,headRefName,state,statusCheckRollup,additions,deletions,changedFiles,createdAt,updatedAt,closedAt,mergedAt,body".into(),
+        fields.into(),
     ];
     if let Some(s) = slug {
         args.push("-R".into());
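For readers new to `tokio::process`: the shell-out pattern behind `fetch_prs_with_args` reduces to the sketch below. The two-field `--json` list, the `Pr` struct, and the function name are trimmed stand-ins for the crate's `PrSummary` path; only `gh` flags that appear in the diff above are used.

    use anyhow::{bail, Result};
    use serde::Deserialize;
    use tokio::process::Command;

    #[derive(Debug, Deserialize)]
    struct Pr {
        number: u64,
        title: String,
    }

    async fn list_open_prs(slug: Option<&str>) -> Result<Vec<Pr>> {
        let mut cmd = Command::new("gh");
        cmd.args(["pr", "list", "--state", "open", "--limit", "30"]);
        cmd.args(["--json", "number,title"]);
        if let Some(s) = slug {
            cmd.args(["-R", s]); // scope to owner/repo, independent of cwd
        }
        // Awaiting the child yields to the executor instead of blocking
        // the event loop the way std::process::Command would.
        let out = cmd.output().await?;
        if !out.status.success() {
            bail!("gh exited with {}", out.status);
        }
        // `gh ... --json` prints a single JSON array on stdout.
        Ok(serde_json::from_slice(&out.stdout)?)
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        // "owner/repo" is a placeholder slug.
        for pr in list_open_prs(Some("owner/repo")).await? {
            println!("#{} {}", pr.number, pr.title);
        }
        Ok(())
    }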
diff --git a/crates/convergio-tui/src/client_pr_cache.rs b/crates/convergio-tui/src/client_pr_cache.rs
new file mode 100644
index 0000000..be8cc9a
--- /dev/null
+++ b/crates/convergio-tui/src/client_pr_cache.rs
@@ -0,0 +1,63 @@
+//! Time-bounded memo for `gh pr list` results.
+//!
+//! Split out from [`crate::client`] so that file stays under the
+//! 300-line cap and the locking pattern is testable in isolation.
+
+use crate::client::PrSummary;
+use anyhow::Result;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+/// PR data is cached for this long before the next `gh pr list`
+/// shell-out. The dashboard tick is 5s by default; 30s means the
+/// `gh` cost is amortised across ~6 refreshes without making the
+/// PR pane feel stale (PR state turns over much slower than tasks).
+pub const PR_CACHE_TTL: Duration = Duration::from_secs(30);
+
+/// Time-stamped cache entry for one slice of the PR list.
+pub type PrCacheCell = Arc<Mutex<Option<(Instant, Vec<PrSummary>)>>>;
+
+/// Read the current cache entry, if any.
+pub fn read_slot(slot: &PrCacheCell) -> Option<(Instant, Vec<PrSummary>)> {
+    let guard = match slot.lock() {
+        Ok(g) => g,
+        Err(p) => p.into_inner(),
+    };
+    guard.clone()
+}
+
+/// Replace the cache entry with `prs` stamped at the current
+/// instant.
+pub fn write_slot(slot: &PrCacheCell, prs: Vec<PrSummary>) {
+    let mut guard = match slot.lock() {
+        Ok(g) => g,
+        Err(p) => p.into_inner(),
+    };
+    *guard = Some((Instant::now(), prs));
+}
+
+/// Read-through cache: return the cached vector when fresh,
+/// otherwise await `fetch`, store its result on success, and fall
+/// back to the previous (possibly stale) cache on failure. Returns
+/// an empty vector when `enable_gh` is `false`.
+pub async fn cached_fetch(
+    enable_gh: bool,
+    slot: &PrCacheCell,
+    fetch: impl std::future::Future<Output = Result<Vec<PrSummary>>>,
+) -> Vec<PrSummary> {
+    if !enable_gh {
+        return Vec::new();
+    }
+    if let Some((stamped_at, cached)) = read_slot(slot) {
+        if stamped_at.elapsed() < PR_CACHE_TTL {
+            return cached;
+        }
+    }
+    match fetch.await {
+        Ok(prs) => {
+            write_slot(slot, prs.clone());
+            prs
+        }
+        Err(_) => read_slot(slot).map(|(_, v)| v).unwrap_or_default(),
+    }
+}
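The contract `cached_fetch` promises, demonstrated on a standalone analogue: same logic, but a `String` payload instead of `PrSummary` and a plain `unwrap()` in place of the poison-recovering lock, so the sketch compiles without the crate.

    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    type Slot = Arc<Mutex<Option<(Instant, Vec<String>)>>>;
    const TTL: Duration = Duration::from_secs(30);

    async fn cached(
        slot: &Slot,
        fetch: impl std::future::Future<Output = Result<Vec<String>, ()>>,
    ) -> Vec<String> {
        if let Some((at, v)) = slot.lock().unwrap().clone() {
            if at.elapsed() < TTL {
                return v; // fresh hit: `fetch` is never even awaited
            }
        }
        match fetch.await {
            Ok(v) => {
                *slot.lock().unwrap() = Some((Instant::now(), v.clone()));
                v
            }
            // Stale beats empty: keep the previous entry on failure.
            Err(()) => slot.lock().unwrap().clone().map(|(_, v)| v).unwrap_or_default(),
        }
    }

    #[tokio::main]
    async fn main() {
        let slot: Slot = Arc::new(Mutex::new(None));
        // Miss: runs the fetch and stores the result.
        let first = cached(&slot, async { Ok(vec!["pr #12".to_string()]) }).await;
        // Hit inside the TTL: the failing future below is never polled.
        let second = cached(&slot, async { Err(()) }).await;
        assert_eq!(first, second);
    }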
diff --git a/crates/convergio-tui/src/lib.rs b/crates/convergio-tui/src/lib.rs
index 443ad68..7cdc436 100644
--- a/crates/convergio-tui/src/lib.rs
+++ b/crates/convergio-tui/src/lib.rs
@@ -25,6 +25,7 @@ pub mod agent_filter;
 pub mod bus_stream;
 pub mod client;
 pub mod client_gh;
+pub mod client_pr_cache;
 pub mod header_banner;
 pub mod keymap;
 pub mod mode;
@@ -62,10 +63,25 @@
 use std::io::{self, Stdout};
 use std::time::Duration;
 use tokio::sync::mpsc;
 
-use crate::client::{Client, Snapshot};
+use crate::client::{Client, PrSummary, Snapshot};
 use crate::keymap::{Action, KeyMap};
 use crate::state::{AppMode, AppState};
 
+/// One step of a progressive snapshot. The dashboard refresh emits
+/// these in order: `Core` first (no gh shell-out, ~50ms), then
+/// `Prs` once for the open list (~0.5s), then `Prs` again for
+/// open+closed combined (~1-3s on busy repos). Each event lets the
+/// UI repaint without waiting for the slowest part.
+#[derive(Debug)]
+pub enum SnapshotEvent {
+    /// Core dataset (plans, tasks, agents, messages, audit). PRs
+    /// arrive separately via [`SnapshotEvent::Prs`].
+    Core(Result<Snapshot>),
+    /// Updated PR list — replaces whatever was there. Sent twice
+    /// per refresh (open-only, then open+closed).
+    Prs(Vec<PrSummary>),
+}
+
 /// Tick interval bounds. Outside this band the dashboard is either
 /// hammering the daemon (too fast) or sleeping past usefulness (too
 /// slow); we clamp.
@@ -118,10 +134,11 @@
     };
     let keymap = KeyMap;
 
-    // Snapshot results flow back through this channel. Capacity of 2
-    // keeps a stale-then-fresh pair in flight without ever blocking
-    // the producer; we are the single consumer.
-    let (snap_tx, mut snap_rx) = mpsc::channel::<Result<Snapshot>>(2);
+    // Snapshot events flow back through this channel. Capacity of
+    // 4 holds the worst-case in-flight set (Core + 2× Prs from one
+    // refresh, plus a margin for an overlapping tick) without ever
+    // blocking the producer; we are the single consumer.
+    let (snap_tx, mut snap_rx) = mpsc::channel::<SnapshotEvent>(4);
     let mut refresh_in_flight = false;
 
     // Kick the first refresh off the event loop so the skeleton frame
@@ -144,9 +161,18 @@
             _ = interval.tick() => {
                 spawn_refresh(&client, &snap_tx, &mut refresh_in_flight);
             }
-            Some(snap) = snap_rx.recv() => {
-                refresh_in_flight = false;
-                state.apply_snapshot(snap);
+            Some(event) = snap_rx.recv() => {
+                match event {
+                    SnapshotEvent::Core(snap) => {
+                        // Free the in-flight slot as soon as the core
+                        // arrives; the trailing PR fetches keep
+                        // running but the next tick can already start
+                        // a fresh core fetch if the user pressed `r`.
+                        refresh_in_flight = false;
+                        state.apply_snapshot(snap);
+                    }
+                    SnapshotEvent::Prs(prs) => state.apply_prs(prs),
+                }
             }
             poll = poll_key() => {
                 if let Some(action) = poll? {
@@ -188,10 +214,19 @@
     Ok(())
 }
 
-/// Spawn the snapshot fetch off the event loop. `in_flight` debounces
-/// concurrent refreshes — the periodic tick and a manual `r` arriving
-/// inside the same gh shell-out window must not stack.
-fn spawn_refresh(client: &Client, tx: &mpsc::Sender<Result<Snapshot>>, in_flight: &mut bool) {
+/// Spawn the progressive snapshot fetch off the event loop.
+///
+/// Emits up to three events on `tx` so the dashboard can repaint
+/// in stages instead of waiting for the slowest fetch:
+///
+/// 1. [`SnapshotEvent::Core`] — every dataset except PRs (~50ms
+///    on a warm pool). Frees `in_flight` so the next tick can
+///    overlap the trailing PR fetches.
+/// 2. [`SnapshotEvent::Prs`] with open PRs only (~0.5s).
+/// 3. [`SnapshotEvent::Prs`] with open + closed combined (~1-3s
+///    on busy repos with `statusCheckRollup`-driven API fan-out).
+///    Steps 2 and 3 are skipped when `gh` is disabled.
+fn spawn_refresh(client: &Client, tx: &mpsc::Sender<SnapshotEvent>, in_flight: &mut bool) {
     if *in_flight {
         return;
     }
@@ -199,8 +234,21 @@
     let client = client.clone();
     let tx = tx.clone();
     tokio::spawn(async move {
-        let snap = client.snapshot().await;
-        let _ = tx.send(snap).await;
+        let snap = client.snapshot_core().await;
+        if tx.send(SnapshotEvent::Core(snap)).await.is_err() {
+            return;
+        }
+        if !client.gh_enabled() {
+            return;
+        }
+        let open = client.fetch_prs_open_cached().await;
+        if tx.send(SnapshotEvent::Prs(open.clone())).await.is_err() {
+            return;
+        }
+        let closed = client.fetch_prs_closed_cached().await;
+        let mut combined = open;
+        combined.extend(closed);
+        let _ = tx.send(SnapshotEvent::Prs(combined)).await;
    });
 }
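The whole protocol end-to-end, as a standalone sketch: one worker sends the three stages down an mpsc channel and the consumer repaints after each one. The event names mirror the crate's `SnapshotEvent`; the payloads and timings in comments are placeholders.

    use tokio::sync::mpsc;

    #[derive(Debug)]
    enum Event {
        Core(&'static str),
        Prs(Vec<&'static str>),
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<Event>(4);
        tokio::spawn(async move {
            // ~50ms: everything except PRs paints first.
            let _ = tx.send(Event::Core("plans+tasks+agents")).await;
            // ~0.5s later: the open list paints early.
            let _ = tx.send(Event::Prs(vec!["open #12"])).await;
            // ~1-3s later: open+closed replaces the open-only list.
            let _ = tx.send(Event::Prs(vec!["open #12", "closed #9"])).await;
        });
        // Ends when the worker drops `tx`; each event is a repaint.
        while let Some(ev) = rx.recv().await {
            println!("repaint on {ev:?}");
        }
    }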
diff --git a/crates/convergio-tui/src/state.rs b/crates/convergio-tui/src/state.rs
index 95cd9c6..d41fec9 100644
--- a/crates/convergio-tui/src/state.rs
+++ b/crates/convergio-tui/src/state.rs
@@ -218,6 +218,14 @@ impl AppState {
         self.show_exited_agents = !self.show_exited_agents;
     }
 
+    /// Replace the PR list without touching the rest of the state.
+    /// Lets the progressive-snapshot path apply PRs as soon as
+    /// `gh pr list` returns, without waiting for a full
+    /// [`AppState::apply_snapshot`].
+    pub fn apply_prs(&mut self, prs: Vec<PrSummary>) {
+        self.prs = prs;
+    }
+
     /// Active transport for the Bus pane footer hint.
     pub fn bus_transport(&self) -> BusTransport {
         self.bus_stream
diff --git a/crates/convergio-tui/src/state_lifecycle.rs b/crates/convergio-tui/src/state_lifecycle.rs
index 7c9fd47..073bace 100644
--- a/crates/convergio-tui/src/state_lifecycle.rs
+++ b/crates/convergio-tui/src/state_lifecycle.rs
@@ -22,6 +22,13 @@ impl AppState {
     /// client. Used by the async refresh path in [`crate::run`] so
     /// the event loop never blocks on the snapshot. Failed fetches
     /// keep the previous data and flip the connection indicator.
+    ///
+    /// The PR list is only overwritten when the snapshot carries a
+    /// non-empty vector. This protects the progressive-snapshot
+    /// path: `Client::snapshot_core` returns an empty `prs` field
+    /// because the gh shell-out runs separately, and we don't want
+    /// the cached PR list to flash to empty between the core
+    /// arrival and the first PR payload.
     pub fn apply_snapshot(&mut self, snapshot: anyhow::Result<Snapshot>) {
         match snapshot {
             Ok(s) => {
@@ -29,7 +36,9 @@
                 self.tasks = s.tasks;
                 self.agents = s.agents;
                 self.agent_processes = s.agent_processes;
-                self.prs = s.prs;
+                if !s.prs.is_empty() {
+                    self.prs = s.prs;
+                }
                 self.messages = s.messages;
                 self.audit_ok = s.audit_ok;
                 self.daemon_version = s.daemon_version;
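A test-style sketch of the guard's intended behaviour, on a stand-in struct (the real `AppState` carries many more fields): an empty `prs` from a core snapshot must not blank a list the progressive path already delivered, while `apply_prs` always replaces, so a repo whose PR list genuinely drains to zero still clears via the `SnapshotEvent::Prs` path.

    #[derive(Default)]
    struct State {
        prs: Vec<String>,
    }

    impl State {
        fn apply_prs(&mut self, prs: Vec<String>) {
            // Progressive path: unconditional replace.
            self.prs = prs;
        }
        fn apply_snapshot_prs(&mut self, prs: Vec<String>) {
            // Mirrors apply_snapshot: empty means "PRs fetched
            // elsewhere", not "there are zero PRs".
            if !prs.is_empty() {
                self.prs = prs;
            }
        }
    }

    fn main() {
        let mut s = State::default();
        s.apply_prs(vec!["#12 open".into()]);
        s.apply_snapshot_prs(Vec::new()); // core snapshot: prs empty by design
        assert_eq!(s.prs, vec!["#12 open"]); // previous list survives
        s.apply_prs(Vec::new()); // a real empty PR payload still clears
        assert!(s.prs.is_empty());
    }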