Skip to content

Commit 2012dff

Browse files
jbiskurclaude
andcommitted
feat: add Kubernetes integration test suite for cluster mode
Adds a full k3d-based integration test that deploys 3 replicas with a real PostgreSQL coordinator to verify leader election, WS mesh, and event distribution across pods. Also adds dataSourceOverride passthrough to FlowcoreDataPumpCluster for test injection. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 231fbc7 commit 2012dff

19 files changed

Lines changed: 697 additions & 9 deletions

.github/workflows/validate.yml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,38 @@ jobs:
4343
run: deno test -A
4444
- name: Validate NPM build
4545
run: deno run -A bin/build-npm.ts
46+
47+
integration-test:
48+
runs-on: blacksmith-4vcpu-ubuntu-2204
49+
needs: build
50+
steps:
51+
- uses: actions/checkout@v3
52+
with:
53+
token: ${{ secrets.FLOWCORE_MACHINE_GITHUB_TOKEN }}
54+
submodules: true
55+
- name: Setup Deno2
56+
uses: denoland/setup-deno@v2
57+
with:
58+
deno-version: v2.x
59+
- name: Install Docker
60+
run: |
61+
if ! command -v docker &>/dev/null; then
62+
curl -fsSL https://get.docker.com | sh
63+
fi
64+
docker version
65+
- name: Install k3d
66+
run: curl -s https://raw.githubusercontent.com/k3d-io/k3d/main/install.sh | bash
67+
- name: Install kubectl
68+
run: |
69+
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
70+
chmod +x kubectl && sudo mv kubectl /usr/local/bin/
71+
- name: Run integration tests
72+
run: bash integration/run-integration.sh
73+
- name: Collect logs on failure
74+
if: failure()
75+
run: |
76+
kubectl logs -n data-pump-integration-test -l app=data-pump-test --tail=200 || true
77+
kubectl logs -n data-pump-integration-test -l app=postgres --tail=100 || true
78+
- name: Teardown
79+
if: always()
80+
run: bash integration/scripts/teardown.sh || true

deno.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
"node_modules",
3333
"npm",
3434
".github",
35-
"CHANGELOG.md"
35+
"CHANGELOG.md",
36+
"integration"
3637
],
3738
"lineWidth": 120,
3839
"indentWidth": 2,
@@ -42,7 +43,8 @@
4243
"lint": {
4344
"exclude": [
4445
"node_modules",
45-
"npm"
46+
"npm",
47+
"integration"
4648
]
4749
}
4850
}

integration/app/Dockerfile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
FROM denoland/deno:2.0.0
2+
WORKDIR /app
3+
COPY deno.json deno.lock ./
4+
COPY src/ ./src/
5+
COPY integration/app/ ./integration/app/
6+
RUN deno install
7+
EXPOSE 8080
8+
CMD ["deno", "run", "-A", "integration/app/main.ts"]

integration/app/deps.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export { default as pg } from "npm:pg@^8.13.0"
2+
export const { Client } = await import("npm:pg@^8.13.0")
3+
export type { Client as PgClient } from "npm:pg@^8.13.0"
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import type { EventListOutput, FlowcoreEvent } from "@flowcore/sdk"
2+
import { TimeUuid } from "@flowcore/time-uuid"
3+
import { utc } from "@date-fns/utc"
4+
import { format, startOfHour } from "date-fns"
5+
import { FlowcoreDataSource } from "../../src/data-pump/data-source.ts"
6+
import type { FlowcoreDataPumpState } from "../../src/data-pump/types.ts"
7+
8+
export class FakeDataSource extends FlowcoreDataSource {
9+
private readonly events: FlowcoreEvent[]
10+
private readonly timeBucket: string
11+
private delivered = false
12+
13+
constructor(totalEvents: number) {
14+
super({
15+
auth: { getBearerToken: () => Promise.resolve("fake") },
16+
dataSource: {
17+
tenant: "integration-test",
18+
dataCore: "test-data-core",
19+
flowType: "test-flow-type",
20+
eventTypes: ["test-event"],
21+
},
22+
noTranslation: true,
23+
})
24+
25+
this.timeBucket = format(startOfHour(utc(new Date())), "yyyyMMddHH0000")
26+
this.events = []
27+
28+
for (let i = 0; i < totalEvents; i++) {
29+
const timeUuid = TimeUuid.now()
30+
this.events.push({
31+
eventId: timeUuid.toString(),
32+
eventType: "test-event",
33+
aggregator: `agg-${i}`,
34+
payload: { index: i, data: `test-payload-${i}` },
35+
metadata: {},
36+
timeBucket: this.timeBucket,
37+
validTime: new Date().toISOString(),
38+
})
39+
}
40+
41+
// pre-set cached IDs so the parent class never calls Flowcore API
42+
this.tenantId = "integration-test"
43+
this.dataCoreId = "test-data-core"
44+
this.flowTypeId = "test-flow-type"
45+
this.eventTypeIds = ["test-event"]
46+
this.timeBuckets = [this.timeBucket]
47+
}
48+
49+
public override async getEvents(
50+
_from: FlowcoreDataPumpState,
51+
amount: number,
52+
_toEventId?: string,
53+
_cursor?: string,
54+
_includeSensitiveData?: boolean,
55+
): Promise<EventListOutput> {
56+
if (this.delivered) {
57+
return { events: [], nextCursor: undefined }
58+
}
59+
this.delivered = true
60+
return {
61+
events: this.events.slice(0, amount),
62+
nextCursor: undefined,
63+
}
64+
}
65+
66+
public override async getTimeBuckets(_force?: boolean): Promise<string[]> {
67+
return [this.timeBucket]
68+
}
69+
70+
public override async getNextTimeBucket(_timeBucket: string): Promise<string | null> {
71+
return null
72+
}
73+
74+
public override async getClosestTimeBucket(_timeBucket: string, _getBefore?: boolean): Promise<string | null> {
75+
return this.timeBucket
76+
}
77+
78+
public override async getTenantId(): Promise<string> {
79+
return "integration-test"
80+
}
81+
82+
public override async getDataCoreId(): Promise<string> {
83+
return "test-data-core"
84+
}
85+
86+
public override async getFlowTypeId(): Promise<string> {
87+
return "test-flow-type"
88+
}
89+
90+
public override async getEventTypeIds(): Promise<string[]> {
91+
return ["test-event"]
92+
}
93+
}

integration/app/main.ts

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import { Client } from "npm:pg@^8.13.0"
2+
import { FlowcoreDataPumpCluster } from "../../src/data-pump/data-pump-cluster.ts"
3+
import { PostgresCoordinator } from "./postgres-coordinator.ts"
4+
import { PostgresStateManager } from "./postgres-state-manager.ts"
5+
import { FakeDataSource } from "./fake-data-source.ts"
6+
7+
const DATABASE_URL = Deno.env.get("DATABASE_URL") ?? "postgres://postgres:postgres@localhost:5432/datapump_test"
8+
const POD_NAME = Deno.env.get("POD_NAME") ?? "local-pod"
9+
const POD_IP = Deno.env.get("POD_IP") ?? "127.0.0.1"
10+
const TOTAL_EVENTS = parseInt(Deno.env.get("TOTAL_EVENTS") ?? "100", 10)
11+
const WS_PORT = parseInt(Deno.env.get("WS_PORT") ?? "8080", 10)
12+
13+
const log = {
14+
debug: (msg: string, meta?: Record<string, unknown>) => console.log(`[DEBUG] [${POD_NAME}] ${msg}`, meta ?? ""),
15+
info: (msg: string, meta?: Record<string, unknown>) => console.log(`[INFO] [${POD_NAME}] ${msg}`, meta ?? ""),
16+
warn: (msg: string, meta?: Record<string, unknown>) => console.warn(`[WARN] [${POD_NAME}] ${msg}`, meta ?? ""),
17+
error: (msg: string | Error, meta?: Record<string, unknown>) =>
18+
console.error(`[ERROR] [${POD_NAME}] ${msg}`, meta ?? ""),
19+
}
20+
21+
// connect to PG
22+
const db = new Client({ connectionString: DATABASE_URL })
23+
await db.connect()
24+
log.info("Connected to PostgreSQL")
25+
26+
// create tables
27+
await db.query(`
28+
CREATE TABLE IF NOT EXISTS flowcore_pump_leases (
29+
key TEXT PRIMARY KEY,
30+
holder TEXT NOT NULL,
31+
expires_at TIMESTAMPTZ NOT NULL
32+
);
33+
34+
CREATE TABLE IF NOT EXISTS flowcore_pump_instances (
35+
instance_id TEXT PRIMARY KEY,
36+
address TEXT NOT NULL,
37+
last_heartbeat TIMESTAMPTZ NOT NULL DEFAULT NOW()
38+
);
39+
40+
CREATE TABLE IF NOT EXISTS pump_state (
41+
id TEXT PRIMARY KEY,
42+
time_bucket TEXT NOT NULL,
43+
event_id TEXT,
44+
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
45+
);
46+
47+
CREATE TABLE IF NOT EXISTS processed_events (
48+
id SERIAL PRIMARY KEY,
49+
pod_name TEXT NOT NULL,
50+
event_id TEXT NOT NULL,
51+
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
52+
);
53+
`)
54+
log.info("Database tables ready")
55+
56+
// create components
57+
const coordinator = new PostgresCoordinator(db)
58+
const stateManager = new PostgresStateManager(db)
59+
const fakeDataSource = new FakeDataSource(TOTAL_EVENTS)
60+
61+
// create cluster
62+
const cluster = new FlowcoreDataPumpCluster({
63+
auth: { getBearerToken: () => Promise.resolve("fake") },
64+
dataSource: {
65+
tenant: "integration-test",
66+
dataCore: "test-data-core",
67+
flowType: "test-flow-type",
68+
eventTypes: ["test-event"],
69+
},
70+
stateManager,
71+
coordinator,
72+
dataSourceOverride: fakeDataSource,
73+
advertisedAddress: `ws://${POD_IP}:${WS_PORT}`,
74+
noTranslation: true,
75+
processor: {
76+
concurrency: 5,
77+
handler: async (events) => {
78+
for (const event of events) {
79+
await db.query(`INSERT INTO processed_events (pod_name, event_id) VALUES ($1, $2)`, [
80+
POD_NAME,
81+
event.eventId,
82+
])
83+
}
84+
log.info(`Processed ${events.length} events`)
85+
},
86+
},
87+
notifier: { type: "poller", intervalMs: 2000 },
88+
leaseTtlMs: 15000,
89+
leaseRenewIntervalMs: 5000,
90+
heartbeatIntervalMs: 3000,
91+
workerConcurrency: 5,
92+
logger: log,
93+
})
94+
95+
// start HTTP + WS server
96+
Deno.serve({ port: WS_PORT }, (req) => {
97+
const url = new URL(req.url)
98+
99+
if (url.pathname === "/health") {
100+
return new Response(
101+
JSON.stringify({
102+
instanceId: cluster.id,
103+
isLeader: cluster.isLeaderInstance,
104+
workerCount: cluster.activeWorkerCount,
105+
isRunning: cluster.isRunning,
106+
podName: POD_NAME,
107+
}),
108+
{ headers: { "content-type": "application/json" } },
109+
)
110+
}
111+
112+
if (req.headers.get("upgrade")?.toLowerCase() === "websocket") {
113+
const { socket, response } = Deno.upgradeWebSocket(req)
114+
cluster.handleConnection(socket)
115+
return response
116+
}
117+
118+
return new Response("Not found", { status: 404 })
119+
})
120+
121+
log.info(`HTTP/WS server listening on :${WS_PORT}`)
122+
123+
// start cluster
124+
await cluster.start()
125+
log.info("Cluster started")
126+
127+
// graceful shutdown
128+
Deno.addSignalListener("SIGTERM", async () => {
129+
log.info("SIGTERM received, shutting down...")
130+
await cluster.stop()
131+
await db.end()
132+
Deno.exit(0)
133+
})
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import type { Client as PgClient } from "npm:pg@^8.13.0"
2+
import type { FlowcoreDataPumpCoordinator } from "../../src/data-pump/types.ts"
3+
4+
export class PostgresCoordinator implements FlowcoreDataPumpCoordinator {
5+
constructor(private readonly db: PgClient) {}
6+
7+
async acquireLease(instanceId: string, key: string, ttlMs: number): Promise<boolean> {
8+
const expiresAt = new Date(Date.now() + ttlMs)
9+
const result = await this.db.query(
10+
`INSERT INTO flowcore_pump_leases (key, holder, expires_at)
11+
VALUES ($1, $2, $3)
12+
ON CONFLICT (key) DO UPDATE
13+
SET holder = $2, expires_at = $3
14+
WHERE flowcore_pump_leases.expires_at < NOW() OR flowcore_pump_leases.holder = $2
15+
RETURNING holder`,
16+
[key, instanceId, expiresAt],
17+
)
18+
return result.rowCount > 0 && result.rows[0].holder === instanceId
19+
}
20+
21+
async renewLease(instanceId: string, key: string, ttlMs: number): Promise<boolean> {
22+
const expiresAt = new Date(Date.now() + ttlMs)
23+
const result = await this.db.query(
24+
`UPDATE flowcore_pump_leases SET expires_at = $3 WHERE key = $1 AND holder = $2`,
25+
[key, instanceId, expiresAt],
26+
)
27+
return (result.rowCount ?? 0) > 0
28+
}
29+
30+
async releaseLease(instanceId: string, key: string): Promise<void> {
31+
await this.db.query(`DELETE FROM flowcore_pump_leases WHERE key = $1 AND holder = $2`, [key, instanceId])
32+
}
33+
34+
async register(instanceId: string, address: string): Promise<void> {
35+
await this.db.query(
36+
`INSERT INTO flowcore_pump_instances (instance_id, address, last_heartbeat)
37+
VALUES ($1, $2, NOW())
38+
ON CONFLICT (instance_id) DO UPDATE SET address = $2, last_heartbeat = NOW()`,
39+
[instanceId, address],
40+
)
41+
}
42+
43+
async heartbeat(instanceId: string): Promise<void> {
44+
await this.db.query(`UPDATE flowcore_pump_instances SET last_heartbeat = NOW() WHERE instance_id = $1`, [
45+
instanceId,
46+
])
47+
}
48+
49+
async unregister(instanceId: string): Promise<void> {
50+
await this.db.query(`DELETE FROM flowcore_pump_instances WHERE instance_id = $1`, [instanceId])
51+
}
52+
53+
async getInstances(staleThresholdMs: number): Promise<Array<{ instanceId: string; address: string }>> {
54+
const threshold = new Date(Date.now() - staleThresholdMs)
55+
const result = await this.db.query(
56+
`SELECT instance_id, address FROM flowcore_pump_instances WHERE last_heartbeat > $1`,
57+
[threshold],
58+
)
59+
return result.rows.map((row: { instance_id: string; address: string }) => ({
60+
instanceId: row.instance_id,
61+
address: row.address,
62+
}))
63+
}
64+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import type { Client as PgClient } from "npm:pg@^8.13.0"
2+
import type { FlowcoreDataPumpState, FlowcoreDataPumpStateManager } from "../../src/data-pump/types.ts"
3+
4+
export class PostgresStateManager implements FlowcoreDataPumpStateManager {
5+
constructor(private readonly db: PgClient) {}
6+
7+
async getState(): Promise<FlowcoreDataPumpState | null> {
8+
const result = await this.db.query(
9+
`SELECT time_bucket, event_id FROM pump_state ORDER BY updated_at DESC LIMIT 1`,
10+
)
11+
if (result.rows.length === 0) return null
12+
return {
13+
timeBucket: result.rows[0].time_bucket,
14+
eventId: result.rows[0].event_id,
15+
}
16+
}
17+
18+
async setState(state: FlowcoreDataPumpState): Promise<void> {
19+
await this.db.query(
20+
`INSERT INTO pump_state (id, time_bucket, event_id, updated_at)
21+
VALUES ('default', $1, $2, NOW())
22+
ON CONFLICT (id) DO UPDATE SET time_bucket = $1, event_id = $2, updated_at = NOW()`,
23+
[state.timeBucket, state.eventId],
24+
)
25+
}
26+
}

0 commit comments

Comments
 (0)