From 26807dc2e48eadb35732d1544fae0db650c9a4aa Mon Sep 17 00:00:00 2001 From: fox0430 Date: Fri, 26 Jun 2026 20:00:19 +0900 Subject: [PATCH 1/2] Parallelize pool warmup and close via allFutures --- async_postgres/pg_pool.nim | 40 ++++++++++--- tests/test_pool.nim | 115 +++++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 7 deletions(-) diff --git a/async_postgres/pg_pool.nim b/async_postgres/pg_pool.nim index d11be58..1ca0b47 100644 --- a/async_postgres/pg_pool.nim +++ b/async_postgres/pg_pool.nim @@ -605,15 +605,37 @@ proc newPool*(config: PoolConfig): Future[PgPool] {.async.} = try: pool.cachedNow = Moment.now() + # Open all `minSize` connections concurrently. `allFutures` waits for + # every connect to settle (success or failure) without short-circuiting, + # so a failure in one does not abandon the others mid-handshake — the + # server observes a clean Terminate for each socket that did come up. + # Successful connections are parked in `idle`; the first failure (if any) + # is raised so the except branch closes the ones that succeeded. + var connectFuts: seq[Future[PgConnection]] for i in 0 ..< cfg.minSize: - let conn = await connect(cfg.connConfig) - conn.ownerPool = pool - pool.metrics.createCount.inc - pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: pool.cachedNow)) + connectFuts.add(connect(cfg.connConfig)) + await allFutures(connectFuts) + var firstErr: ref CatchableError = nil + for f in connectFuts: + if f.failed(): + if firstErr == nil: + # `connect` only raises `CatchableError`, so the stored exception is + # always safe to downcast from `ref Exception` (asyncdispatch) / + # `ref CatchableError` (chronos) to the typed `ref CatchableError`. + firstErr = cast[ref CatchableError](f.error) + else: + let conn = f.read() + conn.ownerPool = pool + pool.metrics.createCount.inc + pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: pool.cachedNow)) + if firstErr != nil: + raise firstErr except CatchableError as e: + var closeFuts: seq[Future[void]] while pool.idle.len > 0: let pc = pool.idle.popFirst() - await pool.tracedClose(pc.conn) + closeFuts.add(pool.tracedClose(pc.conn)) + await allFutures(closeFuts) raise e pool.maintenanceTask = maintenanceLoop(pool) @@ -2051,11 +2073,15 @@ proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} = while pool.active > 0 and Moment.now() < deadline: await sleepAsync(milliseconds(50)) - # Close all idle connections + # Close all idle connections in parallel. `tracedClose` swallows its own + # errors (routing them to the tracer), so a failure in one close does not + # short-circuit the rest or escape this proc. + var closeFuts: seq[Future[void]] while pool.idle.len > 0: let pc = pool.idle.popFirst() pool.metrics.closeCount.inc - await pool.tracedClose(pc.conn) + closeFuts.add(pool.tracedClose(pc.conn)) + await allFutures(closeFuts) # Yield once after closing idle connections and before draining background # tasks, but only when a borrow is still outstanding. A conn handed off to a diff --git a/tests/test_pool.nim b/tests/test_pool.nim index 608747e..59c3799 100644 --- a/tests/test_pool.nim +++ b/tests/test_pool.nim @@ -2801,3 +2801,118 @@ suite "Pool replenish close-race": await closeServer(ms) waitFor t() + +suite "Pool warmup parallelization": + test "newPool opens minSize connections in parallel": + # `newPool` should open all `minSize` connections concurrently (via + # `allFutures`), not sequentially. The server handler accepts that many + # handshakes; if warmup were serial this would still pass, so the assertion + # is on the count and createCount rather than timing — but the parallel + # path is what makes the open non-blocking under concurrent handshakes. + var idleAfter = -1 + var createCount: int64 = -1 + + proc t() {.async.} = + let ms = startMockServer() + proc serverHandler() {.async.} = + var clients: seq[MockClient] + for i in 0 ..< 3: + try: + clients.add(await acceptAndReady(ms)) + except CatchableError: + break + await sleepAsync(milliseconds(100)) + for c in clients: + await closeClient(c) + + let serverFut = serverHandler() + let cfg = initPoolConfig(mockConfig(ms.port), minSize = 3, maxSize = 5) + let pool = await newPool(cfg) + idleAfter = pool.idle.len + createCount = pool.metrics.createCount + await pool.close() + await serverFut + await closeServer(ms) + + waitFor t() + check idleAfter == 3 + check createCount == 3 + + test "newPool raises when all initial connects fail": + # When every warmup connect fails, `newPool` must raise the first error + # (and the empty-idle cleanup loop is a no-op). Connects target a port we + # freed by closing a mock server so they get ECONNREFUSED. + var raised = false + + proc t() {.async.} = + let ms = startMockServer() + let freePort = ms.port + await closeServer(ms) + var cfg = initPoolConfig( + ConnConfig( + host: "127.0.0.1", + port: freePort, + user: "t", + database: "t", + sslMode: sslDisable, + ), + minSize = 2, + maxSize = 5, + ) + cfg.connConfig.connectTimeout = milliseconds(300) + try: + discard await newPool(cfg) + except CatchableError: + raised = true + + waitFor t() + check raised + + test "newPool issues warmup connects concurrently (gate-based)": + # Prove that `newPool` opens all `minSize` connects in parallel, not + # serially. The server accepts client #1 and drains its startup message + # but withholds the handshake until client #2 also connects. If warmup + # were serial, client #2's connect would never start while #1's handshake + # is pending — the second `accept` would hang until `connectTimeout` fires + # and `newPool` would raise. With parallel warmup both TCP connections are + # up immediately, the server sees #2, both handshakes complete, and + # `newPool` returns within the timeout. + var ok = true + var idleAfter = -1 + + proc t() {.async.} = + let ms = startMockServer() + proc serverHandler() {.async.} = + var clients: seq[MockClient] + try: + let c1 = await ms.accept() + clients.add(c1) + await drainStartupMessage(c1) + # Gate: withhold c1's handshake until c2 also connects. Under serial + # warmup this accept never completes and the test fails via timeout. + let c2 = await ms.accept() + clients.add(c2) + await drainStartupMessage(c2) + await sendFullHandshake(c1) + await sendFullHandshake(c2) + except CatchableError: + discard + for c in clients: + try: await closeClient(c) + except CatchableError: discard + + let serverFut = serverHandler() + var cfg = initPoolConfig(mockConfig(ms.port), minSize = 2, maxSize = 4) + cfg.connConfig.connectTimeout = milliseconds(500) + try: + let pool = await newPool(cfg) + idleAfter = pool.idle.len + await pool.close() + except CatchableError: + ok = false + await closeServer(ms) + await serverFut + + waitFor t() + check ok + check idleAfter == 2 From 4f67b8cc5a88c4177cfb564b2cb90ac3a8bad7d8 Mon Sep 17 00:00:00 2001 From: fox0430 Date: Fri, 26 Jun 2026 20:02:08 +0900 Subject: [PATCH 2/2] nph --- tests/test_pool.nim | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_pool.nim b/tests/test_pool.nim index 59c3799..6f05271 100644 --- a/tests/test_pool.nim +++ b/tests/test_pool.nim @@ -2898,8 +2898,10 @@ suite "Pool warmup parallelization": except CatchableError: discard for c in clients: - try: await closeClient(c) - except CatchableError: discard + try: + await closeClient(c) + except CatchableError: + discard let serverFut = serverHandler() var cfg = initPoolConfig(mockConfig(ms.port), minSize = 2, maxSize = 4)