diff --git a/.changeset/fair-seals-check.md b/.changeset/fair-seals-check.md new file mode 100644 index 0000000000..f1ee8041bb --- /dev/null +++ b/.changeset/fair-seals-check.md @@ -0,0 +1,5 @@ +--- +"effect": patch +--- + +Fix PostgreSQL shard advisory lock collisions across disjoint shard groups. diff --git a/.gitignore b/.gitignore index c0723a42a0..2860cf441f 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,9 @@ scratchpad/**/* # Codex .agents/ + +# OpenCode +.opencode/session-logs +.opencode/plugins +.opencode/package-lock.json +opencode.jsonc diff --git a/packages/effect/src/unstable/cluster/SqlRunnerStorage.ts b/packages/effect/src/unstable/cluster/SqlRunnerStorage.ts index 8cfb747cea..2928f310da 100644 --- a/packages/effect/src/unstable/cluster/SqlRunnerStorage.ts +++ b/packages/effect/src/unstable/cluster/SqlRunnerStorage.ts @@ -283,15 +283,18 @@ export const make = Effect.fnUntraced(function*(options: { return Effect.fnUntraced(function*(_address: string, shardIds: ReadonlyArray) { const [conn, pid] = yield* lockConn!.await const acquiredShardIds: Array = [] - const toAcquire = new Map(shardIds.map((shardId) => [lockNumbers.get(shardId)!, shardId])) - const takenLocks = yield* conn.executeValues( - `SELECT objid FROM pg_locks WHERE locktype = 'advisory' AND granted = true AND pid = ${pid} ORDER BY objid`, - [] + const toAcquire = new Set(shardIds) + const takenLocks = yield* conn.executeUnprepared( + `SELECT ${pgHeldLocks(shardIds, pid)}`, + [], + undefined ) - for (let i = 0; i < takenLocks.length; i++) { - const lockNum = takenLocks[i][0] as number - acquiredShardIds.push(lockNumbersReverse.get(lockNum)!) - toAcquire.delete(lockNum) + const heldLocks = takenLocks[0] as Record + for (const shardId in heldLocks) { + if (heldLocks[shardId]) { + acquiredShardIds.push(shardId) + toAcquire.delete(shardId) + } } if (toAcquire.size === 0) { return acquiredShardIds @@ -389,19 +392,6 @@ export const make = Effect.fnUntraced(function*(options: { } }) - const lockNumbers = new Map() - const lockNumbersReverse = new Map() - for (let i = 0; i < config.shardGroups.length; i++) { - const group = config.shardGroups[i] - const base = (i + 1) * 1000000 - for (let shard = 1; shard <= config.shardsPerGroup; shard++) { - const shardId = ShardId.make(group, shard).toString() - const lockNum = base + shard - lockNumbers.set(shardId, lockNum) - lockNumbersReverse.set(lockNum, shardId) - } - } - const shardIdsIndex = new Map() const lockNames = new Map() const lockNamesReverse = new Map() @@ -419,11 +409,22 @@ export const make = Effect.fnUntraced(function*(options: { } } - const pgLocks = (shardIdsMap: Map) => - Array.from( - shardIdsMap.entries(), - ([lockNum, shardId]) => `pg_try_advisory_lock(${lockNum}) AS "${shardId}"` - ).join(", ") + const pgLockSeed = 1337 + const pgLockKey = (shardId: string) => `hashtextextended(${wrapString(shardId)}, ${pgLockSeed})` + const pgLockExists = (shardId: string, pid: number | "pg_backend_pid()") => + `EXISTS ( + SELECT 1 FROM pg_locks + WHERE locktype = 'advisory' + AND granted = true + AND pid = ${pid} + AND ((classid::bigint << 32) | objid::bigint) = ${pgLockKey(shardId)} + )` + + const pgLocks = (shardIds: ReadonlySet) => + Array.from(shardIds, (shardId) => `pg_try_advisory_lock(${pgLockKey(shardId)}) AS "${shardId}"`).join(", ") + + const pgHeldLocks = (shardIds: ReadonlyArray, pid: number) => + shardIds.map((shardId) => `${pgLockExists(shardId, pid)} AS "${shardId}"`).join(", ") const mysqlLocks = (shardIds: ReadonlyArray) => shardIds.map((shardId) => `GET_LOCK('${lockNames.get(shardId)!}', 0) AS "${shardId}"`).join(", ") @@ -551,12 +552,11 @@ export const make = Effect.fnUntraced(function*(options: { } return Effect.fnUntraced( function*(_address, shardId) { - const lockNum = lockNumbers.get(shardId)! for (let i = 0; i < 5; i++) { const [conn] = yield* lockConn!.await - yield* conn.executeRaw(`SELECT pg_advisory_unlock(${lockNum})`, []) + yield* conn.executeRaw(`SELECT pg_advisory_unlock(${pgLockKey(shardId)})`, []) const takenLocks = yield* conn.executeValues( - `SELECT 1 FROM pg_locks WHERE locktype = 'advisory' AND granted = true AND pid = pg_backend_pid() AND objid = ${lockNum}`, + `SELECT 1 WHERE ${pgLockExists(shardId, "pg_backend_pid()")}`, [] ) if (takenLocks.length === 0) return diff --git a/packages/platform-node/test/cluster/SqlRunnerStorage.test.ts b/packages/platform-node/test/cluster/SqlRunnerStorage.test.ts index 913899875e..52a1fbe223 100644 --- a/packages/platform-node/test/cluster/SqlRunnerStorage.test.ts +++ b/packages/platform-node/test/cluster/SqlRunnerStorage.test.ts @@ -1,7 +1,7 @@ import { NodeFileSystem } from "@effect/platform-node" import { SqliteClient } from "@effect/sql-sqlite-node" -import { describe, expect, it } from "@effect/vitest" -import { Effect, FileSystem, Layer } from "effect" +import { assert, describe, expect, it } from "@effect/vitest" +import { Context, Effect, FileSystem, Layer } from "effect" import { Runner, RunnerAddress, @@ -88,9 +88,42 @@ describe("SqlRunnerStorage", () => { })) }) }) + + it.effect("pg advisory locks do not collide across shard groups", () => + Effect.scoped(Effect.gen(function*() { + const memoMap = yield* Layer.makeMemoMap + const scope = yield* Effect.scope + const pgLayer = Layer.orDie(PgContainer.layerClient) + const alphaLayer = StorageLive.pipe( + Layer.provideMerge(pgLayer), + Layer.provide(ShardingConfig.layer({ + shardGroups: ["alpha"], + shardsPerGroup: 1 + })) + ) + const bravoLayer = StorageLive.pipe( + Layer.provideMerge(pgLayer), + Layer.provide(ShardingConfig.layer({ + shardGroups: ["bravo"], + shardsPerGroup: 1 + })) + ) + + const alphaContext = yield* Layer.buildWithMemoMap(alphaLayer, memoMap, scope) + const bravoContext = yield* Layer.buildWithMemoMap(bravoLayer, memoMap, scope) + const alphaStorage = Context.get(alphaContext, RunnerStorage.RunnerStorage) + const bravoStorage = Context.get(bravoContext, RunnerStorage.RunnerStorage) + + const alphaAcquired = yield* alphaStorage.acquire(runnerAddress1, [ShardId.make("alpha", 1)]) + const bravoAcquired = yield* bravoStorage.acquire(runnerAddress2, [ShardId.make("bravo", 1)]) + + assert.deepStrictEqual(alphaAcquired.map((_) => _.toString()), ["alpha:1"]) + assert.deepStrictEqual(bravoAcquired.map((_) => _.toString()), ["bravo:1"]) + }))) }) const runnerAddress1 = RunnerAddress.make("localhost", 1234) +const runnerAddress2 = RunnerAddress.make("localhost", 1235) const SqliteLayer = Effect.gen(function*() { const fs = yield* FileSystem.FileSystem