Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions async_postgres/pg_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -605,15 +605,37 @@ proc newPool*(config: PoolConfig): Future[PgPool] {.async.} =

try:
pool.cachedNow = Moment.now()
# Open all `minSize` connections concurrently. `allFutures` waits for
# every connect to settle (success or failure) without short-circuiting,
# so a failure in one does not abandon the others mid-handshake — the
# server observes a clean Terminate for each socket that did come up.
# Successful connections are parked in `idle`; the first failure (if any)
# is raised so the except branch closes the ones that succeeded.
var connectFuts: seq[Future[PgConnection]]
for i in 0 ..< cfg.minSize:
let conn = await connect(cfg.connConfig)
conn.ownerPool = pool
pool.metrics.createCount.inc
pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: pool.cachedNow))
connectFuts.add(connect(cfg.connConfig))
await allFutures(connectFuts)
var firstErr: ref CatchableError = nil
for f in connectFuts:
if f.failed():
if firstErr == nil:
# `connect` only raises `CatchableError`, so the stored exception is
# always safe to downcast from `ref Exception` (asyncdispatch) /
# `ref CatchableError` (chronos) to the typed `ref CatchableError`.
firstErr = cast[ref CatchableError](f.error)
else:
let conn = f.read()
conn.ownerPool = pool
pool.metrics.createCount.inc
pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: pool.cachedNow))
if firstErr != nil:
raise firstErr
except CatchableError as e:
var closeFuts: seq[Future[void]]
while pool.idle.len > 0:
let pc = pool.idle.popFirst()
await pool.tracedClose(pc.conn)
closeFuts.add(pool.tracedClose(pc.conn))
await allFutures(closeFuts)
raise e

pool.maintenanceTask = maintenanceLoop(pool)
Expand Down Expand Up @@ -2051,11 +2073,15 @@ proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} =
while pool.active > 0 and Moment.now() < deadline:
await sleepAsync(milliseconds(50))

# Close all idle connections
# Close all idle connections in parallel. `tracedClose` swallows its own
# errors (routing them to the tracer), so a failure in one close does not
# short-circuit the rest or escape this proc.
var closeFuts: seq[Future[void]]
while pool.idle.len > 0:
let pc = pool.idle.popFirst()
pool.metrics.closeCount.inc
await pool.tracedClose(pc.conn)
closeFuts.add(pool.tracedClose(pc.conn))
await allFutures(closeFuts)

# Yield once after closing idle connections and before draining background
# tasks, but only when a borrow is still outstanding. A conn handed off to a
Expand Down
117 changes: 117 additions & 0 deletions tests/test_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2801,3 +2801,120 @@ suite "Pool replenish close-race":
await closeServer(ms)

waitFor t()

suite "Pool warmup parallelization":
test "newPool opens minSize connections in parallel":
# `newPool` should open all `minSize` connections concurrently (via
# `allFutures`), not sequentially. The server handler accepts that many
# handshakes; if warmup were serial this would still pass, so the assertion
# is on the count and createCount rather than timing — but the parallel
# path is what makes the open non-blocking under concurrent handshakes.
var idleAfter = -1
var createCount: int64 = -1

proc t() {.async.} =
let ms = startMockServer()
proc serverHandler() {.async.} =
var clients: seq[MockClient]
for i in 0 ..< 3:
try:
clients.add(await acceptAndReady(ms))
except CatchableError:
break
await sleepAsync(milliseconds(100))
for c in clients:
await closeClient(c)

let serverFut = serverHandler()
let cfg = initPoolConfig(mockConfig(ms.port), minSize = 3, maxSize = 5)
let pool = await newPool(cfg)
idleAfter = pool.idle.len
createCount = pool.metrics.createCount
await pool.close()
await serverFut
await closeServer(ms)

waitFor t()
check idleAfter == 3
check createCount == 3

test "newPool raises when all initial connects fail":
# When every warmup connect fails, `newPool` must raise the first error
# (and the empty-idle cleanup loop is a no-op). Connects target a port we
# freed by closing a mock server so they get ECONNREFUSED.
var raised = false

proc t() {.async.} =
let ms = startMockServer()
let freePort = ms.port
await closeServer(ms)
var cfg = initPoolConfig(
ConnConfig(
host: "127.0.0.1",
port: freePort,
user: "t",
database: "t",
sslMode: sslDisable,
),
minSize = 2,
maxSize = 5,
)
cfg.connConfig.connectTimeout = milliseconds(300)
try:
discard await newPool(cfg)
except CatchableError:
raised = true

waitFor t()
check raised

test "newPool issues warmup connects concurrently (gate-based)":
# Prove that `newPool` opens all `minSize` connects in parallel, not
# serially. The server accepts client #1 and drains its startup message
# but withholds the handshake until client #2 also connects. If warmup
# were serial, client #2's connect would never start while #1's handshake
# is pending — the second `accept` would hang until `connectTimeout` fires
# and `newPool` would raise. With parallel warmup both TCP connections are
# up immediately, the server sees #2, both handshakes complete, and
# `newPool` returns within the timeout.
var ok = true
var idleAfter = -1

proc t() {.async.} =
let ms = startMockServer()
proc serverHandler() {.async.} =
var clients: seq[MockClient]
try:
let c1 = await ms.accept()
clients.add(c1)
await drainStartupMessage(c1)
# Gate: withhold c1's handshake until c2 also connects. Under serial
# warmup this accept never completes and the test fails via timeout.
let c2 = await ms.accept()
clients.add(c2)
await drainStartupMessage(c2)
await sendFullHandshake(c1)
await sendFullHandshake(c2)
except CatchableError:
discard
for c in clients:
try:
await closeClient(c)
except CatchableError:
discard

let serverFut = serverHandler()
var cfg = initPoolConfig(mockConfig(ms.port), minSize = 2, maxSize = 4)
cfg.connConfig.connectTimeout = milliseconds(500)
try:
let pool = await newPool(cfg)
idleAfter = pool.idle.len
await pool.close()
except CatchableError:
ok = false
await closeServer(ms)
await serverFut

waitFor t()
check ok
check idleAfter == 2