Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions docs/module/db.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# DB

SQLAlchemy async session management with transactions, table locking, and row-change polling.
SQLAlchemy async session management with transactions, table locking, advisory locking, and row-change polling.

!!! info
This module has been coded and tested to be compatible with PostgreSQL only.

## Overview

The `db` module provides helpers to create FastAPI dependencies and context managers for `AsyncSession`, along with utilities for nested transactions, table lock and polling for row changes.
The `db` module provides helpers to create FastAPI dependencies and context managers for `AsyncSession`, along with utilities for nested transactions, table locks, advisory locks, and polling for row changes.

## Session dependency

Expand Down Expand Up @@ -69,6 +69,40 @@ async with lock_tables(session_maker=session_maker, tables=[User], mode=LockMode

Available lock modes are defined in [`LockMode`](../reference/db.md#fastapi_toolsets.db.LockMode): `ACCESS_SHARE`, `ROW_SHARE`, `ROW_EXCLUSIVE`, `SHARE_UPDATE_EXCLUSIVE`, `SHARE`, `SHARE_ROW_EXCLUSIVE`, `EXCLUSIVE`, `ACCESS_EXCLUSIVE`.

## Advisory locking

[`advisory_lock`](../reference/db.md#fastapi_toolsets.db.advisory_lock) acquires a PostgreSQL session-level advisory lock — an application-defined numeric mutex not tied to any row or table. The lock is released explicitly when the context exits, regardless of whether the transaction has committed.

Common use cases: serializing background jobs, preventing concurrent imports, or any critical section that spans multiple transactions.

```python
from fastapi_toolsets.db import advisory_lock

# Blocking exclusive lock — waits until the lock is free
async with advisory_lock(session=session, key=42):
...

# Non-blocking — yields False immediately if already held
async with advisory_lock(session=session, key=42, nowait=True) as acquired:
if not acquired:
raise HTTPException(409, "Resource is locked")

# Blocking with a timeout — raises DBAPIError if not acquired in time
async with advisory_lock(session=session, key=42, timeout="5s"):
...

# Shared — multiple readers allowed simultaneously, blocks exclusive writers
async with advisory_lock(session=session, key=42, shared=True):
...

# Two-integer key for namespacing (e.g. lock_type + resource_id)
async with advisory_lock(session=session, key=(1, user_id)):
...
```

!!! note
Advisory locks use PostgreSQL session-level functions (`pg_advisory_lock` / `pg_advisory_unlock`). The lock is tied to the database connection, not the SQLAlchemy transaction — it is released when the context exits, even if the surrounding transaction is still open.

## Row-change polling

[`wait_for_row_change`](../reference/db.md#fastapi_toolsets.db.wait_for_row_change) polls a row until a specific column changes value, useful for waiting on async side effects:
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/db.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ You can import them directly from `fastapi_toolsets.db`:
```python
from fastapi_toolsets.db import (
LockMode,
advisory_lock,
cleanup_tables,
create_database,
create_db_dependency,
Expand All @@ -30,6 +31,8 @@ from fastapi_toolsets.db import (

## ::: fastapi_toolsets.db.lock_tables

## ::: fastapi_toolsets.db.advisory_lock

## ::: fastapi_toolsets.db.wait_for_row_change

## ::: fastapi_toolsets.db.create_database
Expand Down
73 changes: 73 additions & 0 deletions src/fastapi_toolsets/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

__all__ = [
"LockMode",
"advisory_lock",
"cleanup_tables",
"create_database",
"create_db_context",
Expand Down Expand Up @@ -204,6 +205,78 @@ async def _lock() -> AsyncGenerator[_SessionT, None]:
return _lock()


@asynccontextmanager
async def advisory_lock(
session: AsyncSession,
key: int | tuple[int, int],
*,
shared: bool = False,
nowait: bool = False,
timeout: str | None = None,
) -> AsyncGenerator[bool, None]:
"""Acquire a PostgreSQL session-level advisory lock.

Args:
session: AsyncSession instance.
key: Lock key — a single ``int`` (bigint) or a ``(int, int)`` pair for namespacing.
shared: Acquire a shared lock (multiple holders allowed). Default is exclusive.
nowait: Return ``False`` immediately if the lock is unavailable instead of waiting.
timeout: Maximum wait time (e.g. ``"5s"``, ``"500ms"``). Raises ``DBAPIError``
if exceeded. Ignored when *nowait* is ``True``.

Yields:
``True`` if the lock was acquired, ``False`` if *nowait* is ``True`` and the lock
is already held.

Raises:
sqlalchemy.exc.DBAPIError: If *timeout* is set and the lock cannot be acquired
in time.

Example:
```python
from fastapi_toolsets.db import advisory_lock

async with advisory_lock(session, 42):
...

async with advisory_lock(session, 42, nowait=True) as acquired:
if not acquired:
raise HTTPException(409, "Resource is locked")

async with advisory_lock(session, 42, timeout="5s"):
...

async with advisory_lock(session, (1, user_id), shared=True):
...
```
"""
suffix = "_shared" if shared else ""
acquire_fn = f"{'pg_try_advisory_lock' if nowait else 'pg_advisory_lock'}{suffix}"
release_fn = f"pg_advisory_unlock{suffix}"

if isinstance(key, tuple):
k1, k2 = key
args = "CAST(:k1 AS integer), CAST(:k2 AS integer)"
params: dict[str, int] = {"k1": k1, "k2": k2}
else:
args = ":k"
params = {"k": key}

acquire_sql = text(f"SELECT {acquire_fn}({args})")
release_sql = text(f"SELECT {release_fn}({args})")

if timeout is not None and not nowait:
await session.execute(text(f"SET LOCAL lock_timeout='{timeout}'"))

result = await session.execute(acquire_sql, params)
acquired = result.scalar() if nowait else True
try:
yield acquired
finally:
if acquired:
await session.execute(release_sql, params)


async def create_database(
db_name: str,
*,
Expand Down
90 changes: 90 additions & 0 deletions tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from fastapi_toolsets.db import (
LockMode,
advisory_lock,
cleanup_tables,
create_database,
create_db_context,
Expand Down Expand Up @@ -324,6 +325,95 @@ async def test_lock_rollback_on_exception(self, session_maker):
assert result is None


class TestAdvisoryLock:
"""Tests for advisory_lock context manager (PostgreSQL-specific)."""

@pytest.mark.anyio
async def test_blocking_exclusive_acquires(self, db_session: AsyncSession):
"""Blocking exclusive lock acquires and yields True."""
async with advisory_lock(db_session, 1001) as acquired:
assert acquired is True

@pytest.mark.anyio
async def test_nowait_returns_true_when_free(self, db_session: AsyncSession):
"""nowait=True yields True when the lock is available."""
async with advisory_lock(db_session, 1002, nowait=True) as acquired:
assert acquired is True

@pytest.mark.anyio
async def test_nowait_returns_false_when_contended(self, session_maker):
"""nowait=True yields False when another session holds the lock."""
async with session_maker() as holder:
async with holder.begin():
async with advisory_lock(holder, 1003):
async with session_maker() as contender:
async with contender.begin():
async with advisory_lock(
contender, 1003, nowait=True
) as acquired:
assert acquired is False

@pytest.mark.anyio
async def test_shared_allows_concurrent_readers(self, session_maker):
"""Two shared locks on the same key are both acquired."""
async with session_maker() as s1, session_maker() as s2:
async with s1.begin(), s2.begin():
async with advisory_lock(s1, 1004, shared=True) as a1:
async with advisory_lock(s2, 1004, shared=True, nowait=True) as a2:
assert a1 is True
assert a2 is True

@pytest.mark.anyio
async def test_tuple_key(self, db_session: AsyncSession):
"""(int, int) key variant acquires the lock."""
async with advisory_lock(db_session, (7, 42)) as acquired:
assert acquired is True

@pytest.mark.anyio
async def test_tuple_key_nowait_contended(self, session_maker):
"""Tuple key nowait returns False when contended."""
async with session_maker() as holder:
async with holder.begin():
async with advisory_lock(holder, (7, 99)):
async with session_maker() as contender:
async with contender.begin():
async with advisory_lock(
contender, (7, 99), nowait=True
) as acquired:
assert acquired is False

@pytest.mark.anyio
async def test_lock_released_at_context_exit(self, session_maker):
"""Lock is released when the context exits, even while the transaction is still open."""
async with session_maker() as s1:
async with s1.begin():
async with advisory_lock(s1, 1005):
pass # lock released here — transaction still active

async with session_maker() as s2:
async with s2.begin():
async with advisory_lock(s2, 1005, nowait=True) as acquired:
assert (
acquired is True
) # s1 still in transaction but lock is free

@pytest.mark.anyio
async def test_timeout_raises_when_contended(self, session_maker):
"""timeout= raises when the lock cannot be acquired within the interval."""
from sqlalchemy.exc import DBAPIError

async with session_maker() as holder:
async with holder.begin():
async with advisory_lock(holder, 1006):
async with session_maker() as contender:
async with contender.begin():
with pytest.raises(DBAPIError):
async with advisory_lock(
contender, 1006, timeout="10ms"
):
pass


class TestWaitForRowChange:
"""Tests for wait_for_row_change polling function."""

Expand Down