From cecfb574ae25b7e3e89f244de34d51f46115309f Mon Sep 17 00:00:00 2001 From: jbiskur Date: Tue, 24 Mar 2026 10:35:55 +0000 Subject: [PATCH] feat: add cluster-aware pump reset functionality Add resetPump(position?) to PathwaysBuilder that repositions the data pump cursor. In cluster mode, reset requests from workers are automatically forwarded to the leader via WebSocket. Supports resetting to a specific timeBucket/eventId or clearing persisted state entirely. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/pathways/builder.ts | 31 +++++++- src/pathways/cluster/cluster-manager.ts | 101 +++++++++++++++++++++++- src/pathways/cluster/types.ts | 21 +++++ src/pathways/pump/pathway-pump.ts | 43 +++++++++- src/pathways/pump/state.ts | 8 ++ src/pathways/pump/types.ts | 1 + 6 files changed, 200 insertions(+), 5 deletions(-) diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index f10e0c5..1d344b7 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -20,7 +20,7 @@ import type { } from "./types.ts" import type { PathwayClusterOptions } from "./cluster/types.ts" import { ClusterManager } from "./cluster/cluster-manager.ts" -import type { PathwayPumpOptions } from "./pump/types.ts" +import type { PathwayPumpOptions, PumpState } from "./pump/types.ts" import { PathwayPump } from "./pump/pathway-pump.ts" import { PathwayProvisioner } from "./provisioner.ts" import { @@ -1176,6 +1176,14 @@ export class PathwaysBuilder< } }) + // Wire reset handler: leader receives reset requests and delegates to pump + this.clusterManager.onReset(async (position?: PumpState) => { + if (!this.pathwayPump) { + throw new Error("Pump not running on this leader") + } + await this.pathwayPump.reset(position) + }) + await this.clusterManager.start() this.logger.info("Cluster started", { @@ -1302,6 +1310,27 @@ export class PathwaysBuilder< this.logger.info("Pump stopped") } + /** + * Reset the data pump to a specific position or clear state and restart. + * In cluster mode, the request is routed to the leader automatically. + * + * @param position - Target position { timeBucket, eventId? }. If omitted, clears persisted state + * and restarts from the live position. To replay from the very beginning, + * pass the first time bucket explicitly. + */ + async resetPump(position?: PumpState): Promise { + if (!this.pathwayPump) { + throw new Error("Pump not started — call startPump() first") + } + + if (this.clusterManager) { + await this.clusterManager.requestReset(position) + return + } + + await this.pathwayPump.reset(position) + } + /** * Converts a Zod validation error to a human-readable string * @param error The Zod validation error to convert diff --git a/src/pathways/cluster/cluster-manager.ts b/src/pathways/cluster/cluster-manager.ts index 409ab27..763d909 100644 --- a/src/pathways/cluster/cluster-manager.ts +++ b/src/pathways/cluster/cluster-manager.ts @@ -1,6 +1,7 @@ import type { FlowcoreEvent } from "../../contracts/event.ts" import type { Logger } from "../logger.ts" import { NoopLogger } from "../logger.ts" +import type { PumpState } from "../pump/types.ts" import type { ClusterRole, ClusterSocket, @@ -11,6 +12,7 @@ import type { WsAckMessage, WsFailMessage, WsMessage, + WsResetMessage, } from "./types.ts" import { createNodeTransport } from "./node-transport.ts" @@ -89,6 +91,8 @@ export class ClusterManager { private leaderConnection: ClusterSocket | null = null private eventHandler: ((pathway: string, event: FlowcoreEvent) => Promise) | null = null private leadershipChangeHandler: ((isLeader: boolean) => void) | null = null + private resetHandler: ((position?: PumpState) => Promise) | null = null + private pendingResets: Map void; reject: (error: Error) => void; sentAt: number }> = new Map() constructor(options: PathwayClusterOptions, logger?: Logger) { this.coordinator = options.coordinator @@ -120,6 +124,50 @@ export class ClusterManager { this.leadershipChangeHandler = handler } + /** + * Set a callback that handles pump reset requests on the leader. + */ + onReset(handler: (position?: PumpState) => Promise) { + this.resetHandler = handler + } + + /** + * Request a pump reset. Routes to the leader automatically. + * - If this instance is the leader: executes the reset directly. + * - If this instance is a worker: forwards the request to the leader via WebSocket. + */ + async requestReset(position?: PumpState): Promise { + if (this.role === "unknown") { + throw new Error("Cluster role not yet established — cannot reset") + } + + if (this.role === "leader") { + if (!this.resetHandler) { + throw new Error("No reset handler set on ClusterManager") + } + await this.resetHandler(position) + return + } + + // Worker: forward to leader via WS + if (!this.leaderConnection || this.leaderConnection.readyState !== WebSocket.OPEN) { + throw new Error("Not connected to leader — cannot forward reset") + } + + const resetId = crypto.randomUUID() + return new Promise((resolve, reject) => { + this.pendingResets.set(resetId, { resolve, reject, sentAt: Date.now() }) + + const msg: WsResetMessage = { type: "reset", resetId, position } + try { + this.leaderConnection!.send(JSON.stringify(msg)) + } catch (err) { + this.pendingResets.delete(resetId) + reject(err instanceof Error ? err : new Error(String(err))) + } + }) + } + /** * Start the cluster: register instance, begin heartbeat, attempt leader election */ @@ -222,6 +270,12 @@ export class ClusterManager { } this.pendingDeliveries.clear() + // Reject pending resets + for (const [, pending] of this.pendingResets) { + pending.reject(new Error("Cluster manager stopped")) + } + this.pendingResets.clear() + // Unregister try { await this.coordinator.unregister(this.instanceId) @@ -445,6 +499,22 @@ export class ClusterManager { socket.send(JSON.stringify({ type: "pong" })) break } + case "reset-ack": { + const pending = this.pendingResets.get(msg.resetId) + if (pending) { + this.pendingResets.delete(msg.resetId) + pending.resolve() + } + break + } + case "reset-fail": { + const pending = this.pendingResets.get(msg.resetId) + if (pending) { + this.pendingResets.delete(msg.resetId) + pending.reject(new Error(msg.error)) + } + break + } default: break } @@ -461,10 +531,10 @@ export class ClusterManager { this.workerConnections.set(address, ws) } - ws.onmessage = (event: { data: string }) => { + ws.onmessage = async (event: { data: string }) => { try { const msg: WsMessage = JSON.parse(event.data) - this.handleLeaderMessage(address, msg) + await this.handleLeaderMessage(address, msg) } catch (err) { this.logger.error( "Error handling worker response", @@ -492,7 +562,7 @@ export class ClusterManager { } } - private handleLeaderMessage(workerAddress: string, msg: WsMessage): void { + private async handleLeaderMessage(workerAddress: string, msg: WsMessage): Promise { switch (msg.type) { case "ack": { const delivery = this.pendingDeliveries.get(msg.deliveryId) @@ -513,6 +583,25 @@ export class ClusterManager { case "pong": { break } + case "reset": { + const resetMsg = msg as WsResetMessage + const ws = this.workerConnections.get(workerAddress) + if (!ws) break + try { + if (!this.resetHandler) { + throw new Error("No reset handler set on leader") + } + await this.resetHandler(resetMsg.position) + ws.send(JSON.stringify({ type: "reset-ack", resetId: resetMsg.resetId })) + } catch (err) { + ws.send(JSON.stringify({ + type: "reset-fail", + resetId: resetMsg.resetId, + error: err instanceof Error ? err.message : String(err), + })) + } + break + } default: break } @@ -582,5 +671,11 @@ export class ClusterManager { delivery.reject(new Error(`Delivery ${id} to ${delivery.workerAddress} timed out`)) } } + for (const [id, pending] of this.pendingResets) { + if (now - pending.sentAt > this.deliveryTimeoutMs) { + this.pendingResets.delete(id) + pending.reject(new Error(`Reset ${id} timed out`)) + } + } } } diff --git a/src/pathways/cluster/types.ts b/src/pathways/cluster/types.ts index 37745f7..e577dff 100644 --- a/src/pathways/cluster/types.ts +++ b/src/pathways/cluster/types.ts @@ -1,4 +1,5 @@ import type { FlowcoreEvent } from "../../contracts/event.ts" +import type { PumpState } from "../pump/types.ts" /** * Coordinator interface for distributed cluster coordination @@ -39,6 +40,9 @@ export type WsMessage = | WsFailMessage | WsPingMessage | WsPongMessage + | WsResetMessage + | WsResetAckMessage + | WsResetFailMessage export interface WsEventsMessage { type: "events" @@ -66,6 +70,23 @@ export interface WsPongMessage { type: "pong" } +export interface WsResetMessage { + type: "reset" + resetId: string + position?: PumpState +} + +export interface WsResetAckMessage { + type: "reset-ack" + resetId: string +} + +export interface WsResetFailMessage { + type: "reset-fail" + resetId: string + error: string +} + /** * Minimal WebSocket-like interface for cross-platform compatibility */ diff --git a/src/pathways/pump/pathway-pump.ts b/src/pathways/pump/pathway-pump.ts index caffe85..71972de 100644 --- a/src/pathways/pump/pathway-pump.ts +++ b/src/pathways/pump/pathway-pump.ts @@ -1,7 +1,7 @@ import type { FlowcoreEvent } from "../../contracts/event.ts" import type { Logger } from "../logger.ts" import { NoopLogger } from "../logger.ts" -import type { PathwayPumpOptions, PumpNotifierConfig, PumpStateManagerFactory } from "./types.ts" +import type { PathwayPumpOptions, PumpNotifierConfig, PumpState, PumpStateManager, PumpStateManagerFactory } from "./types.ts" /** * Registered pathway info needed for pump grouping @@ -28,6 +28,7 @@ export class PathwayPump { private readonly logger: Logger private pumps: Map = new Map() + private stateManagers: Map = new Map() private running = false // Required config from PathwaysBuilder @@ -94,6 +95,7 @@ export class PathwayPump { for (const [flowType, eventTypes] of flowTypeGroups) { const stateManager = this.stateManagerFactory(flowType) + this.stateManagers.set(flowType, stateManager) const notifierOptions = this.buildNotifierOptions(flowType, eventTypes) @@ -164,6 +166,45 @@ export class PathwayPump { } this.pumps.clear() + this.stateManagers.clear() + } + + /** + * Reset all pumps to a specific position, or clear state and bounce if no position given. + * Uses @flowcore/data-pump's restart() to reposition the cursor without recreating instances. + * + * @param position - Target position { timeBucket, eventId? }. If omitted, clears persisted state + * and restarts pumps (pump will start from live position). + * To replay from the very beginning, pass the first time bucket explicitly. + */ + async reset(position?: PumpState): Promise { + if (!this.running) { + throw new Error("PathwayPump is not running — cannot reset") + } + + this.logger.info("Resetting data pumps", { position }) + + for (const [flowType, pump] of this.pumps) { + try { + if (position) { + await pump.restart({ timeBucket: position.timeBucket, eventId: position.eventId }) + } else { + // Clear persisted state then restart pump from live + const stateManager = this.stateManagers.get(flowType) + if (stateManager?.clearState) { + await stateManager.clearState() + } + await pump.restart({ timeBucket: new Date().toISOString().replace(/[-:T]/g, "").slice(0, 14) }) + } + this.logger.info("Data pump reset", { flowType, position }) + } catch (err) { + this.logger.error( + `Error resetting pump for ${flowType}`, + err instanceof Error ? err : new Error(String(err)), + ) + throw err + } + } } get isRunning(): boolean { diff --git a/src/pathways/pump/state.ts b/src/pathways/pump/state.ts index 1f5b0ad..54ca621 100644 --- a/src/pathways/pump/state.ts +++ b/src/pathways/pump/state.ts @@ -52,6 +52,14 @@ class PostgresPumpStateManager implements PumpStateManager { [this.flowType, state.timeBucket, state.eventId ?? null], ) } + + async clearState(): Promise { + await this.ensureInitialized() + await this.adapter.execute( + `DELETE FROM ${this.tableName} WHERE flow_type = $1`, + [this.flowType], + ) + } } /** diff --git a/src/pathways/pump/types.ts b/src/pathways/pump/types.ts index f919b62..6d07236 100644 --- a/src/pathways/pump/types.ts +++ b/src/pathways/pump/types.ts @@ -23,6 +23,7 @@ export type PumpStateManagerFactory = (flowType: string) => PumpStateManager export interface PumpStateManager { getState(): Promise | PumpState | null setState(state: PumpState): Promise | void + clearState?(): Promise | void } /**