44 changes: 44 additions & 0 deletions backend/migrations/dead_letter/README.md
@@ -0,0 +1,44 @@
# Dead-letter queue migrations

These scripts create the `powersync_dead_letter` table/collection that the backend writes to when a transaction fails with a non-recoverable error (constraint violations, schema mismatches, validation failures). The scripts are templates — copy them into your own migration workflow and adapt as needed.

## Applying

| Source DB | Command |
| --- | --- |
| Postgres | `psql "$DATABASE_URI" -f postgres.sql` |
| MySQL | `mysql --defaults-extra-file=... < mysql.sql` |
| MSSQL | `sqlcmd -S ... -i mssql.sql` |
| MongoDB | `mongosh "$DATABASE_URI" mongo.js` |

## What's in a dead-letter row

Each row carries the full uploaded transaction plus a pointer to the operation that caused the rollback:

- `id` — DLQ row id (UUID).
- `transaction_id` — client-side transaction id (may be null per the wire spec).
- `crud` — JSON payload of the full `CrudEntry[]` as uploaded.
- `failed_client_id` — `CrudEntry.client_id` of the offending op.
- `failed_table`, `failed_op` — table and op type (`PUT|PATCH|DELETE`) of the offender.
- `error_code` — machine classification (`UNIQUE_VIOLATION`, `FOREIGN_KEY_VIOLATION`, `SCHEMA_MISMATCH`, ...).
- `error_message` — driver-supplied human-readable detail.
- `created_at` — when the DLQ row was written.
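As a rough TypeScript sketch of the row shape listed above: the field names come from the list, but the concrete types are assumptions, as are any `CrudEntry` fields beyond `client_id`, `table`, and `op`. `findFailedOp` is a hypothetical helper for illustration, not part of the backend.

```typescript
// Assumed shapes. Only the field names are taken from this README;
// the `id` and `data` fields on CrudEntry are illustrative guesses.
type CrudOp = 'PUT' | 'PATCH' | 'DELETE';

interface CrudEntry {
  client_id: number;
  table: string;
  op: CrudOp;
  id?: string;
  data?: Record<string, unknown>;
}

interface DeadLetterEntry {
  id: string; // UUID of the DLQ row
  transaction_id: number | null;
  crud: CrudEntry[]; // the full uploaded transaction
  failed_client_id: number;
  failed_table: string;
  failed_op: CrudOp;
  error_code: string;
  error_message: string | null;
  created_at: string; // ISO 8601 timestamp
}

// Hypothetical helper: resolve the failing-op pointer back to the full op.
function findFailedOp(entry: DeadLetterEntry): CrudEntry | undefined {
  return entry.crud.find((op) => op.client_id === entry.failed_client_id);
}

const sampleEntry: DeadLetterEntry = {
  id: '00000000-0000-4000-8000-000000000000',
  transaction_id: 7,
  crud: [
    { client_id: 1, table: 'lists', op: 'PUT', id: 'a' },
    { client_id: 2, table: 'todos', op: 'PUT', id: 'b' }
  ],
  failed_client_id: 2,
  failed_table: 'todos',
  failed_op: 'PUT',
  error_code: 'FOREIGN_KEY_VIOLATION',
  error_message: 'example message',
  created_at: '2024-01-01T00:00:00.000Z'
};
```

Here `findFailedOp(sampleEntry)` returns the `todos` op, since its `client_id` matches `failed_client_id`.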

## What's covered

The DLQ covers errors the server classifies as non-transient, such as constraint violations, schema mismatches, and validation failures. The server responds with `status: 'dead_lettered'`, the client completes the transaction, and the row lands here.
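A minimal sketch of how a client might branch on the response status. The four status values are the ones in the API enum; the `ClientAction` names and the choice to halt on `fatal_error` are assumptions made for illustration, not behavior defined by this backend.

```typescript
type UploadStatus = 'success' | 'retryable_error' | 'fatal_error' | 'dead_lettered';

// Hypothetical action names for a client upload loop.
type ClientAction = 'complete' | 'retry' | 'halt';

function actionFor(status: UploadStatus): ClientAction {
  switch (status) {
    case 'success':
    case 'dead_lettered':
      // For dead_lettered the server has taken responsibility for the
      // transaction, so the client completes it and moves on.
      return 'complete';
    case 'retryable_error':
      return 'retry';
    case 'fatal_error':
      // Assumption: nothing was persisted server-side, so stop and surface it.
      return 'halt';
  }
}
```

Treating `dead_lettered` like `success` for queue purposes is what keeps the upload queue moving; the client can still surface the failure to the user separately.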

## What's _not_ covered

- **Stuck transient errors.** A "retryable" error (network blip, deadlock, DB unavailable) that never resolves keeps the client upload queue stuck — it does not land in the DLQ. Fix the underlying issue or build a higher-level escape hatch.
- **Mutator failures.** `/api/mutators/invoke` is a separate write channel with its own failure semantics. Not handled by this DLQ.

## MongoDB caveat

The MongoDB persister currently runs ops in a loop without wrapping them in a multi-document transaction. If op #3 of 5 fails, ops #1 and #2 are already committed. The DLQ entry still records the full transaction and the failing-op pointer, but **the entry represents intent that was partially applied** — naive replay against the same `_id` set will collide. When designing replay or resolution for Mongo, account for this.

The other persisters (Postgres, MySQL, MSSQL) wrap the batch in a real transaction, so their DLQ entries always represent a clean rollback.
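To make the Mongo caveat concrete, here is a sketch that splits a DLQ entry into ops that ran before the failure and ops that did not. It assumes ops are attempted strictly in array order, as the loop described above implies. `splitAtFailure` and the trimmed `Op` type are hypothetical names, not part of the backend.

```typescript
// Trimmed, assumed op shape: just enough to locate the failing op.
interface Op {
  client_id: number;
  table: string;
  op: 'PUT' | 'PATCH' | 'DELETE';
}

interface PartialEntry {
  crud: Op[];
  failed_client_id: number;
}

// Hypothetical helper: ops before the failing one were already attempted
// (and, on Mongo, committed unless the mapper skipped them); the failing op
// and everything after it were never applied.
function splitAtFailure(entry: PartialEntry): { attempted: Op[]; pending: Op[] } {
  const index = entry.crud.findIndex((op) => op.client_id === entry.failed_client_id);
  if (index === -1) {
    throw new Error('failed_client_id does not match any op in crud');
  }
  return {
    attempted: entry.crud.slice(0, index),
    pending: entry.crud.slice(index)
  };
}

// Mirrors the example above: op #3 of 5 fails.
const fiveOps: Op[] = [1, 2, 3, 4, 5].map((n) => ({ client_id: n, table: 'todos', op: 'PUT' }));
const split = splitAtFailure({ crud: fiveOps, failed_client_id: 3 });
```

For the SQL persisters the same split is not needed for replay, because the whole batch was rolled back.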

## Resolution hook

`backend/src/dead-letter-hook.ts` exports an `onDeadLetter` function that fires after each DLQ row is committed. It's fire-and-forget — errors in the hook are logged and dropped; they never block the upload response. Wire it to Slack, an admin dashboard refresh, metrics, or anything else.
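As one possible replacement hook body, the sketch below keeps an in-memory count per `error_code` and logs it. The `DeadLetterSummary` type, `countsByCode`, and `recordDeadLetter` are hypothetical names introduced here; a real deployment would more likely forward to a metrics or alerting client.

```typescript
// Only the fields this sketch reads; the real DeadLetterEntry has more.
interface DeadLetterSummary {
  id: string;
  error_code: string;
  failed_table: string;
}

// Hypothetical in-memory counter, reset on every process restart.
const countsByCode = new Map<string, number>();

function recordDeadLetter(counts: Map<string, number>, entry: DeadLetterSummary): number {
  const next = (counts.get(entry.error_code) ?? 0) + 1;
  counts.set(entry.error_code, next);
  return next;
}

// Same signature style as the stock hook: async, returns nothing.
const onDeadLetter = async (entry: DeadLetterSummary): Promise<void> => {
  const total = recordDeadLetter(countsByCode, entry);
  console.warn(
    `Dead-letter ${entry.id}: ${entry.error_code} on ${entry.failed_table} (${total} so far)`
  );
};
```

Because the persister does not await the hook, anything slow (an HTTP call to a chat webhook, for example) runs after the upload response has been sent, and a rejection only produces a log line.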
4 changes: 4 additions & 0 deletions backend/migrations/dead_letter/mongo.js
@@ -0,0 +1,4 @@
// Run with: mongosh "$DATABASE_URI" mongo.js
db.createCollection('powersync_dead_letter');
db.powersync_dead_letter.createIndex({ created_at: -1 });
db.powersync_dead_letter.createIndex({ failed_table: 1 });
17 changes: 17 additions & 0 deletions backend/migrations/dead_letter/mssql.sql
@@ -0,0 +1,17 @@
CREATE TABLE powersync_dead_letter (
id UNIQUEIDENTIFIER NOT NULL PRIMARY KEY,
transaction_id BIGINT NULL,
crud NVARCHAR(MAX) NOT NULL,
failed_client_id BIGINT NOT NULL,
failed_table NVARCHAR(255) NOT NULL,
failed_op NVARCHAR(16) NOT NULL,
error_code NVARCHAR(64) NOT NULL,
error_message NVARCHAR(MAX) NULL,
created_at DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
);

CREATE INDEX powersync_dead_letter_created_at_idx
ON powersync_dead_letter (created_at DESC);

CREATE INDEX powersync_dead_letter_table_idx
ON powersync_dead_letter (failed_table);
13 changes: 13 additions & 0 deletions backend/migrations/dead_letter/mysql.sql
@@ -0,0 +1,13 @@
CREATE TABLE powersync_dead_letter (
id CHAR(36) NOT NULL PRIMARY KEY,
transaction_id BIGINT NULL,
crud JSON NOT NULL,
failed_client_id BIGINT NOT NULL,
failed_table VARCHAR(255) NOT NULL,
failed_op VARCHAR(16) NOT NULL,
error_code VARCHAR(64) NOT NULL,
error_message TEXT NULL,
created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
INDEX powersync_dead_letter_created_at_idx (created_at),
INDEX powersync_dead_letter_table_idx (failed_table)
);
17 changes: 17 additions & 0 deletions backend/migrations/dead_letter/postgres.sql
@@ -0,0 +1,17 @@
CREATE TABLE powersync_dead_letter (
id UUID PRIMARY KEY,
transaction_id BIGINT,
crud JSONB NOT NULL,
failed_client_id BIGINT NOT NULL,
failed_table TEXT NOT NULL,
failed_op TEXT NOT NULL,
error_code TEXT NOT NULL,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX powersync_dead_letter_created_at_idx
ON powersync_dead_letter (created_at DESC);

CREATE INDEX powersync_dead_letter_table_idx
ON powersync_dead_letter (failed_table);
22 changes: 18 additions & 4 deletions backend/src/api/data.ts
@@ -1,8 +1,10 @@
 import express, { type Request, type Response } from 'express';
+import { randomUUID } from 'crypto';
 import config from '../../config.js';
 import { factories } from '../persistance/persister-factories.js';
 import { FatalOperationError, RetryableError } from '../errors.js';
-import type { OpBody, OpResponse } from '../types.js';
+import { onDeadLetter } from '../dead-letter-hook.js';
+import type { DeadLetterEntry, OpBody, OpResponse } from '../types.js';

 const router = express.Router();

@@ -14,7 +16,7 @@ if (!config.database.uri) {
 throw new Error('DATABASE_URI environment variable is required');
 }

-const { updateBatch } = await persistenceFactory(config.database.uri);
+const persister = await persistenceFactory(config.database.uri, { onDeadLetter });

 /**
 * Handle a CrudTransaction.
@@ -26,12 +28,24 @@ router.post(
 res: Response<OpResponse<'postCrudTransaction'>>
 ) => {
 try {
-await updateBatch(req.body.crud);
+await persister.updateBatch(req.body.crud);
 res.status(200).send({ status: 'success', message: 'Transaction completed' });
 } catch (e) {
 if (e instanceof FatalOperationError) {
+const entry: DeadLetterEntry = {
+id: randomUUID(),
+transaction_id: req.body.transaction_id ?? null,
+crud: req.body.crud,
+failed_client_id: e.failedOp.client_id,
+failed_table: e.failedOp.table,
+failed_op: e.failedOp.op,
+error_code: e.errorCode,
+error_message: e.message,
+created_at: new Date().toISOString()
+};
+await persister.writeDeadLetter(entry);
 res.status(200).send({
-status: 'fatal_error',
+status: 'dead_lettered',
 message: e.message,
 failed_operation: { error_code: e.errorCode, message: e.message }
 });
11 changes: 11 additions & 0 deletions backend/src/dead-letter-hook.ts
@@ -0,0 +1,11 @@
import type { DeadLetterEntry } from './types.js';

/**
* Fires after a transaction has been persisted to the dead-letter queue.
* Fire-and-forget: errors here are logged and dropped; this never blocks
* the upload response. Wire to Slack, an admin dashboard refresh, metrics,
* etc.
*/
export const onDeadLetter = async (entry: DeadLetterEntry): Promise<void> => {
console.warn('Dead-letter entry:', entry.id, entry.error_code, entry.failed_table);
};
5 changes: 4 additions & 1 deletion backend/src/errors.ts
@@ -1,3 +1,5 @@
+import type { CrudEntry } from './types.js';
+
 /** Transient failure (deadlock, timeout, connection error). Client should retry. */
 export class RetryableError extends Error {
 constructor(message: string) {
@@ -9,7 +11,8 @@ export class RetryableError extends Error {
 export class FatalOperationError extends Error {
 constructor(
 public readonly errorCode: string,
-message: string
+message: string,
+public readonly failedOp: CrudEntry
 ) {
 super(message);
 }
7 changes: 6 additions & 1 deletion backend/src/generated/api.ts
@@ -70,9 +70,14 @@ export interface components {
 * client should retry.
 * fatal_error: transaction rolled back due to a non-recoverable
 * issue — see failed_operation for details.
+* dead_lettered: a non-recoverable error occurred and the
+* transaction was persisted to the dead-letter queue for
+* later resolution. The client should complete the transaction
+* (the server has taken responsibility for it) and may surface
+* this to the user.
 * @enum {string}
 */
-status: "success" | "retryable_error" | "fatal_error";
+status: "success" | "retryable_error" | "fatal_error" | "dead_lettered";
 /** @description Suggested retry delay in ms. Only meaningful for retryable_error. */
 retry_after_ms?: number;
 /** @description Present when status is fatal_error. Identifies what caused the rollback. */
34 changes: 30 additions & 4 deletions backend/src/persistance/mongo/mongo-persistance.ts
@@ -1,21 +1,27 @@
 import * as mongo from 'mongodb';
-import type { Persister, CrudEntry } from '../../types.js';
+import type { Persister, PersisterConfig, CrudEntry, DeadLetterEntry } from '../../types.js';
 import { RetryableError, FatalOperationError } from '../../errors.js';
-import type { EntryMapper } from '../../mapping/types.js';
 import { mongoMapper } from '../../mapping/mongo.js';

-export const createMongoPersister = async (uri: string, mapper: EntryMapper = mongoMapper): Promise<Persister> => {
+const DEAD_LETTER_COLLECTION = 'powersync_dead_letter';
+
+export const createMongoPersister = async (uri: string, config: PersisterConfig = {}): Promise<Persister> => {
 console.debug('Using MongoDB Persister');

+const mapper = config.mapper ?? mongoMapper;
+const onDeadLetter = config.onDeadLetter;
+
 const client = new mongo.MongoClient(uri);
 const db = client.db();
 await client.connect();

 const persister: Persister = {
 updateBatch: async (batch: CrudEntry[]) => {
 // TODO: Use batches & transactions.
+let currentOp: CrudEntry | null = null;
 try {
 for (const op of batch) {
+currentOp = op;
 const mapped = mapper(op);
 if (mapped === null) continue;

@@ -33,12 +39,32 @@ export const createMongoPersister = async (uri: string, mapper: EntryMapper = mo
 } catch (e) {
 const err = e as Error & { code?: number; hasErrorLabel?: (label: string) => boolean };
 if (err.code === 11000) {
-throw new FatalOperationError('UNIQUE_VIOLATION', err.message);
+throw new FatalOperationError('UNIQUE_VIOLATION', err.message, currentOp!);
 } else if (err.hasErrorLabel?.('TransientTransactionError')) {
 throw new RetryableError(err.message);
 }
 throw new RetryableError(err.message);
 }
 },
+
+writeDeadLetter: async (entry: DeadLetterEntry) => {
+const collection = db.collection(DEAD_LETTER_COLLECTION);
+await collection.insertOne({
+_id: entry.id as unknown as mongo.ObjectId,
+transaction_id: entry.transaction_id,
+crud: entry.crud,
+failed_client_id: entry.failed_client_id,
+failed_table: entry.failed_table,
+failed_op: entry.failed_op,
+error_code: entry.error_code,
+error_message: entry.error_message,
+created_at: new Date(entry.created_at)
+});
+if (onDeadLetter) {
+void Promise.resolve(onDeadLetter(entry)).catch((err) =>
+console.error('onDeadLetter hook failed:', err)
+);
+}
+}
 };

39 changes: 34 additions & 5 deletions backend/src/persistance/mssql/mssql-persistance.ts
@@ -1,17 +1,19 @@
 import { URL } from 'url';
 import sql from 'mssql';
-import type { Persister, CrudEntry } from '../../types.js';
+import type { Persister, PersisterConfig, CrudEntry, DeadLetterEntry } from '../../types.js';
 import { RetryableError, FatalOperationError } from '../../errors.js';
-import type { EntryMapper } from '../../mapping/types.js';
 import { defaultMapper } from '../../mapping/default.js';

 function escapeIdentifier(identifier: string): string {
 return `[${identifier}]`;
 }

-export const createMSSQLPersister = async (uri: string, mapper: EntryMapper = defaultMapper): Promise<Persister> => {
+export const createMSSQLPersister = async (uri: string, config: PersisterConfig = {}): Promise<Persister> => {
 console.debug('Using MSSQL Persister');

+const mapper = config.mapper ?? defaultMapper;
+const onDeadLetter = config.onDeadLetter;
+
 const url = new URL(uri);

 const pool = new sql.ConnectionPool({
@@ -35,10 +37,12 @@ export const createMSSQLPersister = async (uri: string, mapper: EntryMapper = de
 const persister: Persister = {
 updateBatch: async (batch: CrudEntry[]) => {
 const transaction = pool.transaction();
+let currentOp: CrudEntry | null = null;
 try {
 await transaction.begin();

 for (const op of batch) {
+currentOp = op;
 const mapped = mapper(op);
 if (mapped === null) continue;

@@ -114,12 +118,37 @@ export const createMSSQLPersister = async (uri: string, mapper: EntryMapper = de
 const err = e as Error & { number?: number };
 const num = err.number ?? 0;
 if (num === 2627 || num === 2601) {
-throw new FatalOperationError('UNIQUE_VIOLATION', err.message);
+throw new FatalOperationError('UNIQUE_VIOLATION', err.message, currentOp!);
 } else if (num === 547) {
-throw new FatalOperationError('FOREIGN_KEY_VIOLATION', err.message);
+throw new FatalOperationError('FOREIGN_KEY_VIOLATION', err.message, currentOp!);
 }
 throw new RetryableError(err.message);
 }
 },
+
+writeDeadLetter: async (entry: DeadLetterEntry) => {
+const request = pool.request();
+request.input('id', sql.UniqueIdentifier, entry.id);
+request.input('transaction_id', sql.BigInt, entry.transaction_id);
+request.input('crud', sql.NVarChar(sql.MAX), JSON.stringify(entry.crud));
+request.input('failed_client_id', sql.BigInt, entry.failed_client_id);
+request.input('failed_table', sql.NVarChar, entry.failed_table);
+request.input('failed_op', sql.NVarChar, entry.failed_op);
+request.input('error_code', sql.NVarChar, entry.error_code);
+request.input('error_message', sql.NVarChar(sql.MAX), entry.error_message);
+request.input('created_at', sql.DateTime2, entry.created_at);
+await request.query(
+`INSERT INTO powersync_dead_letter
+(id, transaction_id, crud, failed_client_id, failed_table,
+failed_op, error_code, error_message, created_at)
+VALUES (@id, @transaction_id, @crud, @failed_client_id, @failed_table,
+@failed_op, @error_code, @error_message, @created_at)`
+);
+if (onDeadLetter) {
+void Promise.resolve(onDeadLetter(entry)).catch((err) =>
+console.error('onDeadLetter hook failed:', err)
+);
+}
+}
 };
 return persister;