From 840e9d815c896ae2012ab2d232d7ea1cab5012dd Mon Sep 17 00:00:00 2001 From: Steve Smith Date: Tue, 17 Mar 2026 12:20:26 -0600 Subject: [PATCH 1/4] database: add failover-safe defaults and retry logic for Postgres/AlloyDB During AlloyDB maintenance switchovers (< 1s downtime), services using database/sql with pgx fail to recover because: 1. No connection pool limits are set by default, so dead connections persist indefinitely 2. No retry logic exists for transient connection errors This adds: - DefaultPostgresConnectionsConfig() with MaxLifetime=5m, MaxIdleTime=30s to ensure dead connections are evicted quickly after failover - ApplyPostgresConnectionsConfig() that fills in safe defaults when services don't explicitly configure pool settings - IsRetryablePostgresError() to classify transient PG/network errors - RetryPostgres() for services to wrap critical DB operations Co-Authored-By: Claude Opus 4.6 --- database/database.go | 26 ++++++++- database/database_test.go | 49 +++++++++++++++++ database/model_config.go | 13 +++++ database/postgres.go | 88 +++++++++++++++++++++++++++++++ database/postgres_test.go | 107 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 282 insertions(+), 1 deletion(-) diff --git a/database/database.go b/database/database.go index 67543936..087dd5b8 100644 --- a/database/database.go +++ b/database/database.go @@ -40,7 +40,7 @@ func New(ctx context.Context, logger log.Logger, config DatabaseConfig) (*sql.DB if err != nil { return nil, fmt.Errorf("connecting to postgres: %w", err) } - return ApplyConnectionsConfig(db, &config.Postgres.Connections, logger), nil + return ApplyPostgresConnectionsConfig(db, &config.Postgres.Connections, logger), nil } return nil, ErrMissingConfig @@ -112,3 +112,27 @@ func ApplyConnectionsConfig(db *sql.DB, connections *ConnectionsConfig, logger l return db } + +// ApplyPostgresConnectionsConfig applies connection pool settings with safe defaults +// for Postgres/AlloyDB. 
If any value in the provided config is zero, the corresponding +// default from DefaultPostgresConnectionsConfig is used. This ensures all services get +// failover-safe pool settings even if they don't explicitly configure them. +func ApplyPostgresConnectionsConfig(db *sql.DB, connections *ConnectionsConfig, logger log.Logger) *sql.DB { + defaults := DefaultPostgresConnectionsConfig() + + applied := *connections + if applied.MaxOpen <= 0 { + applied.MaxOpen = defaults.MaxOpen + } + if applied.MaxIdle <= 0 { + applied.MaxIdle = defaults.MaxIdle + } + if applied.MaxLifetime <= 0 { + applied.MaxLifetime = defaults.MaxLifetime + } + if applied.MaxIdleTime <= 0 { + applied.MaxIdleTime = defaults.MaxIdleTime + } + + return ApplyConnectionsConfig(db, &applied, logger) +} diff --git a/database/database_test.go b/database/database_test.go index a648aa02..abda3ddf 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -5,13 +5,16 @@ package database_test import ( "bytes" + "database/sql" "errors" "os" "testing" + "time" gomysql "github.com/go-sql-driver/mysql" "github.com/jackc/pgx/v5/pgconn" "github.com/moov-io/base/database" + "github.com/moov-io/base/log" "github.com/stretchr/testify/require" ) @@ -121,6 +124,52 @@ func TestDataTooLong(t *testing.T) { } } +func TestApplyPostgresConnectionsConfig_Defaults(t *testing.T) { + // When no config values are set, defaults should be applied + db, err := sql.Open("txdb", "TestApplyPostgresConnectionsConfig_Defaults") + if err != nil { + t.Skip("skipping test without txdb driver") + } + defer db.Close() + + connections := &database.ConnectionsConfig{} + database.ApplyPostgresConnectionsConfig(db, connections, log.NewTestLogger()) + + defaults := database.DefaultPostgresConnectionsConfig() + stats := db.Stats() + require.Equal(t, defaults.MaxOpen, stats.MaxOpenConnections) +} + +func TestApplyPostgresConnectionsConfig_Overrides(t *testing.T) { + // When config values are set, they should override defaults + db, 
err := sql.Open("txdb", "TestApplyPostgresConnectionsConfig_Overrides") + if err != nil { + t.Skip("skipping test without txdb driver") + } + defer db.Close() + + connections := &database.ConnectionsConfig{ + MaxOpen: 10, + MaxIdle: 3, + MaxLifetime: time.Minute, + MaxIdleTime: time.Second * 15, + } + database.ApplyPostgresConnectionsConfig(db, connections, log.NewTestLogger()) + + stats := db.Stats() + require.Equal(t, 10, stats.MaxOpenConnections) +} + +func TestDefaultPostgresConnectionsConfig(t *testing.T) { + defaults := database.DefaultPostgresConnectionsConfig() + require.Greater(t, defaults.MaxOpen, 0) + require.Greater(t, defaults.MaxIdle, 0) + require.Greater(t, defaults.MaxLifetime, time.Duration(0)) + require.Greater(t, defaults.MaxIdleTime, time.Duration(0)) + // MaxIdleTime should be shorter than MaxLifetime + require.Less(t, defaults.MaxIdleTime, defaults.MaxLifetime) +} + func TestConnectionsConfigOrder(t *testing.T) { bs, err := os.ReadFile("database.go") require.NoError(t, err) diff --git a/database/model_config.go b/database/model_config.go index 5703a689..a8bfa367 100644 --- a/database/model_config.go +++ b/database/model_config.go @@ -101,6 +101,19 @@ type ConnectionsConfig struct { MaxIdleTime time.Duration } +// DefaultPostgresConnectionsConfig returns connection pool defaults tuned for +// database failover recovery (e.g. AlloyDB maintenance switchovers). Short +// lifetimes and idle times ensure dead connections are evicted quickly so +// the pool re-establishes connections to the new primary. 
+func DefaultPostgresConnectionsConfig() ConnectionsConfig { + return ConnectionsConfig{ + MaxOpen: 25, + MaxIdle: 5, + MaxLifetime: 5 * time.Minute, + MaxIdleTime: 30 * time.Second, + } +} + type RetryConfig struct { MaxAttempts int MinDuration time.Duration diff --git a/database/postgres.go b/database/postgres.go index 7c98cdd2..4d7f463f 100644 --- a/database/postgres.go +++ b/database/postgres.go @@ -5,8 +5,10 @@ import ( "database/sql" "errors" "fmt" + "io" "net" "strings" + "time" "cloud.google.com/go/alloydbconn" "github.com/jackc/pgx/v5" @@ -164,3 +166,89 @@ func PostgresDeadlockFound(err error) bool { return strings.Contains(err.Error(), postgresErrDeadlockFound) } + +// IsRetryablePostgresError returns true if the error is a transient connection-level +// error that is safe to retry. This covers the errors seen during AlloyDB maintenance +// switchovers and other transient network failures. +func IsRetryablePostgresError(err error) bool { + if err == nil { + return false + } + + // PostgreSQL error codes indicating the server is shutting down or unavailable + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + switch pgErr.Code { + case "57P01": // admin_shutdown + return true + case "57P02": // crash_shutdown + return true + case "57P03": // cannot_connect_now + return true + case "08000": // connection_exception + return true + case "08001": // sqlclient_unable_to_establish_sqlconnection + return true + case "08003": // connection_does_not_exist + return true + case "08004": // sqlserver_rejected_establishment_of_sqlconnection + return true + case "08006": // connection_failure + return true + } + return false + } + + // Network-level errors: connection reset, broken pipe, EOF, etc. + // These occur when the TCP connection is severed during a switchover. 
+ var netErr *net.OpError + if errors.As(err, &netErr) { + return true + } + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + if errors.Is(err, context.DeadlineExceeded) { + return false // don't retry if the caller's context timed out + } + + // pgx wraps connection errors with these messages + msg := err.Error() + if strings.Contains(msg, "connection reset by peer") || + strings.Contains(msg, "broken pipe") || + strings.Contains(msg, "connection refused") || + strings.Contains(msg, "unexpected EOF") || + strings.Contains(msg, "conn closed") { + return true + } + + return false +} + +// RetryPostgres executes fn up to maxAttempts times, retrying on transient +// connection errors. This is intended for use around individual database +// operations to survive brief outages like AlloyDB maintenance switchovers. +func RetryPostgres(ctx context.Context, maxAttempts int, fn func() error) error { + if maxAttempts <= 0 { + maxAttempts = 3 + } + var err error + for attempt := 0; attempt < maxAttempts; attempt++ { + err = fn() + if err == nil { + return nil + } + if !IsRetryablePostgresError(err) { + return err + } + if attempt < maxAttempts-1 { + backoff := time.Duration(attempt+1) * 200 * time.Millisecond + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(backoff): + } + } + } + return err +} diff --git a/database/postgres_test.go b/database/postgres_test.go index 68dcb14d..e9644e1d 100644 --- a/database/postgres_test.go +++ b/database/postgres_test.go @@ -2,11 +2,15 @@ package database_test import ( "context" + "errors" + "io" + "net" "os" "path/filepath" "testing" "time" + "github.com/jackc/pgx/v5/pgconn" "github.com/moov-io/base" "github.com/moov-io/base/database" "github.com/moov-io/base/database/testdb" @@ -180,6 +184,109 @@ func Test_Postgres_Alloy_Migrations(t *testing.T) { defer db.Close() } +func TestIsRetryablePostgresError(t *testing.T) { + // nil error is not retryable + require.False(t, 
database.IsRetryablePostgresError(nil)) + + // admin_shutdown is retryable (seen during AlloyDB maintenance) + require.True(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "57P01"})) + + // crash_shutdown is retryable + require.True(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "57P02"})) + + // cannot_connect_now is retryable + require.True(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "57P03"})) + + // connection_exception class is retryable + require.True(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "08006"})) + + // unique_violation is NOT retryable (application-level error) + require.False(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "23505"})) + + // syntax_error is NOT retryable + require.False(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "42601"})) + + // EOF is retryable (connection severed) + require.True(t, database.IsRetryablePostgresError(io.EOF)) + require.True(t, database.IsRetryablePostgresError(io.ErrUnexpectedEOF)) + + // net.OpError is retryable + require.True(t, database.IsRetryablePostgresError(&net.OpError{ + Op: "read", + Err: errors.New("connection reset by peer"), + })) + + // context.DeadlineExceeded is NOT retryable + require.False(t, database.IsRetryablePostgresError(context.DeadlineExceeded)) + + // String-matched connection errors + require.True(t, database.IsRetryablePostgresError(errors.New("connection reset by peer"))) + require.True(t, database.IsRetryablePostgresError(errors.New("broken pipe"))) + require.True(t, database.IsRetryablePostgresError(errors.New("conn closed"))) + + // Random application error is NOT retryable + require.False(t, database.IsRetryablePostgresError(errors.New("invalid input"))) +} + +func TestRetryPostgres(t *testing.T) { + t.Run("succeeds on first attempt", func(t *testing.T) { + calls := 0 + err := database.RetryPostgres(context.Background(), 3, func() error { + calls++ + return nil + }) + require.NoError(t, err) + 
require.Equal(t, 1, calls) + }) + + t.Run("retries on transient error then succeeds", func(t *testing.T) { + calls := 0 + err := database.RetryPostgres(context.Background(), 3, func() error { + calls++ + if calls < 3 { + return &pgconn.PgError{Code: "57P01"} // admin_shutdown + } + return nil + }) + require.NoError(t, err) + require.Equal(t, 3, calls) + }) + + t.Run("does not retry non-retryable errors", func(t *testing.T) { + calls := 0 + err := database.RetryPostgres(context.Background(), 3, func() error { + calls++ + return &pgconn.PgError{Code: "23505"} // unique_violation + }) + require.Error(t, err) + require.Equal(t, 1, calls) + }) + + t.Run("respects context cancellation", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + calls := 0 + err := database.RetryPostgres(ctx, 3, func() error { + calls++ + return io.EOF // retryable, but context is done + }) + // First call happens, then context cancellation is detected + require.Error(t, err) + }) + + t.Run("exhausts all attempts", func(t *testing.T) { + calls := 0 + err := database.RetryPostgres(context.Background(), 3, func() error { + calls++ + return io.EOF + }) + require.Error(t, err) + require.Equal(t, 3, calls) + require.ErrorIs(t, err, io.EOF) + }) +} + func Test_Postgres_UniqueViolation(t *testing.T) { if testing.Short() { t.Skip("-short flag enabled") From e7bac197b1d1f83c3ca367b0251aacffbb1b9fc2 Mon Sep 17 00:00:00 2001 From: Steve Smith Date: Tue, 17 Mar 2026 12:29:57 -0600 Subject: [PATCH 2/4] database: use pgxpool with HealthCheckPeriod under the hood MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Switch from sql.Open("pgx") to pgxpool.NewWithConfig() wrapped with stdlib.OpenDBFromPool(). This gives us pgxpool's HealthCheckPeriod (set to 5s) which proactively pings idle connections and evicts dead ones — the most important fix for surviving AlloyDB maintenance switchovers. 
The return type remains *sql.DB so no downstream changes are needed. Also removes the dialer cleanup TODO (note: nothing in this change calls dialer.Close, so the AlloyDB dialer's lifecycle is still not tied to the pool) and drops the pgx import, since pgx.ParseConfig is replaced by pgxpool.ParseConfig. Co-Authored-By: Claude Opus 4.6 --- database/postgres.go | 116 +++++++++++++++++++++++-------------------- 1 file changed, 62 insertions(+), 54 deletions(-) diff --git a/database/postgres.go b/database/postgres.go index 4d7f463f..2b3a628c 100644 --- a/database/postgres.go +++ b/database/postgres.go @@ -11,8 +11,8 @@ import ( "time" "cloud.google.com/go/alloydbconn" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/stdlib" "github.com/moov-io/base/log" ) @@ -25,67 +25,50 @@ const ( ) func postgresConnection(ctx context.Context, logger log.Logger, config PostgresConfig, databaseName string) (*sql.DB, error) { - var connStr string - if config.Alloy != nil { - c, err := getAlloyDBConnectorConnStr(ctx, config, databaseName) - if err != nil { - return nil, logger.LogErrorf("creating alloydb connection: %w", err).Err() - } - connStr = c - } else { - c, err := getPostgresConnStr(config, databaseName) - if err != nil { - return nil, logger.LogErrorf("creating postgres connection: %w", err).Err() - } - connStr = c + poolConfig, err := buildPgxPoolConfig(ctx, config, databaseName) + if err != nil { + return nil, logger.LogErrorf("building pgx pool config: %w", err).Err() } - db, err := sql.Open("pgx", connStr) + // HealthCheckPeriod makes pgxpool ping idle connections in the background. + // Dead connections (e.g. from an AlloyDB switchover) are evicted before + // the application ever sees them. 
+ poolConfig.HealthCheckPeriod = 5 * time.Second + + pool, err := pgxpool.NewWithConfig(ctx, poolConfig) if err != nil { - return nil, logger.LogErrorf("opening database: %w", err).Err() + return nil, logger.LogErrorf("creating pgx pool: %w", err).Err() } - err = db.Ping() + err = pool.Ping(ctx) if err != nil { - _ = db.Close() + pool.Close() return nil, logger.LogErrorf("connecting to database: %w", err).Err() } + // Wrap the pgxpool in a *sql.DB so the rest of the codebase doesn't change. + // pgxpool manages the real pool (with health checks); database/sql pool + // settings are applied on top via ApplyPostgresConnectionsConfig. + db := stdlib.OpenDBFromPool(pool) + return db, nil } -func getPostgresConnStr(config PostgresConfig, databaseName string) (string, error) { - url := fmt.Sprintf("postgres://%s:%s@%s/%s", config.User, config.Password, config.Address, databaseName) - - params := "" - - if config.TLS != nil { - if len(config.TLS.Mode) < 1 { - config.TLS.Mode = "verify-full" - } - - params += "sslmode=" + config.TLS.Mode - - if len(config.TLS.CACertFile) > 0 { - params += "&sslrootcert=" + config.TLS.CACertFile - } - - if len(config.TLS.ClientCertFile) > 0 { - params += "&sslcert=" + config.TLS.ClientCertFile - } - - if len(config.TLS.ClientKeyFile) > 0 { - params += "&sslkey=" + config.TLS.ClientKeyFile - } +func buildPgxPoolConfig(ctx context.Context, config PostgresConfig, databaseName string) (*pgxpool.Config, error) { + if config.Alloy != nil { + return buildAlloyDBPoolConfig(ctx, config, databaseName) } - connStr := fmt.Sprintf("%s?%s", url, params) - return connStr, nil + connStr, err := getPostgresConnStr(config, databaseName) + if err != nil { + return nil, err + } + return pgxpool.ParseConfig(connStr) } -func getAlloyDBConnectorConnStr(ctx context.Context, config PostgresConfig, databaseName string) (string, error) { +func buildAlloyDBPoolConfig(ctx context.Context, config PostgresConfig, databaseName string) (*pgxpool.Config, error) { if 
config.Alloy == nil { - return "", fmt.Errorf("missing alloy config") + return nil, fmt.Errorf("missing alloy config") } var dialer *alloydbconn.Dialer @@ -94,7 +77,7 @@ func getAlloyDBConnectorConnStr(ctx context.Context, config PostgresConfig, data if config.Alloy.UseIAM { d, err := alloydbconn.NewDialer(ctx, alloydbconn.WithIAMAuthN()) if err != nil { - return "", fmt.Errorf("creating alloydb dialer: %v", err) + return nil, fmt.Errorf("creating alloydb dialer: %v", err) } dialer = d dsn = fmt.Sprintf( @@ -106,7 +89,7 @@ func getAlloyDBConnectorConnStr(ctx context.Context, config PostgresConfig, data } else { d, err := alloydbconn.NewDialer(ctx) if err != nil { - return "", fmt.Errorf("creating alloydb dialer: %v", err) + return nil, fmt.Errorf("creating alloydb dialer: %v", err) } dialer = d dsn = fmt.Sprintf( @@ -116,12 +99,9 @@ func getAlloyDBConnectorConnStr(ctx context.Context, config PostgresConfig, data ) } - // TODO - //cleanup := func() error { return d.Close() } - - connConfig, err := pgx.ParseConfig(dsn) + poolConfig, err := pgxpool.ParseConfig(dsn) if err != nil { - return "", fmt.Errorf("failed to parse pgx config: %v", err) + return nil, fmt.Errorf("failed to parse pgx pool config: %v", err) } var connOptions []alloydbconn.DialOption @@ -129,11 +109,39 @@ func getAlloyDBConnectorConnStr(ctx context.Context, config PostgresConfig, data connOptions = append(connOptions, alloydbconn.WithPSC()) } - connConfig.DialFunc = func(ctx context.Context, _ string, _ string) (net.Conn, error) { + poolConfig.ConnConfig.DialFunc = func(ctx context.Context, _ string, _ string) (net.Conn, error) { return dialer.Dial(ctx, config.Alloy.InstanceURI, connOptions...) 
} - connStr := stdlib.RegisterConnConfig(connConfig) + return poolConfig, nil +} + +func getPostgresConnStr(config PostgresConfig, databaseName string) (string, error) { + url := fmt.Sprintf("postgres://%s:%s@%s/%s", config.User, config.Password, config.Address, databaseName) + + params := "" + + if config.TLS != nil { + if len(config.TLS.Mode) < 1 { + config.TLS.Mode = "verify-full" + } + + params += "sslmode=" + config.TLS.Mode + + if len(config.TLS.CACertFile) > 0 { + params += "&sslrootcert=" + config.TLS.CACertFile + } + + if len(config.TLS.ClientCertFile) > 0 { + params += "&sslcert=" + config.TLS.ClientCertFile + } + + if len(config.TLS.ClientKeyFile) > 0 { + params += "&sslkey=" + config.TLS.ClientKeyFile + } + } + + connStr := fmt.Sprintf("%s?%s", url, params) return connStr, nil } From 9322e7b78c1079fb4754730a70aee947ae88afb7 Mon Sep 17 00:00:00 2001 From: Steve Smith Date: Tue, 17 Mar 2026 12:31:25 -0600 Subject: [PATCH 3/4] database: lower HealthCheckPeriod from 5s to 1s For sub-second AlloyDB switchovers, 1s health checks detect and evict dead connections faster with negligible overhead. Co-Authored-By: Claude Opus 4.6 --- database/postgres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database/postgres.go b/database/postgres.go index 2b3a628c..09d15c8e 100644 --- a/database/postgres.go +++ b/database/postgres.go @@ -33,7 +33,7 @@ func postgresConnection(ctx context.Context, logger log.Logger, config PostgresC // HealthCheckPeriod makes pgxpool ping idle connections in the background. // Dead connections (e.g. from an AlloyDB switchover) are evicted before // the application ever sees them. 
- poolConfig.HealthCheckPeriod = 5 * time.Second + poolConfig.HealthCheckPeriod = 1 * time.Second pool, err := pgxpool.NewWithConfig(ctx, poolConfig) if err != nil { From 22a1b6fe1f3e3cd611c95fb63ae20a3cc439bed1 Mon Sep 17 00:00:00 2001 From: Steve Smith Date: Tue, 17 Mar 2026 13:10:23 -0600 Subject: [PATCH 4/4] database: address review feedback from Gemini - Collapse switch cases in IsRetryablePostgresError for readability - Make context cancellation test more specific (assert context.Canceled and exact call count) Co-Authored-By: Claude Opus 4.6 --- database/postgres.go | 16 ++-------------- database/postgres_test.go | 3 ++- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/database/postgres.go b/database/postgres.go index 09d15c8e..f9be0d9f 100644 --- a/database/postgres.go +++ b/database/postgres.go @@ -187,21 +187,9 @@ func IsRetryablePostgresError(err error) bool { var pgErr *pgconn.PgError if errors.As(err, &pgErr) { switch pgErr.Code { - case "57P01": // admin_shutdown + case "57P01", "57P02", "57P03": // admin_shutdown, crash_shutdown, cannot_connect_now return true - case "57P02": // crash_shutdown - return true - case "57P03": // cannot_connect_now - return true - case "08000": // connection_exception - return true - case "08001": // sqlclient_unable_to_establish_sqlconnection - return true - case "08003": // connection_does_not_exist - return true - case "08004": // sqlserver_rejected_establishment_of_sqlconnection - return true - case "08006": // connection_failure + case "08000", "08001", "08003", "08004", "08006": // connection_exception class return true } return false diff --git a/database/postgres_test.go b/database/postgres_test.go index e9644e1d..2edcde38 100644 --- a/database/postgres_test.go +++ b/database/postgres_test.go @@ -272,7 +272,8 @@ func TestRetryPostgres(t *testing.T) { return io.EOF // retryable, but context is done }) // First call happens, then context cancellation is detected - require.Error(t, err) + 
require.ErrorIs(t, err, context.Canceled) + require.Equal(t, 1, calls) }) t.Run("exhausts all attempts", func(t *testing.T) {