Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ for the full dependency graph.
| `packages/destination-postgres` | Postgres destination connector | `protocol`, `util-postgres` |
| `packages/destination-google-sheets` | Google Sheets destination connector | `protocol` |
| `packages/state-postgres` | Postgres state store + migrations | `util-postgres` |
| `packages/logger` | Shared structured logger with PII redaction | `pino` |
| `packages/util-postgres` | Shared Postgres utilities (upsert, rate limiter) | standalone |
| `packages/ts-cli` | Generic TypeScript module CLI runner | standalone |
| `apps/engine` | Sync engine library + stateless CLI + HTTP API | `protocol`, connectors, `state-postgres` |
Expand Down Expand Up @@ -75,7 +76,8 @@ See [docs/architecture/principles.md](docs/architecture/principles.md) for the c
## Conventions

- All serializable inputs/outputs (Zod schemas, JSON wire format) must use **snake_case** field names.
- Source connectors must use `console.error` for logging (stdout is the NDJSON stream).
- **All logging must use `@stripe/sync-logger`** — raw `console.*` is banned in source code (enforced by ESLint `no-console: error`). Apps use `createLogger()`, subprocess connectors use `createConnectorLogger()` (writes to stderr).
- **Never log secrets or synced data** — the shared logger redacts `api_key`, `secret`, `token`, `password`, `connection_string`, `data`, and related fields. Do not log record payloads, API keys, or connection strings even in test output.
- Generated OpenAPI specs live in each package's `src/__generated__/openapi.json`. Run `./scripts/generate-openapi.sh` and commit the output before pushing when schemas change. Never edit generated files by hand.
- Non-trivial PRs should be accompanied by a plan artifact in `docs/plans/YYYY-MM-DD-<slug>.md`. Save it before or alongside the first implementation commit.

Expand Down
4 changes: 3 additions & 1 deletion apps/engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"scripts": {
"build": "tsc",
"x:watch": "sh -c 'if command -v bun > /dev/null 2>&1; then bun --watch \"$@\"; else tsx --watch --conditions bun \"$@\"; fi' --",
"dev": "LOG_LEVEL=debug LOG_PRETTY=true DANGEROUSLY_VERBOSE_LOGGING=true pnpm x:watch src/api/index.ts",
"dev": "PORT=4010 LOG_PRETTY=true pnpm x:watch src/api/index.ts",
"lint": "eslint src/",
"test": "vitest run",
"generate:types": "openapi-typescript src/__generated__/openapi.json -o src/__generated__/openapi.d.ts"
},
Expand All @@ -47,6 +48,7 @@
"@stripe/sync-destination-postgres": "workspace:*",
"@stripe/sync-hono-zod-openapi": "workspace:*",
"@stripe/sync-integration-supabase": "workspace:*",
"@stripe/sync-logger": "workspace:*",
"@stripe/sync-protocol": "workspace:*",
"@stripe/sync-source-stripe": "workspace:*",
"@stripe/sync-state-postgres": "workspace:*",
Expand Down
5 changes: 3 additions & 2 deletions apps/engine/src/__tests__/stripe-to-postgres.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ describe('engine read → write', () => {

for (const r of records) {
expect(r.stream).toBe(targetStream)
expect((r as any).data).toBeDefined()
expect((r as any).data.id).toBeDefined()
const data = (r as Record<string, unknown>).data as Record<string, unknown>
expect(data).toBeDefined()
expect(data.id).toBeDefined()
}
})

Expand Down
54 changes: 29 additions & 25 deletions apps/engine/src/api/app.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
import type { ConnectorResolver, Message, SourceStateMessage } from '../lib/index.js'
import { sourceTest, destinationTest, collectFirst } from '../lib/index.js'
import { createApp } from './app.js'
import { z } from 'zod'

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type Json = Record<string, any>

// ---------------------------------------------------------------------------
// Helpers
Expand Down Expand Up @@ -33,7 +37,7 @@ beforeAll(async () => {
'test',
{
connector: sourceTest,
configSchema: {} as any,
configSchema: z.object({}),
rawConfigJsonSchema: srcConfigSchema,
},
],
Expand All @@ -44,7 +48,7 @@ beforeAll(async () => {
'test',
{
connector: destinationTest,
configSchema: {} as any,
configSchema: z.object({}),
rawConfigJsonSchema: destConfigSchema,
},
],
Expand Down Expand Up @@ -131,7 +135,7 @@ describe('GET /openapi.json', () => {
it('has typed connector schemas in components (auto-generated from Zod)', async () => {
const app = await createApp(resolver)
const res = await app.request('/openapi.json')
const spec = (await res.json()) as any
const spec = (await res.json()) as Json
const schemaNames = Object.keys(spec.components?.schemas ?? {})

expect(schemaNames).toContain('SourceTestConfig')
Expand All @@ -150,7 +154,7 @@ describe('GET /openapi.json', () => {
it('defines NDJSON message schemas with discriminated unions', async () => {
const app = await createApp(resolver)
const res = await app.request('/openapi.json')
const spec = (await res.json()) as any
const spec = (await res.json()) as Json
const schemas = spec.components.schemas

// Individual message types — zod-openapi uses const for z.literal() in OpenAPI 3.1
Expand Down Expand Up @@ -187,18 +191,18 @@ describe('GET /openapi.json', () => {
it('ControlMessage source_config/destination_config reference typed connector schemas', async () => {
const app = await createApp(resolver)
const res = await app.request('/openapi.json')
const spec = (await res.json()) as any
const spec = (await res.json()) as Json
const control = spec.components.schemas.ControlMessage.properties.control

const sourceVariant = control.oneOf.find(
(v: any) => v.properties?.control_type?.const === 'source_config'
(v: Json) => v.properties?.control_type?.const === 'source_config'
)
expect(sourceVariant.properties.source_config.$ref).toBe(
'#/components/schemas/SourceTestConfig'
)

const destVariant = control.oneOf.find(
(v: any) => v.properties?.control_type?.const === 'destination_config'
(v: Json) => v.properties?.control_type?.const === 'destination_config'
)
expect(destVariant.properties.destination_config.$ref).toBe(
'#/components/schemas/DestinationTestConfig'
Expand All @@ -208,7 +212,7 @@ describe('GET /openapi.json', () => {
it('/setup spec documents 200 response (not 204)', async () => {
const app = await createApp(resolver)
const res = await app.request('/openapi.json')
const spec = (await res.json()) as any
const spec = (await res.json()) as Json
const setupOp = spec.paths['/pipeline_setup']?.post
expect(setupOp).toBeDefined()
expect(setupOp.responses['200']).toBeDefined()
Expand All @@ -218,7 +222,7 @@ describe('GET /openapi.json', () => {
it('/write spec documents a required NDJSON request body', async () => {
const app = await createApp(resolver)
const res = await app.request('/openapi.json')
const spec = (await res.json()) as any
const spec = (await res.json()) as Json
const writeOp = spec.paths['/pipeline_write']?.post
expect(writeOp).toBeDefined()
const body = writeOp.requestBody
Expand All @@ -232,7 +236,7 @@ describe('GET /openapi.json', () => {
it('/read and /sync spec documents an optional NDJSON request body', async () => {
const app = await createApp(resolver)
const res = await app.request('/openapi.json')
const spec = (await res.json()) as any
const spec = (await res.json()) as Json

for (const path of ['/pipeline_read', '/pipeline_sync'] as const) {
const op = spec.paths[path]?.post
Expand All @@ -247,13 +251,13 @@ describe('GET /openapi.json', () => {
it('documents the X-Pipeline header on sync routes', async () => {
const app = await createApp(resolver)
const res = await app.request('/openapi.json')
const spec = (await res.json()) as any
const spec = (await res.json()) as Json

// /check is a POST with X-Pipeline header
const checkOp = spec.paths['/pipeline_check']?.post
expect(checkOp).toBeDefined()
const headerParam = checkOp.parameters?.find(
(p: any) => p.in === 'header' && p.name === 'x-pipeline'
(p: Json) => p.in === 'header' && p.name === 'x-pipeline'
)
expect(headerParam).toBeDefined()
})
Expand All @@ -264,9 +268,9 @@ describe('GET /meta/sources', () => {
const app = await createApp(resolver)
const res = await app.request('/meta/sources')
expect(res.status).toBe(200)
const body = (await res.json()) as any
const body = (await res.json()) as Json
expect(Array.isArray(body.items)).toBe(true)
expect(body.items.find((c: any) => c.type === 'test')?.config_schema).toBeDefined()
expect(body.items.find((c: Json) => c.type === 'test')?.config_schema).toBeDefined()
})
})

Expand All @@ -275,7 +279,7 @@ describe('GET /meta/sources/:type', () => {
const app = await createApp(resolver)
const res = await app.request('/meta/sources/test')
expect(res.status).toBe(200)
const body = (await res.json()) as any
const body = (await res.json()) as Json
expect(body.config_schema).toBeDefined()
})

Expand All @@ -291,9 +295,9 @@ describe('GET /meta/destinations', () => {
const app = await createApp(resolver)
const res = await app.request('/meta/destinations')
expect(res.status).toBe(200)
const body = (await res.json()) as any
const body = (await res.json()) as Json
expect(Array.isArray(body.items)).toBe(true)
expect(body.items.find((c: any) => c.type === 'test')?.config_schema).toBeDefined()
expect(body.items.find((c: Json) => c.type === 'test')?.config_schema).toBeDefined()
})
})

Expand All @@ -302,7 +306,7 @@ describe('GET /meta/destinations/:type', () => {
const app = await createApp(resolver)
const res = await app.request('/meta/destinations/test')
expect(res.status).toBe(200)
const body = (await res.json()) as any
const body = (await res.json()) as Json
expect(body.config_schema).toBeDefined()
})

Expand Down Expand Up @@ -372,7 +376,7 @@ describe('POST /check', () => {
const events = await readNdjson<Record<string, unknown>>(res)
const statuses = events.filter((e) => e.type === 'connection_status')
expect(statuses).toHaveLength(2)
expect(statuses.every((s: any) => s.connection_status.status === 'succeeded')).toBe(true)
expect(statuses.every((s: Json) => s.connection_status.status === 'succeeded')).toBe(true)
})
})

Expand Down Expand Up @@ -436,7 +440,7 @@ describe('POST /read', () => {
'test',
{
connector: sourceTest,
configSchema: {} as any,
configSchema: z.object({}),
rawConfigJsonSchema: srcConfigSchema,
rawInputJsonSchema: inputSchema,
},
Expand All @@ -448,7 +452,7 @@ describe('POST /read', () => {
'test',
{
connector: destinationTest,
configSchema: {} as any,
configSchema: z.object({}),
rawConfigJsonSchema: destConfigSchema,
},
],
Expand All @@ -459,7 +463,7 @@ describe('POST /read', () => {

it('spec uses SourceInputMessage schema for /read and /sync request body when source has input schema', async () => {
const res = await inputApp.request('/openapi.json')
const spec = (await res.json()) as any
const spec = (await res.json()) as Json

for (const path of ['/pipeline_read', '/pipeline_sync'] as const) {
const body = spec.paths[path]?.post?.requestBody
Expand Down Expand Up @@ -794,8 +798,8 @@ describe('POST /source_discover', () => {
const events = await readNdjson<Record<string, unknown>>(res)
const catalogs = events.filter((e) => e.type === 'catalog')
expect(catalogs).toHaveLength(1)
const catalog = (catalogs[0] as any).catalog
const streamNames = catalog.streams.map((s: any) => s.name)
const catalog = (catalogs[0] as Json).catalog
const streamNames = catalog.streams.map((s: Json) => s.name)
expect(streamNames).toContain('customers')
expect(streamNames).toContain('products')
})
Expand Down Expand Up @@ -824,7 +828,7 @@ describe('POST /source_discover', () => {
const events = await readNdjson<Record<string, unknown>>(res)
const traces = events.filter((e) => e.type === 'trace')
expect(traces).toHaveLength(1)
const trace = (traces[0] as any).trace
const trace = (traces[0] as Json).trace
expect(trace.trace_type).toBe('error')
expect(trace.error.failure_type).toBe('system_error')
expect(trace.error.message).toContain('network unreachable')
Expand Down
21 changes: 8 additions & 13 deletions apps/engine/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const ndjsonRef = {
SourceInputMessage: { $ref: '#/components/schemas/SourceInputMessage' },
}
import { ndjsonResponse } from '@stripe/sync-ts-cli/ndjson'
import { REQUEST_HEADER_REDACT } from '@stripe/sync-logger'
import { logger } from '../logger.js'
import {
sslConfigFromConnectionString,
Expand Down Expand Up @@ -142,28 +143,22 @@ export async function createApp(resolver: ConnectorResolver) {
}
})
logger.debug(
{ requestId, method: c.req.method, path: c.req.path, headers },
{ requestId, method: c.req.method, path: c.req.path, request_headers: headers },
'request headers'
)
}
logger.info({ requestId, method: c.req.method, path: c.req.path }, 'request start')
if (dangerouslyVerbose) {
const curlParts = [`curl -X ${c.req.method} '${c.req.url}'`]
c.req.raw.headers.forEach((value, key) => {
curlParts.push(` -H '${key}: ${value}'`)
curlParts.push(
REQUEST_HEADER_REDACT.has(key.toLowerCase())
? ` -H '${key}: [REDACTED]'`
: ` -H '${key}: ${value}'`
)
})
if (hasBody(c)) {
const cl = c.req.header('Content-Length')
if (cl && Number(cl) < 100_000) {
try {
const body = await c.req.raw.clone().text()
curlParts.push(` -d '${body.replace(/'/g, "'\\''")}'`)
} catch {
/* skip */
}
} else {
curlParts.push(' --data-binary @-')
}
curlParts.push(' --data-binary @-')
}
logger.debug(curlParts.join(' \\\n'))
}
Expand Down
10 changes: 5 additions & 5 deletions apps/engine/src/cli/supabase.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { defineCommand } from 'citty'
import { install, uninstall, getCurrentVersion } from '@stripe/sync-integration-supabase'
import { logger } from '../logger.js'

const installCmd = defineCommand({
meta: {
Expand Down Expand Up @@ -67,8 +68,7 @@ const installCmd = defineCommand({

const version = args.packageVersion || getCurrentVersion()

console.log(`Installing Stripe sync to Supabase project ${project}...`)
console.log(` Edge function version: ${version}`)
logger.info({ project, version }, 'Installing Stripe sync to Supabase project')

await install({
supabaseAccessToken: token,
Expand All @@ -82,7 +82,7 @@ const installCmd = defineCommand({
supabaseManagementUrl: managementUrl,
})

console.log('Installation complete.')
logger.info('Installation complete')
},
})

Expand Down Expand Up @@ -118,15 +118,15 @@ const uninstallCmd = defineCommand({
throw new Error('Missing --project or SUPABASE_PROJECT_REF env')
}

console.log(`Uninstalling Stripe sync from Supabase project ${project}...`)
logger.info({ project }, 'Uninstalling Stripe sync from Supabase project')

await uninstall({
supabaseAccessToken: token,
supabaseProjectRef: project,
supabaseManagementUrl: managementUrl,
})

console.log('Uninstall complete.')
logger.info('Uninstall complete')
},
})

Expand Down
14 changes: 10 additions & 4 deletions apps/engine/src/lib/createSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,25 @@ export function createConnectorSchemas(resolver: ConnectorResolver) {
return { name, config, variant: z.object({ type: z.literal(name), [name]: config }) }
})

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const SourceConfig =
sources.length > 0
? z
.discriminatedUnion('type', sources.map((s) => s.variant) as [any, any, ...any[]])
.discriminatedUnion(
'type',
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sources.map((s) => s.variant) as [any, any, ...any[]]
)
.meta({ id: connectorUnionId('Source') })
: z.object({ type: z.string() }).catchall(z.unknown())

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const DestinationConfig =
destinations.length > 0
? z
.discriminatedUnion('type', destinations.map((d) => d.variant) as [any, any, ...any[]])
.discriminatedUnion(
'type',
// eslint-disable-next-line @typescript-eslint/no-explicit-any
destinations.map((d) => d.variant) as [any, any, ...any[]]
)
.meta({ id: connectorUnionId('Destination') })
: z.object({ type: z.string() }).catchall(z.unknown())

Expand Down
1 change: 0 additions & 1 deletion apps/engine/src/lib/exec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { resolveBin } from './resolver.js'
import { createSourceFromExec } from './source-exec.js'
import { createDestinationFromExec } from './destination-exec.js'
import { collectFirst } from '@stripe/sync-protocol'
import type { Message } from '@stripe/sync-protocol'

// These tests use real connector binaries (built by `pnpm build`).

Expand Down
Loading
Loading