Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
steps:
- run: |
docker create --name cockroach -p 26257:26257 \
cockroachdb/cockroach:latest-v25.2 start-single-node --insecure \
cockroachdb/cockroach:latest-v25.3 start-single-node --insecure \
|| true
docker start cockroach
name: Start CockroachDB
Expand Down Expand Up @@ -141,7 +141,7 @@ jobs:
node-version: "22"
- run: |
docker create --name cockroach -p 26257:26257 \
cockroachdb/cockroach:latest-v25.2 start-single-node --insecure
cockroachdb/cockroach:latest-v25.3 start-single-node --insecure
docker start cockroach
name: Start CockroachDB
- uses: browser-actions/setup-chrome@latest
Expand Down Expand Up @@ -253,7 +253,7 @@ jobs:
node-version: "22"
- run: |
docker create --name cockroach -p 26257:26257 \
cockroachdb/cockroach:latest-v25.2 start-single-node --insecure
cockroachdb/cockroach:latest-v25.3 start-single-node --insecure
docker start cockroach
name: Start CockroachDB
- uses: ory/ci/checkout@master
Expand Down
2 changes: 1 addition & 1 deletion oryx/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ resetdb:
docker rm -f hydra_test_database_cockroach || true
docker run --rm --name hydra_test_database_mysql -p 3444:3306 -e MYSQL_ROOT_PASSWORD=secret -d mysql:8.0
docker run --rm --name hydra_test_database_postgres -p 3445:5432 -e POSTGRES_PASSWORD=secret -e POSTGRES_DB=hydra -d postgres:11.8
docker run --rm --name hydra_test_database_cockroach -p 3446:26257 -d cockroachdb/cockroach:latest-v25.2 start-single-node --insecure
docker run --rm --name hydra_test_database_cockroach -p 3446:26257 -d cockroachdb/cockroach:latest-v25.3 start-single-node --insecure

.PHONY: lint
lint: .bin/golangci-lint
Expand Down
64 changes: 26 additions & 38 deletions oryx/popx/migration_box.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package popx

import (
"database/sql"
"fmt"
"io/fs"
"path"
Expand Down Expand Up @@ -115,17 +116,15 @@ func WithTestdata(t *testing.T, testdata fs.FS) MigrationBoxOption {
DBType: flavor,
Direction: "up",
Type: "sql",
Runner: func(m Migration, _ *pop.Connection, tx *pop.Tx) error {
Runner: func(m Migration, c *pop.Connection) error {
b, err := fs.ReadFile(testdata, m.Path)
if err != nil {
return err
}
if isMigrationEmpty(string(b)) {
return nil
}
_, err = tx.Exec(string(b))
//match := match
//t.Logf("Ran test migration \"%s\" (%s, %+v) with error \"%v\" and content:\n %s", m.Path, m.DBType, match, err, string(b))
_, err = c.Store.SQLDB().Exec(string(b))
return err
},
})
Expand All @@ -137,7 +136,7 @@ func WithTestdata(t *testing.T, testdata fs.FS) MigrationBoxOption {
DBType: flavor,
Direction: "down",
Type: "sql",
Runner: func(m Migration, _ *pop.Connection, tx *pop.Tx) error { return nil },
Runner: func(m Migration, _ *pop.Connection) error { return nil },
})

return nil
Expand All @@ -151,6 +150,10 @@ func isMigrationEmpty(content string) bool {
return len(strings.ReplaceAll(emptySQLReplace.ReplaceAllString(content, ""), "\n", "")) == 0
}

type queryExecutor interface {
Exec(query string, args ...any) (sql.Result, error)
}

// NewMigrationBox creates a new migration box.
func NewMigrationBox(dir fs.FS, c *pop.Connection, l *logrusx.Logger, opts ...MigrationBoxOption) (*MigrationBox, error) {
mb := &MigrationBox{
Expand All @@ -163,8 +166,8 @@ func NewMigrationBox(dir fs.FS, c *pop.Connection, l *logrusx.Logger, opts ...Mi
o(mb)
}

txRunner := func(b []byte) func(Migration, *pop.Connection, *pop.Tx) error {
return func(mf Migration, c *pop.Connection, tx *pop.Tx) error {
txRunner := func(b []byte) func(Migration, *pop.Connection) error {
return func(mf Migration, c *pop.Connection) error {
content, err := mb.migrationContent(mf, c, b, true)
if err != nil {
return errors.Wrapf(err, "error processing %s", mf.Path)
Expand All @@ -173,31 +176,20 @@ func NewMigrationBox(dir fs.FS, c *pop.Connection, l *logrusx.Logger, opts ...Mi
l.WithField("migration", mf.Path).Trace("This is usually ok - ignoring migration because content is empty. This is ok!")
return nil
}
if _, err = tx.Exec(content); err != nil {
return errors.Wrapf(err, "error executing %s, sql: %s", mf.Path, content)
}
return nil
}
}

autoCommitRunner := func(b []byte) func(Migration, *pop.Connection) error {
return func(mf Migration, c *pop.Connection) error {
content, err := mb.migrationContent(mf, c, b, true)
if err != nil {
return errors.Wrapf(err, "error processing %s", mf.Path)
var q queryExecutor = c.Store.SQLDB()
if c.TX != nil {
q = c.TX
}
if isMigrationEmpty(content) {
l.WithField("migration", mf.Path).Trace("This is usually ok - ignoring migration because content is empty. This is ok!")
return nil
}
if _, err = c.RawQuery(content).ExecWithCount(); err != nil {

if _, err = q.Exec(content); err != nil {
return errors.Wrapf(err, "error executing %s, sql: %s", mf.Path, content)
}
return nil
}
}

err := mb.findMigrations(dir, txRunner, autoCommitRunner)
err := mb.findMigrations(dir, txRunner)
if err != nil {
return mb, err
}
Expand All @@ -210,8 +202,7 @@ func NewMigrationBox(dir fs.FS, c *pop.Connection, l *logrusx.Logger, opts ...Mi

func (mb *MigrationBox) findMigrations(
dir fs.FS,
runner func([]byte) func(m Migration, c *pop.Connection, tx *pop.Tx) error,
runnerNoTx func([]byte) func(m Migration, c *pop.Connection) error,
runner func([]byte) func(m Migration, c *pop.Connection) error,
) error {
err := fs.WalkDir(dir, ".", func(p string, info fs.DirEntry, err error) error {
if err != nil {
Expand Down Expand Up @@ -247,20 +238,17 @@ func (mb *MigrationBox) findMigrations(
}

mf := Migration{
Path: p,
Version: details.Version,
Name: details.Name,
DBType: details.DBType,
Direction: details.Direction,
Type: details.Type,
Content: string(content),
Path: p,
Version: details.Version,
Name: details.Name,
DBType: details.DBType,
Direction: details.Direction,
Type: details.Type,
Content: string(content),
Autocommit: details.Autocommit,
}

if details.Autocommit {
mf.RunnerNoTx = runnerNoTx(content)
} else {
mf.Runner = runner(content)
}
mf.Runner = runner(content)

switch details.Direction {
case "up":
Expand Down
13 changes: 5 additions & 8 deletions oryx/popx/migration_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,18 @@ type Migration struct {
DBType string
// Runner function to run/execute the migration. Will be wrapped in a
// database transaction. Mutually exclusive with RunnerNoTx
Runner func(Migration, *pop.Connection, *pop.Tx) error
// RunnerNoTx function to run/execute the migration. NOT wrapped in a
// database transaction. Mutually exclusive with Runner.
RunnerNoTx func(Migration, *pop.Connection) error
Runner func(Migration, *pop.Connection) error
// Content is the raw content of the migration file
Content string
// Autocommit indicates whether the migration should be run in autocommit mode
Autocommit bool
}

func (m Migration) Valid() error {
if m.Runner == nil && m.RunnerNoTx == nil {
if m.Runner == nil {
return errors.Errorf("no runner defined for %s", m.Path)
}
if m.Runner != nil && m.RunnerNoTx != nil {
return errors.Errorf("incompatible transaction and non-transaction runners defined for %s", m.Path)
}

return nil
}

Expand Down
112 changes: 53 additions & 59 deletions oryx/popx/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package popx

import (
"context"
"database/sql"
"fmt"
"math"
"os"
Expand All @@ -31,6 +30,10 @@ const (
tracingComponent = "github.com/ory/x/popx"
)

func (mb *MigrationBox) shouldNotUseTransaction(m Migration) bool {
return m.Autocommit || mb.c.Dialect.Name() == "cockroach" || mb.c.Dialect.Name() == "mysql"
}

// Up runs pending "up" migrations and applies them to the database.
func (mb *MigrationBox) Up(ctx context.Context) error {
_, err := mb.UpTo(ctx, 0)
Expand Down Expand Up @@ -91,9 +94,19 @@ func (mb *MigrationBox) UpTo(ctx context.Context, step int) (applied int, err er
return err
}

if mi.Runner != nil {
err := mb.isolatedTransaction(ctx, "up", func(conn *pop.Connection) error {
if err := mi.Runner(mi, conn, conn.TX); err != nil {
noTx := mb.shouldNotUseTransaction(mi)
if noTx {
if err := mi.Runner(mi, c); err != nil {
return err
}

// #nosec G201 - mtn is a system-wide const
if err := c.RawQuery(fmt.Sprintf("INSERT INTO %s (version) VALUES (?)", mtn), mi.Version).Exec(); err != nil {
return errors.Wrapf(err, "problem inserting migration version %s. YOUR DATABASE MAY BE IN AN INCONSISTENT STATE! MANUAL INTERVENTION REQUIRED!", mi.Version)
}
} else {
if err := mb.isolatedTransaction(ctx, "up", func(conn *pop.Connection) error {
if err := mi.Runner(mi, conn); err != nil {
return err
}

Expand All @@ -102,23 +115,12 @@ func (mb *MigrationBox) UpTo(ctx context.Context, step int) (applied int, err er
return errors.Wrapf(err, "problem inserting migration version %s", mi.Version)
}
return nil
})
if err != nil {
return err
}
} else {
l.Warn("Migration has requested running outside a transaction. Proceed with caution.")
if err := mi.RunnerNoTx(mi, c); err != nil {
}); err != nil {
return err
}

// #nosec G201 - mtn is a system-wide const
if err := c.RawQuery(fmt.Sprintf("INSERT INTO %s (version) VALUES (?)", mtn), mi.Version).Exec(); err != nil {
return errors.Wrapf(err, "problem inserting migration version %s. YOUR DATABASE MAY BE IN AN INCONSISTENT STATE! MANUAL INTERVENTION REQUIRED!", mi.Version)
}
}

l.Infof("> %s applied successfully", mi.Name)
l.WithField("autocommit", noTx).Infof("> %s applied successfully", mi.Name)
applied++
if step > 0 && applied >= step {
break
Expand Down Expand Up @@ -195,9 +197,19 @@ func (mb *MigrationBox) Down(ctx context.Context, steps int) (err error) {
return err
}

if mi.Runner != nil {
err := mb.isolatedTransaction(ctx, "down", func(conn *pop.Connection) error {
err := mi.Runner(mi, conn, conn.TX)
if mb.shouldNotUseTransaction(mi) {
err := mi.Runner(mi, c)
if err != nil {
return err
}

// #nosec G201 - mtn is a system-wide const
if err := c.RawQuery(fmt.Sprintf("DELETE FROM %s WHERE version = ?", mtn), mi.Version).Exec(); err != nil {
return errors.Wrapf(err, "problem deleting migration version %s. YOUR DATABASE MAY BE IN AN INCONSISTENT STATE! MANUAL INTERVENTION REQUIRED!", mi.Version)
}
} else {
if err := mb.isolatedTransaction(ctx, "down", func(conn *pop.Connection) error {
err := mi.Runner(mi, conn)
if err != nil {
return err
}
Expand All @@ -208,20 +220,9 @@ func (mb *MigrationBox) Down(ctx context.Context, steps int) (err error) {
}

return nil
})
if err != nil {
return err
}
} else {
err := mi.RunnerNoTx(mi, c)
if err != nil {
}); err != nil {
return err
}

// #nosec G201 - mtn is a system-wide const
if err := c.RawQuery(fmt.Sprintf("DELETE FROM %s WHERE version = ?", mtn), mi.Version).Exec(); err != nil {
return errors.Wrapf(err, "problem deleting migration version %s. YOUR DATABASE MAY BE IN AN INCONSISTENT STATE! MANUAL INTERVENTION REQUIRED!", mi.Version)
}
}

l.Infof("< %s applied successfully", mi.Name)
Expand All @@ -234,7 +235,7 @@ func (mb *MigrationBox) Down(ctx context.Context, steps int) (err error) {
func (mb *MigrationBox) createTransactionalMigrationTable(ctx context.Context, c *pop.Connection, l *logrusx.Logger) error {
mtn := sanitizedMigrationTableName(c)

if err := mb.execMigrationTransaction(ctx, []string{
if err := mb.createMigrationStatusTableTransaction(ctx, []string{
fmt.Sprintf(`CREATE TABLE %s (version VARCHAR (48) NOT NULL, version_self INT NOT NULL DEFAULT 0)`, mtn),
fmt.Sprintf(`CREATE UNIQUE INDEX %s_version_idx ON %s (version)`, mtn, mtn),
fmt.Sprintf(`CREATE INDEX %s_version_self_idx ON %s (version_self)`, mtn, mtn),
Expand Down Expand Up @@ -272,7 +273,7 @@ func (mb *MigrationBox) migrateToTransactionalMigrationTable(ctx context.Context
},
}

if err := mb.execMigrationTransaction(ctx, workload...); err != nil {
if err := mb.createMigrationStatusTableTransaction(ctx, workload...); err != nil {
return err
}

Expand All @@ -291,39 +292,32 @@ func (mb *MigrationBox) isolatedTransaction(ctx context.Context, direction strin
defer cancel()
}

conn, dberr := mb.c.NewTransactionContextOptions(ctx, &sql.TxOptions{
Isolation: sql.LevelSerializable,
ReadOnly: false,
return Transaction(ctx, mb.c.WithContext(ctx), func(ctx context.Context, connection *pop.Connection) error {
return fn(connection)
})
if dberr != nil {
return dberr
}

err = fn(conn)
if err != nil {
dberr = conn.TX.Rollback()
} else {
dberr = conn.TX.Commit()
}

if dberr != nil {
return errors.Wrapf(dberr, "error committing or rolling back transaction; original error: %v", err)
}

return err
}

func (mb *MigrationBox) execMigrationTransaction(ctx context.Context, transactions ...[]string) error {
func (mb *MigrationBox) createMigrationStatusTableTransaction(ctx context.Context, transactions ...[]string) error {
for _, statements := range transactions {
if err := mb.isolatedTransaction(ctx, "init", func(conn *pop.Connection) error {
// CockroachDB does not support transactional schema changes, so we have to run
// the statements outside of a transaction.
if mb.c.Dialect.Name() == "cockroach" || mb.c.Dialect.Name() == "mysql" {
for _, statement := range statements {
if _, err := conn.TX.ExecContext(ctx, statement); err != nil {
if err := mb.c.WithContext(ctx).RawQuery(statement).Exec(); err != nil {
return errors.Wrapf(err, "unable to execute statement: %s", statement)
}
}
return nil
}); err != nil {
return err
} else {
if err := mb.isolatedTransaction(ctx, "init", func(conn *pop.Connection) error {
for _, statement := range statements {
if err := conn.WithContext(ctx).RawQuery(statement).Exec(); err != nil {
return errors.Wrapf(err, "unable to execute statement: %s", statement)
}
}
return nil
}); err != nil {
return err
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion oryx/sqlcon/dockertest/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func ConnectToTestMySQLPop(t testing.TB) *pop.Connection {
func startCockroachDB(version string) (*dockertest.Resource, error) {
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "cockroachdb/cockroach",
Tag: stringsx.Coalesce(version, "latest-v24.2"),
Tag: stringsx.Coalesce(version, "latest-v25.3"),
Cmd: []string{"start-single-node", "--insecure"},
})
if err == nil {
Expand Down
Loading