From 411b920991c310392427546b7c08391f672abf34 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Sat, 21 Mar 2026 12:46:20 +0000 Subject: [PATCH] fix: auto-start pump when instance becomes cluster leader --- src/pathways/builder.ts | 25 +++++++++++++++++++++++++ src/pathways/cluster/cluster-manager.ts | 11 +++++++++++ 2 files changed, 36 insertions(+) diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index 6c6aff6..f10e0c5 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -1151,6 +1151,31 @@ export class PathwaysBuilder< } }) + // Listen for leadership changes to auto-start/stop the pump + this.clusterManager.onLeadershipChange((isLeader: boolean) => { + if (isLeader && this.pathwayPump && !this.pathwayPump.isRunning) { + this.logger.info("Became leader, starting pump") + const registrations = Object.keys(this.pathways).map((key) => { + const [flowType, eventType] = key.split("/") + return { flowType, eventType } + }) + this.pathwayPump.start(registrations).catch((err) => { + this.logger.error( + "Failed to start pump after becoming leader", + err instanceof Error ? err : new Error(String(err)), + ) + }) + } else if (!isLeader && this.pathwayPump?.isRunning) { + this.logger.info("Lost leadership, stopping pump") + this.pathwayPump.stop().catch((err) => { + this.logger.error( + "Failed to stop pump after losing leadership", + err instanceof Error ? err : new Error(String(err)), + ) + }) + } + }) + await this.clusterManager.start() this.logger.info("Cluster started", { diff --git a/src/pathways/cluster/cluster-manager.ts b/src/pathways/cluster/cluster-manager.ts index cfd2f55..e19c829 100644 --- a/src/pathways/cluster/cluster-manager.ts +++ b/src/pathways/cluster/cluster-manager.ts @@ -89,6 +89,7 @@ export class ClusterManager { private wsServer: { shutdown(): Promise } | null = null private leaderConnection: ClusterSocket | null = null private eventHandler: ((pathway: string, event: FlowcoreEvent) => Promise) | null = null + private leadershipChangeHandler: ((isLeader: boolean) => void) | null = null constructor(options: PathwayClusterOptions, logger?: Logger) { this.coordinator = options.coordinator @@ -112,6 +113,14 @@ export class ClusterManager { this.eventHandler = handler } + /** + * Set a callback that fires when this instance becomes or loses leadership. + * Used by PathwaysBuilder to auto-start the pump when becoming leader. + */ + onLeadershipChange(handler: (isLeader: boolean) => void) { + this.leadershipChangeHandler = handler + } + /** * Start the cluster: register instance, begin heartbeat, attempt leader election */ @@ -301,6 +310,7 @@ export class ClusterManager { this.logger.warn("Lost leader lease", { instanceId: this.instanceId }) this.role = "worker" this.cleanupLeaderState() + this.leadershipChangeHandler?.(false) } else { await this.refreshWorkers() } @@ -311,6 +321,7 @@ export class ClusterManager { private async onBecomeLeader(): Promise { await this.refreshWorkers() + this.leadershipChangeHandler?.(true) } private async refreshWorkers(): Promise {