Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/pathways/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import type {
} from "./types.ts"
import type { PathwayClusterOptions } from "./cluster/types.ts"
import { ClusterManager } from "./cluster/cluster-manager.ts"
import type { PathwayPumpOptions } from "./pump/types.ts"
import type { PathwayPumpOptions, PumpState } from "./pump/types.ts"
import { PathwayPump } from "./pump/pathway-pump.ts"
import { PathwayProvisioner } from "./provisioner.ts"
import {
Expand Down Expand Up @@ -1176,6 +1176,14 @@ export class PathwaysBuilder<
}
})

// Wire reset handler: leader receives reset requests and delegates to pump
this.clusterManager.onReset(async (position?: PumpState) => {
if (!this.pathwayPump) {
throw new Error("Pump not running on this leader")
}
await this.pathwayPump.reset(position)
})

await this.clusterManager.start()

this.logger.info("Cluster started", {
Expand Down Expand Up @@ -1302,6 +1310,27 @@ export class PathwaysBuilder<
this.logger.info("Pump stopped")
}

/**
 * Move the data pump to a given position, or drop its saved state and restart it.
 * When a cluster manager is configured the request goes through
 * `ClusterManager.requestReset()`, which runs it on the leader; otherwise the
 * local pump is reset directly.
 *
 * NOTE(review): the local-pump check runs before cluster routing, so an instance
 * that has no `pathwayPump` throws instead of forwarding to the leader — confirm
 * that every cluster member holds a pump instance after `startPump()`.
 *
 * @param position - Target position { timeBucket, eventId? }. If omitted, clears persisted state
 *                   and restarts from the live position. To replay from the very beginning,
 *                   pass the first time bucket explicitly.
 * @throws Error if no pump has been created on this instance.
 */
async resetPump(position?: PumpState): Promise<void> {
  const localPump = this.pathwayPump
  if (!localPump) {
    throw new Error("Pump not started — call startPump() first")
  }

  const cluster = this.clusterManager
  if (!cluster) {
    // Standalone mode: reset the local pump directly
    await localPump.reset(position)
    return
  }

  // Cluster mode: let the cluster manager route the request to the leader
  await cluster.requestReset(position)
}

/**
* Converts a Zod validation error to a human-readable string
* @param error The Zod validation error to convert
Expand Down
101 changes: 98 additions & 3 deletions src/pathways/cluster/cluster-manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { FlowcoreEvent } from "../../contracts/event.ts"
import type { Logger } from "../logger.ts"
import { NoopLogger } from "../logger.ts"
import type { PumpState } from "../pump/types.ts"
import type {
ClusterRole,
ClusterSocket,
Expand All @@ -11,6 +12,7 @@ import type {
WsAckMessage,
WsFailMessage,
WsMessage,
WsResetMessage,
} from "./types.ts"
import { createNodeTransport } from "./node-transport.ts"

Expand Down Expand Up @@ -89,6 +91,8 @@ export class ClusterManager {
private leaderConnection: ClusterSocket | null = null
private eventHandler: ((pathway: string, event: FlowcoreEvent) => Promise<void>) | null = null
private leadershipChangeHandler: ((isLeader: boolean) => void) | null = null
private resetHandler: ((position?: PumpState) => Promise<void>) | null = null
private pendingResets: Map<string, { resolve: () => void; reject: (error: Error) => void; sentAt: number }> = new Map()

constructor(options: PathwayClusterOptions, logger?: Logger) {
this.coordinator = options.coordinator
Expand Down Expand Up @@ -120,6 +124,50 @@ export class ClusterManager {
this.leadershipChangeHandler = handler
}

/**
 * Register the function that performs a pump reset on the leader.
 * It is invoked by `requestReset()` when this instance is the leader and when a
 * worker's "reset" message is handled. Registering again replaces the stored handler.
 *
 * @param handler - Async callback receiving the optional target position
 */
onReset(handler: (position?: PumpState) => Promise<void>): void {
  this.resetHandler = handler
}

/**
 * Ask the cluster to reset the pump, wherever the leader lives.
 * - Leader: runs the registered reset handler in-process.
 * - Worker: sends a "reset" message to the leader and settles when the matching
 *   "reset-ack" / "reset-fail" arrives (or the pending entry is rejected elsewhere,
 *   e.g. on timeout or stop).
 *
 * @param position - Optional target position forwarded to the reset handler
 * @throws Error if the role is still unknown, no handler is registered on the leader,
 *         or the worker has no open connection to the leader.
 */
async requestReset(position?: PumpState): Promise<void> {
  switch (this.role) {
    case "unknown":
      throw new Error("Cluster role not yet established — cannot reset")
    case "leader":
      if (!this.resetHandler) {
        throw new Error("No reset handler set on ClusterManager")
      }
      await this.resetHandler(position)
      return
  }

  // Worker: forward to leader via WS
  const leader = this.leaderConnection
  if (!leader || leader.readyState !== WebSocket.OPEN) {
    throw new Error("Not connected to leader — cannot forward reset")
  }

  // The id correlates the leader's reply with the promise stored below
  const resetId = crypto.randomUUID()
  const request: WsResetMessage = { type: "reset", resetId, position }

  return new Promise<void>((resolve, reject) => {
    this.pendingResets.set(resetId, { resolve, reject, sentAt: Date.now() })
    try {
      leader.send(JSON.stringify(request))
    } catch (sendError) {
      // Nothing was sent, so no reply will come: drop the entry and fail now
      this.pendingResets.delete(resetId)
      reject(sendError instanceof Error ? sendError : new Error(String(sendError)))
    }
  })
}

/**
* Start the cluster: register instance, begin heartbeat, attempt leader election
*/
Expand Down Expand Up @@ -222,6 +270,12 @@ export class ClusterManager {
}
this.pendingDeliveries.clear()

// Reject pending resets
for (const [, pending] of this.pendingResets) {
pending.reject(new Error("Cluster manager stopped"))
}
this.pendingResets.clear()

// Unregister
try {
await this.coordinator.unregister(this.instanceId)
Expand Down Expand Up @@ -445,6 +499,22 @@ export class ClusterManager {
socket.send(JSON.stringify({ type: "pong" }))
break
}
case "reset-ack": {
const pending = this.pendingResets.get(msg.resetId)
if (pending) {
this.pendingResets.delete(msg.resetId)
pending.resolve()
}
break
}
case "reset-fail": {
const pending = this.pendingResets.get(msg.resetId)
if (pending) {
this.pendingResets.delete(msg.resetId)
pending.reject(new Error(msg.error))
}
break
}
default:
break
}
Expand All @@ -461,10 +531,10 @@ export class ClusterManager {
this.workerConnections.set(address, ws)
}

ws.onmessage = (event: { data: string }) => {
ws.onmessage = async (event: { data: string }) => {
try {
const msg: WsMessage = JSON.parse(event.data)
this.handleLeaderMessage(address, msg)
await this.handleLeaderMessage(address, msg)
} catch (err) {
this.logger.error(
"Error handling worker response",
Expand Down Expand Up @@ -492,7 +562,7 @@ export class ClusterManager {
}
}

private handleLeaderMessage(workerAddress: string, msg: WsMessage): void {
private async handleLeaderMessage(workerAddress: string, msg: WsMessage): Promise<void> {
switch (msg.type) {
case "ack": {
const delivery = this.pendingDeliveries.get(msg.deliveryId)
Expand All @@ -513,6 +583,25 @@ export class ClusterManager {
case "pong": {
break
}
case "reset": {
const resetMsg = msg as WsResetMessage
const ws = this.workerConnections.get(workerAddress)
if (!ws) break
try {
if (!this.resetHandler) {
throw new Error("No reset handler set on leader")
}
await this.resetHandler(resetMsg.position)
ws.send(JSON.stringify({ type: "reset-ack", resetId: resetMsg.resetId }))
} catch (err) {
ws.send(JSON.stringify({
type: "reset-fail",
resetId: resetMsg.resetId,
error: err instanceof Error ? err.message : String(err),
}))
}
break
}
default:
break
}
Expand Down Expand Up @@ -582,5 +671,11 @@ export class ClusterManager {
delivery.reject(new Error(`Delivery ${id} to ${delivery.workerAddress} timed out`))
}
}
for (const [id, pending] of this.pendingResets) {
if (now - pending.sentAt > this.deliveryTimeoutMs) {
this.pendingResets.delete(id)
pending.reject(new Error(`Reset ${id} timed out`))
}
}
}
}
21 changes: 21 additions & 0 deletions src/pathways/cluster/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { FlowcoreEvent } from "../../contracts/event.ts"
import type { PumpState } from "../pump/types.ts"

/**
* Coordinator interface for distributed cluster coordination
Expand Down Expand Up @@ -39,6 +40,9 @@ export type WsMessage =
| WsFailMessage
| WsPingMessage
| WsPongMessage
| WsResetMessage
| WsResetAckMessage
| WsResetFailMessage

export interface WsEventsMessage {
type: "events"
Expand Down Expand Up @@ -66,6 +70,23 @@ export interface WsPongMessage {
type: "pong"
}

/**
 * Request sent by a worker to the leader asking it to reset the pump.
 */
export interface WsResetMessage {
type: "reset"
// Correlation id; the leader echoes it in the "reset-ack" / "reset-fail" reply
resetId: string
// Target pump position; absent when the requester supplied none
position?: PumpState
}

/**
 * Reply sent by the leader after its reset handler completed without throwing.
 */
export interface WsResetAckMessage {
type: "reset-ack"
// Id of the "reset" request this reply answers
resetId: string
}

/**
 * Reply sent by the leader when handling a "reset" request threw.
 */
export interface WsResetFailMessage {
type: "reset-fail"
// Id of the "reset" request this reply answers
resetId: string
// Message of the error raised on the leader; the requester rejects with it
error: string
}

/**
* Minimal WebSocket-like interface for cross-platform compatibility
*/
Expand Down
43 changes: 42 additions & 1 deletion src/pathways/pump/pathway-pump.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { FlowcoreEvent } from "../../contracts/event.ts"
import type { Logger } from "../logger.ts"
import { NoopLogger } from "../logger.ts"
import type { PathwayPumpOptions, PumpNotifierConfig, PumpStateManagerFactory } from "./types.ts"
import type { PathwayPumpOptions, PumpNotifierConfig, PumpState, PumpStateManager, PumpStateManagerFactory } from "./types.ts"

/**
* Registered pathway info needed for pump grouping
Expand All @@ -28,6 +28,7 @@ export class PathwayPump {
private readonly logger: Logger

private pumps: Map<string, DataPumpInstance> = new Map()
private stateManagers: Map<string, PumpStateManager> = new Map()
private running = false

// Required config from PathwaysBuilder
Expand Down Expand Up @@ -94,6 +95,7 @@ export class PathwayPump {

for (const [flowType, eventTypes] of flowTypeGroups) {
const stateManager = this.stateManagerFactory(flowType)
this.stateManagers.set(flowType, stateManager)

const notifierOptions = this.buildNotifierOptions(flowType, eventTypes)

Expand Down Expand Up @@ -164,6 +166,45 @@ export class PathwayPump {
}

this.pumps.clear()
this.stateManagers.clear()
}

/**
 * Reposition every running pump by calling its `restart()`; pump instances are not recreated.
 * With a position, each pump is restarted at that position. Without one, the flow type's
 * persisted state is cleared (when its state manager implements `clearState`) and the pump
 * is restarted at a time bucket derived from the current time.
 *
 * Pumps are processed one after another; the first failure is logged and rethrown, so
 * pumps later in the iteration are left untouched.
 *
 * NOTE(review): the "live" bucket is the current UTC time formatted as 14 digits down to
 * seconds (YYYYMMDDhhmmss) — confirm the pump accepts a bucket that is not aligned to a
 * bucket boundary.
 *
 * @param position - Target position { timeBucket, eventId? }. If omitted, clears persisted state
 *                   and restarts pumps (pump will start from live position).
 *                   To replay from the very beginning, pass the first time bucket explicitly.
 * @throws Error if the pump is not running, or whatever a state manager / pump restart throws.
 */
async reset(position?: PumpState): Promise<void> {
  if (!this.running) {
    throw new Error("PathwayPump is not running — cannot reset")
  }

  this.logger.info("Resetting data pumps", { position })

  for (const [flowType, pump] of this.pumps) {
    try {
      if (!position) {
        // Clear persisted state then restart pump from live
        const stateManager = this.stateManagers.get(flowType)
        if (stateManager?.clearState) {
          await stateManager.clearState()
        }
        const liveBucket = new Date().toISOString().replace(/[-:T]/g, "").slice(0, 14)
        await pump.restart({ timeBucket: liveBucket })
      } else {
        const { timeBucket, eventId } = position
        await pump.restart({ timeBucket, eventId })
      }
      this.logger.info("Data pump reset", { flowType, position })
    } catch (err) {
      const failure = err instanceof Error ? err : new Error(String(err))
      this.logger.error(`Error resetting pump for ${flowType}`, failure)
      throw err
    }
  }
}

get isRunning(): boolean {
Expand Down
8 changes: 8 additions & 0 deletions src/pathways/pump/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ class PostgresPumpStateManager implements PumpStateManager {
[this.flowType, state.timeBucket, state.eventId ?? null],
)
}

/**
 * Delete the persisted row(s) for this manager's flow type.
 * Runs `ensureInitialized()` first, like the other state operations.
 */
async clearState(): Promise<void> {
  await this.ensureInitialized()

  const deleteSql = `DELETE FROM ${this.tableName} WHERE flow_type = $1`
  const params = [this.flowType]
  await this.adapter.execute(deleteSql, params)
}
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/pathways/pump/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export type PumpStateManagerFactory = (flowType: string) => PumpStateManager
// Contract for reading and writing one pump's position; each method may be sync or async.
export interface PumpStateManager {
// Returns the current position, or null when there is none
getState(): Promise<PumpState | null> | PumpState | null
// Records the given position
setState(state: PumpState): Promise<void> | void
// Optional. Removes the recorded position; PathwayPump.reset() calls it, when present,
// before restarting a pump without an explicit target position
clearState?(): Promise<void> | void
}

/**
Expand Down
Loading