Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 87 additions & 81 deletions async_postgres/pg_advisory_lock.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,75 +60,96 @@ import async_backend, pg_types, pg_connection, pg_client

privateAccess(PgConnection)

# Internal body templates
#
# The acquire/try/unlock procedures below differ only in the SQL function they
# call and how they touch ``heldSessionLocks``. These templates hold the shared
# body so each public proc collapses to a single documented call. They expand
# inside ``{.async.}`` procs, so the ``await`` runs in the calling proc.

template acquireSessionLock(
conn: PgConnection, sql: string, params: seq[PgParam], t: Duration
) =
discard await conn.queryValue(sql, params, timeout = t)
inc conn.heldSessionLocks

template trySessionLock(
conn: PgConnection, sql: string, params: seq[PgParam], t: Duration
): bool =
let acquired = await conn.queryValue(bool, sql, params, timeout = t)
if acquired:
inc conn.heldSessionLocks
acquired

template unlockSessionLock(
conn: PgConnection, sql: string, params: seq[PgParam], t: Duration
): bool =
let released = await conn.queryValue(bool, sql, params, timeout = t)
if released and conn.heldSessionLocks > 0:
dec conn.heldSessionLocks
released

template acquireXactLock(
conn: PgConnection, sql: string, params: seq[PgParam], t: Duration
) =
discard await conn.queryValue(sql, params, timeout = t)

template tryXactLock(
conn: PgConnection, sql: string, params: seq[PgParam], t: Duration
): bool =
await conn.queryValue(bool, sql, params, timeout = t)

# Session-level exclusive locks

proc advisoryLock*(
conn: PgConnection, key: int64, timeout: Duration = ZeroDuration
): Future[void] {.async.} =
## Acquire a session-level exclusive advisory lock, blocking until available.
discard await conn.queryValue(
"SELECT pg_advisory_lock($1)", @[toPgParam(key)], timeout = timeout
)
inc conn.heldSessionLocks
acquireSessionLock(conn, "SELECT pg_advisory_lock($1)", @[toPgParam(key)], timeout)

proc advisoryTryLock*(
conn: PgConnection, key: int64, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Try to acquire a session-level exclusive advisory lock without blocking.
## Returns ``true`` if the lock was acquired.
let acquired = await conn.queryValue(
bool, "SELECT pg_try_advisory_lock($1)", @[toPgParam(key)], timeout = timeout
)
if acquired:
inc conn.heldSessionLocks
return acquired
return
trySessionLock(conn, "SELECT pg_try_advisory_lock($1)", @[toPgParam(key)], timeout)

proc advisoryUnlock*(
conn: PgConnection, key: int64, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Release a session-level exclusive advisory lock.
## Returns ``true`` if the lock was held and successfully released.
let released = await conn.queryValue(
bool, "SELECT pg_advisory_unlock($1)", @[toPgParam(key)], timeout = timeout
)
if released and conn.heldSessionLocks > 0:
dec conn.heldSessionLocks
return released
return
unlockSessionLock(conn, "SELECT pg_advisory_unlock($1)", @[toPgParam(key)], timeout)

# Session-level shared locks

proc advisoryLockShared*(
conn: PgConnection, key: int64, timeout: Duration = ZeroDuration
): Future[void] {.async.} =
## Acquire a session-level shared advisory lock, blocking until available.
discard await conn.queryValue(
"SELECT pg_advisory_lock_shared($1)", @[toPgParam(key)], timeout = timeout
acquireSessionLock(
conn, "SELECT pg_advisory_lock_shared($1)", @[toPgParam(key)], timeout
)
inc conn.heldSessionLocks

proc advisoryTryLockShared*(
conn: PgConnection, key: int64, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Try to acquire a session-level shared advisory lock without blocking.
## Returns ``true`` if the lock was acquired.
let acquired = await conn.queryValue(
bool, "SELECT pg_try_advisory_lock_shared($1)", @[toPgParam(key)], timeout = timeout
return trySessionLock(
conn, "SELECT pg_try_advisory_lock_shared($1)", @[toPgParam(key)], timeout
)
if acquired:
inc conn.heldSessionLocks
return acquired

proc advisoryUnlockShared*(
conn: PgConnection, key: int64, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Release a session-level shared advisory lock.
## Returns ``true`` if the lock was held and successfully released.
let released = await conn.queryValue(
bool, "SELECT pg_advisory_unlock_shared($1)", @[toPgParam(key)], timeout = timeout
return unlockSessionLock(
conn, "SELECT pg_advisory_unlock_shared($1)", @[toPgParam(key)], timeout
)
if released and conn.heldSessionLocks > 0:
dec conn.heldSessionLocks
return released

proc advisoryUnlockAll*(
conn: PgConnection, timeout: Duration = ZeroDuration
Expand All @@ -144,17 +165,15 @@ proc advisoryLockXact*(
): Future[void] {.async.} =
## Acquire a transaction-level exclusive advisory lock, blocking until available.
## Automatically released at end of the current transaction.
discard await conn.queryValue(
"SELECT pg_advisory_xact_lock($1)", @[toPgParam(key)], timeout = timeout
)
acquireXactLock(conn, "SELECT pg_advisory_xact_lock($1)", @[toPgParam(key)], timeout)

proc advisoryTryLockXact*(
conn: PgConnection, key: int64, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Try to acquire a transaction-level exclusive advisory lock without blocking.
## Returns ``true`` if the lock was acquired.
return await conn.queryValue(
bool, "SELECT pg_try_advisory_xact_lock($1)", @[toPgParam(key)], timeout = timeout
return tryXactLock(
conn, "SELECT pg_try_advisory_xact_lock($1)", @[toPgParam(key)], timeout
)

# Transaction-level shared locks
Expand All @@ -164,20 +183,17 @@ proc advisoryLockXactShared*(
): Future[void] {.async.} =
## Acquire a transaction-level shared advisory lock, blocking until available.
## Automatically released at end of the current transaction.
discard await conn.queryValue(
"SELECT pg_advisory_xact_lock_shared($1)", @[toPgParam(key)], timeout = timeout
acquireXactLock(
conn, "SELECT pg_advisory_xact_lock_shared($1)", @[toPgParam(key)], timeout
)

proc advisoryTryLockXactShared*(
conn: PgConnection, key: int64, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Try to acquire a transaction-level shared advisory lock without blocking.
## Returns ``true`` if the lock was acquired.
return await conn.queryValue(
bool,
"SELECT pg_try_advisory_xact_lock_shared($1)",
@[toPgParam(key)],
timeout = timeout,
return tryXactLock(
conn, "SELECT pg_try_advisory_xact_lock_shared($1)", @[toPgParam(key)], timeout
)

# Two-key (int32, int32) variants — Session-level exclusive
Expand All @@ -186,103 +202,92 @@ proc advisoryLock*(
conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration
): Future[void] {.async.} =
## Acquire a session-level exclusive advisory lock using two int32 keys.
discard await conn.queryValue(
acquireSessionLock(
conn,
"SELECT pg_advisory_lock($1, $2)",
@[toPgParam(key1), toPgParam(key2)],
timeout = timeout,
timeout,
)
inc conn.heldSessionLocks

proc advisoryTryLock*(
conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Try to acquire a session-level exclusive advisory lock (two int32 keys).
let acquired = await conn.queryValue(
bool,
return trySessionLock(
conn,
"SELECT pg_try_advisory_lock($1, $2)",
@[toPgParam(key1), toPgParam(key2)],
timeout = timeout,
timeout,
)
if acquired:
inc conn.heldSessionLocks
return acquired

proc advisoryUnlock*(
conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Release a session-level exclusive advisory lock (two int32 keys).
let released = await conn.queryValue(
bool,
return unlockSessionLock(
conn,
"SELECT pg_advisory_unlock($1, $2)",
@[toPgParam(key1), toPgParam(key2)],
timeout = timeout,
timeout,
)
if released and conn.heldSessionLocks > 0:
dec conn.heldSessionLocks
return released

# Two-key (int32, int32) variants — Session-level shared

proc advisoryLockShared*(
conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration
): Future[void] {.async.} =
## Acquire a session-level shared advisory lock using two int32 keys.
discard await conn.queryValue(
acquireSessionLock(
conn,
"SELECT pg_advisory_lock_shared($1, $2)",
@[toPgParam(key1), toPgParam(key2)],
timeout = timeout,
timeout,
)
inc conn.heldSessionLocks

proc advisoryTryLockShared*(
conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Try to acquire a session-level shared advisory lock (two int32 keys).
let acquired = await conn.queryValue(
bool,
return trySessionLock(
conn,
"SELECT pg_try_advisory_lock_shared($1, $2)",
@[toPgParam(key1), toPgParam(key2)],
timeout = timeout,
timeout,
)
if acquired:
inc conn.heldSessionLocks
return acquired

proc advisoryUnlockShared*(
conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Release a session-level shared advisory lock (two int32 keys).
let released = await conn.queryValue(
bool,
return unlockSessionLock(
conn,
"SELECT pg_advisory_unlock_shared($1, $2)",
@[toPgParam(key1), toPgParam(key2)],
timeout = timeout,
timeout,
)
if released and conn.heldSessionLocks > 0:
dec conn.heldSessionLocks
return released

# Two-key (int32, int32) variants — Transaction-level exclusive

proc advisoryLockXact*(
conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration
): Future[void] {.async.} =
## Acquire a transaction-level exclusive advisory lock (two int32 keys).
discard await conn.queryValue(
acquireXactLock(
conn,
"SELECT pg_advisory_xact_lock($1, $2)",
@[toPgParam(key1), toPgParam(key2)],
timeout = timeout,
timeout,
)

proc advisoryTryLockXact*(
conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Try to acquire a transaction-level exclusive advisory lock (two int32 keys).
return await conn.queryValue(
bool,
return tryXactLock(
conn,
"SELECT pg_try_advisory_xact_lock($1, $2)",
@[toPgParam(key1), toPgParam(key2)],
timeout = timeout,
timeout,
)

# Two-key (int32, int32) variants — Transaction-level shared
Expand All @@ -291,21 +296,22 @@ proc advisoryLockXactShared*(
conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration
): Future[void] {.async.} =
## Acquire a transaction-level shared advisory lock (two int32 keys).
discard await conn.queryValue(
acquireXactLock(
conn,
"SELECT pg_advisory_xact_lock_shared($1, $2)",
@[toPgParam(key1), toPgParam(key2)],
timeout = timeout,
timeout,
)

proc advisoryTryLockXactShared*(
conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration
): Future[bool] {.async.} =
## Try to acquire a transaction-level shared advisory lock (two int32 keys).
return await conn.queryValue(
bool,
return tryXactLock(
conn,
"SELECT pg_try_advisory_xact_lock_shared($1, $2)",
@[toPgParam(key1), toPgParam(key2)],
timeout = timeout,
timeout,
)

# Convenience macros — session-level
Expand Down