Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,38 @@ jobs:
run: deno test -A
- name: Validate NPM build
run: deno run -A bin/build-npm.ts

integration-test:
runs-on: blacksmith-4vcpu-ubuntu-2204
needs: build
steps:
- uses: actions/checkout@v3
with:
token: ${{ secrets.FLOWCORE_MACHINE_GITHUB_TOKEN }}
submodules: true
- name: Setup Deno2
uses: denoland/setup-deno@v2
with:
deno-version: v2.x
- name: Install Docker
run: |
if ! command -v docker &>/dev/null; then
curl -fsSL https://get.docker.com | sh
fi
docker version
- name: Install k3d
run: curl -s https://raw.githubusercontent.com/k3d-io/k3d/main/install.sh | bash
- name: Install kubectl
run: |
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x kubectl && sudo mv kubectl /usr/local/bin/
- name: Run integration tests
run: bash integration/run-integration.sh
- name: Collect logs on failure
if: failure()
run: |
kubectl logs -n data-pump-integration-test -l app=data-pump-test --tail=200 || true
kubectl logs -n data-pump-integration-test -l app=postgres --tail=100 || true
- name: Teardown
if: always()
run: bash integration/scripts/teardown.sh || true
6 changes: 4 additions & 2 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"node_modules",
"npm",
".github",
"CHANGELOG.md"
"CHANGELOG.md",
"integration"
],
"lineWidth": 120,
"indentWidth": 2,
Expand All @@ -42,7 +43,8 @@
"lint": {
"exclude": [
"node_modules",
"npm"
"npm",
"integration"
]
}
}
8 changes: 8 additions & 0 deletions integration/app/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM denoland/deno:latest
WORKDIR /app
COPY deno.json deno.lock ./
COPY src/ ./src/
COPY integration/app/ ./integration/app/
RUN deno install
EXPOSE 8080
CMD ["deno", "run", "-A", "integration/app/main.ts"]
3 changes: 3 additions & 0 deletions integration/app/deps.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export { default as pg } from "npm:pg@^8.13.0"
export const { Client } = await import("npm:pg@^8.13.0")
export type { Client as PgClient } from "npm:pg@^8.13.0"
109 changes: 109 additions & 0 deletions integration/app/fake-data-source.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import type { EventListOutput, FlowcoreEvent } from "@flowcore/sdk"
import { TimeUuid } from "@flowcore/time-uuid"
import { utc } from "@date-fns/utc"
import { format, startOfHour } from "date-fns"
import { FlowcoreDataSource } from "../../src/data-pump/data-source.ts"
import type { FlowcoreDataPumpState } from "../../src/data-pump/types.ts"

export class FakeDataSource extends FlowcoreDataSource {
private readonly events: FlowcoreEvent[]
private readonly timeBucket: string
private deliveredIndex = 0
private startTime: number
private readonly startDelayMs: number

constructor(totalEvents: number, startDelayMs = 15000) {
super({
auth: { getBearerToken: () => Promise.resolve("fake") },
dataSource: {
tenant: "integration-test",
dataCore: "test-data-core",
flowType: "test-flow-type",
eventTypes: ["test-event"],
},
noTranslation: true,
})

this.startDelayMs = startDelayMs
this.startTime = Date.now()
this.timeBucket = format(startOfHour(utc(new Date())), "yyyyMMddHH0000")
this.events = []

for (let i = 0; i < totalEvents; i++) {
const timeUuid = TimeUuid.now()
this.events.push({
eventId: timeUuid.toString(),
eventType: "test-event",
aggregator: `agg-${i}`,
payload: { index: i, data: `test-payload-${i}` },
metadata: {},
timeBucket: this.timeBucket,
validTime: new Date().toISOString(),
})
}

// pre-set cached IDs so the parent class never calls Flowcore API
this.tenantId = "integration-test"
this.dataCoreId = "test-data-core"
this.flowTypeId = "test-flow-type"
this.eventTypeIds = ["test-event"]
this.timeBuckets = [this.timeBucket]
}

public override async getEvents(
_from: FlowcoreDataPumpState,
amount: number,
_toEventId?: string,
_cursor?: string,
_includeSensitiveData?: boolean,
): Promise<EventListOutput> {
// delay event delivery to give the cluster time to form (workers connect)
const elapsed = Date.now() - this.startTime
if (elapsed < this.startDelayMs) {
await new Promise((resolve) => setTimeout(resolve, 2000))
return { events: [], nextCursor: undefined }
}

if (this.deliveredIndex >= this.events.length) {
return { events: [], nextCursor: undefined }
}

// deliver in batches of 10 to allow distribution across workers
const batchSize = Math.min(10, amount, this.events.length - this.deliveredIndex)
const batch = this.events.slice(this.deliveredIndex, this.deliveredIndex + batchSize)
this.deliveredIndex += batchSize

return {
events: batch,
nextCursor: this.deliveredIndex < this.events.length ? String(this.deliveredIndex) : undefined,
}
}

public override async getTimeBuckets(_force?: boolean): Promise<string[]> {
return [this.timeBucket]
}

public override async getNextTimeBucket(_timeBucket: string): Promise<string | null> {
return null
}

public override async getClosestTimeBucket(_timeBucket: string, _getBefore?: boolean): Promise<string | null> {
return this.timeBucket
}

public override async getTenantId(): Promise<string> {
return "integration-test"
}

public override async getDataCoreId(): Promise<string> {
return "test-data-core"
}

public override async getFlowTypeId(): Promise<string> {
return "test-flow-type"
}

public override async getEventTypeIds(): Promise<string[]> {
return ["test-event"]
}
}
144 changes: 144 additions & 0 deletions integration/app/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import { Client } from "npm:pg@^8.13.0"
import { FlowcoreDataPumpCluster } from "../../src/data-pump/data-pump-cluster.ts"
import { PostgresCoordinator } from "./postgres-coordinator.ts"
import { PostgresStateManager } from "./postgres-state-manager.ts"
import { FakeDataSource } from "./fake-data-source.ts"

const DATABASE_URL = Deno.env.get("DATABASE_URL") ?? "postgres://postgres:postgres@localhost:5432/datapump_test"
const POD_NAME = Deno.env.get("POD_NAME") ?? "local-pod"
const NATS_URL = Deno.env.get("NATS_URL")
const TOTAL_EVENTS = parseInt(Deno.env.get("TOTAL_EVENTS") ?? "100", 10)
const WS_PORT = parseInt(Deno.env.get("WS_PORT") ?? "8080", 10)

const log = {
debug: (msg: string, meta?: Record<string, unknown>) => console.log(`[DEBUG] [${POD_NAME}] ${msg}`, meta ?? ""),
info: (msg: string, meta?: Record<string, unknown>) => console.log(`[INFO] [${POD_NAME}] ${msg}`, meta ?? ""),
warn: (msg: string, meta?: Record<string, unknown>) => console.warn(`[WARN] [${POD_NAME}] ${msg}`, meta ?? ""),
error: (msg: string | Error, meta?: Record<string, unknown>) =>
console.error(`[ERROR] [${POD_NAME}] ${msg}`, meta ?? ""),
}

// connect to PG
const db = new Client({ connectionString: DATABASE_URL })
await db.connect()
log.info("Connected to PostgreSQL")

// create tables
await db.query(`
CREATE TABLE IF NOT EXISTS flowcore_pump_leases (
key TEXT PRIMARY KEY,
holder TEXT NOT NULL,
expires_at TIMESTAMPTZ NOT NULL
);

CREATE TABLE IF NOT EXISTS flowcore_pump_instances (
instance_id TEXT PRIMARY KEY,
address TEXT NOT NULL,
last_heartbeat TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS pump_state (
id TEXT PRIMARY KEY,
time_bucket TEXT NOT NULL,
event_id TEXT,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS processed_events (
id SERIAL PRIMARY KEY,
pod_name TEXT NOT NULL,
event_id TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
`)
log.info("Database tables ready")

// create components
const coordinator = new PostgresCoordinator(db)
const stateManager = new PostgresStateManager(db)
const fakeDataSource = new FakeDataSource(TOTAL_EVENTS)

const useNats = !!NATS_URL
log.info(`Distribution mode: ${useNats ? "NATS" : "WS"}`, { natsUrl: NATS_URL })

// create cluster
const POD_IP = Deno.env.get("POD_IP") ?? "127.0.0.1"
const cluster = new FlowcoreDataPumpCluster({
auth: { getBearerToken: () => Promise.resolve("fake") },
dataSource: {
tenant: "integration-test",
dataCore: "test-data-core",
flowType: "test-flow-type",
eventTypes: ["test-event"],
},
stateManager,
coordinator,
dataSourceOverride: fakeDataSource,
...(useNats
? {
notifier: { type: "nats" as const, servers: [NATS_URL!] },
}
: {
advertisedAddress: `ws://${POD_IP}:${WS_PORT}`,
notifier: { type: "poller" as const, intervalMs: 2000 },
}),
noTranslation: true,
processor: {
concurrency: 5,
handler: async (events) => {
for (const event of events) {
await db.query(`INSERT INTO processed_events (pod_name, event_id) VALUES ($1, $2)`, [
POD_NAME,
event.eventId,
])
}
log.info(`Processed ${events.length} events`)
},
},
leaseTtlMs: 15000,
leaseRenewIntervalMs: 5000,
heartbeatIntervalMs: 3000,
workerConcurrency: 5,
logger: log,
})

// start HTTP server (health endpoint only in NATS mode, health + WS in WS mode)
Deno.serve({ port: WS_PORT }, (req) => {
const url = new URL(req.url)

if (url.pathname === "/health") {
return new Response(
JSON.stringify({
instanceId: cluster.id,
isLeader: cluster.isLeaderInstance,
workerCount: cluster.activeWorkerCount,
isRunning: cluster.isRunning,
podName: POD_NAME,
distributionMode: useNats ? "nats" : "ws",
}),
{ headers: { "content-type": "application/json" } },
)
}

if (!useNats && req.headers.get("upgrade")?.toLowerCase() === "websocket") {
const { socket, response } = Deno.upgradeWebSocket(req)
cluster.handleConnection(socket)
return response
}

return new Response("Not found", { status: 404 })
})

log.info(`HTTP server listening on :${WS_PORT}`)

// start cluster
await cluster.start()
log.info("Cluster started")

// graceful shutdown
Deno.addSignalListener("SIGTERM", async () => {
log.info("SIGTERM received, shutting down...")
await cluster.stop()
await db.end()
Deno.exit(0)
})
64 changes: 64 additions & 0 deletions integration/app/postgres-coordinator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import type { Client as PgClient } from "npm:pg@^8.13.0"
import type { FlowcoreDataPumpCoordinator } from "../../src/data-pump/types.ts"

export class PostgresCoordinator implements FlowcoreDataPumpCoordinator {
constructor(private readonly db: PgClient) {}

async acquireLease(instanceId: string, key: string, ttlMs: number): Promise<boolean> {
const expiresAt = new Date(Date.now() + ttlMs)
const result = await this.db.query(
`INSERT INTO flowcore_pump_leases (key, holder, expires_at)
VALUES ($1, $2, $3)
ON CONFLICT (key) DO UPDATE
SET holder = $2, expires_at = $3
WHERE flowcore_pump_leases.expires_at < NOW() OR flowcore_pump_leases.holder = $2
RETURNING holder`,
[key, instanceId, expiresAt],
)
return result.rowCount > 0 && result.rows[0].holder === instanceId
}

async renewLease(instanceId: string, key: string, ttlMs: number): Promise<boolean> {
const expiresAt = new Date(Date.now() + ttlMs)
const result = await this.db.query(
`UPDATE flowcore_pump_leases SET expires_at = $3 WHERE key = $1 AND holder = $2`,
[key, instanceId, expiresAt],
)
return (result.rowCount ?? 0) > 0
}

async releaseLease(instanceId: string, key: string): Promise<void> {
await this.db.query(`DELETE FROM flowcore_pump_leases WHERE key = $1 AND holder = $2`, [key, instanceId])
}

async register(instanceId: string, address: string): Promise<void> {
await this.db.query(
`INSERT INTO flowcore_pump_instances (instance_id, address, last_heartbeat)
VALUES ($1, $2, NOW())
ON CONFLICT (instance_id) DO UPDATE SET address = $2, last_heartbeat = NOW()`,
[instanceId, address],
)
}

async heartbeat(instanceId: string): Promise<void> {
await this.db.query(`UPDATE flowcore_pump_instances SET last_heartbeat = NOW() WHERE instance_id = $1`, [
instanceId,
])
}

async unregister(instanceId: string): Promise<void> {
await this.db.query(`DELETE FROM flowcore_pump_instances WHERE instance_id = $1`, [instanceId])
}

async getInstances(staleThresholdMs: number): Promise<Array<{ instanceId: string; address: string }>> {
const threshold = new Date(Date.now() - staleThresholdMs)
const result = await this.db.query(
`SELECT instance_id, address FROM flowcore_pump_instances WHERE last_heartbeat > $1`,
[threshold],
)
return result.rows.map((row: { instance_id: string; address: string }) => ({
instanceId: row.instance_id,
address: row.address,
}))
}
}
Loading
Loading