This document is a code-accurate explanation of how syncing is triggered, queued, processed, retried, and observed in this repo. It is intended to be a “single source of truth” you can use while debugging production behavior.
It covers:
- Supabase deployment & first-time setup
- Edge Functions and how they authenticate
- Database schema: runs, object runs, cursors
- pg_cron + pg_net trigger loop
- pgmq queue semantics
- Worker processing & re-queue behavior
- Webhook processing
- Event catch-up (_event_catchup)
- Sigma worker (separate loop)
- CLI modes (local/dev + backfills)
Throughout, code references are to the monorepo package packages/sync-engine.
There are two complementary sync mechanisms:
- Realtime updates via Stripe webhooks
  - Stripe calls a Supabase Edge Function (stripe-webhook)
  - The engine verifies the signature, routes by event type, and upserts/deletes rows accordingly.
- Continuous reconciliation via a scheduled worker
  - Postgres pg_cron periodically calls another Edge Function (stripe-worker) via pg_net
  - That worker pulls work items from a durable pgmq queue
  - Each work item is “sync the next page of object X”
  - If more pages exist, the worker re-enqueues the same object so it will be picked up again
Additionally:
- A special object _event_catchup uses the Stripe Events API to catch missed webhook changes.
- If Sigma is enabled, a separate Sigma worker runs on its own schedule and continues via a self-trigger.
Source files (checked into this repo):
- packages/sync-engine/src/supabase/edge-functions/stripe-setup.ts
- packages/sync-engine/src/supabase/edge-functions/stripe-webhook.ts
- packages/sync-engine/src/supabase/edge-functions/stripe-worker.ts
- packages/sync-engine/src/supabase/edge-functions/sigma-data-worker.ts
Deployment plumbing (turns those TS files into deployable function bodies):
- packages/sync-engine/src/supabase/edge-function-code.ts
- packages/sync-engine/src/supabase/supabase.ts (Supabase Management API client + installer)
Main class:
- packages/sync-engine/src/stripeSync.ts (StripeSync)
Database access:
- packages/sync-engine/src/database/postgres.ts (PostgresClient)
- migrations in packages/sync-engine/src/database/migrations/*.sql
- migrations runner in packages/sync-engine/src/database/migrate.ts
CLI:
- packages/sync-engine/src/cli/commands.ts
The install flow is orchestrated by SupabaseSetupClient.install():
packages/sync-engine/src/supabase/supabase.ts
At a high level it does:
- Validate project access via Supabase Management API
- Create schema (so it can store an installation comment marker)
- Deploy edge functions
- Set function secrets (env vars)
- Invoke stripe-setup (POST) to run migrations + create a managed webhook in Stripe
- Create pg_cron job(s) and queue(s)
- Update schema comment to reflect successful installation
The Supabase installer writes a comment on the stripe schema, which the stripe-setup GET “status” endpoint reads:
- logic in packages/sync-engine/src/supabase/supabase.ts (updateInstallationComment, isInstalled)
- status endpoint in packages/sync-engine/src/supabase/edge-functions/stripe-setup.ts (GET)
The comment includes stripe-sync plus suffixes like:
- installation:started
- installation:error
- installed
This is how the system distinguishes:
- “not installed”
- “installation started but incomplete”
- “installed”
- “failed install”
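The four states above can be derived from the schema comment with a few substring checks. The following is a minimal sketch, not the repo's isInstalled() implementation: the exact comment format is an assumption (the text only says it includes stripe-sync plus a suffix), and InstallStatus and parseInstallStatus are hypothetical names.

```typescript
// Hypothetical status type; the names are illustrative, not taken from the repo.
type InstallStatus = "not_installed" | "started" | "failed" | "installed";

// Assumption: the schema comment contains "stripe-sync" followed by one of the
// suffixes listed above. The real comment may carry extra fields.
function parseInstallStatus(comment: string | null): InstallStatus {
  if (comment === null || comment.indexOf("stripe-sync") === -1) {
    return "not_installed";
  }
  // Check the more specific suffixes before the plain "installed" marker.
  if (comment.indexOf("installation:error") !== -1) return "failed";
  if (comment.indexOf("installation:started") !== -1) return "started";
  if (comment.indexOf("installed") !== -1) return "installed";
  return "not_installed";
}
```

A missing comment and a comment written by something other than the installer both map to "not installed" in this sketch.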
stripe-setup.ts is a multipurpose function:
- GET: returns installation status and recent run summaries (if present)
- POST: runs migrations and creates/ensures a managed webhook
- DELETE: uninstalls by deleting managed webhooks, unscheduling cron, deleting vault secrets, dropping the schema, and deleting edge functions and secrets
File: packages/sync-engine/src/supabase/edge-functions/stripe-setup.ts
Every request must have Authorization: Bearer <token>.
stripe-setup then validates this bearer token by calling Supabase’s Management API:
- It extracts the project ref from SUPABASE_URL
- It calls GET /v1/projects/{projectRef} with that bearer token
- If that succeeds, the token is valid for that project
This is why stripe-setup can be invoked from the installer using the Supabase access token.
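The validation steps above can be sketched as follows. This is an illustration under stated assumptions rather than the code in stripe-setup.ts: it assumes SUPABASE_URL has the form https://{projectRef}.supabase.co and that the Management API lives at https://api.supabase.com. HttpGet, extractProjectRef, and isTokenValidForProject are hypothetical names, and the HTTP client is injected so the sketch stays self-contained.

```typescript
// Hypothetical minimal HTTP client shape, injected so the sketch needs no runtime globals.
type HttpGet = (url: string, headers: Record<string, string>) => Promise<{ ok: boolean }>;

// Assumption: SUPABASE_URL has the form https://{projectRef}.supabase.co.
function extractProjectRef(supabaseUrl: string): string | null {
  const match = /^https:\/\/([a-z0-9]+)\.supabase\.co\/?$/.exec(supabaseUrl);
  return match?.[1] ?? null;
}

// Returns true when the Management API accepts the caller's bearer token for this project.
// Assumption: the Management API base URL is https://api.supabase.com.
async function isTokenValidForProject(
  supabaseUrl: string,
  authorizationHeader: string | null,
  httpGet: HttpGet,
): Promise<boolean> {
  const projectRef = extractProjectRef(supabaseUrl);
  if (projectRef === null || authorizationHeader === null) return false;
  if (authorizationHeader.indexOf("Bearer ") !== 0) return false;
  // Forward the caller's own Authorization header; success means the token can see the project.
  const response = await httpGet("https://api.supabase.com/v1/projects/" + projectRef, {
    Authorization: authorizationHeader,
  });
  return response.ok;
}
```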
During install, stripe-setup creates (or reuses) a managed webhook in Stripe whose URL is:
SUPABASE_URL + '/functions/v1/stripe-webhook'
stripe-setup uses:
- StripeSync.findOrCreateManagedWebhook(url) (in packages/sync-engine/src/stripeSync.ts)
File: packages/sync-engine/src/supabase/edge-functions/stripe-webhook.ts
Flow:
- Requires POST
- Reads stripe-signature header
- Reads raw request body
- Instantiates StripeSync
- Calls stripeSync.processWebhook(rawBody, sig)
Inside StripeSync.processWebhook(payload, signature):
- If stripeWebhookSecret is configured explicitly, it uses that.
- Otherwise it fetches the managed webhook secret from the DB table stripe._managed_webhooks scoped to the current account and uses it to verify the signature.
That’s why managed webhooks work even without the caller supplying a signing secret.
After signature verification, StripeSync.processEvent(event):
- Resolves the Stripe account ID for the current API key (and upserts account into DB if needed)
- Looks up the handler in an eventHandlers map (event type → handler)
- Calls the handler which upserts/deletes entities
The handler registry is in packages/sync-engine/src/stripeSync.ts as eventHandlers.
The engine supports a safety mode where webhook payloads are treated as “triggers” and the engine refetches the canonical object from Stripe before writing, controlled by:
- StripeSyncConfig.revalidateObjectsViaStripeApi
- helpers in StripeSync like shouldRefetchEntity() / fetchOrUseWebhookData()
This is not a separate trigger mechanism; it changes how webhook handlers source truth.
SupabaseSetupClient.setupPgCronJob() installs:
- extensions: pg_cron, pg_net, pgmq
- a pgmq queue named stripe_sync_work
- a vault secret stripe_sync_worker_secret
- a cron job stripe-sync-worker that periodically calls: net.http_post('https://{projectRef}.supabase.co/functions/v1/stripe-worker', Authorization: Bearer <vault_secret>)
All of that is assembled and run via Supabase Management API in:
packages/sync-engine/src/supabase/supabase.ts
File: packages/sync-engine/src/supabase/edge-functions/stripe-worker.ts
The worker:
- Requires Authorization: Bearer <token>
- Fetches the expected secret from vault.decrypted_secrets where name is stripe_sync_worker_secret
- Compares the bearer token to the stored secret
This design avoids relying on Supabase service_role JWTs; the database itself issues and validates the secret.
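The check can be sketched as below. This is a hedged illustration, not the worker's actual code: isAuthorizedWorkerRequest and timingSafeEqualStrings are hypothetical names, expectedSecret stands in for the value read from vault.decrypted_secrets, and the constant-time comparison is an added precaution that the source does not say the worker uses.

```typescript
// Constant-time string comparison: always scans the longer length so the time
// taken does not reveal how many leading characters matched.
function timingSafeEqualStrings(a: string, b: string): boolean {
  let diff = a.length ^ b.length;
  const len = Math.max(a.length, b.length);
  for (let i = 0; i < len; i++) {
    // charCodeAt returns NaN past the end of a string, which bitwise operators treat as 0.
    diff |= a.charCodeAt(i) ^ b.charCodeAt(i);
  }
  return diff === 0;
}

// Hypothetical helper: `expectedSecret` stands in for the vault secret named
// stripe_sync_worker_secret.
function isAuthorizedWorkerRequest(
  authorizationHeader: string | null,
  expectedSecret: string,
): boolean {
  if (authorizationHeader === null) return false;
  const prefix = "Bearer ";
  if (authorizationHeader.slice(0, prefix.length) !== prefix) return false;
  const token = authorizationHeader.slice(prefix.length);
  return token.length > 0 && timingSafeEqualStrings(token, expectedSecret);
}
```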
Worker constants:
- Queue: stripe_sync_work
- Visibility timeout: 60 seconds
- Batch size: 10 messages per invocation
The worker reads with:
pgmq.read(queue, vt_seconds, qty)
Semantics:
- Messages are hidden from other readers for vt seconds (visibility timeout).
- If a worker crashes and doesn’t delete the message, it reappears after vt seconds.
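To make the visibility-timeout behavior concrete, here is a small in-memory model. It is not pgmq and does not call it; InMemoryVtQueue is a hypothetical class that only mimics the read/delete semantics described above, with time passed in explicitly so the behavior is deterministic.

```typescript
interface QueuedMessage<T> {
  msgId: number;
  visibleAtMs: number; // the message is readable once now >= visibleAtMs
  payload: T;
}

// Hypothetical in-memory model of a queue with a visibility timeout.
class InMemoryVtQueue<T> {
  private messages: QueuedMessage<T>[] = [];
  private nextId = 1;

  send(payload: T, nowMs: number): number {
    const msgId = this.nextId++;
    this.messages.push({ msgId, visibleAtMs: nowMs, payload });
    return msgId;
  }

  // Mirrors the shape of pgmq.read(queue, vt_seconds, qty): returns up to `qty`
  // visible messages and hides each one for `vtSeconds`.
  read(vtSeconds: number, qty: number, nowMs: number): QueuedMessage<T>[] {
    const batch = this.messages.filter((m) => m.visibleAtMs <= nowMs).slice(0, qty);
    for (const m of batch) {
      m.visibleAtMs = nowMs + vtSeconds * 1000;
    }
    return batch;
  }

  // Deleting is the acknowledgement; without it the message reappears after vt.
  delete(msgId: number): boolean {
    const before = this.messages.length;
    this.messages = this.messages.filter((m) => m.msgId !== msgId);
    return this.messages.length < before;
  }
}
```

In this model a message read at t=0 with a 60-second timeout is invisible at t=30s and readable again at t=60s unless it was deleted in between.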
If pgmq.read() returns no messages, the worker enqueues one message per object.
To discover which objects exist, it calls:
StripeSync.joinOrCreateSyncRun('worker')
That method returns an ordered list from:
StripeSync.getSupportedSyncObjects()
and it also ensures corresponding _sync_obj_runs rows exist for observability.
Then the worker sends a batch of JSON messages:
{ "object": "<objectName>" }
For each message in the batch, the worker does (in parallel via Promise.all):
- Read { object }
- Call stripeSync.processNext(object)
- If successful: pgmq.delete(msg_id)
- If hasMore: pgmq.send({object}) to requeue it
- If error: it logs and leaves the message (it becomes visible again after vt)
The following is subtle but critical if you are reading stripe.sync_runs for observability:
- When the queue is empty, the worker calls joinOrCreateSyncRun('worker') (triggered_by = 'worker') to enqueue objects.
- But when processing messages, it calls processNext(object) without parameters.
- processNext() defaults to triggeredBy = 'processNext' when it creates/joins a run.
So “enqueue run” and “processing run” can be different runs, because they use different triggered_by values.
If you expect “the run created during enqueue” to reflect processing progress, it may not.
If you want run tracking to be aligned, the worker would need to pass { triggeredBy: 'worker', runStartedAt: ... } into processNext(). (This is not what the current worker does.)
StripeSync.processNext(object, params?) is the engine’s unit of work for the worker queue.
File: packages/sync-engine/src/stripeSync.ts
For each call:
- Ensure account exists in DB for FK integrity (getCurrentAccount())
- Determine the run:
  - use params.runStartedAt if provided
  - else joinOrCreateSyncRun(params.triggeredBy ?? 'processNext')
- Ensure an object-run row exists for this object in that run:
  - postgresClient.createObjectRuns(...)
- If object is already terminal (complete/error), return hasMore=false
- If object is pending, try to claim it via tryStartObjectSync() which enforces max_concurrent
  - If not claimed (at concurrency limit), return hasMore=true so the caller can retry later
There are two independent pieces of state:
- Incremental cursor (_sync_obj_runs.cursor)
  - Used as created.gte for most Stripe list endpoints
  - It typically stores the maximum created timestamp seen so far (monotonically increasing)
  - processNext() intentionally does not use the current run’s cursor for created filtering; it prefers the previous completed run’s cursor unless an explicit created filter is passed.
- Page cursor (_sync_obj_runs.page_cursor)
  - Used as starting_after for Stripe pagination within a single sweep/backfill
  - It stores the last object ID (or last event ID for _event_catchup)
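How the two cursors could map onto Stripe list parameters is sketched below. This is an illustration of the description above, not the engine's code: buildListParams and advanceCursors are hypothetical helpers, and the page size of 100 is an assumption.

```typescript
// Subset of Stripe list parameters relevant to the two cursors.
interface ListParams {
  limit: number;
  created?: { gte: number };
  starting_after?: string;
}

// `previousRunCursor` is the cursor of the previous completed run, which the text
// says is preferred over the current run's cursor. An explicit filter wins over both.
function buildListParams(
  previousRunCursor: number | null,
  pageCursor: string | null,
  explicitCreatedGte?: number,
): ListParams {
  const params: ListParams = { limit: 100 }; // assumed page size
  const createdGte = explicitCreatedGte !== undefined ? explicitCreatedGte : previousRunCursor;
  if (createdGte !== null) params.created = { gte: createdGte };
  if (pageCursor !== null) params.starting_after = pageCursor;
  return params;
}

// After a page is written: the incremental cursor only moves forward, and the
// page cursor becomes the ID of the last item on the page.
function advanceCursors(
  currentCursor: number | null,
  page: { id: string; created: number }[],
): { cursor: number | null; pageCursor: string | null } {
  let cursor = currentCursor;
  for (const item of page) {
    if (cursor === null || item.created > cursor) cursor = item.created;
  }
  const last = page.length > 0 ? page[page.length - 1] : undefined;
  return { cursor, pageCursor: last !== undefined ? last.id : null };
}
```

Keeping the created filter fixed for the whole sweep while only the page cursor moves is what lets pagination resume mid-sweep without the filter window shifting underneath it.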
When Stripe returns has_more=false, processNext() completes the object-run:
- postgresClient.completeObjectSync(...), which may auto-close the parent run if all objects are done
On error, it marks the object-run as failed:
postgresClient.failObjectSync(...)
Within StripeSync.fetchOnePage() there are non-obvious branches:
- payment_method uses a custom handler (fetchOnePagePaymentMethods) because it requires customer context.
- tax_id is immediately completed without syncing, due to permission limitations in some account contexts.
- _event_catchup has its own paging/processing strategy (next section).
- Sigma-backed objects go through fetchOneSigmaPage() (Sigma section).
_event_catchup exists to reduce the “missed webhook” surface area by periodically reconciling recent Stripe Events.
Key characteristics:
- Uses Stripe Events API (stripe.events.list)
- Works within Stripe’s retention window (clamps to 30 days)
- Deduplicates events to latest per entity
- For non-hard-deletes, it may skip refetch if local _last_synced_at is already newer than the event
Implementation:
- StripeSync.fetchOnePageEventCatchup(...) in packages/sync-engine/src/stripeSync.ts
Hard delete logic:
- Only certain event types are treated as “true hard deletes” in the DB (e.g. product.deleted, price.deleted)
- Other “*.deleted” events that represent a state change are handled by re-fetch + upsert.
This mechanism is why _event_catchup is placed late in object ordering: it’s meant to reconcile after primary entities are present.
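The key characteristics of event catch-up can be sketched as three small helpers. They are illustrative assumptions, not excerpts from fetchOnePageEventCatchup(): CatchupEvent is a simplified event shape, all timestamps are taken to be Unix seconds, and the tie-break for events with equal timestamps is arbitrary.

```typescript
// Simplified event shape; `objectId` stands for the ID of the entity the event refers to.
interface CatchupEvent {
  id: string;
  type: string; // e.g. "customer.updated"
  created: number; // Unix seconds
  objectId: string;
}

const RETENTION_SECONDS = 30 * 24 * 60 * 60;

// Never look further back than the 30-day window.
function clampToRetention(sinceSec: number, nowSec: number): number {
  return Math.max(sinceSec, nowSec - RETENTION_SECONDS);
}

// Keeps only the newest event per entity; on equal timestamps the first one seen wins.
function latestEventPerEntity(events: CatchupEvent[]): CatchupEvent[] {
  const latest = new Map<string, CatchupEvent>();
  for (const ev of events) {
    const seen = latest.get(ev.objectId);
    if (seen === undefined || ev.created > seen.created) {
      latest.set(ev.objectId, ev);
    }
  }
  const result: CatchupEvent[] = [];
  latest.forEach((ev) => result.push(ev));
  return result;
}

// A refetch is unnecessary when the local row was synced after the event happened.
// Assumption: both values are in Unix seconds.
function canSkipRefetch(lastSyncedAtSec: number | null, eventCreatedSec: number): boolean {
  return lastSyncedAtSec !== null && lastSyncedAtSec > eventCreatedSec;
}
```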
Core tables:
- stripe._sync_runs (one “run” per account + triggered_by, open while closed_at IS NULL)
- stripe._sync_obj_runs (one row per object per run, status pending/running/complete/error)
Key migrations:
- 0053_sync_observability.sql introduces the system (initial shape)
- 0056_sync_run_closed_at.sql switches to closed_at and derived status
- 0057_rename_sync_tables.sql pluralizes table names and creates stripe.sync_runs view
- 0058_improve_sync_runs_status.sql improves status derivation logic in the view
- 0061_add_page_cursor.sql adds page_cursor
- 0062_sigma_query_runs.sql updates run uniqueness to allow parallel runs by triggered_by
Current constraint (after 0062) allows multiple simultaneous runs as long as triggered_by differs (e.g. sigma vs worker):
- Exclusion constraint is on (_account_id, coalesce(triggered_by,'default')) where closed_at IS NULL.
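The effect of that constraint can be modeled in a few lines. This is an in-memory illustration of the rule as described, not the migration's SQL; SyncRunRow, openRunKey, and canOpenRun are hypothetical names.

```typescript
// Minimal view of a run row for the purpose of the uniqueness rule.
interface SyncRunRow {
  accountId: string;
  triggeredBy: string | null;
  closedAt: number | null; // null means the run is still open
}

// Mirrors the constraint key (_account_id, coalesce(triggered_by, 'default')).
function openRunKey(run: { accountId: string; triggeredBy: string | null }): string {
  return run.accountId + "|" + (run.triggeredBy ?? "default");
}

// True if a new open run could be inserted without conflicting with an existing open run.
function canOpenRun(
  existing: SyncRunRow[],
  candidate: { accountId: string; triggeredBy: string | null },
): boolean {
  const key = openRunKey(candidate);
  return !existing.some((run) => run.closedAt === null && openRunKey(run) === key);
}
```

Under this rule an open 'worker' run and an open 'processNext' run for the same account do not conflict, which is how the two can coexist as separate runs.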
When an object-run becomes terminal (complete or error), completeObjectSync() / failObjectSync() checks whether all object-runs are terminal and if so sets:
stripe._sync_runs.closed_at = now()
Before creating/returning a run in PostgresClient.getOrCreateSyncRun(), the engine calls:
cancelStaleRuns(accountId)
This marks running object-runs as errored if they have not updated recently and may close runs that are fully terminal afterward.
This is meant to recover from crashed/abandoned runs.
StripeSync.findOrCreateManagedWebhook(url) is responsible for:
- Ensuring a Stripe webhook endpoint exists and is enabled
- Cleaning up old/orphaned managed webhooks
- Persisting the webhook (including the signing secret) in Postgres
Important correctness mechanisms:
- Uses a Postgres advisory lock to prevent concurrent creation races.
- Also enforces DB-level uniqueness of (url, account_id) via migration: 0052_webhook_url_uniqueness.sql
The webhook secret is later retrieved by processWebhook() to verify signatures without additional configuration.
Sigma is optional and controlled by ENABLE_SIGMA (installer sets it as a secret env var for Edge Functions).
There are two Sigma aspects:
- Schema + table reconciliation (migrations)
  - runMigrations({ enableSigma: true }) triggers dynamic creation/reconciliation of Sigma destination tables.
  - Implementation lives in packages/sync-engine/src/database/migrate.ts (Sigma schema reconciliation logic).
- Sigma worker execution
  - Installer sets up:
    - stripe.trigger_sigma_worker() (a DB function that calls the Edge Function via net.http_post)
    - a pg_cron job stripe-sigma-worker scheduled at 0 */12 * * *
    - a vault secret stripe_sigma_worker_secret
  - All in SupabaseSetupClient.setupSigmaPgCronJob() (packages/sync-engine/src/supabase/supabase.ts)
File: packages/sync-engine/src/supabase/edge-functions/sigma-data-worker.ts
High-level loop:
- Authenticate via stripe_sigma_worker_secret
- Ensure/claim a sigma run (triggered_by='sigma-worker')
- Create object-runs for sigma objects
- Process a small batch (configured by BATCH_SIZE, currently 1)
- If work remains, call SELECT stripe.trigger_sigma_worker() to invoke itself again
- Stop self-triggering if run age exceeds MAX_RUN_AGE_MS
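The last two steps of the loop amount to a single decision, sketched below. shouldSelfTrigger is a hypothetical helper; the value of MAX_RUN_AGE_MS is not stated here, so it is passed in as a parameter.

```typescript
// Decides whether the Sigma worker should invoke itself again.
// `maxRunAgeMs` stands in for MAX_RUN_AGE_MS, whose real value is not assumed here.
function shouldSelfTrigger(
  hasRemainingWork: boolean,
  runStartedAtMs: number,
  nowMs: number,
  maxRunAgeMs: number,
): boolean {
  if (!hasRemainingWork) return false;
  // Stop once the run is older than the limit so a stuck run cannot loop forever.
  return nowMs - runStartedAtMs <= maxRunAgeMs;
}
```

When this returns false with work still remaining, the next scheduled cron invocation is what picks the work up again.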
As of the current repo state, sigma-data-worker.ts imports StripeSync from npm:stripe-experiment-sync rather than the same package import used by the other edge functions.
If this is unintentional, it can cause real-world divergence (different engine version/behavior for sigma worker vs core).
The CLI has two main roles:
- One-off backfill
  - backfillCommand() runs migrations, creates a StripeSync, and calls processUntilDone() / processUntilDoneParallel().
- Live sync in local mode
  - syncCommand() supports:
    - WebSocket mode (default if ngrok token is absent): receives Stripe events via a Stripe websocket client and calls stripeSync.processEvent(payload)
    - Webhook mode (ngrok + express): sets up a public webhook URL, creates a managed webhook endpoint, receives webhook requests locally, and calls stripeSync.processWebhook(rawBody, sig)
  - After setup, it typically runs:
    - a “historical backfill sweep” via processUntilDoneParallel()
    - then an “incremental backfill” via processUntilDone()
    - then streams live changes
This CLI path is separate from Supabase Edge Functions; it’s primarily for development/testing or non-Supabase deployments.
If a worker invocation crashes mid-batch:
- Messages not deleted reappear after the pgmq visibility timeout (60s).
- processNext() is designed to be safe to call repeatedly because progress is persisted in _sync_obj_runs (cursor + page cursor + status).
The engine defensively fails an object-run if Stripe ever returns has_more=true with an empty page, to avoid infinite loops.
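That guard can be expressed as a small classification of each page. This is a sketch of the rule as described, with hypothetical names (StripePage, PageOutcome, classifyPage), not the engine's code.

```typescript
// Minimal shape of a Stripe list response.
interface StripePage<T> {
  data: T[];
  has_more: boolean;
}

type PageOutcome = "continue" | "complete" | "fail";

// An empty page that still claims more data gives no new page cursor to advance
// with, so retrying would request the same page forever. Treat it as a failure.
function classifyPage<T>(page: StripePage<T>): PageOutcome {
  if (page.has_more && page.data.length === 0) return "fail";
  return page.has_more ? "continue" : "complete";
}
```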
If an object-run is marked running but stops updating for a while, cancelStaleRuns() may mark it errored and close the run so new work can proceed.
- Worker not running at all
  - Check cron.job for stripe-sync-worker
  - Check vault.secrets for stripe_sync_worker_secret
  - Check Edge Function logs for stripe-worker
  - Installer logic: packages/sync-engine/src/supabase/supabase.ts (setupPgCronJob)
- Queue is always empty / never enqueues
  - Edge worker enqueue branch: packages/sync-engine/src/supabase/edge-functions/stripe-worker.ts (messages.length === 0)
  - Check pgmq queue existence: created in setupPgCronJob()
- Runs show “pending” but nothing progresses
  - Check the triggered_by mismatch described above (enqueue uses 'worker'; processing uses 'processNext' by default).
  - Inspect stripe.sync_runs view grouped by triggered_by.
- Missing webhook updates
  - Ensure managed webhook exists/enabled in Stripe (managed webhook logic in StripeSync.findOrCreateManagedWebhook)
  - Confirm stripe-webhook can verify signatures (secret is stored in stripe._managed_webhooks)
  - _event_catchup is designed to backstop missed webhooks; verify it’s running and cursor advances.
- Sigma not syncing
  - Confirm ENABLE_SIGMA was set during install
  - Confirm cron job stripe-sigma-worker exists and vault secret stripe_sigma_worker_secret exists
  - Confirm sigma-data-worker import/package is correct for your deployment expectations