diff --git a/async_postgres/pg_advisory_lock.nim b/async_postgres/pg_advisory_lock.nim index d29b293..d7ef202 100644 --- a/async_postgres/pg_advisory_lock.nim +++ b/async_postgres/pg_advisory_lock.nim @@ -60,40 +60,68 @@ import async_backend, pg_types, pg_connection, pg_client privateAccess(PgConnection) +# Internal body templates +# +# The acquire/try/unlock procedures below differ only in the SQL function they +# call and how they touch ``heldSessionLocks``. These templates hold the shared +# body so each public proc collapses to a single documented call. They expand +# inside ``{.async.}`` procs, so the ``await`` runs in the calling proc. + +template acquireSessionLock( + conn: PgConnection, sql: string, params: seq[PgParam], t: Duration +) = + discard await conn.queryValue(sql, params, timeout = t) + inc conn.heldSessionLocks + +template trySessionLock( + conn: PgConnection, sql: string, params: seq[PgParam], t: Duration +): bool = + let acquired = await conn.queryValue(bool, sql, params, timeout = t) + if acquired: + inc conn.heldSessionLocks + acquired + +template unlockSessionLock( + conn: PgConnection, sql: string, params: seq[PgParam], t: Duration +): bool = + let released = await conn.queryValue(bool, sql, params, timeout = t) + if released and conn.heldSessionLocks > 0: + dec conn.heldSessionLocks + released + +template acquireXactLock( + conn: PgConnection, sql: string, params: seq[PgParam], t: Duration +) = + discard await conn.queryValue(sql, params, timeout = t) + +template tryXactLock( + conn: PgConnection, sql: string, params: seq[PgParam], t: Duration +): bool = + await conn.queryValue(bool, sql, params, timeout = t) + # Session-level exclusive locks proc advisoryLock*( conn: PgConnection, key: int64, timeout: Duration = ZeroDuration ): Future[void] {.async.} = ## Acquire a session-level exclusive advisory lock, blocking until available. - discard await conn.queryValue( - "SELECT pg_advisory_lock($1)", @[toPgParam(key)], timeout = timeout - ) - inc conn.heldSessionLocks + acquireSessionLock(conn, "SELECT pg_advisory_lock($1)", @[toPgParam(key)], timeout) proc advisoryTryLock*( conn: PgConnection, key: int64, timeout: Duration = ZeroDuration ): Future[bool] {.async.} = ## Try to acquire a session-level exclusive advisory lock without blocking. ## Returns ``true`` if the lock was acquired. - let acquired = await conn.queryValue( - bool, "SELECT pg_try_advisory_lock($1)", @[toPgParam(key)], timeout = timeout - ) - if acquired: - inc conn.heldSessionLocks - return acquired + return + trySessionLock(conn, "SELECT pg_try_advisory_lock($1)", @[toPgParam(key)], timeout) proc advisoryUnlock*( conn: PgConnection, key: int64, timeout: Duration = ZeroDuration ): Future[bool] {.async.} = ## Release a session-level exclusive advisory lock. ## Returns ``true`` if the lock was held and successfully released. - let released = await conn.queryValue( - bool, "SELECT pg_advisory_unlock($1)", @[toPgParam(key)], timeout = timeout - ) - if released and conn.heldSessionLocks > 0: - dec conn.heldSessionLocks - return released + return + unlockSessionLock(conn, "SELECT pg_advisory_unlock($1)", @[toPgParam(key)], timeout) # Session-level shared locks @@ -101,34 +129,27 @@ proc advisoryLockShared*( conn: PgConnection, key: int64, timeout: Duration = ZeroDuration ): Future[void] {.async.} = ## Acquire a session-level shared advisory lock, blocking until available. - discard await conn.queryValue( - "SELECT pg_advisory_lock_shared($1)", @[toPgParam(key)], timeout = timeout + acquireSessionLock( + conn, "SELECT pg_advisory_lock_shared($1)", @[toPgParam(key)], timeout ) - inc conn.heldSessionLocks proc advisoryTryLockShared*( conn: PgConnection, key: int64, timeout: Duration = ZeroDuration ): Future[bool] {.async.} = ## Try to acquire a session-level shared advisory lock without blocking. ## Returns ``true`` if the lock was acquired. - let acquired = await conn.queryValue( - bool, "SELECT pg_try_advisory_lock_shared($1)", @[toPgParam(key)], timeout = timeout + return trySessionLock( + conn, "SELECT pg_try_advisory_lock_shared($1)", @[toPgParam(key)], timeout ) - if acquired: - inc conn.heldSessionLocks - return acquired proc advisoryUnlockShared*( conn: PgConnection, key: int64, timeout: Duration = ZeroDuration ): Future[bool] {.async.} = ## Release a session-level shared advisory lock. ## Returns ``true`` if the lock was held and successfully released. - let released = await conn.queryValue( - bool, "SELECT pg_advisory_unlock_shared($1)", @[toPgParam(key)], timeout = timeout + return unlockSessionLock( + conn, "SELECT pg_advisory_unlock_shared($1)", @[toPgParam(key)], timeout ) - if released and conn.heldSessionLocks > 0: - dec conn.heldSessionLocks - return released proc advisoryUnlockAll*( conn: PgConnection, timeout: Duration = ZeroDuration @@ -144,17 +165,15 @@ proc advisoryLockXact*( ): Future[void] {.async.} = ## Acquire a transaction-level exclusive advisory lock, blocking until available. ## Automatically released at end of the current transaction. - discard await conn.queryValue( - "SELECT pg_advisory_xact_lock($1)", @[toPgParam(key)], timeout = timeout - ) + acquireXactLock(conn, "SELECT pg_advisory_xact_lock($1)", @[toPgParam(key)], timeout) proc advisoryTryLockXact*( conn: PgConnection, key: int64, timeout: Duration = ZeroDuration ): Future[bool] {.async.} = ## Try to acquire a transaction-level exclusive advisory lock without blocking. ## Returns ``true`` if the lock was acquired. - return await conn.queryValue( - bool, "SELECT pg_try_advisory_xact_lock($1)", @[toPgParam(key)], timeout = timeout + return tryXactLock( + conn, "SELECT pg_try_advisory_xact_lock($1)", @[toPgParam(key)], timeout ) # Transaction-level shared locks @@ -164,8 +183,8 @@ proc advisoryLockXactShared*( ): Future[void] {.async.} = ## Acquire a transaction-level shared advisory lock, blocking until available. ## Automatically released at end of the current transaction. - discard await conn.queryValue( - "SELECT pg_advisory_xact_lock_shared($1)", @[toPgParam(key)], timeout = timeout + acquireXactLock( + conn, "SELECT pg_advisory_xact_lock_shared($1)", @[toPgParam(key)], timeout ) proc advisoryTryLockXactShared*( @@ -173,11 +192,8 @@ proc advisoryTryLockXactShared*( ): Future[bool] {.async.} = ## Try to acquire a transaction-level shared advisory lock without blocking. ## Returns ``true`` if the lock was acquired. - return await conn.queryValue( - bool, - "SELECT pg_try_advisory_xact_lock_shared($1)", - @[toPgParam(key)], - timeout = timeout, + return tryXactLock( + conn, "SELECT pg_try_advisory_xact_lock_shared($1)", @[toPgParam(key)], timeout ) # Two-key (int32, int32) variants — Session-level exclusive @@ -186,40 +202,34 @@ proc advisoryLock*( conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration ): Future[void] {.async.} = ## Acquire a session-level exclusive advisory lock using two int32 keys. - discard await conn.queryValue( + acquireSessionLock( + conn, "SELECT pg_advisory_lock($1, $2)", @[toPgParam(key1), toPgParam(key2)], - timeout = timeout, + timeout, ) - inc conn.heldSessionLocks proc advisoryTryLock*( conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration ): Future[bool] {.async.} = ## Try to acquire a session-level exclusive advisory lock (two int32 keys). - let acquired = await conn.queryValue( - bool, + return trySessionLock( + conn, "SELECT pg_try_advisory_lock($1, $2)", @[toPgParam(key1), toPgParam(key2)], - timeout = timeout, + timeout, ) - if acquired: - inc conn.heldSessionLocks - return acquired proc advisoryUnlock*( conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration ): Future[bool] {.async.} = ## Release a session-level exclusive advisory lock (two int32 keys). - let released = await conn.queryValue( - bool, + return unlockSessionLock( + conn, "SELECT pg_advisory_unlock($1, $2)", @[toPgParam(key1), toPgParam(key2)], - timeout = timeout, + timeout, ) - if released and conn.heldSessionLocks > 0: - dec conn.heldSessionLocks - return released # Two-key (int32, int32) variants — Session-level shared @@ -227,40 +237,34 @@ proc advisoryLockShared*( conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration ): Future[void] {.async.} = ## Acquire a session-level shared advisory lock using two int32 keys. - discard await conn.queryValue( + acquireSessionLock( + conn, "SELECT pg_advisory_lock_shared($1, $2)", @[toPgParam(key1), toPgParam(key2)], - timeout = timeout, + timeout, ) - inc conn.heldSessionLocks proc advisoryTryLockShared*( conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration ): Future[bool] {.async.} = ## Try to acquire a session-level shared advisory lock (two int32 keys). - let acquired = await conn.queryValue( - bool, + return trySessionLock( + conn, "SELECT pg_try_advisory_lock_shared($1, $2)", @[toPgParam(key1), toPgParam(key2)], - timeout = timeout, + timeout, ) - if acquired: - inc conn.heldSessionLocks - return acquired proc advisoryUnlockShared*( conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration ): Future[bool] {.async.} = ## Release a session-level shared advisory lock (two int32 keys). - let released = await conn.queryValue( - bool, + return unlockSessionLock( + conn, "SELECT pg_advisory_unlock_shared($1, $2)", @[toPgParam(key1), toPgParam(key2)], - timeout = timeout, + timeout, ) - if released and conn.heldSessionLocks > 0: - dec conn.heldSessionLocks - return released # Two-key (int32, int32) variants — Transaction-level exclusive @@ -268,21 +272,22 @@ proc advisoryLockXact*( conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration ): Future[void] {.async.} = ## Acquire a transaction-level exclusive advisory lock (two int32 keys). - discard await conn.queryValue( + acquireXactLock( + conn, "SELECT pg_advisory_xact_lock($1, $2)", @[toPgParam(key1), toPgParam(key2)], - timeout = timeout, + timeout, ) proc advisoryTryLockXact*( conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration ): Future[bool] {.async.} = ## Try to acquire a transaction-level exclusive advisory lock (two int32 keys). - return await conn.queryValue( - bool, + return tryXactLock( + conn, "SELECT pg_try_advisory_xact_lock($1, $2)", @[toPgParam(key1), toPgParam(key2)], - timeout = timeout, + timeout, ) # Two-key (int32, int32) variants — Transaction-level shared @@ -291,21 +296,22 @@ proc advisoryLockXactShared*( conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration ): Future[void] {.async.} = ## Acquire a transaction-level shared advisory lock (two int32 keys). - discard await conn.queryValue( + acquireXactLock( + conn, "SELECT pg_advisory_xact_lock_shared($1, $2)", @[toPgParam(key1), toPgParam(key2)], - timeout = timeout, + timeout, ) proc advisoryTryLockXactShared*( conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration ): Future[bool] {.async.} = ## Try to acquire a transaction-level shared advisory lock (two int32 keys). - return await conn.queryValue( - bool, + return tryXactLock( + conn, "SELECT pg_try_advisory_xact_lock_shared($1, $2)", @[toPgParam(key1), toPgParam(key2)], - timeout = timeout, + timeout, ) # Convenience macros — session-level