Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"@hono/node-server": "^1",
"@scalar/hono-api-reference": "^0.6",
"@stripe/sync-destination-google-sheets": "workspace:*",
"@stripe/sync-destination-aws-dsql": "workspace:*",
"@stripe/sync-destination-postgres": "workspace:*",
"@stripe/sync-hono-zod-openapi": "workspace:*",
"@stripe/sync-integration-supabase": "workspace:*",
Expand Down
2 changes: 2 additions & 0 deletions apps/engine/src/lib/default-connectors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sourceStripe from '@stripe/sync-source-stripe'
import destinationAwsDsql from '@stripe/sync-destination-aws-dsql'
import destinationPostgres from '@stripe/sync-destination-postgres'
import destinationGoogleSheets from '@stripe/sync-destination-google-sheets'
import type { RegisteredConnectors } from './resolver.js'
Expand All @@ -7,6 +8,7 @@ import type { RegisteredConnectors } from './resolver.js'
export const defaultConnectors: RegisteredConnectors = {
sources: { stripe: sourceStripe },
destinations: {
aws_dsql: destinationAwsDsql,
postgres: destinationPostgres,
google_sheets: destinationGoogleSheets,
},
Expand Down
37 changes: 26 additions & 11 deletions demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,28 @@ This syncs `products`, `prices`, and `customers` into Postgres with full schema

## All demos

| Script | What it does | Required env vars |
|--------|-------------|-------------------|
| `read-from-stripe.sh` | Read from Stripe, output NDJSON to stdout | `STRIPE_API_KEY` |
| `write-to-postgres.sh` | Write NDJSON (stdin or sample data) to Postgres | `DATABASE_URL` |
| `write-to-sheets.sh` | Write NDJSON (stdin or sample data) to Google Sheets | `GOOGLE_*` |
| `stripe-to-postgres.sh` | Stripe → Postgres via the engine | `STRIPE_API_KEY`, `DATABASE_URL` |
| `stripe-to-google-sheets.sh` | Stripe → Google Sheets via the engine | `STRIPE_API_KEY`, `GOOGLE_*` |
| Script | What it does | Required env vars |
| ---------------------------- | ---------------------------------------------------- | -------------------------------- |
| `read-from-stripe.sh` | Read from Stripe, output NDJSON to stdout | `STRIPE_API_KEY` |
| `write-to-postgres.sh` | Write NDJSON (stdin or sample data) to Postgres | `DATABASE_URL` |
| `write-to-sheets.sh` | Write NDJSON (stdin or sample data) to Google Sheets | `GOOGLE_*` |
| `stripe-to-postgres.sh` | Stripe → Postgres via the engine | `STRIPE_API_KEY`, `DATABASE_URL` |
| `stripe-to-google-sheets.sh` | Stripe → Google Sheets via the engine | `STRIPE_API_KEY`, `GOOGLE_*` |
| `stripe-to-dsql.ts` | Stripe → AWS DSQL via the engine | `STRIPE_API_KEY`, `AWS_*` |

### Stripe → AWS DSQL

Sync Stripe data to [Aurora DSQL](https://aws.amazon.com/rds/aurora/dsql/) (serverless distributed SQL):

```sh
# One-time: provision the DSQL cluster
cd terraform && terraform init && terraform apply && cd ..

# Sync (auto-reads endpoint from terraform output)
node --import tsx demo/stripe-to-dsql.ts
```

Or with explicit env vars: `DSQL_ENDPOINT=<id>.dsql.<region>.on.aws node --import tsx demo/stripe-to-dsql.ts`

### TypeScript API

Expand All @@ -115,7 +130,7 @@ node --import tsx demo/stripe-to-google-sheets.ts

## Utilities

| Script | What it does |
|--------|-------------|
| `reset-postgres.sh` | Drop all tables and non-system schemas |
| `webhooksite.sh` | Set up webhook forwarding for live Stripe events |
| Script | What it does |
| ------------------- | ------------------------------------------------ |
| `reset-postgres.sh` | Drop all tables and non-system schemas |
| `webhooksite.sh` | Set up webhook forwarding for live Stripe events |
94 changes: 94 additions & 0 deletions demo/stripe-to-dsql.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Sync Stripe → AWS DSQL via the engine API (TypeScript).
*
* Usage:
* npx tsx demo/stripe-to-dsql.ts
* bun demo/stripe-to-dsql.ts
*
* Env:
* STRIPE_API_KEY — Stripe secret key
* DSQL_ENDPOINT — DSQL cluster endpoint (e.g. <id>.dsql.us-east-1.on.aws)
* AWS_REGION — AWS region (default: us-east-1)
* AWS_ACCESS_KEY_ID — AWS credentials
* AWS_SECRET_ACCESS_KEY — AWS credentials
*/
import { execSync } from 'node:child_process'
import { createConnectorResolver, createEngine } from '../apps/engine/src/lib/index.js'
import { defaultConnectors } from '../apps/engine/src/lib/default-connectors.js'
import { fileStateStore } from '../apps/engine/src/lib/state-store.js'
import type { PipelineConfig } from '../packages/protocol/src/index.js'
import { buildPoolConfig, pg } from '../packages/destination-aws-dsql/src/index.js'

const stripeApiKey = process.env.STRIPE_API_KEY
const region = process.env.AWS_REGION ?? 'us-east-1'

// Auto-read endpoint from terraform output if not set
const dsqlEndpoint =
process.env.DSQL_ENDPOINT ??
(() => {
try {
return execSync('terraform -chdir=terraform output -raw cluster_endpoint', {
encoding: 'utf8',
}).trim()
} catch {
return undefined
}
})()

if (!stripeApiKey) throw new Error('Set STRIPE_API_KEY')
if (!dsqlEndpoint)
throw new Error('Set DSQL_ENDPOINT or run `terraform -chdir=terraform apply` first')

const pipeline: PipelineConfig = {
source: { type: 'stripe', stripe: { api_key: stripeApiKey, backfill_limit: 10 } },
destination: {
type: 'aws_dsql',
aws_dsql: { endpoint: dsqlEndpoint, region, schema: 'public' },
},
streams: [{ name: 'products' }, { name: 'prices' }, { name: 'customers' }],
}

const resolver = await createConnectorResolver(defaultConnectors, { path: true })
const engine = await createEngine(resolver)

// Create tables
for await (const _msg of engine.pipeline_setup(pipeline)) {
}

// State: file-backed, resumable across runs
const store = fileStateStore('.sync-state-dsql.json')
const state = await store.get()

// Sync
for await (const msg of engine.pipeline_sync(pipeline, { state })) {
if (msg.type === 'source_state') {
if (msg.source_state.state_type === 'global') await store.setGlobal(msg.source_state.data)
else await store.set(msg.source_state.stream, msg.source_state.data)
}
console.log(JSON.stringify(msg))
}

// Verify: query DSQL to show what was synced
console.log('\n--- Verifying data in DSQL ---')
const poolConfig = await buildPoolConfig({
endpoint: dsqlEndpoint,
region,
schema: 'public',
batch_size: 100,
})
const pool = new pg.Pool(poolConfig)

for (const table of ['customers', 'prices', 'products']) {
const { rows } = await pool.query(`SELECT count(*) FROM ${table}`)
console.log(`${table}: ${rows[0].count} rows`)
}

console.log('\nSample rows:')
for (const table of ['customers', 'products']) {
const { rows } = await pool.query(
`SELECT id, substring(_raw_data, 1, 100) as data FROM ${table} LIMIT 2`
)
for (const row of rows) console.log(` [${table}] ${row.id}: ${row.data}...`)
}

await pool.end()
32 changes: 32 additions & 0 deletions packages/destination-aws-dsql/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"name": "@stripe/sync-destination-aws-dsql",
"version": "0.1.0",
"private": false,
"type": "module",
"exports": {
".": {
"bun": "./src/index.ts",
"types": "./dist/index.d.ts",
"import": "./dist/index.js"
}
},
"scripts": {
"build": "tsc",
"test": "vitest"
},
"files": [
"dist",
"src"
],
"dependencies": {
"@aws-sdk/dsql-signer": "^3.1013.0",
"@stripe/sync-protocol": "workspace:*",
"@stripe/sync-util-postgres": "workspace:*",
"pg": "^8.16.3",
"zod": "^4.3.6"
},
"devDependencies": {
"@types/pg": "^8.15.5",
"vitest": "^3.2.4"
}
}
Loading
Loading