Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions internal/web/runners.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ func (s *Server) runSeed(ctx context.Context, sess *Session, req SeedRequest, jc
log.Info().Str("order", strings.Join(targetTables, " → ")).Msg("Seed order resolved")

conn := sess.Conn()
if !req.DryRun {
runConn, err := sess.OpenRunConn(ctx)
if err != nil {
return nil, err
}
defer runConn.Close()
conn = runConn
}

if req.Truncate && !req.DryRun {
jc.Phase("truncate")
Expand Down Expand Up @@ -216,6 +224,14 @@ func (s *Server) runGaps(ctx context.Context, sess *Session, req GapsRequest, jc
}

conn := sess.Conn()
if req.Fill && !req.DryRun {
runConn, err := sess.OpenRunConn(ctx)
if err != nil {
return nil, err
}
defer runConn.Close()
conn = runConn
}
jc.Phase("scan")
log.Info().Int("tables", len(allSorted)).Msg("Scanning row counts")
counts, err := db.GetTableRowCounts(ctx, conn, sess.DBType, allSorted)
Expand Down
115 changes: 115 additions & 0 deletions internal/web/runners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ package web

import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"io"
"reflect"
"strings"
"sync"
"testing"

"github.com/AxeForging/seedstorm/internal/schema"
Expand Down Expand Up @@ -153,6 +158,46 @@ func TestRunSeedDryRunHandlesHardSelfReference(t *testing.T) {
}
}

func TestRunSeedUsesFreshConnectionForMutatingServeJob(t *testing.T) {
registerServeRunnerTestDriver()
staleConn, err := sql.Open(serveRunnerTestDriverName, "stale")
if err != nil {
t.Fatalf("open stale conn: %v", err)
}
defer staleConn.Close()

oldOpen := sqlOpen
sqlOpen = func(driverName, dataSourceName string) (*sql.DB, error) {
if driverName != "pgx" || dataSourceName != "fresh" {
t.Fatalf("sqlOpen called with %s %s, want pgx fresh", driverName, dataSourceName)
}
return sql.Open(serveRunnerTestDriverName, "fresh")
}
defer func() { sqlOpen = oldOpen }()

srv, err := New(Options{Addr: "127.0.0.1:0"})
if err != nil {
t.Fatalf("New: %v", err)
}
sess := &Session{
DBType: "pgx",
DSN: "fresh",
conn: staleConn,
schema: enumInsertSchema(),
}

result, err := srv.runSeed(context.Background(), sess, SeedRequest{
Rows: 1,
BatchSize: 100,
}, testJobControl{})
if err != nil {
t.Fatalf("runSeed should use fresh run connection instead of stale session connection: %v", err)
}
if got := result["totalRows"]; got != 2 {
t.Fatalf("totalRows = %v, want 2", got)
}
}

func TestRunGenerateHandlesHardSelfReference(t *testing.T) {
srv, err := New(Options{Addr: "127.0.0.1:0"})
if err != nil {
Expand Down Expand Up @@ -190,6 +235,63 @@ func containsAll(value string, parts ...string) bool {
return true
}

const serveRunnerTestDriverName = "seedstorm_web_runner_test"

var registerServeRunnerDriverOnce sync.Once

func registerServeRunnerTestDriver() {
registerServeRunnerDriverOnce.Do(func() {
sql.Register(serveRunnerTestDriverName, serveRunnerTestDriver{})
})
}

type serveRunnerTestDriver struct{}

func (serveRunnerTestDriver) Open(name string) (driver.Conn, error) {
return &serveRunnerTestConn{name: name}, nil
}

type serveRunnerTestConn struct {
name string
}

func (c *serveRunnerTestConn) Prepare(string) (driver.Stmt, error) {
return nil, errors.New("prepare not implemented")
}

func (c *serveRunnerTestConn) Close() error { return nil }

func (c *serveRunnerTestConn) Begin() (driver.Tx, error) {
return nil, errors.New("transactions not implemented")
}

func (c *serveRunnerTestConn) Ping(context.Context) error { return nil }

func (c *serveRunnerTestConn) Query(query string, args []driver.Value) (driver.Rows, error) {
return &serveRunnerRows{columns: []string{"id"}}, nil
}

func (c *serveRunnerTestConn) QueryContext(context.Context, string, []driver.NamedValue) (driver.Rows, error) {
return &serveRunnerRows{columns: []string{"id"}}, nil
}

func (c *serveRunnerTestConn) ExecContext(_ context.Context, query string, _ []driver.NamedValue) (driver.Result, error) {
if c.name == "stale" && strings.Contains(query, "INSERT") {
return nil, errors.New("cache lookup failed for type 34868 (SQLSTATE XX000)")
}
return driver.RowsAffected(1), nil
}

type serveRunnerRows struct {
columns []string
}

func (r *serveRunnerRows) Columns() []string { return r.columns }
func (r *serveRunnerRows) Close() error { return nil }
func (r *serveRunnerRows) Next([]driver.Value) error {
return io.EOF
}

func runnerRowCountSchema() *schema.Schema {
return &schema.Schema{
Tables: map[string]schema.Table{
Expand All @@ -209,6 +311,19 @@ func runnerRowCountSchema() *schema.Schema {
}
}

func enumInsertSchema() *schema.Schema {
return &schema.Schema{
Tables: map[string]schema.Table{
"purchase_orders": {
Columns: map[string]schema.Column{
"id": {Type: "integer", PK: true},
"status": {Type: "po_status", Faker: "randomstring(draft,submitted)"},
},
},
},
}
}

func hardSelfReferenceSchema() *schema.Schema {
return &schema.Schema{
Tables: map[string]schema.Table{
Expand Down
17 changes: 17 additions & 0 deletions internal/web/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

const sessionCookieName = "seedstorm_session"

var sqlOpen = sql.Open

// ConnectionInfo is the non-secret view of an active connection, safe to
// surface in templates and logs.
type ConnectionInfo struct {
Expand Down Expand Up @@ -138,6 +140,21 @@ func (r *SessionRegistry) Close(id string) {
// Conn returns the live *sql.DB.
func (s *Session) Conn() *sql.DB { return s.conn }

// OpenRunConn opens a short-lived database handle for mutating background jobs.
// Long-lived Serve sessions can outlive DDL resets that recreate enum/user
// types; a fresh handle avoids stale driver-side type metadata during inserts.
func (s *Session) OpenRunConn(ctx context.Context) (*sql.DB, error) {
conn, err := sqlOpen(s.DBType, s.DSN)
if err != nil {
return nil, fmt.Errorf("open run connection: %w", err)
}
if err := conn.PingContext(ctx); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("ping run connection: %w", err)
}
return conn, nil
}

// Schema returns the cached schema, introspecting if needed.
func (s *Session) Schema(force bool) (*schema.Schema, error) {
s.mu.Lock()
Expand Down
Loading