diff --git a/docs/module/db.md b/docs/module/db.md index 0c3bd9d..352abd2 100644 --- a/docs/module/db.md +++ b/docs/module/db.md @@ -1,13 +1,13 @@ # DB -SQLAlchemy async session management with transactions, table locking, and row-change polling. +SQLAlchemy async session management with transactions, table locking, advisory locking, and row-change polling. !!! info This module has been coded and tested to be compatible with PostgreSQL only. ## Overview -The `db` module provides helpers to create FastAPI dependencies and context managers for `AsyncSession`, along with utilities for nested transactions, table lock and polling for row changes. +The `db` module provides helpers to create FastAPI dependencies and context managers for `AsyncSession`, along with utilities for nested transactions, table locks, advisory locks, and polling for row changes. ## Session dependency @@ -69,6 +69,40 @@ async with lock_tables(session_maker=session_maker, tables=[User], mode=LockMode Available lock modes are defined in [`LockMode`](../reference/db.md#fastapi_toolsets.db.LockMode): `ACCESS_SHARE`, `ROW_SHARE`, `ROW_EXCLUSIVE`, `SHARE_UPDATE_EXCLUSIVE`, `SHARE`, `SHARE_ROW_EXCLUSIVE`, `EXCLUSIVE`, `ACCESS_EXCLUSIVE`. +## Advisory locking + +[`advisory_lock`](../reference/db.md#fastapi_toolsets.db.advisory_lock) acquires a PostgreSQL session-level advisory lock — an application-defined numeric mutex not tied to any row or table. The lock is released explicitly when the context exits, regardless of whether the transaction has committed. + +Common use cases: serializing background jobs, preventing concurrent imports, or any critical section that spans multiple transactions. + +```python +from fastapi_toolsets.db import advisory_lock + +# Blocking exclusive lock — waits until the lock is free +async with advisory_lock(session=session, key=42): + ... + +# Non-blocking — yields False immediately if already held +async with advisory_lock(session=session, key=42, nowait=True) as acquired: + if not acquired: + raise HTTPException(409, "Resource is locked") + +# Blocking with a timeout — raises DBAPIError if not acquired in time +async with advisory_lock(session=session, key=42, timeout="5s"): + ... + +# Shared — multiple readers allowed simultaneously, blocks exclusive writers +async with advisory_lock(session=session, key=42, shared=True): + ... + +# Two-integer key for namespacing (e.g. lock_type + resource_id) +async with advisory_lock(session=session, key=(1, user_id)): + ... +``` + +!!! note + Advisory locks use PostgreSQL session-level functions (`pg_advisory_lock` / `pg_advisory_unlock`). The lock is tied to the database connection, not the SQLAlchemy transaction — it is released when the context exits, even if the surrounding transaction is still open. + ## Row-change polling [`wait_for_row_change`](../reference/db.md#fastapi_toolsets.db.wait_for_row_change) polls a row until a specific column changes value, useful for waiting on async side effects: diff --git a/docs/reference/db.md b/docs/reference/db.md index 4517814..66b3204 100644 --- a/docs/reference/db.md +++ b/docs/reference/db.md @@ -7,6 +7,7 @@ You can import them directly from `fastapi_toolsets.db`: ```python from fastapi_toolsets.db import ( LockMode, + advisory_lock, cleanup_tables, create_database, create_db_dependency, @@ -30,6 +31,8 @@ from fastapi_toolsets.db import ( ## ::: fastapi_toolsets.db.lock_tables +## ::: fastapi_toolsets.db.advisory_lock + ## ::: fastapi_toolsets.db.wait_for_row_change ## ::: fastapi_toolsets.db.create_database diff --git a/src/fastapi_toolsets/db.py b/src/fastapi_toolsets/db.py index 783c40a..ae413e4 100644 --- a/src/fastapi_toolsets/db.py +++ b/src/fastapi_toolsets/db.py @@ -16,6 +16,7 @@ __all__ = [ "LockMode", + "advisory_lock", "cleanup_tables", "create_database", "create_db_context", @@ -204,6 +205,78 @@ async def _lock() -> AsyncGenerator[_SessionT, None]: return _lock() +@asynccontextmanager +async def advisory_lock( + session: AsyncSession, + key: int | tuple[int, int], + *, + shared: bool = False, + nowait: bool = False, + timeout: str | None = None, +) -> AsyncGenerator[bool, None]: + """Acquire a PostgreSQL session-level advisory lock. + + Args: + session: AsyncSession instance. + key: Lock key — a single ``int`` (bigint) or a ``(int, int)`` pair for namespacing. + shared: Acquire a shared lock (multiple holders allowed). Default is exclusive. + nowait: Return ``False`` immediately if the lock is unavailable instead of waiting. + timeout: Maximum wait time (e.g. ``"5s"``, ``"500ms"``). Raises ``DBAPIError`` + if exceeded. Ignored when *nowait* is ``True``. + + Yields: + ``True`` if the lock was acquired, ``False`` if *nowait* is ``True`` and the lock + is already held. + + Raises: + sqlalchemy.exc.DBAPIError: If *timeout* is set and the lock cannot be acquired + in time. + + Example: + ```python + from fastapi_toolsets.db import advisory_lock + + async with advisory_lock(session, 42): + ... + + async with advisory_lock(session, 42, nowait=True) as acquired: + if not acquired: + raise HTTPException(409, "Resource is locked") + + async with advisory_lock(session, 42, timeout="5s"): + ... + + async with advisory_lock(session, (1, user_id), shared=True): + ... + ``` + """ + suffix = "_shared" if shared else "" + acquire_fn = f"{'pg_try_advisory_lock' if nowait else 'pg_advisory_lock'}{suffix}" + release_fn = f"pg_advisory_unlock{suffix}" + + if isinstance(key, tuple): + k1, k2 = key + args = "CAST(:k1 AS integer), CAST(:k2 AS integer)" + params: dict[str, int] = {"k1": k1, "k2": k2} + else: + args = ":k" + params = {"k": key} + + acquire_sql = text(f"SELECT {acquire_fn}({args})") + release_sql = text(f"SELECT {release_fn}({args})") + + if timeout is not None and not nowait: + await session.execute(text(f"SET LOCAL lock_timeout='{timeout}'")) + + result = await session.execute(acquire_sql, params) + acquired = result.scalar() if nowait else True + try: + yield acquired + finally: + if acquired: + await session.execute(release_sql, params) + + async def create_database( db_name: str, *, diff --git a/tests/test_db.py b/tests/test_db.py index df7b4b9..01aeac6 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -27,6 +27,7 @@ from fastapi_toolsets.db import ( LockMode, + advisory_lock, cleanup_tables, create_database, create_db_context, @@ -324,6 +325,95 @@ async def test_lock_rollback_on_exception(self, session_maker): assert result is None +class TestAdvisoryLock: + """Tests for advisory_lock context manager (PostgreSQL-specific).""" + + @pytest.mark.anyio + async def test_blocking_exclusive_acquires(self, db_session: AsyncSession): + """Blocking exclusive lock acquires and yields True.""" + async with advisory_lock(db_session, 1001) as acquired: + assert acquired is True + + @pytest.mark.anyio + async def test_nowait_returns_true_when_free(self, db_session: AsyncSession): + """nowait=True yields True when the lock is available.""" + async with advisory_lock(db_session, 1002, nowait=True) as acquired: + assert acquired is True + + @pytest.mark.anyio + async def test_nowait_returns_false_when_contended(self, session_maker): + """nowait=True yields False when another session holds the lock.""" + async with session_maker() as holder: + async with holder.begin(): + async with advisory_lock(holder, 1003): + async with session_maker() as contender: + async with contender.begin(): + async with advisory_lock( + contender, 1003, nowait=True + ) as acquired: + assert acquired is False + + @pytest.mark.anyio + async def test_shared_allows_concurrent_readers(self, session_maker): + """Two shared locks on the same key are both acquired.""" + async with session_maker() as s1, session_maker() as s2: + async with s1.begin(), s2.begin(): + async with advisory_lock(s1, 1004, shared=True) as a1: + async with advisory_lock(s2, 1004, shared=True, nowait=True) as a2: + assert a1 is True + assert a2 is True + + @pytest.mark.anyio + async def test_tuple_key(self, db_session: AsyncSession): + """(int, int) key variant acquires the lock.""" + async with advisory_lock(db_session, (7, 42)) as acquired: + assert acquired is True + + @pytest.mark.anyio + async def test_tuple_key_nowait_contended(self, session_maker): + """Tuple key nowait returns False when contended.""" + async with session_maker() as holder: + async with holder.begin(): + async with advisory_lock(holder, (7, 99)): + async with session_maker() as contender: + async with contender.begin(): + async with advisory_lock( + contender, (7, 99), nowait=True + ) as acquired: + assert acquired is False + + @pytest.mark.anyio + async def test_lock_released_at_context_exit(self, session_maker): + """Lock is released when the context exits, even while the transaction is still open.""" + async with session_maker() as s1: + async with s1.begin(): + async with advisory_lock(s1, 1005): + pass # lock released here — transaction still active + + async with session_maker() as s2: + async with s2.begin(): + async with advisory_lock(s2, 1005, nowait=True) as acquired: + assert ( + acquired is True + ) # s1 still in transaction but lock is free + + @pytest.mark.anyio + async def test_timeout_raises_when_contended(self, session_maker): + """timeout= raises when the lock cannot be acquired within the interval.""" + from sqlalchemy.exc import DBAPIError + + async with session_maker() as holder: + async with holder.begin(): + async with advisory_lock(holder, 1006): + async with session_maker() as contender: + async with contender.begin(): + with pytest.raises(DBAPIError): + async with advisory_lock( + contender, 1006, timeout="10ms" + ): + pass + + class TestWaitForRowChange: """Tests for wait_for_row_change polling function."""