From 5436e3cf2158bd0d32c7f4fcec3256a8699aebbc Mon Sep 17 00:00:00 2001 From: Michael Arnaldi Date: Sat, 16 May 2026 11:40:22 +0200 Subject: [PATCH 1/3] fix cluster advisory locks and oxc plugin exports --- .changeset/fair-seals-check.md | 6 ++ .../src/unstable/cluster/SqlRunnerStorage.ts | 58 +++++++++---------- .../test/cluster/SqlRunnerStorage.test.ts | 37 +++++++++++- packages/tools/oxc/package.json | 10 +++- 4 files changed, 78 insertions(+), 33 deletions(-) create mode 100644 .changeset/fair-seals-check.md diff --git a/.changeset/fair-seals-check.md b/.changeset/fair-seals-check.md new file mode 100644 index 0000000000..fb38feade4 --- /dev/null +++ b/.changeset/fair-seals-check.md @@ -0,0 +1,6 @@ +--- +"effect": patch +"@effect/oxc": patch +--- + +Fix PostgreSQL shard advisory lock collisions across disjoint shard groups and restore workspace `oxlint` plugin loading from built exports. diff --git a/packages/effect/src/unstable/cluster/SqlRunnerStorage.ts b/packages/effect/src/unstable/cluster/SqlRunnerStorage.ts index 8cfb747cea..2928f310da 100644 --- a/packages/effect/src/unstable/cluster/SqlRunnerStorage.ts +++ b/packages/effect/src/unstable/cluster/SqlRunnerStorage.ts @@ -283,15 +283,18 @@ export const make = Effect.fnUntraced(function*(options: { return Effect.fnUntraced(function*(_address: string, shardIds: ReadonlyArray) { const [conn, pid] = yield* lockConn!.await const acquiredShardIds: Array = [] - const toAcquire = new Map(shardIds.map((shardId) => [lockNumbers.get(shardId)!, shardId])) - const takenLocks = yield* conn.executeValues( - `SELECT objid FROM pg_locks WHERE locktype = 'advisory' AND granted = true AND pid = ${pid} ORDER BY objid`, - [] + const toAcquire = new Set(shardIds) + const takenLocks = yield* conn.executeUnprepared( + `SELECT ${pgHeldLocks(shardIds, pid)}`, + [], + undefined ) - for (let i = 0; i < takenLocks.length; i++) { - const lockNum = takenLocks[i][0] as number - acquiredShardIds.push(lockNumbersReverse.get(lockNum)!) - toAcquire.delete(lockNum) + const heldLocks = takenLocks[0] as Record + for (const shardId in heldLocks) { + if (heldLocks[shardId]) { + acquiredShardIds.push(shardId) + toAcquire.delete(shardId) + } } if (toAcquire.size === 0) { return acquiredShardIds @@ -389,19 +392,6 @@ export const make = Effect.fnUntraced(function*(options: { } }) - const lockNumbers = new Map() - const lockNumbersReverse = new Map() - for (let i = 0; i < config.shardGroups.length; i++) { - const group = config.shardGroups[i] - const base = (i + 1) * 1000000 - for (let shard = 1; shard <= config.shardsPerGroup; shard++) { - const shardId = ShardId.make(group, shard).toString() - const lockNum = base + shard - lockNumbers.set(shardId, lockNum) - lockNumbersReverse.set(lockNum, shardId) - } - } - const shardIdsIndex = new Map() const lockNames = new Map() const lockNamesReverse = new Map() @@ -419,11 +409,22 @@ export const make = Effect.fnUntraced(function*(options: { } } - const pgLocks = (shardIdsMap: Map) => - Array.from( - shardIdsMap.entries(), - ([lockNum, shardId]) => `pg_try_advisory_lock(${lockNum}) AS "${shardId}"` - ).join(", ") + const pgLockSeed = 1337 + const pgLockKey = (shardId: string) => `hashtextextended(${wrapString(shardId)}, ${pgLockSeed})` + const pgLockExists = (shardId: string, pid: number | "pg_backend_pid()") => + `EXISTS ( + SELECT 1 FROM pg_locks + WHERE locktype = 'advisory' + AND granted = true + AND pid = ${pid} + AND ((classid::bigint << 32) | objid::bigint) = ${pgLockKey(shardId)} + )` + + const pgLocks = (shardIds: ReadonlySet) => + Array.from(shardIds, (shardId) => `pg_try_advisory_lock(${pgLockKey(shardId)}) AS "${shardId}"`).join(", ") + + const pgHeldLocks = (shardIds: ReadonlyArray, pid: number) => + shardIds.map((shardId) => `${pgLockExists(shardId, pid)} AS "${shardId}"`).join(", ") const mysqlLocks = (shardIds: ReadonlyArray) => shardIds.map((shardId) => `GET_LOCK('${lockNames.get(shardId)!}', 0) AS "${shardId}"`).join(", ") @@ -551,12 +552,11 @@ export const make = Effect.fnUntraced(function*(options: { } return Effect.fnUntraced( function*(_address, shardId) { - const lockNum = lockNumbers.get(shardId)! for (let i = 0; i < 5; i++) { const [conn] = yield* lockConn!.await - yield* conn.executeRaw(`SELECT pg_advisory_unlock(${lockNum})`, []) + yield* conn.executeRaw(`SELECT pg_advisory_unlock(${pgLockKey(shardId)})`, []) const takenLocks = yield* conn.executeValues( - `SELECT 1 FROM pg_locks WHERE locktype = 'advisory' AND granted = true AND pid = pg_backend_pid() AND objid = ${lockNum}`, + `SELECT 1 WHERE ${pgLockExists(shardId, "pg_backend_pid()")}`, [] ) if (takenLocks.length === 0) return diff --git a/packages/platform-node/test/cluster/SqlRunnerStorage.test.ts b/packages/platform-node/test/cluster/SqlRunnerStorage.test.ts index 913899875e..52a1fbe223 100644 --- a/packages/platform-node/test/cluster/SqlRunnerStorage.test.ts +++ b/packages/platform-node/test/cluster/SqlRunnerStorage.test.ts @@ -1,7 +1,7 @@ import { NodeFileSystem } from "@effect/platform-node" import { SqliteClient } from "@effect/sql-sqlite-node" -import { describe, expect, it } from "@effect/vitest" -import { Effect, FileSystem, Layer } from "effect" +import { assert, describe, expect, it } from "@effect/vitest" +import { Context, Effect, FileSystem, Layer } from "effect" import { Runner, RunnerAddress, @@ -88,9 +88,42 @@ describe("SqlRunnerStorage", () => { })) }) }) + + it.effect("pg advisory locks do not collide across shard groups", () => + Effect.scoped(Effect.gen(function*() { + const memoMap = yield* Layer.makeMemoMap + const scope = yield* Effect.scope + const pgLayer = Layer.orDie(PgContainer.layerClient) + const alphaLayer = StorageLive.pipe( + Layer.provideMerge(pgLayer), + Layer.provide(ShardingConfig.layer({ + shardGroups: ["alpha"], + shardsPerGroup: 1 + })) + ) + const bravoLayer = StorageLive.pipe( + Layer.provideMerge(pgLayer), + Layer.provide(ShardingConfig.layer({ + shardGroups: ["bravo"], + shardsPerGroup: 1 + })) + ) + + const alphaContext = yield* Layer.buildWithMemoMap(alphaLayer, memoMap, scope) + const bravoContext = yield* Layer.buildWithMemoMap(bravoLayer, memoMap, scope) + const alphaStorage = Context.get(alphaContext, RunnerStorage.RunnerStorage) + const bravoStorage = Context.get(bravoContext, RunnerStorage.RunnerStorage) + + const alphaAcquired = yield* alphaStorage.acquire(runnerAddress1, [ShardId.make("alpha", 1)]) + const bravoAcquired = yield* bravoStorage.acquire(runnerAddress2, [ShardId.make("bravo", 1)]) + + assert.deepStrictEqual(alphaAcquired.map((_) => _.toString()), ["alpha:1"]) + assert.deepStrictEqual(bravoAcquired.map((_) => _.toString()), ["bravo:1"]) + }))) }) const runnerAddress1 = RunnerAddress.make("localhost", 1234) +const runnerAddress2 = RunnerAddress.make("localhost", 1235) const SqliteLayer = Effect.gen(function*() { const fs = yield* FileSystem.FileSystem diff --git a/packages/tools/oxc/package.json b/packages/tools/oxc/package.json index 632bfac95e..89aca9bd67 100644 --- a/packages/tools/oxc/package.json +++ b/packages/tools/oxc/package.json @@ -29,8 +29,14 @@ ], "exports": { "./package.json": "./package.json", - "./oxlint": "./src/oxlint/index.ts", - "./oxlint/rules/*": "./src/oxlint/rules/*.ts", + "./oxlint": { + "types": "./dist/oxlint/index.d.ts", + "default": "./dist/oxlint/index.js" + }, + "./oxlint/rules/*": { + "types": "./dist/oxlint/rules/*.d.ts", + "default": "./dist/oxlint/rules/*.js" + }, "./oxlintrc.json": "./oxlintrc.json" }, "files": [ From 150fd7993255b042b152a63ecaf2b5c4a18f63dd Mon Sep 17 00:00:00 2001 From: Michael Arnaldi Date: Sat, 16 May 2026 12:07:02 +0200 Subject: [PATCH 2/3] revert oxc lint plugin export change --- .changeset/fair-seals-check.md | 3 +-- packages/tools/oxc/package.json | 10 ++-------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/.changeset/fair-seals-check.md b/.changeset/fair-seals-check.md index fb38feade4..f1ee8041bb 100644 --- a/.changeset/fair-seals-check.md +++ b/.changeset/fair-seals-check.md @@ -1,6 +1,5 @@ --- "effect": patch -"@effect/oxc": patch --- -Fix PostgreSQL shard advisory lock collisions across disjoint shard groups and restore workspace `oxlint` plugin loading from built exports. +Fix PostgreSQL shard advisory lock collisions across disjoint shard groups. diff --git a/packages/tools/oxc/package.json b/packages/tools/oxc/package.json index 89aca9bd67..632bfac95e 100644 --- a/packages/tools/oxc/package.json +++ b/packages/tools/oxc/package.json @@ -29,14 +29,8 @@ ], "exports": { "./package.json": "./package.json", - "./oxlint": { - "types": "./dist/oxlint/index.d.ts", - "default": "./dist/oxlint/index.js" - }, - "./oxlint/rules/*": { - "types": "./dist/oxlint/rules/*.d.ts", - "default": "./dist/oxlint/rules/*.js" - }, + "./oxlint": "./src/oxlint/index.ts", + "./oxlint/rules/*": "./src/oxlint/rules/*.ts", "./oxlintrc.json": "./oxlintrc.json" }, "files": [ From 7146854390ccc1e1824307149aae38aa06f17886 Mon Sep 17 00:00:00 2001 From: Michael Arnaldi Date: Sat, 16 May 2026 12:10:12 +0200 Subject: [PATCH 3/3] chore: ignore opencode local files --- .gitignore | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.gitignore b/.gitignore index c0723a42a0..2860cf441f 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,9 @@ scratchpad/**/* # Codex .agents/ + +# OpenCode +.opencode/session-logs +.opencode/plugins +.opencode/package-lock.json +opencode.jsonc