diff --git a/bin/build-npm.ts b/bin/build-npm.ts index 023547f..5189a2e 100644 --- a/bin/build-npm.ts +++ b/bin/build-npm.ts @@ -31,6 +31,7 @@ await build({ }, dependencies: { "bun-sqlite-key-value": undefined as unknown as string, + "ws": "^8.18.0", "zod": undefined as unknown as string, }, devDependencies: { diff --git a/deno.json b/deno.json index ae8e84f..dabc5aa 100644 --- a/deno.json +++ b/deno.json @@ -30,6 +30,7 @@ "@flowcore/data-pump": "jsr:@flowcore/data-pump@^0.16.4", "@flowcore/sdk": "npm:@flowcore/sdk@^1.54.0", "postgres": "npm:postgres@^3.4.3", + "ws": "npm:ws@^8.18.0", "zod": "npm:zod@^3.25.63" }, "fmt": { diff --git a/deno.lock b/deno.lock index 167aa0b..d8ff8ed 100644 --- a/deno.lock +++ b/deno.lock @@ -44,6 +44,7 @@ "npm:postgres@^3.4.3": "3.4.5", "npm:prom-client@^15.1.3": "15.1.3", "npm:rxjs@^7.8.1": "7.8.2", + "npm:ws@^8.18.0": "8.19.0", "npm:zod@^3.25.63": "3.25.63" }, "jsr": { @@ -432,6 +433,7 @@ "npm:node-cache@^5.1.2", "npm:postgres@^3.4.3", "npm:rxjs@^7.8.1", + "npm:ws@^8.18.0", "npm:zod@^3.25.63" ] } diff --git a/src/pathways/cluster/cluster-manager.ts b/src/pathways/cluster/cluster-manager.ts index 12525ce..409ab27 100644 --- a/src/pathways/cluster/cluster-manager.ts +++ b/src/pathways/cluster/cluster-manager.ts @@ -12,6 +12,7 @@ import type { WsFailMessage, WsMessage, } from "./types.ts" +import { createNodeTransport } from "./node-transport.ts" const DEFAULT_LEASE_TTL_MS = 30_000 const DEFAULT_LEASE_RENEW_INTERVAL_MS = 10_000 @@ -21,34 +22,32 @@ const DEFAULT_DELIVERY_TIMEOUT_MS = 30_000 const LEASE_KEY = "pathway-cluster-leader" /** - * Creates a default transport using Deno APIs. - * Uses runtime detection to avoid compile-time dependency on Deno types. + * Creates a default transport with runtime auto-detection. + * Uses Deno APIs when available, falls back to ws-based Node.js transport. */ function createDefaultTransport(): ClusterTransport { // deno-lint-ignore no-explicit-any const D = (globalThis as any).Deno - if (!D?.serve) { - throw new Error( - "Default cluster transport requires Deno. Provide a custom ClusterTransport via options.transport for Node.js.", - ) + if (D?.serve) { + return { + startServer(port, onConnection) { + const server = D.serve({ port, hostname: "0.0.0.0" }, (req: Request) => { + if (req.headers.get("upgrade")?.toLowerCase() !== "websocket") { + return new Response("Expected WebSocket", { status: 426 }) + } + const { socket, response } = D.upgradeWebSocket(req) + socket.onopen = () => onConnection(socket as unknown as ClusterSocket) + return response + }) + return Promise.resolve({ shutdown: () => server.shutdown() }) + }, + connect(address) { + return new WebSocket(address) as unknown as ClusterSocket + }, + } } - return { - startServer(port, onConnection) { - const server = D.serve({ port, hostname: "0.0.0.0" }, (req: Request) => { - if (req.headers.get("upgrade")?.toLowerCase() !== "websocket") { - return new Response("Expected WebSocket", { status: 426 }) - } - const { socket, response } = D.upgradeWebSocket(req) - socket.onopen = () => onConnection(socket as unknown as ClusterSocket) - return response - }) - return Promise.resolve({ shutdown: () => server.shutdown() }) - }, - connect(address) { - return new WebSocket(address) as unknown as ClusterSocket - }, - } + return createNodeTransport() } /** diff --git a/src/pathways/cluster/index.ts b/src/pathways/cluster/index.ts index 1b6a2a1..7103497 100644 --- a/src/pathways/cluster/index.ts +++ b/src/pathways/cluster/index.ts @@ -13,4 +13,5 @@ export type { WsPongMessage, } from "./types.ts" export { ClusterManager } from "./cluster-manager.ts" +export { createNodeTransport } from "./node-transport.ts" export { createPostgresPathwayCoordinator, PostgresPathwayCoordinator } from "./postgres-coordinator.ts" diff --git a/src/pathways/cluster/node-transport.ts b/src/pathways/cluster/node-transport.ts new file mode 100644 index 0000000..86b1d5c --- /dev/null +++ b/src/pathways/cluster/node-transport.ts @@ -0,0 +1,57 @@ +import type { ClusterSocket, ClusterTransport } from "./types.ts" + +/** + * Creates a cluster transport for Node.js/Bun using the `ws` package. + * The `ws` import is deferred to `startServer()` to avoid loading it in Deno. + */ +export function createNodeTransport(): ClusterTransport { + return { + async startServer(port, onConnection) { + // deno-lint-ignore no-explicit-any + const ws = await import("ws") as any + const WebSocketServer = ws.WebSocketServer ?? ws.default?.WebSocketServer + const wss = new WebSocketServer({ port, host: "0.0.0.0" }) + + return new Promise((resolve) => { + wss.on("listening", () => { + resolve({ + async shutdown() { + await new Promise((res, rej) => { + wss.close((err?: Error) => (err ? rej(err) : res())) + }) + }, + }) + }) + + // deno-lint-ignore no-explicit-any + wss.on("connection", (rawWs: any) => { + const socket: ClusterSocket = { + send: (data: string) => rawWs.send(data), + close: () => rawWs.close(), + get readyState() { + return rawWs.readyState + }, + onopen: null, + onmessage: null, + onclose: null, + onerror: null, + } + + // deno-lint-ignore no-explicit-any + rawWs.on("message", (data: any) => { + socket.onmessage?.({ data: data.toString() }) + }) + rawWs.on("close", (ev: unknown) => socket.onclose?.(ev)) + rawWs.on("error", (ev: unknown) => socket.onerror?.(ev)) + + // Connection is already open when "connection" fires + onConnection(socket) + }) + }) + }, + + connect(address) { + return new WebSocket(address) as unknown as ClusterSocket + }, + } +}