diff --git a/.release-please-manifest.premain.json b/.release-please-manifest.premain.json index 6362bad..63eaaf5 100644 --- a/.release-please-manifest.premain.json +++ b/.release-please-manifest.premain.json @@ -1,3 +1,3 @@ { - ".": "1.2.0-rc" + ".": "1.3.0-rc.1" } diff --git a/CHANGELOG.md b/CHANGELOG.md index ccc8403..ee88510 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,30 @@ # Changelog +## [1.3.0-rc.1](https://github.com/theory-cloud/TableTheory/compare/v1.3.0-rc...v1.3.0-rc.1) (2026-01-23) + + +### Features + +* FaceTheory ISR support (FT-T0..FT-T4) ([66eb65a](https://github.com/theory-cloud/TableTheory/commit/66eb65af9b253946c53f4f88af9dc10668e6d5bd)) +* FT-T1 lease helper (Go) ([bb6860a](https://github.com/theory-cloud/TableTheory/commit/bb6860a46380f1227a067dd87ea9fd0d8c4f4f34)) +* FT-T1 lease helper (Py) ([71b1c01](https://github.com/theory-cloud/TableTheory/commit/71b1c01ca9d8347e8443627c023d698f4ce4d34b)) +* FT-T1 lease helper (TS) ([b04dbb1](https://github.com/theory-cloud/TableTheory/commit/b04dbb17cc72b25d0ba328680170b64fcc37efc6)) +* **mocks:** add transaction builder mock ([ea39672](https://github.com/theory-cloud/TableTheory/commit/ea39672edffd22bf24b1471e244c14b79f06211d)) +* **mocks:** add transaction builder mock ([16ab5a5](https://github.com/theory-cloud/TableTheory/commit/16ab5a5d7b22d1087973f96a5c69b2c3a3796c3e)) + + +### Bug Fixes + +* address security/quality findings ([3b56fb4](https://github.com/theory-cloud/TableTheory/commit/3b56fb4986d2a0e93ced5c682caa2fd401a62087)) +* address security/quality findings ([f7adaf7](https://github.com/theory-cloud/TableTheory/commit/f7adaf79d1e7248d3b2654f5c82b33f79cc6e4ac)) +* **ci:** make release assets immutable ([1ef4aca](https://github.com/theory-cloud/TableTheory/commit/1ef4aca7bbd6ef6fffe9a86b9f33b1c0c28e1e97)) +* **ci:** make release assets immutable ([e9ad219](https://github.com/theory-cloud/TableTheory/commit/e9ad219f7d806e8faff7422c45ea2b1f066e3904)) +* **ci:** retry git fetch in branch-version-sync ([1b4d855](https://github.com/theory-cloud/TableTheory/commit/1b4d8557fe66c5c333846469369d9e5285cc1232)) +* improved transaction handling ([30a5d7a](https://github.com/theory-cloud/TableTheory/commit/30a5d7acc371cbcbd38bee1d240e5eab24d49882)) +* **mocks:** satisfy lint gates ([a9cd117](https://github.com/theory-cloud/TableTheory/commit/a9cd1170fc200489369b76f098635321ed3d81c0)) +* **premain:** restore prerelease version alignment ([9b07cdb](https://github.com/theory-cloud/TableTheory/commit/9b07cdb7df5e69be8012374f742d89252ffde942)) +* **security:** harden API key hashing ([2c47b6c](https://github.com/theory-cloud/TableTheory/commit/2c47b6c7dac1084b66448f81dc3d49ce4e4114e0)) + ## [1.2.0](https://github.com/theory-cloud/TableTheory/compare/v1.1.5...v1.2.0) (2026-01-22) diff --git a/docs/README.md b/docs/README.md index 21a04ea..576d3fb 100644 --- a/docs/README.md +++ b/docs/README.md @@ -20,6 +20,10 @@ - [API Reference (Go)](./api-reference.md) – Go SDK public API (DB, Query, Transactions) - [Core Patterns (Go)](./core-patterns.md) – Go SDK canonical usage patterns (Lambda, Batch, Streams) +- [FaceTheory ISR Cache Schema](./facetheory/isr-cache-schema.md) – Recommended cache metadata + regeneration lease item shapes +- [FaceTheory ISR Transaction Recipes](./facetheory/isr-transaction-recipes.md) – Correctness-first patterns for publishing metadata under lease +- [FaceTheory TTL Cache Patterns](./facetheory/ttl-cache-patterns.md) – Freshness vs expiry, TTL lag, and operational guidance for ISR tables +- [FaceTheory ISR Idempotency Patterns](./facetheory/isr-idempotency.md) – Request-id driven regeneration guidance and replay safety - [Development Guidelines](./development-guidelines.md) – Repo-wide coding standards (Go + TS + Python) - [Testing Guide](./testing-guide.md) – Repo-wide testing strategy (Go + TS + Python) - [Troubleshooting (Go)](./troubleshooting.md) – Go SDK troubleshooting (TypeScript/Python have their own) diff --git a/docs/development/planning/README.md b/docs/development/planning/README.md index 1d64d34..b5259f6 100644 --- a/docs/development/planning/README.md +++ b/docs/development/planning/README.md @@ -13,6 +13,7 @@ Start here: - `docs/development/planning/theorydb-10of10-rubric.md` (the definition of “good”; versioned) - `docs/development/planning/theorydb-10of10-roadmap.md` (milestones mapped to rubric IDs) - `docs/development/planning/theorydb-multilang-roadmap.md` (multi-language expansion plan; TypeScript first) +- `docs/development/planning/theorydb-facetheory-support-roadmap.md` (FaceTheory enablement: ISR locks + cache metadata) - `docs/development/planning/theorydb-spec-dms-v0.1.md` (language-agnostic schema + semantics contract, draft) - `docs/development/planning/theorydb-go-ts-parity-matrix.md` (feature parity tiers for TypeScript) - `docs/development/planning/theorydb-multilang-feature-parity-matrix.md` (feature parity across Go/TS/Py) diff --git a/docs/development/planning/theorydb-facetheory-support-roadmap.md b/docs/development/planning/theorydb-facetheory-support-roadmap.md new file mode 100644 index 0000000..d08a610 --- /dev/null +++ b/docs/development/planning/theorydb-facetheory-support-roadmap.md @@ -0,0 +1,113 @@ +# TableTheory: FaceTheory Enablement Roadmap (ISR Locks + Cache Metadata) + +This roadmap captures what TableTheory should provide so **FaceTheory** can implement **SSG + ISR** on AWS with +correctness-first semantics and without per-app “glue code” re-implementations. + +Source document: `FaceTheory/docs/WISHLIST.md` (TableTheory wishlist section). + +## Baseline (current) + +As of TableTheory `v1.1.5`: + +- TableTheory has strong DynamoDB primitives (conditional writes, transactions) and a contract-test posture. +- TTL encoding is covered by the contract suite (`ttl` as epoch seconds) and is safe to build cache tables around. +- Documentation already contains **idempotency/locking patterns** (see `docs/cdk/README.md`), but these patterns are not + yet packaged as small, canonical, multi-language helpers. + +## Goals + +- Provide a **canonical** and **portable** implementation for ISR regeneration locks (lease/lock). +- Provide safe “transaction recipes” for cache metadata updates (pointer swap, freshness, etag) that apps can copy or call. +- Provide TTL-first cache schema guidance so teams don’t invent incompatible models. +- Keep TableTheory’s supply-chain posture: **GitHub releases only** for TS/Py assets; no registry tokens. + +## Non-goals (initially) + +- A full “FaceTheory cache framework” inside TableTheory (FaceTheory owns ISR policies and rendering orchestration). +- Storing full HTML in DynamoDB (FaceTheory stores bodies in S3; DynamoDB is metadata + locks only). + +## Milestones + +### FT-T0 — ISR cache schema guidance (docs + recommended model shapes) + +**Goal:** standardize the DynamoDB model shape used for ISR cache metadata and regeneration locks. + +**Acceptance criteria** +- A doc exists describing recommended roles for a “FaceTheory cache metadata” table: + - key design (cache key partitioning, tenant partitioning guidance) + - metadata attributes (S3 object key pointer, `generated_at`, `revalidate_seconds`, `etag`, etc) + - TTL strategy and pitfalls (TTL lag, safety buffer) +- The doc includes at least one runnable model definition per language (Go struct, TS type, Py dataclass/pydantic) showing: + - canonical tags/roles (`pk`, optional `sk`, `ttl`) + - attribute naming conventions +- Guidance explicitly calls out where transactions are required vs optional. + +--- + +### FT-T1 — Lease/lock helper (canonical, multi-language) + +**Goal:** ship a small helper for distributed regeneration locks with safe defaults and predictable failure modes. + +**Wishlist mapping:** “Lease/lock helper patterns” (FaceTheory P0). + +**Acceptance criteria** +- Go/TS/Py each expose a small, focused lease API (names TBD) that supports: + - acquire via conditional write with lease expiry + - optional refresh/extend + - best-effort release +- The helper is safe-by-default: + - lease ownership token required for refresh/release + - time/clock source is injectable for deterministic tests + - failures are typed enough for callers to distinguish “lock held” vs “unexpected error” +- Tests exist: + - unit tests for condition-expression correctness and edge cases + - DynamoDB Local integration test proving two contenders behave correctly + +--- + +### FT-T2 — Transactional update recipes (metadata + pointer swap) + +**Goal:** make it easy to implement “update metadata + update pointer” safely under concurrent regeneration. + +**Wishlist mapping:** “Transactional update recipes” (FaceTheory P0). + +**Acceptance criteria** +- A doc (and optionally helper functions) exists that covers: + - transactional update for (lock held) → write new metadata pointer → release + - guarding against stale writers (lease token check, version check, or conditional expressions) + - etag update patterns +- If helpers are added, they exist in Go/TS/Py with equivalent semantics and tests. + +--- + +### FT-T3 — TTL-first cache table patterns (high-leverage docs + examples) + +**Goal:** reduce schema drift and operational pitfalls for TTL-based cache metadata tables. + +**Wishlist mapping:** “TTL-first cache schema guidance” (FaceTheory P1). + +**Acceptance criteria** +- Guidance covers: + - selecting TTL attribute and TTL window strategies + - how to represent “freshness” vs “expiry” + - dealing with DynamoDB TTL deletion lag + - operational recommendations (alarms, hot partitions, sizing) +- At least one example exists demonstrating: + - writing metadata with TTL + - reading and deciding fresh/stale based on `generated_at` and `revalidate_seconds` + +--- + +### FT-T4 — Idempotency patterns (request-id driven regeneration) + +**Goal:** standardize “exactly-once-ish” regeneration and async revalidation triggers. + +**Wishlist mapping:** “Idempotency patterns” (FaceTheory P1). + +**Acceptance criteria** +- A doc exists that maps regeneration operations to idempotency keys (request ID, cache key + version, etc). +- The doc includes guidance for: + - replay safety (same request key, different payload) + - lock + idempotency interplay (avoid double work) +- If helpers are added, they build on existing TableTheory patterns and are multi-language with tests. + diff --git a/docs/facetheory/isr-cache-schema.md b/docs/facetheory/isr-cache-schema.md new file mode 100644 index 0000000..25ce9b1 --- /dev/null +++ b/docs/facetheory/isr-cache-schema.md @@ -0,0 +1,235 @@ +# FaceTheory ISR Cache Schema (TableTheory-Compatible) + +This document defines a **recommended DynamoDB item shape** for FaceTheory’s ISR cache metadata and regeneration locks. +The intent is to standardize schema and semantics so multiple services (Go/TypeScript/Python) can interoperate without +re-inventing lock + metadata patterns. + +## Recommended table shape + +Use **one DynamoDB table** that stores two item types per cache key: + +- **Metadata item**: `sk = "META"` +- **Lease/lock item**: `sk = "LOCK"` + +This keeps all ISR-related coordination for a cache key in a single partition, while still allowing clean separation +between “what was generated” (metadata) and “who is regenerating” (lease). + +## Key design + +### Partition key (`pk`) + +`pk` MUST uniquely identify the cache entry (and tenant/site if applicable). + +Recommended format (examples): + +- single-tenant: `pk = "CACHE#"` +- multi-tenant: `pk = "TENANT##CACHE#"` + +Notes: + +- Prefer a **stable hash** of long/variable URL-like keys (e.g., sha256 hex) to keep the partition key short and avoid + surprising hot partitions caused by common prefixes. +- Keep tenant/site identifiers at the front so operational queries can still group by tenant when needed. + +### Sort key (`sk`) + +Use constant sort keys to model item roles: + +- `META` for cache metadata +- `LOCK` for regeneration lease state + +## Metadata item shape (`sk = "META"`) + +Recommended attributes: + +- `s3_key` (string): S3 object key for the generated body (HTML/JSON/etc). +- `generated_at` (number, epoch seconds): when the body was generated. +- `revalidate_seconds` (number): ISR interval (freshness window). +- `etag` (string, optional): strong or weak ETag for clients/CDN. +- `ttl` (number, epoch seconds, optional): best-effort GC horizon for old metadata. + +Freshness rule (correctness boundary): + +- **Fresh/stale is determined by** `generated_at + revalidate_seconds`, not by DynamoDB TTL. + +## Lease item shape (`sk = "LOCK"`) + +Recommended attributes: + +- `lease_token` (string): random token identifying the lock owner (required for refresh/release). +- `lease_expires_at` (number, epoch seconds): lease expiry time. +- `ttl` (number, epoch seconds, optional): best-effort GC horizon for lock rows (>= `lease_expires_at` + buffer). + +Lock rule (correctness boundary): + +- A lease is considered **held** iff `lease_expires_at > now`. + +## TTL strategy (do’s and don’ts) + +- ✅ Use DynamoDB TTL as **garbage collection**, not as a correctness boundary (TTL deletion has lag). +- ✅ Use a safety buffer for TTL (minutes to hours) so readers don’t depend on exact deletion timing. +- ❌ Do not treat “item missing” as meaning “never existed”; TTL may delete later than expected. + +## Transactions: required vs optional + +- **No transaction required**: + - acquire lease (`PutItem`/`UpdateItem` with a condition expression) + - refresh lease (`UpdateItem` with `lease_token` check) + - release lease (best-effort delete or conditional update) + - read metadata (`GetItem`) +- **Use a transaction (recommended)** when a single regeneration needs to update **multiple items atomically**, e.g.: + - write new metadata + release lease as one atomic step + - pointer-swap designs where you write a new version item and then update the “current” pointer + +(See `docs/facetheory/isr-transaction-recipes.md` once FT-T2 lands.) + +## Runnable model definitions + +### Go (struct tags) + +These examples use `snake_case` DynamoDB attribute names. The Go structs include `theorydb:"naming:snake_case"` so +TableTheory accepts underscores in `attr:` overrides. + +```go +package models + +import "os" + +type FaceTheoryCacheMetadata struct { + _ struct{} `theorydb:"naming:snake_case"` + + PK string `theorydb:"pk,attr:pk" json:"pk"` + SK string `theorydb:"sk,attr:sk" json:"sk"` + + S3Key string `theorydb:"attr:s3_key" json:"s3_key"` + GeneratedAt int64 `theorydb:"attr:generated_at" json:"generated_at"` + RevalidateSeconds int64 `theorydb:"attr:revalidate_seconds" json:"revalidate_seconds"` + ETag string `theorydb:"attr:etag,omitempty" json:"etag,omitempty"` + + TTL int64 `theorydb:"ttl,attr:ttl,omitempty" json:"ttl,omitempty"` +} + +func (FaceTheoryCacheMetadata) TableName() string { + return os.Getenv("FACETHEORY_CACHE_TABLE_NAME") +} +``` + +```go +package models + +import "os" + +type FaceTheoryCacheLease struct { + _ struct{} `theorydb:"naming:snake_case"` + + PK string `theorydb:"pk,attr:pk" json:"pk"` + SK string `theorydb:"sk,attr:sk" json:"sk"` + + LeaseToken string `theorydb:"attr:lease_token" json:"lease_token"` + LeaseExpiresAt int64 `theorydb:"attr:lease_expires_at" json:"lease_expires_at"` + + TTL int64 `theorydb:"ttl,attr:ttl,omitempty" json:"ttl,omitempty"` +} + +func (FaceTheoryCacheLease) TableName() string { + return os.Getenv("FACETHEORY_CACHE_TABLE_NAME") +} +``` + +### TypeScript (`defineModel`) + +```ts +import { defineModel } from '@theory-cloud/tabletheory-ts'; + +export const faceTheoryCacheMetadataModel = defineModel({ + name: 'FaceTheoryCacheMetadata', + table: { name: process.env.FACETHEORY_CACHE_TABLE_NAME! }, + keys: { + partition: { attribute: 'pk', type: 'S' }, + sort: { attribute: 'sk', type: 'S' }, + }, + attributes: [ + { attribute: 'pk', type: 'S', roles: ['pk'] }, + { attribute: 'sk', type: 'S', roles: ['sk'] }, + { attribute: 's3_key', type: 'S', required: true }, + { attribute: 'generated_at', type: 'N', required: true }, + { attribute: 'revalidate_seconds', type: 'N', required: true }, + { attribute: 'etag', type: 'S', optional: true }, + { attribute: 'ttl', type: 'N', roles: ['ttl'], optional: true }, + ], +}); +``` + +```ts +import { defineModel } from '@theory-cloud/tabletheory-ts'; + +export const faceTheoryCacheLeaseModel = defineModel({ + name: 'FaceTheoryCacheLease', + table: { name: process.env.FACETHEORY_CACHE_TABLE_NAME! }, + keys: { + partition: { attribute: 'pk', type: 'S' }, + sort: { attribute: 'sk', type: 'S' }, + }, + attributes: [ + { attribute: 'pk', type: 'S', roles: ['pk'] }, + { attribute: 'sk', type: 'S', roles: ['sk'] }, + { attribute: 'lease_token', type: 'S', required: true }, + { attribute: 'lease_expires_at', type: 'N', required: true }, + { attribute: 'ttl', type: 'N', roles: ['ttl'], optional: true }, + ], +}); +``` + +### Python (dataclass + `ModelDefinition`) + +```py +from __future__ import annotations + +import os +from dataclasses import dataclass + +from theorydb_py.model import ModelDefinition, theorydb_field + + +@dataclass +class FaceTheoryCacheMetadata: + pk: str = theorydb_field(roles=["pk"], name="pk") + sk: str = theorydb_field(roles=["sk"], name="sk") + + s3_key: str = theorydb_field(name="s3_key") + generated_at: int = theorydb_field(name="generated_at") + revalidate_seconds: int = theorydb_field(name="revalidate_seconds") + etag: str | None = theorydb_field(name="etag", omitempty=True, default=None) + ttl: int | None = theorydb_field(roles=["ttl"], name="ttl", omitempty=True, default=None) + + +FACE_THEORY_CACHE_METADATA = ModelDefinition.from_dataclass( + FaceTheoryCacheMetadata, + table_name=os.environ["FACETHEORY_CACHE_TABLE_NAME"], +) +``` + +```py +from __future__ import annotations + +import os +from dataclasses import dataclass + +from theorydb_py.model import ModelDefinition, theorydb_field + + +@dataclass +class FaceTheoryCacheLease: + pk: str = theorydb_field(roles=["pk"], name="pk") + sk: str = theorydb_field(roles=["sk"], name="sk") + + lease_token: str = theorydb_field(name="lease_token") + lease_expires_at: int = theorydb_field(name="lease_expires_at") + ttl: int | None = theorydb_field(roles=["ttl"], name="ttl", omitempty=True, default=None) + + +FACE_THEORY_CACHE_LEASE = ModelDefinition.from_dataclass( + FaceTheoryCacheLease, + table_name=os.environ["FACETHEORY_CACHE_TABLE_NAME"], +) +``` diff --git a/docs/facetheory/isr-idempotency.md b/docs/facetheory/isr-idempotency.md new file mode 100644 index 0000000..52055ed --- /dev/null +++ b/docs/facetheory/isr-idempotency.md @@ -0,0 +1,76 @@ +# FaceTheory ISR Idempotency Patterns (Request-ID Driven Regeneration) + +This document describes “exactly-once-ish” regeneration patterns for ISR by combining: + +- an ISR lease/lock (`sk = "LOCK"`) and +- a request-scoped idempotency record (`sk = "REQ#"`) + +The goal is to prevent double work (and stale overwrites) when requests, queues, or retries re-trigger regeneration. + +## Choose an idempotency key + +Your idempotency key MUST uniquely represent one regeneration intent. + +Common choices: + +- **Request ID**: `idempotency_key = ` (HTTP-triggered) +- **Queue message ID**: `idempotency_key = ` (async-triggered) +- **Cache key + version**: `idempotency_key = hash(||)` + +Recommendation: include a deployment identifier or policy version when ISR behavior changes (so old retries don’t collide +with new semantics). + +## Model the idempotency record + +Recommended attributes on `REQ#...` items: + +- `request_hash` (string): hash of the inputs that matter (tenant, cache key, policy knobs). +- `status` (string): `STARTED` / `COMPLETED` / `FAILED`. +- `result_s3_key` (string, optional): pointer to the generated output. +- `ttl` (epoch seconds): GC horizon for idempotency records. + +## Replay safety: same key, different payload + +If the same `idempotency_key` is replayed with different inputs, you MUST treat it as an error. + +Pattern: + +1. Compute `request_hash` deterministically from the important inputs. +2. On first attempt, create the idempotency record with `request_hash`. +3. On retries, read the record and verify the stored `request_hash` matches: + - if it matches and status is `COMPLETED`, return the recorded result + - if it matches and status is `STARTED`, avoid duplicate work (either wait or serve stale) + - if it does not match, fail closed (inputs are inconsistent) + +## Lock + idempotency interplay (avoid double work) + +Idempotency records prevent duplicate work across retries; the lock prevents concurrent regeneration for a cache key. + +A safe ordering is: + +1. Read `META`: + - if fresh, serve and return (no lock, no idempotency needed) +2. Create (or read) the idempotency record: + - `PutItem` guarded by `attribute_not_exists(pk)` on `REQ#...` + - if it already exists: + - if `COMPLETED`, short-circuit and return the recorded result + - if `STARTED`, short-circuit to “already in progress” +3. Acquire the lease (`LOCK`). +4. Regenerate and write body to S3. +5. Publish `META` and finalize idempotency in one transaction: + - `ConditionCheck` the lease token (still owned + not expired) + - `Put/Update` `META` (new pointer, etag, generated_at) + - `Update` the idempotency record to `COMPLETED` with `result_s3_key` + - `Delete` the lease (best-effort) + +This prevents: + +- two workers doing the same regeneration work (idempotency record) +- a stale worker overwriting newer metadata (lease token checks in the publish transaction) + +## Notes + +- Always TTL idempotency records; they are coordination state, not permanent history. +- Keep idempotency rows in the same table/partition as the cache key when possible (simpler transactions). +- If your “in progress” behavior is to serve stale content, do so without waiting on the lock holder. + diff --git a/docs/facetheory/isr-transaction-recipes.md b/docs/facetheory/isr-transaction-recipes.md new file mode 100644 index 0000000..f509cd6 --- /dev/null +++ b/docs/facetheory/isr-transaction-recipes.md @@ -0,0 +1,171 @@ +# FaceTheory ISR Transaction Recipes (Metadata + Pointer Swap) + +This document describes **correctness-first** transaction patterns for ISR regeneration when using: + +- a **metadata item** (`sk = "META"`) and +- a **lease item** (`sk = "LOCK"`) + +as defined in `docs/facetheory/isr-cache-schema.md`. + +The goal is to ensure: + +- only the current lock holder can publish new metadata +- stale writers (expired lock, stolen lock) cannot overwrite newer state +- releasing the lock is tied to publishing metadata (where appropriate) + +## Recipe A: In-place metadata publish (single `META` row) + +Use this when you only need one metadata record per cache key. + +### Steps + +1. Acquire the lease (`LOCK`) for `pk`. +2. Regenerate the body and write it to S3. +3. Publish metadata and release the lock with a single DynamoDB transaction: + - `ConditionCheck` the lease row (`lease_token` matches AND `lease_expires_at > now`) + - `Put` the metadata row (`META`) with the new pointer (`s3_key`), timestamps, etag, ttl + - `Delete` the lease row (optionally conditioned on `lease_token`) + +### Why the transaction matters + +Without the transaction, a writer can: + +- finish regeneration after its lease expires, and +- overwrite the metadata even though another contender acquired the lease. + +The transaction makes publishing contingent on still owning the lease. + +## Recipe B: Pointer swap (versioned metadata rows) + +Use this when you want history, safer rollbacks, or deduping/observability across generations. + +Recommended item roles (same `pk`): + +- pointer row: `sk = "META"` with `current_sk = "VER#"` +- version rows: `sk = "VER#"` containing `s3_key`, `generated_at`, `etag`, etc. + +Transaction sketch: + +1. `ConditionCheck` the lease row (token + not expired). +2. `Put` the new version row (`VER#...`) guarded with `attribute_not_exists(pk)` to avoid duplicate IDs. +3. `Update` the pointer row (`META`) to set `current_sk` to the new version (optionally guard with optimistic `version`). +4. `Delete` the lease row (optionally conditioned on token). + +## Stale-writer protection options + +You can guard against stale writers in one (or multiple) of these ways: + +- **Lease token check (recommended)**: the lease row stores `lease_token` and transactions require it. +- **Lease expiry check**: require `lease_expires_at > now` in the transaction. +- **Optimistic version check**: add a `version` field to the pointer row and require it matches before updating. +- **Monotonic timestamp check**: conditionally update only if `generated_at < :new_generated_at`. + +## ETag update patterns + +Recommended: publish `etag` alongside the pointer update. + +- In Recipe A, store `etag` directly on `META` and overwrite it atomically in the transaction. +- In Recipe B, store `etag` on the version row and optionally denormalize the current `etag` onto the pointer row for + faster reads. + +Stale-writer guard: if you rely on monotonicity, require `generated_at < :new_generated_at` before setting a new `etag`. + +## Multi-language examples + +These examples focus on the **transaction shape**; model definitions are in `docs/facetheory/isr-cache-schema.md`. + +### Go (TableTheory transaction builder) + +```go +ctx := context.Background() +nowUnix := time.Now().Unix() + +leaseItem := &models.FaceTheoryCacheLease{ + PK: "TENANT#t1#CACHE#abc", + SK: "LOCK", +} + +metaItem := &models.FaceTheoryCacheMetadata{ + PK: leaseItem.PK, + SK: "META", + S3Key: "s3://bucket/key.html", + GeneratedAt: nowUnix, + RevalidateSeconds: 60, + ETag: "\"abc123\"", + TTL: nowUnix + 86400, +} + +err := db.TransactWrite(ctx, func(tx core.TransactionBuilder) error { + tx.ConditionCheck( + leaseItem, + tabletheory.Condition("lease_token", "=", leaseToken), + tabletheory.Condition("lease_expires_at", ">", nowUnix), + ) + + tx.Put(metaItem) + + // Optional: make the delete conditional on the same token. + tx.Delete(leaseItem, tabletheory.Condition("lease_token", "=", leaseToken)) + return nil +}) +``` + +### TypeScript (`TheorydbClient.transactWrite`) + +```ts +await client.transactWrite([ + { + kind: 'condition', + model: 'FaceTheoryCacheLease', + key: { pk, sk: 'LOCK' }, + conditionExpression: '#tok = :tok AND #exp > :now', + expressionAttributeNames: { '#tok': 'lease_token', '#exp': 'lease_expires_at' }, + expressionAttributeValues: { + ':tok': { S: leaseToken }, + ':now': { N: String(nowUnix) }, + }, + }, + { + kind: 'put', + model: 'FaceTheoryCacheMetadata', + item: { pk, sk: 'META', s3_key, generated_at: nowUnix, revalidate_seconds, etag, ttl }, + }, + { + kind: 'delete', + model: 'FaceTheoryCacheLease', + key: { pk, sk: 'LOCK' }, + }, +]); +``` + +### Python (`Table.transact_write`) + +```py +table.transact_write( + [ + TransactConditionCheck( + pk=pk, + sk="LOCK", + condition_expression="#tok = :tok AND #exp > :now", + expression_attribute_names={"#tok": "lease_token", "#exp": "lease_expires_at"}, + expression_attribute_values={":tok": lease_token, ":now": now_unix}, + ), + TransactPut( + item=FaceTheoryCacheMetadata( + pk=pk, + sk="META", + s3_key=s3_key, + generated_at=now_unix, + revalidate_seconds=revalidate_seconds, + etag=etag, + ttl=ttl, + ), + ), + TransactDelete( + pk=pk, + sk="LOCK", + ), + ] +) +``` + diff --git a/docs/facetheory/ttl-cache-patterns.md b/docs/facetheory/ttl-cache-patterns.md new file mode 100644 index 0000000..d96e49a --- /dev/null +++ b/docs/facetheory/ttl-cache-patterns.md @@ -0,0 +1,103 @@ +# TTL-First Cache Table Patterns (FaceTheory ISR) + +This document describes operationally safe patterns for TTL-based cache metadata tables used for ISR: + +- how to choose a TTL attribute and window +- how to separate “freshness” from “expiry” +- how to account for DynamoDB TTL deletion lag +- operational recommendations to avoid surprises in production + +Schema context: `docs/facetheory/isr-cache-schema.md`. + +## Choose a TTL attribute + +Recommended: + +- Use a single numeric attribute named `ttl` that stores **epoch seconds**. +- Treat it as **garbage collection**, not as correctness logic. + +Why: + +- TTL is asynchronous and can delete later than the timestamp you set. +- A stable `ttl` attribute name keeps Go/TypeScript/Python models aligned. + +## Freshness vs expiry (two clocks) + +Use two separate concepts: + +- **Freshness window** (correctness boundary): + - `fresh_until = generated_at + revalidate_seconds` +- **Expiry / GC horizon** (storage boundary): + - `ttl = generated_at + retention_seconds (+ safety_buffer)` + +Rules of thumb: + +- `revalidate_seconds` is usually **minutes to hours**. +- `retention_seconds` is usually **days to weeks** (enough for debuggability and rollback). +- `ttl` SHOULD be **much larger** than `revalidate_seconds`. + +## DynamoDB TTL deletion lag + +TTL deletion is best-effort; items can remain after their `ttl` passes. + +Implications: + +- ✅ Treat `ttl` as “eligible for deletion”, not “will be deleted immediately”. +- ✅ Readers MUST decide fresh/stale based on `generated_at` and `revalidate_seconds`. +- ❌ Never treat “item missing” as “item never existed”; it may have been deleted earlier or later than expected. + +## Operational recommendations + +- **Hot partitions**: avoid putting raw URL paths directly in `pk`; use a stable hash to reduce common-prefix hotspots. +- **Capacity**: for ISR tables, on-demand billing is often simplest; watch `ThrottledRequests` and adjust. +- **Alarms**: alert on read/write throttles, elevated `UserErrors`, and high latency; consider tracking + `TimeToLiveDeletedItemCount` as a sanity signal (not a correctness signal). +- **Retention**: choose a retention long enough for investigations (and long enough to survive TTL lag), but not so long + that storage cost grows unbounded. + +## Example: write metadata with TTL, then read and decide fresh/stale + +This example uses the `META` row pattern (`sk = "META"`). + +### Write + +1. Compute `generated_at = now_unix`. +2. Set `ttl = generated_at + retention_seconds`. +3. Store `revalidate_seconds` alongside the metadata. + +### Read + +1. Fetch the `META` item by primary key. +2. Compute `fresh_until = generated_at + revalidate_seconds`. +3. Consider it fresh iff `now_unix < fresh_until`. + +### Go example + +```go +nowUnix := time.Now().Unix() + +meta := &FaceTheoryCacheMetadata{ + PK: "TENANT#t1#CACHE#abc", + SK: "META", + S3Key: "pages/t1/abc.html", + GeneratedAt: nowUnix, + RevalidateSeconds: 60, // freshness window + TTL: nowUnix + 86400, // GC horizon (1 day) +} + +if err := db.Model(meta).CreateOrUpdate(); err != nil { + return err +} + +var got FaceTheoryCacheMetadata +if err := db.Model(&FaceTheoryCacheMetadata{}). + Where("PK", "=", meta.PK). + Where("SK", "=", "META"). + First(&got); err != nil { + return err +} + +freshUntil := got.GeneratedAt + got.RevalidateSeconds +isFresh := time.Now().Unix() < freshUntil +``` + diff --git a/examples/facetheory_ttl_cache_example.go b/examples/facetheory_ttl_cache_example.go new file mode 100644 index 0000000..12bffed --- /dev/null +++ b/examples/facetheory_ttl_cache_example.go @@ -0,0 +1,43 @@ +package examples + +import ( + "os" + "time" +) + +// FaceTheoryCacheMetadata is a minimal metadata row shape for ISR cache tables. +// +// See: docs/facetheory/isr-cache-schema.md +type FaceTheoryCacheMetadata struct { + _ struct{} `theorydb:"naming:snake_case"` + + PK string `theorydb:"pk,attr:pk" json:"pk"` + SK string `theorydb:"sk,attr:sk" json:"sk"` + + S3Key string `theorydb:"attr:s3_key" json:"s3_key"` + GeneratedAt int64 `theorydb:"attr:generated_at" json:"generated_at"` + RevalidateSeconds int64 `theorydb:"attr:revalidate_seconds" json:"revalidate_seconds"` + ETag string `theorydb:"attr:etag,omitempty" json:"etag,omitempty"` + + TTL int64 `theorydb:"ttl,attr:ttl,omitempty" json:"ttl,omitempty"` +} + +func (FaceTheoryCacheMetadata) TableName() string { + return os.Getenv("FACETHEORY_CACHE_TABLE_NAME") +} + +// FaceTheoryFreshUntilUnix returns the freshness boundary for ISR (not DynamoDB TTL). +func FaceTheoryFreshUntilUnix(meta FaceTheoryCacheMetadata) int64 { + return meta.GeneratedAt + meta.RevalidateSeconds +} + +// FaceTheoryIsFresh reports whether the metadata is within its ISR freshness window. +func FaceTheoryIsFresh(meta FaceTheoryCacheMetadata, now time.Time) bool { + return now.Unix() < FaceTheoryFreshUntilUnix(meta) +} + +// FaceTheoryTTLUnix returns a TTL epoch-seconds value suitable for DynamoDB TTL. +// This is a garbage-collection horizon, not a freshness boundary. +func FaceTheoryTTLUnix(generatedAt int64, retention time.Duration) int64 { + return generatedAt + int64(retention.Seconds()) +} diff --git a/hgm-infra/evidence/hgm-rubric-report.json b/hgm-infra/evidence/hgm-rubric-report.json index 9d5a175..619cf47 100644 --- a/hgm-infra/evidence/hgm-rubric-report.json +++ b/hgm-infra/evidence/hgm-rubric-report.json @@ -1,7 +1,7 @@ { "$schema": "https://hgm.pai.dev/schemas/hgm-rubric-report.schema.json", "schemaVersion": 1, - "timestamp": "2026-01-19T05:20:17Z", + "timestamp": "2026-01-23T01:42:09Z", "pack": { "version": "816465a1618d", "digest": "896aed16549928f21626fb4effe9bb6236fc60292a8f50bae8ce77e873ac775b" diff --git a/pkg/lease/errors.go b/pkg/lease/errors.go new file mode 100644 index 0000000..25598b8 --- /dev/null +++ b/pkg/lease/errors.go @@ -0,0 +1,33 @@ +package lease + +import "errors" + +// LeaseHeldError indicates a lease could not be acquired because it is held by another contender. +// This is the "expected" failure mode for lock contention. +type LeaseHeldError struct { + Key Key +} + +func (e *LeaseHeldError) Error() string { + return "lease held" +} + +// LeaseNotOwnedError indicates a refresh attempt failed because the caller's token no longer owns the lease. +// This can happen if the lease expired and was acquired by another contender, or if the token is wrong. +type LeaseNotOwnedError struct { + Key Key +} + +func (e *LeaseNotOwnedError) Error() string { + return "lease not owned" +} + +func IsLeaseHeld(err error) bool { + var target *LeaseHeldError + return errors.As(err, &target) +} + +func IsLeaseNotOwned(err error) bool { + var target *LeaseNotOwnedError + return errors.As(err, &target) +} diff --git a/pkg/lease/lease.go b/pkg/lease/lease.go new file mode 100644 index 0000000..cf078b4 --- /dev/null +++ b/pkg/lease/lease.go @@ -0,0 +1,373 @@ +// Package lease provides a small, correctness-first DynamoDB lease/lock helper. +// +// It is designed for ISR-style regeneration locks (FaceTheory) and similar distributed coordination needs. +package lease + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/google/uuid" +) + +type DynamoDBLeaseAPI interface { + PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) + UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) + DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) +} + +type Key struct { + PK string + SK string +} + +type Lease struct { + Key Key + Token string + ExpiresAt int64 +} + +type Manager struct { + client DynamoDBLeaseAPI + + tableName string + + pkAttr string + skAttr string + + tokenAttr string + expiresAtAttr string + ttlAttr string + + now func() time.Time + token func() string + lockSortKey string + ttlBuffer time.Duration + includeTTL bool + validateInput bool +} + +type Option func(*Manager) + +const ( + DefaultPKAttribute = "pk" + DefaultSKAttribute = "sk" + DefaultTokenAttribute = "lease_token" + DefaultExpiresAtAttribute = "lease_expires_at" + DefaultTTLAttribute = "ttl" + DefaultLockSortKey = "LOCK" + DefaultTTLBuffer = time.Hour + defaultValidateInput = true + defaultIncludeTTLAttribute = true +) + +func WithNow(now func() time.Time) Option { + return func(m *Manager) { + if now != nil { + m.now = now + } + } +} + +func WithTokenGenerator(token func() string) Option { + return func(m *Manager) { + if token != nil { + m.token = token + } + } +} + +func WithTTLBuffer(buffer time.Duration) Option { + return func(m *Manager) { + m.ttlBuffer = buffer + } +} + +func WithLockSortKey(lockSortKey string) Option { + return func(m *Manager) { + if lockSortKey != "" { + m.lockSortKey = lockSortKey + } + } +} + +func WithKeyAttributeNames(pkAttr, skAttr string) Option { + return func(m *Manager) { + if pkAttr != "" { + m.pkAttr = pkAttr + } + if skAttr != "" { + m.skAttr = skAttr + } + } +} + +func WithLeaseAttributeNames(tokenAttr, expiresAtAttr, ttlAttr string) Option { + return func(m *Manager) { + if tokenAttr != "" { + m.tokenAttr = tokenAttr + } + if expiresAtAttr != "" { + m.expiresAtAttr = expiresAtAttr + } + if ttlAttr != "" { + m.ttlAttr = ttlAttr + } + } +} + +func WithIncludeTTL(include bool) Option { + return func(m *Manager) { + m.includeTTL = include + } +} + +func WithValidateInput(validate bool) Option { + return func(m *Manager) { + m.validateInput = validate + } +} + +func NewManager(client DynamoDBLeaseAPI, tableName string, opts ...Option) (*Manager, error) { + if client == nil { + return nil, fmt.Errorf("lease manager: client is required") + } + if tableName == "" { + return nil, fmt.Errorf("lease manager: tableName is required") + } + + m := &Manager{ + client: client, + + tableName: tableName, + + pkAttr: DefaultPKAttribute, + skAttr: DefaultSKAttribute, + + tokenAttr: DefaultTokenAttribute, + expiresAtAttr: DefaultExpiresAtAttribute, + ttlAttr: DefaultTTLAttribute, + + now: time.Now, + token: uuid.NewString, + ttlBuffer: DefaultTTLBuffer, + lockSortKey: DefaultLockSortKey, + includeTTL: defaultIncludeTTLAttribute, + validateInput: defaultValidateInput, + } + + for _, opt := range opts { + if opt != nil { + opt(m) + } + } + + return m, nil +} + +func (m *Manager) Acquire(ctx context.Context, pk string, duration time.Duration) (*Lease, error) { + if m == nil { + return nil, fmt.Errorf("lease manager: nil receiver") + } + return m.AcquireKey(ctx, Key{PK: pk, SK: m.lockSortKey}, duration) +} + +func (m *Manager) AcquireKey(ctx context.Context, key Key, duration time.Duration) (*Lease, error) { + if m == nil { + return nil, fmt.Errorf("lease manager: nil receiver") + } + if m.client == nil { + return nil, fmt.Errorf("lease manager: client is nil") + } + + if m.validateInput { + if stringsEmpty(key.PK) || stringsEmpty(key.SK) { + return nil, fmt.Errorf("lease manager: PK and SK are required") + } + if duration <= 0 { + return nil, fmt.Errorf("lease manager: duration must be > 0") + } + } + + now := m.now() + nowUnix := now.Unix() + expiresAt := now.Add(duration).Unix() + token := m.token() + + item := map[string]types.AttributeValue{ + m.pkAttr: &types.AttributeValueMemberS{Value: key.PK}, + m.skAttr: &types.AttributeValueMemberS{Value: key.SK}, + + m.tokenAttr: &types.AttributeValueMemberS{Value: token}, + m.expiresAtAttr: &types.AttributeValueMemberN{Value: strconv.FormatInt(expiresAt, 10)}, + } + + if m.includeTTL && m.ttlBuffer > 0 && m.ttlAttr != "" { + ttl := expiresAt + int64(m.ttlBuffer.Seconds()) + item[m.ttlAttr] = &types.AttributeValueMemberN{Value: strconv.FormatInt(ttl, 10)} + } + + input := &dynamodb.PutItemInput{ + TableName: aws.String(m.tableName), + Item: item, + ConditionExpression: aws.String( + "attribute_not_exists(#pk) OR #lease_expires_at <= :now", + ), + ExpressionAttributeNames: map[string]string{ + "#pk": m.pkAttr, + "#lease_expires_at": m.expiresAtAttr, + }, + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":now": &types.AttributeValueMemberN{Value: strconv.FormatInt(nowUnix, 10)}, + }, + } + + _, err := m.client.PutItem(ctx, input) + if err != nil { + if isConditionalCheckFailed(err) { + return nil, &LeaseHeldError{Key: key} + } + return nil, fmt.Errorf("lease manager: acquire failed: %w", err) + } + + return &Lease{ + Key: key, + Token: token, + ExpiresAt: expiresAt, + }, nil +} + +func (m *Manager) Refresh(ctx context.Context, lease Lease, duration time.Duration) (*Lease, error) { + if m == nil { + return nil, fmt.Errorf("lease manager: nil receiver") + } + if m.client == nil { + return nil, fmt.Errorf("lease manager: client is nil") + } + + if m.validateInput { + if stringsEmpty(lease.Key.PK) || stringsEmpty(lease.Key.SK) { + return nil, fmt.Errorf("lease manager: PK and SK are required") + } + if stringsEmpty(lease.Token) { + return nil, fmt.Errorf("lease manager: token is required") + } + if duration <= 0 { + return nil, fmt.Errorf("lease manager: duration must be > 0") + } + } + + now := m.now() + nowUnix := now.Unix() + expiresAt := now.Add(duration).Unix() + + key := map[string]types.AttributeValue{ + m.pkAttr: &types.AttributeValueMemberS{Value: lease.Key.PK}, + m.skAttr: &types.AttributeValueMemberS{Value: lease.Key.SK}, + } + + names := map[string]string{ + "#lease_token": m.tokenAttr, + "#lease_expires_at": m.expiresAtAttr, + } + values := map[string]types.AttributeValue{ + ":token": &types.AttributeValueMemberS{Value: lease.Token}, + ":now": &types.AttributeValueMemberN{Value: strconv.FormatInt(nowUnix, 10)}, + ":exp": &types.AttributeValueMemberN{Value: strconv.FormatInt(expiresAt, 10)}, + } + + updateExpr := "SET #lease_expires_at = :exp" + if m.includeTTL && m.ttlBuffer > 0 && m.ttlAttr != "" { + ttl := expiresAt + int64(m.ttlBuffer.Seconds()) + names["#ttl"] = m.ttlAttr + values[":ttl"] = &types.AttributeValueMemberN{Value: strconv.FormatInt(ttl, 10)} + updateExpr += ", #ttl = :ttl" + } + + input := &dynamodb.UpdateItemInput{ + TableName: aws.String(m.tableName), + Key: key, + UpdateExpression: aws.String( + updateExpr, + ), + ConditionExpression: aws.String( + "#lease_token = :token AND #lease_expires_at > :now", + ), + ExpressionAttributeNames: names, + ExpressionAttributeValues: values, + } + + _, err := m.client.UpdateItem(ctx, input) + if err != nil { + if isConditionalCheckFailed(err) { + return nil, &LeaseNotOwnedError{Key: lease.Key} + } + return nil, fmt.Errorf("lease manager: refresh failed: %w", err) + } + + out := lease + out.ExpiresAt = expiresAt + return &out, nil +} + +func (m *Manager) Release(ctx context.Context, lease Lease) error { + if m == nil { + return fmt.Errorf("lease manager: nil receiver") + } + if m.client == nil { + return fmt.Errorf("lease manager: client is nil") + } + + if m.validateInput { + if stringsEmpty(lease.Key.PK) || stringsEmpty(lease.Key.SK) { + return fmt.Errorf("lease manager: PK and SK are required") + } + if stringsEmpty(lease.Token) { + return fmt.Errorf("lease manager: token is required") + } + } + + key := map[string]types.AttributeValue{ + m.pkAttr: &types.AttributeValueMemberS{Value: lease.Key.PK}, + m.skAttr: &types.AttributeValueMemberS{Value: lease.Key.SK}, + } + + input := &dynamodb.DeleteItemInput{ + TableName: aws.String(m.tableName), + Key: key, + ConditionExpression: aws.String( + "#lease_token = :token", + ), + ExpressionAttributeNames: map[string]string{ + "#lease_token": m.tokenAttr, + }, + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":token": &types.AttributeValueMemberS{Value: lease.Token}, + }, + } + + _, err := m.client.DeleteItem(ctx, input) + if err != nil { + if isConditionalCheckFailed(err) { + return nil // best-effort + } + return fmt.Errorf("lease manager: release failed: %w", err) + } + + return nil +} + +func isConditionalCheckFailed(err error) bool { + var cfe *types.ConditionalCheckFailedException + return errors.As(err, &cfe) +} + +func stringsEmpty(s string) bool { + return len(s) == 0 +} diff --git a/pkg/lease/lease_test.go b/pkg/lease/lease_test.go new file mode 100644 index 0000000..68decc8 --- /dev/null +++ b/pkg/lease/lease_test.go @@ -0,0 +1,370 @@ +package lease + +import ( + "context" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/theory-cloud/tabletheory/pkg/mocks" +) + +func TestManager_Acquire_BuildsConditionalPut(t *testing.T) { + mockClient := new(mocks.MockDynamoDBClient) + + fixed := time.Unix(1000, 0) + mgr, err := NewManager( + mockClient, + "tbl", + WithNow(func() time.Time { return fixed }), + WithTokenGenerator(func() string { return "tok" }), + WithTTLBuffer(10*time.Second), + ) + require.NoError(t, err) + + mockClient. + On( + "PutItem", + mock.Anything, + mock.MatchedBy(func(in *dynamodb.PutItemInput) bool { + if in == nil { + return false + } + if in.TableName == nil || *in.TableName != "tbl" { + return false + } + if in.ConditionExpression == nil || *in.ConditionExpression != "attribute_not_exists(#pk) OR #lease_expires_at <= :now" { + return false + } + if in.ExpressionAttributeNames["#pk"] != "pk" { + return false + } + if in.ExpressionAttributeNames["#lease_expires_at"] != "lease_expires_at" { + return false + } + + nowAV, ok := in.ExpressionAttributeValues[":now"].(*types.AttributeValueMemberN) + if !ok || nowAV.Value != "1000" { + return false + } + + pkAV, ok := in.Item["pk"].(*types.AttributeValueMemberS) + if !ok || pkAV.Value != "CACHE#A" { + return false + } + skAV, ok := in.Item["sk"].(*types.AttributeValueMemberS) + if !ok || skAV.Value != DefaultLockSortKey { + return false + } + + tokenAV, ok := in.Item["lease_token"].(*types.AttributeValueMemberS) + if !ok || tokenAV.Value != "tok" { + return false + } + expAV, ok := in.Item["lease_expires_at"].(*types.AttributeValueMemberN) + if !ok || expAV.Value != "1030" { + return false + } + ttlAV, ok := in.Item["ttl"].(*types.AttributeValueMemberN) + if !ok || ttlAV.Value != "1040" { + return false + } + + return true + }), + mock.Anything, + ). + Return(&dynamodb.PutItemOutput{}, nil). + Once() + + lease, err := mgr.Acquire(context.Background(), "CACHE#A", 30*time.Second) + require.NoError(t, err) + require.NotNil(t, lease) + require.Equal(t, "CACHE#A", lease.Key.PK) + require.Equal(t, DefaultLockSortKey, lease.Key.SK) + require.Equal(t, "tok", lease.Token) + require.Equal(t, int64(1030), lease.ExpiresAt) + mockClient.AssertExpectations(t) +} + +func TestManager_Acquire_ReturnsLeaseHeldOnConditionalFailure(t *testing.T) { + mockClient := new(mocks.MockDynamoDBClient) + + fixed := time.Unix(1000, 0) + mgr, err := NewManager( + mockClient, + "tbl", + WithNow(func() time.Time { return fixed }), + WithTokenGenerator(func() string { return "tok" }), + ) + require.NoError(t, err) + + mockClient. + On("PutItem", mock.Anything, mock.Anything, mock.Anything). + Return((*dynamodb.PutItemOutput)(nil), &types.ConditionalCheckFailedException{}). + Once() + + lease, err := mgr.Acquire(context.Background(), "CACHE#A", 30*time.Second) + require.Nil(t, lease) + require.Error(t, err) + require.True(t, IsLeaseHeld(err)) + mockClient.AssertExpectations(t) +} + +func TestManager_Refresh_ConditionedOnTokenAndUnexpired(t *testing.T) { + mockClient := new(mocks.MockDynamoDBClient) + + fixed := time.Unix(1000, 0) + mgr, err := NewManager( + mockClient, + "tbl", + WithNow(func() time.Time { return fixed }), + WithTTLBuffer(10*time.Second), + ) + require.NoError(t, err) + + mockClient. + On( + "UpdateItem", + mock.Anything, + mock.MatchedBy(func(in *dynamodb.UpdateItemInput) bool { + if in == nil { + return false + } + if in.TableName == nil || *in.TableName != "tbl" { + return false + } + if in.ConditionExpression == nil || *in.ConditionExpression != "#lease_token = :token AND #lease_expires_at > :now" { + return false + } + if in.UpdateExpression == nil || *in.UpdateExpression != "SET #lease_expires_at = :exp, #ttl = :ttl" { + return false + } + + if in.ExpressionAttributeNames["#lease_token"] != "lease_token" { + return false + } + if in.ExpressionAttributeNames["#lease_expires_at"] != "lease_expires_at" { + return false + } + if in.ExpressionAttributeNames["#ttl"] != "ttl" { + return false + } + + tokenAV, ok := in.ExpressionAttributeValues[":token"].(*types.AttributeValueMemberS) + if !ok || tokenAV.Value != "tok" { + return false + } + nowAV, ok := in.ExpressionAttributeValues[":now"].(*types.AttributeValueMemberN) + if !ok || nowAV.Value != "1000" { + return false + } + expAV, ok := in.ExpressionAttributeValues[":exp"].(*types.AttributeValueMemberN) + if !ok || expAV.Value != "1060" { + return false + } + ttlAV, ok := in.ExpressionAttributeValues[":ttl"].(*types.AttributeValueMemberN) + if !ok || ttlAV.Value != "1070" { + return false + } + + return true + }), + mock.Anything, + ). + Return(&dynamodb.UpdateItemOutput{}, nil). + Once() + + out, err := mgr.Refresh( + context.Background(), + Lease{ + Key: Key{PK: "CACHE#A", SK: DefaultLockSortKey}, + Token: "tok", + }, + 60*time.Second, + ) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, int64(1060), out.ExpiresAt) + mockClient.AssertExpectations(t) +} + +func TestManager_Release_IsBestEffortOnConditionalFailure(t *testing.T) { + mockClient := new(mocks.MockDynamoDBClient) + + mgr, err := NewManager(mockClient, "tbl") + require.NoError(t, err) + + mockClient. + On("DeleteItem", mock.Anything, mock.Anything, mock.Anything). + Return((*dynamodb.DeleteItemOutput)(nil), &types.ConditionalCheckFailedException{}). + Once() + + err = mgr.Release( + context.Background(), + Lease{ + Key: Key{PK: "CACHE#A", SK: DefaultLockSortKey}, + Token: "tok", + }, + ) + require.NoError(t, err) + mockClient.AssertExpectations(t) +} + +func TestNewManager_ValidatesInputs(t *testing.T) { + _, err := NewManager(nil, "tbl") + require.Error(t, err) + + mockClient := new(mocks.MockDynamoDBClient) + _, err = NewManager(mockClient, "") + require.Error(t, err) +} + +func TestManager_Acquire_RespectsCustomizationOptions(t *testing.T) { + mockClient := new(mocks.MockDynamoDBClient) + + fixed := time.Unix(1000, 0) + mgr, err := NewManager( + mockClient, + "tbl", + WithNow(func() time.Time { return fixed }), + WithTokenGenerator(func() string { return "tok" }), + WithLockSortKey("CUSTOM"), + WithKeyAttributeNames("partition", "sort"), + WithLeaseAttributeNames("token", "expires_at", "expires_ttl"), + WithIncludeTTL(false), + ) + require.NoError(t, err) + + mockClient. + On( + "PutItem", + mock.Anything, + mock.MatchedBy(func(in *dynamodb.PutItemInput) bool { + if in == nil || in.TableName == nil || *in.TableName != "tbl" { + return false + } + + pkAV, ok := in.Item["partition"].(*types.AttributeValueMemberS) + if !ok || pkAV.Value != "CACHE#A" { + return false + } + skAV, ok := in.Item["sort"].(*types.AttributeValueMemberS) + if !ok || skAV.Value != "CUSTOM" { + return false + } + + tokenAV, ok := in.Item["token"].(*types.AttributeValueMemberS) + if !ok || tokenAV.Value != "tok" { + return false + } + expAV, ok := in.Item["expires_at"].(*types.AttributeValueMemberN) + if !ok || expAV.Value != "1030" { + return false + } + _, hasTTL := in.Item["expires_ttl"] + if hasTTL { + return false + } + + if in.ConditionExpression == nil || *in.ConditionExpression != "attribute_not_exists(#pk) OR #lease_expires_at <= :now" { + return false + } + if in.ExpressionAttributeNames["#pk"] != "partition" { + return false + } + if in.ExpressionAttributeNames["#lease_expires_at"] != "expires_at" { + return false + } + + nowAV, ok := in.ExpressionAttributeValues[":now"].(*types.AttributeValueMemberN) + if !ok || nowAV.Value != "1000" { + return false + } + + return true + }), + mock.Anything, + ). + Return(&dynamodb.PutItemOutput{}, nil). + Once() + + lease, err := mgr.Acquire(context.Background(), "CACHE#A", 30*time.Second) + require.NoError(t, err) + require.Equal(t, "CUSTOM", lease.Key.SK) + mockClient.AssertExpectations(t) +} + +func TestManager_AcquireKey_ValidationToggle(t *testing.T) { + mockClient := new(mocks.MockDynamoDBClient) + + mgr, err := NewManager(mockClient, "tbl", WithValidateInput(true)) + require.NoError(t, err) + _, err = mgr.AcquireKey(context.Background(), Key{PK: "A", SK: "B"}, 0) + require.Error(t, err) + + fixed := time.Unix(1000, 0) + mgr, err = NewManager( + mockClient, + "tbl", + WithNow(func() time.Time { return fixed }), + WithTokenGenerator(func() string { return "tok" }), + WithValidateInput(false), + ) + require.NoError(t, err) + + mockClient. + On("PutItem", mock.Anything, mock.Anything, mock.Anything). + Return(&dynamodb.PutItemOutput{}, nil). + Once() + + lease, err := mgr.AcquireKey(context.Background(), Key{PK: "CACHE#A", SK: "LOCK"}, 0) + require.NoError(t, err) + require.Equal(t, int64(1000), lease.ExpiresAt) + mockClient.AssertExpectations(t) +} + +func TestLeaseErrors_AreTypedAndDetectable(t *testing.T) { + held := &LeaseHeldError{Key: Key{PK: "A", SK: "B"}} + require.Equal(t, "lease held", held.Error()) + require.True(t, IsLeaseHeld(held)) + + notOwned := &LeaseNotOwnedError{Key: Key{PK: "A", SK: "B"}} + require.Equal(t, "lease not owned", notOwned.Error()) + require.True(t, IsLeaseNotOwned(notOwned)) +} + +func TestManager_Refresh_ReturnsNotOwnedOnConditionalFailure(t *testing.T) { + mockClient := new(mocks.MockDynamoDBClient) + + mgr, err := NewManager(mockClient, "tbl") + require.NoError(t, err) + + mockClient. + On("UpdateItem", mock.Anything, mock.Anything, mock.Anything). + Return((*dynamodb.UpdateItemOutput)(nil), &types.ConditionalCheckFailedException{}). + Once() + + _, err = mgr.Refresh( + context.Background(), + Lease{Key: Key{PK: "CACHE#A", SK: DefaultLockSortKey}, Token: "tok"}, + 30*time.Second, + ) + require.Error(t, err) + require.True(t, IsLeaseNotOwned(err)) + mockClient.AssertExpectations(t) +} + +func TestManager_Release_ValidatesToken(t *testing.T) { + mockClient := new(mocks.MockDynamoDBClient) + + mgr, err := NewManager(mockClient, "tbl", WithValidateInput(true)) + require.NoError(t, err) + + err = mgr.Release(context.Background(), Lease{Key: Key{PK: "CACHE#A", SK: DefaultLockSortKey}}) + require.Error(t, err) +} diff --git a/py/README.md b/py/README.md index 11f3224..e7e27b2 100644 --- a/py/README.md +++ b/py/README.md @@ -47,6 +47,7 @@ class Note: pk: str = theorydb_field(roles=["pk"]) sk: str = theorydb_field(roles=["sk"]) value: int = theorydb_field() + note: str = theorydb_field(omitempty=True, default="") client = boto3.client( @@ -76,7 +77,7 @@ page2 = table.query("A", cursor=page1.next_cursor) if page1.next_cursor else Non ## Batch + transactions ```python -from theorydb_py import TransactUpdate +from theorydb_py import TransactUpdate, UpdateAdd, UpdateSetIfNotExists table.batch_write(puts=[Note(pk="A", sk="2", value=1)], deletes=[("A", "1")]) @@ -85,10 +86,10 @@ table.transact_write( TransactUpdate( pk="A", sk="2", - updates={"value": 2}, - condition_expression="#v = :expected", + updates={"value": UpdateAdd(1), "note": UpdateSetIfNotExists("first")}, + condition_expression="attribute_not_exists(#v) OR #v < :max_allowed", expression_attribute_names={"#v": "value"}, - expression_attribute_values={":expected": 1}, + expression_attribute_values={":max_allowed": 100}, ) ] ) diff --git a/py/src/theorydb_py/__init__.py b/py/src/theorydb_py/__init__.py index e41e8a6..f1c84d8 100644 --- a/py/src/theorydb_py/__init__.py +++ b/py/src/theorydb_py/__init__.py @@ -10,6 +10,8 @@ BatchRetryExceededError, ConditionFailedError, EncryptionNotConfiguredError, + LeaseHeldError, + LeaseNotOwnedError, NotFoundError, TheorydbPyError, TransactionCanceledError, @@ -33,6 +35,8 @@ TransactPut, TransactUpdate, TransactWriteAction, + UpdateAdd, + UpdateSetIfNotExists, ) if TYPE_CHECKING: @@ -49,6 +53,7 @@ sum_field, ) from .dms import assert_model_definition_equivalent_to_dms, get_dms_model, parse_dms_document + from .lease import Lease, LeaseKey, LeaseManager from .multiaccount import AccountConfig, MultiAccountSessions from .optimizer import QueryOptimizer, QueryPlan, QueryShape, ScanShape from .protection import ConcurrencyLimiter, SimpleLimiter @@ -187,6 +192,10 @@ def __getattr__(name: str) -> Any: from . import validation return getattr(validation, name) + if name in {"Lease", "LeaseKey", "LeaseManager"}: + from . import lease + + return getattr(lease, name) raise AttributeError(name) @@ -201,6 +210,8 @@ def __getattr__(name: str) -> Any: "BatchRetryExceededError", "build_create_table_request", "ConditionFailedError", + "LeaseHeldError", + "LeaseNotOwnedError", "count_distinct", "create_lambda_boto3_config", "create_table", @@ -225,6 +236,9 @@ def __getattr__(name: str) -> Any: "FilterCondition", "FilterGroup", "ConcurrencyLimiter", + "Lease", + "LeaseKey", + "LeaseManager", "MaxExpressionLength", "MaxFieldNameLength", "MaxNestedDepth", @@ -250,6 +264,8 @@ def __getattr__(name: str) -> Any: "TransactPut", "TransactUpdate", "TransactWriteAction", + "UpdateAdd", + "UpdateSetIfNotExists", "TransactionCanceledError", "Table", "ValidationError", diff --git a/py/src/theorydb_py/errors.py b/py/src/theorydb_py/errors.py index fc70ca1..c8e4cee 100644 --- a/py/src/theorydb_py/errors.py +++ b/py/src/theorydb_py/errors.py @@ -9,6 +9,14 @@ class ConditionFailedError(TheorydbPyError): pass +class LeaseHeldError(ConditionFailedError): + pass + + +class LeaseNotOwnedError(ConditionFailedError): + pass + + class NotFoundError(TheorydbPyError): pass diff --git a/py/src/theorydb_py/lease.py b/py/src/theorydb_py/lease.py new file mode 100644 index 0000000..47300d4 --- /dev/null +++ b/py/src/theorydb_py/lease.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +import time +import uuid +from collections.abc import Callable +from dataclasses import dataclass +from typing import Any + +from botocore.exceptions import ClientError + +from .aws_errors import map_client_error +from .errors import ConditionFailedError, LeaseHeldError, LeaseNotOwnedError + + +@dataclass(frozen=True) +class LeaseKey: + pk: str + sk: str + + +@dataclass(frozen=True) +class Lease: + key: LeaseKey + token: str + expires_at: int + + +class LeaseManager: + def __init__( + self, + *, + client: Any, + table_name: str, + now: Callable[[], float] | None = None, + token: Callable[[], str] | None = None, + pk_attr: str = "pk", + sk_attr: str = "sk", + token_attr: str = "lease_token", + expires_at_attr: str = "lease_expires_at", + ttl_attr: str = "ttl", + ttl_buffer_seconds: int = 60 * 60, + ) -> None: + if client is None: + raise ValueError("client is required") + if not table_name: + raise ValueError("table_name is required") + + self._client = client + self._table_name = table_name + + self._now = now or time.time + self._token = token or (lambda: str(uuid.uuid4())) + + self._pk_attr = pk_attr + self._sk_attr = sk_attr + self._token_attr = token_attr + self._expires_at_attr = expires_at_attr + self._ttl_attr = ttl_attr + self._ttl_buffer_seconds = int(ttl_buffer_seconds) + + def lock_key(self, pk: str, sk: str = "LOCK") -> LeaseKey: + return LeaseKey(pk=pk, sk=sk) + + def acquire(self, key: LeaseKey, *, lease_seconds: int) -> Lease: + if not key.pk or not key.sk: + raise ValueError("key.pk and key.sk are required") + if lease_seconds <= 0: + raise ValueError("lease_seconds must be > 0") + + now = int(self._now()) + expires_at = now + int(lease_seconds) + token = self._token() + + item: dict[str, Any] = { + self._pk_attr: {"S": key.pk}, + self._sk_attr: {"S": key.sk}, + self._token_attr: {"S": token}, + self._expires_at_attr: {"N": str(expires_at)}, + } + if self._ttl_attr and self._ttl_buffer_seconds > 0: + ttl = expires_at + self._ttl_buffer_seconds + item[self._ttl_attr] = {"N": str(ttl)} + + try: + self._client.put_item( + TableName=self._table_name, + Item=item, + ConditionExpression="attribute_not_exists(#pk) OR #exp <= :now", + ExpressionAttributeNames={"#pk": self._pk_attr, "#exp": self._expires_at_attr}, + ExpressionAttributeValues={":now": {"N": str(now)}}, + ) + except ClientError as err: # pragma: no cover (depends on AWS error shape) + mapped = map_client_error(err) + if isinstance(mapped, ConditionFailedError): + raise LeaseHeldError(str(mapped)) from err + raise mapped from err + + return Lease(key=key, token=token, expires_at=expires_at) + + def refresh(self, lease: Lease, *, lease_seconds: int) -> Lease: + if not lease.key.pk or not lease.key.sk: + raise ValueError("lease.key.pk and lease.key.sk are required") + if not lease.token: + raise ValueError("lease.token is required") + if lease_seconds <= 0: + raise ValueError("lease_seconds must be > 0") + + now = int(self._now()) + expires_at = now + int(lease_seconds) + + names: dict[str, str] = { + "#tok": self._token_attr, + "#exp": self._expires_at_attr, + } + values: dict[str, Any] = { + ":tok": {"S": lease.token}, + ":now": {"N": str(now)}, + ":exp": {"N": str(expires_at)}, + } + + update_expression = "SET #exp = :exp" + if self._ttl_attr and self._ttl_buffer_seconds > 0: + ttl = expires_at + self._ttl_buffer_seconds + names["#ttl"] = self._ttl_attr + values[":ttl"] = {"N": str(ttl)} + update_expression = update_expression + ", #ttl = :ttl" + + try: + self._client.update_item( + TableName=self._table_name, + Key={self._pk_attr: {"S": lease.key.pk}, self._sk_attr: {"S": lease.key.sk}}, + UpdateExpression=update_expression, + ConditionExpression="#tok = :tok AND #exp > :now", + ExpressionAttributeNames=names, + ExpressionAttributeValues=values, + ) + except ClientError as err: # pragma: no cover (depends on AWS error shape) + mapped = map_client_error(err) + if isinstance(mapped, ConditionFailedError): + raise LeaseNotOwnedError(str(mapped)) from err + raise mapped from err + + return Lease(key=lease.key, token=lease.token, expires_at=expires_at) + + def release(self, lease: Lease) -> None: + if not lease.key.pk or not lease.key.sk: + raise ValueError("lease.key.pk and lease.key.sk are required") + if not lease.token: + raise ValueError("lease.token is required") + + try: + self._client.delete_item( + TableName=self._table_name, + Key={self._pk_attr: {"S": lease.key.pk}, self._sk_attr: {"S": lease.key.sk}}, + ConditionExpression="#tok = :tok", + ExpressionAttributeNames={"#tok": self._token_attr}, + ExpressionAttributeValues={":tok": {"S": lease.token}}, + ) + except ClientError as err: # pragma: no cover (depends on AWS error shape) + mapped = map_client_error(err) + if isinstance(mapped, ConditionFailedError): + return # best-effort + raise mapped from err diff --git a/py/src/theorydb_py/table.py b/py/src/theorydb_py/table.py index e4d82d4..2d6d9e3 100644 --- a/py/src/theorydb_py/table.py +++ b/py/src/theorydb_py/table.py @@ -36,6 +36,8 @@ TransactPut, TransactUpdate, TransactWriteAction, + UpdateAdd, + UpdateSetIfNotExists, ) if TYPE_CHECKING: @@ -854,6 +856,14 @@ def _build_update_request( update_values: dict[str, Any] = {} set_parts: list[str] = [] remove_parts: list[str] = [] + add_parts: list[str] = [] + + def normalize_set(value: Any) -> set[Any]: + if isinstance(value, set): + return value + if isinstance(value, (list, tuple)): + return set(value) + return {value} for field_name, value in updates.items(): if field_name not in self._model.attributes: @@ -867,6 +877,30 @@ def _build_update_request( name_ref = f"#d_{field_name}" update_names[name_ref] = attr_def.attribute_name + if isinstance(value, UpdateAdd): + if attr_def.encrypted: + raise ValidationError(f"encrypted fields cannot be used in ADD: {field_name}") + + value_ref = f":d_{field_name}" + if attr_def.set: + update_values[value_ref] = self._serialize_attr_value( + attr_def, + normalize_set(value.value), + ) + else: + if not isinstance(value.value, (int, float, Decimal)): + raise ValidationError("ADD requires a numeric value for non-set fields") + update_values[value_ref] = self._serializer.serialize(value.value) + + add_parts.append(f"{name_ref} {value_ref}") + continue + + if isinstance(value, UpdateSetIfNotExists): + value_ref = f":d_{field_name}" + update_values[value_ref] = self._serialize_attr_value(attr_def, value.default_value) + set_parts.append(f"{name_ref} = if_not_exists({name_ref}, {value_ref})") + continue + if value is None: remove_parts.append(name_ref) continue @@ -880,6 +914,8 @@ def _build_update_request( expr_parts.append("SET " + ", ".join(set_parts)) if remove_parts: expr_parts.append("REMOVE " + ", ".join(remove_parts)) + if add_parts: + expr_parts.append("ADD " + ", ".join(add_parts)) if not expr_parts: raise ValidationError("no updates provided") diff --git a/py/src/theorydb_py/transaction.py b/py/src/theorydb_py/transaction.py index 560a9c5..f12d823 100644 --- a/py/src/theorydb_py/transaction.py +++ b/py/src/theorydb_py/transaction.py @@ -5,6 +5,16 @@ from typing import Any +@dataclass(frozen=True) +class UpdateAdd: + value: Any + + +@dataclass(frozen=True) +class UpdateSetIfNotExists: + default_value: Any + + @dataclass(frozen=True) class TransactPut[T]: item: T diff --git a/py/src/theorydb_py/version.json b/py/src/theorydb_py/version.json index a15e91f..28b5fa5 100644 --- a/py/src/theorydb_py/version.json +++ b/py/src/theorydb_py/version.json @@ -1,3 +1,3 @@ { - "version": "1.2.0" + "version": "1.3.0-rc.1" } diff --git a/py/tests/integration/test_lease.py b/py/tests/integration/test_lease.py new file mode 100644 index 0000000..ebd6f54 --- /dev/null +++ b/py/tests/integration/test_lease.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import os +import uuid + +import boto3 +import pytest + +from theorydb_py.errors import LeaseHeldError, LeaseNotOwnedError +from theorydb_py.lease import LeaseManager + + +def _dynamodb_endpoint() -> str: + return os.environ.get("DYNAMODB_ENDPOINT", "http://localhost:8000") + + +def _client(): + return boto3.client( + "dynamodb", + endpoint_url=_dynamodb_endpoint(), + region_name=os.environ.get("AWS_REGION", "us-east-1"), + aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "dummy"), + aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", "dummy"), + ) + + +def test_lease_two_contenders() -> None: + table_name = f"theorydb_py_lease_{uuid.uuid4().hex[:12]}" + client = _client() + + client.create_table( + TableName=table_name, + KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}, {"AttributeName": "sk", "KeyType": "RANGE"}], + AttributeDefinitions=[ + {"AttributeName": "pk", "AttributeType": "S"}, + {"AttributeName": "sk", "AttributeType": "S"}, + ], + BillingMode="PAY_PER_REQUEST", + ) + waiter = client.get_waiter("table_exists") + waiter.wait(TableName=table_name) + + try: + now = 1000.0 + + def clock() -> float: + return now + + mgr1 = LeaseManager( + client=client, + table_name=table_name, + now=clock, + token=lambda: "tok1", + ttl_buffer_seconds=10, + ) + mgr2 = LeaseManager( + client=client, + table_name=table_name, + now=clock, + token=lambda: "tok2", + ttl_buffer_seconds=10, + ) + + key = mgr1.lock_key(pk="CACHE#py", sk="LOCK") + l1 = mgr1.acquire(key, lease_seconds=30) + assert l1.token == "tok1" + + with pytest.raises(LeaseHeldError): + mgr2.acquire(key, lease_seconds=30) + + now = 2000.0 + l2 = mgr2.acquire(key, lease_seconds=30) + assert l2.token == "tok2" + + with pytest.raises(LeaseNotOwnedError): + mgr1.refresh(l1, lease_seconds=30) + finally: + client.delete_table(TableName=table_name) diff --git a/py/tests/unit/test_lease.py b/py/tests/unit/test_lease.py new file mode 100644 index 0000000..629d962 --- /dev/null +++ b/py/tests/unit/test_lease.py @@ -0,0 +1,145 @@ +from __future__ import annotations + +from botocore.exceptions import ClientError + +from theorydb_py.errors import LeaseHeldError, LeaseNotOwnedError +from theorydb_py.lease import Lease, LeaseKey, LeaseManager + + +class _StubClient: + def __init__(self) -> None: + self.calls: list[tuple[str, dict]] = [] + self._handlers: dict[str, object] = {} + + def on(self, op: str, handler: object) -> None: + self._handlers[op] = handler + + def put_item(self, **req): + self.calls.append(("put_item", req)) + handler = self._handlers.get("put_item") + if callable(handler): + return handler(req) + return {} + + def update_item(self, **req): + self.calls.append(("update_item", req)) + handler = self._handlers.get("update_item") + if callable(handler): + return handler(req) + return {} + + def delete_item(self, **req): + self.calls.append(("delete_item", req)) + handler = self._handlers.get("delete_item") + if callable(handler): + return handler(req) + return {} + + +def test_lease_manager_acquire_builds_conditional_put() -> None: + client = _StubClient() + mgr = LeaseManager( + client=client, + table_name="tbl", + now=lambda: 1000.0, + token=lambda: "tok", + ttl_buffer_seconds=10, + ) + + lease = mgr.acquire(LeaseKey(pk="CACHE#A", sk="LOCK"), lease_seconds=30) + assert lease == Lease(key=LeaseKey(pk="CACHE#A", sk="LOCK"), token="tok", expires_at=1030) + + op, req = client.calls[0] + assert op == "put_item" + assert req["TableName"] == "tbl" + assert req["ConditionExpression"] == "attribute_not_exists(#pk) OR #exp <= :now" + assert req["ExpressionAttributeNames"] == {"#pk": "pk", "#exp": "lease_expires_at"} + assert req["ExpressionAttributeValues"] == {":now": {"N": "1000"}} + assert req["Item"]["pk"] == {"S": "CACHE#A"} + assert req["Item"]["sk"] == {"S": "LOCK"} + assert req["Item"]["lease_token"] == {"S": "tok"} + assert req["Item"]["lease_expires_at"] == {"N": "1030"} + assert req["Item"]["ttl"] == {"N": "1040"} + + +def test_lease_manager_acquire_raises_lease_held() -> None: + client = _StubClient() + err = ClientError( + {"Error": {"Code": "ConditionalCheckFailedException", "Message": "no"}}, + "PutItem", + ) + + def handler(_: dict) -> object: + raise err + + client.on("put_item", handler) + mgr = LeaseManager(client=client, table_name="tbl", now=lambda: 1000.0, token=lambda: "tok") + + try: + mgr.acquire(LeaseKey(pk="CACHE#A", sk="LOCK"), lease_seconds=30) + raise AssertionError("expected LeaseHeldError") + except LeaseHeldError: + pass + + +def test_lease_manager_refresh_updates_when_owned() -> None: + client = _StubClient() + mgr = LeaseManager(client=client, table_name="tbl", now=lambda: 1000.0, ttl_buffer_seconds=10) + + out = mgr.refresh( + Lease(key=LeaseKey(pk="CACHE#A", sk="LOCK"), token="tok", expires_at=0), + lease_seconds=60, + ) + assert out.expires_at == 1060 + + op, req = client.calls[0] + assert op == "update_item" + assert req["TableName"] == "tbl" + assert req["UpdateExpression"] == "SET #exp = :exp, #ttl = :ttl" + assert req["ConditionExpression"] == "#tok = :tok AND #exp > :now" + assert req["ExpressionAttributeNames"]["#tok"] == "lease_token" + assert req["ExpressionAttributeNames"]["#exp"] == "lease_expires_at" + assert req["ExpressionAttributeNames"]["#ttl"] == "ttl" + assert req["ExpressionAttributeValues"][":tok"] == {"S": "tok"} + assert req["ExpressionAttributeValues"][":now"] == {"N": "1000"} + assert req["ExpressionAttributeValues"][":exp"] == {"N": "1060"} + assert req["ExpressionAttributeValues"][":ttl"] == {"N": "1070"} + + +def test_lease_manager_refresh_raises_not_owned() -> None: + client = _StubClient() + err = ClientError( + {"Error": {"Code": "ConditionalCheckFailedException", "Message": "no"}}, + "UpdateItem", + ) + + def handler(_: dict) -> object: + raise err + + client.on("update_item", handler) + mgr = LeaseManager(client=client, table_name="tbl", now=lambda: 1000.0) + + try: + mgr.refresh( + Lease(key=LeaseKey(pk="CACHE#A", sk="LOCK"), token="tok", expires_at=0), + lease_seconds=60, + ) + raise AssertionError("expected LeaseNotOwnedError") + except LeaseNotOwnedError: + pass + + +def test_lease_manager_release_is_best_effort() -> None: + client = _StubClient() + err = ClientError( + {"Error": {"Code": "ConditionalCheckFailedException", "Message": "no"}}, + "DeleteItem", + ) + + def handler(_: dict) -> object: + raise err + + client.on("delete_item", handler) + mgr = LeaseManager(client=client, table_name="tbl", now=lambda: 1000.0) + + mgr.release(Lease(key=LeaseKey(pk="CACHE#A", sk="LOCK"), token="tok", expires_at=0)) diff --git a/py/tests/unit/test_table_additional_coverage.py b/py/tests/unit/test_table_additional_coverage.py index b4a8f88..00ee705 100644 --- a/py/tests/unit/test_table_additional_coverage.py +++ b/py/tests/unit/test_table_additional_coverage.py @@ -14,6 +14,8 @@ SortKeyCondition, Table, TransactionCanceledError, + UpdateAdd, + UpdateSetIfNotExists, ValidationError, theorydb_field, ) @@ -306,6 +308,33 @@ def test_transact_write_builds_requests_for_all_action_types() -> None: assert {next(iter(i.keys())) for i in transact_items} == {"Put", "Delete", "Update", "ConditionCheck"} +def test_transact_update_supports_add_and_if_not_exists() -> None: + model = ModelDefinition.from_dataclass(Item, table_name="tbl") + stub = _StubClient() + table: Table[Item] = Table(model, client=stub) + + table.transact_write( + [ + TransactUpdate( + pk="A", + sk="1", + updates={ + "value": UpdateAdd(1), + "note": UpdateSetIfNotExists("first"), + }, + condition_expression="attribute_not_exists(#v) OR #v < :max", + expression_attribute_names={"#v": "value"}, + expression_attribute_values={":max": 100}, + ) + ] + ) + + update = stub.transact_write_reqs[0]["TransactItems"][0]["Update"] + assert "ADD" in update["UpdateExpression"] + assert "if_not_exists" in update["UpdateExpression"] + assert update["ExpressionAttributeValues"][":d_value"]["N"] == "1" + + def test_put_delete_update_expression_attribute_maps_and_build_request_merges() -> None: model = ModelDefinition.from_dataclass(Item, table_name="tbl") stub = _StubClient() diff --git a/tests/integration/lease_helper_test.go b/tests/integration/lease_helper_test.go new file mode 100644 index 0000000..de4bc30 --- /dev/null +++ b/tests/integration/lease_helper_test.go @@ -0,0 +1,81 @@ +package integration + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/theory-cloud/tabletheory/pkg/lease" +) + +var leaseContractTableName string + +type leaseContractModel struct { + _ struct{} `theorydb:"naming:snake_case"` + + PK string `theorydb:"pk,attr:pk" json:"pk"` + SK string `theorydb:"sk,attr:sk" json:"sk"` + + LeaseToken string `theorydb:"attr:lease_token" json:"lease_token"` + LeaseExpiresAt int64 `theorydb:"attr:lease_expires_at" json:"lease_expires_at"` + TTL int64 `theorydb:"ttl,attr:ttl,omitempty" json:"ttl,omitempty"` +} + +func (leaseContractModel) TableName() string { + return leaseContractTableName +} + +func TestLeaseManager_TwoContenders(t *testing.T) { + tc := InitTestDB(t) + + leaseContractTableName = fmt.Sprintf("lease_contract_%d", time.Now().UnixNano()) + tc.CreateTable(t, &leaseContractModel{}) + + ctx := context.Background() + + mgr1, err := lease.NewManager( + tc.DynamoDBClient, + leaseContractTableName, + lease.WithNow(func() time.Time { return time.Unix(1000, 0) }), + lease.WithTokenGenerator(func() string { return "tok1" }), + lease.WithTTLBuffer(10*time.Second), + ) + require.NoError(t, err) + + mgr2, err := lease.NewManager( + tc.DynamoDBClient, + leaseContractTableName, + lease.WithNow(func() time.Time { return time.Unix(1000, 0) }), + lease.WithTokenGenerator(func() string { return "tok2" }), + lease.WithTTLBuffer(10*time.Second), + ) + require.NoError(t, err) + + got1, err := mgr1.Acquire(ctx, "CACHE#A", 30*time.Second) + require.NoError(t, err) + require.Equal(t, "tok1", got1.Token) + + _, err = mgr2.Acquire(ctx, "CACHE#A", 30*time.Second) + require.Error(t, err) + require.True(t, lease.IsLeaseHeld(err)) + + mgr2Late, err := lease.NewManager( + tc.DynamoDBClient, + leaseContractTableName, + lease.WithNow(func() time.Time { return time.Unix(2000, 0) }), + lease.WithTokenGenerator(func() string { return "tok2" }), + lease.WithTTLBuffer(10*time.Second), + ) + require.NoError(t, err) + + got2, err := mgr2Late.Acquire(ctx, "CACHE#A", 30*time.Second) + require.NoError(t, err) + require.Equal(t, "tok2", got2.Token) + + _, err = mgr1.Refresh(ctx, *got1, 30*time.Second) + require.Error(t, err) + require.True(t, lease.IsLeaseNotOwned(err)) +} diff --git a/ts/README.md b/ts/README.md index 4657f1d..badeb9c 100644 --- a/ts/README.md +++ b/ts/README.md @@ -115,6 +115,19 @@ await db.transactWrite([ item: { PK: 'U#1', SK: 'TX' }, ifNotExists: true, }, + { + kind: 'update', + model: 'User', + key: { PK: 'U#1', SK: 'TX' }, + updateExpression: 'SET #ws = if_not_exists(#ws, :ws) ADD #count :inc', + conditionExpression: 'attribute_not_exists(#count) OR #count < :maxAllowed', + expressionAttributeNames: { '#ws': 'WindowStart', '#count': 'Count' }, + expressionAttributeValues: { + ':ws': { S: '2026-01-23T00:00:00Z' }, + ':inc': { N: '1' }, + ':maxAllowed': { N: '100' }, + }, + }, ]); ``` diff --git a/ts/docs/core-patterns.md b/ts/docs/core-patterns.md index 231b4c2..0d33c97 100644 --- a/ts/docs/core-patterns.md +++ b/ts/docs/core-patterns.md @@ -78,6 +78,19 @@ await db.transactWrite([ item: { PK: 'U#1', SK: 'TX' }, ifNotExists: true, }, + { + kind: 'update', + model: 'User', + key: { PK: 'U#1', SK: 'TX' }, + updateExpression: 'SET #ws = if_not_exists(#ws, :ws) ADD #count :inc', + conditionExpression: 'attribute_not_exists(#count) OR #count < :maxAllowed', + expressionAttributeNames: { '#ws': 'WindowStart', '#count': 'Count' }, + expressionAttributeValues: { + ':ws': { S: '2026-01-23T00:00:00Z' }, + ':inc': { N: '1' }, + ':maxAllowed': { N: '100' }, + }, + }, ]); ``` diff --git a/ts/package-lock.json b/ts/package-lock.json index 2588532..a86131a 100644 --- a/ts/package-lock.json +++ b/ts/package-lock.json @@ -1,12 +1,12 @@ { "name": "@theory-cloud/tabletheory-ts", - "version": "1.2.0", + "version": "1.3.0-rc.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@theory-cloud/tabletheory-ts", - "version": "1.2.0", + "version": "1.3.0-rc.1", "license": "Apache-2.0", "dependencies": { "@aws-sdk/client-dynamodb": "^3.971.0", diff --git a/ts/package.json b/ts/package.json index 48d5dc6..23018d8 100644 --- a/ts/package.json +++ b/ts/package.json @@ -1,6 +1,6 @@ { "name": "@theory-cloud/tabletheory-ts", - "version": "1.2.0", + "version": "1.3.0-rc.1", "license": "Apache-2.0", "private": true, "type": "module", diff --git a/ts/src/client.ts b/ts/src/client.ts index e5a7901..32933d9 100644 --- a/ts/src/client.ts +++ b/ts/src/client.ts @@ -11,6 +11,7 @@ import { type ConditionCheck, type Delete, type Put, + type Update, type TransactWriteItem, type WriteRequest, } from '@aws-sdk/client-dynamodb'; @@ -474,6 +475,37 @@ export class TheorydbClient { transactItems.push({ Put: put }); break; } + case 'update': { + const update: Update = { + TableName: model.tableName, + Key: marshalKey(model, a.key), + UpdateExpression: '', + }; + + if ('updateFn' in a) { + const builder = new UpdateBuilder( + this.ddb, + model, + a.key, + provider, + this.sendOptions, + ); + await a.updateFn(builder); + const built = await builder.build(); + update.UpdateExpression = built.updateExpression; + update.ConditionExpression = built.conditionExpression; + update.ExpressionAttributeNames = built.expressionAttributeNames; + update.ExpressionAttributeValues = built.expressionAttributeValues; + } else { + update.UpdateExpression = a.updateExpression; + update.ConditionExpression = a.conditionExpression; + update.ExpressionAttributeNames = a.expressionAttributeNames; + update.ExpressionAttributeValues = a.expressionAttributeValues; + } + + transactItems.push({ Update: update }); + break; + } case 'delete': transactItems.push({ Delete: { diff --git a/ts/src/errors.ts b/ts/src/errors.ts index c647190..3494431 100644 --- a/ts/src/errors.ts +++ b/ts/src/errors.ts @@ -1,6 +1,8 @@ export type ErrorCode = | 'ErrItemNotFound' | 'ErrConditionFailed' + | 'ErrLeaseHeld' + | 'ErrLeaseNotOwned' | 'ErrInvalidModel' | 'ErrMissingPrimaryKey' | 'ErrInvalidOperator' diff --git a/ts/src/index.ts b/ts/src/index.ts index 90c3986..4ab8d92 100644 --- a/ts/src/index.ts +++ b/ts/src/index.ts @@ -18,3 +18,4 @@ export * from './multiaccount.js'; export * from './send-options.js'; export * from './validation.js'; export * from './protection.js'; +export * from './lease.js'; diff --git a/ts/src/lease.ts b/ts/src/lease.ts new file mode 100644 index 0000000..af461e7 --- /dev/null +++ b/ts/src/lease.ts @@ -0,0 +1,205 @@ +import { + DeleteItemCommand, + PutItemCommand, + UpdateItemCommand, + type DynamoDBClient, +} from '@aws-sdk/client-dynamodb'; +import { randomUUID } from 'node:crypto'; + +import { mapDynamoError } from './dynamo-error.js'; +import { TheorydbError } from './errors.js'; +import type { SendOptions } from './send-options.js'; + +export type LeaseKey = { + pk: string; + sk: string; +}; + +export type Lease = { + key: LeaseKey; + token: string; + expiresAt: number; +}; + +export class LeaseManager { + private readonly now: () => number; + private readonly token: () => string; + private readonly pkAttr: string; + private readonly skAttr: string; + private readonly tokenAttr: string; + private readonly expiresAtAttr: string; + private readonly ttlAttr: string; + private readonly ttlBufferSeconds: number; + private readonly sendOptions: SendOptions | undefined; + + constructor( + private readonly ddb: DynamoDBClient, + private readonly tableName: string, + opts: { + now?: () => number; + token?: () => string; + pkAttr?: string; + skAttr?: string; + tokenAttr?: string; + expiresAtAttr?: string; + ttlAttr?: string; + ttlBufferSeconds?: number; + sendOptions?: SendOptions; + } = {}, + ) { + if (!tableName) throw new Error('tableName is required'); + + this.now = opts.now ?? (() => Math.floor(Date.now() / 1000)); + this.token = opts.token ?? (() => randomUUID()); + + this.pkAttr = opts.pkAttr ?? 'pk'; + this.skAttr = opts.skAttr ?? 'sk'; + this.tokenAttr = opts.tokenAttr ?? 'lease_token'; + this.expiresAtAttr = opts.expiresAtAttr ?? 'lease_expires_at'; + this.ttlAttr = opts.ttlAttr ?? 'ttl'; + this.ttlBufferSeconds = opts.ttlBufferSeconds ?? 60 * 60; + this.sendOptions = opts.sendOptions; + } + + lockKey(pk: string, sk = 'LOCK'): LeaseKey { + return { pk, sk }; + } + + async acquire(key: LeaseKey, opts: { leaseSeconds: number }): Promise { + if (!key?.pk || !key?.sk) throw new Error('key.pk and key.sk are required'); + if (!Number.isFinite(opts.leaseSeconds) || opts.leaseSeconds <= 0) { + throw new Error('leaseSeconds must be > 0'); + } + + const now = this.now(); + const expiresAt = now + Math.ceil(opts.leaseSeconds); + const token = this.token(); + const ttl = expiresAt + Math.max(0, Math.ceil(this.ttlBufferSeconds)); + + const cmd = new PutItemCommand({ + TableName: this.tableName, + Item: { + [this.pkAttr]: { S: key.pk }, + [this.skAttr]: { S: key.sk }, + [this.tokenAttr]: { S: token }, + [this.expiresAtAttr]: { N: String(expiresAt) }, + ...(this.ttlAttr && this.ttlBufferSeconds > 0 + ? { [this.ttlAttr]: { N: String(ttl) } } + : {}), + }, + ConditionExpression: 'attribute_not_exists(#pk) OR #exp <= :now', + ExpressionAttributeNames: { + '#pk': this.pkAttr, + '#exp': this.expiresAtAttr, + }, + ExpressionAttributeValues: { + ':now': { N: String(now) }, + }, + }); + + try { + await this.ddb.send(cmd, this.sendOptions); + return { key, token, expiresAt }; + } catch (err) { + const mapped = mapDynamoError(err); + if ( + mapped instanceof TheorydbError && + mapped.code === 'ErrConditionFailed' + ) { + throw new TheorydbError('ErrLeaseHeld', 'Lease held', { cause: err }); + } + throw mapped; + } + } + + async refresh(lease: Lease, opts: { leaseSeconds: number }): Promise { + if (!lease?.key?.pk || !lease?.key?.sk) { + throw new Error('lease.key.pk and lease.key.sk are required'); + } + if (!lease?.token) throw new Error('lease.token is required'); + if (!Number.isFinite(opts.leaseSeconds) || opts.leaseSeconds <= 0) { + throw new Error('leaseSeconds must be > 0'); + } + + const now = this.now(); + const expiresAt = now + Math.ceil(opts.leaseSeconds); + const ttl = expiresAt + Math.max(0, Math.ceil(this.ttlBufferSeconds)); + + const updateExpression = + this.ttlAttr && this.ttlBufferSeconds > 0 + ? 'SET #exp = :exp, #ttl = :ttl' + : 'SET #exp = :exp'; + + const cmd = new UpdateItemCommand({ + TableName: this.tableName, + Key: { + [this.pkAttr]: { S: lease.key.pk }, + [this.skAttr]: { S: lease.key.sk }, + }, + UpdateExpression: updateExpression, + ConditionExpression: '#tok = :tok AND #exp > :now', + ExpressionAttributeNames: { + '#tok': this.tokenAttr, + '#exp': this.expiresAtAttr, + ...(this.ttlAttr && this.ttlBufferSeconds > 0 + ? { '#ttl': this.ttlAttr } + : {}), + }, + ExpressionAttributeValues: { + ':tok': { S: lease.token }, + ':now': { N: String(now) }, + ':exp': { N: String(expiresAt) }, + ...(this.ttlAttr && this.ttlBufferSeconds > 0 + ? { ':ttl': { N: String(ttl) } } + : {}), + }, + }); + + try { + await this.ddb.send(cmd, this.sendOptions); + return { ...lease, expiresAt }; + } catch (err) { + const mapped = mapDynamoError(err); + if ( + mapped instanceof TheorydbError && + mapped.code === 'ErrConditionFailed' + ) { + throw new TheorydbError('ErrLeaseNotOwned', 'Lease not owned', { + cause: err, + }); + } + throw mapped; + } + } + + async release(lease: Lease): Promise { + if (!lease?.key?.pk || !lease?.key?.sk) { + throw new Error('lease.key.pk and lease.key.sk are required'); + } + if (!lease?.token) throw new Error('lease.token is required'); + + const cmd = new DeleteItemCommand({ + TableName: this.tableName, + Key: { + [this.pkAttr]: { S: lease.key.pk }, + [this.skAttr]: { S: lease.key.sk }, + }, + ConditionExpression: '#tok = :tok', + ExpressionAttributeNames: { '#tok': this.tokenAttr }, + ExpressionAttributeValues: { ':tok': { S: lease.token } }, + }); + + try { + await this.ddb.send(cmd, this.sendOptions); + } catch (err) { + const mapped = mapDynamoError(err); + if ( + mapped instanceof TheorydbError && + mapped.code === 'ErrConditionFailed' + ) { + return; // best-effort + } + throw mapped; + } + } +} diff --git a/ts/src/transaction.ts b/ts/src/transaction.ts index 3c8dca6..f45a013 100644 --- a/ts/src/transaction.ts +++ b/ts/src/transaction.ts @@ -1,5 +1,29 @@ import type { AttributeValue } from '@aws-sdk/client-dynamodb'; +import type { UpdateBuilder } from './update-builder.js'; + +type TransactUpdateRaw = { + kind: 'update'; + model: string; + key: Record; + updateExpression: string; + conditionExpression?: string; + expressionAttributeNames?: Record; + expressionAttributeValues?: Record; + updateFn?: never; +}; + +type TransactUpdateWithBuilder = { + kind: 'update'; + model: string; + key: Record; + updateFn: (builder: UpdateBuilder) => void | Promise; + updateExpression?: never; + conditionExpression?: never; + expressionAttributeNames?: never; + expressionAttributeValues?: never; +}; + export type TransactAction = | { kind: 'put'; @@ -7,6 +31,8 @@ export type TransactAction = item: Record; ifNotExists?: boolean; } + | TransactUpdateRaw + | TransactUpdateWithBuilder | { kind: 'delete'; model: string; diff --git a/ts/src/update-builder.ts b/ts/src/update-builder.ts index a3956c1..4214190 100644 --- a/ts/src/update-builder.ts +++ b/ts/src/update-builder.ts @@ -372,7 +372,12 @@ export class UpdateBuilder { return this; } - async execute(): Promise | undefined> { + async build(): Promise<{ + updateExpression: string; + conditionExpression?: string; + expressionAttributeNames?: Record; + expressionAttributeValues?: Record; + }> { if (this.updateOps.length === 0) { throw new TheorydbError('ErrInvalidOperator', 'No updates provided'); } @@ -406,15 +411,28 @@ export class UpdateBuilder { values[k] = v; } + return { + updateExpression, + ...(cond.expression ? { conditionExpression: cond.expression } : {}), + ...(Object.keys(names).length ? { expressionAttributeNames: names } : {}), + ...(Object.keys(values).length + ? { expressionAttributeValues: values } + : {}), + }; + } + + async execute(): Promise | undefined> { + const built = await this.build(); + const cmd = new UpdateItemCommand({ TableName: this.model.tableName, Key: marshalKey(this.model, this.key), - UpdateExpression: updateExpression, - ...(cond.expression ? { ConditionExpression: cond.expression } : {}), - ExpressionAttributeNames: Object.keys(names).length ? names : undefined, - ExpressionAttributeValues: Object.keys(values).length - ? values - : undefined, + UpdateExpression: built.updateExpression, + ...(built.conditionExpression + ? { ConditionExpression: built.conditionExpression } + : {}), + ExpressionAttributeNames: built.expressionAttributeNames, + ExpressionAttributeValues: built.expressionAttributeValues, ReturnValues: this.returnValuesOpt, }); diff --git a/ts/test/integration/lease.test.ts b/ts/test/integration/lease.test.ts new file mode 100644 index 0000000..20f1318 --- /dev/null +++ b/ts/test/integration/lease.test.ts @@ -0,0 +1,103 @@ +import assert from 'node:assert/strict'; + +import { + CreateTableCommand, + DescribeTableCommand, + DynamoDBClient, + ResourceInUseException, +} from '@aws-sdk/client-dynamodb'; + +import { TheorydbError } from '../../src/errors.js'; +import { LeaseManager } from '../../src/lease.js'; + +const endpoint = process.env.DYNAMODB_ENDPOINT ?? 'http://localhost:8000'; + +const ddb = new DynamoDBClient({ + region: process.env.AWS_REGION ?? 'us-east-1', + endpoint, + credentials: { + accessKeyId: process.env.AWS_ACCESS_KEY_ID ?? 'dummy', + secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY ?? 'dummy', + }, +}); + +try { + await ensureLeaseTable(ddb); + + const pk = `CACHE#ts-${Date.now()}`; + const key = { pk, sk: 'LOCK' }; + + const mgr1 = new LeaseManager(ddb, 'lease_contract', { + now: () => 1000, + token: () => 'tok1', + ttlBufferSeconds: 10, + }); + + const mgr2 = new LeaseManager(ddb, 'lease_contract', { + now: () => 1000, + token: () => 'tok2', + ttlBufferSeconds: 10, + }); + + const l1 = await mgr1.acquire(key, { leaseSeconds: 30 }); + assert.equal(l1.token, 'tok1'); + + await assert.rejects( + () => mgr2.acquire(key, { leaseSeconds: 30 }), + (e) => e instanceof TheorydbError && e.code === 'ErrLeaseHeld', + ); + + const mgr2Late = new LeaseManager(ddb, 'lease_contract', { + now: () => 2000, + token: () => 'tok2', + ttlBufferSeconds: 10, + }); + + const l2 = await mgr2Late.acquire(key, { leaseSeconds: 30 }); + assert.equal(l2.token, 'tok2'); + + await assert.rejects( + () => mgr1.refresh(l1, { leaseSeconds: 30 }), + (e) => e instanceof TheorydbError && e.code === 'ErrLeaseNotOwned', + ); +} finally { + ddb.destroy(); +} + +async function ensureLeaseTable(client: DynamoDBClient): Promise { + const tableName = 'lease_contract'; + try { + await client.send(new DescribeTableCommand({ TableName: tableName })); + return; + } catch { + // continue + } + + try { + await client.send( + new CreateTableCommand({ + TableName: tableName, + AttributeDefinitions: [ + { AttributeName: 'pk', AttributeType: 'S' }, + { AttributeName: 'sk', AttributeType: 'S' }, + ], + KeySchema: [ + { AttributeName: 'pk', KeyType: 'HASH' }, + { AttributeName: 'sk', KeyType: 'RANGE' }, + ], + ProvisionedThroughput: { ReadCapacityUnits: 1, WriteCapacityUnits: 1 }, + }), + ); + } catch (err) { + if (err instanceof ResourceInUseException) return; + if ( + typeof err === 'object' && + err !== null && + 'name' in err && + (err as { name?: unknown }).name === 'ResourceInUseException' + ) { + return; + } + throw err; + } +} diff --git a/ts/test/run-integration.ts b/ts/test/run-integration.ts index af9ca59..daddb54 100644 --- a/ts/test/run-integration.ts +++ b/ts/test/run-integration.ts @@ -2,3 +2,4 @@ await import('./integration/dynamodb-local.test.js'); await import('./integration/p0.test.js'); await import('./integration/query.test.js'); await import('./integration/batch-tx.test.js'); +await import('./integration/lease.test.js'); diff --git a/ts/test/run-unit.ts b/ts/test/run-unit.ts index d10ef77..fc87cde 100644 --- a/ts/test/run-unit.ts +++ b/ts/test/run-unit.ts @@ -20,3 +20,4 @@ await import('./unit/lambda.test.js'); await import('./unit/multiaccount.test.js'); await import('./unit/validation.test.js'); await import('./unit/protection.test.js'); +await import('./unit/lease.test.js'); diff --git a/ts/test/unit/client.test.ts b/ts/test/unit/client.test.ts index 57ec580..7b7b616 100644 --- a/ts/test/unit/client.test.ts +++ b/ts/test/unit/client.test.ts @@ -249,6 +249,15 @@ class StubDdb { ifNotExists: true, }, { kind: 'delete', model: 'User', key: { PK: 'A', SK: '1' } }, + { + kind: 'update', + model: 'User', + key: { PK: 'A', SK: '1' }, + updateExpression: 'ADD #v :inc', + conditionExpression: 'attribute_exists(PK)', + expressionAttributeNames: { '#v': 'version' }, + expressionAttributeValues: { ':inc': { N: '1' } }, + }, { kind: 'condition', model: 'User', @@ -259,6 +268,43 @@ class StubDdb { assert.ok(ddb.sent[0] instanceof TransactWriteItemsCommand); } +{ + const ddb = new StubDdb((cmd) => { + if (cmd instanceof TransactWriteItemsCommand) return {}; + throw new Error('unexpected'); + }); + const client = new TheorydbClient(ddb as unknown as DynamoDBClient).register( + User, + ); + + await client.transactWrite([ + { + kind: 'update', + model: 'User', + key: { PK: 'A', SK: '1' }, + updateFn: (u) => { + u.add('version', 1); + u.setIfNotExists( + 'createdAt', + undefined, + '2026-01-23T00:00:00.000000000Z', + ); + u.conditionNotExists('version').orCondition('version', '<', 100); + }, + }, + ]); + + const cmd = ddb.sent[0]; + assert.ok(cmd instanceof TransactWriteItemsCommand); + assert.equal(cmd.input.TransactItems?.length, 1); + const update = cmd.input.TransactItems?.[0]?.Update; + assert.equal(update?.TableName, 'users_contract'); + assert.ok(update?.UpdateExpression?.includes('ADD')); + assert.ok(update?.UpdateExpression?.includes('if_not_exists')); + assert.ok(update?.ConditionExpression?.includes('attribute_not_exists')); + assert.ok(update?.ConditionExpression?.includes('<')); +} + { const ddb = new StubDdb((cmd) => { if (cmd instanceof QueryCommand) return { Items: [] }; diff --git a/ts/test/unit/lease.test.ts b/ts/test/unit/lease.test.ts new file mode 100644 index 0000000..4411bfa --- /dev/null +++ b/ts/test/unit/lease.test.ts @@ -0,0 +1,139 @@ +import assert from 'node:assert/strict'; + +import { + ConditionalCheckFailedException, + DeleteItemCommand, + PutItemCommand, + UpdateItemCommand, + type DynamoDBClient, +} from '@aws-sdk/client-dynamodb'; + +import { TheorydbError } from '../../src/errors.js'; +import { LeaseManager } from '../../src/lease.js'; + +class StubDdb { + sent: unknown[] = []; + + constructor(private readonly handler: (cmd: unknown) => unknown) {} + + async send(cmd: unknown): Promise { + this.sent.push(cmd); + return this.handler(cmd); + } +} + +{ + const ddb = new StubDdb(() => ({})); + const mgr = new LeaseManager(ddb as unknown as DynamoDBClient, 'tbl', { + now: () => 1000, + token: () => 'tok', + ttlBufferSeconds: 10, + }); + + const lease = await mgr.acquire( + { pk: 'CACHE#A', sk: 'LOCK' }, + { leaseSeconds: 30 }, + ); + assert.equal(lease.token, 'tok'); + assert.equal(lease.expiresAt, 1030); + + const cmd = ddb.sent[0]; + assert.ok(cmd instanceof PutItemCommand); + assert.equal(cmd.input.TableName, 'tbl'); + assert.equal( + cmd.input.ConditionExpression, + 'attribute_not_exists(#pk) OR #exp <= :now', + ); + assert.deepEqual(cmd.input.ExpressionAttributeNames, { + '#pk': 'pk', + '#exp': 'lease_expires_at', + }); + assert.equal(cmd.input.ExpressionAttributeValues?.[':now']?.N, '1000'); + assert.equal(cmd.input.Item?.pk?.S, 'CACHE#A'); + assert.equal(cmd.input.Item?.sk?.S, 'LOCK'); + assert.equal(cmd.input.Item?.lease_token?.S, 'tok'); + assert.equal(cmd.input.Item?.lease_expires_at?.N, '1030'); + assert.equal(cmd.input.Item?.ttl?.N, '1040'); +} + +{ + const cfe = new ConditionalCheckFailedException({ + $metadata: {}, + message: 'no', + }); + const ddb = new StubDdb(() => { + throw cfe; + }); + const mgr = new LeaseManager(ddb as unknown as DynamoDBClient, 'tbl', { + now: () => 1000, + token: () => 'tok', + }); + + await assert.rejects( + () => mgr.acquire({ pk: 'CACHE#A', sk: 'LOCK' }, { leaseSeconds: 30 }), + (e) => e instanceof TheorydbError && e.code === 'ErrLeaseHeld', + ); +} + +{ + const ddb = new StubDdb(() => ({})); + const mgr = new LeaseManager(ddb as unknown as DynamoDBClient, 'tbl', { + now: () => 1000, + ttlBufferSeconds: 10, + }); + + const lease = await mgr.refresh( + { key: { pk: 'CACHE#A', sk: 'LOCK' }, token: 'tok', expiresAt: 0 }, + { leaseSeconds: 60 }, + ); + assert.equal(lease.expiresAt, 1060); + + const cmd = ddb.sent[0]; + assert.ok(cmd instanceof UpdateItemCommand); + assert.equal(cmd.input.TableName, 'tbl'); + assert.equal(cmd.input.ConditionExpression, '#tok = :tok AND #exp > :now'); + assert.equal(cmd.input.ExpressionAttributeValues?.[':tok']?.S, 'tok'); + assert.equal(cmd.input.ExpressionAttributeValues?.[':now']?.N, '1000'); + assert.equal(cmd.input.ExpressionAttributeValues?.[':exp']?.N, '1060'); + assert.equal(cmd.input.ExpressionAttributeValues?.[':ttl']?.N, '1070'); +} + +{ + const cfe = new ConditionalCheckFailedException({ + $metadata: {}, + message: 'no', + }); + const ddb = new StubDdb(() => { + throw cfe; + }); + const mgr = new LeaseManager(ddb as unknown as DynamoDBClient, 'tbl', { + now: () => 1000, + }); + + await assert.rejects( + () => + mgr.refresh( + { key: { pk: 'CACHE#A', sk: 'LOCK' }, token: 'tok', expiresAt: 0 }, + { leaseSeconds: 60 }, + ), + (e) => e instanceof TheorydbError && e.code === 'ErrLeaseNotOwned', + ); +} + +{ + const cfe = new ConditionalCheckFailedException({ + $metadata: {}, + message: 'no', + }); + const ddb = new StubDdb(() => { + throw cfe; + }); + const mgr = new LeaseManager(ddb as unknown as DynamoDBClient, 'tbl'); + + await mgr.release({ + key: { pk: 'CACHE#A', sk: 'LOCK' }, + token: 'tok', + expiresAt: 0, + }); + assert.ok(ddb.sent[0] instanceof DeleteItemCommand); +}