diff --git a/apps/engine/src/lib/pipeline.test.ts b/apps/engine/src/lib/pipeline.test.ts index 9cfaaa460..79effca62 100644 --- a/apps/engine/src/lib/pipeline.test.ts +++ b/apps/engine/src/lib/pipeline.test.ts @@ -91,6 +91,41 @@ describe('enforceCatalog()', () => { expect((result[0] as { data: unknown }).data).toEqual({ id: 'sub_1', status: 'active' }) }) + it('drops unknown internal fields that are not present in the catalog schema', async () => { + const msgs: Message[] = [ + { + type: 'record', + stream: 'subscriptions', + data: { + id: 'sub_1', + status: 'active', + customer: 'cus_1', + _row_key: '["sub_1"]', + _row_number: 12, + }, + emitted_at: 1, + }, + ] + const result = await drain( + enforceCatalog( + catalog([ + { + name: 'subscriptions', + json_schema: { + type: 'object', + properties: { id: { type: 'string' }, status: { type: 'string' } }, + }, + }, + ]) + )(toAsync(msgs)) + ) + expect(result).toHaveLength(1) + expect((result[0] as { data: unknown }).data).toEqual({ + id: 'sub_1', + status: 'active', + }) + }) + it('passes records through unchanged when json_schema is absent', async () => { const msgs: Message[] = [ { diff --git a/apps/engine/src/lib/pipeline.ts b/apps/engine/src/lib/pipeline.ts index 741e9cb7e..75c0b00b0 100644 --- a/apps/engine/src/lib/pipeline.ts +++ b/apps/engine/src/lib/pipeline.ts @@ -25,7 +25,7 @@ export function enforceCatalog( if (props) { yield { ...msg, - data: Object.fromEntries(Object.entries(msg.data).filter(([k]) => k in props)), + data: Object.fromEntries(Object.entries(msg.data).filter(([key]) => key in props)), } } else { yield msg diff --git a/apps/service/src/__tests__/workflow.test.ts b/apps/service/src/__tests__/workflow.test.ts index eaf2db62f..1fec28727 100644 --- a/apps/service/src/__tests__/workflow.test.ts +++ b/apps/service/src/__tests__/workflow.test.ts @@ -3,11 +3,11 @@ import { TestWorkflowEnvironment } from '@temporalio/testing' import { Worker } from '@temporalio/worker' import path from 'node:path' 
import type { PipelineConfig } from '@stripe/sync-engine' -import type { SyncActivities } from '../temporal/activities.js' -import type { RunResult } from '../temporal/activities.js' +import type { SyncActivities } from '../temporal/activities/index.js' +import type { RunResult } from '../temporal/activities/index.js' -// workflowsPath must point to compiled JS (Temporal bundles it for V8 sandbox) -const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows.js') +// workflowsPath points to the compiled workflow directory. +const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows') const noErrors: RunResult = { errors: [], state: {} } @@ -19,9 +19,17 @@ const testPipeline = { function stubActivities(overrides: Partial = {}): SyncActivities { return { + discoverCatalog: async () => ({ streams: [] }), setup: async () => ({}), syncImmediate: async () => noErrors, + readIntoQueueWithState: async () => ({ count: 0, state: {} }), readIntoQueue: async () => ({ count: 0, state: {} }), + writeGoogleSheetsFromQueue: async () => ({ + errors: [], + state: {}, + written: 0, + rowAssignments: {}, + }), writeFromQueue: async () => ({ errors: [], state: {}, written: 0 }), teardown: async () => {}, ...overrides, @@ -284,3 +292,124 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { }) }) }) + +describe('pipelineGoogleSheetsWorkflow (unit — stubbed activities)', () => { + it('uses the Sheets-specific read path and catalog discovery', async () => { + let discoverCalls = 0 + let readCalls = 0 + let syncCalls = 0 + + const worker = await Worker.create({ + connection: testEnv.nativeConnection, + taskQueue: 'test-queue-gs-1', + workflowsPath, + activities: stubActivities({ + discoverCatalog: async () => { + discoverCalls++ + return { streams: [] } + }, + readIntoQueueWithState: async () => { + readCalls++ + return { count: 0, state: {} } + }, + syncImmediate: async () => { + syncCalls++ + return noErrors + }, + }), + }) + + await 
worker.runUntil(async () => { + const handle = await testEnv.client.workflow.start('pipelineGoogleSheetsWorkflow', { + args: [ + { + ...testPipeline, + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_123', + }, + }, + ], + workflowId: 'test-gs-sync-1', + taskQueue: 'test-queue-gs-1', + }) + + await new Promise((r) => setTimeout(r, 1500)) + await handle.signal('delete') + await handle.result() + + expect(discoverCalls).toBeGreaterThanOrEqual(1) + expect(readCalls).toBeGreaterThanOrEqual(1) + expect(syncCalls).toBe(0) + }) + }) + + it('passes the discovered catalog into the Sheets write activity', async () => { + const discoveredCatalog = { + streams: [ + { + stream: { + name: 'customers', + primary_key: [['id']], + json_schema: { + type: 'object', + properties: { + id: { type: 'string' }, + }, + }, + }, + sync_mode: 'full_refresh' as const, + destination_sync_mode: 'append' as const, + }, + ], + } + let readCalls = 0 + let writeCatalog: unknown + + const worker = await Worker.create({ + connection: testEnv.nativeConnection, + taskQueue: 'test-queue-gs-2', + workflowsPath, + activities: stubActivities({ + discoverCatalog: async () => discoveredCatalog, + readIntoQueueWithState: async () => { + readCalls++ + return readCalls === 1 + ? 
{ count: 1, state: { customers: { cursor: 'cus_1' } } } + : { count: 0, state: { customers: { cursor: 'cus_1' } } } + }, + writeGoogleSheetsFromQueue: async (_config, _pipelineId, opts) => { + writeCatalog = opts?.catalog + return { + errors: [], + state: { customers: { cursor: 'cus_1' } }, + written: 0, + rowAssignments: {}, + } + }, + }), + }) + + await worker.runUntil(async () => { + const handle = await testEnv.client.workflow.start('pipelineGoogleSheetsWorkflow', { + args: [ + { + ...testPipeline, + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_456', + }, + }, + ], + workflowId: 'test-gs-sync-2', + taskQueue: 'test-queue-gs-2', + }) + + await new Promise((r) => setTimeout(r, 1500)) + await handle.signal('delete') + await handle.result() + + expect(writeCatalog).toEqual(discoveredCatalog) + }) + }) +}) diff --git a/apps/service/src/api/app.integration.test.ts b/apps/service/src/api/app.integration.test.ts index 0e73060cb..1e64385c8 100644 --- a/apps/service/src/api/app.integration.test.ts +++ b/apps/service/src/api/app.integration.test.ts @@ -11,7 +11,7 @@ import Stripe from 'stripe' import sourceStripe from '@stripe/sync-source-stripe' import destinationPostgres from '@stripe/sync-destination-postgres' import { createApp as createEngineApp, createConnectorResolver } from '@stripe/sync-engine' -import { createActivities } from '../temporal/activities.js' +import { createActivities } from '../temporal/activities/index.js' import { createApp } from './app.js' import type { paths } from '../__generated__/openapi.js' @@ -24,7 +24,7 @@ const STRIPE_API_KEY = process.env['STRIPE_API_KEY']! const POSTGRES_URL = process.env['POSTGRES_URL'] ?? process.env['DATABASE_URL']! 
const TASK_QUEUE = `test-app-${Date.now()}` const SCHEMA = `integration_${Date.now()}` -const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows.js') +const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows') const SKIP_CLEANUP = process.env['SKIP_CLEANUP'] === '1' diff --git a/apps/service/src/api/app.test.ts b/apps/service/src/api/app.test.ts index 8a0f9c87a..38c6f1e01 100644 --- a/apps/service/src/api/app.test.ts +++ b/apps/service/src/api/app.test.ts @@ -1,15 +1,16 @@ -import { describe, expect, it, beforeAll, afterAll } from 'vitest' +import { describe, expect, it, beforeAll, afterAll, vi } from 'vitest' import type { WorkflowClient } from '@temporalio/client' import { TestWorkflowEnvironment } from '@temporalio/testing' import { Worker } from '@temporalio/worker' import path from 'node:path' import { createConnectorResolver, sourceTest, destinationTest } from '@stripe/sync-engine' -import type { SyncActivities, RunResult } from '../temporal/activities.js' +import destinationGoogleSheets from '@stripe/sync-destination-google-sheets' +import type { SyncActivities, RunResult } from '../temporal/activities/index.js' import { createApp } from './app.js' const resolver = createConnectorResolver({ sources: { test: sourceTest }, - destinations: { test: destinationTest }, + destinations: { test: destinationTest, 'google-sheets': destinationGoogleSheets }, }) // Lightweight app for spec/health tests (no Temporal needed) @@ -68,18 +69,60 @@ describe('GET /health', () => { }) }) +describe('POST /pipelines workflow dispatch', () => { + it('starts google-sheets pipelines on the dedicated workflow', async () => { + const start = vi.fn(async () => ({})) + const res = await createApp({ + temporal: { client: { start } as unknown as WorkflowClient, taskQueue: 'unused' }, + resolver, + }).request('/pipelines', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + source: { type: 'test' }, + 
destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_123', + spreadsheet_title: 'Test Sheet', + client_id: 'client', + client_secret: 'secret', + access_token: 'token', + refresh_token: 'refresh', + }, + }), + }) + + expect(res.status).toBe(201) + expect(start).toHaveBeenCalledOnce() + expect(start).toHaveBeenCalledWith( + 'pipelineGoogleSheetsWorkflow', + expect.objectContaining({ + taskQueue: 'unused', + }) + ) + }) +}) + // --------------------------------------------------------------------------- // Pipeline CRUD + pause/resume (in-memory Temporal, stub activities) // --------------------------------------------------------------------------- -const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows.js') +const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows') const noErrors: RunResult = { errors: [], state: {} } function stubActivities(): SyncActivities { return { + discoverCatalog: async () => ({ streams: [] }), setup: async () => ({}), syncImmediate: async () => noErrors, + readIntoQueueWithState: async () => ({ count: 0, state: {} }), readIntoQueue: async () => ({ count: 0, state: {} }), + writeGoogleSheetsFromQueue: async () => ({ + errors: [], + state: {}, + written: 0, + rowAssignments: {}, + }), writeFromQueue: async () => ({ errors: [], state: {}, written: 0 }), teardown: async () => {}, } @@ -175,6 +218,99 @@ describe('pipeline CRUD', () => { await a.request(`/pipelines/${created.id}`, { method: 'DELETE' }) }) + it('rejects changing the target spreadsheet for a google-sheets pipeline', async () => { + const a = liveApp() + + const createRes = await a.request('/pipelines', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + source: { type: 'test' }, + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_123', + spreadsheet_title: 'Original Sheet', + client_id: 'client', + client_secret: 'secret', + access_token: 'token', + refresh_token: 
'refresh', + }, + }), + }) + const created = await createRes.json() + await waitForPipeline(a, created.id) + + const updateRes = await a.request(`/pipelines/${created.id}`, { + method: 'PATCH', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_456', + spreadsheet_title: 'Replacement Sheet', + client_id: 'client', + client_secret: 'secret', + access_token: 'token', + refresh_token: 'refresh', + }, + }), + }) + + expect(updateRes.status).toBe(400) + expect(await updateRes.json()).toEqual({ + error: + 'Changing the target spreadsheet for a google-sheets pipeline requires recreating the pipeline', + }) + + await a.request(`/pipelines/${created.id}`, { method: 'DELETE' }) + }) + + it('allows changing spreadsheet title when spreadsheet_id is unchanged', async () => { + const a = liveApp() + + const createRes = await a.request('/pipelines', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + source: { type: 'test' }, + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_123', + spreadsheet_title: 'Original Sheet', + client_id: 'client', + client_secret: 'secret', + access_token: 'token', + refresh_token: 'refresh', + }, + }), + }) + const created = await createRes.json() + await waitForPipeline(a, created.id) + + const updateRes = await a.request(`/pipelines/${created.id}`, { + method: 'PATCH', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_123', + spreadsheet_title: 'Renamed Sheet', + client_id: 'client', + client_secret: 'secret', + access_token: 'token', + refresh_token: 'refresh', + }, + }), + }) + + expect(updateRes.status).toBe(200) + const updated = await updateRes.json() + expect(updated.destination.spreadsheet_id).toBe('sheet_123') + expect(updated.destination.spreadsheet_title).toBe('Renamed Sheet') + + await 
a.request(`/pipelines/${created.id}`, { method: 'DELETE' }) + }) + it('pause and resume return pipeline with updated status', async () => { const a = liveApp() diff --git a/apps/service/src/api/app.ts b/apps/service/src/api/app.ts index 7252a3ac4..1163d1d53 100644 --- a/apps/service/src/api/app.ts +++ b/apps/service/src/api/app.ts @@ -6,7 +6,18 @@ import type { ConnectorResolver } from '@stripe/sync-engine' import { endpointTable, addDiscriminators } from '@stripe/sync-engine/api/openapi-utils' import { createSchemas } from '../lib/createSchemas.js' import type { Pipeline } from '../lib/createSchemas.js' -import type { WorkflowStatus } from '../temporal/workflows.js' +import type { WorkflowStatus } from '../temporal/workflows/_shared.js' + +const DEFAULT_PIPELINE_WORKFLOW = 'pipelineWorkflow' +const GOOGLE_SHEETS_PIPELINE_WORKFLOW = 'pipelineGoogleSheetsWorkflow' +const ACTIVE_PIPELINE_STATUSES = + "ExecutionStatus IN ('Running', 'Failed', 'Terminated', 'TimedOut', 'Canceled')" + +function workflowTypeForPipeline(pipeline: Pipeline): string { + return pipeline.destination.type === 'google-sheets' + ? GOOGLE_SHEETS_PIPELINE_WORKFLOW + : DEFAULT_PIPELINE_WORKFLOW +} // MARK: - Helpers @@ -114,28 +125,30 @@ export function createApp(options: AppOptions) { // Completed = soft-deleted (via delete signal). Show everything else // including failed/terminated so operators can see broken pipelines. 
const pipelines: Array = [] - for await (const wf of temporal.list({ - query: `WorkflowType = 'pipelineWorkflow' AND ExecutionStatus IN ('Running', 'Failed', 'Terminated', 'TimedOut', 'Canceled')`, - })) { - try { - const handle = temporal.getHandle(wf.workflowId) - const [pipeline, status] = await Promise.all([ - handle.query('config'), - handle.query('status'), - ]) - pipelines.push({ ...pipeline, status }) - } catch { - // Non-queryable (failed/terminated) — fall back to memo with derived status - const memo = wf.memo as { pipeline?: Pipeline } | undefined - if (memo?.pipeline) { - pipelines.push({ - ...memo.pipeline, - status: { - phase: wf.status.name.toLowerCase(), - paused: false, - iteration: 0, - }, - }) + for (const workflowType of [DEFAULT_PIPELINE_WORKFLOW, GOOGLE_SHEETS_PIPELINE_WORKFLOW]) { + for await (const wf of temporal.list({ + query: `WorkflowType = '${workflowType}' AND ${ACTIVE_PIPELINE_STATUSES}`, + })) { + try { + const handle = temporal.getHandle(wf.workflowId) + const [pipeline, status] = await Promise.all([ + handle.query('config'), + handle.query('status'), + ]) + pipelines.push({ ...pipeline, status }) + } catch { + // Non-queryable (failed/terminated) — fall back to memo with derived status + const memo = wf.memo as { pipeline?: Pipeline } | undefined + if (memo?.pipeline) { + pipelines.push({ + ...memo.pipeline, + status: { + phase: wf.status.name.toLowerCase(), + paused: false, + iteration: 0, + }, + }) + } } } } @@ -168,7 +181,7 @@ export function createApp(options: AppOptions) { const body = c.req.valid('json') const id = genId('pipe') const pipeline = { id, ...(body as Record) } as Pipeline - await temporal.start('pipelineWorkflow', { + await temporal.start(workflowTypeForPipeline(pipeline), { workflowId: id, taskQueue, args: [pipeline], @@ -263,6 +276,34 @@ export function createApp(options: AppOptions) { const patch = c.req.valid('json') try { const handle = temporal.getHandle(id) + const current = await handle.query('config') + 
const next = { + ...current, + source: patch.source ? patch.source : current.source, + destination: patch.destination ? patch.destination : current.destination, + streams: patch.streams !== undefined ? patch.streams : current.streams, + } as Pipeline + if (workflowTypeForPipeline(current) !== workflowTypeForPipeline(next)) { + return c.json( + { + error: + 'Changing destination.type between google-sheets and non-google-sheets requires recreating the pipeline', + }, + 400 + ) + } + if ( + current.destination.type === 'google-sheets' && + current.destination.spreadsheet_id !== next.destination.spreadsheet_id + ) { + return c.json( + { + error: + 'Changing the target spreadsheet for a google-sheets pipeline requires recreating the pipeline', + }, + 400 + ) + } await handle.signal('update', patch) // Brief wait for signal to be processed before querying await new Promise((r) => setTimeout(r, 200)) diff --git a/apps/service/src/cli.test.ts b/apps/service/src/cli.test.ts new file mode 100644 index 000000000..7fafbb09d --- /dev/null +++ b/apps/service/src/cli.test.ts @@ -0,0 +1,87 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +const runMock = vi.fn(async () => {}) +const createWorkerMock = vi.fn(async () => ({ run: runMock })) + +vi.mock('./temporal/worker.js', () => ({ + createWorker: createWorkerMock, +})) + +describe('worker CLI', () => { + const originalKafkaBroker = process.env['KAFKA_BROKER'] + + beforeEach(() => { + vi.clearAllMocks() + delete process.env['KAFKA_BROKER'] + }) + + afterEach(() => { + if (originalKafkaBroker === undefined) { + delete process.env['KAFKA_BROKER'] + } else { + process.env['KAFKA_BROKER'] = originalKafkaBroker + } + }) + + it('threads --kafka-broker through to createWorker', async () => { + vi.resetModules() + const { createProgram } = await import('./cli.js') + const program = (await createProgram()) as { + subCommands: Record< + string, + { + args: Record + run: (input: { args: Record }) => Promise + } 
+ > + } + + expect(program.subCommands['worker']?.args['kafka-broker']).toBeDefined() + + await program.subCommands['worker']!.run({ + args: { + 'temporal-address': 'localhost:7233', + 'temporal-namespace': 'default', + 'temporal-task-queue': 'sync-engine', + 'engine-url': 'http://localhost:4010', + 'kafka-broker': 'localhost:9092', + }, + }) + + expect(createWorkerMock).toHaveBeenCalledWith( + expect.objectContaining({ + kafkaBroker: 'localhost:9092', + }) + ) + expect(runMock).toHaveBeenCalledOnce() + }) + + it('falls back to KAFKA_BROKER when the flag is omitted', async () => { + process.env['KAFKA_BROKER'] = 'env-broker:9092' + vi.resetModules() + const { createProgram } = await import('./cli.js') + const program = (await createProgram()) as { + subCommands: Record< + string, + { + run: (input: { args: Record }) => Promise + } + > + } + + await program.subCommands['worker']!.run({ + args: { + 'temporal-address': 'localhost:7233', + 'temporal-namespace': 'default', + 'temporal-task-queue': 'sync-engine', + 'engine-url': 'http://localhost:4010', + }, + }) + + expect(createWorkerMock).toHaveBeenCalledWith( + expect.objectContaining({ + kafkaBroker: 'env-broker:9092', + }) + ) + }) +}) diff --git a/apps/service/src/cli.ts b/apps/service/src/cli.ts index 790ad10a3..7c7ae1c32 100644 --- a/apps/service/src/cli.ts +++ b/apps/service/src/cli.ts @@ -91,30 +91,42 @@ const workerCmd = defineCommand({ default: 'http://localhost:4010', description: 'Sync engine URL for sync execution (default: http://localhost:4010)', }, + 'kafka-broker': { + type: 'string', + description: + 'Kafka broker for queue-backed workflows (for example localhost:9092). 
Can also be set via KAFKA_BROKER.', + }, }, async run({ args }) { const { createWorker } = await import('./temporal/worker.js') const taskQueue = args['temporal-task-queue'] || 'sync-engine' const namespace = args['temporal-namespace'] || 'default' const engineUrl = args['engine-url'] || 'http://localhost:4010' + const kafkaBroker = args['kafka-broker'] || process.env['KAFKA_BROKER'] const temporalAddress = args['temporal-address'] // import.meta.url is the URL of cli.ts/cli.js, NOT the bin entry point: - // tsx: file:///.../apps/service/src/cli.ts → ./temporal/workflows.ts - // compiled: file:///.../apps/service/dist/cli.js → ./temporal/workflows.js + // tsx: file:///.../apps/service/src/cli.ts → ./temporal/workflows/index.ts + // compiled: file:///.../apps/service/dist/cli.js → ./temporal/workflows/index.js const { fileURLToPath } = await import('node:url') const ext = import.meta.url.endsWith('.ts') ? '.ts' : '.js' - const workflowsPath = fileURLToPath(new URL(`./temporal/workflows${ext}`, import.meta.url)) + const workflowsPath = fileURLToPath( + new URL(`./temporal/workflows/index${ext}`, import.meta.url) + ) const worker = await createWorker({ temporalAddress, namespace, taskQueue, engineUrl, + kafkaBroker, workflowsPath, }) - logger.info({ temporalAddress, namespace, taskQueue, engineUrl }, 'Starting Temporal worker') + logger.info( + { temporalAddress, namespace, taskQueue, engineUrl, kafkaBroker }, + 'Starting Temporal worker' + ) await worker.run() }, diff --git a/apps/service/src/index.ts b/apps/service/src/index.ts index c614db809..66aa28cd8 100644 --- a/apps/service/src/index.ts +++ b/apps/service/src/index.ts @@ -17,8 +17,8 @@ export { createApp } from './api/app.js' export type { AppOptions } from './api/app.js' // Temporal workflow types (for consumers that need to reference them) -export { createActivities } from './temporal/activities.js' -export type { SyncActivities, RunResult } from './temporal/activities.js' -export type { WorkflowStatus } 
from './temporal/workflows.js' +export { createActivities } from './temporal/activities/index.js' +export type { SyncActivities, RunResult } from './temporal/activities/index.js' +export type { WorkflowStatus } from './temporal/workflows/_shared.js' export { createWorker } from './temporal/worker.js' export type { WorkerOptions } from './temporal/worker.js' diff --git a/apps/service/src/temporal/activities.ts b/apps/service/src/temporal/activities.ts deleted file mode 100644 index 17d54477f..000000000 --- a/apps/service/src/temporal/activities.ts +++ /dev/null @@ -1,180 +0,0 @@ -import { heartbeat } from '@temporalio/activity' -import { createRemoteEngine } from '@stripe/sync-engine' -import type { PipelineConfig, Message, SetupResult, SyncOpts } from '@stripe/sync-engine' -import { Kafka } from 'kafkajs' - -export interface RunResult { - errors: Array<{ message: string; failure_type?: string; stream?: string }> - state: Record -} - -/** Convert an array to an async iterable. */ -async function* asIterable(items: T[]): AsyncIterable { - for (const item of items) yield item -} - -/** Iterate a message stream, collecting errors/state/records/eof and heartbeating. 
*/ -async function drainMessages(stream: AsyncIterable>): Promise<{ - errors: RunResult['errors'] - state: Record - records: unknown[] - eof?: { reason: string } -}> { - const errors: RunResult['errors'] = [] - const state: Record = {} - const records: unknown[] = [] - let eof: { reason: string } | undefined - let count = 0 - - for await (const m of stream) { - count++ - if (m.type === 'eof') { - eof = { reason: m.reason as string } - } else if (m.type === 'error') { - errors.push({ - message: - (m.message as string) || - ((m.data as Record)?.message as string) || - 'Unknown error', - failure_type: m.failure_type as string | undefined, - stream: m.stream as string | undefined, - }) - } else if (m.type === 'state' && typeof m.stream === 'string') { - state[m.stream] = m.data - } else if (m.type === 'record') { - records.push(m) - } - if (count % 50 === 0) heartbeat({ messages: count }) - } - if (count % 50 !== 0) heartbeat({ messages: count }) - - return { errors, state, records, eof } -} - -export function createActivities(opts: { engineUrl: string; kafkaBroker?: string }) { - const { engineUrl, kafkaBroker } = opts - - // Single engine client shared across all activity calls - const engine = createRemoteEngine(engineUrl) - - // Shared Kafka client + producer (created lazily, reused across activity calls) - let kafka: Kafka | undefined - let producerConnected: Promise | undefined - - function getKafka(): Kafka { - if (!kafka) { - if (!kafkaBroker) throw new Error('kafkaBroker is required for read-write mode') - kafka = new Kafka({ brokers: [kafkaBroker] }) - } - return kafka - } - - function getProducer(): Promise { - if (!producerConnected) { - const producer = getKafka().producer() - producerConnected = producer.connect().then(() => producer) - } - return producerConnected - } - - function topicName(pipelineId: string): string { - return `pipeline.${pipelineId}` - } - - return { - async setup(config: PipelineConfig): Promise { - return 
engine.pipeline_setup(config) - }, - - async syncImmediate( - config: PipelineConfig, - activityOpts?: SyncOpts & { input?: unknown[] } - ): Promise { - const { input: inputArr, ...syncOpts } = activityOpts ?? {} - const input = inputArr?.length ? asIterable(inputArr) : undefined - const { errors, state, eof } = await drainMessages( - engine.pipeline_sync(config, syncOpts, input) as AsyncIterable> - ) - return { errors, state, eof } - }, - - async readIntoQueue( - config: PipelineConfig, - pipelineId: string, - activityOpts?: SyncOpts & { input?: unknown[] } - ): Promise<{ count: number; state: Record; eof?: { reason: string } }> { - const { input: inputArr, ...syncOpts } = activityOpts ?? {} - const input = inputArr?.length ? asIterable(inputArr) : undefined - const { records, state, eof } = await drainMessages( - engine.pipeline_read(config, syncOpts, input) as AsyncIterable> - ) - - // If Kafka is configured, produce records to the pipeline topic - if (kafkaBroker && records.length > 0) { - const producer = await getProducer() - await producer.send({ - topic: topicName(pipelineId), - messages: records.map((r) => ({ value: JSON.stringify(r) })), - }) - } - - return { count: records.length, state, eof } - }, - - async writeFromQueue( - config: PipelineConfig, - pipelineId: string, - activityOpts?: { records?: unknown[]; maxBatch?: number } - ): Promise { - let records: unknown[] - - if (kafkaBroker) { - // Consume a batch from Kafka - const maxBatch = activityOpts?.maxBatch ?? 
50 - records = [] - const consumer = getKafka().consumer({ groupId: `pipeline.${pipelineId}` }) - await consumer.connect() - await consumer.subscribe({ topic: topicName(pipelineId), fromBeginning: false }) - - await new Promise((resolve) => { - consumer.run({ - eachMessage: async ({ message }) => { - if (message.value) { - records.push(JSON.parse(message.value.toString())) - } - if (records.length >= maxBatch) { - resolve() - } - }, - }) - // If fewer than maxBatch messages are available, resolve after a short wait - setTimeout(resolve, 2000) - }) - - await consumer.disconnect() - } else { - // In-memory mode: records passed directly - records = activityOpts?.records ?? [] - } - - if (records.length === 0) { - return { errors: [], state: {}, written: 0 } - } - - const { errors, state } = await drainMessages( - engine.pipeline_write( - config, - asIterable(records) as AsyncIterable - ) as AsyncIterable> - ) - - return { errors, state, written: records.length } - }, - - async teardown(config: PipelineConfig): Promise { - await engine.pipeline_teardown(config) - }, - } -} - -export type SyncActivities = ReturnType diff --git a/apps/service/src/temporal/activities/_shared.ts b/apps/service/src/temporal/activities/_shared.ts new file mode 100644 index 000000000..1f68edf14 --- /dev/null +++ b/apps/service/src/temporal/activities/_shared.ts @@ -0,0 +1,253 @@ +import { heartbeat } from '@temporalio/activity' +import type { ConfiguredCatalog, Message, RecordMessage } from '@stripe/sync-engine' +import { Kafka } from 'kafkajs' +import type { Producer } from 'kafkajs' +import { + ROW_KEY_FIELD, + ROW_NUMBER_FIELD, + serializeRowKey, +} from '@stripe/sync-destination-google-sheets' + +export interface ActivitiesContext { + engineUrl: string + kafkaBroker?: string + getProducer(): Promise + consumeQueueBatch(pipelineId: string, maxBatch: number): Promise +} + +export function createActivitiesContext(opts: { + engineUrl: string + kafkaBroker?: string +}): ActivitiesContext { + 
const { engineUrl, kafkaBroker } = opts + + let kafka: Kafka | undefined + let producerConnected: Promise | undefined + + function getKafka(): Kafka { + if (!kafka) { + if (!kafkaBroker) throw new Error('kafkaBroker is required for read-write mode') + kafka = new Kafka({ brokers: [kafkaBroker] }) + } + return kafka + } + + function topicName(pipelineId: string): string { + return `pipeline.${pipelineId}` + } + + async function getProducer(): Promise { + if (!producerConnected) { + const producer = getKafka().producer() + producerConnected = producer.connect().then(() => producer) + } + return producerConnected + } + + async function consumeQueueBatch(pipelineId: string, maxBatch: number): Promise { + if (!kafkaBroker) throw new Error('kafkaBroker is required for read-write mode') + + const topic = topicName(pipelineId) + const messages: Message[] = [] + const offsets = new Map() + const consumer = getKafka().consumer({ groupId: `pipeline.${pipelineId}` }) + await consumer.connect() + await consumer.subscribe({ topic, fromBeginning: true }) + + try { + await new Promise((resolve) => { + let resolved = false + const finish = () => { + if (resolved) return + resolved = true + resolve() + } + + consumer.run({ + eachMessage: async ({ partition, message }) => { + if (message.value) { + messages.push(JSON.parse(message.value.toString()) as Message) + offsets.set(partition, (BigInt(message.offset) + 1n).toString()) + } + if (messages.length >= maxBatch) finish() + }, + }) + + setTimeout(finish, 2000) + }) + + await consumer.stop() + + if (offsets.size > 0) { + await consumer.commitOffsets( + [...offsets.entries()].map(([partition, offset]) => ({ + topic, + partition, + offset, + })) + ) + } + } finally { + await consumer.disconnect() + } + + return messages + } + + return { + engineUrl, + kafkaBroker, + getProducer, + consumeQueueBatch, + } +} + +export interface RunResult { + errors: Array<{ message: string; failure_type?: string; stream?: string }> + state: Record +} + 
+export async function* asIterable(items: T[]): AsyncIterable { + for (const item of items) yield item +} + +export function pipelineHeader(config: Record): string { + return JSON.stringify(config) +} + +export function collectError(message: Record): RunResult['errors'][number] | null { + if (message.type !== 'error') return null + return { + message: + (message.message as string) || + ((message.data as Record)?.message as string) || + 'Unknown error', + failure_type: message.failure_type as string | undefined, + stream: message.stream as string | undefined, + } +} + +export function withRowKey(record: RecordMessage, catalog?: ConfiguredCatalog): RecordMessage { + const primaryKey = catalog?.streams.find((stream) => stream.stream.name === record.stream)?.stream + .primary_key + if (!primaryKey) return record + return { + ...record, + data: { + ...record.data, + [ROW_KEY_FIELD]: serializeRowKey(primaryKey, record.data), + }, + } +} + +export function compactGoogleSheetsMessages(messages: Message[]): Message[] { + const compacted: Message[] = [] + let pendingOrder: string[] = [] + let pending = new Map() + + const flushPending = () => { + for (const key of pendingOrder) { + const message = pending.get(key) + if (message) compacted.push(message) + } + pendingOrder = [] + pending = new Map() + } + + for (const message of messages) { + if (message.type === 'record') { + const rowKey = + typeof message.data[ROW_KEY_FIELD] === 'string' ? 
message.data[ROW_KEY_FIELD] : undefined + if (!rowKey) { + compacted.push(message) + continue + } + const dedupeKey = `${message.stream}:${rowKey}` + if (!pending.has(dedupeKey)) pendingOrder.push(dedupeKey) + pending.set(dedupeKey, message) + continue + } + + if (message.type === 'state') { + flushPending() + compacted.push(message) + } + } + + flushPending() + return compacted +} + +export function addRowNumbers( + messages: Message[], + rowIndex: Record> +): Message[] { + return messages.map((message) => { + if (message.type !== 'record') return message + const rowKey = + typeof message.data[ROW_KEY_FIELD] === 'string' ? message.data[ROW_KEY_FIELD] : undefined + const rowNumber = rowKey ? rowIndex[message.stream]?.[rowKey] : undefined + if (rowNumber === undefined) return message + return { + ...message, + data: { + ...message.data, + [ROW_NUMBER_FIELD]: rowNumber, + }, + } + }) +} + +export function augmentGoogleSheetsCatalog(catalog: ConfiguredCatalog): ConfiguredCatalog { + return { + streams: catalog.streams.map((configuredStream) => { + const props = configuredStream.stream.json_schema?.properties as + | Record + | undefined + + if (!props) return configuredStream + + return { + ...configuredStream, + stream: { + ...configuredStream.stream, + json_schema: { + ...configuredStream.stream.json_schema, + properties: { + ...props, + [ROW_KEY_FIELD]: { type: 'string' }, + [ROW_NUMBER_FIELD]: { type: 'number' }, + }, + }, + }, + } + }), + } +} + +export async function drainMessages(stream: AsyncIterable>): Promise<{ + errors: RunResult['errors'] + state: Record + records: unknown[] +}> { + const errors: RunResult['errors'] = [] + const state: Record = {} + const records: unknown[] = [] + let count = 0 + + for await (const message of stream) { + count++ + const error = collectError(message) + if (error) { + errors.push(error) + } else if (message.type === 'state' && typeof message.stream === 'string') { + state[message.stream] = message.data + } else if 
(message.type === 'record') { + records.push(message) + } + if (count % 50 === 0) heartbeat({ messages: count }) + } + if (count % 50 !== 0) heartbeat({ messages: count }) + + return { errors, state, records } +} diff --git a/apps/service/src/temporal/activities/discover-catalog.ts b/apps/service/src/temporal/activities/discover-catalog.ts new file mode 100644 index 000000000..13c66f75c --- /dev/null +++ b/apps/service/src/temporal/activities/discover-catalog.ts @@ -0,0 +1,19 @@ +import { applySelection, buildCatalog } from '@stripe/sync-engine' +import type { ConfiguredCatalog, PipelineConfig, Stream } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' +import { pipelineHeader } from './_shared.js' + +export function createDiscoverCatalogActivity(context: ActivitiesContext) { + return async function discoverCatalog(config: PipelineConfig): Promise { + const response = await fetch(`${context.engineUrl}/discover`, { + method: 'POST', + headers: { 'x-pipeline': pipelineHeader(config) }, + }) + if (!response.ok) { + const text = await response.text().catch(() => '') + throw new Error(`Engine /discover failed (${response.status}): ${text}`) + } + const payload = (await response.json()) as { streams: Stream[] } + return applySelection(buildCatalog(payload.streams, config.streams)) + } +} diff --git a/apps/service/src/temporal/activities/index.ts b/apps/service/src/temporal/activities/index.ts new file mode 100644 index 000000000..aaaff25ca --- /dev/null +++ b/apps/service/src/temporal/activities/index.ts @@ -0,0 +1,28 @@ +import { createActivitiesContext } from './_shared.js' +import { createDiscoverCatalogActivity } from './discover-catalog.js' +import { createReadIntoQueueActivity } from './read-into-queue.js' +import { createReadIntoQueueWithStateActivity } from './read-into-queue-with-state.js' +import { createSetupActivity } from './setup.js' +import { createSyncImmediateActivity } from './sync-immediate.js' +import { 
createTeardownActivity } from './teardown.js' +import { createWriteFromQueueActivity } from './write-from-queue.js' +import { createWriteGoogleSheetsFromQueueActivity } from './write-google-sheets-from-queue.js' + +export type { RunResult } from './_shared.js' + +export function createActivities(opts: { engineUrl: string; kafkaBroker?: string }) { + const context = createActivitiesContext(opts) + + return { + discoverCatalog: createDiscoverCatalogActivity(context), + setup: createSetupActivity(context), + syncImmediate: createSyncImmediateActivity(context), + readIntoQueueWithState: createReadIntoQueueWithStateActivity(context), + readIntoQueue: createReadIntoQueueActivity(context), + writeGoogleSheetsFromQueue: createWriteGoogleSheetsFromQueueActivity(context), + writeFromQueue: createWriteFromQueueActivity(context), + teardown: createTeardownActivity(context), + } +} + +export type SyncActivities = ReturnType diff --git a/apps/service/src/temporal/activities/read-into-queue-with-state.ts b/apps/service/src/temporal/activities/read-into-queue-with-state.ts new file mode 100644 index 000000000..ecfde00d2 --- /dev/null +++ b/apps/service/src/temporal/activities/read-into-queue-with-state.ts @@ -0,0 +1,64 @@ +import { heartbeat } from '@temporalio/activity' +import { createRemoteEngine } from '@stripe/sync-engine' +import type { + ConfiguredCatalog, + Message, + PipelineConfig, + RecordMessage, + SyncOpts, +} from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' +import { asIterable, collectError, type RunResult, withRowKey } from './_shared.js' + +export function createReadIntoQueueWithStateActivity(context: ActivitiesContext) { + return async function readIntoQueueWithState( + config: PipelineConfig, + pipelineId: string, + opts?: SyncOpts & { + input?: unknown[] + catalog?: ConfiguredCatalog + } + ): Promise<{ count: number; state: Record }> { + if (!context.kafkaBroker) throw new Error('kafkaBroker is required for Google Sheets 
workflow') + + const engine = createRemoteEngine(context.engineUrl) + const { input: inputArr, catalog, ...syncOpts } = opts ?? {} + const input = inputArr?.length ? asIterable(inputArr) : undefined + + const queued: Message[] = [] + const state: Record = {} + const errors: RunResult['errors'] = [] + let seen = 0 + + for await (const raw of engine.pipeline_read(config, syncOpts, input) as AsyncIterable< + Record + >) { + seen++ + const error = collectError(raw) + if (error) { + errors.push(error) + } else if (raw.type === 'record') { + queued.push(withRowKey(raw as RecordMessage, catalog)) + } else if (raw.type === 'state' && typeof raw.stream === 'string') { + state[raw.stream] = raw.data + queued.push(raw as Message) + } + if (seen % 50 === 0) heartbeat({ messages: seen }) + } + if (seen % 50 !== 0) heartbeat({ messages: seen }) + + if (errors.length > 0) { + throw new Error(errors.map((error) => error.message).join('; ')) + } + + if (queued.length > 0) { + const producer = await context.getProducer() + await producer.send({ + topic: `pipeline.${pipelineId}`, + messages: queued.map((message) => ({ value: JSON.stringify(message) })), + }) + } + + return { count: queued.length, state } + } +} diff --git a/apps/service/src/temporal/activities/read-into-queue.ts b/apps/service/src/temporal/activities/read-into-queue.ts new file mode 100644 index 000000000..20dd320ce --- /dev/null +++ b/apps/service/src/temporal/activities/read-into-queue.ts @@ -0,0 +1,29 @@ +import { createRemoteEngine } from '@stripe/sync-engine' +import type { PipelineConfig, SyncOpts } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' +import { asIterable, drainMessages } from './_shared.js' + +export function createReadIntoQueueActivity(context: ActivitiesContext) { + return async function readIntoQueue( + config: PipelineConfig, + pipelineId: string, + opts?: SyncOpts & { input?: unknown[] } + ): Promise<{ count: number; state: Record }> { + const engine = 
createRemoteEngine(context.engineUrl) + const { input: inputArr, ...syncOpts } = opts ?? {} + const input = inputArr?.length ? asIterable(inputArr) : undefined + const { records, state } = await drainMessages( + engine.pipeline_read(config, syncOpts, input) as AsyncIterable> + ) + + if (context.kafkaBroker && records.length > 0) { + const producer = await context.getProducer() + await producer.send({ + topic: `pipeline.${pipelineId}`, + messages: records.map((record) => ({ value: JSON.stringify(record) })), + }) + } + + return { count: records.length, state } + } +} diff --git a/apps/service/src/temporal/activities/setup.ts b/apps/service/src/temporal/activities/setup.ts new file mode 100644 index 000000000..03871e135 --- /dev/null +++ b/apps/service/src/temporal/activities/setup.ts @@ -0,0 +1,10 @@ +import { createRemoteEngine } from '@stripe/sync-engine' +import type { PipelineConfig, SetupResult } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' + +export function createSetupActivity(context: ActivitiesContext) { + return async function setup(config: PipelineConfig): Promise { + const engine = createRemoteEngine(context.engineUrl) + return await engine.pipeline_setup(config) + } +} diff --git a/apps/service/src/temporal/activities/sync-immediate.ts b/apps/service/src/temporal/activities/sync-immediate.ts new file mode 100644 index 000000000..df2aff680 --- /dev/null +++ b/apps/service/src/temporal/activities/sync-immediate.ts @@ -0,0 +1,19 @@ +import { createRemoteEngine } from '@stripe/sync-engine' +import type { PipelineConfig, SyncOpts } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' +import { asIterable, drainMessages, type RunResult } from './_shared.js' + +export function createSyncImmediateActivity(context: ActivitiesContext) { + return async function syncImmediate( + config: PipelineConfig, + opts?: SyncOpts & { input?: unknown[] } + ): Promise { + const engine = 
createRemoteEngine(context.engineUrl) + const { input: inputArr, ...syncOpts } = opts ?? {} + const input = inputArr?.length ? asIterable(inputArr) : undefined + const { errors, state } = await drainMessages( + engine.pipeline_sync(config, syncOpts, input) as AsyncIterable> + ) + return { errors, state } + } +} diff --git a/apps/service/src/temporal/activities/teardown.ts b/apps/service/src/temporal/activities/teardown.ts new file mode 100644 index 000000000..dab9c90d8 --- /dev/null +++ b/apps/service/src/temporal/activities/teardown.ts @@ -0,0 +1,10 @@ +import { createRemoteEngine } from '@stripe/sync-engine' +import type { PipelineConfig } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' + +export function createTeardownActivity(context: ActivitiesContext) { + return async function teardown(config: PipelineConfig): Promise { + const engine = createRemoteEngine(context.engineUrl) + await engine.pipeline_teardown(config) + } +} diff --git a/apps/service/src/temporal/activities/write-from-queue.ts b/apps/service/src/temporal/activities/write-from-queue.ts new file mode 100644 index 000000000..d302406e6 --- /dev/null +++ b/apps/service/src/temporal/activities/write-from-queue.ts @@ -0,0 +1,34 @@ +import { createRemoteEngine } from '@stripe/sync-engine' +import type { Message, PipelineConfig } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' +import { asIterable, drainMessages, type RunResult } from './_shared.js' + +export function createWriteFromQueueActivity(context: ActivitiesContext) { + return async function writeFromQueue( + config: PipelineConfig, + pipelineId: string, + opts?: { records?: unknown[]; maxBatch?: number } + ): Promise { + let records: unknown[] + + if (context.kafkaBroker) { + const maxBatch = opts?.maxBatch ?? 50 + records = await context.consumeQueueBatch(pipelineId, maxBatch) + } else { + records = opts?.records ?? 
[] + } + + if (records.length === 0) { + return { errors: [], state: {}, written: 0 } + } + + const engine = createRemoteEngine(context.engineUrl) + const { errors, state } = await drainMessages( + engine.pipeline_write(config, asIterable(records) as AsyncIterable) as AsyncIterable< + Record + > + ) + + return { errors, state, written: records.length } + } +} diff --git a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts new file mode 100644 index 000000000..a74ce99db --- /dev/null +++ b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts @@ -0,0 +1,85 @@ +import { enforceCatalog } from '@stripe/sync-engine' +import type { ConfiguredCatalog, DestinationInput, PipelineConfig } from '@stripe/sync-engine' +import { + configSchema as googleSheetsConfigSchema, + createDestination as createGoogleSheetsDestination, + parseGoogleSheetsMetaLog, +} from '@stripe/sync-destination-google-sheets' +import type { ActivitiesContext } from './_shared.js' +import { + addRowNumbers, + asIterable, + augmentGoogleSheetsCatalog, + collectError, + compactGoogleSheetsMessages, + type RunResult, +} from './_shared.js' + +export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesContext) { + return async function writeGoogleSheetsFromQueue( + config: PipelineConfig, + pipelineId: string, + opts?: { + maxBatch?: number + rowIndex?: Record> + catalog?: ConfiguredCatalog + } + ): Promise< + RunResult & { + written: number + rowAssignments: Record> + } + > { + if (!context.kafkaBroker) throw new Error('kafkaBroker is required for Google Sheets workflow') + + const maxBatch = opts?.maxBatch ?? 50 + const queued = await context.consumeQueueBatch(pipelineId, maxBatch) + + if (queued.length === 0) { + return { errors: [], state: {}, written: 0, rowAssignments: {} } + } + + const writeBatch = addRowNumbers(compactGoogleSheetsMessages(queued), opts?.rowIndex ?? 
{}) + if (config.destination.type !== 'google-sheets') { + throw new Error('writeGoogleSheetsFromQueue requires a google-sheets destination') + } + if (!opts?.catalog) { + throw new Error('catalog is required for Google Sheets workflow writes') + } + + const destinationConfig = googleSheetsConfigSchema.parse(config.destination) + const filteredCatalog = augmentGoogleSheetsCatalog(opts.catalog) + const destination = createGoogleSheetsDestination() + const errors: RunResult['errors'] = [] + const state: Record = {} + const rowAssignments: Record> = {} + const input = enforceCatalog(filteredCatalog)( + asIterable(writeBatch) + ) as AsyncIterable + + for await (const raw of destination.write( + { + config: destinationConfig, + catalog: filteredCatalog, + }, + input + )) { + const error = collectError(raw) + if (error) { + errors.push(error) + } else if (raw.type === 'state' && typeof raw.stream === 'string') { + state[raw.stream] = raw.data + } else if (raw.type === 'log' && typeof raw.message === 'string') { + const meta = parseGoogleSheetsMetaLog(raw.message) + if (meta?.type === 'row_assignments') { + for (const [stream, assignments] of Object.entries(meta.assignments)) { + rowAssignments[stream] ??= {} + Object.assign(rowAssignments[stream], assignments) + } + } + } + } + + return { errors, state, written: queued.length, rowAssignments } + } +} diff --git a/apps/service/src/temporal/worker.ts b/apps/service/src/temporal/worker.ts index 3515501fd..97a497be4 100644 --- a/apps/service/src/temporal/worker.ts +++ b/apps/service/src/temporal/worker.ts @@ -1,5 +1,5 @@ import { NativeConnection, Worker } from '@temporalio/worker' -import { createActivities } from './activities.js' +import { createActivities } from './activities/index.js' export interface WorkerOptions { temporalAddress: string @@ -7,7 +7,7 @@ export interface WorkerOptions { taskQueue: string engineUrl: string kafkaBroker?: string - /** Path to compiled workflows.js (Temporal bundles it for V8 sandbox). 
*/ + /** Path to a compiled workflow module or directory (Temporal bundles it for V8 sandbox). */ workflowsPath: string } diff --git a/apps/service/src/temporal/workflows/_shared.ts b/apps/service/src/temporal/workflows/_shared.ts new file mode 100644 index 000000000..9c1c0b6eb --- /dev/null +++ b/apps/service/src/temporal/workflows/_shared.ts @@ -0,0 +1,55 @@ +import { defineQuery, defineSignal, proxyActivities } from '@temporalio/workflow' +import type { PipelineConfig } from '@stripe/sync-protocol' + +import type { SyncActivities } from '../activities/index.js' +import { retryPolicy } from '../../lib/utils.js' + +export interface WorkflowStatus { + phase: string + paused: boolean + iteration: number +} + +export type Pipeline = PipelineConfig & { + // Keep `id` on the workflow-local shape for now so configQuery still returns + // the full pipeline resource expected by the API and queue-backed activities + // can continue using it as the pipeline key. A cleaner split would derive + // this from Temporal workflow metadata, but that is a broader refactor. 
+ id: string +} + +export type RowIndex = Record> + +export function toConfig(pipeline: Pipeline): PipelineConfig { + return { + source: pipeline.source, + destination: pipeline.destination, + streams: pipeline.streams, + } +} + +export const stripeEventSignal = defineSignal<[unknown]>('stripe_event') +export const updateSignal = defineSignal<[Partial]>('update') +export const deleteSignal = defineSignal('delete') + +export const statusQuery = defineQuery('status') +export const configQuery = defineQuery('config') +export const stateQuery = defineQuery>('state') + +export const { setup, teardown } = proxyActivities({ + startToCloseTimeout: '2m', + retry: retryPolicy, +}) + +export const { syncImmediate, readIntoQueue, writeFromQueue } = proxyActivities({ + startToCloseTimeout: '10m', + heartbeatTimeout: '2m', + retry: retryPolicy, +}) + +export const { discoverCatalog, readIntoQueueWithState, writeGoogleSheetsFromQueue } = + proxyActivities({ + startToCloseTimeout: '10m', + heartbeatTimeout: '2m', + retry: retryPolicy, + }) diff --git a/apps/service/src/temporal/workflows/index.ts b/apps/service/src/temporal/workflows/index.ts new file mode 100644 index 000000000..8cdb2cbab --- /dev/null +++ b/apps/service/src/temporal/workflows/index.ts @@ -0,0 +1,2 @@ +export { pipelineWorkflow } from './pipeline.js' +export { pipelineGoogleSheetsWorkflow } from './pipeline-google-sheets.js' diff --git a/apps/service/src/temporal/workflows/pipeline-google-sheets.ts b/apps/service/src/temporal/workflows/pipeline-google-sheets.ts new file mode 100644 index 000000000..6fdfea9e5 --- /dev/null +++ b/apps/service/src/temporal/workflows/pipeline-google-sheets.ts @@ -0,0 +1,191 @@ +import { condition, continueAsNew, setHandler, sleep } from '@temporalio/workflow' +import type { ConfiguredCatalog } from '@stripe/sync-engine' + +import { + configQuery, + deleteSignal, + discoverCatalog, + Pipeline, + readIntoQueueWithState, + RowIndex, + setup, + stateQuery, + statusQuery, + 
stripeEventSignal, + teardown, + toConfig, + updateSignal, + WorkflowStatus, + writeGoogleSheetsFromQueue, +} from './_shared.js' +import { CONTINUE_AS_NEW_THRESHOLD, deepEqual, EVENT_BATCH_SIZE } from '../../lib/utils.js' + +export interface PipelineGoogleSheetsWorkflowOpts { + phase?: string + sourceState?: Record + readState?: Record + rowIndex?: RowIndex + catalog?: ConfiguredCatalog + pendingWrites?: boolean + inputQueue?: unknown[] + readComplete?: boolean + writeRps?: number +} + +export async function pipelineGoogleSheetsWorkflow( + pipeline: Pipeline, + opts?: PipelineGoogleSheetsWorkflowOpts +): Promise { + let paused = false + let deleted = false + const inputQueue: unknown[] = [...(opts?.inputQueue ?? [])] + let iteration = 0 + let sourceState: Record = opts?.sourceState ?? {} + let readState: Record = opts?.readState ?? { ...sourceState } + let rowIndex: RowIndex = opts?.rowIndex ?? {} + let catalog: ConfiguredCatalog | undefined = opts?.catalog + let readComplete = opts?.readComplete ?? false + let pendingWrites = opts?.pendingWrites ?? false + + setHandler(stripeEventSignal, (event: unknown) => { + inputQueue.push(event) + }) + setHandler(updateSignal, (patch: Partial) => { + if (patch.source) { + pipeline = { ...pipeline, source: patch.source } + catalog = undefined + readComplete = false + readState = { ...sourceState } + } + if (patch.destination) pipeline = { ...pipeline, destination: patch.destination } + if (patch.streams !== undefined) { + pipeline = { ...pipeline, streams: patch.streams } + catalog = undefined + readComplete = false + readState = { ...sourceState } + } + if ('paused' in (patch as Record)) { + paused = !!(patch as Record).paused + } + }) + setHandler(deleteSignal, () => { + deleted = true + }) + + const phase = opts?.phase ?? 'setup' + setHandler( + statusQuery, + (): WorkflowStatus => ({ + phase: phase === 'setup' && iteration > 0 ? 
'running' : phase, + paused, + iteration, + }) + ) + setHandler(configQuery, (): Pipeline => pipeline) + setHandler(stateQuery, (): Record => sourceState) + + async function waitWhilePaused() { + await condition(() => !paused || deleted) + } + + async function tickIteration() { + iteration++ + if (iteration >= CONTINUE_AS_NEW_THRESHOLD) { + await continueAsNew(pipeline, { + phase: 'running', + sourceState, + readState, + rowIndex, + catalog, + pendingWrites, + inputQueue: inputQueue.length > 0 ? [...inputQueue] : undefined, + readComplete, + writeRps: opts?.writeRps, + }) + } + } + + if (phase !== 'running') { + const setupResult = await setup(toConfig(pipeline)) + if (setupResult.source) { + pipeline = { ...pipeline, source: { ...pipeline.source, ...setupResult.source } } + } + if (setupResult.destination) { + pipeline = { + ...pipeline, + destination: { ...pipeline.destination, ...setupResult.destination }, + } + } + catalog = await discoverCatalog(toConfig(pipeline)) + if (deleted) { + await teardown(toConfig(pipeline)) + return + } + } + + async function readLoop(): Promise { + while (!deleted) { + await waitWhilePaused() + if (deleted) break + + const config = toConfig(pipeline) + if (!catalog) catalog = await discoverCatalog(config) + + if (inputQueue.length > 0) { + const batch = inputQueue.splice(0, EVENT_BATCH_SIZE) + const { count } = await readIntoQueueWithState(config, pipeline.id, { + input: batch, + catalog, + }) + if (count > 0) pendingWrites = true + await tickIteration() + continue + } + + if (!readComplete) { + const before = readState + const { count, state: nextReadState } = await readIntoQueueWithState(config, pipeline.id, { + state: readState, + stateLimit: 1, + catalog, + }) + if (count > 0) pendingWrites = true + readState = { ...readState, ...nextReadState } + readComplete = deepEqual(readState, before) + await tickIteration() + continue + } + + await condition(() => inputQueue.length > 0 || deleted) + } + } + + async function writeLoop(): 
Promise { + while (!deleted) { + await waitWhilePaused() + if (deleted) break + + if (pendingWrites) { + if (!catalog) catalog = await discoverCatalog(toConfig(pipeline)) + const result = await writeGoogleSheetsFromQueue(toConfig(pipeline), pipeline.id, { + maxBatch: 50, + rowIndex, + catalog, + }) + pendingWrites = result.written > 0 + sourceState = { ...sourceState, ...result.state } + for (const [stream, assignments] of Object.entries(result.rowAssignments)) { + rowIndex[stream] ??= {} + Object.assign(rowIndex[stream], assignments) + } + if (opts?.writeRps) await sleep(Math.ceil(1000 / opts.writeRps)) + await tickIteration() + } else { + await condition(() => pendingWrites || deleted) + } + } + } + + await Promise.all([readLoop(), writeLoop()]) + await teardown(toConfig(pipeline)) +} diff --git a/apps/service/src/temporal/workflows.ts b/apps/service/src/temporal/workflows/pipeline.ts similarity index 64% rename from apps/service/src/temporal/workflows.ts rename to apps/service/src/temporal/workflows/pipeline.ts index b0bdac293..df54bbeba 100644 --- a/apps/service/src/temporal/workflows.ts +++ b/apps/service/src/temporal/workflows/pipeline.ts @@ -1,90 +1,35 @@ -import { - proxyActivities, - defineSignal, - defineQuery, - setHandler, - condition, - continueAsNew, - sleep, -} from '@temporalio/workflow' - -import type { SyncActivities } from './activities.js' +import { condition, continueAsNew, setHandler, sleep } from '@temporalio/workflow' -export interface WorkflowStatus { - phase: string - paused: boolean - iteration: number -} import { - deepEqual, - CONTINUE_AS_NEW_THRESHOLD, - EVENT_BATCH_SIZE, - retryPolicy, -} from '../lib/utils.js' - -// Setup/teardown: 2m with retry -const { setup, teardown } = proxyActivities({ - startToCloseTimeout: '2m', - retry: retryPolicy, -}) - -// Data activities: 10m with retry and heartbeat -const { syncImmediate, readIntoQueue, writeFromQueue } = proxyActivities({ - startToCloseTimeout: '10m', - heartbeatTimeout: '2m', - 
retry: retryPolicy, -}) - -// Pipeline type (matches lib/schemas.ts — keep in sync) -type SyncMode = 'incremental' | 'full_refresh' - -interface StreamDef { - name: string - sync_mode?: SyncMode - fields?: string[] -} - -interface Pipeline { - id: string - source: { type: string; [key: string]: unknown } - destination: { type: string; [key: string]: unknown } - streams?: StreamDef[] -} - -type PipelineConfig = { - source: { type: string; [key: string]: unknown } - destination: { type: string; [key: string]: unknown } - streams?: StreamDef[] + configQuery, + deleteSignal, + Pipeline, + readIntoQueue, + setup, + stateQuery, + statusQuery, + stripeEventSignal, + syncImmediate, + teardown, + toConfig, + updateSignal, + WorkflowStatus, + writeFromQueue, +} from './_shared.js' +import { CONTINUE_AS_NEW_THRESHOLD, deepEqual, EVENT_BATCH_SIZE } from '../../lib/utils.js' + +export interface PipelineWorkflowOpts { + phase?: string + state?: Record + mode?: 'sync' | 'read-write' + writeRps?: number + pendingWrites?: boolean + inputQueue?: unknown[] } -function toConfig(pipeline: Pipeline): PipelineConfig { - return { - source: pipeline.source, - destination: pipeline.destination, - streams: pipeline.streams, - } -} - -// Signals -export const stripeEventSignal = defineSignal<[unknown]>('stripe_event') -export const updateSignal = defineSignal<[Partial]>('update') -export const deleteSignal = defineSignal('delete') - -// Queries -export const statusQuery = defineQuery('status') -export const configQuery = defineQuery('config') -export const stateQuery = defineQuery>('state') - export async function pipelineWorkflow( pipeline: Pipeline, - opts?: { - phase?: string - state?: Record - mode?: 'sync' | 'read-write' - writeRps?: number - pendingWrites?: boolean - inputQueue?: unknown[] - } + opts?: PipelineWorkflowOpts ): Promise { let paused = false let deleted = false @@ -101,7 +46,6 @@ export async function pipelineWorkflow( // written:0 when the queue is actually empty, 
written:>0 when it's not). let pendingWrites = opts?.pendingWrites ?? false - // Register signal handlers (must be before any await) setHandler(stripeEventSignal, (event: unknown) => { inputQueue.push(event) }) @@ -117,7 +61,6 @@ export async function pipelineWorkflow( deleted = true }) - // Register query handlers const phase = opts?.phase ?? 'setup' setHandler( statusQuery, @@ -130,8 +73,6 @@ export async function pipelineWorkflow( setHandler(configQuery, (): Pipeline => pipeline) setHandler(stateQuery, (): Record => syncState) - // --- Helpers --- - async function waitWhilePaused() { await condition(() => !paused || deleted) } @@ -150,13 +91,10 @@ export async function pipelineWorkflow( } } - // --- Setup (first sync only) --- - const config = toConfig(pipeline) if (phase !== 'running') { const setupResult = await setup(config) - // Merge setup-provisioned fields (webhook_secret, account_id, spreadsheet_id, etc.) if (setupResult.source) { pipeline = { ...pipeline, source: { ...pipeline.source, ...setupResult.source } } } @@ -172,12 +110,7 @@ export async function pipelineWorkflow( } } - // --- Main loop --- - if (opts?.mode === 'read-write') { - // Concurrent read/write via Kafka queue — each loop runs at its own pace - // writeState: persisted pipeline state, only advanced after successful writes (source of truth) - // readState: pagination cursor for source reads, starts from writeState let writeState: Record = { ...syncState } let readState: Record = { ...writeState } @@ -188,7 +121,6 @@ export async function pipelineWorkflow( const config = toConfig(pipeline) - // Resolve events through read → Kafka if (inputQueue.length > 0) { const batch = inputQueue.splice(0, EVENT_BATCH_SIZE) const { count } = await readIntoQueue(config, pipeline.id, { input: batch }) @@ -197,7 +129,6 @@ export async function pipelineWorkflow( continue } - // Backfill one page → Kafka if (!readComplete) { const before = readState const { count, state: nextReadState } = await 
readIntoQueue(config, pipeline.id, { @@ -211,7 +142,6 @@ export async function pipelineWorkflow( continue } - // All caught up — wait for new events or delete await condition(() => inputQueue.length > 0 || deleted) } } @@ -230,7 +160,6 @@ export async function pipelineWorkflow( if (opts?.writeRps) await sleep(Math.ceil(1000 / opts.writeRps)) await tickIteration() } else { - // Wait until the read loop signals there's work, or we're deleted await condition(() => pendingWrites || deleted) } } @@ -238,15 +167,12 @@ export async function pipelineWorkflow( await Promise.all([readLoop(), writeLoop()]) } else { - // sync mode: combined read+write in a single activity call - while (true) { await waitWhilePaused() if (deleted) break const config = toConfig(pipeline) - // 1. Drain buffered events if (inputQueue.length > 0) { const batch = inputQueue.splice(0, EVENT_BATCH_SIZE) await syncImmediate(config, { input: batch }) @@ -254,7 +180,6 @@ export async function pipelineWorkflow( continue } - // 2. Reconciliation page if (!readComplete) { const before = syncState const result = await syncImmediate(config, { state: syncState, stateLimit: 1 }) @@ -264,11 +189,9 @@ export async function pipelineWorkflow( continue } - // 3. Wait await condition(() => inputQueue.length > 0 || deleted) } } - // Teardown on delete await teardown(toConfig(pipeline)) } diff --git a/docs/plans/2026-04-02-google-sheets-row-index-workflow.md b/docs/plans/2026-04-02-google-sheets-row-index-workflow.md new file mode 100644 index 000000000..90f30a90c --- /dev/null +++ b/docs/plans/2026-04-02-google-sheets-row-index-workflow.md @@ -0,0 +1,190 @@ +# Google Sheets Row-Index Workflow + +## Context + +The Google Sheets destination needs upsert behavior for repeated Stripe objects. +When the same object is seen again in a later sync, we want to overwrite the +existing row instead of appending a duplicate. 
+ +Google Sheets does not provide native upsert semantics, and the destination +connector is intentionally stateless. It can read and write sheets, but it has +nowhere to persist "record X was previously written to row Y". + +That row mapping has to live somewhere durable outside the destination. + +## Constraints + +- The Google Sheets destination must remain stateless. +- We do not want to store Google-Sheets-specific metadata in + `packages/protocol`. +- We want to keep the Kafka-backed read/write split used by the service. +- The generic pipeline workflow should stay simple for destinations that do not + need row-index bookkeeping. +- Any solution has to survive workflow replay, retries, and continue-as-new. + +## Why the generic workflow was not enough + +The existing `pipelineWorkflow` keeps a single source checkpoint and assumes the +destination can consume records without extra destination-specific durable +state. + +That assumption breaks for Sheets: + +- source progress and row-index progress are not the same thing +- writes may lag reads, so source checkpoints cannot be advanced optimistically +- row assignments must survive workflow restarts and continue-as-new +- this logic only applies to one destination type + +Trying to fold all of that into the generic workflow would add Google +Sheets-specific state and branching to the default path used by every other +destination. + +## Decision + +Use a dedicated Temporal workflow for `google-sheets` pipelines. + +This workflow owns the Sheets-specific durable state: + +- `sourceState`: committed source checkpoint, only advanced after successful + writes +- `readState`: optimistic read cursor used while backfilling or processing + events +- `rowIndex`: `stream -> serialized primary key -> sheet row number` +- `catalog`: discovered stream metadata used to derive row keys + +The generic workflow remains unchanged for non-Sheets destinations. 
+ +## Why workflow state is the right place + +Workflow state is the only place in the current architecture that satisfies all +requirements at once: + +- durable across retries and worker restarts +- local to the specific pipeline +- not exposed in the wire protocol +- safe to carry through `continueAsNew` +- able to coordinate source progress with destination progress + +This keeps connector isolation intact. The destination still only consumes +records and emits output messages. It does not learn about Temporal, Kafka, or +state storage. + +## Kafka stays in the design + +We considered bypassing the queue for Sheets, but kept Kafka for consistency +with the service's existing read/write split. + +The dedicated Sheets workflow still uses: + +1. `readIntoQueueWithState` to read from the source and enqueue ordered + `record` and `state` messages. +2. `writeGoogleSheetsFromQueue` to consume from Kafka, compact duplicate record + updates by key, inject known row numbers, and write to the destination. + +This preserves the operational model already used by the service while letting +Sheets add destination-specific bookkeeping on top. 
+ +## Data flow + +### Read side + +- discover the configured catalog +- derive a stable `_row_key` from the stream primary key +- enqueue `record` and `state` messages to Kafka in source order +- update `readState` optimistically as source state messages arrive + +### Write side + +- consume a batch from Kafka +- compact duplicate records by `(stream, _row_key)` within the batch +- if the workflow already knows the row, inject `_row_number` +- send the records to the Google Sheets destination +- parse destination-emitted row assignments for newly appended rows +- merge those row assignments into `rowIndex` +- advance `sourceState` only after the write succeeds + +## Why `_row_key` and `_row_number` stay local to the Sheets workflow + +These fields are still needed on the write side: + +- `_row_key`: stable identifier derived from the stream primary key +- `_row_number`: known row number for updates + +But the generic engine write path should not know about them. To keep that +boundary intact, the dedicated Sheets write activity calls the Sheets +destination directly instead of routing through the generic engine `/write` +pipeline. + +Inside that activity we: + +- take the workflow-owned discovered catalog +- add the two Sheets-only metadata fields to a local copy of the catalog +- run catalog enforcement there +- pass the filtered records to the Sheets destination + +That keeps `_row_key` and `_row_number` as internal workflow transport +metadata, not engine-wide protocol behavior. The destination still strips them +before writing visible sheet cells. + +## Why the destination reports row assignments + +For new rows, the workflow does not know the final row number ahead of time. +The destination is authoritative because the sheet itself decides where appended +rows land. + +After appending, the destination emits structured metadata describing the row +assignments it observed. The workflow merges that into `rowIndex` and uses it on +future writes. 
+ +This keeps the destination stateless while still making it the source of truth +for the exact append result. + +## API guardrails + +A Sheets pipeline's `rowIndex` is tied to a specific spreadsheet. + +Changing a live pipeline to point at a different spreadsheet would silently +reuse stale row mappings and corrupt writes. Because of that, changing the +target spreadsheet now requires recreating the pipeline. + +This is intentionally strict. + +## Alternatives considered + +### Store row numbers in `packages/protocol` + +Rejected because it would leak Google-Sheets-specific behavior into the shared +wire format and make connector isolation worse. + +### Let the destination persist its own mapping + +Rejected because the destination is intentionally stateless and has no access to +durable storage. + +### Reuse the generic workflow with destination-type conditionals + +Rejected because it pushes one destination's durability requirements into the +default path used by all destinations. + +### Make row number the primary key directly + +Rejected because row numbers are not stable source identifiers. They are derived +write locations that only become known after the destination has written data. +The stable key must come from the source primary key, not from the sheet. + +## Operational risks + +- manual row deletion or row reordering in the sheet can invalidate `rowIndex` +- the dedicated workflow adds another workflow type to service operations +- Kafka consumption still needs end-to-end coverage beyond unit and package + tests + +These are acceptable for now because the alternative was to embed Sheets +complexity into the generic pipeline path. + +## Outcome + +The implementation in PR #228 adds a dedicated Google Sheets workflow that +preserves connector isolation, keeps Kafka in the service design, stores the +minimum extra durable state needed to make row-based upserts work, and keeps +Sheets-only metadata out of the generic engine write path. 
diff --git a/packages/dashboard/.gitignore b/packages/dashboard/.gitignore new file mode 100644 index 000000000..536d88c8a --- /dev/null +++ b/packages/dashboard/.gitignore @@ -0,0 +1 @@ +.next/ diff --git a/packages/destination-google-sheets/__tests__/memory-sheets.ts b/packages/destination-google-sheets/__tests__/memory-sheets.ts index 799c103bf..adc878eb7 100644 --- a/packages/destination-google-sheets/__tests__/memory-sheets.ts +++ b/packages/destination-google-sheets/__tests__/memory-sheets.ts @@ -45,6 +45,22 @@ export function createMemorySheets() { return bang >= 0 ? range.slice(0, bang) : range } + function parseStartRow(range: string): number { + const match = range.match(/(\d+)/) + return match ? Number(match[1]) : 1 + } + + function columnLabel(index: number): string { + let value = index + let label = '' + while (value > 0) { + const remainder = (value - 1) % 26 + label = String.fromCharCode(65 + remainder) + label + value = Math.floor((value - 1) / 26) + } + return label || 'A' + } + function getTab(spreadsheetId: string, range: string): SheetTab { const ss = getSpreadsheet(spreadsheetId) const name = parseSheetName(range) @@ -76,6 +92,8 @@ export function createMemorySheets() { const ss = getSpreadsheet(params.spreadsheetId) const requests = (params.requestBody?.requests ?? 
[]) as Record[] + const replies: unknown[] = [] + for (const req of requests) { if (req.addSheet) { const props = (req.addSheet as { properties?: { title?: string } }).properties @@ -83,10 +101,10 @@ export function createMemorySheets() { if (ss.sheets.has(name)) { throw Object.assign(new Error(`Sheet already exists: ${name}`), { code: 400 }) } - ss.sheets.set(name, { sheetId: nextSheetId++, values: [] }) - } - - if (req.updateSheetProperties) { + const sheetId = nextSheetId++ + ss.sheets.set(name, { sheetId, values: [] }) + replies.push({ addSheet: { properties: { sheetId, title: name } } }) + } else if (req.updateSheetProperties) { const update = req.updateSheetProperties as { properties: { sheetId: number; title: string } fields: string @@ -99,10 +117,13 @@ export function createMemorySheets() { break } } + replies.push({}) + } else { + replies.push({}) } } - return { data: {} } + return { data: { replies } } }, values: { @@ -114,9 +135,9 @@ export function createMemorySheets() { }) { const tab = getTab(params.spreadsheetId, params.range) const rows = params.requestBody?.values ?? [] - // values.update at A1 replaces from the top + const startRow = parseStartRow(params.range) for (let i = 0; i < rows.length; i++) { - tab.values[i] = rows[i] + tab.values[startRow - 1 + i] = rows[i] } return { data: {} } }, @@ -130,8 +151,16 @@ export function createMemorySheets() { }) { const tab = getTab(params.spreadsheetId, params.range) const rows = params.requestBody?.values ?? [] + const startRow = tab.values.length + 1 tab.values.push(...rows) - return { data: {} } + const endRow = tab.values.length + return { + data: { + updates: { + updatedRange: `'${parseSheetName(params.range)}'!A${startRow}:${columnLabel(rows[0]?.length ?? 
1)}${endRow}`, + }, + }, + } }, async get(params: { spreadsheetId: string; range: string }) { diff --git a/packages/destination-google-sheets/package.json b/packages/destination-google-sheets/package.json index 69be2e6db..d81adbda4 100644 --- a/packages/destination-google-sheets/package.json +++ b/packages/destination-google-sheets/package.json @@ -27,6 +27,7 @@ "zod": "^4.3.6" }, "devDependencies": { + "@types/node": "^25.5.0", "vitest": "^3.2.4" } } diff --git a/packages/destination-google-sheets/src/index.test.ts b/packages/destination-google-sheets/src/index.test.ts index cff66853a..057cea60d 100644 --- a/packages/destination-google-sheets/src/index.test.ts +++ b/packages/destination-google-sheets/src/index.test.ts @@ -1,6 +1,13 @@ -import type { DestinationInput, DestinationOutput } from '@stripe/sync-protocol' +import type { ConfiguredCatalog, DestinationInput, DestinationOutput } from '@stripe/sync-protocol' import { afterEach, beforeEach, describe, expect, it } from 'vitest' -import { createDestination, envVars, type Config } from './index.js' +import { + createDestination, + envVars, + parseGoogleSheetsMetaLog, + ROW_KEY_FIELD, + ROW_NUMBER_FIELD, + type Config, +} from './index.js' import { readSheet } from './writer.js' import { createMemorySheets } from '../__tests__/memory-sheets.js' @@ -247,6 +254,110 @@ describe('destination-google-sheets', () => { expect(logs).toHaveLength(1) expect(logs[0]).toMatchObject({ type: 'log', level: 'info' }) }) + + it('updates existing rows and emits row assignments for new appends', async () => { + const { sheets, getData } = createMemorySheets() + const dest = createDestination(sheets) + const configuredCatalog: ConfiguredCatalog = { + streams: [ + { + stream: { + name: 'customers', + primary_key: [['id']], + json_schema: { + type: 'object', + properties: { + id: { type: 'string' }, + name: { type: 'string' }, + }, + }, + }, + sync_mode: 'full_refresh', + destination_sync_mode: 'append', + }, + ], + } + + await 
collect( + dest.write( + { config: cfg(), catalog: configuredCatalog }, + toAsyncIter([ + record('customers', { + id: 'cus_1', + name: 'Alice', + [ROW_KEY_FIELD]: '["cus_1"]', + }), + ]) + ) + ) + + const output = await collect( + dest.write( + { + config: cfg({ spreadsheet_id: dest.spreadsheetId! }), + catalog: configuredCatalog, + }, + toAsyncIter([ + record('customers', { + id: 'cus_1', + name: 'Alice Updated', + [ROW_KEY_FIELD]: '["cus_1"]', + [ROW_NUMBER_FIELD]: 2, + }), + record('customers', { + id: 'cus_2', + name: 'Bob', + [ROW_KEY_FIELD]: '["cus_2"]', + }), + ]) + ) + ) + + const rows = getData(dest.spreadsheetId!, 'customers')! + expect(rows).toEqual([ + ['id', 'name'], + ['cus_1', 'Alice Updated'], + ['cus_2', 'Bob'], + ]) + + const metaLog = output.find((message) => message.type === 'log' && message.level === 'debug') + expect(metaLog).toBeDefined() + const meta = parseGoogleSheetsMetaLog((metaLog as { message: string }).message) + expect(meta).toEqual({ + type: 'row_assignments', + assignments: { customers: { '["cus_2"]': 3 } }, + }) + }) + + it('extends existing headers when a later write introduces new fields', async () => { + const { sheets, getData } = createMemorySheets() + const dest = createDestination(sheets) + + await collect( + dest.write( + { config: cfg(), catalog }, + toAsyncIter([record('customers', { id: 'cus_1', name: 'Alice' })]) + ) + ) + + await collect( + dest.write( + { config: cfg({ spreadsheet_id: dest.spreadsheetId! }), catalog }, + toAsyncIter([ + record('customers', { + id: 'cus_2', + name: 'Bob', + email: 'bob@test.invalid', + }), + ]) + ) + ) + + const rows = getData(dest.spreadsheetId!, 'customers')! 
+ expect(rows[0]).toEqual(['id', 'name', 'email']) + expect(rows[1]).toEqual(['cus_1', 'Alice']) + expect(rows[2]).toEqual(['cus_2', 'Bob', 'bob@test.invalid']) + }) }) describe('envVars', () => { diff --git a/packages/destination-google-sheets/src/index.ts b/packages/destination-google-sheets/src/index.ts index 25e49db9e..7e26f9fab 100644 --- a/packages/destination-google-sheets/src/index.ts +++ b/packages/destination-google-sheets/src/index.ts @@ -11,11 +11,48 @@ import type { import type { sheets_v4 } from 'googleapis' import { google } from 'googleapis' import { z } from 'zod' +import { + GOOGLE_SHEETS_META_LOG_PREFIX, + formatGoogleSheetsMetaLog, + parseGoogleSheetsMetaLog, + ROW_KEY_FIELD, + ROW_NUMBER_FIELD, + serializeRowKey, + stripSystemFields, +} from './metadata.js' import { configSchema } from './spec.js' import type { Config } from './spec.js' -import { appendRows, ensureSheet, ensureSpreadsheet } from './writer.js' +import { + appendRows, + createIntroSheet, + deleteSpreadsheet, + ensureSheet, + ensureSpreadsheet, + protectSheets, + readHeaderRow, + updateRows, +} from './writer.js' -export { ensureSpreadsheet, ensureSheet, appendRows, readSheet } from './writer.js' +export { + ensureSpreadsheet, + ensureSheet, + appendRows, + updateRows, + readHeaderRow, + readSheet, + createIntroSheet, + protectSheets, + deleteSpreadsheet, +} from './writer.js' +export { + GOOGLE_SHEETS_META_LOG_PREFIX, + formatGoogleSheetsMetaLog, + parseGoogleSheetsMetaLog, + ROW_KEY_FIELD, + ROW_NUMBER_FIELD, + serializeRowKey, + stripSystemFields, +} from './metadata.js' // MARK: - Spec @@ -23,7 +60,7 @@ export { configSchema, envVars, type Config } from './spec.js' // MARK: - Helpers -function makeSheetsClient(config: Config) { +function makeOAuth2Client(config: Config) { const clientId = config.client_id || process.env['GOOGLE_CLIENT_ID'] const clientSecret = config.client_secret || process.env['GOOGLE_CLIENT_SECRET'] if (!clientId) throw new Error('client_id required 
(provide in config or set GOOGLE_CLIENT_ID)') @@ -34,7 +71,15 @@ function makeSheetsClient(config: Config) { access_token: config.access_token, refresh_token: config.refresh_token, }) - return google.sheets({ version: 'v4', auth }) + return auth +} + +function makeSheetsClient(config: Config) { + return google.sheets({ version: 'v4', auth: makeOAuth2Client(config) }) +} + +function makeDriveClient(config: Config) { + return google.drive({ version: 'v3', auth: makeOAuth2Client(config) }) } /** Stringify a value for a Sheets cell. */ @@ -52,6 +97,21 @@ function isTransient(err: unknown): boolean { return code === 429 || code >= 500 } +function extendHeaders( + existingHeaders: string[], + data: Record +): { headers: string[]; changed: boolean } { + const headers = [...existingHeaders] + let changed = false + for (const key of Object.keys(data)) { + if (!headers.includes(key)) { + headers.push(key) + changed = true + } + } + return { headers, changed } +} + // MARK: - Destination /** @@ -75,23 +135,52 @@ export function createDestination( return { config: z.toJSONSchema(configSchema) } }, - async check({ config }: { config: Config }): Promise { + async setup({ config, catalog }: { config: Config; catalog: ConfiguredCatalog }) { + if (config.spreadsheet_id) { + spreadsheetId = config.spreadsheet_id + return + } const sheets = sheetsClient ?? makeSheetsClient(config) - try { - await sheets.spreadsheets.get({ - spreadsheetId: config.spreadsheet_id ?? 
'test', - }) - return { status: 'succeeded' } - } catch { - return { status: 'succeeded', message: 'Sheets client is configured' } + spreadsheetId = await ensureSpreadsheet(sheets, config.spreadsheet_title) + + // Create the Overview intro tab first (handles "Sheet1" rename if needed) + const streamNames = catalog.streams.map((s) => s.stream.name) + await createIntroSheet(sheets, spreadsheetId, streamNames) + + // Create a data tab for each stream with headers derived from its JSON schema + const sheetIds: number[] = [] + for (const { stream } of catalog.streams) { + const properties = stream.json_schema?.['properties'] as Record | undefined + const headers = properties ? Object.keys(properties) : [] + const sheetId = await ensureSheet(sheets, spreadsheetId, stream.name, headers) + sheetIds.push(sheetId) } + + // Protect all data tabs with a warning so users know edits may be overwritten + await protectSheets(sheets, spreadsheetId, sheetIds) + + return { spreadsheet_id: spreadsheetId } + }, + + async teardown({ config }: { config: Config }) { + const id = config.spreadsheet_id + if (!id) throw new Error('spreadsheet_id is required for teardown') + const drive = makeDriveClient(config) + await deleteSpreadsheet(drive, id) }, - async setup({ config }: { config: Config }) { - if (config.spreadsheet_id) return + async check({ config }: { config: Config }): Promise { const sheets = sheetsClient ?? makeSheetsClient(config) - const id = await ensureSpreadsheet(sheets, config.spreadsheet_title) - return { spreadsheet_id: id } + if (!config.spreadsheet_id) throw new Error('spreadsheet_id is required for check') + try { + await sheets.spreadsheets.get({ spreadsheetId: config.spreadsheet_id }) + return { status: 'succeeded' } + } catch (err) { + return { + status: 'failed', + message: err instanceof Error ? err.message : String(err), + } + } }, async *write( @@ -100,24 +189,98 @@ export function createDestination( ): AsyncIterable { const sheets = sheetsClient ?? 
makeSheetsClient(config) const batchSize = config.batch_size ?? 50 + const primaryKeys = new Map( + catalog.streams.map((configuredStream) => [ + configuredStream.stream.name, + configuredStream.stream.primary_key, + ]) + ) - // Resolve or create spreadsheet - spreadsheetId = - config.spreadsheet_id || (await ensureSpreadsheet(sheets, config.spreadsheet_title)) + if (config.spreadsheet_id) { + spreadsheetId = config.spreadsheet_id + } else { + spreadsheetId = await ensureSpreadsheet(sheets, config.spreadsheet_title) + } - // Per-stream state: column headers and buffered rows + // Per-stream state: column headers plus buffered appends/updates. const streamHeaders = new Map() - const streamBuffers = new Map() + const appendBuffers = new Map>() + const updateBuffers = new Map>() + const rowAssignments: Record> = {} + + const ensureHeadersForRecord = async ( + streamName: string, + cleanData: Record + ): Promise => { + let headers = streamHeaders.get(streamName) + + if (!headers) { + try { + headers = await readHeaderRow(sheets, spreadsheetId!, streamName) + } catch (error) { + const code = + error instanceof Error && 'code' in error + ? 
(error as { code?: number }).code + : undefined + if (code !== 400 && code !== 404) throw error + headers = [] + } + + if (headers.length === 0) { + headers = Object.keys(cleanData) + await ensureSheet(sheets, spreadsheetId!, streamName, headers) + } + + streamHeaders.set(streamName, headers) + appendBuffers.set(streamName, []) + updateBuffers.set(streamName, []) + } + + const next = extendHeaders(headers, cleanData) + if (next.changed) { + await ensureSheet(sheets, spreadsheetId!, streamName, next.headers) + streamHeaders.set(streamName, next.headers) + headers = next.headers + } + + return headers + } const flushStream = async (streamName: string) => { - const buffer = streamBuffers.get(streamName) - if (!buffer || buffer.length === 0) return - await appendRows(sheets, spreadsheetId!, streamName, buffer) - streamBuffers.set(streamName, []) + const updates = updateBuffers.get(streamName) + if (updates && updates.length > 0) { + await updateRows(sheets, spreadsheetId!, streamName, updates) + updateBuffers.set(streamName, []) + } + + const appends = appendBuffers.get(streamName) + if (!appends || appends.length === 0) return + + const range = await appendRows( + sheets, + spreadsheetId!, + streamName, + appends.map((entry) => entry.row) + ) + if (range) { + const expectedEndRow = range.startRow + appends.length - 1 + if (range.endRow !== expectedEndRow) { + throw new Error( + `Append row mismatch for ${streamName}: expected ${expectedEndRow}, got ${range.endRow}` + ) + } + for (let index = 0; index < appends.length; index++) { + const rowKey = appends[index]?.rowKey + if (!rowKey) continue + rowAssignments[streamName] ??= {} + rowAssignments[streamName][rowKey] = range.startRow + index + } + } + appendBuffers.set(streamName, []) } const flushAll = async () => { - for (const streamName of streamBuffers.keys()) { + for (const streamName of new Set([...appendBuffers.keys(), ...updateBuffers.keys()])) { await flushStream(streamName) } } @@ -126,23 +289,28 @@ export 
function createDestination( for await (const msg of $stdin) { if (msg.type === 'record') { const { stream, data } = msg + const cleanData = stripSystemFields(data) + const headers = await ensureHeadersForRecord(stream, cleanData) + const row = headers.map((header) => stringify(cleanData[header])) + const rowNumber = + typeof data[ROW_NUMBER_FIELD] === 'number' ? data[ROW_NUMBER_FIELD] : undefined + const primaryKey = primaryKeys.get(stream) + const rowKey = + typeof data[ROW_KEY_FIELD] === 'string' + ? data[ROW_KEY_FIELD] + : primaryKey + ? serializeRowKey(primaryKey, cleanData) + : undefined - // First record for this stream — discover headers, create tab - if (!streamHeaders.has(stream)) { - const headers = Object.keys(data) - streamHeaders.set(stream, headers) - streamBuffers.set(stream, []) - await ensureSheet(sheets, spreadsheetId!, stream, headers) + if (rowNumber !== undefined) { + updateBuffers.get(stream)!.push({ rowNumber, values: row }) + } else { + appendBuffers.get(stream)!.push({ row, rowKey }) } - // Map record data to row values in header order - const headers = streamHeaders.get(stream)! - const row = headers.map((h) => stringify(data[h])) - const buffer = streamBuffers.get(stream)! - buffer.push(row) - - // Flush when batch is full - if (buffer.length >= batchSize) { + const appendCount = appendBuffers.get(stream)?.length ?? 0 + const updateCount = updateBuffers.get(stream)?.length ?? 0 + if (appendCount + updateCount >= batchSize) { await flushStream(stream) } } else if (msg.type === 'state') { @@ -169,6 +337,18 @@ export function createDestination( stack_trace: err instanceof Error ? 
err.stack : undefined, } yield errorMsg + return + } + + if (Object.keys(rowAssignments).length > 0) { + yield { + type: 'log', + level: 'debug', + message: formatGoogleSheetsMetaLog({ + type: 'row_assignments', + assignments: rowAssignments, + }), + } } const logMsg: LogMessage = { diff --git a/packages/destination-google-sheets/src/metadata.ts b/packages/destination-google-sheets/src/metadata.ts new file mode 100644 index 000000000..325e78bd7 --- /dev/null +++ b/packages/destination-google-sheets/src/metadata.ts @@ -0,0 +1,44 @@ +export const ROW_KEY_FIELD = '_row_key' +export const ROW_NUMBER_FIELD = '_row_number' +export const GOOGLE_SHEETS_META_LOG_PREFIX = '__sync_engine_google_sheets__:' + +export interface GoogleSheetsRowAssignmentsMeta { + type: 'row_assignments' + assignments: Record> +} + +function getPathValue(data: Record, path: string[]): unknown { + let current: unknown = data + for (const segment of path) { + if (!current || typeof current !== 'object') return undefined + current = (current as Record)[segment] + } + return current +} + +export function serializeRowKey(primaryKey: string[][], data: Record): string { + return JSON.stringify(primaryKey.map((path) => getPathValue(data, path))) +} + +export function stripSystemFields(data: Record): Record { + return Object.fromEntries( + Object.entries(data).filter(([key]) => key !== ROW_KEY_FIELD && key !== ROW_NUMBER_FIELD) + ) +} + +export function formatGoogleSheetsMetaLog(meta: GoogleSheetsRowAssignmentsMeta): string { + return `${GOOGLE_SHEETS_META_LOG_PREFIX}${JSON.stringify(meta)}` +} + +export function parseGoogleSheetsMetaLog( + message: string +): GoogleSheetsRowAssignmentsMeta | undefined { + if (!message.startsWith(GOOGLE_SHEETS_META_LOG_PREFIX)) return undefined + try { + return JSON.parse( + message.slice(GOOGLE_SHEETS_META_LOG_PREFIX.length) + ) as GoogleSheetsRowAssignmentsMeta + } catch { + return undefined + } +} diff --git a/packages/destination-google-sheets/src/writer.ts 
b/packages/destination-google-sheets/src/writer.ts index 8c1ab252e..23e6fd242 100644 --- a/packages/destination-google-sheets/src/writer.ts +++ b/packages/destination-google-sheets/src/writer.ts @@ -1,4 +1,4 @@ -import type { sheets_v4 } from 'googleapis' +import type { drive_v3, sheets_v4 } from 'googleapis' /** * Low-level Sheets API write operations. @@ -48,13 +48,14 @@ export async function ensureSpreadsheet(sheets: sheets_v4.Sheets, title: string) /** * Ensure a tab (sheet) exists for a given stream name with a header row. * If the spreadsheet already has a "Sheet1" default tab, rename it for the first stream. + * Returns the numeric sheetId for use in subsequent API calls (e.g. protect range). */ export async function ensureSheet( sheets: sheets_v4.Sheets, spreadsheetId: string, streamName: string, headers: string[] -): Promise { +): Promise { // Get existing sheets const meta = await withRetry(() => sheets.spreadsheets.get({ @@ -63,12 +64,12 @@ export async function ensureSheet( }) ) const existing = meta.data.sheets ?? [] - const existingNames = existing.map((s) => s.properties?.title) - if (existingNames.includes(streamName)) { - // Tab already exists — write header row in case it's empty + // Tab already exists — write header row and return its ID + const found = existing.find((s) => s.properties?.title === streamName) + if (found) { await writeHeaderRow(sheets, spreadsheetId, streamName, headers) - return + return found.properties!.sheetId! } // If there's a default "Sheet1" and this is the first real stream, rename it @@ -77,6 +78,7 @@ export async function ensureSheet( existing[0]?.properties?.title === 'Sheet1' && existing[0]?.properties?.sheetId !== undefined ) { + const sheetId = existing[0].properties.sheetId! 
await withRetry(() => sheets.spreadsheets.batchUpdate({ spreadsheetId, @@ -84,10 +86,7 @@ export async function ensureSheet( requests: [ { updateSheetProperties: { - properties: { - sheetId: existing[0]!.properties!.sheetId!, - title: streamName, - }, + properties: { sheetId, title: streamName }, fields: 'title', }, }, @@ -95,19 +94,25 @@ export async function ensureSheet( }, }) ) - } else { - // Add a new tab - await withRetry(() => - sheets.spreadsheets.batchUpdate({ - spreadsheetId, - requestBody: { - requests: [{ addSheet: { properties: { title: streamName } } }], - }, - }) - ) + await writeHeaderRow(sheets, spreadsheetId, streamName, headers) + return sheetId } + // Add a new tab and capture its sheetId from the response + const addRes = await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [{ addSheet: { properties: { title: streamName } } }], + }, + }) + ) + const sheetId = addRes.data.replies?.[0]?.addSheet?.properties?.sheetId + if (sheetId == null) { + throw new Error(`Failed to get sheetId for new sheet "${streamName}"`) + } await writeHeaderRow(sheets, spreadsheetId, streamName, headers) + return sheetId } async function writeHeaderRow( @@ -116,6 +121,7 @@ async function writeHeaderRow( sheetName: string, headers: string[] ): Promise { + if (headers.length === 0) return await withRetry(() => sheets.spreadsheets.values.update({ spreadsheetId, @@ -126,16 +132,155 @@ async function writeHeaderRow( ) } +/** Read the first row from a sheet tab and treat it as headers. */ +export async function readHeaderRow( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + sheetName: string +): Promise { + const res = await withRetry(() => + sheets.spreadsheets.values.get({ + spreadsheetId, + range: `'${sheetName}'!1:1`, + }) + ) + const [headerRow] = res.data.values ?? [] + return Array.isArray(headerRow) ? 
headerRow.map((value) => String(value)) : [] +} + +function parseUpdatedRows(updatedRange: string): { startRow: number; endRow: number } { + const match = updatedRange.match(/![A-Z]+(\d+)(?::[A-Z]+(\d+))?$/i) + if (!match) throw new Error(`Unable to parse updated range: ${updatedRange}`) + return { + startRow: Number(match[1]), + endRow: Number(match[2] ?? match[1]), + } +} + +/** + * Create or update an "Overview" intro tab at index 0. + * Lists the synced streams and warns users not to edit data tabs. + */ +export async function createIntroSheet( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + streamNames: string[] +): Promise { + const TITLE = 'Overview' + + const meta = await withRetry(() => + sheets.spreadsheets.get({ spreadsheetId, fields: 'sheets.properties' }) + ) + const existing = meta.data.sheets ?? [] + const hasOverview = existing.some((s) => s.properties?.title === TITLE) + + if (!hasOverview) { + // Rename "Sheet1" if it's the only tab, otherwise insert at index 0 + const onlySheet1 = + existing.length === 1 && + existing[0]?.properties?.title === 'Sheet1' && + existing[0]?.properties?.sheetId !== undefined + if (onlySheet1) { + await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [ + { + updateSheetProperties: { + properties: { sheetId: existing[0]!.properties!.sheetId!, title: TITLE }, + fields: 'title', + }, + }, + ], + }, + }) + ) + } else { + await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [{ addSheet: { properties: { title: TITLE, index: 0 } } }], + }, + }) + ) + } + } + + const now = new Date().toISOString() + const rows = [ + ['Stripe Sync Engine'], + [''], + ['This spreadsheet is managed by Stripe Sync Engine.'], + ['Data is synced automatically from your Stripe account.'], + [''], + ['Synced streams:'], + ...streamNames.map((name) => [` • ${name}`]), + [''], + [`Last setup: ${now}`], + [''], + ['⚠️ Do not edit data in the synced 
tabs. Changes will be overwritten on the next sync.'], + ] + + await withRetry(() => + sheets.spreadsheets.values.update({ + spreadsheetId, + range: `'${TITLE}'!A1`, + valueInputOption: 'RAW', + requestBody: { values: rows }, + }) + ) +} + +/** + * Add warning-only protection to a set of sheets by their numeric sheetIds. + * Users will see a warning dialog before editing but are not blocked. + * Idempotent — skips sheets that already have protection. + */ +export async function protectSheets( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + sheetIds: number[] +): Promise { + for (const sheetId of sheetIds) { + try { + await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [ + { + addProtectedRange: { + protectedRange: { + range: { sheetId }, + description: + 'Managed by Stripe Sync Engine — edits may be overwritten on next sync', + warningOnly: true, + }, + }, + }, + ], + }, + }) + ) + } catch (err) { + if (err instanceof Error && err.message.includes('already has sheet protection')) continue + throw err + } + } +} + /** Append rows to a named sheet tab. Values are stringified for Sheets. */ export async function appendRows( sheets: sheets_v4.Sheets, spreadsheetId: string, sheetName: string, rows: unknown[][] -): Promise { +): Promise<{ startRow: number; endRow: number } | undefined> { if (rows.length === 0) return - await withRetry(() => + const res = await withRetry(() => sheets.spreadsheets.values.append({ spreadsheetId, range: `'${sheetName}'!A1`, @@ -144,6 +289,43 @@ export async function appendRows( requestBody: { values: rows }, }) ) + const updatedRange = res.data.updates?.updatedRange + return updatedRange ? parseUpdatedRows(updatedRange) : undefined +} + +/** + * Update specific rows in a sheet by their 1-based row numbers. + * Sends one values.update request per row, sequentially (not a single batched call).
+ */ +export async function updateRows( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + sheetName: string, + updates: { rowNumber: number; values: string[] }[] +): Promise { + if (updates.length === 0) return + + for (const update of updates) { + await withRetry(() => + sheets.spreadsheets.values.update({ + spreadsheetId, + range: `'${sheetName}'!A${update.rowNumber}`, + valueInputOption: 'RAW', + requestBody: { values: [update.values] }, + }) + ) + } +} + +/** + * Permanently delete a spreadsheet file via the Drive API. + * The Sheets API does not support deletion — Drive is required. + */ +export async function deleteSpreadsheet( + drive: drive_v3.Drive, + spreadsheetId: string +): Promise { + await withRetry(() => drive.files.delete({ fileId: spreadsheetId })) } /** Read all values from a sheet tab. Used for verification in tests. */ diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c8fcd8f72..f7cec1f94 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -470,6 +470,9 @@ importers: specifier: ^4.3.6 version: 4.3.6 devDependencies: + '@types/node': + specifier: ^25.5.0 + version: 25.5.0 vitest: specifier: ^3.2.4 version: 3.2.4(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) @@ -7597,7 +7600,6 @@ snapshots: '@types/node@25.5.0': dependencies: undici-types: 7.18.2 - optional: true '@types/pg@8.15.6': dependencies: @@ -9660,8 +9662,7 @@ snapshots: undici-types@7.16.0: {} - undici-types@7.18.2: - optional: true + undici-types@7.18.2: {} undici@7.24.6: {}