Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 61 additions & 64 deletions crates/convergio-tui/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
//! `/v1/plans/{id}/messages/tail`, `/v1/audit/verify`, plus `gh pr list` (skipped when
//! `CONVERGIO_DASH_NO_GH=1`).

use crate::client_gh::fetch_prs;
use crate::client_gh::{fetch_prs_closed, fetch_prs_open};
use crate::client_pr_cache::{cached_fetch, PrCacheCell};
use anyhow::Result;
use serde::Deserialize;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::time::Duration;

pub use crate::plan_counts::PlanCounts;
pub use crate::types::{AgentProcess, BusMessage, Plan, PrSummary, RegistryAgent, TaskSummary};
Expand All @@ -35,23 +36,15 @@ pub struct Snapshot {
pub daemon_version: Option<String>,
}

/// PR data is cached for this long before the next `gh pr list`
/// shell-out. The dashboard tick is 5s by default; 30s means the
/// `gh` cost is amortised across ~6 refreshes without making the
/// PR pane feel stale (PR state turns over much slower than tasks).
const PR_CACHE_TTL: Duration = Duration::from_secs(30);

/// Time-stamped cache entry for the PR list.
type PrCacheCell = Arc<Mutex<Option<(Instant, Vec<PrSummary>)>>>;

/// Read-only HTTP client. Cloneable.
#[derive(Debug, Clone)]
pub struct Client {
base: String,
inner: reqwest::Client,
enable_gh: bool,
github_slug: Option<String>,
pr_cache: PrCacheCell,
open_prs: PrCacheCell,
closed_prs: PrCacheCell,
}

impl Client {
Expand All @@ -66,10 +59,17 @@ impl Client {
.unwrap_or_else(|_| reqwest::Client::new()),
enable_gh,
github_slug: None,
pr_cache: Arc::new(Mutex::new(None)),
open_prs: Arc::new(Mutex::new(None)),
closed_prs: Arc::new(Mutex::new(None)),
}
}

/// `true` when the `gh pr list` shell-out is allowed. Disabled
/// when `CONVERGIO_DASH_NO_GH=1` is set; mirrors the `enable_gh`
/// flag captured at construction, so callers can decide up front
/// whether to schedule any PR fetch at all.
pub fn gh_enabled(&self) -> bool {
self.enable_gh
}

/// Scope `gh pr list` to `owner/repo` instead of inheriting cwd.
/// `cvg dash` derives the slug from `origin` so the dashboard
/// works from any directory.
Expand All @@ -78,19 +78,21 @@ impl Client {
self
}

/// One-shot fetch of every dataset. Sub-fetches fail soft —
/// partial data is more useful than blanking the dashboard.
/// Fetch every dataset *except* the PRs. Returns within ~50ms
/// even on a cold start because no shell-out runs here. The
/// dashboard event loop emits this snapshot first to populate
/// Plans / Tasks / Agents while `gh pr list` is still in
/// flight — the user no longer waits 1-3s for the slowest
/// part of the refresh.
///
/// Per-plan fetches (`tasks` + `messages/tail`) run in parallel
/// via `futures::future::join_all`; without this fan-out the
/// loop is N+1 and dashes with 40+ plans block for hundreds of
/// ms per refresh on loopback. Global fetches (registry,
/// processes, audit, health) run concurrently with the
/// per-plan fan-out via `tokio::join!`. The PRs `gh pr list`
/// shell-out also runs concurrently (it is the dominant cost,
/// ~600ms on a warm cache) and is additionally memoised for
/// [`PR_CACHE_TTL`] so most refreshes pay zero gh cost.
pub async fn snapshot(&self) -> Result<Snapshot> {
/// Per-plan fetches (`tasks` + `messages/tail`) run in
/// parallel via `futures::future::join_all`; without this
/// fan-out the loop is N+1 and dashes with 40+ plans block
/// for hundreds of ms per refresh on loopback. Global
/// fetches (registry, processes, audit, health) run
/// concurrently with the per-plan fan-out via
/// `tokio::join!`.
pub async fn snapshot_core(&self) -> Result<Snapshot> {
let mut plans: Vec<Plan> = self
.get_json("/v1/plans")
.await
Expand All @@ -103,12 +105,10 @@ impl Client {
.iter()
.map(|id| self.fetch_plan_overview(id.clone())),
);

let global_fetches = self.fetch_globals();
let prs_future = self.fetch_prs_cached();

let (plan_results, (agents, agent_processes, audit_ok, daemon_version), prs) =
tokio::join!(plan_fetches, global_fetches, prs_future);
let (plan_results, (agents, agent_processes, audit_ok, daemon_version)) =
tokio::join!(plan_fetches, global_fetches);

let mut tasks: Vec<TaskSummary> = Vec::new();
let mut messages: Vec<BusMessage> = Vec::new();
Expand All @@ -127,51 +127,48 @@ impl Client {
tasks,
agents,
agent_processes,
prs,
prs: Vec::new(),
messages,
audit_ok,
daemon_version,
})
}

/// PR fetch with a [`PR_CACHE_TTL`] memo. Returns the cached
/// vector when fresh; otherwise shells out to `gh` (off the
/// blocking thread, see [`crate::client_gh::fetch_prs`]) and
/// updates the cache on success. A failed fetch keeps the
/// stale cache rather than blanking the PRs pane — partial
/// data beats no data.
async fn fetch_prs_cached(&self) -> Vec<PrSummary> {
if !self.enable_gh {
return Vec::new();
}
if let Some((stamped_at, cached)) = self.pr_cache_snapshot() {
if stamped_at.elapsed() < PR_CACHE_TTL {
return cached;
}
}
match fetch_prs(self.github_slug.as_deref()).await {
Ok(prs) => {
self.pr_cache_store(prs.clone());
prs
}
Err(_) => self.pr_cache_snapshot().map(|(_, v)| v).unwrap_or_default(),
}
/// Open-PR slice behind the shared 30s memo (see
/// [`crate::client_pr_cache::PR_CACHE_TTL`]). This is the cheap
/// fetch (~0.5s on a cold cache), so the PR pane's first paint
/// after a cache miss comes from here.
pub async fn fetch_prs_open_cached(&self) -> Vec<PrSummary> {
    let fetch = fetch_prs_open(self.github_slug.as_deref());
    cached_fetch(self.enable_gh, &self.open_prs, fetch).await
}

fn pr_cache_snapshot(&self) -> Option<(Instant, Vec<PrSummary>)> {
let guard = match self.pr_cache.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
guard.clone()
/// Closed/merged-PR slice behind the same memo. The costlier
/// fetch (~1-3s cold on busy repos), so it is scheduled after the
/// open list has already painted.
pub async fn fetch_prs_closed_cached(&self) -> Vec<PrSummary> {
    let fetch = fetch_prs_closed(self.github_slug.as_deref());
    cached_fetch(self.enable_gh, &self.closed_prs, fetch).await
}

fn pr_cache_store(&self, prs: Vec<PrSummary>) {
let mut guard = match self.pr_cache.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
*guard = Some((Instant::now(), prs));
/// Backwards-compatible one-shot snapshot: core data plus both PR
/// slices. Kept for the `Client`-driven test path and the
/// `snapshot_bench` example; the live dashboard uses
/// `snapshot_core` plus the progressive PR fetch in
/// [`crate::run`].
pub async fn snapshot(&self) -> Result<Snapshot> {
    let mut snap = self.snapshot_core().await?;
    // Run both gh slices concurrently; open results lead the list,
    // closed/merged are appended after them.
    let open_fut = self.fetch_prs_open_cached();
    let closed_fut = self.fetch_prs_closed_cached();
    let (mut prs, closed) = tokio::join!(open_fut, closed_fut);
    prs.extend(closed);
    snap.prs = prs;
    Ok(snap)
}

async fn fetch_plan_overview(
Expand Down
63 changes: 44 additions & 19 deletions crates/convergio-tui/src/client_gh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,56 @@ use crate::client::PrSummary;
use anyhow::Result;
use tokio::process::Command;

/// How many PRs we ask `gh` for. The dashboard previously requested
/// 100, which routinely cost ~1s and blocked the (then-synchronous)
/// refresh. 50 still covers the active queue with margin and keeps
/// `gh` under ~600ms in our measurements.
const PR_LIMIT: &str = "50";

/// Run `gh pr list` and parse the JSON. When `slug` is `Some`, the
/// query is scoped to that `owner/repo` (`gh pr list -R <slug>`) so
/// the dashboard works from any cwd. When `None`, gh inherits cwd —
/// original behaviour, kept for shells run inside a repo with no
/// workspace `Cargo.toml`.
///
/// Async because the pre-1s shell-out used to block the dashboard's
/// event loop; with `tokio::process::Command` it cooperates with the
/// rest of the snapshot's `tokio::join!` fan-out.
pub async fn fetch_prs(slug: Option<&str>) -> Result<Vec<PrSummary>> {
/// Cap on the open-PR fetch. Open PRs are the dashboard's hot
/// set: small, ever-changing, and where CI status matters in real
/// time. We pay for `statusCheckRollup` here because the request
/// is naturally cheap (~0.5s on a typical repo).
const OPEN_LIMIT: &str = "30";

/// Cap on the closed/merged-PR fetch. Closed/merged PRs are the
/// historical tail. We fetch them with a minimal field set —
/// `statusCheckRollup` triggers a per-PR API round-trip on the
/// GitHub side and pushed our shell-out from 0.5s to 4s on a busy
/// repo. CI on a closed PR is finalised anyway, so we render `?`
/// rather than blocking the dashboard.
const CLOSED_LIMIT: &str = "30";

/// `--json` field list for the open slice; includes
/// `statusCheckRollup` so the pane can show live CI state.
const OPEN_FIELDS: &str = "number,title,headRefName,state,statusCheckRollup,additions,deletions,changedFiles,createdAt,updatedAt,closedAt,mergedAt,body";
/// Same list minus `statusCheckRollup` — see [`CLOSED_LIMIT`]'s
/// doc for why the closed slice skips it.
const CLOSED_FIELDS: &str = "number,title,headRefName,state,additions,deletions,changedFiles,createdAt,updatedAt,closedAt,mergedAt,body";

/// Fetch open PRs only. Fast (~0.5s) — the dashboard's first PR
/// paint after a cache miss. See [`OPEN_FIELDS`] for what's
/// included.
///
/// # Errors
///
/// Propagates any failure from the underlying `gh pr list`
/// shell-out or its JSON parse.
pub async fn fetch_prs_open(slug: Option<&str>) -> Result<Vec<PrSummary>> {
fetch_prs_with_args(slug, "open", OPEN_LIMIT, OPEN_FIELDS).await
}

/// Fetch merged + closed PRs. Slower (~1-3s on busy repos) and
/// runs after the open list has already painted. Field set
/// excludes `statusCheckRollup` — see [`CLOSED_LIMIT`] for the
/// rationale.
///
/// # Errors
///
/// Propagates any failure from the underlying `gh pr list`
/// shell-out or its JSON parse.
pub async fn fetch_prs_closed(slug: Option<&str>) -> Result<Vec<PrSummary>> {
// `gh pr list --state` accepts only one of {open, closed,
// merged, all}. We want merged+closed but not open; the
// closest single value is `closed`, which on `gh` covers
// both (a merged PR is also a closed one).
fetch_prs_with_args(slug, "closed", CLOSED_LIMIT, CLOSED_FIELDS).await
}

async fn fetch_prs_with_args(
slug: Option<&str>,
state: &str,
limit: &str,
fields: &str,
) -> Result<Vec<PrSummary>> {
let mut args: Vec<String> = vec![
"pr".into(),
"list".into(),
"--state".into(),
"all".into(),
state.into(),
"--limit".into(),
PR_LIMIT.into(),
limit.into(),
"--json".into(),
"number,title,headRefName,state,statusCheckRollup,additions,deletions,changedFiles,createdAt,updatedAt,closedAt,mergedAt,body".into(),
fields.into(),
];
if let Some(s) = slug {
args.push("-R".into());
Expand Down
63 changes: 63 additions & 0 deletions crates/convergio-tui/src/client_pr_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//! Time-bounded memo for `gh pr list` results.
//!
//! Split out from [`crate::client`] so that file stays under the
//! 300-line cap and the locking pattern is testable in isolation.

use crate::client::PrSummary;
use anyhow::Result;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// PR data is cached for this long before the next `gh pr list`
/// shell-out. The dashboard tick is 5s by default; 30s means the
/// `gh` cost is amortised across ~6 refreshes without making the
/// PR pane feel stale (PR state turns over much slower than tasks).
pub const PR_CACHE_TTL: Duration = Duration::from_secs(30);

/// Time-stamped cache entry for one slice of the PR list.
///
/// A `std::sync::Mutex` (not tokio's) is deliberate: the lock is
/// only ever taken inside the synchronous `read_slot`/`write_slot`
/// helpers below, never held across an `.await`.
pub type PrCacheCell = Arc<Mutex<Option<(Instant, Vec<PrSummary>)>>>;

/// Snapshot the current cache entry, if one has been stored.
///
/// A poisoned mutex only means another thread panicked while
/// holding the lock; the stored data is still structurally valid,
/// so recover the guard rather than propagating the panic.
pub fn read_slot(slot: &PrCacheCell) -> Option<(Instant, Vec<PrSummary>)> {
    slot.lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone()
}

/// Overwrite the cache entry with `prs`, stamped at the moment
/// the lock is held (poison recovered, as in `read_slot`).
pub fn write_slot(slot: &PrCacheCell, prs: Vec<PrSummary>) {
    let mut entry = slot
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    *entry = Some((Instant::now(), prs));
}

/// Read-through cache: return the cached vector when fresh,
/// otherwise await `fetch`, store its result on success, and fall
/// back to the previous (possibly stale) cache on failure. Returns
/// an empty vector when `enable_gh` is `false`.
pub async fn cached_fetch(
    enable_gh: bool,
    slot: &PrCacheCell,
    fetch: impl std::future::Future<Output = Result<Vec<PrSummary>>>,
) -> Vec<PrSummary> {
    if !enable_gh {
        return Vec::new();
    }
    // Fresh hit: serve straight from the cache, no shell-out.
    let fresh = read_slot(slot)
        .filter(|(stamped_at, _)| stamped_at.elapsed() < PR_CACHE_TTL)
        .map(|(_, prs)| prs);
    if let Some(prs) = fresh {
        return prs;
    }
    // Miss or expired: run the fetch. Store on success; on failure
    // serve whatever (stale) entry survives — partial data beats a
    // blank pane.
    match fetch.await {
        Ok(prs) => {
            write_slot(slot, prs.clone());
            prs
        }
        Err(_) => read_slot(slot).map_or_else(Vec::new, |(_, stale)| stale),
    }
}
Loading
Loading