diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index f10e0c5..1d344b7 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -20,7 +20,7 @@ import type { } from "./types.ts" import type { PathwayClusterOptions } from "./cluster/types.ts" import { ClusterManager } from "./cluster/cluster-manager.ts" -import type { PathwayPumpOptions } from "./pump/types.ts" +import type { PathwayPumpOptions, PumpState } from "./pump/types.ts" import { PathwayPump } from "./pump/pathway-pump.ts" import { PathwayProvisioner } from "./provisioner.ts" import { @@ -1176,6 +1176,14 @@ export class PathwaysBuilder< } }) + // Wire reset handler: leader receives reset requests and delegates to pump + this.clusterManager.onReset(async (position?: PumpState) => { + if (!this.pathwayPump) { + throw new Error("Pump not running on this leader") + } + await this.pathwayPump.reset(position) + }) + await this.clusterManager.start() this.logger.info("Cluster started", { @@ -1302,6 +1310,27 @@ export class PathwaysBuilder< this.logger.info("Pump stopped") } + /** + * Reset the data pump to a specific position or clear state and restart. + * In cluster mode, the request is routed to the leader automatically. + * + * @param position - Target position { timeBucket, eventId? }. If omitted, clears persisted state + * and restarts from the live position. To replay from the very beginning, + * pass the first time bucket explicitly. + */ + async resetPump(position?: PumpState): Promise { + if (!this.pathwayPump) { + throw new Error("Pump not started — call startPump() first") + } + + if (this.clusterManager) { + await this.clusterManager.requestReset(position) + return + } + + await this.pathwayPump.reset(position) + } + /** * Converts a Zod validation error to a human-readable string * @param error The Zod validation error to convert diff --git a/src/pathways/cluster/cluster-manager.ts b/src/pathways/cluster/cluster-manager.ts index 409ab27..763d909 100644 --- a/src/pathways/cluster/cluster-manager.ts +++ b/src/pathways/cluster/cluster-manager.ts @@ -1,6 +1,7 @@ import type { FlowcoreEvent } from "../../contracts/event.ts" import type { Logger } from "../logger.ts" import { NoopLogger } from "../logger.ts" +import type { PumpState } from "../pump/types.ts" import type { ClusterRole, ClusterSocket, @@ -11,6 +12,7 @@ import type { WsAckMessage, WsFailMessage, WsMessage, + WsResetMessage, } from "./types.ts" import { createNodeTransport } from "./node-transport.ts" @@ -89,6 +91,8 @@ export class ClusterManager { private leaderConnection: ClusterSocket | null = null private eventHandler: ((pathway: string, event: FlowcoreEvent) => Promise) | null = null private leadershipChangeHandler: ((isLeader: boolean) => void) | null = null + private resetHandler: ((position?: PumpState) => Promise) | null = null + private pendingResets: Map void; reject: (error: Error) => void; sentAt: number }> = new Map() constructor(options: PathwayClusterOptions, logger?: Logger) { this.coordinator = options.coordinator @@ -120,6 +124,50 @@ export class ClusterManager { this.leadershipChangeHandler = handler } + /** + * Set a callback that handles pump reset requests on the leader. + */ + onReset(handler: (position?: PumpState) => Promise) { + this.resetHandler = handler + } + + /** + * Request a pump reset. Routes to the leader automatically. + * - If this instance is the leader: executes the reset directly. + * - If this instance is a worker: forwards the request to the leader via WebSocket. + */ + async requestReset(position?: PumpState): Promise { + if (this.role === "unknown") { + throw new Error("Cluster role not yet established — cannot reset") + } + + if (this.role === "leader") { + if (!this.resetHandler) { + throw new Error("No reset handler set on ClusterManager") + } + await this.resetHandler(position) + return + } + + // Worker: forward to leader via WS + if (!this.leaderConnection || this.leaderConnection.readyState !== WebSocket.OPEN) { + throw new Error("Not connected to leader — cannot forward reset") + } + + const resetId = crypto.randomUUID() + return new Promise((resolve, reject) => { + this.pendingResets.set(resetId, { resolve, reject, sentAt: Date.now() }) + + const msg: WsResetMessage = { type: "reset", resetId, position } + try { + this.leaderConnection!.send(JSON.stringify(msg)) + } catch (err) { + this.pendingResets.delete(resetId) + reject(err instanceof Error ? err : new Error(String(err))) + } + }) + } + /** * Start the cluster: register instance, begin heartbeat, attempt leader election */ @@ -222,6 +270,12 @@ export class ClusterManager { } this.pendingDeliveries.clear() + // Reject pending resets + for (const [, pending] of this.pendingResets) { + pending.reject(new Error("Cluster manager stopped")) + } + this.pendingResets.clear() + // Unregister try { await this.coordinator.unregister(this.instanceId) @@ -445,6 +499,22 @@ export class ClusterManager { socket.send(JSON.stringify({ type: "pong" })) break } + case "reset-ack": { + const pending = this.pendingResets.get(msg.resetId) + if (pending) { + this.pendingResets.delete(msg.resetId) + pending.resolve() + } + break + } + case "reset-fail": { + const pending = this.pendingResets.get(msg.resetId) + if (pending) { + this.pendingResets.delete(msg.resetId) + pending.reject(new Error(msg.error)) + } + break + } default: break } @@ -461,10 +531,10 @@ export class ClusterManager { this.workerConnections.set(address, ws) } - ws.onmessage = (event: { data: string }) => { + ws.onmessage = async (event: { data: string }) => { try { const msg: WsMessage = JSON.parse(event.data) - this.handleLeaderMessage(address, msg) + await this.handleLeaderMessage(address, msg) } catch (err) { this.logger.error( "Error handling worker response", @@ -492,7 +562,7 @@ export class ClusterManager { } } - private handleLeaderMessage(workerAddress: string, msg: WsMessage): void { + private async handleLeaderMessage(workerAddress: string, msg: WsMessage): Promise { switch (msg.type) { case "ack": { const delivery = this.pendingDeliveries.get(msg.deliveryId) @@ -513,6 +583,25 @@ export class ClusterManager { case "pong": { break } + case "reset": { + const resetMsg = msg as WsResetMessage + const ws = this.workerConnections.get(workerAddress) + if (!ws) break + try { + if (!this.resetHandler) { + throw new Error("No reset handler set on leader") + } + await this.resetHandler(resetMsg.position) + ws.send(JSON.stringify({ type: "reset-ack", resetId: resetMsg.resetId })) + } catch (err) { + ws.send(JSON.stringify({ + type: "reset-fail", + resetId: resetMsg.resetId, + error: err instanceof Error ? err.message : String(err), + })) + } + break + } default: break } @@ -582,5 +671,11 @@ export class ClusterManager { delivery.reject(new Error(`Delivery ${id} to ${delivery.workerAddress} timed out`)) } } + for (const [id, pending] of this.pendingResets) { + if (now - pending.sentAt > this.deliveryTimeoutMs) { + this.pendingResets.delete(id) + pending.reject(new Error(`Reset ${id} timed out`)) + } + } } } diff --git a/src/pathways/cluster/types.ts b/src/pathways/cluster/types.ts index 37745f7..e577dff 100644 --- a/src/pathways/cluster/types.ts +++ b/src/pathways/cluster/types.ts @@ -1,4 +1,5 @@ import type { FlowcoreEvent } from "../../contracts/event.ts" +import type { PumpState } from "../pump/types.ts" /** * Coordinator interface for distributed cluster coordination @@ -39,6 +40,9 @@ export type WsMessage = | WsFailMessage | WsPingMessage | WsPongMessage + | WsResetMessage + | WsResetAckMessage + | WsResetFailMessage export interface WsEventsMessage { type: "events" @@ -66,6 +70,23 @@ export interface WsPongMessage { type: "pong" } +export interface WsResetMessage { + type: "reset" + resetId: string + position?: PumpState +} + +export interface WsResetAckMessage { + type: "reset-ack" + resetId: string +} + +export interface WsResetFailMessage { + type: "reset-fail" + resetId: string + error: string +} + /** * Minimal WebSocket-like interface for cross-platform compatibility */ diff --git a/src/pathways/pump/pathway-pump.ts b/src/pathways/pump/pathway-pump.ts index caffe85..71972de 100644 --- a/src/pathways/pump/pathway-pump.ts +++ b/src/pathways/pump/pathway-pump.ts @@ -1,7 +1,7 @@ import type { FlowcoreEvent } from "../../contracts/event.ts" import type { Logger } from "../logger.ts" import { NoopLogger } from "../logger.ts" -import type { PathwayPumpOptions, PumpNotifierConfig, PumpStateManagerFactory } from "./types.ts" +import type { PathwayPumpOptions, PumpNotifierConfig, PumpState, PumpStateManager, PumpStateManagerFactory } from "./types.ts" /** * Registered pathway info needed for pump grouping @@ -28,6 +28,7 @@ export class PathwayPump { private readonly logger: Logger private pumps: Map = new Map() + private stateManagers: Map = new Map() private running = false // Required config from PathwaysBuilder @@ -94,6 +95,7 @@ export class PathwayPump { for (const [flowType, eventTypes] of flowTypeGroups) { const stateManager = this.stateManagerFactory(flowType) + this.stateManagers.set(flowType, stateManager) const notifierOptions = this.buildNotifierOptions(flowType, eventTypes) @@ -164,6 +166,45 @@ export class PathwayPump { } this.pumps.clear() + this.stateManagers.clear() + } + + /** + * Reset all pumps to a specific position, or clear state and bounce if no position given. + * Uses @flowcore/data-pump's restart() to reposition the cursor without recreating instances. + * + * @param position - Target position { timeBucket, eventId? }. If omitted, clears persisted state + * and restarts pumps (pump will start from live position). + * To replay from the very beginning, pass the first time bucket explicitly. + */ + async reset(position?: PumpState): Promise { + if (!this.running) { + throw new Error("PathwayPump is not running — cannot reset") + } + + this.logger.info("Resetting data pumps", { position }) + + for (const [flowType, pump] of this.pumps) { + try { + if (position) { + await pump.restart({ timeBucket: position.timeBucket, eventId: position.eventId }) + } else { + // Clear persisted state then restart pump from live + const stateManager = this.stateManagers.get(flowType) + if (stateManager?.clearState) { + await stateManager.clearState() + } + await pump.restart({ timeBucket: new Date().toISOString().replace(/[-:T]/g, "").slice(0, 14) }) + } + this.logger.info("Data pump reset", { flowType, position }) + } catch (err) { + this.logger.error( + `Error resetting pump for ${flowType}`, + err instanceof Error ? err : new Error(String(err)), + ) + throw err + } + } } get isRunning(): boolean { diff --git a/src/pathways/pump/state.ts b/src/pathways/pump/state.ts index 1f5b0ad..54ca621 100644 --- a/src/pathways/pump/state.ts +++ b/src/pathways/pump/state.ts @@ -52,6 +52,14 @@ class PostgresPumpStateManager implements PumpStateManager { [this.flowType, state.timeBucket, state.eventId ?? null], ) } + + async clearState(): Promise { + await this.ensureInitialized() + await this.adapter.execute( + `DELETE FROM ${this.tableName} WHERE flow_type = $1`, + [this.flowType], + ) + } } /** diff --git a/src/pathways/pump/types.ts b/src/pathways/pump/types.ts index f919b62..6d07236 100644 --- a/src/pathways/pump/types.ts +++ b/src/pathways/pump/types.ts @@ -23,6 +23,7 @@ export type PumpStateManagerFactory = (flowType: string) => PumpStateManager export interface PumpStateManager { getState(): Promise | PumpState | null setState(state: PumpState): Promise | void + clearState?(): Promise | void } /**