diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2a7b23e5..bd4f05ee 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,9 +4,12 @@ on: pull_request: paths: - ".github/workflows/*.yml" + - "Cargo.lock" + - "Cargo.toml" - "apps/cloud/**" - "apps/desktop/**" - "apps/mesh-front-door/**" + - "crates/**" - "packages/agent-sessions/**" - "packages/cli/**" - "packages/protocol/**" @@ -14,6 +17,8 @@ on: - "packages/session-trace/**" - "packages/session-trace-react/**" - "packages/web/**" + - "scripts/cargo.sh" + - "scripts/supervisor-smoke.sh" - "scripts/**" - "package.json" - "bun.lock" @@ -69,3 +74,18 @@ jobs: bun run --cwd packages/web test:happy bun test packages/session-trace/src/*.test.ts packages/session-trace-react/src/*.test.tsx fi + + rust-supervisor: + name: Rust supervisor + runs-on: macos-latest + timeout-minutes: 10 + + steps: + - name: Checkout + uses: actions/checkout@v5 + + - name: Set up Rust + uses: dtolnay/rust-toolchain@stable + + - name: Smoke + run: bash scripts/supervisor-smoke.sh diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 00000000..ce587f4d --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. 
+version = 4 + +[[package]] +name = "openscout-supervisor" +version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 00000000..ecae4441 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,5 @@ +[workspace] +members = [ + "crates/openscout-supervisor", +] +resolver = "2" diff --git a/apps/desktop/src/core/mcp/scout-channel.ts b/apps/desktop/src/core/mcp/scout-channel.ts index 78fd7b7a..62f2c86f 100644 --- a/apps/desktop/src/core/mcp/scout-channel.ts +++ b/apps/desktop/src/core/mcp/scout-channel.ts @@ -31,6 +31,8 @@ type InboxStreamEvent = { items?: InboxItem[]; }; +const scoutChannelStartedAt = Date.now(); + async function resolveAgentId( currentDirectory: string, env: NodeJS.ProcessEnv, @@ -231,7 +233,7 @@ function buildScoutChannelEndpoint(input: { metadata: { source: "scout-channel", processId: process.pid, - startedAt: now, + startedAt: scoutChannelStartedAt, lastSeenAt: now, }, }; diff --git a/apps/ios/ScoutNext/HomeSurface.swift b/apps/ios/ScoutNext/HomeSurface.swift index a1b12bae..9e90b601 100644 --- a/apps/ios/ScoutNext/HomeSurface.swift +++ b/apps/ios/ScoutNext/HomeSurface.swift @@ -15,6 +15,7 @@ import ScoutIOSCore /// then drill to its agents. The flat agent list still lives on the Agents tab. struct HomeSurface: View { let model: AppModel + @Environment(\.scoutNextLayout) private var layout /// Opens the connection detail for a tapped machine — switching / probing /// lives there, not on the rail itself. 
var onSelectMachine: (AppModel.PairedMachine) -> Void = { _ in } @@ -41,7 +42,7 @@ struct HomeSurface: View { var body: some View { ScrollView { - VStack(alignment: .leading, spacing: HudSpacing.xxl) { + VStack(alignment: .leading, spacing: layout.surfaceSectionSpacing) { if isLoading { HudEmptyState(title: "Loading fleet", subtitle: "Reading agents from the broker.", icon: "antenna.radiowaves.left.and.right") } else if isFleetEmpty { @@ -54,8 +55,9 @@ struct HomeSurface: View { if !recentActivity.isEmpty { activitySection } } } - .padding(.horizontal, HudSpacing.xxl) - .padding(.vertical, HudSpacing.xxl) + .padding(.horizontal, layout.surfacePadding) + .padding(.top, layout.surfaceTopPadding) + .padding(.bottom, layout.surfaceBottomPadding) } .refreshable { await load() } .task(id: reloadToken) { await load() } @@ -552,29 +554,35 @@ private struct ProjectRow: View { let group: ProjectGroup let isExpanded: Bool let onToggle: () -> Void + @Environment(\.scoutNextLayout) private var layout var body: some View { Button(action: onToggle) { - HStack(spacing: HudSpacing.md) { + HStack(alignment: layout.isMiniPhone ? .top : .center, spacing: HudSpacing.md) { Glyphic.chevron(isExpanded ? .bottom : .trailing, size: 13) .foregroundStyle(HudPalette.muted) .frame(width: 12) - Text(group.name) - .font(HudFont.ui(HudTextSize.md, weight: .semibold)) - .foregroundStyle(HudPalette.ink) - .lineLimit(1) - if group.liveCount > 0 { - HudStatusDot(color: HudPalette.accent, size: 6, pulses: true) - } - Spacer(minLength: HudSpacing.md) - Text("\(group.agents.count) agents") - .font(HudFont.mono(HudTextSize.xs)) - .foregroundStyle(HudPalette.muted) - if let age = relativeAgeString(group.lastActiveAt) { - Text(age) - .font(HudFont.mono(HudTextSize.xs)) - .foregroundStyle(HudPalette.muted) - .monospacedDigit() + .padding(.top, layout.isMiniPhone ? 
2 : 0) + if layout.isMiniPhone { + VStack(alignment: .leading, spacing: HudSpacing.xxs) { + HStack(spacing: HudSpacing.sm) { + projectName + if group.liveCount > 0 { liveDot } + } + HStack(spacing: HudSpacing.sm) { + agentCount + if let age = lastActiveAge { ageText(age) } + } + } + .frame(maxWidth: .infinity, alignment: .leading) + } else { + projectName + if group.liveCount > 0 { liveDot } + Spacer(minLength: HudSpacing.md) + agentCount + if let age = lastActiveAge { + ageText(age) + } } } .padding(.horizontal, HudSpacing.xl) @@ -583,6 +591,35 @@ private struct ProjectRow: View { } .buttonStyle(.plain) } + + private var projectName: some View { + Text(group.name) + .font(HudFont.ui(HudTextSize.md, weight: .semibold)) + .foregroundStyle(HudPalette.ink) + .lineLimit(1) + .truncationMode(.tail) + } + + private var liveDot: some View { + HudStatusDot(color: HudPalette.accent, size: 6, pulses: true) + } + + private var agentCount: some View { + Text("\(group.agents.count) agents") + .font(HudFont.mono(HudTextSize.xs)) + .foregroundStyle(HudPalette.muted) + } + + private var lastActiveAge: String? { + relativeAgeString(group.lastActiveAt) + } + + private func ageText(_ age: String) -> some View { + Text(age) + .font(HudFont.mono(HudTextSize.xs)) + .foregroundStyle(HudPalette.muted) + .monospacedDigit() + } } // MARK: - AgentFleetRow @@ -600,6 +637,7 @@ private struct AgentFleetRow: View { /// its name aligns with the expandable projects around it. var leadingLeaf: Bool = false let onTap: (() -> Void)? 
+ @Environment(\.scoutNextLayout) private var layout var body: some View { Button(action: { onTap?() }) { @@ -642,7 +680,7 @@ private struct AgentFleetRow: View { .foregroundStyle(HudPalette.ink) .lineLimit(1) .layoutPriority(1) - if let locator = locator, !leadingLeaf { + if let locator = locator, !leadingLeaf, !layout.isMiniPhone { Text(locator) .font(HudFont.mono(HudTextSize.xs)) // Subordinate to the name: the mono locator was reading at full diff --git a/apps/ios/ScoutNext/ResponsiveFrame.swift b/apps/ios/ScoutNext/ResponsiveFrame.swift index a1094a35..a7fee904 100644 --- a/apps/ios/ScoutNext/ResponsiveFrame.swift +++ b/apps/ios/ScoutNext/ResponsiveFrame.swift @@ -1,52 +1,108 @@ import SwiftUI +import HudsonUI -/// ScoutNext is authored against a single reference width — the standard iPhone -/// (393pt portrait). `DesignFrame` is the responsive envelope that honors that -/// contract so every surface can be tuned once, for the optimized (larger) -/// canvas, and still render correctly on the small one. +/// ScoutNext layout metrics derived from the real phone width. The app keeps +/// standard iPhone as the roomy baseline, but the 13 mini gets native text/hit +/// sizes with tighter chrome instead of a blanket downscale. +struct ScoutNextLayoutMetrics: Equatable { + let physicalWidth: CGFloat + let designWidth: CGFloat + let scale: CGFloat + + var isMiniPhone: Bool { physicalWidth > 0 && physicalWidth <= 380 } + var isNarrowPhone: Bool { physicalWidth > 0 && physicalWidth < 390 } + + var titleHorizontalPadding: CGFloat { isNarrowPhone ? HudSpacing.xl : HudSpacing.xxl } + var titleTopPadding: CGFloat { isNarrowPhone ? HudSpacing.md : HudSpacing.lg } + var titleBottomPadding: CGFloat { isNarrowPhone ? HudSpacing.md : HudSpacing.xl } + var wordmarkSize: CGFloat { isNarrowPhone ? HudTextSize.xl : HudTextSize.xxl } + var nextBadgeSize: CGFloat { isNarrowPhone ? HudTextSize.xxs : HudTextSize.xs } + var nextBadgeTracking: CGFloat { isNarrowPhone ? 
1.5 : 2 } + + var surfacePadding: CGFloat { isNarrowPhone ? HudSpacing.xl : HudSpacing.xxl } + var surfaceTopPadding: CGFloat { isNarrowPhone ? HudSpacing.lg : HudSpacing.xxl } + var surfaceBottomPadding: CGFloat { isNarrowPhone ? HudSpacing.xl : HudSpacing.xxl } + var surfaceSectionSpacing: CGFloat { isNarrowPhone ? HudSpacing.xl : HudSpacing.xxl } + + var tabBarTopPadding: CGFloat { isNarrowPhone ? HudSpacing.xs : HudSpacing.sm } + var tabBarHorizontalPadding: CGFloat { isNarrowPhone ? HudSpacing.md : HudSpacing.lg } + var tabButtonHeight: CGFloat { isNarrowPhone ? 44 : 48 } + var tabGlyphSize: CGFloat { isNarrowPhone ? 19 : 21 } + var tabLabelSize: CGFloat { isNarrowPhone ? HudTextSize.micro : HudTextSize.xxs } + + var statusSideInset: CGFloat { isNarrowPhone ? HudSpacing.xxxl : 42 } + var statusCenterGap: CGFloat { isNarrowPhone ? HudSpacing.md : HudSpacing.lg } + var statusMachineMaxLabelWidth: CGFloat { isNarrowPhone ? 72 : 120 } +} + +private struct ScoutNextLayoutMetricsKey: EnvironmentKey { + static let defaultValue = ScoutNextLayoutMetrics(physicalWidth: 393, designWidth: 393, scale: 1) +} + +extension EnvironmentValues { + var scoutNextLayout: ScoutNextLayoutMetrics { + get { self[ScoutNextLayoutMetricsKey.self] } + set { self[ScoutNextLayoutMetricsKey.self] = newValue } + } +} + +/// ScoutNext is authored against the standard iPhone width (393pt portrait), but +/// compact phones are real layout targets rather than compatibility-scaled +/// previews. `DesignFrame` publishes responsive metrics so app chrome can tighten +/// on the 13 mini while keeping native text size and 44pt tap targets. /// /// - **Larger screens (≥ reference).** The optimized native target. No scaling: /// the layout fills the available width fluidly at 1.0×. Standard, Plus, Pro, /// and Pro Max all land here. 
-/// - **The 13 mini (375pt) — and any narrower device.** Graceful degradation: -/// the whole UI is laid out at the 393pt reference and uniformly scaled down -/// to fit (≈0.95×). Proportions stay pixel-identical — nothing is re-tuned -/// per device. Because the mini shares the standard aspect ratio almost -/// exactly (375×812 ≈ 2.165 vs 393×852 ≈ 2.168), the single width-ratio -/// scale fits both dimensions, so the bottom-docked chrome stays flush. +/// - **The 13 mini (375pt).** Native rendering with compact chrome metrics: +/// slightly tighter padding, shorter tab bar, and narrower status readouts. +/// - **Anything narrower than the mini.** Graceful degradation: lay out at the +/// mini width and uniformly shrink from there. /// -/// Implementation: lay the content out at the design width and a height that, -/// once scaled, exactly fills the available height (no letterbox); apply the -/// uniform `scaleEffect`; then claim the real available footprint so siblings -/// (the full-bleed canvas) cover the physical edges. +/// Implementation: lay the content out at either the real width or the mini +/// minimum, publish metrics, then apply shrink-only scaling for ultra-narrow +/// widths and claim the real footprint so the full-bleed canvas covers the edges. struct DesignFrame: View { /// Width every surface is designed against — standard iPhone portrait. - /// Devices at or above this render natively; narrower ones scale down. + /// Devices at or above this render with roomy metrics. var referenceWidth: CGFloat = 393 + /// Smallest native phone target. The 13 mini is 375pt wide. 
+ var nativeMinimumWidth: CGFloat = 375 - @ViewBuilder var content: () -> Content + private let content: (ScoutNextLayoutMetrics) -> Content + + init(@ViewBuilder content: @escaping () -> Content) { + self.content = { _ in content() } + } + + init(@ViewBuilder content: @escaping (ScoutNextLayoutMetrics) -> Content) { + self.content = content + } var body: some View { GeometryReader { proxy in let avail = proxy.size let scale = scale(forWidth: avail.width) // At ≥ reference we lay out at the device's own width (fluid fill); - // below it we lay out at the fixed reference and shrink to fit. - let designWidth = scale < 1 ? referenceWidth : avail.width + // between mini and reference we lay out natively with compact metrics; + // below mini we lay out at the mini width and shrink to fit. + let designWidth = scale < 1 ? nativeMinimumWidth : avail.width let designHeight = scale > 0 ? avail.height / scale : avail.height + let metrics = ScoutNextLayoutMetrics(physicalWidth: avail.width, designWidth: designWidth, scale: scale) - content() + content(metrics) + .environment(\.scoutNextLayout, metrics) .frame(width: designWidth, height: designHeight, alignment: .top) .scaleEffect(scale, anchor: .top) .frame(width: avail.width, height: avail.height, alignment: .top) } } - /// Shrink-only: `1.0` for the optimized large canvas, the width ratio below - /// the reference. Floored so a hypothetical ultra-narrow device can't shrink - /// the UI into illegibility. + /// Shrink-only: `1.0` for mini and larger, the width ratio below the mini. + /// Floored so a hypothetical ultra-narrow device can't shrink the UI into + /// illegibility. 
private func scale(forWidth width: CGFloat) -> CGFloat { guard width > 0 else { return 1 } - return max(0.8, min(1, width / referenceWidth)) + return max(0.84, min(1, width / nativeMinimumWidth)) } } diff --git a/apps/ios/ScoutNext/RootView.swift b/apps/ios/ScoutNext/RootView.swift index 68938eee..f5c2e42b 100644 --- a/apps/ios/ScoutNext/RootView.swift +++ b/apps/ios/ScoutNext/RootView.swift @@ -59,13 +59,13 @@ struct RootView: View { var body: some View { HudPhoneAppShell { - // Author every surface at the standard-iPhone reference width; the - // frame fills fluidly on larger screens and gracefully scales the - // whole UI down to fit the 13 mini. See `DesignFrame`. - DesignFrame { + // Author every surface through ScoutNext's phone layout frame. The + // 13 mini gets native sizing with compact metrics; only narrower + // widths scale down. See `DesignFrame`. + DesignFrame { layout in ZStack(alignment: .bottom) { VStack(spacing: 0) { - titleBar + titleBar(layout) Group { switch surface { @@ -83,7 +83,7 @@ struct RootView: View { // the surfaces' scroll content above it, and the material masks // anything that scrolls behind it — the conventional iOS pattern. .safeAreaInset(edge: .bottom, spacing: 0) { - dockedTabBar + dockedTabBar(layout) } // Read-only connection readout pinned flush to the true screen @@ -92,7 +92,7 @@ struct RootView: View { // boundary: fill down + bottom-align the content, THEN ignore the // bottom safe area. Safe to sit on the swipe-up gesture — // hit-testing is off, it's a pure readout. - ScoutStatusBar(leading: appReadouts, trailing: statsReadouts) + ScoutStatusBar(leading: appReadouts(layout), trailing: statsReadouts(layout)) .frame(maxWidth: .infinity, maxHeight: .infinity, alignment: .bottom) .ignoresSafeArea(edges: .bottom) // The nav stack leaves a residual inset; push the last bit so @@ -138,13 +138,13 @@ struct RootView: View { /// indicator. 
App-local on purpose — it renders the unified hand-drawn glyph /// set, which the shared `HudLiquidBarTabRow` can't (it takes SF Symbol /// strings only). Selection chrome mirrors the shared component exactly. - private var dockedTabBar: some View { + private func dockedTabBar(_ layout: ScoutNextLayoutMetrics) -> some View { HStack(spacing: HudSpacing.sm) { - ForEach(Surface.allCases) { tabButton($0) } + ForEach(Surface.allCases) { tabButton($0, layout: layout) } } .frame(maxWidth: .infinity) - .padding(.top, HudSpacing.sm) - .padding(.horizontal, HudSpacing.lg) + .padding(.top, layout.tabBarTopPadding) + .padding(.horizontal, layout.tabBarHorizontalPadding) .background(alignment: .top) { Rectangle() // Light, glassy translucency — frosted blur that lets the @@ -163,7 +163,7 @@ struct RootView: View { /// Leading run of the bottom status bar: how and where we're connected — the /// route (LAN / TSN / OSN, with a wi-fi glyph) or current state, then the Mac /// it lands on. - private var appReadouts: [StatusReadout] { + private func appReadouts(_ layout: ScoutNextLayoutMetrics) -> [StatusReadout] { let stateLabel: String if case .connected(let route) = model.connectionState, !route.label.isEmpty { stateLabel = route.label.uppercased() @@ -175,22 +175,25 @@ struct RootView: View { // Cap only the machine readout: a long hostname truncates here instead // of shoving the fleet stats — and every surface — off the screen. The // route + stat readouts stay intrinsic, so none of them truncate. - items.append(StatusReadout(label: machine, tint: HudPalette.muted, maxLabelWidth: 120)) + items.append(StatusReadout(label: machine, tint: HudPalette.muted, maxLabelWidth: layout.statusMachineMaxLabelWidth)) } return items } /// Trailing run: the fleet rollup — total agents, paired machines, and how /// many are active right now (accent when something's running). 
- private var statsReadouts: [StatusReadout] { - [ + private func statsReadouts(_ layout: ScoutNextLayoutMetrics) -> [StatusReadout] { + var items = [ StatusReadout(label: pluralized(model.agentCount, "agent"), tint: HudPalette.muted), - StatusReadout(label: pluralized(model.pairedMachines.count, "machine"), tint: HudPalette.muted), StatusReadout( label: "\(model.activeAgentCount) active", tint: model.activeAgentCount > 0 ? HudPalette.accent : HudPalette.dim ), ] + if !layout.isMiniPhone { + items.insert(StatusReadout(label: pluralized(model.pairedMachines.count, "machine"), tint: HudPalette.muted), at: 1) + } + return items } private func pluralized(_ count: Int, _ noun: String) -> String { @@ -198,7 +201,7 @@ struct RootView: View { } @ViewBuilder - private func tabButton(_ s: Surface) -> some View { + private func tabButton(_ s: Surface, layout: ScoutNextLayoutMetrics) -> some View { let isSelected = surface == s Button { guard surface != s else { return } @@ -208,14 +211,14 @@ struct RootView: View { withAnimation(.spring(response: 0.34, dampingFraction: 0.82)) { surface = s } } label: { VStack(spacing: HudSpacing.xxs) { - Glyphic(kind: s.glyph, size: 21) + Glyphic(kind: s.glyph, size: layout.tabGlyphSize) Text(s.rawValue) - .font(HudFont.mono(HudTextSize.xxs, weight: .medium)) + .font(HudFont.mono(layout.tabLabelSize, weight: .medium)) .lineLimit(1) } .foregroundStyle(isSelected ? HudPalette.ink : HudPalette.muted) .frame(maxWidth: .infinity) - .frame(height: 48) + .frame(height: layout.tabButtonHeight) .background { if isSelected { Capsule() @@ -233,26 +236,26 @@ struct RootView: View { .accessibilityAddTraits(isSelected ? .isSelected : []) } - private var titleBar: some View { + private func titleBar(_ layout: ScoutNextLayoutMetrics) -> some View { // Center-aligned so the trailing complications (status pill + gear button) // sit on one axis. The wordmark keeps its own baseline alignment inside a // nested group so "Scout"/"Next" stay typographically locked. 
HStack(spacing: HudSpacing.md) { HStack(alignment: .firstTextBaseline, spacing: HudSpacing.sm) { Text("Scout") - .font(HudFont.ui(HudTextSize.xxl, weight: .semibold)) + .font(HudFont.ui(layout.wordmarkSize, weight: .semibold)) .foregroundStyle(HudPalette.ink) Text("Next") - .font(HudFont.mono(HudTextSize.xs, weight: .bold)) - .tracking(2) + .font(HudFont.mono(layout.nextBadgeSize, weight: .bold)) + .tracking(layout.nextBadgeTracking) .foregroundStyle(ScoutCanvas.accentGradient) } Spacer() settingsButton } - .padding(.horizontal, HudSpacing.xxl) - .padding(.top, HudSpacing.lg) - .padding(.bottom, HudSpacing.xl) + .padding(.horizontal, layout.titleHorizontalPadding) + .padding(.top, layout.titleTopPadding) + .padding(.bottom, layout.titleBottomPadding) } /// Settings as a contained icon complication — an inset circular button so it diff --git a/apps/ios/ScoutNext/StatusBar.swift b/apps/ios/ScoutNext/StatusBar.swift index 07b2666f..9273b2e7 100644 --- a/apps/ios/ScoutNext/StatusBar.swift +++ b/apps/ios/ScoutNext/StatusBar.swift @@ -57,18 +57,15 @@ struct StatusReadout: View, Identifiable { struct ScoutStatusBar: View { var leading: [StatusReadout] = [] var trailing: [StatusReadout] = [] - - /// Insets the runs to where the rounded bottom corners straighten out, so the - /// first/last readout sits on the flat edge rather than riding the curve. 
- private let sideInset: CGFloat = 42 + @Environment(\.scoutNextLayout) private var layout var body: some View { HStack(spacing: HudSpacing.md) { run(leading) - Spacer(minLength: HudSpacing.lg) + Spacer(minLength: layout.statusCenterGap) run(trailing) } - .padding(.horizontal, sideInset) + .padding(.horizontal, layout.statusSideInset) .padding(.vertical, HudSpacing.sm) .frame(maxWidth: .infinity, minHeight: HudLayout.statusBarHeight) // A slightly recessed strip with a top hairline so it reads as a status diff --git a/crates/openscout-supervisor/Cargo.toml b/crates/openscout-supervisor/Cargo.toml new file mode 100644 index 00000000..9119ea1d --- /dev/null +++ b/crates/openscout-supervisor/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "openscout-supervisor" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] diff --git a/crates/openscout-supervisor/README.md b/crates/openscout-supervisor/README.md new file mode 100644 index 00000000..6d153898 --- /dev/null +++ b/crates/openscout-supervisor/README.md @@ -0,0 +1,24 @@ +# openscout-supervisor + +Native first slice for SCO-062. + +This binary supervises the existing Bun-backed OpenScout base service. It is +intentionally stdlib-only for the first pass so the resulting package stays +small and easy to reason about. + +```bash +cargo run --manifest-path crates/openscout-supervisor/Cargo.toml -- status --json +cargo run --manifest-path crates/openscout-supervisor/Cargo.toml -- doctor --json +cargo run --manifest-path crates/openscout-supervisor/Cargo.toml -- start --json +cargo run --manifest-path crates/openscout-supervisor/Cargo.toml -- stop --json +cargo run --manifest-path crates/openscout-supervisor/Cargo.toml -- supervise +``` + +The repo currently keeps the TypeScript service manager as the production path. +The supervisor is a parallel slice until it has proven start/stop/restart +behavior locally. + +`supervise` is the long-running daemon mode intended for launchd. 
It starts the +existing Bun base service as a child, restarts it with bounded backoff, handles +shutdown by terminating the child, and writes `supervisor-state.json` under the +runtime directory for `status` and `doctor`. diff --git a/crates/openscout-supervisor/src/main.rs b/crates/openscout-supervisor/src/main.rs new file mode 100644 index 00000000..66946aae --- /dev/null +++ b/crates/openscout-supervisor/src/main.rs @@ -0,0 +1,1352 @@ +#[cfg(not(unix))] +compile_error!("openscout-supervisor first slice requires a Unix-like platform."); + +use std::collections::HashSet; +use std::env; +use std::fs; +use std::io::{Read, Write}; +use std::net::TcpStream; +use std::os::unix::net::UnixStream; +use std::path::{Path, PathBuf}; +use std::process::{Child, Command, ExitCode, Stdio}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +const DEFAULT_BROKER_HOST: &str = "127.0.0.1"; +const DEFAULT_BROKER_HOST_MESH: &str = "0.0.0.0"; +const DEFAULT_BROKER_PORT: u16 = 65_535; +const RESTART_MIN_DELAY: Duration = Duration::from_secs(1); +const RESTART_MAX_DELAY: Duration = Duration::from_secs(30); +const START_TIMEOUT: Duration = Duration::from_secs(15); +const STOP_TIMEOUT: Duration = Duration::from_secs(20); +const CHILD_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(12); +const POLL_INTERVAL: Duration = Duration::from_millis(100); +const STATE_WRITE_INTERVAL: Duration = Duration::from_secs(2); +const SIGINT: i32 = 2; +const SIGTERM: i32 = 15; +static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false); +const OPTIONAL_LAUNCH_ENV_KEYS: &[&str] = &[ + "OPENSCOUT_MESH_ID", + "OPENSCOUT_MESH_SEEDS", + "OPENSCOUT_MESH_DISCOVERY_INTERVAL_MS", + "OPENSCOUT_NODE_NAME", + "OPENSCOUT_NODE_ID", + "OPENSCOUT_NODE_QUALIFIER", + "OPENSCOUT_TAILSCALE_BIN", + "OPENSCOUT_TAILSCALE_STATUS_JSON", + "OPENSCOUT_SSE_KEEPALIVE_MS", +]; + +unsafe extern "C" { + fn signal(signum: i32, handler: extern "C" fn(i32)) -> 
usize; +} + +extern "C" fn request_shutdown(_: i32) { + SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst); +} + +fn main() -> ExitCode { + match run() { + Ok(()) => ExitCode::SUCCESS, + Err(error) => { + eprintln!("{error}"); + ExitCode::from(1) + } + } +} + +fn run() -> Result<(), String> { + let args: Vec = env::args().skip(1).collect(); + if args + .iter() + .any(|arg| arg == "-h" || arg == "--help" || arg == "help") + { + print_help(); + return Ok(()); + } + + let json = args.iter().any(|arg| arg == "--json"); + let command = args + .iter() + .find(|arg| !arg.starts_with("--")) + .map(String::as_str) + .unwrap_or("status"); + let config = Config::resolve()?; + + match command { + "status" => { + let status = broker_service_status(&config); + print_status(&status, json); + Ok(()) + } + "doctor" => { + let report = doctor_report(&config); + print_doctor(&report, json); + Ok(()) + } + "start" => { + let status = start_service(&config)?; + print_status(&status, json); + Ok(()) + } + "stop" => { + let status = stop_service(&config)?; + print_status(&status, json); + Ok(()) + } + "restart" => { + stop_service(&config)?; + let status = start_service(&config)?; + print_status(&status, json); + Ok(()) + } + "supervise" | "daemon" => supervise_service(&config), + other => Err(format!("unknown command: {other}")), + } +} + +#[derive(Clone, Debug)] +struct Config { + label: String, + service_mode: String, + domain_target: String, + service_target: String, + launch_agent_path: PathBuf, + support_directory: PathBuf, + runtime_directory: PathBuf, + logs_directory: PathBuf, + stdout_log_path: PathBuf, + stderr_log_path: PathBuf, + control_home: PathBuf, + runtime_package_dir: PathBuf, + supervisor_executable: PathBuf, + supervisor_state_path: PathBuf, + bun_executable: String, + broker_host: String, + broker_port: u16, + broker_url: String, + broker_socket_path: PathBuf, + advertise_scope: String, +} + +impl Config { + fn resolve() -> Result { + let home = home_dir()?; + let 
uid = user_id()?; + let service_mode = match env_nonempty("OPENSCOUT_BROKER_SERVICE_MODE") + .unwrap_or_else(|| "dev".to_string()) + .to_lowercase() + .as_str() + { + "prod" | "production" => "prod".to_string(), + "custom" => "custom".to_string(), + _ => "dev".to_string(), + }; + let label = env_nonempty("OPENSCOUT_SERVICE_LABEL") + .or_else(|| env_nonempty("OPENSCOUT_BROKER_SERVICE_LABEL")) + .unwrap_or_else(|| match service_mode.as_str() { + "prod" => "com.openscout".to_string(), + "custom" => "com.openscout.custom".to_string(), + _ => "dev.openscout".to_string(), + }); + let default_support_directory = home.join("Library/Application Support/OpenScout"); + let support_directory = non_tmp_path_or_default( + env_nonempty("OPENSCOUT_SUPPORT_DIRECTORY") + .or_else(|| env_nonempty("OPENSCOUT_SUPPORT_DIR")) + .map(PathBuf::from), + default_support_directory, + ); + let runtime_directory = support_directory.join("runtime"); + let logs_directory = support_directory.join("logs/broker"); + let control_home = non_tmp_path_or_default( + env_nonempty("OPENSCOUT_CONTROL_HOME").map(PathBuf::from), + home.join(".openscout/control-plane"), + ); + let runtime_package_dir = match env_nonempty("OPENSCOUT_RUNTIME_PACKAGE_DIR") { + Some(value) => PathBuf::from(value), + None => { + find_workspace_runtime_dir(&env::current_dir().map_err(|error| error.to_string())?) + .ok_or_else(|| { + "unable to resolve runtime package dir; set OPENSCOUT_RUNTIME_PACKAGE_DIR" + .to_string() + })? 
+ } + }; + let supervisor_executable = match env_nonempty("OPENSCOUT_SUPERVISOR_BIN") { + Some(value) => PathBuf::from(value), + None => env::current_exe().map_err(|error| error.to_string())?, + }; + let bun_executable = env_nonempty("OPENSCOUT_BUN_BIN").unwrap_or_else(|| { + let home_bun = home.join(".bun/bin/bun"); + if home_bun.exists() { + home_bun.to_string_lossy().to_string() + } else { + "bun".to_string() + } + }); + let advertise_scope = match env_nonempty("OPENSCOUT_ADVERTISE_SCOPE").as_deref() { + Some("mesh") => "mesh".to_string(), + _ => "local".to_string(), + }; + let default_broker_host = if advertise_scope == "mesh" { + DEFAULT_BROKER_HOST_MESH + } else { + DEFAULT_BROKER_HOST + }; + let broker_host = env_nonempty("OPENSCOUT_BROKER_HOST") + .unwrap_or_else(|| default_broker_host.to_string()); + let broker_port = env_nonempty("OPENSCOUT_BROKER_PORT") + .and_then(|value| value.parse::().ok()) + .unwrap_or(DEFAULT_BROKER_PORT); + let broker_url = env_nonempty("OPENSCOUT_BROKER_URL") + .unwrap_or_else(|| format!("http://{broker_host}:{broker_port}")); + let broker_socket_path = PathBuf::from( + env_nonempty("OPENSCOUT_BROKER_SOCKET_PATH").unwrap_or_else(|| { + runtime_directory + .join("broker.sock") + .to_string_lossy() + .to_string() + }), + ); + let supervisor_state_path = runtime_directory.join("supervisor-state.json"); + + Ok(Self { + label: label.clone(), + service_mode, + domain_target: format!("gui/{uid}"), + service_target: format!("gui/{uid}/{label}"), + launch_agent_path: home.join(format!("Library/LaunchAgents/{label}.plist")), + support_directory, + runtime_directory, + logs_directory: logs_directory.clone(), + stdout_log_path: logs_directory.join("stdout.log"), + stderr_log_path: logs_directory.join("stderr.log"), + control_home, + runtime_package_dir, + supervisor_executable, + supervisor_state_path, + bun_executable, + broker_host, + broker_port, + broker_url, + broker_socket_path, + advertise_scope, + }) + } + + fn 
runtime_entrypoint(&self) -> PathBuf { + self.runtime_package_dir.join("bin/openscout-runtime.mjs") + } +} + +#[derive(Clone, Debug)] +struct LaunchctlStatus { + loaded: bool, + pid: Option, + launchd_state: Option, + last_exit_status: Option, +} + +#[derive(Clone, Debug)] +struct HealthStatus { + reachable: bool, + ok: bool, + transport: Option, + status_code: Option, + body: Option, + error: Option, +} + +#[derive(Clone, Debug)] +struct ServiceStatus { + config: Config, + launchctl: LaunchctlStatus, + health: HealthStatus, + supervisor_state: Option, +} + +#[derive(Clone, Debug)] +struct ProcessInfo { + pid: u32, + ppid: u32, + pcpu: String, + pmem: String, + elapsed: String, + command: String, +} + +#[derive(Clone, Debug)] +struct DoctorReport { + status: ServiceStatus, + processes: Vec, + warnings: Vec, +} + +fn start_service(config: &Config) -> Result { + ensure_launch_agent(config)?; + let _ = run_command("/bin/launchctl", &["bootout", &config.service_target]); + let _ = wait_for_stopped(config); + run_command_checked( + "/bin/launchctl", + &[ + "bootstrap", + &config.domain_target, + path_str(&config.launch_agent_path)?, + ], + )?; + let _ = run_command( + "/bin/launchctl", + &["kickstart", "-k", &config.service_target], + ); + wait_for_healthy(config) +} + +fn stop_service(config: &Config) -> Result { + let _ = run_command("/bin/launchctl", &["bootout", &config.service_target]); + wait_for_stopped(config) +} + +fn supervise_service(config: &Config) -> Result<(), String> { + install_signal_handlers(); + ensure_supervisor_directories(config)?; + eprintln!( + "[openscout-supervisor] starting Bun base from {}", + config.runtime_entrypoint().display(), + ); + + let started_at_ms = epoch_ms(); + let mut restart_count = 0_u32; + let mut restart_delay = RESTART_MIN_DELAY; + let mut child = spawn_base_process(config)?; + write_supervisor_state( + config, + started_at_ms, + Some(child.id()), + "running", + restart_count, + )?; + let mut next_state_write = 
Instant::now() + STATE_WRITE_INTERVAL; + + while !SHUTDOWN_REQUESTED.load(Ordering::SeqCst) { + match child.try_wait().map_err(|error| error.to_string())? { + Some(status) => { + write_supervisor_state(config, started_at_ms, None, "exited", restart_count)?; + eprintln!("[openscout-supervisor] Bun base exited: {status}"); + restart_count = restart_count.saturating_add(1); + sleep_until_or_shutdown(Instant::now() + restart_delay); + if SHUTDOWN_REQUESTED.load(Ordering::SeqCst) { + break; + } + restart_delay = doubled_delay(restart_delay); + child = spawn_base_process(config)?; + write_supervisor_state( + config, + started_at_ms, + Some(child.id()), + "running", + restart_count, + )?; + next_state_write = Instant::now() + STATE_WRITE_INTERVAL; + } + None => { + if Instant::now() >= next_state_write { + write_supervisor_state( + config, + started_at_ms, + Some(child.id()), + "running", + restart_count, + )?; + next_state_write = Instant::now() + STATE_WRITE_INTERVAL; + } + thread::sleep(POLL_INTERVAL); + } + } + } + + write_supervisor_state( + config, + started_at_ms, + Some(child.id()), + "stopping", + restart_count, + )?; + terminate_child(&mut child, "Bun base", CHILD_SHUTDOWN_TIMEOUT)?; + write_supervisor_state(config, started_at_ms, None, "stopped", restart_count)?; + Ok(()) +} + +fn install_signal_handlers() { + unsafe { + let _ = signal(SIGINT, request_shutdown); + let _ = signal(SIGTERM, request_shutdown); + } +} + +fn spawn_base_process(config: &Config) -> Result { + let mut command = Command::new(&config.bun_executable); + command + .arg(config.runtime_entrypoint()) + .arg("base") + .current_dir(&config.runtime_package_dir) + .env("OPENSCOUT_PARENT_PID", std::process::id().to_string()) + .env( + "OPENSCOUT_SUPPORT_DIRECTORY", + config.support_directory.to_string_lossy().to_string(), + ) + .env( + "OPENSCOUT_RUNTIME_PACKAGE_DIR", + config.runtime_package_dir.to_string_lossy().to_string(), + ) + .env("OPENSCOUT_BROKER_HOST", &config.broker_host) + 
.env("OPENSCOUT_BROKER_PORT", config.broker_port.to_string()) + .env("OPENSCOUT_BROKER_URL", &config.broker_url) + .env( + "OPENSCOUT_BROKER_SOCKET_PATH", + config.broker_socket_path.to_string_lossy().to_string(), + ) + .env( + "OPENSCOUT_CONTROL_HOME", + config.control_home.to_string_lossy().to_string(), + ) + .env("OPENSCOUT_BROKER_SERVICE_MODE", &config.service_mode) + .env("OPENSCOUT_BROKER_SERVICE_LABEL", &config.label) + .env("OPENSCOUT_SERVICE_LABEL", &config.label) + .env("OPENSCOUT_ADVERTISE_SCOPE", &config.advertise_scope) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()); + + for &key in OPTIONAL_LAUNCH_ENV_KEYS { + if let Some(value) = env_nonempty(key) { + command.env(key, value); + } + } + if let Some(core_agents) = env_nonempty("OPENSCOUT_CORE_AGENTS") { + command.env("OPENSCOUT_CORE_AGENTS", core_agents); + } + + command + .spawn() + .map_err(|error| format!("failed to start Bun base: {error}")) +} + +fn sleep_until_or_shutdown(deadline: Instant) { + while Instant::now() < deadline && !SHUTDOWN_REQUESTED.load(Ordering::SeqCst) { + thread::sleep(POLL_INTERVAL); + } +} + +fn doubled_delay(delay: Duration) -> Duration { + let doubled = delay.as_millis().saturating_mul(2); + Duration::from_millis(doubled.min(RESTART_MAX_DELAY.as_millis()) as u64) +} + +fn terminate_child(child: &mut Child, label: &str, timeout: Duration) -> Result<(), String> { + if child + .try_wait() + .map_err(|error| error.to_string())? + .is_some() + { + return Ok(()); + } + + let _ = send_process_signal(child.id(), "TERM"); + let deadline = Instant::now() + timeout; + while Instant::now() < deadline { + if child + .try_wait() + .map_err(|error| error.to_string())? 
+ .is_some() + { + return Ok(()); + } + thread::sleep(POLL_INTERVAL); + } + + eprintln!("[openscout-supervisor] {label} did not exit after SIGTERM; forcing shutdown"); + child.kill().map_err(|error| error.to_string())?; + let _ = child.wait(); + Ok(()) +} + +fn send_process_signal(pid: u32, signal_name: &str) -> Result<(), String> { + let status = Command::new("/bin/kill") + .arg(format!("-{signal_name}")) + .arg(pid.to_string()) + .status() + .map_err(|error| error.to_string())?; + if status.success() { + Ok(()) + } else { + Err(format!("kill -{signal_name} {pid} exited with {status}")) + } +} + +fn broker_service_status(config: &Config) -> ServiceStatus { + ServiceStatus { + config: config.clone(), + launchctl: inspect_launchctl(config), + health: fetch_health(config), + supervisor_state: read_supervisor_state_json(config), + } +} + +fn wait_for_healthy(config: &Config) -> Result { + let deadline = Instant::now() + START_TIMEOUT; + let mut last = broker_service_status(config); + while Instant::now() < deadline { + last = broker_service_status(config); + if last.health.reachable && last.health.ok { + return Ok(last); + } + thread::sleep(POLL_INTERVAL); + } + Err(format!( + "broker did not become healthy: {}", + last.health + .error + .clone() + .unwrap_or_else(|| "health check failed".to_string()), + )) +} + +fn wait_for_stopped(config: &Config) -> Result { + let deadline = Instant::now() + STOP_TIMEOUT; + let mut last = broker_service_status(config); + while Instant::now() < deadline { + last = broker_service_status(config); + if !last.launchctl.loaded && !last.health.reachable { + return Ok(last); + } + thread::sleep(POLL_INTERVAL); + } + Err(format!( + "service did not stop within {:?}: launchd loaded={}, broker reachable={}", + STOP_TIMEOUT, last.launchctl.loaded, last.health.reachable, + )) +} + +fn inspect_launchctl(config: &Config) -> LaunchctlStatus { + let output = match run_command("/bin/launchctl", &["print", &config.service_target]) { + Ok(output) => 
output, + Err(_) => { + return LaunchctlStatus { + loaded: false, + pid: None, + launchd_state: None, + last_exit_status: None, + }; + } + }; + + if output.status != 0 { + return LaunchctlStatus { + loaded: false, + pid: None, + launchd_state: None, + last_exit_status: None, + }; + } + + LaunchctlStatus { + loaded: true, + pid: parse_launchctl_u32(&output.stdout, "pid ="), + launchd_state: parse_launchctl_string(&output.stdout, "state ="), + last_exit_status: parse_launchctl_i32(&output.stdout, "last exit code =") + .or_else(|| parse_launchctl_i32(&output.stdout, "last exit status =")), + } +} + +fn fetch_health(config: &Config) -> HealthStatus { + match fetch_unix_health(&config.broker_socket_path) { + Ok(mut health) => { + health.transport = Some("unix_socket".to_string()); + return health; + } + Err(socket_error) => match fetch_tcp_health(config) { + Ok(mut health) => { + health.transport = Some("http".to_string()); + health + } + Err(http_error) => HealthStatus { + reachable: false, + ok: false, + transport: None, + status_code: None, + body: None, + error: Some(format!("{socket_error}; http fallback: {http_error}")), + }, + }, + } +} + +fn fetch_unix_health(socket_path: &Path) -> Result { + let mut stream = UnixStream::connect(socket_path).map_err(|error| error.to_string())?; + stream + .set_read_timeout(Some(Duration::from_secs(1))) + .map_err(|error| error.to_string())?; + stream + .set_write_timeout(Some(Duration::from_secs(1))) + .map_err(|error| error.to_string())?; + fetch_http_health(&mut stream, "localhost") +} + +fn fetch_tcp_health(config: &Config) -> Result { + let mut stream = TcpStream::connect((&config.broker_host[..], config.broker_port)) + .map_err(|error| error.to_string())?; + stream + .set_read_timeout(Some(Duration::from_secs(1))) + .map_err(|error| error.to_string())?; + stream + .set_write_timeout(Some(Duration::from_secs(1))) + .map_err(|error| error.to_string())?; + fetch_http_health(&mut stream, &config.broker_host) +} + +fn 
fetch_http_health(stream: &mut T, host: &str) -> Result { + let request = format!( + "GET /health HTTP/1.1\r\nHost: {host}\r\nAccept: application/json\r\nConnection: close\r\n\r\n" + ); + stream + .write_all(request.as_bytes()) + .map_err(|error| error.to_string())?; + let mut response = String::new(); + stream + .read_to_string(&mut response) + .map_err(|error| error.to_string())?; + parse_health_response(&response) +} + +fn parse_health_response(response: &str) -> Result { + let status_line = response.lines().next().unwrap_or_default(); + let status_code = status_line + .split_whitespace() + .nth(1) + .and_then(|value| value.parse::().ok()); + let body = response + .split_once("\r\n\r\n") + .map(|(_, body)| body.to_string()) + .unwrap_or_default(); + let ok = status_code == Some(200) && health_body_reports_ok(&body); + Ok(HealthStatus { + reachable: status_code.is_some(), + ok, + transport: None, + status_code, + body: if body.is_empty() { None } else { Some(body) }, + error: if status_code.is_some() { + None + } else { + Some("missing HTTP status".to_string()) + }, + }) +} + +fn health_body_reports_ok(body: &str) -> bool { + let Some((_, after_key)) = body.split_once("\"ok\"") else { + return false; + }; + let Some((_, after_colon)) = after_key.split_once(':') else { + return false; + }; + after_colon.trim_start().starts_with("true") +} + +fn doctor_report(config: &Config) -> DoctorReport { + let status = broker_service_status(config); + let processes = process_snapshot(); + let mut warnings = Vec::new(); + + if !config.runtime_entrypoint().exists() { + warnings.push(format!( + "runtime entrypoint is missing: {}", + config.runtime_entrypoint().display() + )); + } + if !command_available(&config.bun_executable) { + warnings.push(format!( + "bun executable is not available: {}", + config.bun_executable + )); + } + if !status.health.reachable { + warnings.push("broker health is unreachable".to_string()); + } + if status.config.broker_socket_path.exists() && 
!status.health.reachable { + warnings.push(format!( + "broker socket exists but health is unreachable: {}", + status.config.broker_socket_path.display(), + )); + } + if status.launchctl.loaded && status.supervisor_state.is_none() { + warnings.push(format!( + "launchd service is loaded but supervisor state is missing: {}", + status.config.supervisor_state_path.display(), + )); + } + + let supervisor_processes: Vec<&ProcessInfo> = processes + .iter() + .filter(|process| command_invokes_supervisor_daemon(&process.command)) + .collect(); + if supervisor_processes.len() > 1 { + warnings.push(format!( + "multiple openscout-supervisor processes found: {}", + supervisor_processes.len() + )); + } + for process in supervisor_processes { + if process.ppid == 1 && status.launchctl.pid != Some(process.pid) { + warnings.push(format!( + "orphaned openscout-supervisor process: pid {}", + process.pid + )); + } + } + let broker_processes: Vec<&ProcessInfo> = processes + .iter() + .filter(|process| command_references_process(&process.command, "scout-broker")) + .collect(); + if broker_processes.len() > 1 { + warnings.push(format!( + "multiple scout-broker processes found: {}", + broker_processes.len() + )); + } + for process in broker_processes { + if process.ppid == 1 { + warnings.push(format!( + "orphaned scout-broker process: pid {}", + process.pid + )); + } + } + for process in processes + .iter() + .filter(|process| command_references_process(&process.command, "scout-web")) + { + if process.ppid == 1 { + warnings.push(format!("orphaned scout-web process: pid {}", process.pid)); + } + } + + DoctorReport { + status, + processes, + warnings, + } +} + +fn process_snapshot() -> Vec { + let output = match run_command("ps", &["-axo", "pid=,ppid=,pcpu=,pmem=,etime=,command="]) { + Ok(output) if output.status == 0 => output.stdout, + _ => return Vec::new(), + }; + + output + .lines() + .filter_map(parse_process_line) + .filter(|process| { + process.command.contains("openscout-runtime") + 
|| command_references_process(&process.command, "openscout-supervisor") + || command_references_process(&process.command, "scout-base") + || command_references_process(&process.command, "scout-broker") + || command_references_process(&process.command, "scout-web") + || command_references_process(&process.command, "OpenScoutMenu") + }) + .collect() +} + +fn command_references_process(command: &str, process_name: &str) -> bool { + command + .split_whitespace() + .any(|part| part == process_name || part.rsplit('/').next() == Some(process_name)) +} + +fn command_invokes_supervisor_daemon(command: &str) -> bool { + let mut parts = command.split_whitespace(); + while let Some(part) = parts.next() { + if part == "openscout-supervisor" || part.rsplit('/').next() == Some("openscout-supervisor") + { + return matches!(parts.next(), Some("supervise" | "daemon")); + } + } + false +} + +fn parse_process_line(line: &str) -> Option { + let mut parts = line.split_whitespace(); + let pid = parts.next()?.parse::().ok()?; + let ppid = parts.next()?.parse::().ok()?; + let pcpu = parts.next()?.to_string(); + let pmem = parts.next()?.to_string(); + let elapsed = parts.next()?.to_string(); + let command = parts.collect::>().join(" "); + Some(ProcessInfo { + pid, + ppid, + pcpu, + pmem, + elapsed, + command, + }) +} + +fn ensure_launch_agent(config: &Config) -> Result<(), String> { + ensure_supervisor_directories(config)?; + let plist = render_launch_agent_plist(config); + if fs::read_to_string(&config.launch_agent_path) + .ok() + .as_deref() + != Some(plist.as_str()) + { + fs::write(&config.launch_agent_path, plist).map_err(|error| error.to_string())?; + } + Ok(()) +} + +fn ensure_supervisor_directories(config: &Config) -> Result<(), String> { + fs::create_dir_all(&config.support_directory).map_err(|error| error.to_string())?; + fs::create_dir_all(&config.runtime_directory).map_err(|error| error.to_string())?; + fs::create_dir_all(&config.logs_directory).map_err(|error| 
error.to_string())?; + fs::create_dir_all(&config.control_home).map_err(|error| error.to_string())?; + if let Some(parent) = config.launch_agent_path.parent() { + fs::create_dir_all(parent).map_err(|error| error.to_string())?; + } + Ok(()) +} + +fn render_launch_agent_plist(config: &Config) -> String { + let mut env_entries = vec![ + ("OPENSCOUT_BROKER_HOST", config.broker_host.clone()), + ("OPENSCOUT_BROKER_PORT", config.broker_port.to_string()), + ("OPENSCOUT_BROKER_URL", config.broker_url.clone()), + ( + "OPENSCOUT_BROKER_SOCKET_PATH", + config.broker_socket_path.to_string_lossy().to_string(), + ), + ( + "OPENSCOUT_SUPPORT_DIRECTORY", + config.support_directory.to_string_lossy().to_string(), + ), + ( + "OPENSCOUT_RUNTIME_PACKAGE_DIR", + config.runtime_package_dir.to_string_lossy().to_string(), + ), + ( + "OPENSCOUT_CONTROL_HOME", + config.control_home.to_string_lossy().to_string(), + ), + ("OPENSCOUT_BROKER_SERVICE_MODE", config.service_mode.clone()), + ("OPENSCOUT_BROKER_SERVICE_LABEL", config.label.clone()), + ("OPENSCOUT_SERVICE_LABEL", config.label.clone()), + ("OPENSCOUT_ADVERTISE_SCOPE", config.advertise_scope.clone()), + ( + "HOME", + home_dir() + .map(|path| path.to_string_lossy().to_string()) + .unwrap_or_default(), + ), + ("PATH", launch_agent_path_env()), + ]; + for &key in OPTIONAL_LAUNCH_ENV_KEYS { + if let Some(value) = env_nonempty(key) { + env_entries.push((key, value)); + } + } + if let Some(core_agents) = env_nonempty("OPENSCOUT_CORE_AGENTS") { + env_entries.push(("OPENSCOUT_CORE_AGENTS", core_agents)); + } + let env_block = env_entries + .into_iter() + .map(|(key, value)| { + format!( + "\n {}\n {}", + xml_escape(key), + xml_escape(&value), + ) + }) + .collect::(); + + format!( + r#" + + + + Label + {label} + ProgramArguments + + {supervisor} + supervise + + WorkingDirectory + {cwd} + RunAtLoad + + KeepAlive + + SuccessfulExit + + + StandardOutPath + {stdout} + StandardErrorPath + {stderr} + EnvironmentVariables + {env_block} + + + +"#, + 
label = xml_escape(&config.label), + supervisor = xml_escape(&config.supervisor_executable.to_string_lossy()), + cwd = xml_escape(&config.runtime_package_dir.to_string_lossy()), + stdout = xml_escape(&config.stdout_log_path.to_string_lossy()), + stderr = xml_escape(&config.stderr_log_path.to_string_lossy()), + ) +} + +fn read_supervisor_state_json(config: &Config) -> Option { + let raw = fs::read_to_string(&config.supervisor_state_path).ok()?; + let trimmed = raw.trim(); + if trimmed.starts_with('{') && trimmed.ends_with('}') { + Some(trimmed.to_string()) + } else { + None + } +} + +fn write_supervisor_state( + config: &Config, + started_at_ms: u128, + base_pid: Option, + base_state: &str, + restart_count: u32, +) -> Result<(), String> { + fs::create_dir_all(&config.runtime_directory).map_err(|error| error.to_string())?; + let payload = format!( + "{{\ +\"schemaVersion\":1,\ +\"supervisorPid\":{},\ +\"startedAtMs\":{},\ +\"basePid\":{},\ +\"baseState\":{},\ +\"restartCount\":{},\ +\"updatedAtMs\":{}\ +}}\n", + std::process::id(), + started_at_ms, + json_opt_u32(base_pid), + json_string(base_state), + restart_count, + epoch_ms(), + ); + let temporary_path = config.supervisor_state_path.with_extension("json.tmp"); + fs::write(&temporary_path, payload).map_err(|error| error.to_string())?; + fs::rename(&temporary_path, &config.supervisor_state_path).map_err(|error| error.to_string()) +} + +fn epoch_ms() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_millis()) + .unwrap_or(0) +} + +#[derive(Debug)] +struct CommandOutput { + status: i32, + stdout: String, + stderr: String, +} + +fn run_command(command: &str, args: &[&str]) -> Result { + let output = Command::new(command) + .args(args) + .output() + .map_err(|error| format!("{command}: {error}"))?; + Ok(CommandOutput { + status: output.status.code().unwrap_or(1), + stdout: String::from_utf8_lossy(&output.stdout).trim().to_string(), + stderr: 
String::from_utf8_lossy(&output.stderr).trim().to_string(), + }) +} + +fn run_command_checked(command: &str, args: &[&str]) -> Result { + let output = run_command(command, args)?; + if output.status == 0 { + Ok(output) + } else { + Err(first_nonempty(&output.stderr, &output.stdout)) + } +} + +fn parse_launchctl_u32(raw: &str, prefix: &str) -> Option { + parse_launchctl_string(raw, prefix).and_then(|value| value.parse::().ok()) +} + +fn parse_launchctl_i32(raw: &str, prefix: &str) -> Option { + parse_launchctl_string(raw, prefix).and_then(|value| value.parse::().ok()) +} + +fn parse_launchctl_string(raw: &str, prefix: &str) -> Option { + raw.lines().map(str::trim).find_map(|line| { + line.strip_prefix(prefix) + .map(|value| value.trim().to_string()) + }) +} + +fn command_available(command: &str) -> bool { + if command.contains('/') { + Path::new(command).exists() + } else { + run_command("which", &[command]) + .map(|output| output.status == 0) + .unwrap_or(false) + } +} + +fn env_nonempty(name: &str) -> Option { + env::var(name) + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) +} + +fn is_tmp_path(path: &Path) -> bool { + let value = path.to_string_lossy(); + value == "/tmp" + || value == "/private/tmp" + || value.starts_with("/tmp/") + || value.starts_with("/private/tmp/") +} + +fn non_tmp_path_or_default(value: Option, fallback: PathBuf) -> PathBuf { + match value { + Some(path) if !is_tmp_path(&path) => path, + _ => fallback, + } +} + +fn launch_agent_path_env() -> String { + let mut entries = Vec::new(); + if let Ok(home) = home_dir() { + entries.push(home.join(".bun/bin").to_string_lossy().to_string()); + } + entries.extend( + env::var("PATH") + .unwrap_or_default() + .split(':') + .map(str::to_string), + ); + entries.extend([ + "/opt/homebrew/bin".to_string(), + "/usr/local/bin".to_string(), + "/usr/bin".to_string(), + "/bin".to_string(), + "/usr/sbin".to_string(), + "/sbin".to_string(), + ]); + + let mut seen = 
HashSet::new(); + entries + .into_iter() + .filter(|entry| !entry.is_empty() && !is_tmp_path(Path::new(entry))) + .filter(|entry| seen.insert(entry.clone())) + .collect::>() + .join(":") +} + +fn home_dir() -> Result { + env_nonempty("HOME") + .map(PathBuf::from) + .ok_or_else(|| "HOME is not set".to_string()) +} + +fn user_id() -> Result { + if let Some(uid) = env_nonempty("UID").and_then(|value| value.parse::().ok()) { + return Ok(uid); + } + let output = run_command_checked("id", &["-u"])?; + output + .stdout + .parse::() + .map_err(|error| error.to_string()) +} + +fn find_workspace_runtime_dir(start: &Path) -> Option { + let mut current = start.to_path_buf(); + loop { + let candidate = current.join("packages/runtime"); + if candidate.join("package.json").exists() + && candidate.join("bin/openscout-runtime.mjs").exists() + { + return Some(candidate); + } + if !current.pop() { + return None; + } + } +} + +fn path_str(path: &Path) -> Result<&str, String> { + path.to_str() + .ok_or_else(|| format!("path is not valid UTF-8: {}", path.display())) +} + +fn first_nonempty(first: &str, second: &str) -> String { + if !first.trim().is_empty() { + first.trim().to_string() + } else { + second.trim().to_string() + } +} + +fn print_help() { + println!( + "openscout-supervisor [--json]\n\n\ + First native supervisor slice for the OpenScout local control plane." 
+ ); +} + +fn print_status(status: &ServiceStatus, json: bool) { + if json { + println!("{}", status_json(status)); + } else { + println!("label: {}", status.config.label); + println!("loaded: {}", yes_no(status.launchctl.loaded)); + println!( + "pid: {}", + status + .launchctl + .pid + .map(|pid| pid.to_string()) + .unwrap_or_else(|| "-".to_string()) + ); + println!( + "launchd state: {}", + status.launchctl.launchd_state.as_deref().unwrap_or("-") + ); + println!( + "supervisor state: {}", + if status.supervisor_state.is_some() { + "recorded" + } else { + "missing" + } + ); + println!("broker url: {}", status.config.broker_url); + println!( + "broker socket: {}", + status.config.broker_socket_path.display() + ); + println!("reachable: {}", yes_no(status.health.reachable)); + println!( + "health: {}", + if status.health.ok { "ok" } else { "unhealthy" } + ); + } +} + +fn print_doctor(report: &DoctorReport, json: bool) { + if json { + println!("{}", doctor_json(report)); + } else { + print_status(&report.status, false); + if report.warnings.is_empty() { + println!("warnings: none"); + } else { + println!("warnings:"); + for warning in &report.warnings { + println!("- {warning}"); + } + } + println!("processes: {}", report.processes.len()); + } +} + +fn status_json(status: &ServiceStatus) -> String { + format!( + "{{\ +\"label\":{},\ +\"launchAgentPath\":{},\ +\"supervisorExecutable\":{},\ +\"supervisorStatePath\":{},\ +\"supervisorState\":{},\ +\"brokerUrl\":{},\ +\"brokerSocketPath\":{},\ +\"loaded\":{},\ +\"pid\":{},\ +\"launchdState\":{},\ +\"lastExitStatus\":{},\ +\"reachable\":{},\ +\"health\":{},\ +\"healthTransport\":{},\ +\"healthStatusCode\":{},\ +\"healthBody\":{},\ +\"healthError\":{}\ +}}", + json_string(&status.config.label), + json_string(&status.config.launch_agent_path.to_string_lossy()), + json_string(&status.config.supervisor_executable.to_string_lossy()), + json_string(&status.config.supervisor_state_path.to_string_lossy()), + 
status.supervisor_state.as_deref().unwrap_or("null"), + json_string(&status.config.broker_url), + json_string(&status.config.broker_socket_path.to_string_lossy()), + status.launchctl.loaded, + json_opt_u32(status.launchctl.pid), + json_opt_str(status.launchctl.launchd_state.as_deref()), + json_opt_i32(status.launchctl.last_exit_status), + status.health.reachable, + status.health.ok, + json_opt_str(status.health.transport.as_deref()), + json_opt_u16(status.health.status_code), + json_opt_str(status.health.body.as_deref()), + json_opt_str(status.health.error.as_deref()), + ) +} + +fn doctor_json(report: &DoctorReport) -> String { + let warnings = report + .warnings + .iter() + .map(|warning| json_string(warning)) + .collect::>() + .join(","); + let processes = report + .processes + .iter() + .map(process_json) + .collect::>() + .join(","); + format!( + "{{\"status\":{},\"warnings\":[{}],\"processes\":[{}]}}", + status_json(&report.status), + warnings, + processes, + ) +} + +fn process_json(process: &ProcessInfo) -> String { + format!( + "{{\"pid\":{},\"ppid\":{},\"pcpu\":{},\"pmem\":{},\"elapsed\":{},\"command\":{}}}", + process.pid, + process.ppid, + json_string(&process.pcpu), + json_string(&process.pmem), + json_string(&process.elapsed), + json_string(&process.command), + ) +} + +fn yes_no(value: bool) -> &'static str { + if value { + "yes" + } else { + "no" + } +} + +fn json_opt_str(value: Option<&str>) -> String { + value.map(json_string).unwrap_or_else(|| "null".to_string()) +} + +fn json_opt_u16(value: Option) -> String { + value + .map(|number| number.to_string()) + .unwrap_or_else(|| "null".to_string()) +} + +fn json_opt_u32(value: Option) -> String { + value + .map(|number| number.to_string()) + .unwrap_or_else(|| "null".to_string()) +} + +fn json_opt_i32(value: Option) -> String { + value + .map(|number| number.to_string()) + .unwrap_or_else(|| "null".to_string()) +} + +fn json_string(value: &str) -> String { + format!("\"{}\"", json_escape(value)) +} + 
+fn json_escape(value: &str) -> String { + let mut escaped = String::new(); + for character in value.chars() { + match character { + '"' => escaped.push_str("\\\""), + '\\' => escaped.push_str("\\\\"), + '\n' => escaped.push_str("\\n"), + '\r' => escaped.push_str("\\r"), + '\t' => escaped.push_str("\\t"), + character if character.is_control() => { + escaped.push_str(&format!("\\u{:04x}", character as u32)) + } + character => escaped.push(character), + } + } + escaped +} + +fn xml_escape(value: &str) -> String { + value + .replace('&', "&") + .replace('<', "<") + .replace('>', ">") + .replace('"', """) + .replace('\'', "'") +} + +#[cfg(test)] +mod tests { + use super::{command_invokes_supervisor_daemon, health_body_reports_ok, parse_health_response}; + + #[test] + fn health_body_reports_ok_accepts_compact_and_pretty_json() { + assert!(health_body_reports_ok(r#"{"ok":true}"#)); + assert!(health_body_reports_ok( + r#"{ + "ok": true, + "status": "ready" +}"# + )); + } + + #[test] + fn health_body_reports_ok_rejects_missing_or_false_values() { + assert!(!health_body_reports_ok(r#"{"ok":false}"#)); + assert!(!health_body_reports_ok(r#"{"status":"ready"}"#)); + } + + #[test] + fn parse_health_response_marks_pretty_ok_body_healthy() { + let response = + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{\n \"ok\": true\n}"; + let health = parse_health_response(response).expect("health response parses"); + + assert!(health.reachable); + assert!(health.ok); + assert_eq!(health.status_code, Some(200)); + } + + #[test] + fn command_invokes_supervisor_daemon_only_matches_daemon_commands() { + assert!(command_invokes_supervisor_daemon( + "/Users/arach/dev/openscout/target/debug/openscout-supervisor supervise" + )); + assert!(command_invokes_supervisor_daemon( + "target/debug/openscout-supervisor daemon" + )); + assert!(!command_invokes_supervisor_daemon( + "target/debug/openscout-supervisor doctor --json" + )); + assert!(!command_invokes_supervisor_daemon( + 
"target/debug/openscout-supervisor status --json" + )); + } +} diff --git a/docs/eng/sco-062-native-supervisor.md b/docs/eng/sco-062-native-supervisor.md new file mode 100644 index 00000000..16817215 --- /dev/null +++ b/docs/eng/sco-062-native-supervisor.md @@ -0,0 +1,207 @@ +# SCO-062: Native Supervisor + +## 1. Status + +- **Status:** Draft +- **Owner:** OpenScout +- **Scope:** Local service/process supervision for the OpenScout control plane +- **Intent:** Add a small Rust supervisor that makes OpenScout's local Bun services easier to start, stop, inspect, and repair without moving product logic out of TypeScript. + +## 2. Summary + +OpenScout should keep Bun/TypeScript for the broker, web UI, CLI UX, MCP glue, +and agent protocol iteration. Those layers change often and benefit from the +same development loop as the rest of the product. + +The process supervision layer has different needs. It owns launchd service +installation, process-tree cleanup, stale socket diagnosis, health checks, and +machine-readable repair hints. Recent runtime investigation showed that small +supervision mistakes can leave orphaned Bun broker processes consuming CPU even +when the product code is otherwise healthy. + +SCO-062 introduces `openscout-supervisor`, a Rust CLI binary that becomes the +native service kernel for the local control plane. The first implementation is +intentionally narrow: supervise the existing Bun base/broker/web stack rather +than replacing it. + +## 3. Decision + +Build a Rust binary named `openscout-supervisor`. + +The first supported command surface: + +```bash +openscout-supervisor status --json +openscout-supervisor start --json +openscout-supervisor stop --json +openscout-supervisor restart --json +openscout-supervisor doctor --json +openscout-supervisor supervise +``` + +The existing JavaScript `scout` CLI remains the operator entrypoint. 
It may +shell out to `openscout-supervisor` when the binary is available, and fall back +to the current Bun service manager when it is not. + +`supervise` is the long-running process. The other commands are one-shot +operator commands that inspect launchd/broker state or ask launchd to start and +stop the daemon. + +## 4. Why Rust + +This decision is not primarily about speed. Rust is a good fit because the +supervisor is a small correctness-heavy local kernel: + +- one native binary +- explicit errors +- predictable signal handling +- direct process and filesystem APIs +- no JS runtime dependency for supervising JS runtime processes +- efficient npm packaging through prebuilt platform binaries + +## 5. Boundary + +The supervisor may know: + +- pids, parent pids, command lines, and process trees +- launchd labels, plist paths, and service state +- broker host, port, Unix socket path, health endpoint, and log paths +- support, runtime, and control-plane directories +- stale process and stale socket repair actions + +The supervisor must not know: + +- message routing semantics +- agent identity grammar beyond command-line diagnostics +- mesh forwarding rules +- invocation, flight, or delivery business logic +- protocol record ownership rules + +The broker remains the canonical writer for Scout-owned coordination records. + +## 6. First Slice + +Add a stdlib-only Rust crate under `crates/openscout-supervisor`. + +The first slice should: + +1. Resolve the same default service config used by the TypeScript manager. +2. Render and install a launchd plist when one is missing. +3. Read launchd state with `launchctl print`. +4. Probe broker health through the Unix socket first, then TCP HTTP. +5. Start `openscout-supervisor supervise` through launchd. +6. Have `supervise` start the existing `openscout-runtime.mjs base` process as + a child. +7. Restart the child with bounded backoff if it exits unexpectedly. +8. 
Stop the service through launchd and wait until both launchd and broker health report down. +9. Restart as stop-then-start. +10. Emit stable JSON for `status` and `doctor`. +11. Write a small supervisor state file with daemon pid, child pid, restart + count, and last update time. +12. Detect obvious orphan candidates such as `openscout-supervisor`, + `scout-broker`, and `scout-web` with parent pid `1`. + +This first crate should avoid external Rust dependencies. Once the shape feels +right, we can add `clap`, `serde`, and `serde_json` for maintainability. + +## 7. Dependency Implications + +### Runtime Users + +The npm install should not require Rust. + +The public `@openscout/scout` package should eventually ship a prebuilt +supervisor binary or depend on an optional platform package such as +`@openscout/supervisor-darwin-arm64`. + +Users should not need: + +- Cargo +- Rustup +- node-gyp +- Python +- a C/C++ build chain +- postinstall native compilation + +### Developers + +Repo developers need Rust only when editing or building the supervisor. + +The normal Bun/TypeScript workflows should continue to work when the supervisor +is absent. + +### Release + +Start with macOS arm64. Before public packaging, decide whether binaries live: + +- inside `@openscout/scout` +- inside optional platform packages +- as release artifacts resolved by the CLI + +Prefer bundled or optional packages over postinstall downloads. + +## 8. Command Contract + +### `status --json` + +Reports: + +- service label and launch agent path +- loaded state, pid, launchd state, last exit status +- broker URL and socket path +- health reachability, transport, and raw health body + +### `start --json` + +Ensures the launch agent exists, bootstraps it, kickstarts it, and waits for +broker health. + +### `stop --json` + +Boots out the launchd service and waits until the service is unloaded and broker +health is unreachable. + +### `restart --json` + +Runs the same stop/start sequence. 
Restart should not be a special launchd +shortcut until stop and start are boring. + +### `doctor --json` + +Includes `status` plus local process observations and warnings: + +- missing supervisor state while launchd is loaded +- multiple supervisors +- multiple brokers +- orphaned brokers +- stale web processes +- missing runtime entrypoint +- missing Bun executable +- broker socket present while health is unreachable + +## 9. Acceptance Criteria + +- `openscout-supervisor status --json` works when the broker is up or down. +- `openscout-supervisor start --json` can bring up the existing Bun service. +- launchd starts `openscout-supervisor supervise`, not Bun directly. +- `openscout-supervisor stop --json` leaves no `scout-broker` or supervised `scout-web` descendants behind. +- `openscout-supervisor restart --json` succeeds from a healthy service. +- `openscout-supervisor status --json` includes the last written supervisor state when the daemon is running. +- `openscout-supervisor doctor --json` reports orphaned broker processes without mutating state. +- Existing `scout` CLI and Bun service manager continue to work if the supervisor binary is absent. + +## 10. Non-Goals + +- Rewriting the broker in Rust. +- Moving web, CLI, MCP, or agent integration code out of TypeScript. +- Solving cross-platform service management beyond macOS in the first slice. +- Adding native Node addons. +- Requiring Rust to install the npm package. + +## 11. Follow-Ups + +- Wire the TypeScript service manager to prefer the supervisor when present. +- Add CI for Rust build/test/lint. +- Add prebuilt binary packaging. +- Decide whether doctor repair actions should be explicit subcommands or interactive prompts. +- Expand Linux support after macOS launchd is stable. 
diff --git a/docs/proposals/native-supervisor.md b/docs/proposals/native-supervisor.md new file mode 100644 index 00000000..bee6c60d --- /dev/null +++ b/docs/proposals/native-supervisor.md @@ -0,0 +1,138 @@ +# Native Supervisor Proposal + +## Decision + +Add a small native supervisor for OpenScout's local control plane, implemented in Rust, while keeping the broker, web UI, CLI UX, MCP glue, and agent protocol logic in Bun/TypeScript. + +This is not a rewrite of the broker. The first milestone is a dependable process and service kernel that can start, stop, restart, inspect, and repair the existing Bun services. + +## Why Rust Here + +The supervisor owns behavior where correctness matters more than iteration speed: + +- launchd/service lifecycle +- process-tree ownership +- signal forwarding and forced cleanup +- stale socket and port detection +- orphan detection +- health checks and restart backoff +- log path and runtime-directory hygiene +- machine-readable doctor output + +Rust gives this layer a small single-binary footprint, explicit error handling, predictable process control, and fewer runtime assumptions than supervising Bun from inside Bun. + +## Non-Goals + +- Do not move the web UI out of TypeScript. +- Do not move the broker's HTTP API, routing semantics, or protocol iteration out of TypeScript in this milestone. +- Do not replace SQLite projection, mesh forwarding, or harness adapters yet. +- Do not require native Node addons for the npm package. 
+ +## First Milestone + +Create `openscout-supervisor` with these commands: + +```bash +openscout-supervisor status --json +openscout-supervisor start --json +openscout-supervisor stop --json +openscout-supervisor restart --json +openscout-supervisor doctor --json +openscout-supervisor supervise +``` + +The first repository slice should: + +- install the launchd plist for the base service when it is missing +- start `openscout-supervisor supervise` through launchd +- have `supervise` start the existing `openscout-runtime.mjs base` process as + a child +- restart the child with bounded backoff when it exits unexpectedly +- write a small supervisor state file for status and doctor commands +- inspect the base process, broker wrapper, broker process, supervised web process, and menu app +- detect and report orphaned `openscout-supervisor`, `scout-broker`, and + `scout-web` children +- verify the broker over the Unix socket first, then HTTP as fallback +- fail `stop` if launchd or broker health do not report stopped within a bounded timeout +- produce JSON status that the existing `scout` CLI can render + +The existing TypeScript service manager can become a thin compatibility wrapper +that shells out to `openscout-supervisor` when it is present. The long-running +process is the supervisor daemon; `status`, `doctor`, `start`, `stop`, and +`restart` remain short-lived operator commands. + +Later supervisor ownership can expand into explicit process-tree signal +forwarding, forced cleanup, edge proxy inspection, mDNS helper inspection, and +doctor repair actions after this launchd-only path feels boring. + +The numbered engineering proposal for this work is tracked in +[`docs/eng/sco-062-native-supervisor.md`](../eng/sco-062-native-supervisor.md). +The first repository slice lives in `crates/openscout-supervisor`. + +## Dependency Implications + +### Runtime Users + +For npm users, the goal is no new required system dependency. 
+ +The public `@openscout/scout` package should ship or resolve a prebuilt supervisor binary. The CLI calls it as a child process. Users should not need Rust, Cargo, node-gyp, Python, or a native build chain during install. + +Preferred packaging shape: + +- `@openscout/scout` keeps the `scout` JavaScript CLI. +- Platform-specific optional packages may provide native binaries later, for example `@openscout/supervisor-darwin-arm64`. +- The CLI/runtime resolves the bundled supervisor, then falls back to current Bun service management when unavailable. + +This keeps the npm surface CLI-style, not native-addon-style. + +### Developers + +Repo developers will need Rust only when editing the supervisor: + +- Rust stable toolchain +- Cargo +- `Cargo.lock` checked in once the crate is first built +- CI job for supervisor build/test + +The main Bun/TypeScript workflows should keep working without touching Rust unless the supervisor code changed. + +### Rust Crates + +Keep the first crate small. The first slice intentionally uses no external Rust +crates. Reasonable follow-up dependencies, once the command shape settles: + +- `clap` for command parsing +- `serde` and `serde_json` for stable JSON output +- `thiserror` or `anyhow` for error reporting +- `tokio` only if async process and timeout handling clearly pays for itself + +Avoid early dependencies on service-manager frameworks, embedded HTTP servers, or cross-platform daemon abstractions until the macOS launchd path is boring. + +### Release And CI + +The release pipeline will need to build and attach native binaries per target. Initial target can be macOS arm64 because that is the active pilot environment. + +Before public packaging, decide between: + +- bundled binaries inside `@openscout/scout` +- optional platform packages +- postinstall download from a release artifact + +Prefer bundled or optional packages over postinstall downloads or compilation.
+ +### License And Security + +Adding Rust crates introduces a second dependency license set. CI should eventually include a Rust license/audit check. The supervisor should not add network access in the first milestone, except probing configured local broker URLs. + +## Boundary Contract + +The supervisor should treat Bun processes as opaque services. It can know command lines, pids, sockets, ports, log paths, and health endpoints. It should not know message-routing semantics, agent identities, mesh forwarding rules, or protocol record details. + +The broker remains the canonical writer for Scout-owned coordination records. + +## Open Questions + +- Should `scout service restart` become stop-then-start through the supervisor, or should launchd own restart entirely? +- Should the supervisor own web/menu/edge directly, or only own the base process and inspect descendants? +- How soon should stale-agent repair move into the supervisor versus remain a broker doctor operation? +- What is the minimum useful Linux story after macOS launchd is stable? 
diff --git a/package.json b/package.json index a71c5204..d69c4f9b 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,13 @@ "runtime:broker": "npm --prefix packages/runtime run broker", "runtime:discover": "npm --prefix packages/runtime run discover", "runtime:service": "npm --prefix packages/runtime run service", + "supervisor:fmt": "bash scripts/cargo.sh fmt --all --check", + "supervisor:check": "bash scripts/cargo.sh check --manifest-path crates/openscout-supervisor/Cargo.toml", + "supervisor:test": "bash scripts/cargo.sh test --manifest-path crates/openscout-supervisor/Cargo.toml", + "supervisor:build": "bash scripts/cargo.sh build --manifest-path crates/openscout-supervisor/Cargo.toml", + "supervisor:run": "bash scripts/cargo.sh run --manifest-path crates/openscout-supervisor/Cargo.toml --", + "supervisor:daemon": "bash scripts/cargo.sh run --manifest-path crates/openscout-supervisor/Cargo.toml -- supervise", + "supervisor:smoke": "bash scripts/supervisor-smoke.sh", "mesh:iroh:smoke": "bun scripts/mesh-iroh-smoke.mjs", "mesh-front-door:check": "bun run --cwd apps/mesh-front-door check", "mesh-front-door:dev": "bun run --cwd apps/mesh-front-door dev", diff --git a/packages/cli/bin/openscout-runtime.mjs b/packages/cli/bin/openscout-runtime.mjs index c2286cc0..ca98f910 100644 --- a/packages/cli/bin/openscout-runtime.mjs +++ b/packages/cli/bin/openscout-runtime.mjs @@ -44,10 +44,31 @@ const child = spawn(process.execPath, [entrypoint, ...args], { stdio: "inherit", }); +let forwardingSignal = false; +let childExited = false; +for (const signal of ["SIGINT", "SIGTERM"]) { + process.once(signal, () => { + forwardingSignal = true; + if (!child.killed) { + child.kill(signal); + } + setTimeout(() => { + if (!childExited) { + child.kill("SIGKILL"); + } + }, 10_000).unref(); + }); +} + +child.on("error", (error) => { + console.error(error.message); + process.exit(1); +}); child.on("exit", (code, signal) => { - if (signal) { + childExited = true; + if (signal && 
!forwardingSignal) { process.kill(process.pid, signal); } else { - process.exit(code ?? 0); + process.exit(code ?? (signal ? 0 : 1)); } }); diff --git a/packages/runtime/bin/openscout-runtime.mjs b/packages/runtime/bin/openscout-runtime.mjs index 7984c28b..478861ae 100755 --- a/packages/runtime/bin/openscout-runtime.mjs +++ b/packages/runtime/bin/openscout-runtime.mjs @@ -1,7 +1,7 @@ #!/usr/bin/env bun import { existsSync } from "node:fs"; -import { spawnSync } from "node:child_process"; +import { spawn, spawnSync } from "node:child_process"; import { dirname, resolve } from "node:path"; import { fileURLToPath } from "node:url"; @@ -39,26 +39,18 @@ if (!(command in sourceMain)) { const processName = processNames[command] ?? "scout-runtime"; process.title = processName; -function runEntrypoint(entry, entryArgs) { - const captureOutput = command === "service"; - const result = spawnSync(process.execPath, [entry, ...entryArgs], captureOutput - ? { - encoding: "utf8", - argv0: processName, - stdio: ["inherit", "pipe", "pipe"], - } - : { - argv0: processName, - stdio: "inherit", - }); - - if (captureOutput) { - if (result.stdout) { - process.stdout.write(result.stdout); - } - if (result.stderr) { - process.stderr.write(result.stderr); - } +function runServiceEntrypoint(entry, entryArgs) { + const result = spawnSync(process.execPath, [entry, ...entryArgs], { + encoding: "utf8", + argv0: processName, + stdio: ["inherit", "pipe", "pipe"], + }); + + if (result.stdout) { + process.stdout.write(result.stdout); + } + if (result.stderr) { + process.stderr.write(result.stderr); } if (result.error) { @@ -74,6 +66,50 @@ function runEntrypoint(entry, entryArgs) { process.exit(result.status ?? 
0); } +function runLongLivedEntrypoint(entry, entryArgs) { + const child = spawn(process.execPath, [entry, ...entryArgs], { + argv0: processName, + stdio: "inherit", + }); + + let forwardingSignal = false; + let childExited = false; + for (const signal of ["SIGINT", "SIGTERM"]) { + process.once(signal, () => { + forwardingSignal = true; + if (!child.killed) { + child.kill(signal); + } + setTimeout(() => { + if (!childExited) { + child.kill("SIGKILL"); + } + }, 10_000).unref(); + }); + } + + child.on("error", (error) => { + console.error(error.message); + process.exit(1); + }); + child.on("exit", (code, signal) => { + childExited = true; + if (signal && !forwardingSignal) { + process.kill(process.pid, signal); + return; + } + process.exit(code ?? (signal ? 0 : 1)); + }); +} + +function runEntrypoint(entry, entryArgs) { + if (command === "service") { + runServiceEntrypoint(entry, entryArgs); + return; + } + runLongLivedEntrypoint(entry, entryArgs); +} + function canRunTypeScriptSource() { return typeof globalThis.Bun !== "undefined"; } @@ -93,9 +129,7 @@ const distEntry = distMain[command]; const sourceEntry = sourceMain[command]; if (canRunTypeScriptSource() && shouldPreferSourceEntry() && existsSync(sourceEntry)) { runEntrypoint(sourceEntry, args); -} - -if (existsSync(distEntry)) { +} else if (existsSync(distEntry)) { runEntrypoint(distEntry, args); } else { const buildResult = spawnSync(npmCommand, ["run", "build"], { diff --git a/packages/runtime/src/base-daemon.ts b/packages/runtime/src/base-daemon.ts index 6e81319b..0bc50151 100644 --- a/packages/runtime/src/base-daemon.ts +++ b/packages/runtime/src/base-daemon.ts @@ -25,6 +25,7 @@ const RESTART_MIN_DELAY_MS = 1_000; const RESTART_MAX_DELAY_MS = 30_000; const BROKER_HEALTH_TIMEOUT_MS = 30_000; const BROKER_HEALTH_POLL_MS = 250; +const CHILD_SHUTDOWN_TIMEOUT_MS = 12_000; const MENU_BUNDLE_ID = "com.openscout.menu"; const MENU_PROCESS_NAME = "OpenScoutMenu"; const PROCESS_NAME = "scout-base"; @@ -381,6 +382,51 
@@ function stopMenuBarApp(): void { spawn("pkill", ["-x", MENU_PROCESS_NAME], { stdio: "ignore" }).unref(); } +function isChildExited(child: ChildProcess): boolean { + return child.exitCode !== null || child.signalCode !== null; +} + +function waitForChildExit(child: ChildProcess, timeoutMs: number): Promise { + if (isChildExited(child)) { + return Promise.resolve(true); + } + return new Promise((resolve) => { + const timeout = setTimeout(() => { + child.off("exit", onExit); + resolve(false); + }, timeoutMs); + timeout.unref(); + const onExit = () => { + clearTimeout(timeout); + resolve(true); + }; + child.once("exit", onExit); + }); +} + +async function terminateChildProcess( + child: ChildProcess | null, + label: string, + timeoutMs = CHILD_SHUTDOWN_TIMEOUT_MS, +): Promise { + if (!child || isChildExited(child)) { + return; + } + if (!child.killed) { + child.kill("SIGTERM"); + } + if (await waitForChildExit(child, timeoutMs)) { + return; + } + warn(`${label} did not exit after SIGTERM; forcing shutdown`, { pid: child.pid }); + try { + child.kill("SIGKILL"); + } catch { + return; + } + await waitForChildExit(child, 2_000); +} + function stopSupervisedWeb(): void { if (supervisedWebPid && supervisedWebPid > 0) { try { @@ -403,11 +449,10 @@ async function shutdown(exitCode = 0): Promise { } stopSupervisedWeb(); stopMenuBarApp(); + const activeCaddyProcess = caddyProcess; stopEdgeProcesses(); - if (brokerProcess && !brokerProcess.killed) { - brokerProcess.kill("SIGTERM"); - } - await sleep(500); + await terminateChildProcess(brokerProcess, "broker"); + await terminateChildProcess(activeCaddyProcess, "local edge", 2_000); process.exit(exitCode); } diff --git a/packages/runtime/src/broker-daemon.ts b/packages/runtime/src/broker-daemon.ts index 8180784c..36542d16 100644 --- a/packages/runtime/src/broker-daemon.ts +++ b/packages/runtime/src/broker-daemon.ts @@ -353,9 +353,11 @@ const BROKER_VOICE_CHANNEL_ID = "channel.voice"; const BROKER_SYSTEM_CHANNEL_ID = 
"channel.system"; const WEB_START_POLL_TIMEOUT_MS = 15_000; const WEB_START_POLL_INTERVAL_MS = 250; +const SHUTDOWN_SERVER_CLOSE_TIMEOUT_MS = 5_000; let webServerProcess: ChildProcess | null = null; let webStartInFlight: Promise | null = null; let meshRendezvousPublisher: MeshRendezvousPublisher | null = null; +let parentWatcher: ReturnType | null = null; type LegacyRelayMessage = { id: string; @@ -1136,6 +1138,12 @@ async function upsertAgentDurably(agent: AgentDefinition): Promise { } async function upsertEndpointDurably(endpoint: AgentEndpoint): Promise { + const previous = runtime.peek().endpoints[endpoint.id]; + if (isEndpointLastSeenHeartbeat(previous, endpoint)) { + runtime.refreshEndpointSilently(endpoint); + return; + } + await runDurableWrite(async () => { await commitDurableEntries( { kind: "agent.endpoint.upsert", endpoint }, @@ -3302,6 +3310,66 @@ function sameSerializedRecord(left: T | undefined, right: T): boolean { return JSON.stringify(left) === JSON.stringify(right); } +function normalizeComparableValue(value: unknown): unknown { + if (value === undefined) { + return undefined; + } + + if (Array.isArray(value)) { + return value.map((entry) => normalizeComparableValue(entry)); + } + + if (value && typeof value === "object") { + const entries = Object.entries(value as Record) + .filter(([, entry]) => entry !== undefined) + .sort(([left], [right]) => left.localeCompare(right)) + .map(([key, entry]) => [key, normalizeComparableValue(entry)] as const); + return Object.fromEntries(entries); + } + + return value; +} + +function comparableEndpointWithoutHeartbeatMetadata(endpoint: AgentEndpoint): unknown { + const ignoredMetadataKeys = new Set(["lastSeenAt"]); + if (endpoint.metadata?.source === "scout-channel") { + // Older scout-channel clients recompute startedAt on every heartbeat. + ignoredMetadataKeys.add("startedAt"); + } + const metadata = endpoint.metadata + ? 
Object.fromEntries( + Object.entries(endpoint.metadata) + .filter(([key]) => !ignoredMetadataKeys.has(key)), + ) + : undefined; + + return normalizeComparableValue({ + ...endpoint, + metadata: metadata && Object.keys(metadata).length > 0 ? metadata : undefined, + }); +} + +function isEndpointLastSeenHeartbeat(previous: AgentEndpoint | undefined, next: AgentEndpoint): boolean { + if (!previous) { + return false; + } + + const previousLastSeenAt = previous.metadata?.lastSeenAt; + const nextLastSeenAt = next.metadata?.lastSeenAt; + if ( + typeof previousLastSeenAt !== "number" + || typeof nextLastSeenAt !== "number" + || !Number.isFinite(previousLastSeenAt) + || !Number.isFinite(nextLastSeenAt) + || nextLastSeenAt <= previousLastSeenAt + ) { + return false; + } + + return JSON.stringify(comparableEndpointWithoutHeartbeatMetadata(previous)) + === JSON.stringify(comparableEndpointWithoutHeartbeatMetadata(next)); +} + function staleLocalAgentReplacementId( definitionId: string | null, activeAgentIdsByDefinition: Map, @@ -8202,13 +8270,36 @@ if (Number.isFinite(localAgentSyncIntervalMs) && localAgentSyncIntervalMs > 0) { }, localAgentSyncIntervalMs).unref(); } +function forceCloseServer(serverInstance: ReturnType): void { + const forceCloseable = serverInstance as typeof serverInstance & { + closeAllConnections?: () => void; + closeIdleConnections?: () => void; + }; + forceCloseable.closeAllConnections?.(); + forceCloseable.closeIdleConnections?.(); +} + function closeServer(serverInstance: ReturnType): Promise { return new Promise((resolve) => { if (!serverInstance.listening) { resolve(); return; } - serverInstance.close(() => resolve()); + let settled = false; + const finish = () => { + if (settled) { + return; + } + settled = true; + clearTimeout(timeout); + resolve(); + }; + const timeout = setTimeout(() => { + forceCloseServer(serverInstance); + finish(); + }, SHUTDOWN_SERVER_CLOSE_TIMEOUT_MS); + timeout.unref(); + serverInstance.close(() => finish()); }); } @@ 
-8217,6 +8308,10 @@ async function shutdownBroker(exitCode = 0): Promise { return; } shuttingDown = true; + if (parentWatcher) { + clearInterval(parentWatcher); + parentWatcher = null; + } if (webServerProcess && !webServerProcess.killed) { webServerProcess.kill("SIGTERM"); webServerProcess = null; @@ -8250,7 +8345,7 @@ for (const signal of ["SIGINT", "SIGTERM"] as const) { } if (Number.isFinite(parentPid) && parentPid > 0) { - setInterval(() => { + parentWatcher = setInterval(() => { try { process.kill(parentPid, 0); } catch { @@ -8260,5 +8355,6 @@ if (Number.isFinite(parentPid) && parentPid > 0) { process.exit(1); }); } - }, 2_000).unref(); + }, 2_000); + parentWatcher.unref(); } diff --git a/packages/runtime/src/broker-process-manager.ts b/packages/runtime/src/broker-process-manager.ts index 79e29e49..0d1f5aa7 100644 --- a/packages/runtime/src/broker-process-manager.ts +++ b/packages/runtime/src/broker-process-manager.ts @@ -138,6 +138,7 @@ export function isLoopbackHost(host: string): boolean { } const BROKER_SERVICE_POLL_INTERVAL_MS = 100; const DEFAULT_BROKER_START_TIMEOUT_MS = 15_000; +const DEFAULT_BROKER_STOP_TIMEOUT_MS = 20_000; export function buildDefaultBrokerUrl(host = DEFAULT_BROKER_HOST, port = DEFAULT_BROKER_PORT): string { return `http://${host}:${port}`; @@ -277,6 +278,14 @@ function resolveBrokerStartTimeoutMs(): number { return DEFAULT_BROKER_START_TIMEOUT_MS; } +function resolveBrokerStopTimeoutMs(): number { + const explicit = Number.parseInt(process.env.OPENSCOUT_BROKER_STOP_TIMEOUT_MS ?? 
"", 10); + if (Number.isFinite(explicit) && explicit > 0) { + return Math.max(explicit, BROKER_SERVICE_POLL_INTERVAL_MS); + } + return DEFAULT_BROKER_STOP_TIMEOUT_MS; +} + function resolveBrokerServiceLabel(mode: BrokerServiceMode): string { const explicit = process.env.OPENSCOUT_SERVICE_LABEL?.trim() || process.env.OPENSCOUT_BROKER_SERVICE_LABEL?.trim(); @@ -727,6 +736,7 @@ export async function startBrokerService(config: BrokerServiceConfig = resolveBr bootoutLegacyBrokerService(config); writeLaunchAgentPlist(config); runCommand(launchctlPath(), ["bootout", config.serviceTarget], { allowFailure: true }); + await waitForBrokerServiceStopped(config); runCommand(launchctlPath(), ["bootstrap", config.domainTarget, config.launchAgentPath], { allowFailure: true }); runCommand(launchctlPath(), ["kickstart", "-k", config.serviceTarget], { allowFailure: true }); @@ -747,16 +757,22 @@ function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } -export async function stopBrokerService(config: BrokerServiceConfig = resolveBrokerServiceConfig()): Promise { - runCommand(launchctlPath(), ["bootout", config.serviceTarget], { allowFailure: true }); - for (let attempt = 0; attempt < 30; attempt += 1) { - const status = await brokerServiceStatus(config); - if (!status.health.reachable) { +async function waitForBrokerServiceStopped(config: BrokerServiceConfig): Promise { + let status = await brokerServiceStatus(config); + const attempts = Math.ceil(resolveBrokerStopTimeoutMs() / BROKER_SERVICE_POLL_INTERVAL_MS); + for (let attempt = 0; attempt < attempts; attempt += 1) { + status = await brokerServiceStatus(config); + if (!status.loaded && !status.health.reachable) { return status; } await sleep(BROKER_SERVICE_POLL_INTERVAL_MS); } - return brokerServiceStatus(config); + return status; +} + +export async function stopBrokerService(config: BrokerServiceConfig = resolveBrokerServiceConfig()): Promise { + runCommand(launchctlPath(), ["bootout", 
config.serviceTarget], { allowFailure: true }); + return waitForBrokerServiceStopped(config); } export async function restartBrokerService(config: BrokerServiceConfig = resolveBrokerServiceConfig()): Promise { diff --git a/packages/runtime/src/broker.test.ts b/packages/runtime/src/broker.test.ts index ace78600..78eecb66 100644 --- a/packages/runtime/src/broker.test.ts +++ b/packages/runtime/src/broker.test.ts @@ -138,6 +138,44 @@ describe("InMemoryControlRuntime", () => { expect(runtime.flightForInvocation("invocation-1")?.id).toBe("flight-1"); }); + test("refreshes endpoint liveness without emitting events", async () => { + const runtime = createInMemoryControlRuntime(); + + await runtime.upsertEndpoint({ + id: "endpoint-1", + agentId: "fabric", + nodeId: "node-1", + harness: "claude", + transport: "claude_channel", + state: "active", + metadata: { + source: "scout-channel", + startedAt: 100, + lastSeenAt: 100, + }, + }); + + const eventCount = runtime.recentEvents().length; + + runtime.refreshEndpointSilently({ + id: "endpoint-1", + agentId: "fabric", + nodeId: "node-1", + harness: "claude", + transport: "claude_channel", + state: "active", + metadata: { + source: "scout-channel", + startedAt: 100, + lastSeenAt: 200, + }, + }); + + expect(runtime.snapshot().endpoints["endpoint-1"]?.metadata?.lastSeenAt).toBe(200); + expect(runtime.endpointsForAgent("fabric")[0]?.metadata?.lastSeenAt).toBe(200); + expect(runtime.recentEvents()).toHaveLength(eventCount); + }); + test("records broker-owned unblock requests and emits events", async () => { const runtime = createInMemoryControlRuntime({}, { localNodeId: "node-1" }); diff --git a/packages/runtime/src/broker.ts b/packages/runtime/src/broker.ts index a35a5960..a27d9b85 100644 --- a/packages/runtime/src/broker.ts +++ b/packages/runtime/src/broker.ts @@ -466,6 +466,17 @@ export class InMemoryControlRuntime implements ControlRuntime { }); } + refreshEndpointSilently(endpoint: AgentEndpoint): void { + const previous = 
this.registry.endpoints[endpoint.id]; + if (previous && previous.agentId !== endpoint.agentId) { + this.unindexEndpoint(previous); + } + this.registry.endpoints[endpoint.id] = endpoint; + if (!previous || previous.agentId !== endpoint.agentId) { + this.indexEndpoint(endpoint); + } + } + deleteEndpoint(id: string): void { const endpoint = this.registry.endpoints[id]; if (!endpoint) return; diff --git a/packages/runtime/src/claude-stream-json.test.ts b/packages/runtime/src/claude-stream-json.test.ts index 83f92a3b..4f5be530 100644 --- a/packages/runtime/src/claude-stream-json.test.ts +++ b/packages/runtime/src/claude-stream-json.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, expect, test } from "bun:test"; -import { chmodSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { chmodSync, mkdirSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs"; import { tmpdir } from "node:os"; import { delimiter, join } from "node:path"; @@ -254,6 +254,29 @@ describe("invokeClaudeStreamJsonAgent", () => { await shutdownClaudeStreamJsonAgent(options); }); + test("reset shutdown creates a missing runtime directory catalog", async () => { + const tempRoot = mkdtempSync(join(tmpdir(), "openscout-claude-missing-runtime-test-")); + tempPaths.add(tempRoot); + const runtimeDirectory = join(tempRoot, "missing", "runtime"); + const options = { + agentName: "hudson-missing-runtime", + sessionId: "relay-hudson-missing-runtime", + cwd: process.cwd(), + systemPrompt: "You are a test Claude relay agent.", + runtimeDirectory, + logsDirectory: join(tempRoot, "logs"), + launchArgs: [], + } as const; + + await expect(shutdownClaudeStreamJsonAgent(options, { resetSession: true })).resolves.toBeUndefined(); + + const catalog = JSON.parse(readFileSync(join(runtimeDirectory, "session-catalog.json"), "utf8")) as { + activeSessionId?: string | null; + sessions?: unknown[]; + }; + expect(catalog).toEqual({ activeSessionId: null, sessions: [] }); + }); + 
test("rejects completed stream-json turns with no visible output", async () => { const tempRoot = mkdtempSync(join(tmpdir(), "openscout-claude-empty-test-")); tempPaths.add(tempRoot); diff --git a/packages/runtime/src/claude-stream-json.ts b/packages/runtime/src/claude-stream-json.ts index e23296d5..fa59535d 100644 --- a/packages/runtime/src/claude-stream-json.ts +++ b/packages/runtime/src/claude-stream-json.ts @@ -176,6 +176,7 @@ export type SessionCatalog = { const SESSION_CATALOG_FILENAME = "session-catalog.json"; const SESSION_CATALOG_MAX_ENTRIES = 64; +const CLAUDE_STREAM_JSON_STARTUP_TIMEOUT_MS = 5_000; export function readSessionCatalogSync(runtimeDirectory: string): SessionCatalog { const catalogPath = join(runtimeDirectory, SESSION_CATALOG_FILENAME); @@ -211,6 +212,7 @@ async function readSessionCatalog(runtimeDirectory: string): Promise { const catalogPath = join(runtimeDirectory, SESSION_CATALOG_FILENAME); + await mkdir(runtimeDirectory, { recursive: true }); await writeFile(catalogPath, JSON.stringify(catalog, null, 2) + "\n"); } @@ -760,6 +762,14 @@ class ClaudeStreamJsonSession { private starting: Promise | null = null; + private onlineReady: Promise | null = null; + + private resolveOnlineReady: (() => void) | null = null; + + private rejectOnlineReady: ((error: Error) => void) | null = null; + + private onlineReadyTimer: NodeJS.Timeout | null = null; + private claudeSessionId: string | null = null; private resumedSessionId: string | null = null; @@ -787,7 +797,7 @@ class ClaudeStreamJsonSession { } async ensureOnline(): Promise<{ sessionId: string | null }> { - await this.ensureStarted(); + await this.ensureReadyForTurn(); return { sessionId: this.claudeSessionId, }; @@ -802,10 +812,13 @@ class ClaudeStreamJsonSession { timeoutMs: number | undefined, allowStaleResumeRetry: boolean, ): Promise<{ output: string; sessionId: string | null }> { - await this.ensureStarted(); + await this.ensureReadyForTurn(); const queuedTurn = this.turnQueue .catch(() => 
undefined) - .then(() => this.startTurn(prompt)); + .then(async () => { + await this.ensureReadyForTurn(); + return this.startTurn(prompt); + }); this.turnQueue = queuedTurn.then( (turn) => turn.outputPromise.then(() => undefined, () => undefined), () => undefined, @@ -834,7 +847,7 @@ class ClaudeStreamJsonSession { } private startTurn(prompt: string): { outputPromise: Promise } { - if (!this.process?.stdin) { + if (!this.isAlive() || !this.process?.stdin) { throw new Error(`Claude stream-json session for ${this.options.agentName} is not running.`); } if (this.activeTurn) { @@ -899,6 +912,7 @@ class ClaudeStreamJsonSession { this.starting = null; this.lineBuffer = ""; this.resumedSessionId = null; + this.rejectOnline(new Error(`Claude stream-json session for ${this.options.agentName} was shut down.`)); if (child && !child.killed && child.exitCode === null) { child.kill(); @@ -912,6 +926,58 @@ class ClaudeStreamJsonSession { } } + private async ensureReadyForTurn(): Promise { + await this.ensureStarted(); + if (!this.resumedSessionId && !this.claudeSessionId) { + await this.waitForOnlineReady(); + } + } + + private waitForOnlineReady(): Promise { + if (this.claudeSessionId) { + return Promise.resolve(); + } + if (this.onlineReady) { + return this.onlineReady; + } + + this.onlineReady = new Promise((resolve, reject) => { + this.resolveOnlineReady = () => { + this.clearOnlineReady(); + resolve(); + }; + this.rejectOnlineReady = (error) => { + this.clearOnlineReady(); + reject(error); + }; + this.onlineReadyTimer = setTimeout(() => { + this.rejectOnlineReady?.( + new Error(`Claude stream-json session for ${this.options.agentName} did not report ready within ${CLAUDE_STREAM_JSON_STARTUP_TIMEOUT_MS}ms.`), + ); + }, CLAUDE_STREAM_JSON_STARTUP_TIMEOUT_MS); + }); + + return this.onlineReady; + } + + private clearOnlineReady(): void { + if (this.onlineReadyTimer) { + clearTimeout(this.onlineReadyTimer); + } + this.onlineReadyTimer = null; + this.onlineReady = null; + 
this.resolveOnlineReady = null; + this.rejectOnlineReady = null; + } + + private resolveOnline(): void { + this.resolveOnlineReady?.(); + } + + private rejectOnline(error: Error): void { + this.rejectOnlineReady?.(error); + } + private configSignature(options: SessionRequestOptions): string { return JSON.stringify({ cwd: options.cwd, @@ -962,6 +1028,7 @@ class ClaudeStreamJsonSession { try { child = spawn(claudeExecutable, args, { cwd: this.options.cwd, + stdio: "pipe", env: buildManagedAgentEnvironment({ agentName: this.options.agentName, currentDirectory: this.options.cwd, @@ -983,6 +1050,7 @@ class ClaudeStreamJsonSession { } console.error(`[openscout-runtime] claude process error for ${this.options.agentName}: ${error.message}`); this.process = null; + this.rejectOnline(new Error(`Claude process error: ${error.message}`)); if (this.activeTurn) { const turn = this.activeTurn; this.activeTurn = null; @@ -1010,10 +1078,13 @@ class ClaudeStreamJsonSession { void appendFile(this.stderrLogPath, chunk).catch(() => undefined); }); - child.on("exit", (code: number | null) => { + child.on("close", (code: number | null) => { if (this.process !== child) { return; } + this.process = null; + this.lineBuffer = ""; + this.rejectOnline(new Error(`Claude exited with code ${code}`)); if (this.activeTurn) { const turn = this.activeTurn; this.activeTurn = null; @@ -1036,6 +1107,7 @@ class ClaudeStreamJsonSession { const nextSessionId = event.session_id ?? event.sessionId ?? 
null; if (nextSessionId && nextSessionId !== this.claudeSessionId) { this.claudeSessionId = nextSessionId; + this.resolveOnline(); void (async () => { const catalog = await readSessionCatalog(this.catalogDirectory); const updated = catalogRecordSession(catalog, nextSessionId, this.options.cwd); diff --git a/scripts/cargo.sh b/scripts/cargo.sh new file mode 100755 index 00000000..fac1f2c9 --- /dev/null +++ b/scripts/cargo.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +set -euo pipefail + +if [[ -n "${CARGO:-}" ]]; then + exec "$CARGO" "$@" +fi + +if command -v cargo >/dev/null 2>&1; then + exec cargo "$@" +fi + +if [[ -x "$HOME/.cargo/bin/cargo" ]]; then + exec "$HOME/.cargo/bin/cargo" "$@" +fi + +echo "cargo not found; install Rust with rustup or set CARGO=/path/to/cargo" >&2 +exit 127 diff --git a/scripts/supervisor-smoke.sh b/scripts/supervisor-smoke.sh new file mode 100755 index 00000000..f501841a --- /dev/null +++ b/scripts/supervisor-smoke.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +CARGO_SH="$ROOT_DIR/scripts/cargo.sh" +MANIFEST_PATH="$ROOT_DIR/crates/openscout-supervisor/Cargo.toml" + +"$CARGO_SH" fmt --all --check +"$CARGO_SH" check --manifest-path "$MANIFEST_PATH" +"$CARGO_SH" test --manifest-path "$MANIFEST_PATH" +"$CARGO_SH" build --manifest-path "$MANIFEST_PATH" +"$CARGO_SH" run --manifest-path "$MANIFEST_PATH" -- --help >/dev/null +"$CARGO_SH" run --manifest-path "$MANIFEST_PATH" -- status --json >/dev/null +"$CARGO_SH" run --manifest-path "$MANIFEST_PATH" -- doctor --json >/dev/null