diff --git a/Dockerfile b/Dockerfile index 92cffb93..6e02986f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,10 +8,15 @@ COPY package*.json ./ # Copy ts config COPY tsconfig.json ./ +# Vendored @atmo-dev/contrail* tarballs referenced by file: deps in package.json. +# Required before npm ci; regenerated via scripts/prepare-contrail-deps.sh. +# Drops out when upstream publishes to npm (PR #44 follow-up). +COPY vendor/ ./vendor/ + # ---- Dependencies ---- FROM base AS dependencies -# Install production dependencies +# Install production dependencies RUN npm ci RUN npm install -g ts-node @@ -54,3 +59,12 @@ EXPOSE 3000 # CMD npm run migration:run:prod && npm run seed:run:prod && npm run start:prod CMD npm run start:prod + + +# ---- Ingest ---- +# Same artifact as `release`, different default command. The contrail live-ingest +# Deployment streams ATProto records from Jetstream into Postgres continuously; +# no HTTP port. Process liveness is the sole health signal (kubelet restarts on +# exit). See src/contrail/ingest.ts and the Phase D plan. +FROM release AS ingest +CMD ["npm", "run", "contrail:ingest"] diff --git a/package-lock.json b/package-lock.json index 05b7b228..650a7929 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,8 @@ "version": "1.5.0", "license": "Apache-2.0", "dependencies": { - "@atmo-dev/contrail": "^0.6.0", + "@atcute/identity-resolver": "^1.2.3", + "@atmo-dev/contrail": "file:./vendor/atmo-dev-contrail.tgz", "@atproto/api": "^0.13.31", "@atproto/crypto": "^0.4.5", "@atproto/identity": "^0.4.7", @@ -648,8 +649,8 @@ }, "node_modules/@atmo-dev/contrail": { "version": "0.6.0", - "resolved": "https://registry.npmjs.org/@atmo-dev/contrail/-/contrail-0.6.0.tgz", - "integrity": "sha512-R5xlhQFT+nUkxCNKS41TwnVID2GZl2heh6Zz3Mw7+Zn0zrDLKkcbc4znNLw7q0VZk04Wx9xNxZQGlmfwUooJ1Q==", + "resolved": "file:vendor/atmo-dev-contrail.tgz", + "integrity": "sha512-zrRkIPbpwspWz/qESZqX3iwj+fkIyQ9Ddf6BTjvNdBzltrGmy96fRH4kerdyIPPFDa1MYKMjMgQn4RBbAGh8aA==", "license": "MIT", "dependencies": { "@atcute/atproto": "^3.1.10", @@ -661,6 +662,10 @@ "@atcute/jetstream": "^1.0.2", "@atcute/lexicons": "^1.2.9", "@atcute/xrpc-server": "^0.1.12", + "@atmo-dev/contrail-appview": "0.6.0", + "@atmo-dev/contrail-authority": "0.6.0", + "@atmo-dev/contrail-base": "0.6.0", + "@atmo-dev/contrail-record-host": "0.6.0", "cac": "^7.0.0", "hono": "^4.12.8", "jiti": "^2.4.0" @@ -681,6 +686,74 @@ } } }, + "node_modules/@atmo-dev/contrail-appview": { + "version": "0.6.0", + "resolved": "file:vendor/atmo-dev-contrail-appview.tgz", + "integrity": "sha512-C5pJPGdwngA7OpD7UHMZuVNOkWhVXNjgCrUUQ/GjDmqMDPRUfYnp270zqQ5rQqrTRz7a5Trb3PKyn1+nrxppvQ==", + "license": "MIT", + "dependencies": { + "@atcute/atproto": "^3.1.10", + "@atcute/cbor": "^2.3.2", + "@atcute/cid": "^2.4.1", + "@atcute/client": "^4.2.1", + "@atcute/identity": "^1.1.4", + "@atcute/identity-resolver": "^1.2.2", + "@atcute/jetstream": "^1.0.2", + "@atcute/lexicons": "^1.2.9", + "@atcute/xrpc-server": "^0.1.12", + "@atmo-dev/contrail-authority": "0.6.0", + "@atmo-dev/contrail-base": "0.6.0", + "@atmo-dev/contrail-record-host": "0.6.0", + "hono": "^4.12.8" + } + }, + "node_modules/@atmo-dev/contrail-authority": { + "version": "0.6.0", + "resolved": "file:vendor/atmo-dev-contrail-authority.tgz", + "integrity": "sha512-Fccf2GqSP7reNinflu3/K0Fvu2ihPDctIa+0Bb9OMbCFsDX7c36acmqfVlNJskhslevcPqKb8RxrJNGEgI9MuA==", + "license": "MIT", + "dependencies": { + "@atcute/cid": "^2.4.1", + "@atcute/lexicons": "^1.2.9", + "@atmo-dev/contrail-base": "0.6.0", + "hono": "^4.12.8" + } + }, + "node_modules/@atmo-dev/contrail-base": { + "version": "0.6.0", + "resolved": "file:vendor/atmo-dev-contrail-base.tgz", + "integrity": "sha512-KeatsgaXE/cx+ZR3L5YmieCRoHzALfY9WbvbLCaKnTT/Ey3O3MMRqrlBQq2/oyot5gCBN1RKl1toan0MM4M8qw==", + "license": "MIT", + "dependencies": { + "@atcute/atproto": "^3.1.10", + "@atcute/cid": "^2.4.1", + "@atcute/client": "^4.2.1", + "@atcute/identity": "^1.1.4", + "@atcute/identity-resolver": "^1.2.2", + "@atcute/lexicons": "^1.2.9", + "@atcute/xrpc-server": "^0.1.12", + "hono": "^4.12.8" + }, + "peerDependencies": { + "pg": "^8.0.0" + }, + "peerDependenciesMeta": { + "pg": { + "optional": true + } + } + }, + "node_modules/@atmo-dev/contrail-record-host": { + "version": "0.6.0", + "resolved": "file:vendor/atmo-dev-contrail-record-host.tgz", + "integrity": "sha512-UdLxr1o3ENrwma95GkTJ+/9ffExwIAG3PyGdUUGgHUYhgFyaF4gJ+30I3PpihB5V5cwytkS5sC0WtySyr6Crsw==", + "license": "MIT", + "dependencies": { + "@atcute/cid": "^2.4.1", + "@atmo-dev/contrail-base": "0.6.0", + "hono": "^4.12.8" + } + }, "node_modules/@atmo-dev/contrail/node_modules/jiti": { "version": "2.7.0", "resolved": "https://registry.npmjs.org/jiti/-/jiti-2.7.0.tgz", @@ -14622,9 +14695,9 @@ } }, "node_modules/hono": { - "version": "4.12.18", - "resolved": "https://registry.npmjs.org/hono/-/hono-4.12.18.tgz", - "integrity": "sha512-RWzP96k/yv0PQfyXnWjs6zot20TqfpfsNXhOnev8d1InAxubW93L11/oNUc3tQqn2G0bSdAOBpX+2uDFHV7kdQ==", + "version": "4.12.19", + "resolved": "https://registry.npmjs.org/hono/-/hono-4.12.19.tgz", + "integrity": "sha512-xa3eYXYXx68XTT4hZ7dRzsXBhaq85ToSrlUJNoR0gwz/1Ap/CNwX47wfvV7pc/xWhjKVVkLT7zBJy8chhNguqQ==", "license": "MIT", "engines": { "node": ">=16.9.0" diff --git a/package.json b/package.json index 61a7d237..192371bf 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "migration:run:prod": "ts-node -r tsconfig-paths/register ./src/database/run-multi-tenant-migrations.ts", "backfill:activity-feeds": "env-cmd ts-node -r tsconfig-paths/register ./src/database/backfill-activity-feeds.ts", "contrail:sync": "node dist/contrail/sync.js", + "contrail:ingest": "node dist/contrail/ingest.js", "contrail:smoke": "ts-node -r tsconfig-paths/register ./src/contrail/smoke-server.ts", "seed:run:prod": "ts-node -r tsconfig-paths/register ./src/database/seeds/relational/run-seed.ts --tenant-config=./config/tenants.json", "schema:drop": "npm run typeorm -- --dataSource=src/database/data-source.ts schema:drop", @@ -63,8 +64,15 @@ "perf:compare": "mkdir -p k6-tests/results && MODE=comparison k6 run k6-tests/compare-index-impact.js --out json=k6-tests/results/latest.json", "perf:report": "node k6-tests/compare-results.js" }, + "overrides": { + "@atmo-dev/contrail-base": "file:./vendor/atmo-dev-contrail-base.tgz", + "@atmo-dev/contrail-appview": "file:./vendor/atmo-dev-contrail-appview.tgz", + "@atmo-dev/contrail-authority": "file:./vendor/atmo-dev-contrail-authority.tgz", + "@atmo-dev/contrail-record-host": "file:./vendor/atmo-dev-contrail-record-host.tgz" + }, "dependencies": { - "@atmo-dev/contrail": "0.6.0", + "@atcute/identity-resolver": "^1.2.3", + "@atmo-dev/contrail": "file:./vendor/atmo-dev-contrail.tgz", "@atproto/api": "^0.13.31", "@atproto/crypto": "^0.4.5", "@atproto/identity": "^0.4.7", @@ -226,7 +234,8 @@ "/../test/jest-setup.ts" ], "transformIgnorePatterns": [ - "node_modules/(?!matrix-js-sdk)" + "node_modules/(?!matrix-js-sdk)", + "contrail-pr30/packages/.*/dist/" ], "maxWorkers": "75%", "workerIdleMemoryLimit": "512MB", diff --git a/relational.e2e.Dockerfile b/relational.e2e.Dockerfile index 673b1e66..9eaa7dc4 100644 --- a/relational.e2e.Dockerfile +++ b/relational.e2e.Dockerfile @@ -4,6 +4,9 @@ RUN apk add --no-cache bash curl jq RUN npm i -g @nestjs/cli typescript ts-node COPY package*.json /tmp/app/ +# Vendored @atmo-dev/contrail* tarballs referenced by file: deps; required for +# npm install. Drops out when upstream publishes (PR #44 follow-up). +COPY vendor/ /tmp/app/vendor/ RUN cd /tmp/app && npm install COPY . /usr/src/app diff --git a/relational.test.Dockerfile b/relational.test.Dockerfile index a3e07b7c..6c0c3ba7 100644 --- a/relational.test.Dockerfile +++ b/relational.test.Dockerfile @@ -4,6 +4,9 @@ RUN apk add --no-cache bash RUN npm i -g @nestjs/cli typescript ts-node COPY package*.json /tmp/app/ +# Vendored @atmo-dev/contrail* tarballs referenced by file: deps; required for +# npm install. Drops out when upstream publishes (PR #44 follow-up). +COPY vendor/ /tmp/app/vendor/ RUN cd /tmp/app && npm install COPY . /usr/src/app diff --git a/scripts/prepare-contrail-deps.sh b/scripts/prepare-contrail-deps.sh new file mode 100755 index 00000000..d2056693 --- /dev/null +++ b/scripts/prepare-contrail-deps.sh @@ -0,0 +1,45 @@ +#!/usr/bin/env bash +# Build the five @atmo-dev/contrail* packages from a sibling fork worktree +# and place the resulting tarballs under vendor/ with stable filenames. +# +# Local: assumes ../contrail-pr30 exists (override with CONTRAIL_DIR). +# Stable names (atmo-dev-contrail.tgz, etc.) let package.json pin paths +# without churn across fork bumps. +# +# vendor/*.tgz are TRACKED in git so CI (deploy-to-dev.yml: npm ci + +# docker build) can resolve the file: deps without a fork checkout. +# +# Regeneration workflow on a fork bump: +# scripts/prepare-contrail-deps.sh +# npm install # only touches lockfile entries for the tarballs +# git add vendor/*.tgz package-lock.json +# git commit -m "chore(contrail): bump fork pin to " +# +# Drop this script + vendor/ entirely once @atmo-dev/contrail* publish +# to npm (post PR #44 merge). +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +CONTRAIL_DIR="${CONTRAIL_DIR:-$REPO_ROOT/../contrail-pr30}" + +if [[ ! -d "$CONTRAIL_DIR" ]]; then + echo "error: CONTRAIL_DIR not found: $CONTRAIL_DIR" >&2 + echo " set CONTRAIL_DIR or check out the fork next to openmeet-api-contrail-live-ingest" >&2 + exit 1 +fi + +echo "building @atmo-dev/contrail* from $CONTRAIL_DIR ($(git -C "$CONTRAIL_DIR" rev-parse --short HEAD))" +cd "$CONTRAIL_DIR" +pnpm install --frozen-lockfile +pnpm -r --filter "@atmo-dev/contrail" --filter "@atmo-dev/contrail-base" --filter "@atmo-dev/contrail-appview" --filter "@atmo-dev/contrail-authority" --filter "@atmo-dev/contrail-record-host" build + +mkdir -p "$REPO_ROOT/vendor" + +for pkg in contrail contrail-base contrail-appview contrail-authority contrail-record-host; do + cd "$CONTRAIL_DIR/packages/$pkg" + packed=$(pnpm pack --silent | tail -1) + dest="$REPO_ROOT/vendor/atmo-dev-${pkg}.tgz" + mv "$packed" "$dest" + echo "wrote $dest" +done diff --git a/src/contrail/contrail-init-idempotency.spec.ts b/src/contrail/contrail-init-idempotency.spec.ts new file mode 100644 index 00000000..af72bb3f --- /dev/null +++ b/src/contrail/contrail-init-idempotency.spec.ts @@ -0,0 +1,51 @@ +/** + * Asserts that contrail.init() is safe to call concurrently against the same + * database. Replaces the previous contrail-init-lock.spec which exercised the + * consumer-side advisory lock; idempotency is now the library's responsibility + * (see fork PR #44 / commit L3). + * + * Gated on CONTRAIL_TEST_DATABASE_URL — local-only by default. + */ +import pg from 'pg'; +import { loadContrail } from './contrail-loader'; +import { buildContrailConfig } from './contrail.config'; + +const databaseUrl = process.env.CONTRAIL_TEST_DATABASE_URL; + +const maybe = databaseUrl ? describe : describe.skip; + +maybe('contrail.init() idempotency', () => { + let pool: pg.Pool; + + beforeEach(async () => { + pool = new pg.Pool({ connectionString: databaseUrl }); + await pool.query('DROP SCHEMA IF EXISTS contrail_idempotency_test CASCADE'); + await pool.query('CREATE SCHEMA contrail_idempotency_test'); + }); + + afterEach(async () => { + await pool.query('DROP SCHEMA contrail_idempotency_test CASCADE'); + await pool.end(); + }); + + it('should not throw when called twice sequentially', async () => { + const { pkg, postgres } = await loadContrail(); + const config = await buildContrailConfig(); + const db = postgres.createPostgresDatabase(pool); + const contrail = new pkg.Contrail({ ...config, db }); + + await contrail.init(db); + await expect(contrail.init(db)).resolves.not.toThrow(); + }); + + it('should be safe under concurrent invocation', async () => { + const { pkg, postgres } = await loadContrail(); + const config = await buildContrailConfig(); + const db = postgres.createPostgresDatabase(pool); + const contrail = new pkg.Contrail({ ...config, db }); + + await expect( + Promise.all([contrail.init(db), contrail.init(db), contrail.init(db)]), + ).resolves.not.toThrow(); + }); +}); diff --git a/src/contrail/contrail-init-lock.spec.ts b/src/contrail/contrail-init-lock.spec.ts deleted file mode 100644 index d39c21a1..00000000 --- a/src/contrail/contrail-init-lock.spec.ts +++ /dev/null @@ -1,57 +0,0 @@ -import pg from 'pg'; -import { withInitLock } from './contrail-init-lock'; - -const TEST_DB_URL = process.env.CONTRAIL_TEST_DATABASE_URL; - -const describeIfDb = TEST_DB_URL ? describe : describe.skip; - -describeIfDb('withInitLock (requires CONTRAIL_TEST_DATABASE_URL)', () => { - let pool: pg.Pool; - - beforeAll(() => { - pool = new pg.Pool({ connectionString: TEST_DB_URL }); - }); - - afterAll(async () => { - await pool.end(); - }); - - it('should serialize concurrent critical sections', async () => { - const events: string[] = []; - const work = async (id: string) => { - await withInitLock(pool, 'om-test-init-lock', async () => { - events.push(`${id}-enter`); - await new Promise((r) => setTimeout(r, 50)); - events.push(`${id}-exit`); - }); - }; - - await Promise.all([work('A'), work('B')]); - - expect(events).toHaveLength(4); - const aEnter = events.indexOf('A-enter'); - const aExit = events.indexOf('A-exit'); - const bEnter = events.indexOf('B-enter'); - const bExit = events.indexOf('B-exit'); - - const aBeforeB = aEnter < aExit && aExit < bEnter && bEnter < bExit; - const bBeforeA = bEnter < bExit && bExit < aEnter && aEnter < aExit; - - expect(aBeforeB || bBeforeA).toBe(true); - }); - - it('should release the lock on critical-section throw', async () => { - await expect( - withInitLock(pool, 'om-test-init-lock-throw', () => { - throw new Error('boom'); - }), - ).rejects.toThrow('boom'); - - let ran = false; - await withInitLock(pool, 'om-test-init-lock-throw', () => { - ran = true; - return Promise.resolve(); - }); - expect(ran).toBe(true); - }); -}); diff --git a/src/contrail/contrail-init-lock.ts b/src/contrail/contrail-init-lock.ts deleted file mode 100644 index 05fb0ff5..00000000 --- a/src/contrail/contrail-init-lock.ts +++ /dev/null @@ -1,21 +0,0 @@ -import pg from 'pg'; - -export async function withInitLock( - pool: pg.Pool, - key: string, - fn: () => Promise, -): Promise { - const client = await pool.connect(); - try { - await client.query('SELECT pg_advisory_lock(hashtext($1)::bigint)', [key]); - try { - return await fn(); - } finally { - await client.query('SELECT pg_advisory_unlock(hashtext($1)::bigint)', [ - key, - ]); - } - } finally { - client.release(); - } -} diff --git a/src/contrail/contrail.config.ts b/src/contrail/contrail.config.ts index 11dff21f..5d03d353 100644 --- a/src/contrail/contrail.config.ts +++ b/src/contrail/contrail.config.ts @@ -1,44 +1,81 @@ import type { ContrailConfig } from '@atmo-dev/contrail'; -export const contrailConfig: ContrailConfig = { - namespace: 'net.openmeet', - collections: { - event: { - collection: 'community.lexicon.calendar.event', - queryable: { - mode: {}, - name: {}, - status: {}, - startsAt: { type: 'range' }, - endsAt: { type: 'range' }, - createdAt: { type: 'range' }, +// @atcute/identity-resolver ships as ESM-only. Mirrors the trick in +// contrail-loader.ts so the dynamic import survives `module: commonjs`. +const esmImport = new Function('specifier', 'return import(specifier)') as < + T = unknown, +>( + specifier: string, +) => Promise; + +export async function buildContrailConfig(): Promise { + const plcUrl = process.env.CONTRAIL_PLC_URL; + + let resolver: unknown | undefined; + if (plcUrl) { + const ir = await esmImport( + '@atcute/identity-resolver', + ); + resolver = new ir.CompositeDidDocumentResolver({ + methods: { + plc: new ir.PlcDidDocumentResolver({ apiUrl: plcUrl }), + web: new ir.WebDidDocumentResolver(), }, - searchable: ['name', 'description'], - relations: { - rsvps: { - collection: 'rsvp', - groupBy: 'status', - count: true, - groups: { - interested: 'community.lexicon.calendar.rsvp#interested', - going: 'community.lexicon.calendar.rsvp#going', - notgoing: 'community.lexicon.calendar.rsvp#notgoing', + }); + } + + const slingshotUrl = process.env.CONTRAIL_SLINGSHOT_URL || undefined; + const additionalAllowedHosts = + process.env.CONTRAIL_ALLOWED_HOSTS?.split(',') || undefined; + + const networkOverrides = + resolver || slingshotUrl || additionalAllowedHosts + ? { resolver, slingshotUrl, additionalAllowedHosts } + : undefined; + + return { + namespace: 'net.openmeet', + collections: { + event: { + collection: 'community.lexicon.calendar.event', + queryable: { + mode: {}, + name: {}, + status: {}, + startsAt: { type: 'range' }, + endsAt: { type: 'range' }, + createdAt: { type: 'range' }, + }, + searchable: ['name', 'description'], + relations: { + rsvps: { + collection: 'rsvp', + groupBy: 'status', + count: true, + groups: { + interested: 'community.lexicon.calendar.rsvp#interested', + going: 'community.lexicon.calendar.rsvp#going', + notgoing: 'community.lexicon.calendar.rsvp#notgoing', + }, }, }, }, - }, - rsvp: { - collection: 'community.lexicon.calendar.rsvp', - queryable: { - status: {}, - 'subject.uri': {}, - }, - references: { - event: { - collection: 'event', - field: 'subject.uri', + rsvp: { + collection: 'community.lexicon.calendar.rsvp', + queryable: { + status: {}, + 'subject.uri': {}, + }, + references: { + event: { + collection: 'event', + field: 'subject.uri', + }, }, }, }, - }, -}; + jetstreams: process.env.CONTRAIL_JETSTREAM_URLS?.split(',') || undefined, + relays: process.env.CONTRAIL_RELAYS?.split(',') || undefined, + networkOverrides, + }; +} diff --git a/src/contrail/contrail.d.ts b/src/contrail/contrail.d.ts index 21702aec..842ae516 100644 --- a/src/contrail/contrail.d.ts +++ b/src/contrail/contrail.d.ts @@ -35,6 +35,12 @@ declare module '@atmo-dev/contrail' { error(...args: unknown[]): void; } + export interface NetworkOverrides { + resolver?: unknown; + slingshotUrl?: string; + additionalAllowedHosts?: string[]; + } + export interface ContrailConfig { namespace: string; collections: Record; @@ -42,6 +48,7 @@ declare module '@atmo-dev/contrail' { jetstreams?: string[]; logger?: Logger; notify?: boolean | string; + networkOverrides?: NetworkOverrides; } export type Database = unknown; @@ -63,11 +70,16 @@ declare module '@atmo-dev/contrail' { onProgress?: (p: BackfillProgress) => void; } + export interface RunPersistentOptions { + signal?: AbortSignal; + } + export class Contrail { constructor(options: ContrailOptions); init(db?: Database, spacesDb?: Database): Promise; discover(db?: Database): Promise; backfill(options?: BackfillAllOptions, db?: Database): Promise; + runPersistent(options?: RunPersistentOptions): Promise; } } @@ -83,3 +95,15 @@ declare module '@atmo-dev/contrail/postgres' { import type { Database } from '@atmo-dev/contrail'; export function createPostgresDatabase(pool: Pool): Database; } + +declare module '@atcute/identity-resolver' { + export class CompositeDidDocumentResolver { + constructor(config: { methods: Record }); + } + export class PlcDidDocumentResolver { + constructor(config: { apiUrl: string }); + } + export class WebDidDocumentResolver { + constructor(); + } +} diff --git a/src/contrail/contrail.provider.ts b/src/contrail/contrail.provider.ts index 6efdc48c..a27e8998 100644 --- a/src/contrail/contrail.provider.ts +++ b/src/contrail/contrail.provider.ts @@ -6,11 +6,9 @@ import { } from '@nestjs/common'; import pg from 'pg'; import type { Contrail } from '@atmo-dev/contrail'; -import { contrailConfig } from './contrail.config'; -import { withInitLock } from './contrail-init-lock'; +import { buildContrailConfig } from './contrail.config'; import { loadContrail } from './contrail-loader'; -const INIT_LOCK_KEY = 'net.openmeet.contrail.init'; const DEFAULT_SCHEMA = 'contrail'; @Injectable() @@ -40,19 +38,18 @@ export class ContrailProvider implements OnModuleInit, OnModuleDestroy { } as pg.PoolConfig); const { pkg, server, postgres } = await loadContrail(); + const config = await buildContrailConfig(); const db = postgres.createPostgresDatabase(this.pool); - this.contrail = new pkg.Contrail({ ...contrailConfig, db }); + this.contrail = new pkg.Contrail({ ...config, db }); - await withInitLock(this.pool, INIT_LOCK_KEY, async () => { - await this.pool!.query(`CREATE SCHEMA IF NOT EXISTS "${schema}"`); - await this.contrail!.init(); - }); + await this.pool!.query(`CREATE SCHEMA IF NOT EXISTS "${schema}"`); + await this.contrail!.init(); this.handler = server.createHandler(this.contrail); const redactedUrl = databaseUrl.replace(/:[^:@/]+@/, ':***@'); this.logger.log( - `Contrail initialized; namespace=${contrailConfig.namespace}, schema=${schema}, db=${redactedUrl}`, + `Contrail initialized; namespace=${config.namespace}, schema=${schema}, db=${redactedUrl}`, ); } diff --git a/src/contrail/ingest.ts b/src/contrail/ingest.ts new file mode 100644 index 00000000..7534b7f3 --- /dev/null +++ b/src/contrail/ingest.ts @@ -0,0 +1,71 @@ +/** + * Long-running Contrail live-ingest entrypoint. Streams ATProto records from + * the configured Jetstream(s) into the Contrail index continuously. + * + * Run as a Kubernetes Deployment (NOT inside the API pod). Process-liveness + * (no HTTP probe) is the sole health signal — kubelet restarts a dead pod. + * runPersistent's built-in reconnect/backoff handles transient Jetstream + * failures. + * + * Required env: + * CONTRAIL_DATABASE_URL Postgres connection string + * CONTRAIL_SCHEMA Schema name (default: contrail) + * CONTRAIL_JETSTREAM_URLS Comma-separated Jetstream WS URLs + * + * Optional env (private-network deployment): + * CONTRAIL_PLC_URL Override hardcoded PLC + * CONTRAIL_SLINGSHOT_URL Override Slingshot endpoint + * CONTRAIL_ALLOWED_HOSTS Comma-separated SSRF allowlist + * CONTRAIL_RELAYS Comma-separated relay URLs + */ +import pg from 'pg'; +import { buildContrailConfig } from './contrail.config'; +import { loadContrail } from './contrail-loader'; + +const DEFAULT_SCHEMA = 'contrail'; + +async function main(): Promise { + const databaseUrl = process.env.CONTRAIL_DATABASE_URL; + if (!databaseUrl) { + console.error('CONTRAIL_DATABASE_URL is required'); + process.exit(1); + } + const schema = process.env.CONTRAIL_SCHEMA ?? DEFAULT_SCHEMA; + + const pool = new pg.Pool({ + connectionString: databaseUrl, + options: `-c search_path=${schema},public`, + } as pg.PoolConfig); + + const { pkg, postgres } = await loadContrail(); + const config = await buildContrailConfig(); + const db = postgres.createPostgresDatabase(pool); + const contrail = new pkg.Contrail({ ...config, db }); + + await pool.query(`CREATE SCHEMA IF NOT EXISTS "${schema}"`); + await contrail.init(db); + + const ac = new AbortController(); + const shutdown = (signal: string) => { + console.log(`[ingest] received ${signal}; aborting`); + ac.abort(); + }; + process.on('SIGTERM', () => shutdown('SIGTERM')); + process.on('SIGINT', () => shutdown('SIGINT')); + + console.log( + `[ingest] starting; namespace=${config.namespace}, schema=${schema}, ` + + `jetstreams=${(config.jetstreams ?? []).join(',') || '(default)'}`, + ); + + try { + await contrail.runPersistent({ signal: ac.signal }); + } finally { + await pool.end(); + } +} + +main().catch((err) => { + console.error('[ingest] fatal error:', err); + process.exit(1); +}); diff --git a/src/contrail/sync.ts b/src/contrail/sync.ts index 9e5e017d..d526c50f 100644 --- a/src/contrail/sync.ts +++ b/src/contrail/sync.ts @@ -12,11 +12,9 @@ * npm run contrail:sync */ import pg from 'pg'; -import { contrailConfig } from './contrail.config'; -import { withInitLock } from './contrail-init-lock'; +import { buildContrailConfig } from './contrail.config'; import { loadContrail } from './contrail-loader'; -const INIT_LOCK_KEY = 'net.openmeet.contrail.init'; const DEFAULT_SCHEMA = 'contrail'; function elapsed(start: number): string { @@ -43,16 +41,15 @@ async function main(): Promise { try { const { pkg, postgres } = await loadContrail(); + const config = await buildContrailConfig(); const db = postgres.createPostgresDatabase(pool); - const contrail = new pkg.Contrail({ ...contrailConfig, db }); + const contrail = new pkg.Contrail({ ...config, db }); const syncStart = Date.now(); console.log(`=== Contrail sync (schema=${schema}) ===\n`); - await withInitLock(pool, INIT_LOCK_KEY, async () => { - await pool.query(`CREATE SCHEMA IF NOT EXISTS "${schema}"`); - await contrail.init(); - }); + await pool.query(`CREATE SCHEMA IF NOT EXISTS "${schema}"`); + await contrail.init(); console.log('--- Discovery ---'); const discoveryStart = Date.now(); diff --git a/vendor/atmo-dev-contrail-appview.tgz b/vendor/atmo-dev-contrail-appview.tgz new file mode 100644 index 00000000..75f26323 Binary files /dev/null and b/vendor/atmo-dev-contrail-appview.tgz differ diff --git a/vendor/atmo-dev-contrail-authority.tgz b/vendor/atmo-dev-contrail-authority.tgz new file mode 100644 index 00000000..191f670a Binary files /dev/null and b/vendor/atmo-dev-contrail-authority.tgz differ diff --git a/vendor/atmo-dev-contrail-base.tgz b/vendor/atmo-dev-contrail-base.tgz new file mode 100644 index 00000000..7432cf58 Binary files /dev/null and b/vendor/atmo-dev-contrail-base.tgz differ diff --git a/vendor/atmo-dev-contrail-record-host.tgz b/vendor/atmo-dev-contrail-record-host.tgz new file mode 100644 index 00000000..aae4035e Binary files /dev/null and b/vendor/atmo-dev-contrail-record-host.tgz differ diff --git a/vendor/atmo-dev-contrail.tgz b/vendor/atmo-dev-contrail.tgz new file mode 100644 index 00000000..494382c1 Binary files /dev/null and b/vendor/atmo-dev-contrail.tgz differ