From cbe6c8c1034cf804aef93d8d77a0e098d5ebda4c Mon Sep 17 00:00:00 2001 From: Lucas Machado Date: Wed, 27 May 2026 23:22:43 +0200 Subject: [PATCH] fix: avoid stale serve db connections --- internal/web/runners.go | 16 +++++ internal/web/runners_test.go | 115 +++++++++++++++++++++++++++++++++++ internal/web/session.go | 17 ++++++ 3 files changed, 148 insertions(+) diff --git a/internal/web/runners.go b/internal/web/runners.go index 2b6e2dd..ac180a9 100644 --- a/internal/web/runners.go +++ b/internal/web/runners.go @@ -103,6 +103,14 @@ func (s *Server) runSeed(ctx context.Context, sess *Session, req SeedRequest, jc log.Info().Str("order", strings.Join(targetTables, " → ")).Msg("Seed order resolved") conn := sess.Conn() + if !req.DryRun { + runConn, err := sess.OpenRunConn(ctx) + if err != nil { + return nil, err + } + defer runConn.Close() + conn = runConn + } if req.Truncate && !req.DryRun { jc.Phase("truncate") @@ -216,6 +224,14 @@ func (s *Server) runGaps(ctx context.Context, sess *Session, req GapsRequest, jc } conn := sess.Conn() + if req.Fill && !req.DryRun { + runConn, err := sess.OpenRunConn(ctx) + if err != nil { + return nil, err + } + defer runConn.Close() + conn = runConn + } jc.Phase("scan") log.Info().Int("tables", len(allSorted)).Msg("Scanning row counts") counts, err := db.GetTableRowCounts(ctx, conn, sess.DBType, allSorted) diff --git a/internal/web/runners_test.go b/internal/web/runners_test.go index 6a70c99..737569f 100644 --- a/internal/web/runners_test.go +++ b/internal/web/runners_test.go @@ -2,8 +2,13 @@ package web import ( "context" + "database/sql" + "database/sql/driver" + "errors" + "io" "reflect" "strings" + "sync" "testing" "github.com/AxeForging/seedstorm/internal/schema" @@ -153,6 +158,46 @@ func TestRunSeedDryRunHandlesHardSelfReference(t *testing.T) { } } +func TestRunSeedUsesFreshConnectionForMutatingServeJob(t *testing.T) { + registerServeRunnerTestDriver() + staleConn, err := sql.Open(serveRunnerTestDriverName, "stale") + if err != nil { + t.Fatalf("open stale conn: %v", err) + } + defer staleConn.Close() + + oldOpen := sqlOpen + sqlOpen = func(driverName, dataSourceName string) (*sql.DB, error) { + if driverName != "pgx" || dataSourceName != "fresh" { + t.Fatalf("sqlOpen called with %s %s, want pgx fresh", driverName, dataSourceName) + } + return sql.Open(serveRunnerTestDriverName, "fresh") + } + defer func() { sqlOpen = oldOpen }() + + srv, err := New(Options{Addr: "127.0.0.1:0"}) + if err != nil { + t.Fatalf("New: %v", err) + } + sess := &Session{ + DBType: "pgx", + DSN: "fresh", + conn: staleConn, + schema: enumInsertSchema(), + } + + result, err := srv.runSeed(context.Background(), sess, SeedRequest{ + Rows: 1, + BatchSize: 100, + }, testJobControl{}) + if err != nil { + t.Fatalf("runSeed should use fresh run connection instead of stale session connection: %v", err) + } + if got := result["totalRows"]; got != 2 { + t.Fatalf("totalRows = %v, want 2", got) + } +} + func TestRunGenerateHandlesHardSelfReference(t *testing.T) { srv, err := New(Options{Addr: "127.0.0.1:0"}) if err != nil { @@ -190,6 +235,63 @@ func containsAll(value string, parts ...string) bool { return true } +const serveRunnerTestDriverName = "seedstorm_web_runner_test" + +var registerServeRunnerDriverOnce sync.Once + +func registerServeRunnerTestDriver() { + registerServeRunnerDriverOnce.Do(func() { + sql.Register(serveRunnerTestDriverName, serveRunnerTestDriver{}) + }) +} + +type serveRunnerTestDriver struct{} + +func (serveRunnerTestDriver) Open(name string) (driver.Conn, error) { + return &serveRunnerTestConn{name: name}, nil +} + +type serveRunnerTestConn struct { + name string +} + +func (c *serveRunnerTestConn) Prepare(string) (driver.Stmt, error) { + return nil, errors.New("prepare not implemented") +} + +func (c *serveRunnerTestConn) Close() error { return nil } + +func (c *serveRunnerTestConn) Begin() (driver.Tx, error) { + return nil, errors.New("transactions not implemented") +} + +func (c *serveRunnerTestConn) Ping(context.Context) error { return nil } + +func (c *serveRunnerTestConn) Query(query string, args []driver.Value) (driver.Rows, error) { + return &serveRunnerRows{columns: []string{"id"}}, nil +} + +func (c *serveRunnerTestConn) QueryContext(context.Context, string, []driver.NamedValue) (driver.Rows, error) { + return &serveRunnerRows{columns: []string{"id"}}, nil +} + +func (c *serveRunnerTestConn) ExecContext(_ context.Context, query string, _ []driver.NamedValue) (driver.Result, error) { + if c.name == "stale" && strings.Contains(query, "INSERT") { + return nil, errors.New("cache lookup failed for type 34868 (SQLSTATE XX000)") + } + return driver.RowsAffected(1), nil +} + +type serveRunnerRows struct { + columns []string +} + +func (r *serveRunnerRows) Columns() []string { return r.columns } +func (r *serveRunnerRows) Close() error { return nil } +func (r *serveRunnerRows) Next([]driver.Value) error { + return io.EOF +} + func runnerRowCountSchema() *schema.Schema { return &schema.Schema{ Tables: map[string]schema.Table{ @@ -209,6 +311,19 @@ func runnerRowCountSchema() *schema.Schema { } } +func enumInsertSchema() *schema.Schema { + return &schema.Schema{ + Tables: map[string]schema.Table{ + "purchase_orders": { + Columns: map[string]schema.Column{ + "id": {Type: "integer", PK: true}, + "status": {Type: "po_status", Faker: "randomstring(draft,submitted)"}, + }, + }, + }, + } +} + func hardSelfReferenceSchema() *schema.Schema { return &schema.Schema{ Tables: map[string]schema.Table{ diff --git a/internal/web/session.go b/internal/web/session.go index 2556ccf..5ce929b 100644 --- a/internal/web/session.go +++ b/internal/web/session.go @@ -18,6 +18,8 @@ import ( const sessionCookieName = "seedstorm_session" +var sqlOpen = sql.Open + // ConnectionInfo is the non-secret view of an active connection, safe to // surface in templates and logs. type ConnectionInfo struct { @@ -138,6 +140,21 @@ func (r *SessionRegistry) Close(id string) { // Conn returns the live *sql.DB. func (s *Session) Conn() *sql.DB { return s.conn } +// OpenRunConn opens a short-lived database handle for mutating background jobs. +// Long-lived Serve sessions can outlive DDL resets that recreate enum/user +// types; a fresh handle avoids stale driver-side type metadata during inserts. +func (s *Session) OpenRunConn(ctx context.Context) (*sql.DB, error) { + conn, err := sqlOpen(s.DBType, s.DSN) + if err != nil { + return nil, fmt.Errorf("open run connection: %w", err) + } + if err := conn.PingContext(ctx); err != nil { + _ = conn.Close() + return nil, fmt.Errorf("ping run connection: %w", err) + } + return conn, nil +} + // Schema returns the cached schema, introspecting if needed. func (s *Session) Schema(force bool) (*schema.Schema, error) { s.mu.Lock()