Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/build-npm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ await build({
},
dependencies: {
"bun-sqlite-key-value": undefined as unknown as string,
"ws": "^8.18.0",
"zod": undefined as unknown as string,
},
devDependencies: {
Expand Down
1 change: 1 addition & 0 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"@flowcore/data-pump": "jsr:@flowcore/data-pump@^0.16.4",
"@flowcore/sdk": "npm:@flowcore/sdk@^1.54.0",
"postgres": "npm:postgres@^3.4.3",
"ws": "npm:ws@^8.18.0",
"zod": "npm:zod@^3.25.63"
},
"fmt": {
Expand Down
2 changes: 2 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 21 additions & 22 deletions src/pathways/cluster/cluster-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type {
WsFailMessage,
WsMessage,
} from "./types.ts"
import { createNodeTransport } from "./node-transport.ts"

const DEFAULT_LEASE_TTL_MS = 30_000
const DEFAULT_LEASE_RENEW_INTERVAL_MS = 10_000
Expand All @@ -21,34 +22,32 @@ const DEFAULT_DELIVERY_TIMEOUT_MS = 30_000
const LEASE_KEY = "pathway-cluster-leader"

/**
* Creates a default transport using Deno APIs.
* Uses runtime detection to avoid compile-time dependency on Deno types.
* Creates a default transport with runtime auto-detection.
* Uses Deno APIs when available, falls back to ws-based Node.js transport.
*/
function createDefaultTransport(): ClusterTransport {
// deno-lint-ignore no-explicit-any
const D = (globalThis as any).Deno
if (!D?.serve) {
throw new Error(
"Default cluster transport requires Deno. Provide a custom ClusterTransport via options.transport for Node.js.",
)
if (D?.serve) {
return {
startServer(port, onConnection) {
const server = D.serve({ port, hostname: "0.0.0.0" }, (req: Request) => {
if (req.headers.get("upgrade")?.toLowerCase() !== "websocket") {
return new Response("Expected WebSocket", { status: 426 })
}
const { socket, response } = D.upgradeWebSocket(req)
socket.onopen = () => onConnection(socket as unknown as ClusterSocket)
return response
})
return Promise.resolve({ shutdown: () => server.shutdown() })
},
connect(address) {
return new WebSocket(address) as unknown as ClusterSocket
},
}
}

return {
startServer(port, onConnection) {
const server = D.serve({ port, hostname: "0.0.0.0" }, (req: Request) => {
if (req.headers.get("upgrade")?.toLowerCase() !== "websocket") {
return new Response("Expected WebSocket", { status: 426 })
}
const { socket, response } = D.upgradeWebSocket(req)
socket.onopen = () => onConnection(socket as unknown as ClusterSocket)
return response
})
return Promise.resolve({ shutdown: () => server.shutdown() })
},
connect(address) {
return new WebSocket(address) as unknown as ClusterSocket
},
}
return createNodeTransport()
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/pathways/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ export type {
WsPongMessage,
} from "./types.ts"
export { ClusterManager } from "./cluster-manager.ts"
export { createNodeTransport } from "./node-transport.ts"
export { createPostgresPathwayCoordinator, PostgresPathwayCoordinator } from "./postgres-coordinator.ts"
57 changes: 57 additions & 0 deletions src/pathways/cluster/node-transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import type { ClusterSocket, ClusterTransport } from "./types.ts"

/**
* Creates a cluster transport for Node.js/Bun using the `ws` package.
* The `ws` import is deferred to `startServer()` to avoid loading it in Deno.
*/
export function createNodeTransport(): ClusterTransport {
return {
async startServer(port, onConnection) {
// deno-lint-ignore no-explicit-any
const ws = await import("ws") as any
const WebSocketServer = ws.WebSocketServer ?? ws.default?.WebSocketServer
const wss = new WebSocketServer({ port, host: "0.0.0.0" })

return new Promise((resolve) => {
wss.on("listening", () => {
resolve({
async shutdown() {
await new Promise<void>((res, rej) => {
wss.close((err?: Error) => (err ? rej(err) : res()))
})
},
})
})

// deno-lint-ignore no-explicit-any
wss.on("connection", (rawWs: any) => {
const socket: ClusterSocket = {
send: (data: string) => rawWs.send(data),
close: () => rawWs.close(),
get readyState() {
return rawWs.readyState
},
onopen: null,
onmessage: null,
onclose: null,
onerror: null,
}

// deno-lint-ignore no-explicit-any
rawWs.on("message", (data: any) => {
socket.onmessage?.({ data: data.toString() })
})
rawWs.on("close", (ev: unknown) => socket.onclose?.(ev))
rawWs.on("error", (ev: unknown) => socket.onerror?.(ev))

// Connection is already open when "connection" fires
onConnection(socket)
})
})
},

connect(address) {
return new WebSocket(address) as unknown as ClusterSocket
},
}
}
Loading