diff --git a/README.md b/README.md index 0c61db0a..2eb82373 100644 --- a/README.md +++ b/README.md @@ -336,6 +336,23 @@ Explore outputs to `.opencli/explore//` (manifest.json, endpoints.json, ca See **[TESTING.md](./TESTING.md)** for how to run and write tests. +## Channel — Event Subscriptions + +Subscribe to platform events from the command line. Channel brings the reverse direction: instead of you querying platforms, platforms notify you when something changes. + +```bash +# Subscribe to comments on a GitHub issue +opencli channel subscribe github:owner/repo#42 + +# Start polling +opencli channel start + +# One-shot poll +opencli channel poll github:owner/repo#42 +``` + +See **[docs/channel.md](./docs/channel.md)** for the full guide. + ## Troubleshooting - **"Extension not connected"** diff --git a/docs/channel.md b/docs/channel.md new file mode 100644 index 00000000..69c18a19 --- /dev/null +++ b/docs/channel.md @@ -0,0 +1,309 @@ +# OpenCLI Channel — Event Subscription Protocol + +**Subscribe to platform events from the command line.** Channel brings the reverse direction to OpenCLI: instead of you asking platforms for data, platforms tell you when something happens. + +## The Idea + +OpenCLI today is pull-only: `opencli twitter post`, `opencli notion read`, `opencli gh pr list`. You ask, the platform answers. But what about the other direction? + +- A reviewer leaves comments on your GitHub PR → you want your agent to pick them up automatically. +- Someone comments on your Notion doc → you want to be notified and respond. +- A new issue appears in your repo → you want it routed to the right handler. + +Channel fills this gap. It's [fetchmail](https://en.wikipedia.org/wiki/Fetchmail) for APIs: poll remote platforms, track what you've already seen, deliver new events to whoever subscribed. + +## Quick Start + +```bash +# 1. See what sources are available +opencli channel sources + +# 2. Subscribe to an issue's comments +opencli channel subscribe github:owner/repo#42 + +# 3. One-shot poll (prints events as JSON lines) +opencli channel poll github:owner/repo#42 + +# 4. Start the daemon for continuous polling +opencli channel start + +# 5. Check status +opencli channel status + +# 6. Stop the daemon +opencli channel stop +``` + +## Architecture + +``` +┌─ opencli channel ─────────────────────────────────────┐ +│ │ +│ Sources (platform adapters): │ +│ └── github.ts → poll via `gh api` │ +│ │ +│ Core: │ +│ ├── Scheduler → per-origin poll loop + backoff │ +│ ├── Cursor Store → persists position per origin │ +│ ├── Dedup → ring buffer, no re-delivery │ +│ └── Registry → who subscribed to what │ +│ │ +│ Sinks (output adapters): │ +│ ├── stdout → JSON lines (pipe-friendly) │ +│ └── webhook → POST to any URL │ +│ │ +└────────────────────────────────────────────────────────┘ +``` + +**Three boundaries, cleanly separated:** +- **Sources** know how to poll a specific platform. They don't know about subscribers or sinks. +- **Core** knows scheduling, state, dedup, and the subscription registry. It doesn't know about platforms. +- **Sinks** know how to deliver events. They don't know where events came from. + +### Consumer-Side Subscription + +The key design choice: **Channel doesn't decide where events go.** Instead, consumers (humans or agents) subscribe to the origins they care about. + +This means: +- No routing logic in Channel. No dispatcher, no "smart" routing. +- Multi-to-multi is free: one consumer subscribes to many sources, one source has many consumers. +- Session lifecycle is not Channel's problem: if a consumer dies, delivery fails, Channel cleans up. + +## Event Schema + +Every event follows a unified envelope: + +```json +{ + "id": "gh-comment-123456", + "source": "github", + "type": "issue_comment.created", + "timestamp": "2026-03-24T17:30:00Z", + "origin": "github:user/repo#42", + "payload": { + "author": "reviewer", + "body": "This needs error handling", + "htmlUrl": "https://github.com/user/repo/issues/42#issuecomment-123456" + } +} +``` + +| Field | Description | +|-------|-------------| +| `id` | Globally unique event ID (used for dedup) | +| `source` | Which source adapter produced this event | +| `type` | Platform-specific event type (dot-namespaced) | +| `timestamp` | When the event occurred on the platform (ISO-8601) | +| `origin` | Origin identifier — what subscriptions match against | +| `payload` | Platform-specific event data | + +## GitHub Source + +The GitHub source adapter uses `gh api` for all API calls, inheriting your existing `gh` authentication, proxy settings, and host configuration. + +### Origin Formats + +| Origin | What it watches | +|--------|-----------------| +| `github:owner/repo` | All repo events (pushes, PRs, issues, stars, etc.) | +| `github:owner/repo#42` | Comments on issue/PR #42 | +| `github:owner/repo/pulls` | All pull request activity | +| `github:owner/repo/issues` | All issue activity | + +### Event Types + +| Event Type | Origin | Description | +|------------|--------|-------------| +| `issue_comment.created` | `#number` | New comment on an issue/PR | +| `pull_request.open` | `/pulls` | PR opened or updated | +| `pull_request.closed` | `/pulls` | PR closed | +| `issue.open` | `/issues` | Issue opened or updated | +| `issue.closed` | `/issues` | Issue closed | +| `push` | repo-level | Code pushed | +| `pull_request_review` | repo-level | PR review submitted | +| `release` | repo-level | New release published | +| `star` | repo-level | Repo starred | + +### Examples + +```bash +# Watch a specific issue for new comments +opencli channel subscribe github:jackwener/opencli#369 +opencli channel start + +# Watch all PRs in a repo +opencli channel subscribe github:myorg/myproject/pulls + +# One-shot: grab recent events for an issue +opencli channel poll github:myorg/myproject#100 + +# Poll from a specific point in time +opencli channel poll github:myorg/myproject#100 --since 2026-03-01T00:00:00Z +``` + +## CLI Reference + +### `opencli channel sources [name]` + +List available event sources. With a source name, lists subscribable items. + +```bash +opencli channel sources # all sources +opencli channel sources github # GitHub-specific items +``` + +### `opencli channel subscribe ` + +Subscribe to events from an origin. + +```bash +opencli channel subscribe github:owner/repo#42 +opencli channel subscribe github:owner/repo/pulls --sink webhook --webhook-url http://localhost:3000/events +opencli channel subscribe github:owner/repo --interval 120000 # 2 min interval +``` + +Options: +- `-s, --sink ` — Sink to deliver to (default: `stdout`) +- `-i, --interval ` — Poll interval in ms (default: `60000`) +- `--webhook-url ` — URL for webhook sink + +### `opencli channel unsubscribe ` + +Remove a subscription. + +```bash +opencli channel unsubscribe github:owner/repo#42 +``` + +### `opencli channel subscriptions` + +List all current subscriptions. + +```bash +opencli channel subscriptions +opencli channel subscriptions --format json +``` + +### `opencli channel start` + +Start the polling daemon. + +```bash +opencli channel start # foreground (Ctrl+C to stop) +opencli channel start -d # background daemon +``` + +### `opencli channel stop` + +Stop the background daemon. + +### `opencli channel status` + +Show daemon status, subscription list, and cursor positions. + +### `opencli channel poll ` + +One-shot poll: fetch events and print to stdout as JSON lines. + +```bash +opencli channel poll github:owner/repo#42 +opencli channel poll github:owner/repo#42 --since 2026-03-01T00:00:00Z +``` + +## Writing a Custom Source Adapter + +A source adapter implements the `ChannelSource` interface: + +```typescript +import type { ChannelSource, ChannelEvent, PollResult, SourcePollConfig, SubscribableItem } from '../types.js'; + +export class MySource implements ChannelSource { + readonly name = 'mysource'; + + async listSubscribable(config: Record): Promise { + // Return items users can subscribe to + return [ + { origin: 'mysource:channel/general', description: 'General channel' }, + ]; + } + + parseOrigin(origin: string): SourcePollConfig | null { + // Parse "mysource:channel/general" → config object + if (!origin.startsWith('mysource:')) return null; + const channel = origin.slice('mysource:'.length); + return { channel }; + } + + async poll(config: SourcePollConfig, cursor: string | null): Promise { + // Fetch new events since cursor + // Use CLI tools (not raw HTTP) when possible + const events: ChannelEvent[] = [/* ... */]; + return { events, cursor: 'new-cursor-value' }; + } +} +``` + +Then register it in `src/channel/index.ts`: + +```typescript +import { MySource } from './sources/mysource.js'; + +function getSources(): Map { + const map = new Map(); + map.set('github', new GitHubSource()); + map.set('mysource', new MySource()); // ← add here + return map; +} +``` + +## Writing a Custom Sink Adapter + +A sink adapter implements the `ChannelSink` interface: + +```typescript +import type { ChannelSink, ChannelEvent } from '../types.js'; + +export class MySink implements ChannelSink { + readonly name = 'mysink'; + + async init(config: Record): Promise { + // Initialize with config from subscription + } + + async deliver(events: ChannelEvent[]): Promise { + for (const event of events) { + // Deliver each event + } + } +} +``` + +## Configuration Files + +All state lives in `~/.opencli/channel/`: + +| File | Purpose | +|------|---------| +| `subscriptions.json` | Subscription registry | +| `cursors.json` | Poll cursor positions per origin | +| `daemon.pid` | PID of running daemon | + +These are plain JSON — human-readable and inspectable. + +## Design Philosophy + +Channel is a **pipe**, not a brain. + +It borrows from Unix `fetchmail`: poll remote sources, track what you've seen, deliver to whoever asked. It doesn't decide what to do with events — that's the consumer's job. + +The consumer-side subscription model means Channel stays thin: +1. **Deliver** — get the event to the right place +2. **Continuity** — same origin always goes to the same subscriber +3. **Isolation** — different subscriptions don't cross + +Everything else — how to respond, whether to spawn new sessions, whether to write to a doc or reply in chat — is the consumer's decision. + +## Related + +- [RFC: OpenCLI Channel](https://github.com/jackwener/opencli/issues/369) +- [fetchmail](https://en.wikipedia.org/wiki/Fetchmail) — the Unix inspiration diff --git a/src/channel/cursor-store.ts b/src/channel/cursor-store.ts new file mode 100644 index 00000000..be1bf775 --- /dev/null +++ b/src/channel/cursor-store.ts @@ -0,0 +1,71 @@ +/** + * Cursor store — persists poll positions per origin. + * File: ~/.opencli/channel/cursors.json + */ + +import { mkdirSync } from 'node:fs'; +import { readFile, writeFile, rename } from 'node:fs/promises'; +import { dirname, join } from 'node:path'; +import { homedir } from 'node:os'; + +export interface CursorEntry { + cursor: string; + lastPoll: string; + eventsDelivered: number; +} + +const DEFAULT_PATH = join(homedir(), '.opencli', 'channel', 'cursors.json'); + +export class CursorStore { + private entries = new Map(); + + constructor(private readonly path: string = DEFAULT_PATH) {} + + async load(): Promise { + try { + const raw = await readFile(this.path, 'utf8'); + const parsed = JSON.parse(raw) as Record; + this.entries = new Map(Object.entries(parsed)); + } catch (e: unknown) { + if ((e as NodeJS.ErrnoException).code === 'ENOENT') { + this.entries = new Map(); + return; + } + throw e; + } + } + + private saveQueue: Promise = Promise.resolve(); + + async save(): Promise { + // Chain saves to serialize concurrent calls + this.saveQueue = this.saveQueue.then(() => this._doSave(), () => this._doSave()); + await this.saveQueue; + } + + private async _doSave(): Promise { + mkdirSync(dirname(this.path), { recursive: true }); + const tmp = `${this.path}.${process.pid}.${Date.now()}.tmp`; + const data = JSON.stringify(Object.fromEntries(this.entries), null, 2); + await writeFile(tmp, data, 'utf8'); + await rename(tmp, this.path); + } + + get(origin: string): CursorEntry | undefined { + return this.entries.get(origin); + } + + set(origin: string, cursor: string, newEventsDelivered: number): void { + const existing = this.entries.get(origin); + const cumulative = (existing?.eventsDelivered ?? 0) + newEventsDelivered; + this.entries.set(origin, { + cursor, + lastPoll: new Date().toISOString(), + eventsDelivered: cumulative, + }); + } + + getAll(): Map { + return new Map(this.entries); + } +} diff --git a/src/channel/dedup.ts b/src/channel/dedup.ts new file mode 100644 index 00000000..2df487eb --- /dev/null +++ b/src/channel/dedup.ts @@ -0,0 +1,35 @@ +/** + * In-memory circular-buffer dedup. + * Keeps last N event IDs to prevent re-delivery. + * O(1) add/check via Set + circular index (no Array.shift overhead). + */ + +export class Dedup { + private readonly ring: (string | undefined)[]; + private readonly seen = new Set(); + private readonly maxSize: number; + private writeIdx = 0; + + constructor(maxSize = 10_000) { + this.maxSize = maxSize; + this.ring = new Array(maxSize); + } + + isDuplicate(id: string): boolean { + return this.seen.has(id); + } + + add(id: string): void { + if (this.seen.has(id)) return; + + // Evict the oldest entry at the write position + const evicted = this.ring[this.writeIdx]; + if (evicted !== undefined) { + this.seen.delete(evicted); + } + + this.ring[this.writeIdx] = id; + this.seen.add(id); + this.writeIdx = (this.writeIdx + 1) % this.maxSize; + } +} diff --git a/src/channel/index.ts b/src/channel/index.ts new file mode 100644 index 00000000..2baee4b0 --- /dev/null +++ b/src/channel/index.ts @@ -0,0 +1,399 @@ +/** + * Channel — Event subscription protocol for OpenCLI. + * + * CLI subcommands: + * opencli channel sources [name] — list available sources + * opencli channel subscribe — subscribe to events + * opencli channel unsubscribe — remove subscription + * opencli channel subscriptions — list current subscriptions + * opencli channel start [-d] — start polling daemon + * opencli channel stop — stop daemon + * opencli channel status — show stats + * opencli channel poll — one-shot poll + */ + +import { Command } from 'commander'; +import { existsSync, readFileSync, writeFileSync, unlinkSync } from 'node:fs'; +import { join } from 'node:path'; +import { homedir } from 'node:os'; +import { spawn } from 'node:child_process'; + +import { CursorStore } from './cursor-store.js'; +import { Dedup } from './dedup.js'; +import { SubscriptionRegistry } from './registry.js'; +import { Scheduler, type SinkFactory } from './scheduler.js'; +import type { ChannelSink, ChannelSource } from './types.js'; + +// Sources +import { GitHubSource } from './sources/github.js'; + +// Sinks +import { StdoutSink } from './sinks/stdout.js'; +import { WebhookSink } from './sinks/webhook.js'; + +// ── Constants ─────────────────────────────────────────────────────── + +const CHANNEL_DIR = join(homedir(), '.opencli', 'channel'); +const PID_FILE = join(CHANNEL_DIR, 'daemon.pid'); + +// ── Source / Sink registries ──────────────────────────────────────── + +function getSources(): Map { + const map = new Map(); + map.set('github', new GitHubSource()); + return map; +} + +/** Factory that creates a fresh sink instance per subscription. */ +function createSink(name: string, _config: Record): ChannelSink { + switch (name) { + case 'stdout': return new StdoutSink(); + case 'webhook': return new WebhookSink(); + default: throw new Error(`Unknown sink: ${name}`); + } +} + +// ── CLI registration ──────────────────────────────────────────────── + +export function registerChannelCommand(program: Command): void { + const channel = program + .command('channel') + .description('Event subscription — subscribe to platform events and receive them in your session'); + + // ── sources ───────────────────────────────────────────────────── + + channel + .command('sources [name]') + .description('List available event sources (or subscribable items for a specific source)') + .action(async (name?: string) => { + const sources = getSources(); + + if (!name) { + // List all sources + console.log('Available sources:\n'); + for (const [sourceName, source] of sources) { + console.log(` ${sourceName}`); + const items = await source.listSubscribable({}); + for (const item of items.slice(0, 5)) { + console.log(` ${item.origin} — ${item.description}`); + } + if (items.length > 5) console.log(` ... and ${items.length - 5} more`); + console.log(); + } + return; + } + + const source = sources.get(name); + if (!source) { + console.error(`Unknown source: ${name}`); + console.error(`Available: ${[...sources.keys()].join(', ')}`); + process.exit(1); + } + + const items = await source.listSubscribable({}); + console.log(`Subscribable items from ${name}:\n`); + for (const item of items) { + console.log(` ${item.origin} — ${item.description}`); + } + }); + + // ── subscribe ─────────────────────────────────────────────────── + + channel + .command('subscribe ') + .description('Subscribe to events from an origin (e.g. github:owner/repo#42)') + .option('-s, --sink ', 'Sink to deliver events to', 'stdout') + .option('-i, --interval ', 'Poll interval in milliseconds', '60000') + .option('--webhook-url ', 'Webhook URL (when sink=webhook)') + .action(async (origin: string, opts: { sink: string; interval: string; webhookUrl?: string }) => { + // Validate origin can be parsed by some source + const sources = getSources(); + let parsed = false; + for (const source of sources.values()) { + if (source.parseOrigin(origin)) { parsed = true; break; } + } + if (!parsed) { + console.error(`Cannot parse origin: ${origin}`); + console.error('Expected format: github:owner/repo, github:owner/repo#42, etc.'); + process.exit(1); + } + + // Validate interval + const intervalMs = parseInt(opts.interval, 10); + if (isNaN(intervalMs) || intervalMs < 0) { + console.error(`Invalid interval: ${opts.interval}. Must be a positive number in milliseconds.`); + process.exit(1); + } + + // Validate webhook config + if (opts.sink === 'webhook' && !opts.webhookUrl) { + console.error('Webhook sink requires --webhook-url.'); + process.exit(1); + } + + const sinkConfig: Record = {}; + if (opts.sink === 'webhook' && opts.webhookUrl) { + sinkConfig.url = opts.webhookUrl; + } + + const registry = new SubscriptionRegistry(); + const sub = await registry.withLock(() => + registry.add(origin, opts.sink, sinkConfig, intervalMs), + ); + + console.log(`✅ Subscribed to ${origin}`); + console.log(` ID: ${sub.id}`); + console.log(` Sink: ${opts.sink}`); + console.log(` Interval: ${opts.interval}ms`); + }); + + // ── unsubscribe ───────────────────────────────────────────────── + + channel + .command('unsubscribe ') + .description('Remove subscription for an origin') + .action(async (origin: string) => { + const registry = new SubscriptionRegistry(); + const removed = await registry.withLock(() => registry.remove(origin)); + + if (removed) { + console.log(`✅ Unsubscribed from ${origin}`); + } else { + console.log(`No subscription found for ${origin}`); + } + }); + + // ── subscriptions ─────────────────────────────────────────────── + + channel + .command('subscriptions') + .alias('list') + .description('List current subscriptions') + .option('-f, --format ', 'Output format: table, json', 'table') + .action(async (opts: { format: string }) => { + const registry = new SubscriptionRegistry(); + await registry.load(); + const subs = registry.list(); + + if (subs.length === 0) { + console.log('No subscriptions. Use `opencli channel subscribe ` to add one.'); + return; + } + + if (opts.format === 'json') { + console.log(JSON.stringify(subs, null, 2)); + return; + } + + // Table output + const header = ['ORIGIN', 'SINK', 'INTERVAL', 'CREATED']; + const rows = subs.map(s => [ + s.origin, + s.sink, + s.intervalMs > 0 ? `${s.intervalMs}ms` : 'default', + new Date(s.createdAt).toLocaleDateString(), + ]); + + const widths = header.map((h, i) => + Math.max(h.length, ...rows.map(r => r[i].length)), + ); + + console.log(header.map((h, i) => h.padEnd(widths[i])).join(' ')); + console.log(widths.map(w => '─'.repeat(w)).join(' ')); + for (const row of rows) { + console.log(row.map((v, i) => v.padEnd(widths[i])).join(' ')); + } + }); + + // ── start ─────────────────────────────────────────────────────── + + channel + .command('start') + .description('Start the channel polling daemon') + .option('-d, --daemon', 'Run in background') + .action(async (opts: { daemon?: boolean }) => { + if (opts.daemon) { + // Check for stale PID file + if (existsSync(PID_FILE)) { + const existingPid = parseInt(readFileSync(PID_FILE, 'utf8').trim(), 10); + try { + process.kill(existingPid, 0); + console.error(`Channel daemon already running (PID: ${existingPid}). Use 'opencli channel stop' first.`); + process.exit(1); + } catch { + // Stale PID, clean up + unlinkSync(PID_FILE); + } + } + + // Spawn detached child + const child = spawn(process.execPath, [process.argv[1], 'channel', 'start'], { + detached: true, + stdio: 'ignore', + }); + child.unref(); + if (!child.pid) { + console.error('Failed to spawn daemon process.'); + process.exit(1); + } + writeFileSync(PID_FILE, String(child.pid)); + console.log(`Channel daemon started (PID: ${child.pid})`); + return; + } + + // Foreground mode + console.log('Starting channel daemon (foreground)...'); + const registry = new SubscriptionRegistry(); + await registry.load(); + + const subs = registry.list(); + if (subs.length === 0) { + console.log('No subscriptions. Use `opencli channel subscribe ` first.'); + process.exit(0); + } + + const cursors = new CursorStore(); + await cursors.load(); + + const sources = getSources(); + + const dedup = new Dedup(); + const scheduler = new Scheduler(sources, createSink, registry, cursors, dedup); + + const shutdown = (): void => { + console.log('\nStopping channel daemon...'); + scheduler.stop(); + try { unlinkSync(PID_FILE); } catch {} + // Flush cursors before exit + cursors.save().catch(() => {}).finally(() => process.exit(0)); + }; + process.on('SIGINT', shutdown); + process.on('SIGTERM', shutdown); + + // Write PID for status/stop + writeFileSync(PID_FILE, String(process.pid)); + + console.log(`Polling ${subs.length} subscription(s)...`); + for (const sub of subs) { + console.log(` ${sub.origin} → ${sub.sink}`); + } + console.log(); + + await scheduler.start(); + }); + + // ── stop ──────────────────────────────────────────────────────── + + channel + .command('stop') + .description('Stop the channel daemon') + .action(() => { + if (!existsSync(PID_FILE)) { + console.log('No daemon running (no PID file).'); + return; + } + + const pid = parseInt(readFileSync(PID_FILE, 'utf8').trim(), 10); + try { + process.kill(pid, 'SIGTERM'); + unlinkSync(PID_FILE); + console.log(`Channel daemon stopped (PID: ${pid})`); + } catch { + unlinkSync(PID_FILE); + console.log('Daemon was not running. Cleaned up PID file.'); + } + }); + + // ── status ────────────────────────────────────────────────────── + + channel + .command('status') + .description('Show channel daemon status and cursor positions') + .action(async () => { + // Check daemon + let daemonRunning = false; + if (existsSync(PID_FILE)) { + const pid = parseInt(readFileSync(PID_FILE, 'utf8').trim(), 10); + try { + process.kill(pid, 0); // Check if alive + daemonRunning = true; + } catch { + // Not running + } + } + console.log(`Daemon: ${daemonRunning ? '🟢 running' : '⚪ stopped'}`); + console.log(); + + // Subscriptions + const registry = new SubscriptionRegistry(); + await registry.load(); + const subs = registry.list(); + console.log(`Subscriptions: ${subs.length}`); + + if (subs.length === 0) return; + + // Cursors + const cursors = new CursorStore(); + await cursors.load(); + + console.log(); + const header = ['ORIGIN', 'SINK', 'LAST POLL', 'EVENTS']; + const rows = subs.map(s => { + const c = cursors.get(s.origin); + return [ + s.origin, + s.sink, + c?.lastPoll ? new Date(c.lastPoll).toLocaleString() : 'never', + String(c?.eventsDelivered ?? 0), + ]; + }); + + const widths = header.map((h, i) => + Math.max(h.length, ...rows.map(r => r[i].length)), + ); + console.log(header.map((h, i) => h.padEnd(widths[i])).join(' ')); + console.log(widths.map(w => '─'.repeat(w)).join(' ')); + for (const row of rows) { + console.log(row.map((v, i) => v.padEnd(widths[i])).join(' ')); + } + }); + + // ── poll (one-shot) ───────────────────────────────────────────── + + channel + .command('poll ') + .description('One-shot poll: fetch events and print to stdout') + .option('--since ', 'Poll from specific cursor/timestamp') + .action(async (origin: string, opts: { since?: string }) => { + const sources = getSources(); + + // Find matching source + let matchedSource: ChannelSource | undefined; + for (const source of sources.values()) { + if (source.parseOrigin(origin)) { + matchedSource = source; + break; + } + } + + if (!matchedSource) { + console.error(`Cannot parse origin: ${origin}`); + process.exit(1); + } + + const config = matchedSource.parseOrigin(origin)!; + const cursor = opts.since ?? null; + + const result = await matchedSource.poll(config, cursor); + + for (const event of result.events) { + process.stdout.write(JSON.stringify(event) + '\n'); + } + + if (result.events.length === 0) { + console.error('(no new events)'); + } else { + console.error(`${result.events.length} event(s). cursor: ${result.cursor}`); + } + }); +} diff --git a/src/channel/registry.ts b/src/channel/registry.ts new file mode 100644 index 00000000..43db04e7 --- /dev/null +++ b/src/channel/registry.ts @@ -0,0 +1,152 @@ +/** + * Subscription registry — persists who subscribes to what. + * File: ~/.opencli/channel/subscriptions.json + * + * Uses a lockfile to prevent concurrent CLI invocations from clobbering + * each other's changes (e.g. two `subscribe` commands in parallel). + */ + +import { mkdirSync, existsSync, unlinkSync } from 'node:fs'; +import { readFile, writeFile, rename, open } from 'node:fs/promises'; +import { dirname, join } from 'node:path'; +import { homedir } from 'node:os'; +import { randomUUID } from 'node:crypto'; +import type { Subscription } from './types.js'; + +const DEFAULT_PATH = join(homedir(), '.opencli', 'channel', 'subscriptions.json'); + +export class SubscriptionRegistry { + private subs: Subscription[] = []; + private readonly lockPath: string; + + constructor(private readonly path: string = DEFAULT_PATH) { + this.lockPath = `${path}.lock`; + } + + /** + * Atomically load → mutate → save with file locking. + * Ensures concurrent CLI invocations don't clobber each other. + */ + async withLock(fn: () => T | Promise): Promise { + mkdirSync(dirname(this.path), { recursive: true }); + + // Acquire lock via O_CREAT|O_EXCL (atomic create-or-fail) + const maxRetries = 50; + const retryMs = 100; + let lockFd: Awaited> | null = null; + + for (let i = 0; i < maxRetries; i++) { + try { + lockFd = await open(this.lockPath, 'wx'); + break; + } catch (e: unknown) { + if ((e as NodeJS.ErrnoException).code === 'EEXIST') { + // Lock held by another process — check if stale (>10s old) + try { + const { mtimeMs } = await (await open(this.lockPath, 'r')).stat(); + (await open(this.lockPath, 'r')).close(); + if (Date.now() - mtimeMs > 10_000) { + // Stale lock — remove and retry + try { unlinkSync(this.lockPath); } catch {} + continue; + } + } catch {} + await new Promise(r => setTimeout(r, retryMs)); + continue; + } + throw e; + } + } + + if (!lockFd) { + throw new Error(`Failed to acquire lock on ${this.lockPath} after ${maxRetries} retries`); + } + + try { + await this._load(); + const result = await fn(); + await this._save(); + return result; + } finally { + await lockFd.close(); + try { unlinkSync(this.lockPath); } catch {} + } + } + + /** Load without lock (for read-only operations like list/status). */ + async load(): Promise { + await this._load(); + } + + private async _load(): Promise { + try { + const raw = await readFile(this.path, 'utf8'); + this.subs = JSON.parse(raw) as Subscription[]; + } catch (e: unknown) { + if ((e as NodeJS.ErrnoException).code === 'ENOENT') { + this.subs = []; + return; + } + throw e; + } + } + + async save(): Promise { + await this._save(); + } + + private async _save(): Promise { + mkdirSync(dirname(this.path), { recursive: true }); + const tmp = `${this.path}.${process.pid}.${Date.now()}.tmp`; + await writeFile(tmp, JSON.stringify(this.subs, null, 2), 'utf8'); + await rename(tmp, this.path); + } + + add(origin: string, sink: string, sinkConfig: Record = {}, intervalMs = 0): Subscription { + const configKey = JSON.stringify(sinkConfig); + const existing = this.subs.find( + s => s.origin === origin && s.sink === sink && JSON.stringify(s.sinkConfig) === configKey, + ); + if (existing) { + if (intervalMs !== existing.intervalMs) { + existing.intervalMs = intervalMs; + } + return existing; + } + + const sub: Subscription = { + id: randomUUID(), + origin, + sink, + sinkConfig, + intervalMs, + createdAt: new Date().toISOString(), + }; + this.subs.push(sub); + return sub; + } + + remove(origin: string): boolean { + const before = this.subs.length; + this.subs = this.subs.filter(s => s.origin !== origin); + return this.subs.length < before; + } + + removeById(id: string): boolean { + const before = this.subs.length; + this.subs = this.subs.filter(s => s.id !== id); + return this.subs.length < before; + } + + list(): Subscription[] { + return [...this.subs]; + } + + origins(): string[] { + return [...new Set(this.subs.map(s => s.origin))]; + } + + forOrigin(origin: string): Subscription[] { + return this.subs.filter(s => s.origin === origin); + } +} diff --git a/src/channel/scheduler.ts b/src/channel/scheduler.ts new file mode 100644 index 00000000..e6b6b39b --- /dev/null +++ b/src/channel/scheduler.ts @@ -0,0 +1,176 @@ +/** + * Scheduler — manages per-origin poll loops. + * One loop per unique origin across all subscriptions. + */ + +import { CursorStore } from './cursor-store.js'; +import { Dedup } from './dedup.js'; +import { SubscriptionRegistry } from './registry.js'; +import type { ChannelEvent, ChannelSink, ChannelSource, SourcePollConfig } from './types.js'; + +const DEFAULT_INTERVAL_MS = 60_000; +const MIN_INTERVAL_MS = 10_000; +const MAX_BACKOFF_MS = 5 * 60_000; + +/** Factory that creates a fresh sink instance per subscription. */ +export type SinkFactory = (name: string, config: Record) => ChannelSink; + +interface SubscriptionSink { + subscriptionId: string; + sink: ChannelSink; +} + +interface OriginLoop { + origin: string; + source: ChannelSource; + pollConfig: SourcePollConfig; + sinks: SubscriptionSink[]; + timer: ReturnType | null; + intervalMs: number; + consecutiveErrors: number; +} + +export class Scheduler { + private readonly loops = new Map(); + private running = false; + + constructor( + private readonly sources: Map, + private readonly sinkFactory: SinkFactory, + private readonly registry: SubscriptionRegistry, + private readonly cursors: CursorStore, + private readonly dedup: Dedup, + ) {} + + async start(): Promise { + this.running = true; + const origins = this.registry.origins(); + + for (const origin of origins) { + await this.startOriginLoop(origin); + } + } + + stop(): void { + this.running = false; + for (const loop of this.loops.values()) { + if (loop.timer) clearTimeout(loop.timer); + } + this.loops.clear(); + } + + private async startOriginLoop(origin: string): Promise { + if (this.loops.has(origin)) return; + + // Parse origin → source + config + const [sourceName] = origin.split(':', 1); + const source = this.sources.get(sourceName); + if (!source) { + console.error(`[channel] Unknown source "${sourceName}" for origin "${origin}"`); + return; + } + + const pollConfig = source.parseOrigin(origin); + if (!pollConfig) { + console.error(`[channel] Source "${sourceName}" cannot parse origin "${origin}"`); + return; + } + + // Create a dedicated sink instance per subscription + const subs = this.registry.forOrigin(origin); + const sinkInstances: SubscriptionSink[] = []; + for (const sub of subs) { + try { + const sink = this.sinkFactory(sub.sink, sub.sinkConfig); + await sink.init(sub.sinkConfig); + sinkInstances.push({ subscriptionId: sub.id, sink }); + } catch (e) { + console.error(`[channel] Failed to init sink "${sub.sink}" for subscription ${sub.id}:`, e); + } + } + + if (sinkInstances.length === 0) { + console.error(`[channel] No valid sinks for origin "${origin}", skipping.`); + return; + } + + // Use subscription-level interval override, or default + const overrideMs = subs.reduce((min, s) => { + if (s.intervalMs > 0 && s.intervalMs < min) return s.intervalMs; + return min; + }, DEFAULT_INTERVAL_MS); + + const loop: OriginLoop = { + origin, + source, + pollConfig, + sinks: sinkInstances, + timer: null, + intervalMs: Math.max(overrideMs, MIN_INTERVAL_MS), + consecutiveErrors: 0, + }; + + this.loops.set(origin, loop); + void this.tick(loop); + } + + private async tick(loop: OriginLoop): Promise { + if (!this.running) return; + + try { + const cursorEntry = this.cursors.get(loop.origin); + const cursor = cursorEntry?.cursor ?? null; + + const result = await loop.source.poll(loop.pollConfig, cursor); + + // Dedup + const fresh: ChannelEvent[] = []; + for (const event of result.events) { + const dedupKey = `${loop.origin}:${event.id}`; + if (!this.dedup.isDuplicate(dedupKey)) { + this.dedup.add(dedupKey); + fresh.push(event); + } + } + + // Deliver to each subscription's dedicated sink + let allDelivered = true; + if (fresh.length > 0) { + for (const { subscriptionId, sink } of loop.sinks) { + try { + await sink.deliver(fresh); + } catch (e) { + allDelivered = false; + console.error(`[channel] Sink delivery failed for subscription ${subscriptionId}:`, e); + } + } + } + + // Only advance cursor if ALL sinks succeeded (or no events). + // This prevents data loss: a failing sink will see the same events + // on the next poll rather than missing them forever. + if (fresh.length === 0 || allDelivered) { + this.cursors.set(loop.origin, result.cursor, fresh.length); + await this.cursors.save(); + } + + // Reset backoff on success + loop.consecutiveErrors = 0; + + // Respect server-recommended interval + if (result.recommendedIntervalMs && result.recommendedIntervalMs > loop.intervalMs) { + loop.intervalMs = result.recommendedIntervalMs; + } + } catch (e) { + loop.consecutiveErrors++; + console.error(`[channel] Poll failed for "${loop.origin}" (attempt ${loop.consecutiveErrors}):`, e); + } + + // Schedule next tick with backoff + const backoff = loop.consecutiveErrors > 0 + ? Math.min(loop.intervalMs * Math.pow(2, loop.consecutiveErrors - 1), MAX_BACKOFF_MS) + : loop.intervalMs; + + loop.timer = setTimeout(() => void this.tick(loop), backoff); + } +} diff --git a/src/channel/sinks/stdout.ts b/src/channel/sinks/stdout.ts new file mode 100644 index 00000000..8a996b07 --- /dev/null +++ b/src/channel/sinks/stdout.ts @@ -0,0 +1,18 @@ +/** + * Stdout sink — prints events as JSON lines. + * Pipe-friendly, zero config. + */ + +import type { ChannelEvent, ChannelSink } from '../types.js'; + +export class StdoutSink implements ChannelSink { + readonly name = 'stdout'; + + async init(_config: Record): Promise {} + + async deliver(events: ChannelEvent[]): Promise { + for (const event of events) { + process.stdout.write(JSON.stringify(event) + '\n'); + } + } +} diff --git a/src/channel/sinks/webhook.ts b/src/channel/sinks/webhook.ts new file mode 100644 index 00000000..8fd3136a --- /dev/null +++ b/src/channel/sinks/webhook.ts @@ -0,0 +1,50 @@ +/** + * Webhook sink — POST events to a URL. + */ + +import type { ChannelEvent, ChannelSink } from '../types.js'; + +export class WebhookSink implements ChannelSink { + readonly name = 'webhook'; + private url = ''; + private headers: Record = {}; + + async init(config: Record): Promise { + if (typeof config.url !== 'string' || !config.url) { + throw new Error('Webhook sink requires a "url" config.'); + } + // Validate URL scheme to prevent SSRF + const parsed = new URL(config.url); + if (!['http:', 'https:'].includes(parsed.protocol)) { + throw new Error(`Webhook sink only supports http/https URLs, got: ${parsed.protocol}`); + } + this.url = config.url; + if (config.headers && typeof config.headers === 'object') { + this.headers = config.headers as Record; + } + } + + async deliver(events: ChannelEvent[]): Promise { + for (const event of events) { + try { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 30_000); + try { + const res = await fetch(this.url, { + method: 'POST', + headers: { 'Content-Type': 'application/json', ...this.headers }, + body: JSON.stringify(event), + signal: controller.signal, + }); + if (!res.ok) { + console.error(`[webhook] ${res.status} ${res.statusText} for event ${event.id}`); + } + } finally { + clearTimeout(timeout); + } + } catch (e) { + console.error(`[webhook] Failed to deliver event ${event.id}:`, e); + } + } + } +} diff --git a/src/channel/sources/github.ts b/src/channel/sources/github.ts new file mode 100644 index 00000000..c971db3f --- /dev/null +++ b/src/channel/sources/github.ts @@ -0,0 +1,321 @@ +/** + * GitHub source adapter. + * + * Uses `gh api` CLI for all API calls — inherits auth, proxy, host config. + * + * Origin formats: + * github:owner/repo — repo-level events + * github:owner/repo#42 — issue/PR comments + * github:owner/repo/pulls — all PR activity + * github:owner/repo/issues — all issue activity + */ + +import { execFile } from 'node:child_process'; +import { promisify } from 'node:util'; +import type { + ChannelEvent, + ChannelSource, + PollResult, + SourcePollConfig, + SubscribableItem, +} from '../types.js'; + +const execFileAsync = promisify(execFile); + +// ── Origin parsing ────────────────────────────────────────────────── + +interface RepoPollConfig extends SourcePollConfig { + kind: 'repo'; + owner: string; + repo: string; +} + +interface IssuePollConfig extends SourcePollConfig { + kind: 'issue'; + owner: string; + repo: string; + number: number; +} + +interface PullsPollConfig extends SourcePollConfig { + kind: 'pulls'; + owner: string; + repo: string; +} + +interface IssuesPollConfig extends SourcePollConfig { + kind: 'issues'; + owner: string; + repo: string; +} + +type GitHubPollConfig = RepoPollConfig | IssuePollConfig | PullsPollConfig | IssuesPollConfig; + +// ── Helpers ───────────────────────────────────────────────────────── + +async function ghJson(endpoint: string): Promise<{ data: T; pollInterval?: number }> { + const { stdout } = await execFileAsync('gh', ['api', '--include', endpoint], { + encoding: 'utf8', + timeout: 30_000, + }); + + const jsonStart = stdout.search(/^\s*[\[{]/m); + const headers = jsonStart > 0 ? stdout.slice(0, jsonStart) : ''; + const body = jsonStart >= 0 ? stdout.slice(jsonStart) : stdout; + + const data = JSON.parse(body) as T; + + // Extract X-Poll-Interval if present + let pollInterval: number | undefined; + const match = headers.match(/x-poll-interval:\s*(\d+)/i); + if (match) pollInterval = parseInt(match[1], 10) * 1000; + + return { data, pollInterval }; +} + +// ── Source implementation ─────────────────────────────────────────── + +export class GitHubSource implements ChannelSource { + readonly name = 'github'; + + async listSubscribable(_config: Record): Promise { + // List user's repos as subscribable items + try { + const { data } = await ghJson>( + '/user/repos?per_page=30&sort=updated', + ); + return data.map(r => ({ + origin: `github:${r.full_name}`, + description: r.description ?? r.full_name, + })); + } catch { + return [ + { origin: 'github:/', description: 'Subscribe to repo events' }, + { origin: 'github:/#', description: 'Subscribe to issue/PR comments' }, + { origin: 'github://pulls', description: 'Subscribe to all PR activity' }, + { origin: 'github://issues', description: 'Subscribe to all issue activity' }, + ]; + } + } + + parseOrigin(origin: string): GitHubPollConfig | null { + if (!origin.startsWith('github:')) return null; + const rest = origin.slice('github:'.length); + + // github:owner/repo/pulls + const pullsMatch = rest.match(/^([^/]+)\/([^/]+)\/pulls$/); + if (pullsMatch) return { kind: 'pulls', owner: pullsMatch[1], repo: pullsMatch[2] }; + + // github:owner/repo/issues + const issuesMatch = rest.match(/^([^/]+)\/([^/]+)\/issues$/); + if (issuesMatch) return { kind: 'issues', owner: issuesMatch[1], repo: issuesMatch[2] }; + + // github:owner/repo#42 + const issueMatch = rest.match(/^([^/]+)\/([^/#]+)#(\d+)$/); + if (issueMatch) return { kind: 'issue', owner: issueMatch[1], repo: issueMatch[2], number: parseInt(issueMatch[3], 10) }; + + // github:owner/repo + const repoMatch = rest.match(/^([^/]+)\/([^/#]+)$/); + if (repoMatch) return { kind: 'repo', owner: repoMatch[1], repo: repoMatch[2] }; + + return null; + } + + async poll(config: SourcePollConfig, cursor: string | null): Promise { + const c = config as GitHubPollConfig; + switch (c.kind) { + case 'repo': return this.pollRepoEvents(c, cursor); + case 'issue': return this.pollIssueComments(c, cursor); + case 'pulls': return this.pollPullRequests(c, cursor); + case 'issues': return this.pollIssues(c, cursor); + } + } + + // ── Poll strategies ───────────────────────────────────────────── + + private async pollRepoEvents(c: RepoPollConfig, cursor: string | null): Promise { + const endpoint = `/repos/${c.owner}/${c.repo}/events?per_page=100`; + const { data, pollInterval } = await ghJson; + }>>(endpoint); + + const cursorTs = cursor ? Date.parse(cursor) : 0; + const events: ChannelEvent[] = data + .filter(e => Date.parse(e.created_at) > cursorTs) + .map(e => ({ + id: `gh-event-${e.id}`, + source: 'github', + type: mapGitHubEventType(e.type), + timestamp: e.created_at, + origin: `github:${c.owner}/${c.repo}`, + payload: { + actor: e.actor.login, + eventType: e.type, + ...e.payload, + }, + })); + + const newCursor = data.length > 0 ? data[0].created_at : (cursor ?? ''); + + return { + events, + cursor: newCursor, + recommendedIntervalMs: pollInterval, + }; + } + + private async pollIssueComments(c: IssuePollConfig, cursor: string | null): Promise { + const sinceParam = cursor ? `?since=${cursor}` : ''; + const endpoint = `/repos/${c.owner}/${c.repo}/issues/${c.number}/comments${sinceParam}`; + const { data, pollInterval } = await ghJson>(endpoint); + + // GitHub's `since` param filters by updated_at, so we must also + // filter by updated_at to avoid missing edits or re-delivering stale items. + const cursorTs = cursor ? Date.parse(cursor) : 0; + const events: ChannelEvent[] = data + .filter(comment => Date.parse(comment.updated_at) > cursorTs) + .map(comment => ({ + id: `gh-comment-${comment.id}-${comment.updated_at}`, + source: 'github', + type: comment.created_at === comment.updated_at + ? 'issue_comment.created' + : 'issue_comment.updated', + timestamp: comment.updated_at, + origin: `github:${c.owner}/${c.repo}#${c.number}`, + payload: { + author: comment.user.login, + body: comment.body, + htmlUrl: comment.html_url, + }, + })); + + const newCursor = data.length > 0 + ? data[data.length - 1].updated_at + : (cursor ?? ''); + + return { + events, + cursor: newCursor, + recommendedIntervalMs: pollInterval, + }; + } + + private async pollPullRequests(c: PullsPollConfig, cursor: string | null): Promise { + const sinceParam = cursor ? `&since=${cursor}` : ''; + const endpoint = `/repos/${c.owner}/${c.repo}/pulls?state=all&sort=updated&direction=desc&per_page=30${sinceParam}`; + const { data, pollInterval } = await ghJson>(endpoint); + + const cursorTs = cursor ? Date.parse(cursor) : 0; + const events: ChannelEvent[] = data + .filter(pr => Date.parse(pr.updated_at) > cursorTs) + .map(pr => ({ + id: `gh-pr-${pr.id}-${pr.updated_at}`, + source: 'github', + type: `pull_request.${pr.state}`, + timestamp: pr.updated_at, + origin: `github:${c.owner}/${c.repo}/pulls`, + payload: { + number: pr.number, + title: pr.title, + state: pr.state, + author: pr.user.login, + htmlUrl: pr.html_url, + }, + })); + + const newCursor = data.length > 0 + ? data[0].updated_at + : (cursor ?? ''); + + return { + events, + cursor: newCursor, + recommendedIntervalMs: pollInterval, + }; + } + + private async pollIssues(c: IssuesPollConfig, cursor: string | null): Promise { + const sinceParam = cursor ? `&since=${cursor}` : ''; + const endpoint = `/repos/${c.owner}/${c.repo}/issues?state=all&sort=updated&direction=desc&per_page=30${sinceParam}`; + const { data, pollInterval } = await ghJson>(endpoint); + + const cursorTs = cursor ? Date.parse(cursor) : 0; + const events: ChannelEvent[] = data + // Filter out PRs (GitHub issues API includes PRs) + .filter(issue => !issue.pull_request) + .filter(issue => Date.parse(issue.updated_at) > cursorTs) + .map(issue => ({ + id: `gh-issue-${issue.id}-${issue.updated_at}`, + source: 'github', + type: `issue.${issue.state}`, + timestamp: issue.updated_at, + origin: `github:${c.owner}/${c.repo}/issues`, + payload: { + number: issue.number, + title: issue.title, + state: issue.state, + author: issue.user.login, + htmlUrl: issue.html_url, + }, + })); + + const newCursor = data.length > 0 + ? data[0].updated_at + : (cursor ?? ''); + + return { + events, + cursor: newCursor, + recommendedIntervalMs: pollInterval, + }; + } +} + +// ── Event type mapping ────────────────────────────────────────────── + +function mapGitHubEventType(ghType: string): string { + const map: Record = { + PushEvent: 'push', + PullRequestEvent: 'pull_request', + PullRequestReviewEvent: 'pull_request_review', + PullRequestReviewCommentEvent: 'pull_request_review_comment', + IssuesEvent: 'issue', + IssueCommentEvent: 'issue_comment', + CreateEvent: 'create', + DeleteEvent: 'delete', + ForkEvent: 'fork', + WatchEvent: 'star', + ReleaseEvent: 'release', + }; + return map[ghType] ?? ghType.replace(/Event$/, '').toLowerCase(); +} diff --git a/src/channel/types.ts b/src/channel/types.ts new file mode 100644 index 00000000..2addb974 --- /dev/null +++ b/src/channel/types.ts @@ -0,0 +1,84 @@ +/** + * Channel — Event subscription protocol for OpenCLI. + * + * Core types: events, sources, sinks, subscriptions. + */ + +/** Unified event envelope emitted by all sources. */ +export interface ChannelEvent { + /** Globally unique event ID (used for dedup). */ + id: string; + /** Which source adapter produced this event. */ + source: string; + /** Platform-specific event type (dot-namespaced). */ + type: string; + /** When the event occurred on the platform (ISO-8601). */ + timestamp: string; + /** + * Origin identifier — what subscriptions match against. + * Format: `source:path` e.g. `github:user/repo#42` + */ + origin: string; + /** Platform-specific event data. */ + payload: Record; +} + +/** A source adapter knows how to poll a specific platform for events. */ +export interface ChannelSource { + readonly name: string; + + /** Return human-readable list of subscribable items for discovery. */ + listSubscribable(config: Record): Promise; + + /** + * Parse an origin string into source-specific config. + * e.g. "github:user/repo#42" → { owner: "user", repo: "repo", issue: 42 } + * Returns null if this source can't handle the origin. + */ + parseOrigin(origin: string): SourcePollConfig | null; + + /** + * Poll for new events since cursor. + * Returns events + new cursor position. + */ + poll(config: SourcePollConfig, cursor: string | null): Promise; +} + +export interface SubscribableItem { + origin: string; + description: string; +} + +export interface SourcePollConfig { + [key: string]: unknown; +} + +export interface PollResult { + events: ChannelEvent[]; + cursor: string; + /** Server-recommended poll interval in ms (e.g. GitHub X-Poll-Interval). */ + recommendedIntervalMs?: number; +} + +/** A sink adapter knows how to deliver events to a consumer. */ +export interface ChannelSink { + readonly name: string; + init(config: Record): Promise; + deliver(events: ChannelEvent[]): Promise; +} + +/** A subscription: consumer → origin mapping. */ +export interface Subscription { + /** Unique subscription ID. */ + id: string; + /** Origin pattern to match, e.g. "github:user/repo#42". */ + origin: string; + /** Sink name to deliver to. */ + sink: string; + /** Sink-specific config. */ + sinkConfig: Record; + /** Poll interval override in ms (0 = use source default). */ + intervalMs: number; + /** When this subscription was created. */ + createdAt: string; +} diff --git a/src/cli.ts b/src/cli.ts index d9c26684..f4290ef1 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -16,6 +16,7 @@ import { printCompletionScript } from './completion.js'; import { loadExternalClis, executeExternalCli, installExternalCli, registerExternalCli, isBinaryInstalled } from './external.js'; import { registerAllCommands } from './commanderAdapter.js'; import { getErrorMessage } from './errors.js'; +import { registerChannelCommand } from './channel/index.js'; export function runCli(BUILTIN_CLIS: string, USER_CLIS: string): void { const program = new Command(); @@ -451,6 +452,9 @@ export function runCli(BUILTIN_CLIS: string, USER_CLIS: string): void { siteGroups.set('antigravity', antigravityCmd); registerAllCommands(program, siteGroups); + // ── Channel (event subscriptions) ─────────────────────────────────────── + registerChannelCommand(program); + // ── Unknown command fallback ────────────────────────────────────────────── // Security: do NOT auto-discover and register arbitrary system binaries. // Only explicitly registered external CLIs (via `opencli register`) are allowed.