70 changes: 59 additions & 11 deletions crates/convergio-tui/src/client.rs
@@ -7,7 +7,8 @@
use crate::client_gh::fetch_prs;
use anyhow::Result;
use serde::Deserialize;
use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

pub use crate::plan_counts::PlanCounts;
pub use crate::types::{AgentProcess, BusMessage, Plan, PrSummary, RegistryAgent, TaskSummary};
@@ -34,13 +35,23 @@ pub struct Snapshot {
pub daemon_version: Option<String>,
}

/// PR data is cached for this long before the next `gh pr list`
/// shell-out. The dashboard tick is 5s by default; 30s means the
/// `gh` cost is amortised across ~6 refreshes without making the
/// PR pane feel stale (PR state turns over much slower than tasks).
const PR_CACHE_TTL: Duration = Duration::from_secs(30);

/// Time-stamped cache entry for the PR list.
type PrCacheCell = Arc<Mutex<Option<(Instant, Vec<PrSummary>)>>>;

/// Read-only HTTP client. Cloneable.
#[derive(Debug, Clone)]
pub struct Client {
base: String,
inner: reqwest::Client,
enable_gh: bool,
github_slug: Option<String>,
pr_cache: PrCacheCell,
}

impl Client {
@@ -55,6 +66,7 @@ impl Client {
.unwrap_or_else(|_| reqwest::Client::new()),
enable_gh,
github_slug: None,
pr_cache: Arc::new(Mutex::new(None)),
}
}

@@ -75,8 +87,9 @@ impl Client {
/// ms per refresh on loopback. Global fetches (registry,
/// processes, audit, health) run concurrently with the
/// per-plan fan-out via `tokio::join!`. The PRs `gh pr list`
/// shell-out is sequential because it blocks anyway and runs
/// after the parallel batch returns.
/// shell-out also runs concurrently (it is the dominant cost,
/// ~600ms on a warm cache) and is additionally memoised for
/// [`PR_CACHE_TTL`] so most refreshes pay zero gh cost.
pub async fn snapshot(&self) -> Result<Snapshot> {
let mut plans: Vec<Plan> = self
.get_json("/v1/plans")
@@ -92,9 +105,10 @@
);

let global_fetches = self.fetch_globals();
let prs_future = self.fetch_prs_cached();

let (plan_results, (agents, agent_processes, audit_ok, daemon_version)) =
tokio::join!(plan_fetches, global_fetches);
let (plan_results, (agents, agent_processes, audit_ok, daemon_version), prs) =
tokio::join!(plan_fetches, global_fetches, prs_future);

let mut tasks: Vec<TaskSummary> = Vec::new();
let mut messages: Vec<BusMessage> = Vec::new();
@@ -108,12 +122,6 @@
messages.sort_by_key(|m| std::cmp::Reverse(m.seq));
messages.truncate(200);

let prs = if self.enable_gh {
fetch_prs(self.github_slug.as_deref()).unwrap_or_default()
} else {
Vec::new()
};

Ok(Snapshot {
plans,
tasks,
@@ -126,6 +134,46 @@
})
}

/// PR fetch with a [`PR_CACHE_TTL`] memo. Returns the cached
/// vector when fresh; otherwise shells out to `gh` (without
/// blocking the runtime; see [`crate::client_gh::fetch_prs`]) and
/// updates the cache on success. A failed fetch keeps the
/// stale cache rather than blanking the PRs pane — partial
/// data beats no data.
async fn fetch_prs_cached(&self) -> Vec<PrSummary> {
if !self.enable_gh {
return Vec::new();
}
if let Some((stamped_at, cached)) = self.pr_cache_snapshot() {
if stamped_at.elapsed() < PR_CACHE_TTL {
return cached;
}
}
match fetch_prs(self.github_slug.as_deref()).await {
Ok(prs) => {
self.pr_cache_store(prs.clone());
prs
}
Err(_) => self.pr_cache_snapshot().map(|(_, v)| v).unwrap_or_default(),
}
}

fn pr_cache_snapshot(&self) -> Option<(Instant, Vec<PrSummary>)> {
let guard = match self.pr_cache.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
guard.clone()
}

fn pr_cache_store(&self, prs: Vec<PrSummary>) {
let mut guard = match self.pr_cache.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
*guard = Some((Instant::now(), prs));
}

async fn fetch_plan_overview(
&self,
plan_id: String,
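The caching pattern above reduces to a small generic shape. The following is a minimal self-contained sketch, using a hypothetical `TtlMemo` type that is not part of this PR; it mirrors the lock-poisoning recovery and the timestamp-plus-value pair stored under one lock:

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical generic form of the `pr_cache` memo above (not PR code).
#[derive(Clone)]
struct TtlMemo<T: Clone> {
    ttl: Duration,
    cell: Arc<Mutex<Option<(Instant, T)>>>,
}

impl<T: Clone> TtlMemo<T> {
    fn new(ttl: Duration) -> Self {
        Self { ttl, cell: Arc::new(Mutex::new(None)) }
    }

    /// Cloned value when the entry is younger than `ttl`, else `None`.
    fn fresh(&self) -> Option<T> {
        let guard = match self.cell.lock() {
            Ok(g) => g,
            Err(poisoned) => poisoned.into_inner(), // stale data beats none
        };
        guard
            .as_ref()
            .filter(|(stamped_at, _)| stamped_at.elapsed() < self.ttl)
            .map(|(_, value)| value.clone())
    }

    fn store(&self, value: T) {
        let mut guard = match self.cell.lock() {
            Ok(g) => g,
            Err(poisoned) => poisoned.into_inner(),
        };
        *guard = Some((Instant::now(), value));
    }
}
```

Storing the `Instant` next to the value inside one `Mutex` keeps the freshness check and the read atomic, which is why the PR uses a single `Option<(Instant, Vec<PrSummary>)>` cell rather than two separate fields.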
18 changes: 14 additions & 4 deletions crates/convergio-tui/src/client_gh.rs
@@ -7,29 +7,39 @@

use crate::client::PrSummary;
use anyhow::Result;
use std::process::Command;
use tokio::process::Command;

/// How many PRs we ask `gh` for. The dashboard previously requested
/// 100, which routinely cost ~1s and blocked the (then-synchronous)
/// refresh. 50 still covers the active queue with margin and keeps
/// `gh` under ~600ms in our measurements.
const PR_LIMIT: &str = "50";

/// Run `gh pr list` and parse the JSON. When `slug` is `Some`, the
/// query is scoped to that `owner/repo` (`gh pr list -R <slug>`) so
/// the dashboard works from any cwd. When `None`, gh inherits cwd —
/// original behaviour, kept for shells run inside a repo with no
/// workspace `Cargo.toml`.
pub fn fetch_prs(slug: Option<&str>) -> Result<Vec<PrSummary>> {
///
/// Async because the ~1s shell-out used to block the dashboard's
/// event loop; with `tokio::process::Command` it cooperates with the
/// rest of the snapshot's `tokio::join!` fan-out.
pub async fn fetch_prs(slug: Option<&str>) -> Result<Vec<PrSummary>> {
let mut args: Vec<String> = vec![
"pr".into(),
"list".into(),
"--state".into(),
"all".into(),
"--limit".into(),
"100".into(),
PR_LIMIT.into(),
"--json".into(),
"number,title,headRefName,state,statusCheckRollup,additions,deletions,changedFiles,createdAt,updatedAt,closedAt,mergedAt,body".into(),
];
if let Some(s) = slug {
args.push("-R".into());
args.push(s.to_string());
}
let out = Command::new("gh").args(&args).output();
let out = Command::new("gh").args(&args).output().await;
let out = match out {
Ok(o) if o.status.success() => o,
_ => return Ok(Vec::new()),
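The move to `tokio::process::Command` is what lets the shell-out overlap the HTTP fetches in `snapshot`'s `tokio::join!`. A minimal sketch demonstrating the overlap, with hypothetical `sleep` child processes standing in for `gh` (assumes a Unix `sleep` binary and tokio's `process` and `macros` features):

```rust
use std::time::Instant;

// Hypothetical demo, not PR code: two awaited child processes under
// tokio::join! complete in roughly max(a, b) wall time, not a + b.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let run = |secs: &'static str| async move {
        tokio::process::Command::new("sleep").arg(secs).output().await
    };
    let started = Instant::now();
    let (a, b) = tokio::join!(run("1"), run("1"));
    a?;
    b?;
    // Prints ~1s; with std::process::Command the two calls would run
    // back to back (~2s) and block the runtime thread throughout.
    println!("elapsed: {:?}", started.elapsed());
    Ok(())
}
```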
62 changes: 48 additions & 14 deletions crates/convergio-tui/src/lib.rs
@@ -33,6 +33,7 @@ pub mod plan_counts;
pub mod render;
pub mod scope;
pub mod state;
pub mod state_lifecycle;
pub mod theme;
pub mod tick;
pub mod types;
@@ -50,7 +51,7 @@ pub mod panes {
}

use anyhow::{Context, Result};
use crossterm::event::{self, DisableMouseCapture, EnableMouseCapture, Event};
use crossterm::event::{self, Event};
use crossterm::execute;
use crossterm::terminal::{
disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen,
@@ -59,8 +60,9 @@ use ratatui::backend::CrosstermBackend;
use ratatui::Terminal;
use std::io::{self, Stdout};
use std::time::Duration;
use tokio::sync::mpsc;

use crate::client::Client;
use crate::client::{Client, Snapshot};
use crate::keymap::{Action, KeyMap};
use crate::state::{AppMode, AppState};

@@ -88,19 +90,17 @@ pub async fn run(daemon_url: &str, tick_secs: u64, github_slug: Option<String>)
fn setup_terminal() -> Result<Terminal<CrosstermBackend<Stdout>>> {
enable_raw_mode().context("enable raw mode")?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen, EnableMouseCapture).context("enter alt screen")?;
// Mouse capture deliberately not enabled — no mouse handler exists,
// and capturing mouse events stole the terminal's native scroll
// while spamming the input poll with `Noop`-translated events.
execute!(stdout, EnterAlternateScreen).context("enter alt screen")?;
let backend = CrosstermBackend::new(stdout);
Terminal::new(backend).context("ratatui terminal")
}

fn restore_terminal(term: &mut Terminal<CrosstermBackend<Stdout>>) -> Result<()> {
disable_raw_mode().ok();
execute!(
term.backend_mut(),
LeaveAlternateScreen,
DisableMouseCapture
)
.ok();
execute!(term.backend_mut(), LeaveAlternateScreen).ok();
term.show_cursor().ok();
Ok(())
}
@@ -117,7 +117,17 @@ async fn event_loop(
..AppState::default()
};
let keymap = KeyMap;
state.refresh(&client).await;

// Snapshot results flow back through this channel. Capacity of 2
// keeps a stale-then-fresh pair in flight without ever blocking
// the producer; we are the single consumer.
let (snap_tx, mut snap_rx) = mpsc::channel::<Result<Snapshot>>(2);
let mut refresh_in_flight = false;

// Kick the first refresh off the event loop so the skeleton frame
// renders immediately. Previously `state.refresh().await` here
// blocked the first paint behind a ~1s `gh pr list`.
spawn_refresh(&client, &snap_tx, &mut refresh_in_flight);

let mut interval = tokio::time::interval(Duration::from_secs(tick_secs));
interval.tick().await; // first tick fires immediately; consume it
@@ -132,13 +142,19 @@

tokio::select! {
_ = interval.tick() => {
state.refresh(&client).await;
spawn_refresh(&client, &snap_tx, &mut refresh_in_flight);
}
Some(snap) = snap_rx.recv() => {
refresh_in_flight = false;
state.apply_snapshot(snap);
}
poll = poll_key() => {
if let Some(action) = poll? {
match keymap.translate(action) {
Action::Quit => break,
Action::RefreshNow => state.refresh(&client).await,
Action::RefreshNow => {
spawn_refresh(&client, &snap_tx, &mut refresh_in_flight);
}
Action::PaneNext => state.focus_next(),
Action::PanePrev => state.focus_prev(),
Action::RowDown => state.row_down(),
@@ -172,12 +188,30 @@ async fn event_loop(
Ok(())
}

/// Spawn the snapshot fetch off the event loop. `in_flight` debounces
/// concurrent refreshes — the periodic tick and a manual `r` arriving
/// inside the same gh shell-out window must not stack.
fn spawn_refresh(client: &Client, tx: &mpsc::Sender<Result<Snapshot>>, in_flight: &mut bool) {
if *in_flight {
return;
}
*in_flight = true;
let client = client.clone();
let tx = tx.clone();
tokio::spawn(async move {
let snap = client.snapshot().await;
let _ = tx.send(snap).await;
});
}

/// Non-blocking key polling. Returns `None` when the available event
/// is not a key press (e.g. mouse, resize), so the caller's `select!`
/// can keep cycling without busy-waiting.
/// can keep cycling without busy-waiting. Poll window kept short
/// (50ms) so keystrokes feel snappy — at this granularity the cost
/// is one cheap `spawn_blocking` round-trip per cycle.
async fn poll_key() -> Result<Option<event::KeyEvent>> {
tokio::task::spawn_blocking(|| -> Result<Option<event::KeyEvent>> {
if event::poll(Duration::from_millis(200)).context("poll")? {
if event::poll(Duration::from_millis(50)).context("poll")? {
if let Event::Key(k) = event::read().context("read")? {
return Ok(Some(k));
}
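Taken together, `spawn_refresh`, the bounded channel, and the `select!` arms form a standard debounced background-refresh loop. A stripped-down, self-contained sketch of the same shape, with a hypothetical `fetch` and payload type standing in for `Client::snapshot` and `Snapshot`:

```rust
use std::time::Duration;
use tokio::sync::mpsc;

// Hypothetical stand-in for Client::snapshot (not PR code).
async fn fetch() -> anyhow::Result<u64> {
    tokio::time::sleep(Duration::from_millis(300)).await; // simulate gh + HTTP cost
    Ok(42)
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<anyhow::Result<u64>>(2);
    let mut in_flight = false;
    let mut tick = tokio::time::interval(Duration::from_secs(5));

    for _ in 0..3 {
        tokio::select! {
            _ = tick.tick() => {
                // Debounce: a tick landing while a fetch is still running is
                // dropped, mirroring spawn_refresh's in_flight check.
                if !in_flight {
                    in_flight = true;
                    let tx = tx.clone();
                    tokio::spawn(async move { let _ = tx.send(fetch().await).await; });
                }
            }
            Some(result) = rx.recv() => {
                in_flight = false;
                // apply_snapshot analogue: the loop never awaits the fetch itself.
                if let Ok(value) = result { println!("fresh data: {value}"); }
            }
        }
    }
}
```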
31 changes: 1 addition & 30 deletions crates/convergio-tui/src/state.rs
@@ -5,9 +5,7 @@
//! [`AppState::refresh`] which delegates to [`crate::client::Client`].

use crate::bus_stream::{BusStreamHandle, Transport as BusTransport};
use crate::client::{
AgentProcess, BusMessage, Client, Plan, PrSummary, RegistryAgent, TaskSummary,
};
use crate::client::{AgentProcess, BusMessage, Plan, PrSummary, RegistryAgent, TaskSummary};
pub use crate::mode::{AppMode, DetailTarget, Scope};

/// The panes rendered by the dashboard, in tab order.
@@ -168,33 +166,6 @@ pub fn version_drift(daemon: Option<&str>) -> Option<String> {
}

impl AppState {
/// Refresh every dataset. Failures roll up into
/// [`Connection::Disconnected`] and leave the previous data in
/// place — the dashboard never blanks itself on a transient
/// network error.
pub async fn refresh(&mut self, client: &Client) {
let snapshot = client.snapshot().await;
match snapshot {
Ok(s) => {
self.plans = s.plans;
self.tasks = s.tasks;
self.agents = s.agents;
self.agent_processes = s.agent_processes;
self.prs = s.prs;
self.messages = s.messages;
self.audit_ok = s.audit_ok;
self.daemon_version = s.daemon_version;
self.connection = Connection::Connected;
self.last_refresh = Some(chrono::Utc::now());
}
Err(_) => {
self.connection = Connection::Disconnected;
}
}
self.update_bus_subscription();
self.merge_live_bus();
}

/// Re-point the Bus pane SSE subscription at the currently-scoped
/// plan, falling back to the first plan if no scope is set. No-op
/// when the live handle was never installed.
46 changes: 46 additions & 0 deletions crates/convergio-tui/src/state_lifecycle.rs
@@ -0,0 +1,46 @@
//! Snapshot lifecycle for [`crate::state::AppState`].
//!
//! Split out from `state.rs` so the main file stays under the
//! 300-line cap. Both entry points end in
//! [`AppState::apply_snapshot`]; the refresh wrapper exists only
//! for the legacy `Client`-driven test path.

use crate::client::{Client, Snapshot};
use crate::state::{AppState, Connection};

impl AppState {
/// Refresh every dataset. Failures roll up into
/// [`Connection::Disconnected`] and leave the previous data in
/// place — the dashboard never blanks itself on a transient
/// network error.
pub async fn refresh(&mut self, client: &Client) {
let snapshot = client.snapshot().await;
self.apply_snapshot(snapshot);
}

/// Apply a pre-fetched [`Snapshot`] without re-awaiting the
/// client. Used by the async refresh path in [`crate::run`] so
/// the event loop never blocks on the snapshot. Failed fetches
/// keep the previous data and flip the connection indicator.
pub fn apply_snapshot(&mut self, snapshot: anyhow::Result<Snapshot>) {
match snapshot {
Ok(s) => {
self.plans = s.plans;
self.tasks = s.tasks;
self.agents = s.agents;
self.agent_processes = s.agent_processes;
self.prs = s.prs;
self.messages = s.messages;
self.audit_ok = s.audit_ok;
self.daemon_version = s.daemon_version;
self.connection = Connection::Connected;
self.last_refresh = Some(chrono::Utc::now());
}
Err(_) => {
self.connection = Connection::Disconnected;
}
}
self.update_bus_subscription();
self.merge_live_bus_pub();
}
}
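The keep-stale-data invariant documented on `apply_snapshot` lends itself to a direct test. A hedged sketch, assuming `AppState`'s fields and `Connection` are reachable from a crate-internal test module (this test is not part of the PR):

```rust
#[cfg(test)]
mod tests {
    use crate::state::{AppState, Connection};

    #[test]
    fn failed_snapshot_flips_connection_indicator() {
        let mut state = AppState::default();
        state.apply_snapshot(Err(anyhow::anyhow!("daemon unreachable")));
        // The connection indicator flips...
        assert!(matches!(state.connection, Connection::Disconnected));
        // ...while plans, tasks, prs, etc. retain whatever the last good
        // snapshot delivered, so the dashboard never blanks itself.
    }
}
```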