Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fair-seals-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

Fix PostgreSQL shard advisory lock collisions across disjoint shard groups.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,9 @@ scratchpad/**/*

# Codex
.agents/

# OpenCode
.opencode/session-logs
.opencode/plugins
.opencode/package-lock.json
opencode.jsonc
58 changes: 29 additions & 29 deletions packages/effect/src/unstable/cluster/SqlRunnerStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,18 @@ export const make = Effect.fnUntraced(function*(options: {
return Effect.fnUntraced(function*(_address: string, shardIds: ReadonlyArray<string>) {
const [conn, pid] = yield* lockConn!.await
const acquiredShardIds: Array<string> = []
const toAcquire = new Map(shardIds.map((shardId) => [lockNumbers.get(shardId)!, shardId]))
const takenLocks = yield* conn.executeValues(
`SELECT objid FROM pg_locks WHERE locktype = 'advisory' AND granted = true AND pid = ${pid} ORDER BY objid`,
[]
const toAcquire = new Set(shardIds)
const takenLocks = yield* conn.executeUnprepared(
`SELECT ${pgHeldLocks(shardIds, pid)}`,
[],
undefined
)
for (let i = 0; i < takenLocks.length; i++) {
const lockNum = takenLocks[i][0] as number
acquiredShardIds.push(lockNumbersReverse.get(lockNum)!)
toAcquire.delete(lockNum)
const heldLocks = takenLocks[0] as Record<string, boolean>
for (const shardId in heldLocks) {
if (heldLocks[shardId]) {
acquiredShardIds.push(shardId)
toAcquire.delete(shardId)
}
}
if (toAcquire.size === 0) {
return acquiredShardIds
Expand Down Expand Up @@ -389,19 +392,6 @@ export const make = Effect.fnUntraced(function*(options: {
}
})

const lockNumbers = new Map<string, number>()
const lockNumbersReverse = new Map<number, string>()
for (let i = 0; i < config.shardGroups.length; i++) {
const group = config.shardGroups[i]
const base = (i + 1) * 1000000
for (let shard = 1; shard <= config.shardsPerGroup; shard++) {
const shardId = ShardId.make(group, shard).toString()
const lockNum = base + shard
lockNumbers.set(shardId, lockNum)
lockNumbersReverse.set(lockNum, shardId)
}
}

const shardIdsIndex = new Map<string, number>()
const lockNames = new Map<string, string>()
const lockNamesReverse = new Map<string, string>()
Expand All @@ -419,11 +409,22 @@ export const make = Effect.fnUntraced(function*(options: {
}
}

const pgLocks = (shardIdsMap: Map<number, string>) =>
Array.from(
shardIdsMap.entries(),
([lockNum, shardId]) => `pg_try_advisory_lock(${lockNum}) AS "${shardId}"`
).join(", ")
const pgLockSeed = 1337
const pgLockKey = (shardId: string) => `hashtextextended(${wrapString(shardId)}, ${pgLockSeed})`
const pgLockExists = (shardId: string, pid: number | "pg_backend_pid()") =>
`EXISTS (
SELECT 1 FROM pg_locks
WHERE locktype = 'advisory'
AND granted = true
AND pid = ${pid}
AND ((classid::bigint << 32) | objid::bigint) = ${pgLockKey(shardId)}
)`

const pgLocks = (shardIds: ReadonlySet<string>) =>
Array.from(shardIds, (shardId) => `pg_try_advisory_lock(${pgLockKey(shardId)}) AS "${shardId}"`).join(", ")

const pgHeldLocks = (shardIds: ReadonlyArray<string>, pid: number) =>
shardIds.map((shardId) => `${pgLockExists(shardId, pid)} AS "${shardId}"`).join(", ")

const mysqlLocks = (shardIds: ReadonlyArray<string>) =>
shardIds.map((shardId) => `GET_LOCK('${lockNames.get(shardId)!}', 0) AS "${shardId}"`).join(", ")
Expand Down Expand Up @@ -551,12 +552,11 @@ export const make = Effect.fnUntraced(function*(options: {
}
return Effect.fnUntraced(
function*(_address, shardId) {
const lockNum = lockNumbers.get(shardId)!
for (let i = 0; i < 5; i++) {
const [conn] = yield* lockConn!.await
yield* conn.executeRaw(`SELECT pg_advisory_unlock(${lockNum})`, [])
yield* conn.executeRaw(`SELECT pg_advisory_unlock(${pgLockKey(shardId)})`, [])
const takenLocks = yield* conn.executeValues(
`SELECT 1 FROM pg_locks WHERE locktype = 'advisory' AND granted = true AND pid = pg_backend_pid() AND objid = ${lockNum}`,
`SELECT 1 WHERE ${pgLockExists(shardId, "pg_backend_pid()")}`,
[]
)
if (takenLocks.length === 0) return
Expand Down
37 changes: 35 additions & 2 deletions packages/platform-node/test/cluster/SqlRunnerStorage.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { NodeFileSystem } from "@effect/platform-node"
import { SqliteClient } from "@effect/sql-sqlite-node"
import { describe, expect, it } from "@effect/vitest"
import { Effect, FileSystem, Layer } from "effect"
import { assert, describe, expect, it } from "@effect/vitest"
import { Context, Effect, FileSystem, Layer } from "effect"
import {
Runner,
RunnerAddress,
Expand Down Expand Up @@ -88,9 +88,42 @@ describe("SqlRunnerStorage", () => {
}))
})
})

it.effect("pg advisory locks do not collide across shard groups", () =>
Effect.scoped(Effect.gen(function*() {
const memoMap = yield* Layer.makeMemoMap
const scope = yield* Effect.scope
const pgLayer = Layer.orDie(PgContainer.layerClient)
const alphaLayer = StorageLive.pipe(
Layer.provideMerge(pgLayer),
Layer.provide(ShardingConfig.layer({
shardGroups: ["alpha"],
shardsPerGroup: 1
}))
)
const bravoLayer = StorageLive.pipe(
Layer.provideMerge(pgLayer),
Layer.provide(ShardingConfig.layer({
shardGroups: ["bravo"],
shardsPerGroup: 1
}))
)

const alphaContext = yield* Layer.buildWithMemoMap(alphaLayer, memoMap, scope)
const bravoContext = yield* Layer.buildWithMemoMap(bravoLayer, memoMap, scope)
const alphaStorage = Context.get(alphaContext, RunnerStorage.RunnerStorage)
const bravoStorage = Context.get(bravoContext, RunnerStorage.RunnerStorage)

const alphaAcquired = yield* alphaStorage.acquire(runnerAddress1, [ShardId.make("alpha", 1)])
const bravoAcquired = yield* bravoStorage.acquire(runnerAddress2, [ShardId.make("bravo", 1)])

assert.deepStrictEqual(alphaAcquired.map((_) => _.toString()), ["alpha:1"])
assert.deepStrictEqual(bravoAcquired.map((_) => _.toString()), ["bravo:1"])
})))
})

const runnerAddress1 = RunnerAddress.make("localhost", 1234)
const runnerAddress2 = RunnerAddress.make("localhost", 1235)

const SqliteLayer = Effect.gen(function*() {
const fs = yield* FileSystem.FileSystem
Expand Down
Loading