diff --git a/hub/internal/server/db.go b/hub/internal/server/db.go index c1096c14..3864c50a 100644 --- a/hub/internal/server/db.go +++ b/hub/internal/server/db.go @@ -106,6 +106,19 @@ func perTeamWriterCachePragma(maxOpen int) string { return fmt.Sprintf("&_pragma=cache_size(-%d)", perKB) } +// maxReadConns returns the generous reader-pool connection cap (default 64), +// overridable via HUB_DB_MAX_READ_CONNS. This is an FD-exhaustion safety +// bound, NOT a throughput cap — keep it generous. +func maxReadConns() int { + n := 64 + if v := os.Getenv("HUB_DB_MAX_READ_CONNS"); v != "" { + if parsed, err := strconv.Atoi(v); err == nil && parsed > 0 { + n = parsed + } + } + return n +} + // OpenDB opens the SQLite database at path and runs pending migrations. // Callers must Close the returned *sql.DB. func OpenDB(path string) (*sql.DB, error) { @@ -133,8 +146,9 @@ func OpenDB(path string) (*sql.DB, error) { return nil, fmt.Errorf("close migrations db: %w", err) } - // The general/reader pool is uncapped, so it gets pragmaCommon WITHOUT the - // big writer cache (cache_size is per-connection — see the const doc). + // The general/reader pool gets pragmaCommon WITHOUT the big writer cache + // (cache_size is per-connection — see the const doc). It carries a generous + // FD-safety cap (see maxReadConns). dsn := path + "?_pragma=foreign_keys(1)&" + pragmaCommon db, err := sql.Open("sqlite", dsn) if err != nil { diff --git a/hub/internal/server/handlers_admin_teams.go b/hub/internal/server/handlers_admin_teams.go index 59b1b53e..c413a87a 100644 --- a/hub/internal/server/handlers_admin_teams.go +++ b/hub/internal/server/handlers_admin_teams.go @@ -123,12 +123,12 @@ func (s *Server) handleAdminRotateTeamToken(w http.ResponseWriter, r *http.Reque s.writeDBErr(w, err) return } + defer rows.Close() var staleIDs []string var handle string for rows.Next() { var id, scopeJSON string if err := rows.Scan(&id, &scopeJSON); err != nil { - rows.Close() s.writeDBErr(w, err) return } @@ -145,7 +145,6 @@ func (s *Server) handleAdminRotateTeamToken(w http.ResponseWriter, r *http.Reque handle = sc.Handle // first match = newest (ORDER BY DESC) } } - rows.Close() if err := rows.Err(); err != nil { s.writeDBErr(w, err) return diff --git a/hub/internal/server/handlers_criteria.go b/hub/internal/server/handlers_criteria.go index 458a54de..f2b178d3 100644 --- a/hub/internal/server/handlers_criteria.go +++ b/hub/internal/server/handlers_criteria.go @@ -457,6 +457,7 @@ func (s *Server) cascadeDeliverableRatified( if err != nil { return nil, err } + defer rows.Close() type pending struct { id string body map[string]any @@ -467,7 +468,6 @@ func (s *Server) cascadeDeliverableRatified( var id, bodyStr string var delivNS sql.NullString if err := rows.Scan(&id, &bodyStr, &delivNS); err != nil { - rows.Close() return nil, err } body := map[string]any{} @@ -478,7 +478,6 @@ func (s *Server) cascadeDeliverableRatified( } candidates = append(candidates, pending{id: id, body: body, deliv: deliv}) } - rows.Close() fired := []string{} for _, p := range candidates { diff --git a/hub/internal/server/handlers_insights.go b/hub/internal/server/handlers_insights.go index ddeaa7f5..e71c39d4 100644 --- a/hub/internal/server/handlers_insights.go +++ b/hub/internal/server/handlers_insights.go @@ -1040,6 +1040,7 @@ func (s *Server) fillInsightsByProject( if err != nil { return err } + defer rows.Close() type projRow struct { id, name, phase, status, history, templateID string } @@ -1050,12 +1051,10 @@ func (s *Server) fillInsightsByProject( &p.id, &p.name, &p.phase, &p.status, &p.history, &p.templateID, ); err != nil { - rows.Close() return err } projects = append(projects, p) } - rows.Close() if err := rows.Err(); err != nil { return err } diff --git a/hub/internal/server/loop_hooks.go b/hub/internal/server/loop_hooks.go index 6dfc8542..1bcb58cb 100644 --- a/hub/internal/server/loop_hooks.go +++ b/hub/internal/server/loop_hooks.go @@ -112,6 +112,7 @@ func (s *Server) onPreAgentIdle(ctx context.Context, agentID string) { if err != nil { return } + defer rows.Close() var open []string for rows.Next() { var title string @@ -119,7 +120,6 @@ func (s *Server) onPreAgentIdle(ctx context.Context, agentID string) { open = append(open, title) } } - rows.Close() if len(open) == 0 { return } diff --git a/hub/internal/server/loop_sweep.go b/hub/internal/server/loop_sweep.go index 58bf5e6c..dfea9bf0 100644 --- a/hub/internal/server/loop_sweep.go +++ b/hub/internal/server/loop_sweep.go @@ -169,6 +169,7 @@ func (s *Server) bumpLoopProgress(ctx context.Context, agentID string) { s.log.Warn("loop sweep: bump progress list failed", "agent", agentID, "err", err) return } + defer rows.Close() type taskRef struct{ id, projectID string } var refs []taskRef for rows.Next() { @@ -177,7 +178,6 @@ func (s *Server) bumpLoopProgress(ctx context.Context, agentID string) { refs = append(refs, r) } } - rows.Close() now := time.Now().UTC() for _, r := range refs { inactivity, _ := s.loopBudgets(ctx, r.projectID) diff --git a/hub/internal/server/phase_completion_gate.go b/hub/internal/server/phase_completion_gate.go index d1d0567d..c10990c0 100644 --- a/hub/internal/server/phase_completion_gate.go +++ b/hub/internal/server/phase_completion_gate.go @@ -67,6 +67,7 @@ func (s *Server) cascadeDeliverableUnratified( if err != nil { return nil, err } + defer rows.Close() type pending struct { id string body map[string]any @@ -77,7 +78,6 @@ func (s *Server) cascadeDeliverableUnratified( var id, bodyStr string var delivNS sql.NullString if err := rows.Scan(&id, &bodyStr, &delivNS); err != nil { - rows.Close() return nil, err } body := map[string]any{} @@ -88,7 +88,6 @@ func (s *Server) cascadeDeliverableUnratified( } candidates = append(candidates, pending{id: id, body: body, deliv: deliv}) } - rows.Close() repended := []string{} for _, p := range candidates { diff --git a/hub/internal/server/seed_demo_lifecycle.go b/hub/internal/server/seed_demo_lifecycle.go index a789a707..90fec08c 100644 --- a/hub/internal/server/seed_demo_lifecycle.go +++ b/hub/internal/server/seed_demo_lifecycle.go @@ -112,16 +112,15 @@ func ResetLifecycleDemo(ctx context.Context, db *sql.DB) (deleted bool, err erro if err != nil { return false, fmt.Errorf("lookup lifecycle demos: %w", err) } + defer rows.Close() var projectIDs []string for rows.Next() { var id string if err := rows.Scan(&id); err != nil { - rows.Close() return false, err } projectIDs = append(projectIDs, id) } - rows.Close() if len(projectIDs) == 0 { return false, nil } diff --git a/hub/internal/server/server.go b/hub/internal/server/server.go index c3460af4..83236448 100644 --- a/hub/internal/server/server.go +++ b/hub/internal/server/server.go @@ -57,7 +57,7 @@ type Config struct { type Server struct { cfg Config - db *sql.DB // control reader pool — uncapped; reads + tests + db *sql.DB // control reader pool — generous cap (HUB_DB_MAX_READ_CONNS); reads + tests writeDB *sql.DB // control writer pool — SetMaxOpenConns(1); all writes, see New() // Store separation + per-team sharding (ADR-045 D2 — the event log + the // derived digest are separate data classes from the control plane, and both @@ -142,9 +142,12 @@ func New(cfg Config) (*Server, error) { if err != nil { return nil, err } + db.SetMaxOpenConns(maxReadConns()) + db.SetMaxIdleConns(maxReadConns()) // Writer/reader pool split — docs/discussions/hub-scaling-storage-and- // concurrency.md §6 lever 3. SQLite is a single-writer store. s.db stays - // the uncapped general/reader pool (WAL lets readers run concurrently); + // the general/reader pool (WAL lets readers run concurrently; generous + // FD-safety cap via HUB_DB_MAX_READ_CONNS); // s.writeDB is a dedicated ONE-connection writer pool that ALL writes go // through, so writes queue cheaply in Go instead of colliding on the // write lock and exhausting busy_timeout under many concurrent agents diff --git a/hub/internal/server/store_split.go b/hub/internal/server/store_split.go index 14182318..54184a52 100644 --- a/hub/internal/server/store_split.go +++ b/hub/internal/server/store_split.go @@ -150,16 +150,16 @@ func storePathsFor(controlPath string) (eventsPath, digestPath string) { return filepath.Join(dir, "events.db"), filepath.Join(dir, "digest.db") } -// openStorePool opens a reader (uncapped) or single-writer pool against an +// openStorePool opens a reader (generous cap) or single-writer pool against an // already-schema'd store file. Mirrors the control reader/writer split (New(), // OpenWriterDB): writes serialize through one connection so they queue in Go // instead of colliding on SQLite's per-file write lock. The moving tables carry // no foreign keys post-split, but foreign_keys(1) is harmless and consistent. // // The writer cache (cachePragma) is applied only to the single-conn writer pool -// — the uncapped reader pool gets pragmaCommon, so a per-connection cache can't -// multiply across concurrent readers (db.go const doc). cachePragma is the -// budget-divided per-team value (perTeamWriterCachePragma) threaded in by the +// — the reader pool gets pragmaCommon (with a generous FD-safety cap), so a +// per-connection cache can't multiply across concurrent readers (db.go const +// doc). cachePragma is the budget-divided per-team value (perTeamWriterCachePragma) threaded in by the // registry; an empty string means no extra cache (reader pools pass ""). func openStorePool(path string, writer bool, cachePragma string) (*sql.DB, error) { dsn := path + "?_pragma=foreign_keys(1)&" + pragmaCommon @@ -181,6 +181,9 @@ func openStorePool(path string, writer bool, cachePragma string) (*sql.DB, error if writer { db.SetMaxOpenConns(1) db.SetMaxIdleConns(1) + } else { + db.SetMaxOpenConns(maxReadConns()) + db.SetMaxIdleConns(maxReadConns()) } return db, nil }