Skip to content

Commit 0532802

Browse files
sgresh-stripetonyxiao
authored andcommitted
sheets
Committed-By-Agent: claude
1 parent 1d047a2 commit 0532802

17 files changed

Lines changed: 940 additions & 65 deletions

File tree

apps/engine/src/api/app.ts

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import {
2020
import { takeStateCheckpoints } from '../lib/pipeline.js'
2121
import { ndjsonResponse } from '@stripe/sync-ts-cli/ndjson'
2222
import { logger } from '../logger.js'
23-
import { createStripeSource, DEFAULT_MAX_RPS } from '@stripe/sync-source-stripe'
24-
import type { RateLimiter } from '@stripe/sync-source-stripe'
23+
import { createStripeSource, DEFAULT_MAX_RPS, fetchById } from '@stripe/sync-source-stripe'
24+
import type { RateLimiter, Config as StripeConfig } from '@stripe/sync-source-stripe'
2525
import {
2626
acquire,
2727
createRateLimiterTable,
@@ -208,8 +208,12 @@ export function createApp(resolver: ConnectorResolver) {
208208
logger.info(context, 'Engine API /setup started')
209209
const engine = await createEngineFromParams(params.pipeline, resolver, noopStateStore())
210210
try {
211-
await engine.setup()
211+
const result = await engine.setup()
212212
logger.info({ ...context, durationMs: Date.now() - startedAt }, 'Engine API /setup completed')
213+
logger.info(result, 'Engine API /setup result')
214+
if (result && Object.keys(result).length > 0) {
215+
return c.json(result, 200)
216+
}
213217
return c.body(null, 204)
214218
} catch (error) {
215219
logger.error(
@@ -276,6 +280,74 @@ export function createApp(resolver: ConnectorResolver) {
276280
)
277281
})
278282

283+
app.post('/write-events', async (c) => {
284+
const params = parseSyncParams(c)
285+
const context = { path: '/write-events', ...syncRequestContext(params) }
286+
const startedAt = Date.now()
287+
288+
// Body: [{stream, id, row_number?}]
289+
const ThinEventSchema = z.array(
290+
z.object({
291+
stream: z.string(),
292+
id: z.string(),
293+
row_number: z.number().int().positive().optional(),
294+
})
295+
)
296+
let events: z.infer<typeof ThinEventSchema>
297+
try {
298+
events = ThinEventSchema.parse(await c.req.json())
299+
} catch {
300+
return c.json({ error: 'Body must be an array of {stream, id, row_number?}' }, 400)
301+
}
302+
if (events.length === 0) {
303+
return c.json({ error: 'Events array must not be empty' }, 400)
304+
}
305+
306+
logger.info({ ...context, eventCount: events.length }, 'Engine API /write-events started')
307+
308+
const sourceConfig = params.pipeline.source as unknown as StripeConfig
309+
310+
// Fetch full objects from Stripe concurrently then build record messages
311+
const now = Date.now()
312+
const fetched = await Promise.all(
313+
events.map(async (event) => {
314+
const data = await fetchById(sourceConfig, event.stream, event.id)
315+
return { event, data }
316+
})
317+
)
318+
319+
// Log any IDs that couldn't be resolved
320+
for (const { event, data } of fetched) {
321+
if (!data) {
322+
logger.warn(
323+
{ stream: event.stream, id: event.id },
324+
'Engine API /write-events: fetchById returned null — skipping'
325+
)
326+
}
327+
}
328+
329+
const records = fetched.filter(({ data }) => data != null)
330+
331+
async function* recordMessages(): AsyncIterable<
332+
import('@stripe/sync-protocol').DestinationInput
333+
> {
334+
for (const { event, data } of records) {
335+
yield {
336+
type: 'record' as const,
337+
stream: event.stream,
338+
data: data!,
339+
emitted_at: now,
340+
row_number: event.row_number,
341+
}
342+
}
343+
}
344+
345+
const engine = await createEngineFromParams(params.pipeline, resolver, noopStateStore())
346+
return ndjsonResponse(
347+
logApiStream('Engine API /write-events', engine.write(recordMessages()), context, startedAt)
348+
)
349+
})
350+
279351
app.post('/sync', async (c) => {
280352
const params = parseSyncParams(c)
281353
const stateStore = await selectStateStore(params.pipeline)

apps/engine/src/lib/engine.ts

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import { logger } from '../logger.js'
1818
// MARK: - Engine interface
1919

2020
export interface Engine {
21-
setup(): Promise<void>
21+
setup(): Promise<Record<string, unknown> | undefined>
2222
teardown(): Promise<void>
2323
check(): Promise<{ source: CheckResult; destination: CheckResult }>
2424
read(input?: AsyncIterable<unknown>): AsyncIterable<Message>
@@ -168,22 +168,34 @@ export function createEngine(
168168
return _catalog
169169
}
170170

171+
async function setupSource(catalog: ConfiguredCatalog): Promise<void> {
172+
if (connectors.source.setup) {
173+
await withLoggedStep('Engine source setup', baseContext, () =>
174+
connectors.source.setup!({ config: sourceConfig, catalog })
175+
)
176+
}
177+
}
178+
179+
async function setupDestination(
180+
catalog: ConfiguredCatalog
181+
): Promise<Record<string, unknown> | undefined> {
182+
if (connectors.destination.setup) {
183+
const result = await withLoggedStep('Engine destination setup', baseContext, () =>
184+
connectors.destination.setup!({ config: destConfig, catalog })
185+
)
186+
return result ?? undefined
187+
}
188+
}
189+
171190
return {
172191
async setup() {
173192
const catalog = await getCatalog()
174193
const filteredCatalog = applySelection(catalog)
175-
await Promise.all([
176-
connectors.source.setup
177-
? withLoggedStep('Engine source setup', baseContext, () =>
178-
connectors.source.setup!({ config: sourceConfig, catalog })
179-
)
180-
: Promise.resolve(),
181-
connectors.destination.setup
182-
? withLoggedStep('Engine destination setup', baseContext, () =>
183-
connectors.destination.setup!({ config: destConfig, catalog: filteredCatalog })
184-
)
185-
: Promise.resolve(),
194+
const [, destResult] = await Promise.all([
195+
setupSource(catalog),
196+
setupDestination(filteredCatalog),
186197
])
198+
return destResult ?? undefined
187199
},
188200

189201
async teardown() {
@@ -244,7 +256,14 @@ export function createEngine(
244256
},
245257

246258
async *sync(input?: AsyncIterable<unknown>) {
247-
await this.setup()
259+
if (connectors.destination.skipAutoSetup) {
260+
// Destination manages its own setup externally (e.g. via /setup endpoint).
261+
// Only set up the source here.
262+
const catalog = await getCatalog()
263+
await setupSource(catalog)
264+
} else {
265+
await this.setup()
266+
}
248267
yield* pipe(this.read(input), this.write, persistState(stateStore))
249268
},
250269
}

docs/openapi/engine.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,11 @@
526526
},
527527
"emitted_at": {
528528
"type": "number"
529+
},
530+
"row_number": {
531+
"type": "integer",
532+
"exclusiveMinimum": 0,
533+
"maximum": 9007199254740991
529534
}
530535
},
531536
"required": ["type", "stream", "data", "emitted_at"]

packages/dashboard/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.next/
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
scripts/.state.json

packages/destination-google-sheets/__tests__/memory-sheets.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,17 +76,19 @@ export function createMemorySheets() {
7676
const ss = getSpreadsheet(params.spreadsheetId)
7777
const requests = (params.requestBody?.requests ?? []) as Record<string, unknown>[]
7878

79+
const replies: unknown[] = []
80+
7981
for (const req of requests) {
8082
if (req.addSheet) {
8183
const props = (req.addSheet as { properties?: { title?: string } }).properties
8284
const name = props?.title ?? `Sheet${ss.sheets.size + 1}`
8385
if (ss.sheets.has(name)) {
8486
throw Object.assign(new Error(`Sheet already exists: ${name}`), { code: 400 })
8587
}
86-
ss.sheets.set(name, { sheetId: nextSheetId++, values: [] })
87-
}
88-
89-
if (req.updateSheetProperties) {
88+
const sheetId = nextSheetId++
89+
ss.sheets.set(name, { sheetId, values: [] })
90+
replies.push({ addSheet: { properties: { sheetId, title: name } } })
91+
} else if (req.updateSheetProperties) {
9092
const update = req.updateSheetProperties as {
9193
properties: { sheetId: number; title: string }
9294
fields: string
@@ -99,10 +101,13 @@ export function createMemorySheets() {
99101
break
100102
}
101103
}
104+
replies.push({})
105+
} else {
106+
replies.push({})
102107
}
103108
}
104109

105-
return { data: {} }
110+
return { data: { replies } }
106111
},
107112

108113
values: {
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Shared helpers for the destination-google-sheets scripts.
2+
// Loads .env and manages a local .state.json that acts as a fake DB for the sheet ID.
3+
4+
import { readFileSync, writeFileSync, unlinkSync } from 'node:fs'
5+
import { resolve, dirname } from 'node:path'
6+
import { fileURLToPath } from 'node:url'
7+
8+
const __dirname = dirname(fileURLToPath(import.meta.url))
9+
const STATE_FILE = resolve(__dirname, '.state.json')
10+
11+
// ── Env loading ──────────────────────────────────────────────────────────────
12+
13+
export function loadEnv(): void {
14+
const envPath = resolve(__dirname, '../.env')
15+
try {
16+
const content = readFileSync(envPath, 'utf-8')
17+
for (const line of content.split('\n')) {
18+
const trimmed = line.trim()
19+
if (!trimmed || trimmed.startsWith('#')) continue
20+
const eqIdx = trimmed.indexOf('=')
21+
if (eqIdx === -1) continue
22+
const key = trimmed.slice(0, eqIdx).trim()
23+
const value = trimmed.slice(eqIdx + 1).trim()
24+
if (!(key in process.env)) process.env[key] = value
25+
}
26+
} catch {
27+
// .env is optional
28+
}
29+
}
30+
31+
// ── Sheet state ───────────────────────────────────────────────────────────────
32+
33+
export interface SheetState {
34+
spreadsheet_id: string
35+
/** Per-stream cursor state, persisted across sync calls for resumable pagination. */
36+
sync_state?: Record<string, unknown>
37+
}
38+
39+
export function loadState(): SheetState | null {
40+
try {
41+
return JSON.parse(readFileSync(STATE_FILE, 'utf-8')) as SheetState
42+
} catch {
43+
return null
44+
}
45+
}
46+
47+
export function saveState(state: SheetState): void {
48+
writeFileSync(STATE_FILE, JSON.stringify(state, null, 2) + '\n')
49+
console.error(`Saved state → ${STATE_FILE}`)
50+
}
51+
52+
export function clearState(): void {
53+
try {
54+
unlinkSync(STATE_FILE)
55+
console.error(`Cleared state (${STATE_FILE})`)
56+
} catch {
57+
// already gone
58+
}
59+
}
60+
61+
// ── Pipeline builder ──────────────────────────────────────────────────────────
62+
63+
export function buildDestinationConfig(spreadsheetId?: string): Record<string, unknown> {
64+
return {
65+
name: 'google-sheets',
66+
client_id: process.env['GOOGLE_CLIENT_ID'],
67+
client_secret: process.env['GOOGLE_CLIENT_SECRET'],
68+
access_token: 'unused',
69+
refresh_token: process.env['GOOGLE_REFRESH_TOKEN'],
70+
...(spreadsheetId ? { spreadsheet_id: spreadsheetId } : {}),
71+
}
72+
}
73+
74+
export const STREAMS = ['products', 'customers', 'prices', 'subscriptions'] as const
75+
76+
export function buildPipeline(spreadsheetId?: string): Record<string, unknown> {
77+
return {
78+
source: { name: 'stripe', api_key: process.env['STRIPE_API_KEY'], backfill_limit: 10 },
79+
destination: buildDestinationConfig(spreadsheetId),
80+
streams: STREAMS.map((name) => ({ name })),
81+
}
82+
}
83+
84+
export function requireEnv(...keys: string[]): void {
85+
const missing = keys.filter((k) => !process.env[k])
86+
if (missing.length > 0) {
87+
console.error(`Error: missing required env vars: ${missing.join(', ')}`)
88+
process.exit(1)
89+
}
90+
}
91+
92+
export function getPort(): string {
93+
const idx = process.argv.indexOf('--port')
94+
return idx !== -1 ? process.argv[idx + 1] : '3000'
95+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#!/usr/bin/env node
2+
// GET /check — validates credentials and sheet accessibility
3+
// Usage: npx tsx scripts/check-via-server.ts [--port 3000]
4+
5+
import { loadEnv, buildPipeline, requireEnv, loadState, getPort } from './_state.js'
6+
7+
loadEnv()
8+
requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN')
9+
10+
const state = loadState()
11+
if (!state) {
12+
console.error('No sheet state found — run setup-via-server.ts first')
13+
process.exit(1)
14+
}
15+
16+
const serverUrl = `http://localhost:${getPort()}`
17+
const pipeline = buildPipeline(state.spreadsheet_id)
18+
19+
console.error(`Hitting ${serverUrl}/check ...`)
20+
console.error(`Sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`)
21+
22+
const res = await fetch(`${serverUrl}/check`, {
23+
headers: { 'X-Pipeline': JSON.stringify(pipeline) },
24+
})
25+
26+
const result = await res.json()
27+
console.log(JSON.stringify(result, null, 2))
28+
29+
if (res.status !== 200) process.exit(1)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#!/usr/bin/env node
2+
// POST /setup — creates a new Google Sheet, saves its ID to .state.json
3+
// Usage: npx tsx scripts/setup-via-server.ts [--port 3000]
4+
5+
import { loadEnv, buildPipeline, requireEnv, saveState, getPort } from './_state.js'
6+
7+
loadEnv()
8+
requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN')
9+
10+
const serverUrl = `http://localhost:${getPort()}`
11+
12+
// No spreadsheet_id — setup always creates a new sheet
13+
const pipeline = buildPipeline()
14+
15+
console.error(`Hitting ${serverUrl}/setup ...`)
16+
17+
const res = await fetch(`${serverUrl}/setup`, {
18+
method: 'POST',
19+
headers: { 'X-Pipeline': JSON.stringify(pipeline) },
20+
})
21+
22+
if (res.status === 200) {
23+
const result = (await res.json()) as { spreadsheet_id: string }
24+
saveState({ spreadsheet_id: result.spreadsheet_id })
25+
console.log(JSON.stringify(result, null, 2))
26+
} else {
27+
const body = await res.text()
28+
console.error(`Error: ${res.status} ${res.statusText}`)
29+
if (body) console.error(body)
30+
process.exit(1)
31+
}

0 commit comments

Comments
 (0)