Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
eb60db0
dont use background ctx
zacharysierakowski Jun 12, 2026
006f9ce
added a bit more detail to the move-tables readme in case its helpful…
zacharysierakowski Jun 12, 2026
2bb2d54
Starting point for checkpoints / crash safe resume
zacharysierakowski Jun 12, 2026
ee8974a
conditionally change the checkpoints table
zacharysierakowski Jun 12, 2026
e850f0a
new col renames
zacharysierakowski Jun 12, 2026
c57d247
fmt
zacharysierakowski Jun 12, 2026
8e49eb7
don't change ReadLastCheckpoint, it's not called from move-tables mode
zacharysierakowski Jun 12, 2026
0e3de73
DrainGTID -> MoveTablesCutoverDrainGTID
zacharysierakowski Jun 12, 2026
f032927
casing
zacharysierakowski Jun 12, 2026
3f4059a
oof it's the weekend. stashing
zacharysierakowski Jun 12, 2026
30ecb11
Merge branch 'feature-move-tables' into move-tables-1.6-crash-safe-re…
zacharysierakowski Jun 15, 2026
2679312
stashing some changes. need a new codespace
zacharysierakowski Jun 17, 2026
169f377
more inserts at once
zacharysierakowski Jun 17, 2026
96aab08
prune on teardown
zacharysierakowski Jun 17, 2026
48e6920
always set --checkpoint and --gtid on --move-tables even if not user …
zacharysierakowski Jun 17, 2026
b7c087d
cutover protocol changes
zacharysierakowski Jun 17, 2026
b46fc5a
lint
zacharysierakowski Jun 17, 2026
0112669
Potential fix for pull request finding
zacharysierakowski Jun 17, 2026
4a5a826
Potential fix for pull request finding
zacharysierakowski Jun 17, 2026
692e834
Potential fix for pull request finding
zacharysierakowski Jun 17, 2026
5a6932e
fmt
zacharysierakowski Jun 17, 2026
4396b16
add test (and fix) to prove race with apply queue empty but not applied
zacharysierakowski Jun 18, 2026
396ae07
Merge branch 'feature-move-tables' into move-tables-1.6-crash-safe-re…
zacharysierakowski Jun 18, 2026
f5273fa
missing throttler initialization in move-tables resume
zacharysierakowski Jun 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,14 @@ func main() {
if migrationContext.PostponeCutOverFlagFile == "" {
log.Fatal("--postpone-cut-over-flag-file must be specified when using --move-tables")
}
if !migrationContext.Checkpoint {
log.Infof("--move-tables requires checkpointing; enabling --checkpoint")
migrationContext.Checkpoint = true
}
if !migrationContext.UseGTIDs {
log.Infof("--move-tables requires GTID coordinates for cutover drain checks; enabling --gtid")
migrationContext.UseGTIDs = true
}

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to not fail the request without these, and just enable them. Could also change it to a Fatal and force the operator to rerun with the required flags. But that feels kinda silly if they are always required.

migrationContext.MoveTables.TableNames = strings.Split(*moveTables, ",")
for i := range migrationContext.MoveTables.TableNames {
migrationContext.MoveTables.TableNames[i] = strings.TrimSpace(migrationContext.MoveTables.TableNames[i])
Expand Down
126 changes: 117 additions & 9 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,35 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
}
}

func (apl *Applier) checkpointDB() *gosql.DB {
if apl.migrationContext.IsMoveTablesMode() && apl.moveTablesTargetDB != nil {
return apl.moveTablesTargetDB
}
return apl.db
}

func (apl *Applier) checkpointDatabaseName() string {
if apl.migrationContext.IsMoveTablesMode() {
return apl.migrationContext.GetTargetDatabaseName()
}
return apl.migrationContext.DatabaseName
}

func (apl *Applier) checkpointDrainGTIDString(chk *Checkpoint) string {
if chk == nil || chk.MoveTablesCutOverDrainGTID == nil || chk.MoveTablesCutOverDrainGTID.IsEmpty() {
return ""
}
return chk.MoveTablesCutOverDrainGTID.String()
}

func (apl *Applier) checkpointRangeColumnNames() (minColumnNames []string, maxColumnNames []string) {
for _, col := range apl.migrationContext.UniqueKey.Columns.Columns() {
minColumnNames = append(minColumnNames, sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4)+"_min")
maxColumnNames = append(maxColumnNames, sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4)+"_max")
}
return minColumnNames, maxColumnNames
}

// compileMigrationKeyWarningRegex compiles a regex pattern that matches duplicate key warnings
// for the migration's unique key. Duplicate warnings are formatted differently across MySQL versions,
// hence the optional table name prefix. Metacharacters in table/index names are escaped to avoid
Expand Down Expand Up @@ -358,9 +387,10 @@ func (apl *Applier) prepareQueries() (err error) {
}
if apl.migrationContext.Checkpoint {
if apl.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder(
apl.migrationContext.DatabaseName,
apl.checkpointDatabaseName(),
apl.migrationContext.GetCheckpointTableName(),
&apl.migrationContext.UniqueKey.Columns,
apl.migrationContext.IsMoveTablesMode(),
); err != nil {
return err
}
Expand Down Expand Up @@ -781,6 +811,12 @@ func (apl *Applier) CreateCheckpointTable() error {
"`gh_ost_dml_applied` bigint",
"`gh_ost_is_cutover` tinyint(1) DEFAULT '0'",
}
if apl.migrationContext.IsMoveTablesMode() {
colDefs = append(colDefs,
"`gh_ost_move_tables_cutover_started` tinyint(1) DEFAULT '0'",
"`gh_ost_move_tables_drain_gtid` text charset ascii",
)
}
for _, col := range apl.migrationContext.UniqueKey.Columns.Columns() {
if col.MySQLType == "" {
return fmt.Errorf("column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name))
Expand All @@ -797,12 +833,12 @@ func (apl *Applier) CreateCheckpointTable() error {
}

query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)",
sql.EscapeName(apl.migrationContext.DatabaseName),
sql.EscapeName(apl.checkpointDatabaseName()),
sql.EscapeName(apl.migrationContext.GetCheckpointTableName()),
strings.Join(colDefs, ",\n "),
)
apl.migrationContext.Log.Infof("Created checkpoint table")
if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil {
if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil {
return err
}
return nil
Expand All @@ -811,14 +847,14 @@ func (apl *Applier) CreateCheckpointTable() error {
// dropTable drops a given table on the applied host
func (apl *Applier) dropTable(tableName string) error {
query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
sql.EscapeName(apl.migrationContext.DatabaseName),
sql.EscapeName(apl.checkpointDatabaseName()),
sql.EscapeName(tableName),
)
apl.migrationContext.Log.Infof("Dropping table %s.%s",
sql.EscapeName(apl.migrationContext.DatabaseName),
sql.EscapeName(apl.checkpointDatabaseName()),
sql.EscapeName(tableName),
)
if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil {
if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil {
return err
}
apl.migrationContext.Log.Infof("Table dropped")
Expand Down Expand Up @@ -983,24 +1019,51 @@ func (apl *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) {
return insertId, err
}
args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied, chk.IsCutover)
if apl.migrationContext.IsMoveTablesMode() {
args = append(args, chk.MoveTablesCutOverStarted, apl.checkpointDrainGTIDString(chk))
}
args = append(args, uniqueKeyArgs...)
res, err := apl.db.Exec(query, args...)
res, err := apl.checkpointDB().Exec(query, args...)
if err != nil {
return insertId, err
}
return res.LastInsertId()
}

func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
row := apl.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetCheckpointTableName())))
minColumnNames, maxColumnNames := apl.checkpointRangeColumnNames()
selectColumns := []string{
"gh_ost_chk_id",
"gh_ost_chk_timestamp",
"gh_ost_chk_coords",
"gh_ost_chk_iteration",
"gh_ost_rows_copied",
"gh_ost_dml_applied",
"gh_ost_is_cutover",
}
if apl.migrationContext.IsMoveTablesMode() {
selectColumns = append(selectColumns, "gh_ost_move_tables_cutover_started", "gh_ost_move_tables_drain_gtid")
}
selectColumns = append(selectColumns, minColumnNames...)
selectColumns = append(selectColumns, maxColumnNames...)

row := apl.checkpointDB().QueryRow(fmt.Sprintf(
`select /* gh-ost */ %s from %s.%s order by gh_ost_chk_id desc limit 1`,
strings.Join(selectColumns, ", "),
sql.EscapeName(apl.checkpointDatabaseName()),
sql.EscapeName(apl.migrationContext.GetCheckpointTableName()),
))
chk := &Checkpoint{
IterationRangeMin: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()),
IterationRangeMax: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()),
}

var coordStr string
var coordStr, drainGTIDStr string
var timestamp int64
ptrs := []interface{}{&chk.Id, &timestamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover}
if apl.migrationContext.IsMoveTablesMode() {
ptrs = append(ptrs, &chk.MoveTablesCutOverStarted, &drainGTIDStr)
}
ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...)
ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...)
err := row.Scan(ptrs...)
Expand All @@ -1024,6 +1087,51 @@ func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
}
chk.LastTrxCoords = fileCoords
}
if apl.migrationContext.IsMoveTablesMode() && drainGTIDStr != "" {
drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr)
if err != nil {
return nil, err
}
chk.MoveTablesCutOverDrainGTID = drainGTID
}
return chk, nil
}

func (apl *Applier) ReadMoveTablesCutOverCheckpoint() (*Checkpoint, error) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] why can't we re-use ReadLastCheckpoint for this? the only significant difference I see is that the original fn is L1053-4 which we should able to ignore.

I assume there's something subtle I'm missing 🤔

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we technically could have. There might have been a reason at some point during my commit history but I agree it looks like it would be safe to use ReadLastCheckpoint. The big difference is that ReadMoveTablesCutOverCheckpoint has a where clause for gh_ost_move_tables_cutover_started = 1 and to make sure the drain GTID is set. But the one callsite i see of ReadMoveTablesCutOverCheckpoint does that check in the code as well

gh-ost/go/logic/migrator.go

Lines 1020 to 1024 in f5273fa

cutoverResumeCheckpoint, err := mgtr.applier.ReadMoveTablesCutOverCheckpoint()
if err != nil && !errors.Is(err, ErrNoCheckpointFound) {
return err
}
if cutoverResumeCheckpoint != nil && cutoverResumeCheckpoint.MoveTablesCutOverStarted && cutoverResumeCheckpoint.MoveTablesCutOverDrainGTID != nil && !cutoverResumeCheckpoint.MoveTablesCutOverDrainGTID.IsEmpty() {

row := apl.checkpointDB().QueryRow(fmt.Sprintf(`select /* gh-ost */ gh_ost_chk_id, gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, gh_ost_move_tables_cutover_started, gh_ost_move_tables_drain_gtid from %s.%s where gh_ost_move_tables_cutover_started = 1 and gh_ost_move_tables_drain_gtid is not null and gh_ost_move_tables_drain_gtid != '' order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(apl.migrationContext.GetCheckpointTableName())))
chk := &Checkpoint{}
var coordStr, drainGTIDStr string
var timestamp int64
err := row.Scan(&chk.Id, &timestamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover, &chk.MoveTablesCutOverStarted, &drainGTIDStr)
if err != nil {
if errors.Is(err, gosql.ErrNoRows) {
return nil, ErrNoCheckpointFound
}
return nil, err
}
chk.Timestamp = time.Unix(timestamp, 0)
if coordStr != "" {
if apl.migrationContext.UseGTIDs {
coords, err := mysql.NewGTIDBinlogCoordinates(coordStr)
if err != nil {
return nil, err
}
chk.LastTrxCoords = coords
} else {
coords, err := mysql.ParseFileBinlogCoordinates(coordStr)
if err != nil {
return nil, err
}
chk.LastTrxCoords = coords
}
}
if drainGTIDStr != "" {
drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr)
if err != nil {
return nil, err
}
chk.MoveTablesCutOverDrainGTID = drainGTID
}
return chk, nil
}

Expand Down
148 changes: 148 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,154 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied)
suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied)
suite.Require().Equal(chk.IsCutover, gotChk.IsCutover)
suite.Require().False(gotChk.MoveTablesCutOverStarted)
suite.Require().Nil(gotChk.MoveTablesCutOverDrainGTID)
}

func (suite *ApplierTestSuite) TestWriteCheckpointMoveTables() {
ctx := context.Background()

var err error

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id int not null, id2 char(4) CHARACTER SET utf8mb4, primary key(id, id2))", getTestTableName()))
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, id2) VALUES (?,?), (?,?), (?,?)", getTestTableName()), 411, "君子懷德", 411, "小人懷土", 212, "君子不器")
suite.Require().NoError(err)

connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.InspectorConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")
migrationContext.UseGTIDs = true

migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "id2"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "id2"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "id2"})
migrationContext.Checkpoint = true
migrationContext.MoveTables.TableNames = []string{testMysqlTableName}
migrationContext.MoveTables.TargetDatabase = testMysqlDatabase
migrationContext.MoveTables.ConnectionConfig = connectionConfig
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "PRIMARY",
NameInGhostTable: "PRIMARY",
Columns: *sql.NewColumnList([]string{"id", "id2"}),
}

inspector := NewInspector(migrationContext)
suite.Require().NoError(inspector.InitDBConnections())

err = inspector.applyColumnTypes(testMysqlDatabase, testMysqlTableName, &migrationContext.UniqueKey.Columns)
suite.Require().NoError(err)

applier := NewApplier(migrationContext)

err = applier.InitDBConnections()
suite.Require().NoError(err)

err = applier.CreateCheckpointTable()
suite.Require().NoError(err)

err = applier.prepareQueries()
suite.Require().NoError(err)

err = applier.ReadMigrationRangeValues(inspector.db)
suite.Require().NoError(err)

coords, err := mysql.NewGTIDBinlogCoordinates("00000000-0000-0000-0000-000000000001:1-10")
suite.Require().NoError(err)
drainGTID, err := mysql.NewGTIDBinlogCoordinates("00000000-0000-0000-0000-000000000001:1-20")
suite.Require().NoError(err)

chk := &Checkpoint{
LastTrxCoords: coords,
IterationRangeMin: applier.migrationContext.MigrationRangeMinValues,
IterationRangeMax: applier.migrationContext.MigrationRangeMaxValues,
Iteration: 3,
RowsCopied: 1000,
DMLApplied: 2000,
IsCutover: false,
MoveTablesCutOverStarted: true,
MoveTablesCutOverDrainGTID: drainGTID,
}
id, err := applier.WriteCheckpoint(chk)
suite.Require().NoError(err)
suite.Require().Equal(int64(1), id)

gotChk, err := applier.ReadLastCheckpoint()
suite.Require().NoError(err)

suite.Require().Equal(chk.Iteration, gotChk.Iteration)
suite.Require().Equal(chk.LastTrxCoords.String(), gotChk.LastTrxCoords.String())
suite.Require().Equal(chk.IterationRangeMin.String(), gotChk.IterationRangeMin.String())
suite.Require().Equal(chk.IterationRangeMax.String(), gotChk.IterationRangeMax.String())
suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied)
suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied)
suite.Require().Equal(chk.IsCutover, gotChk.IsCutover)
suite.Require().True(gotChk.MoveTablesCutOverStarted)
suite.Require().NotNil(gotChk.MoveTablesCutOverDrainGTID)
suite.Require().Equal(drainGTID.String(), gotChk.MoveTablesCutOverDrainGTID.String())
}

func (suite *ApplierTestSuite) TestReadMoveTablesCutOverCheckpointIgnoresRowCopyCheckpoints() {
ctx := context.Background()

var err error

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id int not null primary key)", getTestTableName()))
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id) VALUES (1), (2), (3)", getTestTableName()))
suite.Require().NoError(err)

connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.InspectorConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")
migrationContext.Checkpoint = true
migrationContext.MoveTables.TableNames = []string{testMysqlTableName}
migrationContext.MoveTables.TargetDatabase = testMysqlDatabase
migrationContext.MoveTables.ConnectionConfig = connectionConfig
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "PRIMARY",
NameInGhostTable: "PRIMARY",
Columns: *sql.NewColumnList([]string{"id"}),
}

inspector := NewInspector(migrationContext)
suite.Require().NoError(inspector.InitDBConnections())
err = inspector.applyColumnTypes(testMysqlDatabase, testMysqlTableName, &migrationContext.UniqueKey.Columns)
suite.Require().NoError(err)

applier := NewApplier(migrationContext)
suite.Require().NoError(applier.InitDBConnections())
suite.Require().NoError(applier.CreateCheckpointTable())
suite.Require().NoError(applier.prepareQueries())
suite.Require().NoError(applier.ReadMigrationRangeValues(inspector.db))

coords := mysql.NewFileBinlogCoordinates("mysql-bin.000003", int64(1234))
chk := &Checkpoint{
LastTrxCoords: coords,
IterationRangeMin: applier.migrationContext.MigrationRangeMinValues,
IterationRangeMax: applier.migrationContext.MigrationRangeMaxValues,
Iteration: 1,
RowsCopied: 3,
DMLApplied: 0,
}
_, err = applier.WriteCheckpoint(chk)
suite.Require().NoError(err)

_, err = applier.ReadMoveTablesCutOverCheckpoint()
suite.Require().ErrorIs(err, ErrNoCheckpointFound)
}

func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigrationIndex() {
Expand Down
12 changes: 7 additions & 5 deletions go/logic/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ type Checkpoint struct {
IterationRangeMin *sql.ColumnValues
// IterationRangeMax is the max shared key value
// for the chunk copier range.
IterationRangeMax *sql.ColumnValues
Iteration int64
RowsCopied int64
DMLApplied int64
IsCutover bool
IterationRangeMax *sql.ColumnValues
Iteration int64
RowsCopied int64
DMLApplied int64
IsCutover bool
MoveTablesCutOverStarted bool
MoveTablesCutOverDrainGTID mysql.BinlogCoordinates
}
Loading
Loading