From 2012dff0ac8acc1e0966d326673d1eb5f961b416 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Thu, 19 Mar 2026 15:21:49 +0000 Subject: [PATCH 1/5] feat: add Kubernetes integration test suite for cluster mode Adds a full k3d-based integration test that deploys 3 replicas with a real PostgreSQL coordinator to verify leader election, WS mesh, and event distribution across pods. Also adds dataSourceOverride passthrough to FlowcoreDataPumpCluster for test injection. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/validate.yml | 35 ++++++ deno.json | 6 +- integration/app/Dockerfile | 8 ++ integration/app/deps.ts | 3 + integration/app/fake-data-source.ts | 93 +++++++++++++++ integration/app/main.ts | 133 ++++++++++++++++++++++ integration/app/postgres-coordinator.ts | 64 +++++++++++ integration/app/postgres-state-manager.ts | 26 +++++ integration/k8s/configmap.yaml | 8 ++ integration/k8s/namespace.yaml | 4 + integration/k8s/postgres.yaml | 44 +++++++ integration/k8s/test-app.yaml | 51 +++++++++ integration/run-integration.sh | 30 +++++ integration/scripts/deploy.sh | 32 ++++++ integration/scripts/setup-k3d.sh | 44 +++++++ integration/scripts/teardown.sh | 13 +++ integration/scripts/verify.sh | 96 ++++++++++++++++ src/data-pump/data-pump-cluster.ts | 4 +- src/data-pump/data-pump.ts | 12 +- 19 files changed, 697 insertions(+), 9 deletions(-) create mode 100644 integration/app/Dockerfile create mode 100644 integration/app/deps.ts create mode 100644 integration/app/fake-data-source.ts create mode 100644 integration/app/main.ts create mode 100644 integration/app/postgres-coordinator.ts create mode 100644 integration/app/postgres-state-manager.ts create mode 100644 integration/k8s/configmap.yaml create mode 100644 integration/k8s/namespace.yaml create mode 100644 integration/k8s/postgres.yaml create mode 100644 integration/k8s/test-app.yaml create mode 100755 integration/run-integration.sh create mode 100755 integration/scripts/deploy.sh create mode 100755 integration/scripts/setup-k3d.sh create mode 100755 integration/scripts/teardown.sh create mode 100755 integration/scripts/verify.sh diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index 3a6a3bb..91ec314 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -43,3 +43,38 @@ jobs: run: deno test -A - name: Validate NPM build run: deno run -A bin/build-npm.ts + + integration-test: + runs-on: blacksmith-4vcpu-ubuntu-2204 + needs: build + steps: + - uses: actions/checkout@v3 + with: + token: ${{ secrets.FLOWCORE_MACHINE_GITHUB_TOKEN }} + submodules: true + - name: Setup Deno2 + uses: denoland/setup-deno@v2 + with: + deno-version: v2.x + - name: Install Docker + run: | + if ! command -v docker &>/dev/null; then + curl -fsSL https://get.docker.com | sh + fi + docker version + - name: Install k3d + run: curl -s https://raw.githubusercontent.com/k3d-io/k3d/main/install.sh | bash + - name: Install kubectl + run: | + curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" + chmod +x kubectl && sudo mv kubectl /usr/local/bin/ + - name: Run integration tests + run: bash integration/run-integration.sh + - name: Collect logs on failure + if: failure() + run: | + kubectl logs -n data-pump-integration-test -l app=data-pump-test --tail=200 || true + kubectl logs -n data-pump-integration-test -l app=postgres --tail=100 || true + - name: Teardown + if: always() + run: bash integration/scripts/teardown.sh || true diff --git a/deno.json b/deno.json index e5ef75e..fbf03d9 100644 --- a/deno.json +++ b/deno.json @@ -32,7 +32,8 @@ "node_modules", "npm", ".github", - "CHANGELOG.md" + "CHANGELOG.md", + "integration" ], "lineWidth": 120, "indentWidth": 2, @@ -42,7 +43,8 @@ "lint": { "exclude": [ "node_modules", - "npm" + "npm", + "integration" ] } } diff --git a/integration/app/Dockerfile b/integration/app/Dockerfile new file mode 100644 index 0000000..e06f948 --- /dev/null +++ b/integration/app/Dockerfile @@ -0,0 +1,8 @@ +FROM denoland/deno:2.0.0 +WORKDIR /app +COPY deno.json deno.lock ./ +COPY src/ ./src/ +COPY integration/app/ ./integration/app/ +RUN deno install +EXPOSE 8080 +CMD ["deno", "run", "-A", "integration/app/main.ts"] diff --git a/integration/app/deps.ts b/integration/app/deps.ts new file mode 100644 index 0000000..150de5b --- /dev/null +++ b/integration/app/deps.ts @@ -0,0 +1,3 @@ +export { default as pg } from "npm:pg@^8.13.0" +export const { Client } = await import("npm:pg@^8.13.0") +export type { Client as PgClient } from "npm:pg@^8.13.0" diff --git a/integration/app/fake-data-source.ts b/integration/app/fake-data-source.ts new file mode 100644 index 0000000..bddfca6 --- /dev/null +++ b/integration/app/fake-data-source.ts @@ -0,0 +1,93 @@ +import type { EventListOutput, FlowcoreEvent } from "@flowcore/sdk" +import { TimeUuid } from "@flowcore/time-uuid" +import { utc } from "@date-fns/utc" +import { format, startOfHour } from "date-fns" +import { FlowcoreDataSource } from "../../src/data-pump/data-source.ts" +import type { FlowcoreDataPumpState } from "../../src/data-pump/types.ts" + +export class FakeDataSource extends FlowcoreDataSource { + private readonly events: FlowcoreEvent[] + private readonly timeBucket: string + private delivered = false + + constructor(totalEvents: number) { + super({ + auth: { getBearerToken: () => Promise.resolve("fake") }, + dataSource: { + tenant: "integration-test", + dataCore: "test-data-core", + flowType: "test-flow-type", + eventTypes: ["test-event"], + }, + noTranslation: true, + }) + + this.timeBucket = format(startOfHour(utc(new Date())), "yyyyMMddHH0000") + this.events = [] + + for (let i = 0; i < totalEvents; i++) { + const timeUuid = TimeUuid.now() + this.events.push({ + eventId: timeUuid.toString(), + eventType: "test-event", + aggregator: `agg-${i}`, + payload: { index: i, data: `test-payload-${i}` }, + metadata: {}, + timeBucket: this.timeBucket, + validTime: new Date().toISOString(), + }) + } + + // pre-set cached IDs so the parent class never calls Flowcore API + this.tenantId = "integration-test" + this.dataCoreId = "test-data-core" + this.flowTypeId = "test-flow-type" + this.eventTypeIds = ["test-event"] + this.timeBuckets = [this.timeBucket] + } + + public override async getEvents( + _from: FlowcoreDataPumpState, + amount: number, + _toEventId?: string, + _cursor?: string, + _includeSensitiveData?: boolean, + ): Promise { + if (this.delivered) { + return { events: [], nextCursor: undefined } + } + this.delivered = true + return { + events: this.events.slice(0, amount), + nextCursor: undefined, + } + } + + public override async getTimeBuckets(_force?: boolean): Promise { + return [this.timeBucket] + } + + public override async getNextTimeBucket(_timeBucket: string): Promise { + return null + } + + public override async getClosestTimeBucket(_timeBucket: string, _getBefore?: boolean): Promise { + return this.timeBucket + } + + public override async getTenantId(): Promise { + return "integration-test" + } + + public override async getDataCoreId(): Promise { + return "test-data-core" + } + + public override async getFlowTypeId(): Promise { + return "test-flow-type" + } + + public override async getEventTypeIds(): Promise { + return ["test-event"] + } +} diff --git a/integration/app/main.ts b/integration/app/main.ts new file mode 100644 index 0000000..914d22c --- /dev/null +++ b/integration/app/main.ts @@ -0,0 +1,133 @@ +import { Client } from "npm:pg@^8.13.0" +import { FlowcoreDataPumpCluster } from "../../src/data-pump/data-pump-cluster.ts" +import { PostgresCoordinator } from "./postgres-coordinator.ts" +import { PostgresStateManager } from "./postgres-state-manager.ts" +import { FakeDataSource } from "./fake-data-source.ts" + +const DATABASE_URL = Deno.env.get("DATABASE_URL") ?? "postgres://postgres:postgres@localhost:5432/datapump_test" +const POD_NAME = Deno.env.get("POD_NAME") ?? "local-pod" +const POD_IP = Deno.env.get("POD_IP") ?? "127.0.0.1" +const TOTAL_EVENTS = parseInt(Deno.env.get("TOTAL_EVENTS") ?? "100", 10) +const WS_PORT = parseInt(Deno.env.get("WS_PORT") ?? "8080", 10) + +const log = { + debug: (msg: string, meta?: Record) => console.log(`[DEBUG] [${POD_NAME}] ${msg}`, meta ?? ""), + info: (msg: string, meta?: Record) => console.log(`[INFO] [${POD_NAME}] ${msg}`, meta ?? ""), + warn: (msg: string, meta?: Record) => console.warn(`[WARN] [${POD_NAME}] ${msg}`, meta ?? ""), + error: (msg: string | Error, meta?: Record) => + console.error(`[ERROR] [${POD_NAME}] ${msg}`, meta ?? ""), +} + +// connect to PG +const db = new Client({ connectionString: DATABASE_URL }) +await db.connect() +log.info("Connected to PostgreSQL") + +// create tables +await db.query(` + CREATE TABLE IF NOT EXISTS flowcore_pump_leases ( + key TEXT PRIMARY KEY, + holder TEXT NOT NULL, + expires_at TIMESTAMPTZ NOT NULL + ); + + CREATE TABLE IF NOT EXISTS flowcore_pump_instances ( + instance_id TEXT PRIMARY KEY, + address TEXT NOT NULL, + last_heartbeat TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + + CREATE TABLE IF NOT EXISTS pump_state ( + id TEXT PRIMARY KEY, + time_bucket TEXT NOT NULL, + event_id TEXT, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + + CREATE TABLE IF NOT EXISTS processed_events ( + id SERIAL PRIMARY KEY, + pod_name TEXT NOT NULL, + event_id TEXT NOT NULL, + processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); +`) +log.info("Database tables ready") + +// create components +const coordinator = new PostgresCoordinator(db) +const stateManager = new PostgresStateManager(db) +const fakeDataSource = new FakeDataSource(TOTAL_EVENTS) + +// create cluster +const cluster = new FlowcoreDataPumpCluster({ + auth: { getBearerToken: () => Promise.resolve("fake") }, + dataSource: { + tenant: "integration-test", + dataCore: "test-data-core", + flowType: "test-flow-type", + eventTypes: ["test-event"], + }, + stateManager, + coordinator, + dataSourceOverride: fakeDataSource, + advertisedAddress: `ws://${POD_IP}:${WS_PORT}`, + noTranslation: true, + processor: { + concurrency: 5, + handler: async (events) => { + for (const event of events) { + await db.query(`INSERT INTO processed_events (pod_name, event_id) VALUES ($1, $2)`, [ + POD_NAME, + event.eventId, + ]) + } + log.info(`Processed ${events.length} events`) + }, + }, + notifier: { type: "poller", intervalMs: 2000 }, + leaseTtlMs: 15000, + leaseRenewIntervalMs: 5000, + heartbeatIntervalMs: 3000, + workerConcurrency: 5, + logger: log, +}) + +// start HTTP + WS server +Deno.serve({ port: WS_PORT }, (req) => { + const url = new URL(req.url) + + if (url.pathname === "/health") { + return new Response( + JSON.stringify({ + instanceId: cluster.id, + isLeader: cluster.isLeaderInstance, + workerCount: cluster.activeWorkerCount, + isRunning: cluster.isRunning, + podName: POD_NAME, + }), + { headers: { "content-type": "application/json" } }, + ) + } + + if (req.headers.get("upgrade")?.toLowerCase() === "websocket") { + const { socket, response } = Deno.upgradeWebSocket(req) + cluster.handleConnection(socket) + return response + } + + return new Response("Not found", { status: 404 }) +}) + +log.info(`HTTP/WS server listening on :${WS_PORT}`) + +// start cluster +await cluster.start() +log.info("Cluster started") + +// graceful shutdown +Deno.addSignalListener("SIGTERM", async () => { + log.info("SIGTERM received, shutting down...") + await cluster.stop() + await db.end() + Deno.exit(0) +}) diff --git a/integration/app/postgres-coordinator.ts b/integration/app/postgres-coordinator.ts new file mode 100644 index 0000000..29c363a --- /dev/null +++ b/integration/app/postgres-coordinator.ts @@ -0,0 +1,64 @@ +import type { Client as PgClient } from "npm:pg@^8.13.0" +import type { FlowcoreDataPumpCoordinator } from "../../src/data-pump/types.ts" + +export class PostgresCoordinator implements FlowcoreDataPumpCoordinator { + constructor(private readonly db: PgClient) {} + + async acquireLease(instanceId: string, key: string, ttlMs: number): Promise { + const expiresAt = new Date(Date.now() + ttlMs) + const result = await this.db.query( + `INSERT INTO flowcore_pump_leases (key, holder, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (key) DO UPDATE + SET holder = $2, expires_at = $3 + WHERE flowcore_pump_leases.expires_at < NOW() OR flowcore_pump_leases.holder = $2 + RETURNING holder`, + [key, instanceId, expiresAt], + ) + return result.rowCount > 0 && result.rows[0].holder === instanceId + } + + async renewLease(instanceId: string, key: string, ttlMs: number): Promise { + const expiresAt = new Date(Date.now() + ttlMs) + const result = await this.db.query( + `UPDATE flowcore_pump_leases SET expires_at = $3 WHERE key = $1 AND holder = $2`, + [key, instanceId, expiresAt], + ) + return (result.rowCount ?? 0) > 0 + } + + async releaseLease(instanceId: string, key: string): Promise { + await this.db.query(`DELETE FROM flowcore_pump_leases WHERE key = $1 AND holder = $2`, [key, instanceId]) + } + + async register(instanceId: string, address: string): Promise { + await this.db.query( + `INSERT INTO flowcore_pump_instances (instance_id, address, last_heartbeat) + VALUES ($1, $2, NOW()) + ON CONFLICT (instance_id) DO UPDATE SET address = $2, last_heartbeat = NOW()`, + [instanceId, address], + ) + } + + async heartbeat(instanceId: string): Promise { + await this.db.query(`UPDATE flowcore_pump_instances SET last_heartbeat = NOW() WHERE instance_id = $1`, [ + instanceId, + ]) + } + + async unregister(instanceId: string): Promise { + await this.db.query(`DELETE FROM flowcore_pump_instances WHERE instance_id = $1`, [instanceId]) + } + + async getInstances(staleThresholdMs: number): Promise> { + const threshold = new Date(Date.now() - staleThresholdMs) + const result = await this.db.query( + `SELECT instance_id, address FROM flowcore_pump_instances WHERE last_heartbeat > $1`, + [threshold], + ) + return result.rows.map((row: { instance_id: string; address: string }) => ({ + instanceId: row.instance_id, + address: row.address, + })) + } +} diff --git a/integration/app/postgres-state-manager.ts b/integration/app/postgres-state-manager.ts new file mode 100644 index 0000000..2f90519 --- /dev/null +++ b/integration/app/postgres-state-manager.ts @@ -0,0 +1,26 @@ +import type { Client as PgClient } from "npm:pg@^8.13.0" +import type { FlowcoreDataPumpState, FlowcoreDataPumpStateManager } from "../../src/data-pump/types.ts" + +export class PostgresStateManager implements FlowcoreDataPumpStateManager { + constructor(private readonly db: PgClient) {} + + async getState(): Promise { + const result = await this.db.query( + `SELECT time_bucket, event_id FROM pump_state ORDER BY updated_at DESC LIMIT 1`, + ) + if (result.rows.length === 0) return null + return { + timeBucket: result.rows[0].time_bucket, + eventId: result.rows[0].event_id, + } + } + + async setState(state: FlowcoreDataPumpState): Promise { + await this.db.query( + `INSERT INTO pump_state (id, time_bucket, event_id, updated_at) + VALUES ('default', $1, $2, NOW()) + ON CONFLICT (id) DO UPDATE SET time_bucket = $1, event_id = $2, updated_at = NOW()`, + [state.timeBucket, state.eventId], + ) + } +} diff --git a/integration/k8s/configmap.yaml b/integration/k8s/configmap.yaml new file mode 100644 index 0000000..e3f8760 --- /dev/null +++ b/integration/k8s/configmap.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: data-pump-config + namespace: data-pump-integration-test +data: + DATABASE_URL: "postgres://postgres:postgres@postgres:5432/datapump_test" + TOTAL_EVENTS: "100" diff --git a/integration/k8s/namespace.yaml b/integration/k8s/namespace.yaml new file mode 100644 index 0000000..12b24ea --- /dev/null +++ b/integration/k8s/namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: data-pump-integration-test diff --git a/integration/k8s/postgres.yaml b/integration/k8s/postgres.yaml new file mode 100644 index 0000000..bddba4b --- /dev/null +++ b/integration/k8s/postgres.yaml @@ -0,0 +1,44 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: postgres + namespace: data-pump-integration-test +spec: + replicas: 1 + selector: + matchLabels: + app: postgres + template: + metadata: + labels: + app: postgres + spec: + containers: + - name: postgres + image: postgres:16-alpine + ports: + - containerPort: 5432 + env: + - name: POSTGRES_DB + value: datapump_test + - name: POSTGRES_USER + value: postgres + - name: POSTGRES_PASSWORD + value: postgres + readinessProbe: + exec: + command: ["pg_isready", "-U", "postgres", "-d", "datapump_test"] + initialDelaySeconds: 5 + periodSeconds: 5 +--- +apiVersion: v1 +kind: Service +metadata: + name: postgres + namespace: data-pump-integration-test +spec: + selector: + app: postgres + ports: + - port: 5432 + targetPort: 5432 diff --git a/integration/k8s/test-app.yaml b/integration/k8s/test-app.yaml new file mode 100644 index 0000000..dae127b --- /dev/null +++ b/integration/k8s/test-app.yaml @@ -0,0 +1,51 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: data-pump-test + namespace: data-pump-integration-test +spec: + replicas: 3 + selector: + matchLabels: + app: data-pump-test + template: + metadata: + labels: + app: data-pump-test + spec: + containers: + - name: data-pump + image: data-pump-integration-test:local + ports: + - containerPort: 8080 + envFrom: + - configMapRef: + name: data-pump-config + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 5 +--- +apiVersion: v1 +kind: Service +metadata: + name: data-pump-test + namespace: data-pump-integration-test +spec: + clusterIP: None + selector: + app: data-pump-test + ports: + - port: 8080 + targetPort: 8080 diff --git a/integration/run-integration.sh b/integration/run-integration.sh new file mode 100755 index 0000000..ab1a0a8 --- /dev/null +++ b/integration/run-integration.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +echo "==========================================" +echo " Data Pump K8s Integration Test" +echo "==========================================" + +# ensure teardown on exit +trap '${SCRIPT_DIR}/scripts/teardown.sh || true' EXIT + +# 1. Setup k3d cluster +echo "" +echo ">>> Step 1: Setup k3d cluster" +bash "${SCRIPT_DIR}/scripts/setup-k3d.sh" + +# 2. Build & deploy +echo "" +echo ">>> Step 2: Build & deploy" +bash "${SCRIPT_DIR}/scripts/deploy.sh" + +# 3. Verify +echo "" +echo ">>> Step 3: Verify" +bash "${SCRIPT_DIR}/scripts/verify.sh" + +# teardown happens via trap +echo "" +echo ">>> Integration test complete" diff --git a/integration/scripts/deploy.sh b/integration/scripts/deploy.sh new file mode 100755 index 0000000..b442aaf --- /dev/null +++ b/integration/scripts/deploy.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROOT_DIR="$(cd "${SCRIPT_DIR}/../.." && pwd)" +CLUSTER_NAME="data-pump-integration" +IMAGE_NAME="data-pump-integration-test:local" +NAMESPACE="data-pump-integration-test" + +echo "==> Building Docker image: ${IMAGE_NAME}" +docker build -t "${IMAGE_NAME}" -f "${ROOT_DIR}/integration/app/Dockerfile" "${ROOT_DIR}" + +echo "==> Importing image into k3d cluster..." +k3d image import "${IMAGE_NAME}" -c "${CLUSTER_NAME}" + +echo "==> Applying Kubernetes manifests..." +kubectl apply -f "${ROOT_DIR}/integration/k8s/namespace.yaml" +kubectl apply -f "${ROOT_DIR}/integration/k8s/postgres.yaml" + +echo "==> Waiting for PostgreSQL to be ready..." +kubectl rollout status deployment/postgres -n "${NAMESPACE}" --timeout=120s +kubectl wait --for=condition=ready pod -l app=postgres -n "${NAMESPACE}" --timeout=120s + +echo "==> Deploying ConfigMap and test app..." +kubectl apply -f "${ROOT_DIR}/integration/k8s/configmap.yaml" +kubectl apply -f "${ROOT_DIR}/integration/k8s/test-app.yaml" + +echo "==> Waiting for test app pods to be ready..." +kubectl rollout status deployment/data-pump-test -n "${NAMESPACE}" --timeout=180s + +echo "==> Deploy complete" +kubectl get pods -n "${NAMESPACE}" diff --git a/integration/scripts/setup-k3d.sh b/integration/scripts/setup-k3d.sh new file mode 100755 index 0000000..a58baf2 --- /dev/null +++ b/integration/scripts/setup-k3d.sh @@ -0,0 +1,44 @@ +#!/usr/bin/env bash +set -euo pipefail + +CLUSTER_NAME="data-pump-integration" + +echo "==> Setting up k3d cluster: ${CLUSTER_NAME}" + +# install k3d if missing +if ! command -v k3d &>/dev/null; then + echo "==> Installing k3d..." + curl -s https://raw.githubusercontent.com/k3d-io/k3d/main/install.sh | bash +fi + +# install kubectl if missing +if ! command -v kubectl &>/dev/null; then + echo "==> Installing kubectl..." + if [[ "$(uname -s)" == "Linux" ]]; then + curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" + chmod +x kubectl && sudo mv kubectl /usr/local/bin/ + elif [[ "$(uname -s)" == "Darwin" ]]; then + curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/darwin/arm64/kubectl" + chmod +x kubectl && sudo mv kubectl /usr/local/bin/ + fi +fi + +# delete existing cluster if present (clean slate) +if k3d cluster list | grep -q "${CLUSTER_NAME}"; then + echo "==> Deleting existing cluster..." + k3d cluster delete "${CLUSTER_NAME}" +fi + +# create cluster +echo "==> Creating k3d cluster..." +k3d cluster create "${CLUSTER_NAME}" \ + --agents 1 \ + --wait \ + --timeout 120s + +# verify +echo "==> Verifying cluster..." +kubectl cluster-info +kubectl get nodes + +echo "==> k3d cluster '${CLUSTER_NAME}' is ready" diff --git a/integration/scripts/teardown.sh b/integration/scripts/teardown.sh new file mode 100755 index 0000000..7d7da84 --- /dev/null +++ b/integration/scripts/teardown.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +set -euo pipefail + +CLUSTER_NAME="data-pump-integration" + +echo "==> Tearing down k3d cluster: ${CLUSTER_NAME}" + +if k3d cluster list 2>/dev/null | grep -q "${CLUSTER_NAME}"; then + k3d cluster delete "${CLUSTER_NAME}" + echo "==> Cluster deleted" +else + echo "==> Cluster not found, nothing to tear down" +fi diff --git a/integration/scripts/verify.sh b/integration/scripts/verify.sh new file mode 100755 index 0000000..5cad157 --- /dev/null +++ b/integration/scripts/verify.sh @@ -0,0 +1,96 @@ +#!/usr/bin/env bash +set -euo pipefail + +NAMESPACE="data-pump-integration-test" +TOTAL_EVENTS="${TOTAL_EVENTS:-100}" +TIMEOUT=180 +POLL_INTERVAL=5 + +echo "==> Verification starting (expecting ${TOTAL_EVENTS} events, timeout ${TIMEOUT}s)" + +# 1. Wait for all 3 pods to be Ready +echo "==> Waiting for all pods to be Ready..." +kubectl wait --for=condition=ready pod -l app=data-pump-test -n "${NAMESPACE}" --timeout=120s +PODS=$(kubectl get pods -n "${NAMESPACE}" -l app=data-pump-test -o jsonpath='{.items[*].metadata.name}') +POD_COUNT=$(echo "${PODS}" | wc -w | tr -d ' ') +echo "==> ${POD_COUNT} pods ready: ${PODS}" + +if [[ "${POD_COUNT}" -lt 3 ]]; then + echo "FAIL: Expected 3 pods, got ${POD_COUNT}" + exit 1 +fi + +# 2. Poll PG until all events are processed +PG_POD=$(kubectl get pods -n "${NAMESPACE}" -l app=postgres -o jsonpath='{.items[0].metadata.name}') +echo "==> Polling PostgreSQL (pod: ${PG_POD}) for processed events..." + +ELAPSED=0 +while [[ ${ELAPSED} -lt ${TIMEOUT} ]]; do + COUNT=$(kubectl exec -n "${NAMESPACE}" "${PG_POD}" -- \ + psql -U postgres -d datapump_test -t -A -c \ + "SELECT COUNT(DISTINCT event_id) FROM processed_events" 2>/dev/null || echo "0") + COUNT=$(echo "${COUNT}" | tr -d '[:space:]') + + echo " [${ELAPSED}s] Processed events: ${COUNT}/${TOTAL_EVENTS}" + + if [[ "${COUNT}" -ge "${TOTAL_EVENTS}" ]]; then + echo "==> All ${TOTAL_EVENTS} events processed!" + break + fi + + sleep ${POLL_INTERVAL} + ELAPSED=$((ELAPSED + POLL_INTERVAL)) +done + +if [[ ${ELAPSED} -ge ${TIMEOUT} ]]; then + echo "FAIL: Timed out waiting for events to be processed (got ${COUNT}/${TOTAL_EVENTS})" + echo "==> Pod logs:" + for POD in ${PODS}; do + echo "--- ${POD} ---" + kubectl logs -n "${NAMESPACE}" "${POD}" --tail=50 || true + done + exit 1 +fi + +# 3. Assert distribution across multiple pods +DISTINCT_PODS=$(kubectl exec -n "${NAMESPACE}" "${PG_POD}" -- \ + psql -U postgres -d datapump_test -t -A -c \ + "SELECT COUNT(DISTINCT pod_name) FROM processed_events") +DISTINCT_PODS=$(echo "${DISTINCT_PODS}" | tr -d '[:space:]') + +echo "==> Events distributed across ${DISTINCT_PODS} pod(s)" + +if [[ "${DISTINCT_PODS}" -lt 2 ]]; then + echo "FAIL: Events were only processed by ${DISTINCT_PODS} pod(s), expected >= 2" + exit 1 +fi + +# 4. Log distribution +echo "==> Event distribution by pod:" +kubectl exec -n "${NAMESPACE}" "${PG_POD}" -- \ + psql -U postgres -d datapump_test -t -c \ + "SELECT pod_name, COUNT(*) as event_count FROM processed_events GROUP BY pod_name ORDER BY event_count DESC" + +# 5. Assert exactly 1 leader +echo "==> Checking leader election..." +LEADER_COUNT=0 +for POD in ${PODS}; do + HEALTH=$(kubectl exec -n "${NAMESPACE}" "${POD}" -- \ + curl -s http://localhost:8080/health 2>/dev/null || echo '{}') + IS_LEADER=$(echo "${HEALTH}" | grep -o '"isLeader":[a-z]*' | cut -d: -f2) + echo " ${POD}: isLeader=${IS_LEADER}" + if [[ "${IS_LEADER}" == "true" ]]; then + LEADER_COUNT=$((LEADER_COUNT + 1)) + fi +done + +echo "==> Leader count: ${LEADER_COUNT}" +if [[ "${LEADER_COUNT}" -ne 1 ]]; then + echo "FAIL: Expected exactly 1 leader, got ${LEADER_COUNT}" + exit 1 +fi + +echo "" +echo "==========================================" +echo " ALL INTEGRATION TESTS PASSED" +echo "==========================================" diff --git a/src/data-pump/data-pump-cluster.ts b/src/data-pump/data-pump-cluster.ts index 25f8360..9b91ad6 100644 --- a/src/data-pump/data-pump-cluster.ts +++ b/src/data-pump/data-pump-cluster.ts @@ -1,5 +1,6 @@ import type { FlowcoreEvent } from "@flowcore/sdk" import { FlowcoreDataPump, type FlowcoreDataPumpOptions } from "./data-pump.ts" +import type { FlowcoreDataSource } from "./data-source.ts" import { clusterMetrics } from "./metrics.ts" import type { FlowcoreDataPumpCoordinator, FlowcoreLogger } from "./types.ts" import { DeliveryTracker, WsConnection, type WsMessage } from "./ws-protocol.ts" @@ -18,6 +19,7 @@ const DELIVERY_TIMEOUT_MS = 30_000 export interface FlowcoreDataPumpClusterOptions extends FlowcoreDataPumpOptions { coordinator: FlowcoreDataPumpCoordinator advertisedAddress: string + dataSourceOverride?: FlowcoreDataSource leaseTtlMs?: number leaseRenewIntervalMs?: number heartbeatIntervalMs?: number @@ -308,7 +310,7 @@ export class FlowcoreDataPumpCluster { }, } - this.pump = FlowcoreDataPump.create(pumpOptions) + this.pump = FlowcoreDataPump.create(pumpOptions, this.options.dataSourceOverride) this.pump.start().catch((error) => { this.logger?.error("Pump error in leader mode", { error }) }) diff --git a/src/data-pump/data-pump.ts b/src/data-pump/data-pump.ts index 62f26e9..22170ae 100644 --- a/src/data-pump/data-pump.ts +++ b/src/data-pump/data-pump.ts @@ -6,12 +6,12 @@ import { FlowcoreDataSource } from "./data-source.ts" import { metrics } from "./metrics.ts" import { FlowcoreNotifier } from "./notifier.ts" import type { - FlowcoreDataPumpAuth, - FlowcoreDataPumpDataSource, - FlowcoreDataPumpProcessor, - FlowcoreDataPumpState, - FlowcoreDataPumpStateManager, - FlowcoreLogger, + FlowcoreDataPumpAuth, + FlowcoreDataPumpDataSource, + FlowcoreDataPumpProcessor, + FlowcoreDataPumpState, + FlowcoreDataPumpStateManager, + FlowcoreLogger, } from "./types.ts" interface FlowcoreDataPumpNotifierNatsOptions { From c730f3e4c82d56b9de77331a1f1ecf3c57440096 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Thu, 19 Mar 2026 15:24:14 +0000 Subject: [PATCH 2/5] fix: use latest Deno image to support lockfile v5 Co-Authored-By: Claude Opus 4.6 (1M context) --- integration/app/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/app/Dockerfile b/integration/app/Dockerfile index e06f948..ff2630c 100644 --- a/integration/app/Dockerfile +++ b/integration/app/Dockerfile @@ -1,4 +1,4 @@ -FROM denoland/deno:2.0.0 +FROM denoland/deno:latest WORKDIR /app COPY deno.json deno.lock ./ COPY src/ ./src/ From b71f1dc02eec5f235c239195cf0d766cc5948347 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Thu, 19 Mar 2026 15:28:06 +0000 Subject: [PATCH 3/5] fix: delay event delivery to allow cluster formation Events were being processed before workers connected. Now delivers events in batches of 10 after a 15s startup delay, giving the WS mesh time to form so events get distributed across pods. Co-Authored-By: Claude Opus 4.6 (1M context) --- integration/app/fake-data-source.ts | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/integration/app/fake-data-source.ts b/integration/app/fake-data-source.ts index bddfca6..a685356 100644 --- a/integration/app/fake-data-source.ts +++ b/integration/app/fake-data-source.ts @@ -8,9 +8,11 @@ import type { FlowcoreDataPumpState } from "../../src/data-pump/types.ts" export class FakeDataSource extends FlowcoreDataSource { private readonly events: FlowcoreEvent[] private readonly timeBucket: string - private delivered = false + private deliveredIndex = 0 + private startTime: number + private readonly startDelayMs: number - constructor(totalEvents: number) { + constructor(totalEvents: number, startDelayMs = 15000) { super({ auth: { getBearerToken: () => Promise.resolve("fake") }, dataSource: { @@ -22,6 +24,8 @@ export class FakeDataSource extends FlowcoreDataSource { noTranslation: true, }) + this.startDelayMs = startDelayMs + this.startTime = Date.now() this.timeBucket = format(startOfHour(utc(new Date())), "yyyyMMddHH0000") this.events = [] @@ -53,13 +57,25 @@ export class FakeDataSource extends FlowcoreDataSource { _cursor?: string, _includeSensitiveData?: boolean, ): Promise { - if (this.delivered) { + // delay event delivery to give the cluster time to form (workers connect) + const elapsed = Date.now() - this.startTime + if (elapsed < this.startDelayMs) { + await new Promise((resolve) => setTimeout(resolve, 2000)) return { events: [], nextCursor: undefined } } - this.delivered = true + + if (this.deliveredIndex >= this.events.length) { + return { events: [], nextCursor: undefined } + } + + // deliver in batches of 10 to allow distribution across workers + const batchSize = Math.min(10, amount, this.events.length - this.deliveredIndex) + const batch = this.events.slice(this.deliveredIndex, this.deliveredIndex + batchSize) + this.deliveredIndex += batchSize + return { - events: this.events.slice(0, amount), - nextCursor: undefined, + events: batch, + nextCursor: this.deliveredIndex < this.events.length ? String(this.deliveredIndex) : undefined, } } From f2840a021392e2f954533ea56a6188de57b51ef5 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Thu, 19 Mar 2026 15:41:33 +0000 Subject: [PATCH 4/5] fix: check leader via PG lease table instead of curl Co-Authored-By: Claude Opus 4.6 (1M context) --- integration/scripts/verify.sh | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/integration/scripts/verify.sh b/integration/scripts/verify.sh index 5cad157..34c3efd 100755 --- a/integration/scripts/verify.sh +++ b/integration/scripts/verify.sh @@ -71,22 +71,22 @@ kubectl exec -n "${NAMESPACE}" "${PG_POD}" -- \ psql -U postgres -d datapump_test -t -c \ "SELECT pod_name, COUNT(*) as event_count FROM processed_events GROUP BY pod_name ORDER BY event_count DESC" -# 5. Assert exactly 1 leader -echo "==> Checking leader election..." -LEADER_COUNT=0 -for POD in ${PODS}; do - HEALTH=$(kubectl exec -n "${NAMESPACE}" "${POD}" -- \ - curl -s http://localhost:8080/health 2>/dev/null || echo '{}') - IS_LEADER=$(echo "${HEALTH}" | grep -o '"isLeader":[a-z]*' | cut -d: -f2) - echo " ${POD}: isLeader=${IS_LEADER}" - if [[ "${IS_LEADER}" == "true" ]]; then - LEADER_COUNT=$((LEADER_COUNT + 1)) - fi -done +# 5. Assert exactly 1 leader via PG lease table +echo "==> Checking leader election via lease table..." +LEADER_COUNT=$(kubectl exec -n "${NAMESPACE}" "${PG_POD}" -- \ + psql -U postgres -d datapump_test -t -A -c \ + "SELECT COUNT(*) FROM flowcore_pump_leases WHERE expires_at > NOW()") +LEADER_COUNT=$(echo "${LEADER_COUNT}" | tr -d '[:space:]') + +echo "==> Active lease count: ${LEADER_COUNT}" + +# also show the lease holder +kubectl exec -n "${NAMESPACE}" "${PG_POD}" -- \ + psql -U postgres -d datapump_test -t -c \ + "SELECT key, holder, expires_at FROM flowcore_pump_leases WHERE expires_at > NOW()" -echo "==> Leader count: ${LEADER_COUNT}" if [[ "${LEADER_COUNT}" -ne 1 ]]; then - echo "FAIL: Expected exactly 1 leader, got ${LEADER_COUNT}" + echo "FAIL: Expected exactly 1 active lease, got ${LEADER_COUNT}" exit 1 fi From b616c18153680fc7ffe20aa616e37ef366a3b023 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Thu, 19 Mar 2026 19:37:53 +0000 Subject: [PATCH 5/5] feat: unified NATS mode for notification and cluster event distribution When notifier.type is "nats", NATS handles both event notifications and cluster event distribution via request/reply with queue groups, replacing the WS mesh for inter-pod communication. Shared NATS connection is reused by both notifier and distributor. WS mode remains unchanged as fallback. Co-Authored-By: Claude Opus 4.6 (1M context) --- integration/app/main.ts | 23 +++-- integration/k8s/configmap.yaml | 1 + integration/k8s/nats.yaml | 37 ++++++++ integration/scripts/deploy.sh | 7 ++ src/data-pump/data-pump-cluster.ts | 124 +++++++++++++++++++++++--- src/data-pump/nats-connection.ts | 47 ++++++++++ src/data-pump/nats-distribution.ts | 97 +++++++++++++++++++++ src/data-pump/notifier.ts | 22 +++-- src/mod.ts | 2 + test/tests/data-pump-cluster.test.ts | 126 ++++++++++++++++++++++++++- 10 files changed, 458 insertions(+), 28 deletions(-) create mode 100644 integration/k8s/nats.yaml create mode 100644 src/data-pump/nats-connection.ts create mode 100644 src/data-pump/nats-distribution.ts diff --git a/integration/app/main.ts b/integration/app/main.ts index 914d22c..af98e18 100644 --- a/integration/app/main.ts +++ b/integration/app/main.ts @@ -6,7 +6,7 @@ import { FakeDataSource } from "./fake-data-source.ts" const DATABASE_URL = Deno.env.get("DATABASE_URL") ?? "postgres://postgres:postgres@localhost:5432/datapump_test" const POD_NAME = Deno.env.get("POD_NAME") ?? "local-pod" -const POD_IP = Deno.env.get("POD_IP") ?? "127.0.0.1" +const NATS_URL = Deno.env.get("NATS_URL") const TOTAL_EVENTS = parseInt(Deno.env.get("TOTAL_EVENTS") ?? "100", 10) const WS_PORT = parseInt(Deno.env.get("WS_PORT") ?? "8080", 10) @@ -58,7 +58,11 @@ const coordinator = new PostgresCoordinator(db) const stateManager = new PostgresStateManager(db) const fakeDataSource = new FakeDataSource(TOTAL_EVENTS) +const useNats = !!NATS_URL +log.info(`Distribution mode: ${useNats ? "NATS" : "WS"}`, { natsUrl: NATS_URL }) + // create cluster +const POD_IP = Deno.env.get("POD_IP") ?? "127.0.0.1" const cluster = new FlowcoreDataPumpCluster({ auth: { getBearerToken: () => Promise.resolve("fake") }, dataSource: { @@ -70,7 +74,14 @@ const cluster = new FlowcoreDataPumpCluster({ stateManager, coordinator, dataSourceOverride: fakeDataSource, - advertisedAddress: `ws://${POD_IP}:${WS_PORT}`, + ...(useNats + ? { + notifier: { type: "nats" as const, servers: [NATS_URL!] }, + } + : { + advertisedAddress: `ws://${POD_IP}:${WS_PORT}`, + notifier: { type: "poller" as const, intervalMs: 2000 }, + }), noTranslation: true, processor: { concurrency: 5, @@ -84,7 +95,6 @@ const cluster = new FlowcoreDataPumpCluster({ log.info(`Processed ${events.length} events`) }, }, - notifier: { type: "poller", intervalMs: 2000 }, leaseTtlMs: 15000, leaseRenewIntervalMs: 5000, heartbeatIntervalMs: 3000, @@ -92,7 +102,7 @@ const cluster = new FlowcoreDataPumpCluster({ logger: log, }) -// start HTTP + WS server +// start HTTP server (health endpoint only in NATS mode, health + WS in WS mode) Deno.serve({ port: WS_PORT }, (req) => { const url = new URL(req.url) @@ -104,12 +114,13 @@ Deno.serve({ port: WS_PORT }, (req) => { workerCount: cluster.activeWorkerCount, isRunning: cluster.isRunning, podName: POD_NAME, + distributionMode: useNats ? "nats" : "ws", }), { headers: { "content-type": "application/json" } }, ) } - if (req.headers.get("upgrade")?.toLowerCase() === "websocket") { + if (!useNats && req.headers.get("upgrade")?.toLowerCase() === "websocket") { const { socket, response } = Deno.upgradeWebSocket(req) cluster.handleConnection(socket) return response @@ -118,7 +129,7 @@ Deno.serve({ port: WS_PORT }, (req) => { return new Response("Not found", { status: 404 }) }) -log.info(`HTTP/WS server listening on :${WS_PORT}`) +log.info(`HTTP server listening on :${WS_PORT}`) // start cluster await cluster.start() diff --git a/integration/k8s/configmap.yaml b/integration/k8s/configmap.yaml index e3f8760..22b2f16 100644 --- a/integration/k8s/configmap.yaml +++ b/integration/k8s/configmap.yaml @@ -6,3 +6,4 @@ metadata: data: DATABASE_URL: "postgres://postgres:postgres@postgres:5432/datapump_test" TOTAL_EVENTS: "100" + NATS_URL: "nats://nats:4222" diff --git a/integration/k8s/nats.yaml b/integration/k8s/nats.yaml new file mode 100644 index 0000000..36a79e9 --- /dev/null +++ b/integration/k8s/nats.yaml @@ -0,0 +1,37 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nats + namespace: data-pump-integration-test +spec: + replicas: 1 + selector: + matchLabels: + app: nats + template: + metadata: + labels: + app: nats + spec: + containers: + - name: nats + image: nats:2.10-alpine + ports: + - containerPort: 4222 + readinessProbe: + tcpSocket: + port: 4222 + initialDelaySeconds: 2 + periodSeconds: 5 +--- +apiVersion: v1 +kind: Service +metadata: + name: nats + namespace: data-pump-integration-test +spec: + selector: + app: nats + ports: + - port: 4222 + targetPort: 4222 diff --git a/integration/scripts/deploy.sh b/integration/scripts/deploy.sh index b442aaf..bb706bd 100755 --- a/integration/scripts/deploy.sh +++ b/integration/scripts/deploy.sh @@ -21,6 +21,13 @@ echo "==> Waiting for PostgreSQL to be ready..." kubectl rollout status deployment/postgres -n "${NAMESPACE}" --timeout=120s kubectl wait --for=condition=ready pod -l app=postgres -n "${NAMESPACE}" --timeout=120s +echo "==> Deploying NATS..." +kubectl apply -f "${ROOT_DIR}/integration/k8s/nats.yaml" + +echo "==> Waiting for NATS to be ready..." +kubectl rollout status deployment/nats -n "${NAMESPACE}" --timeout=120s +kubectl wait --for=condition=ready pod -l app=nats -n "${NAMESPACE}" --timeout=120s + echo "==> Deploying ConfigMap and test app..." kubectl apply -f "${ROOT_DIR}/integration/k8s/configmap.yaml" kubectl apply -f "${ROOT_DIR}/integration/k8s/test-app.yaml" diff --git a/src/data-pump/data-pump-cluster.ts b/src/data-pump/data-pump-cluster.ts index 9b91ad6..58ae6fe 100644 --- a/src/data-pump/data-pump-cluster.ts +++ b/src/data-pump/data-pump-cluster.ts @@ -2,6 +2,8 @@ import type { FlowcoreEvent } from "@flowcore/sdk" import { FlowcoreDataPump, type FlowcoreDataPumpOptions } from "./data-pump.ts" import type { FlowcoreDataSource } from "./data-source.ts" import { clusterMetrics } from "./metrics.ts" +import { NatsConnectionManager } from "./nats-connection.ts" +import { NatsDistributionLeader, NatsDistributionWorker } from "./nats-distribution.ts" import type { FlowcoreDataPumpCoordinator, FlowcoreLogger } from "./types.ts" import { DeliveryTracker, WsConnection, type WsMessage } from "./ws-protocol.ts" @@ -18,12 +20,13 @@ const DELIVERY_TIMEOUT_MS = 30_000 export interface FlowcoreDataPumpClusterOptions extends FlowcoreDataPumpOptions { coordinator: FlowcoreDataPumpCoordinator - advertisedAddress: string + advertisedAddress?: string dataSourceOverride?: FlowcoreDataSource leaseTtlMs?: number leaseRenewIntervalMs?: number heartbeatIntervalMs?: number workerConcurrency?: number + clusterKey?: string } interface WorkerConnection { @@ -62,7 +65,17 @@ export class FlowcoreDataPumpCluster { private workerHandler?: (events: FlowcoreEvent[]) => Promise private workerFailedHandler?: (events: FlowcoreEvent[]) => void | Promise + // NATS distribution state + private natsConnectionManager?: NatsConnectionManager + private natsDistLeader?: NatsDistributionLeader + private natsDistWorker?: NatsDistributionWorker + constructor(private readonly options: FlowcoreDataPumpClusterOptions) { + // validate: WS mode requires advertisedAddress + if (!this.useNatsDistribution && !options.advertisedAddress) { + throw new Error("advertisedAddress is required when not using NATS distribution (notifier.type !== 'nats')") + } + this.instanceId = crypto.randomUUID() this.coordinator = options.coordinator this.leaseTtlMs = options.leaseTtlMs ?? DEFAULT_LEASE_TTL_MS @@ -76,6 +89,16 @@ export class FlowcoreDataPumpCluster { this.workerFailedHandler = options.processor?.failedHandler } + private get useNatsDistribution(): boolean { + return this.options.notifier?.type === "nats" + } + + private get natsDistributionSubject(): string { + const key = this.options.clusterKey ?? + `${this.options.dataSource.tenant}.${this.options.dataSource.dataCore}.${this.options.dataSource.flowType}` + return `data-pump.distribute.${key}` + } + get isRunning(): boolean { return this.running } @@ -138,13 +161,34 @@ export class FlowcoreDataPumpCluster { this.logger?.info("Starting cluster instance", { instanceId: this.instanceId }) - // Step 1: Register in DB - await this.coordinator.register(this.instanceId, this.options.advertisedAddress) + if (this.useNatsDistribution) { + // NATS mode: connect shared NATS, start worker subscription + const natsServers = this.options.notifier?.type === "nats" ? this.options.notifier.servers : [] + this.natsConnectionManager = new NatsConnectionManager(natsServers, this.logger) + const conn = await this.natsConnectionManager.connect() + + // all instances subscribe as workers (leader also processes if no workers pick up) + if (this.workerHandler) { + this.natsDistWorker = new NatsDistributionWorker( + conn, + this.natsDistributionSubject, + this.workerHandler, + this.logger, + ) + this.natsDistWorker.start() + } + + // register with placeholder address (not used for routing in NATS mode) + await this.coordinator.register(this.instanceId, "nats://distributed") + } else { + // WS mode: register with actual address + await this.coordinator.register(this.instanceId, this.options.advertisedAddress!) + } - // Step 2: Start heartbeat + // Start heartbeat this.startHeartbeat() - // Step 3: Start leader election loop + // Start leader election loop this.startElectionLoop() } @@ -171,6 +215,17 @@ export class FlowcoreDataPumpCluster { this.leaderConnection?.close() this.leaderConnection = undefined + // stop NATS distribution + this.natsDistWorker?.stop() + this.natsDistWorker = undefined + this.natsDistLeader = undefined + + // close shared NATS connection + if (this.natsConnectionManager) { + await this.natsConnectionManager.close() + this.natsConnectionManager = undefined + } + // unregister try { await this.coordinator.releaseLease(this.instanceId, LEASE_KEY) @@ -255,11 +310,19 @@ export class FlowcoreDataPumpCluster { this.leaderConnection?.close() this.leaderConnection = undefined - // start worker discovery - this.startWorkerDiscovery() - - // initial discovery - await this.discoverWorkers() + if (this.useNatsDistribution) { + // NATS mode: create leader distributor, no WS discovery needed + const conn = await this.natsConnectionManager!.connect() + this.natsDistLeader = new NatsDistributionLeader( + conn, + this.natsDistributionSubject, + this.logger, + ) + } else { + // WS mode: start worker discovery + this.startWorkerDiscovery() + await this.discoverWorkers() + } // start the pump with distribution processor this.startPumpAsLeader() @@ -282,7 +345,10 @@ export class FlowcoreDataPumpCluster { this.leaseRenewInterval = undefined } - // disconnect all workers + // NATS mode: close leader distributor (worker subscription stays) + this.natsDistLeader = undefined + + // disconnect all WS workers for (const worker of this.workers.values()) { worker.deliveryTracker.rejectAll(new Error("Leader stepping down")) worker.connection.close() @@ -316,7 +382,41 @@ export class FlowcoreDataPumpCluster { }) } - private async distributeEvents(events: FlowcoreEvent[]): Promise { + private distributeEvents(events: FlowcoreEvent[]): Promise { + if (this.useNatsDistribution) { + return this.distributeEventsNats(events) + } + return this.distributeEventsWs(events) + } + + private async distributeEventsNats(events: FlowcoreEvent[]): Promise { + if (!this.natsDistLeader) { + // no NATS leader distributor - process locally as fallback + this.logger?.debug("No NATS leader distributor, processing locally") + if (this.workerHandler) { + await this.workerHandler(events) + } + return + } + + clusterMetrics.eventsDistributedCounter.inc(events.length) + + try { + await this.natsDistLeader.distribute(events) + clusterMetrics.workerAcksCounter.inc() + } catch (error) { + clusterMetrics.workerFailsCounter.inc() + // fallback to local processing on NATS failure + this.logger?.warn("NATS distribution failed, processing locally", { + error: error instanceof Error ? error.message : "unknown", + }) + if (this.workerHandler) { + await this.workerHandler(events) + } + } + } + + private async distributeEventsWs(events: FlowcoreEvent[]): Promise { const worker = this.selectWorker() if (!worker) { diff --git a/src/data-pump/nats-connection.ts b/src/data-pump/nats-connection.ts new file mode 100644 index 0000000..6e42de0 --- /dev/null +++ b/src/data-pump/nats-connection.ts @@ -0,0 +1,47 @@ +import * as Nats from "nats" +import type { FlowcoreLogger } from "./types.ts" + +export class NatsConnectionManager { + private connection?: Nats.NatsConnection + private connecting?: Promise + + constructor( + private readonly servers: string[], + private readonly logger?: FlowcoreLogger, + ) {} + + connect(): Promise { + if (this.connection && !this.connection.isClosed()) { + return Promise.resolve(this.connection) + } + + // deduplicate concurrent connect calls + if (this.connecting) { + return this.connecting + } + + this.connecting = (async () => { + this.logger?.debug("Connecting to NATS", { servers: this.servers }) + const conn = await Nats.connect({ servers: this.servers }) + this.connection = conn + this.connecting = undefined + this.logger?.info("Connected to NATS") + return conn + })() + + return this.connecting + } + + async close(): Promise { + if (this.connection && !this.connection.isClosed()) { + this.logger?.debug("Closing NATS connection") + await this.connection.drain() + this.connection = undefined + } + this.connecting = undefined + } + + get isConnected(): boolean { + return !!this.connection && !this.connection.isClosed() + } +} diff --git a/src/data-pump/nats-distribution.ts b/src/data-pump/nats-distribution.ts new file mode 100644 index 0000000..9014f53 --- /dev/null +++ b/src/data-pump/nats-distribution.ts @@ -0,0 +1,97 @@ +import type { FlowcoreEvent } from "@flowcore/sdk" +import * as Nats from "nats" +import type { FlowcoreLogger } from "./types.ts" + +const DEFAULT_REQUEST_TIMEOUT_MS = 30_000 + +export interface NatsDistributionRequest { + deliveryId: string + events: FlowcoreEvent[] +} + +export interface NatsDistributionReply { + status: "ack" | "fail" + error?: string +} + +export class NatsDistributionLeader { + private readonly codec = Nats.JSONCodec() + + constructor( + private readonly connection: Nats.NatsConnection, + private readonly subject: string, + private readonly logger?: FlowcoreLogger, + private readonly timeoutMs: number = DEFAULT_REQUEST_TIMEOUT_MS, + ) {} + + async distribute(events: FlowcoreEvent[]): Promise { + const deliveryId = crypto.randomUUID() + const request: NatsDistributionRequest = { deliveryId, events } + + this.logger?.debug("Distributing events via NATS", { deliveryId, count: events.length, subject: this.subject }) + + const response = await this.connection.request( + this.subject, + this.codec.encode(request), + { timeout: this.timeoutMs }, + ) + + const reply = this.codec.decode(response.data) as NatsDistributionReply + if (reply.status === "fail") { + throw new Error(`Worker failed to process events: ${reply.error ?? "unknown error"}`) + } + } +} + +export class NatsDistributionWorker { + private subscription?: Nats.Subscription + private readonly codec = Nats.JSONCodec() + private readonly queueGroup = "data-pump-workers" + + constructor( + private readonly connection: Nats.NatsConnection, + private readonly subject: string, + private readonly handler: (events: FlowcoreEvent[]) => Promise, + private readonly logger?: FlowcoreLogger, + ) {} + + start(): void { + this.logger?.info("Starting NATS distribution worker", { subject: this.subject, queue: this.queueGroup }) + + this.subscription = this.connection.subscribe(this.subject, { + queue: this.queueGroup, + callback: (_err, msg) => { + void this.handleMessage(msg) + }, + }) + } + + stop(): void { + if (this.subscription) { + this.logger?.debug("Stopping NATS distribution worker") + this.subscription.unsubscribe() + this.subscription = undefined + } + } + + private async handleMessage(msg: Nats.Msg): Promise { + try { + const request = this.codec.decode(msg.data) as NatsDistributionRequest + this.logger?.debug("Received events from leader via NATS", { + deliveryId: request.deliveryId, + count: request.events.length, + }) + + await this.handler(request.events) + + const reply: NatsDistributionReply = { status: "ack" } + msg.respond(this.codec.encode(reply)) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : "unknown error" + this.logger?.error("Worker failed to process events via NATS", { error: errorMessage }) + + const reply: NatsDistributionReply = { status: "fail", error: errorMessage } + msg.respond(this.codec.encode(reply)) + } + } +} diff --git a/src/data-pump/notifier.ts b/src/data-pump/notifier.ts index b6c1747..01e04bf 100644 --- a/src/data-pump/notifier.ts +++ b/src/data-pump/notifier.ts @@ -1,7 +1,7 @@ import { NotificationClient, type NotificationEvent } from "@flowcore/sdk" -import * as Nats from "nats" import { Subject } from "rxjs" import { FlowcoreDataSource } from "./data-source.ts" +import { NatsConnectionManager } from "./nats-connection.ts" import type { FlowcoreDataPumpAuth, FlowcoreDataPumpDataSource, FlowcoreLogger } from "./types.ts" import { noOpLogger } from "./no-op-logger.ts" @@ -11,6 +11,7 @@ export interface FlowcoreNotifierOptions { dataSource: FlowcoreDataPumpDataSource auth: FlowcoreDataPumpAuth natsServers?: string[] + natsConnectionManager?: NatsConnectionManager pollerIntervalMs?: number timeoutMs?: number logger?: FlowcoreLogger @@ -20,7 +21,7 @@ export interface FlowcoreNotifierOptions { export class FlowcoreNotifier { private dataSource: FlowcoreDataSource - private nats?: Nats.NatsConnection + private natsManager?: NatsConnectionManager private subject?: Subject private notificationClient?: NotificationClient private eventResolver?: () => void @@ -34,6 +35,12 @@ export class FlowcoreNotifier { directMode: this.options.directMode, noTranslation: this.options.noTranslation, }) + + if (this.options.natsConnectionManager) { + this.natsManager = this.options.natsConnectionManager + } else if (this.options.natsServers) { + this.natsManager = new NatsConnectionManager(this.options.natsServers, this.options.logger) + } } public wait(signal?: AbortSignal) { @@ -57,9 +64,8 @@ export class FlowcoreNotifier { private async waitNats(signal?: AbortSignal) { this.options.logger?.debug("Waiting for nats") - if (!this.nats) { - this.nats = await Nats.connect({ servers: this.options.natsServers }) - } + const nats = await this.natsManager!.connect() + const dataCoreId = await this.dataSource.getDataCoreId() const topics = this.dataSource.eventTypes.map( (eventType) => `stored.event.notify.0.${dataCoreId}.${this.dataSource.flowType}.${eventType}`, @@ -69,10 +75,10 @@ export class FlowcoreNotifier { this.eventResolver = resolve }) - const subscriptions: Nats.Subscription[] = [] + const subscriptions: ReturnType[] = [] for (const topic of topics) { subscriptions.push( - this.nats.subscribe(topic, { + nats.subscribe(topic, { callback: () => { this.options.logger?.debug(`Received event from nats: ${topic}`) this.eventResolver?.() @@ -90,8 +96,6 @@ export class FlowcoreNotifier { for (const subscription of subscriptions) { subscription.unsubscribe() } - this.nats?.close() - this.nats = undefined } private onWebSocketEvent(event: NotificationEvent) { diff --git a/src/mod.ts b/src/mod.ts index c523d42..962b81b 100644 --- a/src/mod.ts +++ b/src/mod.ts @@ -2,6 +2,8 @@ export * from "./data-pump/data-pump.ts" export * from "./data-pump/data-pump-cluster.ts" export * from "./data-pump/data-source.ts" export * from "./data-pump/metrics.ts" +export * from "./data-pump/nats-connection.ts" +export * from "./data-pump/nats-distribution.ts" export * from "./data-pump/types.ts" export * from "./data-pump/ws-protocol.ts" export * from "./data-pump/no-op-logger.ts" diff --git a/test/tests/data-pump-cluster.test.ts b/test/tests/data-pump-cluster.test.ts index 403a159..4be9f52 100644 --- a/test/tests/data-pump-cluster.test.ts +++ b/test/tests/data-pump-cluster.test.ts @@ -1,5 +1,7 @@ -import { assertEquals, assertRejects } from "@std/assert" +import { assertEquals, assertRejects, assertThrows } from "@std/assert" import { beforeEach, describe, it } from "@std/testing/bdd" +import type { FlowcoreEvent } from "@flowcore/sdk" +import { FlowcoreDataPumpCluster } from "../../src/data-pump/data-pump-cluster.ts" import type { FlowcoreDataPumpCoordinator } from "../../src/data-pump/types.ts" import { DeliveryTracker, @@ -9,6 +11,7 @@ import { type WsEventsMessage, type WsPingMessage, } from "../../src/data-pump/ws-protocol.ts" +import type { NatsDistributionReply, NatsDistributionRequest } from "../../src/data-pump/nats-distribution.ts" // #region Mock Coordinator @@ -292,3 +295,124 @@ describe("MockCoordinator", () => { }) // #endregion + +// #region NATS Distribution Tests + +describe("NATS Distribution Protocol", () => { + const sampleEvents: FlowcoreEvent[] = [ + { + eventId: "e1", + eventType: "test", + payload: { foo: "bar" }, + metadata: {}, + timeBucket: "20250101000000", + tenant: "test-tenant", + dataCoreId: "test-dc", + flowType: "test-flow", + validTime: "2025-01-01T00:00:00Z", + }, + { + eventId: "e2", + eventType: "test", + payload: { baz: 42 }, + metadata: {}, + timeBucket: "20250101000000", + tenant: "test-tenant", + dataCoreId: "test-dc", + flowType: "test-flow", + validTime: "2025-01-01T00:00:01Z", + }, + ] + + it("should serialize and deserialize request format", () => { + const request: NatsDistributionRequest = { + deliveryId: "d1", + events: sampleEvents, + } + const json = JSON.stringify(request) + const parsed = JSON.parse(json) as NatsDistributionRequest + assertEquals(parsed.deliveryId, "d1") + assertEquals(parsed.events.length, 2) + assertEquals(parsed.events[0].eventId, "e1") + assertEquals(parsed.events[1].payload, { baz: 42 }) + }) + + it("should serialize and deserialize ack reply", () => { + const reply: NatsDistributionReply = { status: "ack" } + const json = JSON.stringify(reply) + const parsed = JSON.parse(json) as NatsDistributionReply + assertEquals(parsed.status, "ack") + assertEquals(parsed.error, undefined) + }) + + it("should serialize and deserialize fail reply", () => { + const reply: NatsDistributionReply = { status: "fail", error: "processing error" } + const json = JSON.stringify(reply) + const parsed = JSON.parse(json) as NatsDistributionReply + assertEquals(parsed.status, "fail") + assertEquals(parsed.error, "processing error") + }) +}) + +// #endregion + +// #region Cluster Options Validation Tests + +describe("Cluster Options Validation", () => { + const baseOptions = { + auth: { getBearerToken: () => Promise.resolve("fake") }, + dataSource: { + tenant: "test", + dataCore: "dc", + flowType: "ft", + eventTypes: ["ev"], + }, + stateManager: { + getState: () => null, + }, + } + + it("should throw when WS mode and no advertisedAddress", () => { + assertThrows( + () => + new FlowcoreDataPumpCluster({ + ...baseOptions, + coordinator: new MockCoordinator(), + notifier: { type: "poller", intervalMs: 1000 }, + }), + Error, + "advertisedAddress is required", + ) + }) + + it("should not throw when WS mode and advertisedAddress provided", () => { + const cluster = new FlowcoreDataPumpCluster({ + ...baseOptions, + coordinator: new MockCoordinator(), + advertisedAddress: "ws://localhost:8080", + notifier: { type: "poller", intervalMs: 1000 }, + }) + assertEquals(typeof cluster.id, "string") + }) + + it("should not throw when NATS mode and no advertisedAddress", () => { + const cluster = new FlowcoreDataPumpCluster({ + ...baseOptions, + coordinator: new MockCoordinator(), + notifier: { type: "nats", servers: ["nats://localhost:4222"] }, + }) + assertEquals(typeof cluster.id, "string") + }) + + it("should not throw when NATS mode with advertisedAddress (ignored)", () => { + const cluster = new FlowcoreDataPumpCluster({ + ...baseOptions, + coordinator: new MockCoordinator(), + advertisedAddress: "ws://localhost:8080", + notifier: { type: "nats", servers: ["nats://localhost:4222"] }, + }) + assertEquals(typeof cluster.id, "string") + }) +}) + +// #endregion