Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions hub/internal/server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,19 @@ func perTeamWriterCachePragma(maxOpen int) string {
return fmt.Sprintf("&_pragma=cache_size(-%d)", perKB)
}

// maxReadConns returns the generous reader-pool connection cap (default 64),
// overridable via HUB_DB_MAX_READ_CONNS. This is an FD-exhaustion safety
// bound, NOT a throughput cap — keep it generous.
func maxReadConns() int {
n := 64
if v := os.Getenv("HUB_DB_MAX_READ_CONNS"); v != "" {
if parsed, err := strconv.Atoi(v); err == nil && parsed > 0 {
n = parsed
}
}
return n
}

// OpenDB opens the SQLite database at path and runs pending migrations.
// Callers must Close the returned *sql.DB.
func OpenDB(path string) (*sql.DB, error) {
Expand Down Expand Up @@ -133,8 +146,9 @@ func OpenDB(path string) (*sql.DB, error) {
return nil, fmt.Errorf("close migrations db: %w", err)
}

// The general/reader pool is uncapped, so it gets pragmaCommon WITHOUT the
// big writer cache (cache_size is per-connection — see the const doc).
// The general/reader pool gets pragmaCommon WITHOUT the big writer cache
// (cache_size is per-connection — see the const doc). It carries a generous
// FD-safety cap (see maxReadConns).
dsn := path + "?_pragma=foreign_keys(1)&" + pragmaCommon
db, err := sql.Open("sqlite", dsn)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions hub/internal/server/handlers_admin_teams.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ func (s *Server) handleAdminRotateTeamToken(w http.ResponseWriter, r *http.Reque
s.writeDBErr(w, err)
return
}
defer rows.Close()
var staleIDs []string
var handle string
for rows.Next() {
var id, scopeJSON string
if err := rows.Scan(&id, &scopeJSON); err != nil {
rows.Close()
s.writeDBErr(w, err)
return
}
Expand All @@ -145,7 +145,6 @@ func (s *Server) handleAdminRotateTeamToken(w http.ResponseWriter, r *http.Reque
handle = sc.Handle // first match = newest (ORDER BY DESC)
}
}
rows.Close()
if err := rows.Err(); err != nil {
s.writeDBErr(w, err)
return
Expand Down
3 changes: 1 addition & 2 deletions hub/internal/server/handlers_criteria.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ func (s *Server) cascadeDeliverableRatified(
if err != nil {
return nil, err
}
defer rows.Close()
type pending struct {
id string
body map[string]any
Expand All @@ -467,7 +468,6 @@ func (s *Server) cascadeDeliverableRatified(
var id, bodyStr string
var delivNS sql.NullString
if err := rows.Scan(&id, &bodyStr, &delivNS); err != nil {
rows.Close()
return nil, err
}
body := map[string]any{}
Expand All @@ -478,7 +478,6 @@ func (s *Server) cascadeDeliverableRatified(
}
candidates = append(candidates, pending{id: id, body: body, deliv: deliv})
}
rows.Close()

fired := []string{}
for _, p := range candidates {
Expand Down
3 changes: 1 addition & 2 deletions hub/internal/server/handlers_insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,7 @@ func (s *Server) fillInsightsByProject(
if err != nil {
return err
}
defer rows.Close()
type projRow struct {
id, name, phase, status, history, templateID string
}
Expand All @@ -1050,12 +1051,10 @@ func (s *Server) fillInsightsByProject(
&p.id, &p.name, &p.phase, &p.status,
&p.history, &p.templateID,
); err != nil {
rows.Close()
return err
}
projects = append(projects, p)
}
rows.Close()
if err := rows.Err(); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion hub/internal/server/loop_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ func (s *Server) onPreAgentIdle(ctx context.Context, agentID string) {
if err != nil {
return
}
defer rows.Close()
var open []string
for rows.Next() {
var title string
if rows.Scan(&title) == nil {
open = append(open, title)
}
}
rows.Close()
if len(open) == 0 {
return
}
Expand Down
2 changes: 1 addition & 1 deletion hub/internal/server/loop_sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (s *Server) bumpLoopProgress(ctx context.Context, agentID string) {
s.log.Warn("loop sweep: bump progress list failed", "agent", agentID, "err", err)
return
}
defer rows.Close()
type taskRef struct{ id, projectID string }
var refs []taskRef
for rows.Next() {
Expand All @@ -177,7 +178,6 @@ func (s *Server) bumpLoopProgress(ctx context.Context, agentID string) {
refs = append(refs, r)
}
}
rows.Close()
now := time.Now().UTC()
for _, r := range refs {
inactivity, _ := s.loopBudgets(ctx, r.projectID)
Expand Down
3 changes: 1 addition & 2 deletions hub/internal/server/phase_completion_gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (s *Server) cascadeDeliverableUnratified(
if err != nil {
return nil, err
}
defer rows.Close()
type pending struct {
id string
body map[string]any
Expand All @@ -77,7 +78,6 @@ func (s *Server) cascadeDeliverableUnratified(
var id, bodyStr string
var delivNS sql.NullString
if err := rows.Scan(&id, &bodyStr, &delivNS); err != nil {
rows.Close()
return nil, err
}
body := map[string]any{}
Expand All @@ -88,7 +88,6 @@ func (s *Server) cascadeDeliverableUnratified(
}
candidates = append(candidates, pending{id: id, body: body, deliv: deliv})
}
rows.Close()

repended := []string{}
for _, p := range candidates {
Expand Down
3 changes: 1 addition & 2 deletions hub/internal/server/seed_demo_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,15 @@ func ResetLifecycleDemo(ctx context.Context, db *sql.DB) (deleted bool, err erro
if err != nil {
return false, fmt.Errorf("lookup lifecycle demos: %w", err)
}
defer rows.Close()
var projectIDs []string
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
rows.Close()
return false, err
}
projectIDs = append(projectIDs, id)
}
rows.Close()
if len(projectIDs) == 0 {
return false, nil
}
Expand Down
7 changes: 5 additions & 2 deletions hub/internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type Config struct {

type Server struct {
cfg Config
db *sql.DB // control reader pool — uncapped; reads + tests
db *sql.DB // control reader pool — generous cap (HUB_DB_MAX_READ_CONNS); reads + tests
writeDB *sql.DB // control writer pool — SetMaxOpenConns(1); all writes, see New()
// Store separation + per-team sharding (ADR-045 D2 — the event log + the
// derived digest are separate data classes from the control plane, and both
Expand Down Expand Up @@ -142,9 +142,12 @@ func New(cfg Config) (*Server, error) {
if err != nil {
return nil, err
}
db.SetMaxOpenConns(maxReadConns())
db.SetMaxIdleConns(maxReadConns())
// Writer/reader pool split — docs/discussions/hub-scaling-storage-and-
// concurrency.md §6 lever 3. SQLite is a single-writer store. s.db stays
// the uncapped general/reader pool (WAL lets readers run concurrently);
// the general/reader pool (WAL lets readers run concurrently; generous
// FD-safety cap via HUB_DB_MAX_READ_CONNS);
// s.writeDB is a dedicated ONE-connection writer pool that ALL writes go
// through, so writes queue cheaply in Go instead of colliding on the
// write lock and exhausting busy_timeout under many concurrent agents
Expand Down
11 changes: 7 additions & 4 deletions hub/internal/server/store_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,16 @@ func storePathsFor(controlPath string) (eventsPath, digestPath string) {
return filepath.Join(dir, "events.db"), filepath.Join(dir, "digest.db")
}

// openStorePool opens a reader (uncapped) or single-writer pool against an
// openStorePool opens a reader (generous cap) or single-writer pool against an
// already-schema'd store file. Mirrors the control reader/writer split (New(),
// OpenWriterDB): writes serialize through one connection so they queue in Go
// instead of colliding on SQLite's per-file write lock. The moving tables carry
// no foreign keys post-split, but foreign_keys(1) is harmless and consistent.
//
// The writer cache (cachePragma) is applied only to the single-conn writer pool
// — the uncapped reader pool gets pragmaCommon, so a per-connection cache can't
// multiply across concurrent readers (db.go const doc). cachePragma is the
// budget-divided per-team value (perTeamWriterCachePragma) threaded in by the
// — the reader pool gets pragmaCommon (with a generous FD-safety cap), so a
// per-connection cache can't multiply across concurrent readers (db.go const
// doc). cachePragma is the budget-divided per-team value (perTeamWriterCachePragma) threaded in by the
// registry; an empty string means no extra cache (reader pools pass "").
func openStorePool(path string, writer bool, cachePragma string) (*sql.DB, error) {
dsn := path + "?_pragma=foreign_keys(1)&" + pragmaCommon
Expand All @@ -181,6 +181,9 @@ func openStorePool(path string, writer bool, cachePragma string) (*sql.DB, error
if writer {
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
} else {
db.SetMaxOpenConns(maxReadConns())
db.SetMaxIdleConns(maxReadConns())
}
return db, nil
}
Expand Down
Loading