Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions apps/cli/src/mcp/channel/log-subscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
import { basename } from "node:path";
import type { Server as McpServer } from "@modelcontextprotocol/sdk/server/index.js";
import { authFetch } from "../../lib/auth/index.ts";
import { getControlApiUrl, startLogSession } from "../../lib/control-plane.ts";
import { getDeployMode, getProjectId } from "../../lib/project-link.ts";
import type { DebugLogger, McpServerOptions } from "../types.ts";

export interface LogEvent {
type: string;
ts: number;
outcome: string | null;
request: { method?: string; url?: string } | null;
logs: Array<{ ts: number | null; level: string | null; message: unknown[] }>;
exceptions: Array<{
ts: number | null;
name: string | null;
message: string | null;
}>;
}

const ERROR_OUTCOMES = new Set([
"exception",
"exceededCpu",
"exceededMemory",
"exceededWallTime",
"scriptNotFound",
]);

/** Determine whether a log event should trigger a channel notification. */
export function shouldEmitChannelNotification(event: LogEvent): boolean {
if (event.exceptions.length > 0) return true;
if (event.logs.some((l) => l.level === "error")) return true;
if (event.outcome && ERROR_OUTCOMES.has(event.outcome)) return true;
return false;
}

/**
* Format a log event into channel notification content and metadata.
*/
export function formatChannelContent(event: LogEvent): {
content: string;
meta: Record<string, string>;
} {
const parts: string[] = [];

for (const exc of event.exceptions) {
parts.push(`${exc.name ?? "Error"}: ${exc.message ?? "Unknown error"}`);
}
for (const log of event.logs) {
if (log.level === "error") {
parts.push(log.message.map(String).join(" "));
}
}
// For resource-limit outcomes with no exceptions/error logs, describe the outcome
if (parts.length === 0 && event.outcome && ERROR_OUTCOMES.has(event.outcome)) {
parts.push(`Worker ${event.outcome}`);
}
if (event.request) {
parts.push(`Request: ${event.request.method ?? "?"} ${event.request.url ?? "?"}`);
}

const eventType = event.exceptions.length > 0 ? "exception" : "error";

return {
content: parts.join("\n"),
meta: {
event: eventType,
outcome: event.outcome ?? "unknown",
},
};
}

/**
* Subscribe to a project's real-time log stream and emit channel notifications
* for errors and exceptions. Runs until the server closes, with reconnect-on-failure.
*
* Only works for managed (Jack Cloud) projects — silently skips BYO projects.
* Deduplicates repeated errors within a 60-second window to avoid flooding Claude's context.
*/
export async function startChannelLogSubscriber(
server: McpServer,
options: McpServerOptions,
debug: DebugLogger,
): Promise<void> {
const projectPath = options.projectPath ?? process.cwd();

const deployMode = await getDeployMode(projectPath).catch(() => null);
if (deployMode !== "managed") {
debug("Channel log subscriber: not a managed project, skipping");
return;
}

const projectId = await getProjectId(projectPath);
if (!projectId) {
debug("Channel log subscriber: no project ID found, skipping");
return;
}

const projectName = basename(projectPath);

debug("Channel log subscriber: starting", { projectId, projectName });

// Abort when the server closes so the process can exit cleanly
const abortController = new AbortController();
server.onclose = () => abortController.abort();

// Deduplicate: suppress identical error messages within a 60s window
const DEDUP_WINDOW_MS = 60_000;
const recentErrors = new Map<string, { count: number; firstSeen: number }>();

function isDuplicate(content: string): boolean {
const now = Date.now();
// Prune expired entries
for (const [key, entry] of recentErrors) {
if (now - entry.firstSeen > DEDUP_WINDOW_MS) {
recentErrors.delete(key);
}
}
const existing = recentErrors.get(content);
if (existing) {
existing.count++;
return true;
}
recentErrors.set(content, { count: 1, firstSeen: now });
return false;
}

let backoff = 5000;
const maxBackoff = 60000;

while (!abortController.signal.aborted) {
try {
const session = await startLogSession(projectId, "channel");
const streamUrl = `${getControlApiUrl()}${session.stream.url}`;

debug("Channel log subscriber: connected to SSE stream");
backoff = 5000;

const response = await authFetch(streamUrl, {
method: "GET",
headers: { Accept: "text/event-stream" },
signal: abortController.signal,
});

if (!response.ok || !response.body) {
throw new Error(`Failed to open log stream: ${response.status}`);
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (!abortController.signal.aborted) {
const { done, value } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";

for (const line of lines) {
if (!line.startsWith("data:")) continue;
const data = line.slice(5).trim();
if (!data) continue;

let parsed: LogEvent | null = null;
try {
parsed = JSON.parse(data) as LogEvent;
} catch {
continue;
}

if (parsed?.type !== "event") continue;
if (!shouldEmitChannelNotification(parsed)) continue;

const { content, meta } = formatChannelContent(parsed);

if (isDuplicate(content)) {
debug("Channel: suppressed duplicate error", { content: content.slice(0, 80) });
continue;
}

await server.notification({
method: "notifications/claude/channel",
params: {
content,
meta: { ...meta, project: projectName },
},
});

debug("Channel: emitted error notification", {
event: meta.event,
project: projectName,
});
}
}
} catch (err) {
if (abortController.signal.aborted) break;
debug("Channel log subscriber: connection error, retrying", {
error: String(err),
backoff,
});
}

if (abortController.signal.aborted) break;
await new Promise((r) => setTimeout(r, backoff));
backoff = Math.min(backoff * 1.5, maxBackoff);
}

debug("Channel log subscriber: stopped");
}
22 changes: 22 additions & 0 deletions apps/cli/src/mcp/server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Server as McpServer } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import pkg from "../../package.json" with { type: "json" };
import { startChannelLogSubscriber } from "./channel/log-subscriber.ts";
import { registerResources } from "./resources/index.ts";
import { registerTools } from "./tools/index.ts";
import type { McpServerOptions } from "./types.ts";
Expand All @@ -19,6 +20,20 @@ export function createDebugLogger(enabled: boolean) {
};
}

const CHANNEL_INSTRUCTIONS = `Events from the jack channel are production alerts from your deployed project.
They arrive as <channel source="jack" event="..." ...> tags.

- event="error": A request to your deployed app hit an error. Read the error message,
find the relevant source code, and suggest a fix. Use tail_logs to check if it's
recurring. Use test_endpoint to reproduce if possible.
- event="exception": An uncaught exception in your deployed code. Check the source,
understand the cause, and suggest a fix.
- outcome="exceededCpu" / "exceededMemory" / "exceededWallTime": A resource limit was hit.
Suggest code optimization rather than looking for bugs.

If you see the same error repeatedly, diagnose it once and note the frequency.
Do NOT redeploy automatically. Present the fix and let the user decide.`;

export async function createMcpServer(options: McpServerOptions = {}) {
const debug = createDebugLogger(options.debug ?? false);

Expand All @@ -33,7 +48,9 @@ export async function createMcpServer(options: McpServerOptions = {}) {
capabilities: {
tools: {},
resources: {},
experimental: { "claude/channel": {} },
},
instructions: CHANNEL_INSTRUCTIONS,
},
);

Expand Down Expand Up @@ -93,6 +110,11 @@ export async function startMcpServer(options: McpServerOptions = {}) {

await server.connect(transport);

// Start channel log subscriber for production error streaming (fire-and-forget)
startChannelLogSubscriber(server, options, debug).catch((err) =>
debug("Channel log subscriber failed to start", { error: String(err) }),
);

debug("MCP server connected and ready");

// Keep the server running indefinitely.
Expand Down
48 changes: 47 additions & 1 deletion apps/cli/src/mcp/tools/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ import type { Server as McpServer } from "@modelcontextprotocol/sdk/server/index
import { CallToolRequestSchema, ListToolsRequestSchema } from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
import { authFetch } from "../../lib/auth/index.ts";
import { getControlApiUrl, startLogSession } from "../../lib/control-plane.ts";
import {
fetchProjectOverview,
getControlApiUrl,
startLogSession,
} from "../../lib/control-plane.ts";
import { JackError, JackErrorCode } from "../../lib/errors.ts";
import { getDeployMode, getProjectId } from "../../lib/project-link.ts";
import { createProject, deployProject, getProjectStatus } from "../../lib/project-operations.ts";
Expand Down Expand Up @@ -42,6 +46,31 @@ import { Events, track, withTelemetry } from "../../lib/telemetry.ts";
import type { DebugLogger, McpServerOptions } from "../types.ts";
import { formatErrorResponse, formatSuccessResponse } from "../utils.ts";

/**
* Poll control plane until a deployment reaches a terminal status.
* Used by the deploy_project tool to return final status instead of "building".
*/
async function pollDeploymentStatus(
projectId: string,
deploymentId: string,
maxAttempts = 60,
intervalMs = 3000,
): Promise<{ status: string; error_message: string | null } | null> {
for (let i = 0; i < maxAttempts; i++) {
await new Promise((r) => setTimeout(r, intervalMs));
try {
const overview = await fetchProjectOverview(projectId);
const dep = overview.latest_deployment;
if (dep?.id === deploymentId && (dep.status === "live" || dep.status === "failed")) {
return { status: dep.status, error_message: dep.error_message ?? null };
}
} catch {
// Network blip — keep polling
}
}
return null;
}

// Tool schemas
const CreateProjectSchema = z.object({
name: z.string().optional().describe("Project name (auto-generated if not provided)"),
Expand Down Expand Up @@ -1038,6 +1067,23 @@ export function registerTools(server: McpServer, _options: McpServerOptions, deb

const result = await wrappedDeployProject(args.project_path, args.message);

// For managed deploys, poll until final status
if (
result.deploymentId &&
result.deployMode === "managed" &&
result.deployStatus !== "live" &&
result.deployStatus !== "failed"
) {
const projectId = await getProjectId(args.project_path ?? process.cwd());
if (projectId) {
const final = await pollDeploymentStatus(projectId, result.deploymentId);
if (final) {
result.deployStatus = final.status;
result.errorMessage = final.error_message;
}
}
}

return {
content: [
{
Expand Down
Loading
Loading