Skip to content

Commit 03f110b

Browse files
jbiskurclaude
andauthored
feat: add cluster-aware pump reset functionality (#51)
Add resetPump(position?) to PathwaysBuilder that repositions the data pump cursor. In cluster mode, reset requests from workers are automatically forwarded to the leader via WebSocket. Supports resetting to a specific timeBucket/eventId or clearing persisted state entirely. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 46b1719 commit 03f110b

6 files changed

Lines changed: 200 additions & 5 deletions

File tree

src/pathways/builder.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import type {
2020
} from "./types.ts"
2121
import type { PathwayClusterOptions } from "./cluster/types.ts"
2222
import { ClusterManager } from "./cluster/cluster-manager.ts"
23-
import type { PathwayPumpOptions } from "./pump/types.ts"
23+
import type { PathwayPumpOptions, PumpState } from "./pump/types.ts"
2424
import { PathwayPump } from "./pump/pathway-pump.ts"
2525
import { PathwayProvisioner } from "./provisioner.ts"
2626
import {
@@ -1176,6 +1176,14 @@ export class PathwaysBuilder<
11761176
}
11771177
})
11781178

1179+
// Wire reset handler: leader receives reset requests and delegates to pump
1180+
this.clusterManager.onReset(async (position?: PumpState) => {
1181+
if (!this.pathwayPump) {
1182+
throw new Error("Pump not running on this leader")
1183+
}
1184+
await this.pathwayPump.reset(position)
1185+
})
1186+
11791187
await this.clusterManager.start()
11801188

11811189
this.logger.info("Cluster started", {
@@ -1302,6 +1310,27 @@ export class PathwaysBuilder<
13021310
this.logger.info("Pump stopped")
13031311
}
13041312

1313+
/**
1314+
* Reset the data pump to a specific position or clear state and restart.
1315+
* In cluster mode, the request is routed to the leader automatically.
1316+
*
1317+
* @param position - Target position { timeBucket, eventId? }. If omitted, clears persisted state
1318+
* and restarts from the live position. To replay from the very beginning,
1319+
* pass the first time bucket explicitly.
1320+
*/
1321+
async resetPump(position?: PumpState): Promise<void> {
1322+
if (!this.pathwayPump) {
1323+
throw new Error("Pump not started — call startPump() first")
1324+
}
1325+
1326+
if (this.clusterManager) {
1327+
await this.clusterManager.requestReset(position)
1328+
return
1329+
}
1330+
1331+
await this.pathwayPump.reset(position)
1332+
}
1333+
13051334
/**
13061335
* Converts a Zod validation error to a human-readable string
13071336
* @param error The Zod validation error to convert

src/pathways/cluster/cluster-manager.ts

Lines changed: 98 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { FlowcoreEvent } from "../../contracts/event.ts"
22
import type { Logger } from "../logger.ts"
33
import { NoopLogger } from "../logger.ts"
4+
import type { PumpState } from "../pump/types.ts"
45
import type {
56
ClusterRole,
67
ClusterSocket,
@@ -11,6 +12,7 @@ import type {
1112
WsAckMessage,
1213
WsFailMessage,
1314
WsMessage,
15+
WsResetMessage,
1416
} from "./types.ts"
1517
import { createNodeTransport } from "./node-transport.ts"
1618

@@ -89,6 +91,8 @@ export class ClusterManager {
8991
private leaderConnection: ClusterSocket | null = null
9092
private eventHandler: ((pathway: string, event: FlowcoreEvent) => Promise<void>) | null = null
9193
private leadershipChangeHandler: ((isLeader: boolean) => void) | null = null
94+
private resetHandler: ((position?: PumpState) => Promise<void>) | null = null
95+
private pendingResets: Map<string, { resolve: () => void; reject: (error: Error) => void; sentAt: number }> = new Map()
9296

9397
constructor(options: PathwayClusterOptions, logger?: Logger) {
9498
this.coordinator = options.coordinator
@@ -120,6 +124,50 @@ export class ClusterManager {
120124
this.leadershipChangeHandler = handler
121125
}
122126

127+
/**
128+
* Set a callback that handles pump reset requests on the leader.
129+
*/
130+
onReset(handler: (position?: PumpState) => Promise<void>) {
131+
this.resetHandler = handler
132+
}
133+
134+
/**
135+
* Request a pump reset. Routes to the leader automatically.
136+
* - If this instance is the leader: executes the reset directly.
137+
* - If this instance is a worker: forwards the request to the leader via WebSocket.
138+
*/
139+
async requestReset(position?: PumpState): Promise<void> {
140+
if (this.role === "unknown") {
141+
throw new Error("Cluster role not yet established — cannot reset")
142+
}
143+
144+
if (this.role === "leader") {
145+
if (!this.resetHandler) {
146+
throw new Error("No reset handler set on ClusterManager")
147+
}
148+
await this.resetHandler(position)
149+
return
150+
}
151+
152+
// Worker: forward to leader via WS
153+
if (!this.leaderConnection || this.leaderConnection.readyState !== WebSocket.OPEN) {
154+
throw new Error("Not connected to leader — cannot forward reset")
155+
}
156+
157+
const resetId = crypto.randomUUID()
158+
return new Promise<void>((resolve, reject) => {
159+
this.pendingResets.set(resetId, { resolve, reject, sentAt: Date.now() })
160+
161+
const msg: WsResetMessage = { type: "reset", resetId, position }
162+
try {
163+
this.leaderConnection!.send(JSON.stringify(msg))
164+
} catch (err) {
165+
this.pendingResets.delete(resetId)
166+
reject(err instanceof Error ? err : new Error(String(err)))
167+
}
168+
})
169+
}
170+
123171
/**
124172
* Start the cluster: register instance, begin heartbeat, attempt leader election
125173
*/
@@ -222,6 +270,12 @@ export class ClusterManager {
222270
}
223271
this.pendingDeliveries.clear()
224272

273+
// Reject pending resets
274+
for (const [, pending] of this.pendingResets) {
275+
pending.reject(new Error("Cluster manager stopped"))
276+
}
277+
this.pendingResets.clear()
278+
225279
// Unregister
226280
try {
227281
await this.coordinator.unregister(this.instanceId)
@@ -445,6 +499,22 @@ export class ClusterManager {
445499
socket.send(JSON.stringify({ type: "pong" }))
446500
break
447501
}
502+
case "reset-ack": {
503+
const pending = this.pendingResets.get(msg.resetId)
504+
if (pending) {
505+
this.pendingResets.delete(msg.resetId)
506+
pending.resolve()
507+
}
508+
break
509+
}
510+
case "reset-fail": {
511+
const pending = this.pendingResets.get(msg.resetId)
512+
if (pending) {
513+
this.pendingResets.delete(msg.resetId)
514+
pending.reject(new Error(msg.error))
515+
}
516+
break
517+
}
448518
default:
449519
break
450520
}
@@ -461,10 +531,10 @@ export class ClusterManager {
461531
this.workerConnections.set(address, ws)
462532
}
463533

464-
ws.onmessage = (event: { data: string }) => {
534+
ws.onmessage = async (event: { data: string }) => {
465535
try {
466536
const msg: WsMessage = JSON.parse(event.data)
467-
this.handleLeaderMessage(address, msg)
537+
await this.handleLeaderMessage(address, msg)
468538
} catch (err) {
469539
this.logger.error(
470540
"Error handling worker response",
@@ -492,7 +562,7 @@ export class ClusterManager {
492562
}
493563
}
494564

495-
private handleLeaderMessage(workerAddress: string, msg: WsMessage): void {
565+
private async handleLeaderMessage(workerAddress: string, msg: WsMessage): Promise<void> {
496566
switch (msg.type) {
497567
case "ack": {
498568
const delivery = this.pendingDeliveries.get(msg.deliveryId)
@@ -513,6 +583,25 @@ export class ClusterManager {
513583
case "pong": {
514584
break
515585
}
586+
case "reset": {
587+
const resetMsg = msg as WsResetMessage
588+
const ws = this.workerConnections.get(workerAddress)
589+
if (!ws) break
590+
try {
591+
if (!this.resetHandler) {
592+
throw new Error("No reset handler set on leader")
593+
}
594+
await this.resetHandler(resetMsg.position)
595+
ws.send(JSON.stringify({ type: "reset-ack", resetId: resetMsg.resetId }))
596+
} catch (err) {
597+
ws.send(JSON.stringify({
598+
type: "reset-fail",
599+
resetId: resetMsg.resetId,
600+
error: err instanceof Error ? err.message : String(err),
601+
}))
602+
}
603+
break
604+
}
516605
default:
517606
break
518607
}
@@ -582,5 +671,11 @@ export class ClusterManager {
582671
delivery.reject(new Error(`Delivery ${id} to ${delivery.workerAddress} timed out`))
583672
}
584673
}
674+
for (const [id, pending] of this.pendingResets) {
675+
if (now - pending.sentAt > this.deliveryTimeoutMs) {
676+
this.pendingResets.delete(id)
677+
pending.reject(new Error(`Reset ${id} timed out`))
678+
}
679+
}
585680
}
586681
}

src/pathways/cluster/types.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { FlowcoreEvent } from "../../contracts/event.ts"
2+
import type { PumpState } from "../pump/types.ts"
23

34
/**
45
* Coordinator interface for distributed cluster coordination
@@ -39,6 +40,9 @@ export type WsMessage =
3940
| WsFailMessage
4041
| WsPingMessage
4142
| WsPongMessage
43+
| WsResetMessage
44+
| WsResetAckMessage
45+
| WsResetFailMessage
4246

4347
export interface WsEventsMessage {
4448
type: "events"
@@ -66,6 +70,23 @@ export interface WsPongMessage {
6670
type: "pong"
6771
}
6872

73+
export interface WsResetMessage {
74+
type: "reset"
75+
resetId: string
76+
position?: PumpState
77+
}
78+
79+
export interface WsResetAckMessage {
80+
type: "reset-ack"
81+
resetId: string
82+
}
83+
84+
export interface WsResetFailMessage {
85+
type: "reset-fail"
86+
resetId: string
87+
error: string
88+
}
89+
6990
/**
7091
* Minimal WebSocket-like interface for cross-platform compatibility
7192
*/

src/pathways/pump/pathway-pump.ts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { FlowcoreEvent } from "../../contracts/event.ts"
22
import type { Logger } from "../logger.ts"
33
import { NoopLogger } from "../logger.ts"
4-
import type { PathwayPumpOptions, PumpNotifierConfig, PumpStateManagerFactory } from "./types.ts"
4+
import type { PathwayPumpOptions, PumpNotifierConfig, PumpState, PumpStateManager, PumpStateManagerFactory } from "./types.ts"
55

66
/**
77
* Registered pathway info needed for pump grouping
@@ -28,6 +28,7 @@ export class PathwayPump {
2828
private readonly logger: Logger
2929

3030
private pumps: Map<string, DataPumpInstance> = new Map()
31+
private stateManagers: Map<string, PumpStateManager> = new Map()
3132
private running = false
3233

3334
// Required config from PathwaysBuilder
@@ -94,6 +95,7 @@ export class PathwayPump {
9495

9596
for (const [flowType, eventTypes] of flowTypeGroups) {
9697
const stateManager = this.stateManagerFactory(flowType)
98+
this.stateManagers.set(flowType, stateManager)
9799

98100
const notifierOptions = this.buildNotifierOptions(flowType, eventTypes)
99101

@@ -164,6 +166,45 @@ export class PathwayPump {
164166
}
165167

166168
this.pumps.clear()
169+
this.stateManagers.clear()
170+
}
171+
172+
/**
173+
* Reset all pumps to a specific position, or clear state and bounce if no position given.
174+
* Uses @flowcore/data-pump's restart() to reposition the cursor without recreating instances.
175+
*
176+
* @param position - Target position { timeBucket, eventId? }. If omitted, clears persisted state
177+
* and restarts pumps (pump will start from live position).
178+
* To replay from the very beginning, pass the first time bucket explicitly.
179+
*/
180+
async reset(position?: PumpState): Promise<void> {
181+
if (!this.running) {
182+
throw new Error("PathwayPump is not running — cannot reset")
183+
}
184+
185+
this.logger.info("Resetting data pumps", { position })
186+
187+
for (const [flowType, pump] of this.pumps) {
188+
try {
189+
if (position) {
190+
await pump.restart({ timeBucket: position.timeBucket, eventId: position.eventId })
191+
} else {
192+
// Clear persisted state then restart pump from live
193+
const stateManager = this.stateManagers.get(flowType)
194+
if (stateManager?.clearState) {
195+
await stateManager.clearState()
196+
}
197+
await pump.restart({ timeBucket: new Date().toISOString().replace(/[-:T]/g, "").slice(0, 14) })
198+
}
199+
this.logger.info("Data pump reset", { flowType, position })
200+
} catch (err) {
201+
this.logger.error(
202+
`Error resetting pump for ${flowType}`,
203+
err instanceof Error ? err : new Error(String(err)),
204+
)
205+
throw err
206+
}
207+
}
167208
}
168209

169210
get isRunning(): boolean {

src/pathways/pump/state.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ class PostgresPumpStateManager implements PumpStateManager {
5252
[this.flowType, state.timeBucket, state.eventId ?? null],
5353
)
5454
}
55+
56+
async clearState(): Promise<void> {
57+
await this.ensureInitialized()
58+
await this.adapter.execute(
59+
`DELETE FROM ${this.tableName} WHERE flow_type = $1`,
60+
[this.flowType],
61+
)
62+
}
5563
}
5664

5765
/**

src/pathways/pump/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export type PumpStateManagerFactory = (flowType: string) => PumpStateManager
2323
export interface PumpStateManager {
2424
getState(): Promise<PumpState | null> | PumpState | null
2525
setState(state: PumpState): Promise<void> | void
26+
clearState?(): Promise<void> | void
2627
}
2728

2829
/**

0 commit comments

Comments
 (0)