diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 4009fb146..8221cf67d 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -373,6 +373,14 @@ func main() { if migrationContext.PostponeCutOverFlagFile == "" { log.Fatal("--postpone-cut-over-flag-file must be specified when using --move-tables") } + if !migrationContext.Checkpoint { + log.Infof("--move-tables requires checkpointing; enabling --checkpoint") + migrationContext.Checkpoint = true + } + if !migrationContext.UseGTIDs { + log.Infof("--move-tables requires GTID coordinates for cutover drain checks; enabling --gtid") + migrationContext.UseGTIDs = true + } migrationContext.MoveTables.TableNames = strings.Split(*moveTables, ",") for i := range migrationContext.MoveTables.TableNames { migrationContext.MoveTables.TableNames[i] = strings.TrimSpace(migrationContext.MoveTables.TableNames[i]) diff --git a/go/logic/applier.go b/go/logic/applier.go index bf4d208da..051c4ca8d 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -110,6 +110,35 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { } } +func (apl *Applier) checkpointDB() *gosql.DB { + if apl.migrationContext.IsMoveTablesMode() && apl.moveTablesTargetDB != nil { + return apl.moveTablesTargetDB + } + return apl.db +} + +func (apl *Applier) checkpointDatabaseName() string { + if apl.migrationContext.IsMoveTablesMode() { + return apl.migrationContext.GetTargetDatabaseName() + } + return apl.migrationContext.DatabaseName +} + +func (apl *Applier) checkpointDrainGTIDString(chk *Checkpoint) string { + if chk == nil || chk.MoveTablesCutOverDrainGTID == nil || chk.MoveTablesCutOverDrainGTID.IsEmpty() { + return "" + } + return chk.MoveTablesCutOverDrainGTID.String() +} + +func (apl *Applier) checkpointRangeColumnNames() (minColumnNames []string, maxColumnNames []string) { + for _, col := range apl.migrationContext.UniqueKey.Columns.Columns() { + minColumnNames = append(minColumnNames, sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4)+"_min") + maxColumnNames = append(maxColumnNames, sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4)+"_max") + } + return minColumnNames, maxColumnNames +} + // compileMigrationKeyWarningRegex compiles a regex pattern that matches duplicate key warnings // for the migration's unique key. Duplicate warnings are formatted differently across MySQL versions, // hence the optional table name prefix. Metacharacters in table/index names are escaped to avoid @@ -358,9 +387,10 @@ func (apl *Applier) prepareQueries() (err error) { } if apl.migrationContext.Checkpoint { if apl.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder( - apl.migrationContext.DatabaseName, + apl.checkpointDatabaseName(), apl.migrationContext.GetCheckpointTableName(), &apl.migrationContext.UniqueKey.Columns, + apl.migrationContext.IsMoveTablesMode(), ); err != nil { return err } @@ -781,6 +811,12 @@ func (apl *Applier) CreateCheckpointTable() error { "`gh_ost_dml_applied` bigint", "`gh_ost_is_cutover` tinyint(1) DEFAULT '0'", } + if apl.migrationContext.IsMoveTablesMode() { + colDefs = append(colDefs, + "`gh_ost_move_tables_cutover_started` tinyint(1) DEFAULT '0'", + "`gh_ost_move_tables_drain_gtid` text charset ascii", + ) + } for _, col := range apl.migrationContext.UniqueKey.Columns.Columns() { if col.MySQLType == "" { return fmt.Errorf("column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name)) @@ -797,12 +833,12 @@ func (apl *Applier) CreateCheckpointTable() error { } query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)", - sql.EscapeName(apl.migrationContext.DatabaseName), + sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(apl.migrationContext.GetCheckpointTableName()), strings.Join(colDefs, ",\n "), ) apl.migrationContext.Log.Infof("Created checkpoint table") - if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil { + if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil { return err } return nil @@ -811,14 +847,14 @@ func (apl *Applier) CreateCheckpointTable() error { // dropTable drops a given table on the applied host func (apl *Applier) dropTable(tableName string) error { query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`, - sql.EscapeName(apl.migrationContext.DatabaseName), + sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(tableName), ) apl.migrationContext.Log.Infof("Dropping table %s.%s", - sql.EscapeName(apl.migrationContext.DatabaseName), + sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(tableName), ) - if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil { + if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil { return err } apl.migrationContext.Log.Infof("Table dropped") @@ -983,8 +1019,11 @@ func (apl *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { return insertId, err } args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied, chk.IsCutover) + if apl.migrationContext.IsMoveTablesMode() { + args = append(args, chk.MoveTablesCutOverStarted, apl.checkpointDrainGTIDString(chk)) + } args = append(args, uniqueKeyArgs...) - res, err := apl.db.Exec(query, args...) + res, err := apl.checkpointDB().Exec(query, args...) if err != nil { return insertId, err } @@ -992,15 +1031,39 @@ func (apl *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { } func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) { - row := apl.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetCheckpointTableName()))) + minColumnNames, maxColumnNames := apl.checkpointRangeColumnNames() + selectColumns := []string{ + "gh_ost_chk_id", + "gh_ost_chk_timestamp", + "gh_ost_chk_coords", + "gh_ost_chk_iteration", + "gh_ost_rows_copied", + "gh_ost_dml_applied", + "gh_ost_is_cutover", + } + if apl.migrationContext.IsMoveTablesMode() { + selectColumns = append(selectColumns, "gh_ost_move_tables_cutover_started", "gh_ost_move_tables_drain_gtid") + } + selectColumns = append(selectColumns, minColumnNames...) + selectColumns = append(selectColumns, maxColumnNames...) + + row := apl.checkpointDB().QueryRow(fmt.Sprintf( + `select /* gh-ost */ %s from %s.%s order by gh_ost_chk_id desc limit 1`, + strings.Join(selectColumns, ", "), + sql.EscapeName(apl.checkpointDatabaseName()), + sql.EscapeName(apl.migrationContext.GetCheckpointTableName()), + )) chk := &Checkpoint{ IterationRangeMin: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()), IterationRangeMax: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()), } - var coordStr string + var coordStr, drainGTIDStr string var timestamp int64 ptrs := []interface{}{&chk.Id, ×tamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover} + if apl.migrationContext.IsMoveTablesMode() { + ptrs = append(ptrs, &chk.MoveTablesCutOverStarted, &drainGTIDStr) + } ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...) ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...) err := row.Scan(ptrs...) @@ -1024,6 +1087,51 @@ func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) { } chk.LastTrxCoords = fileCoords } + if apl.migrationContext.IsMoveTablesMode() && drainGTIDStr != "" { + drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr) + if err != nil { + return nil, err + } + chk.MoveTablesCutOverDrainGTID = drainGTID + } + return chk, nil +} + +func (apl *Applier) ReadMoveTablesCutOverCheckpoint() (*Checkpoint, error) { + row := apl.checkpointDB().QueryRow(fmt.Sprintf(`select /* gh-ost */ gh_ost_chk_id, gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, gh_ost_move_tables_cutover_started, gh_ost_move_tables_drain_gtid from %s.%s where gh_ost_move_tables_cutover_started = 1 and gh_ost_move_tables_drain_gtid is not null and gh_ost_move_tables_drain_gtid != '' order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(apl.migrationContext.GetCheckpointTableName()))) + chk := &Checkpoint{} + var coordStr, drainGTIDStr string + var timestamp int64 + err := row.Scan(&chk.Id, ×tamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover, &chk.MoveTablesCutOverStarted, &drainGTIDStr) + if err != nil { + if errors.Is(err, gosql.ErrNoRows) { + return nil, ErrNoCheckpointFound + } + return nil, err + } + chk.Timestamp = time.Unix(timestamp, 0) + if coordStr != "" { + if apl.migrationContext.UseGTIDs { + coords, err := mysql.NewGTIDBinlogCoordinates(coordStr) + if err != nil { + return nil, err + } + chk.LastTrxCoords = coords + } else { + coords, err := mysql.ParseFileBinlogCoordinates(coordStr) + if err != nil { + return nil, err + } + chk.LastTrxCoords = coords + } + } + if drainGTIDStr != "" { + drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr) + if err != nil { + return nil, err + } + chk.MoveTablesCutOverDrainGTID = drainGTID + } return chk, nil } diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index d8fcc3d7f..7e6a48790 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -1113,6 +1113,154 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied) suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied) suite.Require().Equal(chk.IsCutover, gotChk.IsCutover) + suite.Require().False(gotChk.MoveTablesCutOverStarted) + suite.Require().Nil(gotChk.MoveTablesCutOverDrainGTID) +} + +func (suite *ApplierTestSuite) TestWriteCheckpointMoveTables() { + ctx := context.Background() + + var err error + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id int not null, id2 char(4) CHARACTER SET utf8mb4, primary key(id, id2))", getTestTableName())) + suite.Require().NoError(err) + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, id2) VALUES (?,?), (?,?), (?,?)", getTestTableName()), 411, "君子懷德", 411, "小人懷土", 212, "君子不器") + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.UseGTIDs = true + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.Checkpoint = true + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabase + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id", "id2"}), + } + + inspector := NewInspector(migrationContext) + suite.Require().NoError(inspector.InitDBConnections()) + + err = inspector.applyColumnTypes(testMysqlDatabase, testMysqlTableName, &migrationContext.UniqueKey.Columns) + suite.Require().NoError(err) + + applier := NewApplier(migrationContext) + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + err = applier.CreateCheckpointTable() + suite.Require().NoError(err) + + err = applier.prepareQueries() + suite.Require().NoError(err) + + err = applier.ReadMigrationRangeValues(inspector.db) + suite.Require().NoError(err) + + coords, err := mysql.NewGTIDBinlogCoordinates("00000000-0000-0000-0000-000000000001:1-10") + suite.Require().NoError(err) + drainGTID, err := mysql.NewGTIDBinlogCoordinates("00000000-0000-0000-0000-000000000001:1-20") + suite.Require().NoError(err) + + chk := &Checkpoint{ + LastTrxCoords: coords, + IterationRangeMin: applier.migrationContext.MigrationRangeMinValues, + IterationRangeMax: applier.migrationContext.MigrationRangeMaxValues, + Iteration: 3, + RowsCopied: 1000, + DMLApplied: 2000, + IsCutover: false, + MoveTablesCutOverStarted: true, + MoveTablesCutOverDrainGTID: drainGTID, + } + id, err := applier.WriteCheckpoint(chk) + suite.Require().NoError(err) + suite.Require().Equal(int64(1), id) + + gotChk, err := applier.ReadLastCheckpoint() + suite.Require().NoError(err) + + suite.Require().Equal(chk.Iteration, gotChk.Iteration) + suite.Require().Equal(chk.LastTrxCoords.String(), gotChk.LastTrxCoords.String()) + suite.Require().Equal(chk.IterationRangeMin.String(), gotChk.IterationRangeMin.String()) + suite.Require().Equal(chk.IterationRangeMax.String(), gotChk.IterationRangeMax.String()) + suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied) + suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied) + suite.Require().Equal(chk.IsCutover, gotChk.IsCutover) + suite.Require().True(gotChk.MoveTablesCutOverStarted) + suite.Require().NotNil(gotChk.MoveTablesCutOverDrainGTID) + suite.Require().Equal(drainGTID.String(), gotChk.MoveTablesCutOverDrainGTID.String()) +} + +func (suite *ApplierTestSuite) TestReadMoveTablesCutOverCheckpointIgnoresRowCopyCheckpoints() { + ctx := context.Background() + + var err error + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id int not null primary key)", getTestTableName())) + suite.Require().NoError(err) + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id) VALUES (1), (2), (3)", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.Checkpoint = true + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabase + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + inspector := NewInspector(migrationContext) + suite.Require().NoError(inspector.InitDBConnections()) + err = inspector.applyColumnTypes(testMysqlDatabase, testMysqlTableName, &migrationContext.UniqueKey.Columns) + suite.Require().NoError(err) + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.InitDBConnections()) + suite.Require().NoError(applier.CreateCheckpointTable()) + suite.Require().NoError(applier.prepareQueries()) + suite.Require().NoError(applier.ReadMigrationRangeValues(inspector.db)) + + coords := mysql.NewFileBinlogCoordinates("mysql-bin.000003", int64(1234)) + chk := &Checkpoint{ + LastTrxCoords: coords, + IterationRangeMin: applier.migrationContext.MigrationRangeMinValues, + IterationRangeMax: applier.migrationContext.MigrationRangeMaxValues, + Iteration: 1, + RowsCopied: 3, + DMLApplied: 0, + } + _, err = applier.WriteCheckpoint(chk) + suite.Require().NoError(err) + + _, err = applier.ReadMoveTablesCutOverCheckpoint() + suite.Require().ErrorIs(err, ErrNoCheckpointFound) } func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigrationIndex() { diff --git a/go/logic/checkpoint.go b/go/logic/checkpoint.go index cffe08c4b..f81a2bb16 100644 --- a/go/logic/checkpoint.go +++ b/go/logic/checkpoint.go @@ -24,9 +24,11 @@ type Checkpoint struct { IterationRangeMin *sql.ColumnValues // IterationRangeMax is the max shared key value // for the chunk copier range. - IterationRangeMax *sql.ColumnValues - Iteration int64 - RowsCopied int64 - DMLApplied int64 - IsCutover bool + IterationRangeMax *sql.ColumnValues + Iteration int64 + RowsCopied int64 + DMLApplied int64 + IsCutover bool + MoveTablesCutOverStarted bool + MoveTablesCutOverDrainGTID mysql.BinlogCoordinates } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 57c28d198..534aed8e3 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -100,8 +100,9 @@ type Migrator struct { rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete - copyRowsQueue chan tableWriteFunc - applyEventsQueue chan *applyEventStruct + copyRowsQueue chan tableWriteFunc + applyEventsQueue chan *applyEventStruct + applyEventsInFlight int64 finishedMigrating int64 } @@ -804,6 +805,183 @@ func (mgtr *Migrator) prepareMoveTablesCopyState() { mgtr.migrationContext.MappedSharedColumns = mgtr.migrationContext.OriginalTableColumns } +func (mgtr *Migrator) hydrateMoveTablesStateFromTarget() error { + probeContext := base.NewMigrationContext() + probeContext.DatabaseName = mgtr.migrationContext.GetTargetDatabaseName() + targetInspector := &Inspector{db: mgtr.applier.moveTablesTargetDB, migrationContext: probeContext} + + columns, virtualColumns, uniqueKeys, err := targetInspector.InspectTableColumnsAndUniqueKeys(mgtr.migrationContext.GetTargetTableName()) + if err != nil { + return err + } + + mgtr.migrationContext.OriginalTableColumns = columns + mgtr.migrationContext.OriginalTableVirtualColumns = virtualColumns + mgtr.migrationContext.OriginalTableUniqueKeys = uniqueKeys + mgtr.migrationContext.UniqueKey = targetInspector.selectUniqueKey(uniqueKeys) + mgtr.migrationContext.SharedColumns = columns + mgtr.migrationContext.MappedSharedColumns = columns + return nil +} + +func (mgtr *Migrator) persistMoveTablesCutOverCheckpoint(drainGTID mysql.BinlogCoordinates, isCutover bool) error { + mgtr.applier.CurrentCoordinatesMutex.Lock() + safeCoords := mgtr.applier.CurrentCoordinates + mgtr.applier.CurrentCoordinatesMutex.Unlock() + + if safeCoords == nil || safeCoords.IsEmpty() { + // In move-tables mode CurrentCoordinates may never advance on a quiet source + // (no _ghc heartbeats, no DML). If there is no backlog, the streamer's + // frontier is a safe fallback for checkpointing. + if mgtr.eventsStreamer != nil && len(mgtr.applyEventsQueue) == 0 && len(mgtr.eventsStreamer.eventsChannel) == 0 { + safeCoords = mgtr.eventsStreamer.GetCurrentBinlogCoordinates() + } + if safeCoords == nil || safeCoords.IsEmpty() { + return errors.New("current coordinates are empty, cannot checkpoint move-tables cutover") + } + } + safeCoords = safeCoords.Clone() + + chk := &Checkpoint{ + LastTrxCoords: safeCoords, + IterationRangeMin: sql.NewColumnValues(mgtr.migrationContext.UniqueKey.Len()), + IterationRangeMax: sql.NewColumnValues(mgtr.migrationContext.UniqueKey.Len()), + Iteration: mgtr.migrationContext.GetIteration(), + RowsCopied: atomic.LoadInt64(&mgtr.migrationContext.TotalRowsCopied), + DMLApplied: atomic.LoadInt64(&mgtr.migrationContext.TotalDMLEventsApplied), + IsCutover: isCutover, + MoveTablesCutOverStarted: true, + MoveTablesCutOverDrainGTID: drainGTID, + } + mgtr.applier.LastIterationRangeMutex.Lock() + if mgtr.applier.LastIterationRangeMinValues != nil { + chk.IterationRangeMin = mgtr.applier.LastIterationRangeMinValues.Clone() + } + if mgtr.applier.LastIterationRangeMaxValues != nil { + chk.IterationRangeMax = mgtr.applier.LastIterationRangeMaxValues.Clone() + } + mgtr.applier.LastIterationRangeMutex.Unlock() + id, err := mgtr.applier.WriteCheckpoint(chk) + chk.Id = id + return err +} + +// moveTablesDrainCoordinateReached returns true when current is at-or-ahead of +// drain within the same coordinate family. For GTID drains, current must also +// be GTID-backed; mixed GTID/file-pos comparisons are treated as not reached. +func moveTablesDrainCoordinateReached(current mysql.BinlogCoordinates, drain mysql.BinlogCoordinates) bool { + if current == nil || current.IsEmpty() || drain == nil || drain.IsEmpty() { + return false + } + switch drain.(type) { + case *mysql.GTIDBinlogCoordinates: + if _, ok := current.(*mysql.GTIDBinlogCoordinates); !ok { + return false + } + } + return !current.SmallerThan(drain) +} + +// moveTablesDrainProvenByStreamerProgress is the non-DML-tail fallback used +// in T3: if both queues are empty and the streamer has advanced to drain, +// drain is considered complete even when applier coords did not move. +func moveTablesDrainProvenByStreamerProgress(drain mysql.BinlogCoordinates, streamer mysql.BinlogCoordinates, applyBacklog int, streamerBacklog int) bool { + if applyBacklog != 0 || streamerBacklog != 0 { + return false + } + return moveTablesDrainCoordinateReached(streamer, drain) +} + +func (mgtr *Migrator) drainMoveTablesCutOver(drainGTID mysql.BinlogCoordinates) error { + drainTimeout := time.Duration(mgtr.migrationContext.CutOverLockTimeoutSeconds) * time.Second + mgtr.migrationContext.Log.Infof("T3: draining applier to drain GTID (timeout %s, poll %s)", + drainTimeout, moveTablesCutOverDrainPollInterval) + drainCtx, cancel := context.WithTimeout(context.Background(), drainTimeout) + defer cancel() + ticker := time.NewTicker(moveTablesCutOverDrainPollInterval) + defer ticker.Stop() + for { + if err := mgtr.checkAbort(); err != nil { + return err + } + // Primary signal: applier's coordinate (advances when relevant apply work runs). + mgtr.applier.CurrentCoordinatesMutex.Lock() + applierCoords := mgtr.applier.CurrentCoordinates + mgtr.applier.CurrentCoordinatesMutex.Unlock() + drainReached := moveTablesDrainCoordinateReached(applierCoords, drainGTID) + // Backlogs gate completion: if either queue is non-empty, drain is not done. + applyBacklog := len(mgtr.applyEventsQueue) + streamerBacklog := 0 + var streamerCoords mysql.BinlogCoordinates + applierDisplay := "" + if applierCoords != nil { + applierDisplay = applierCoords.DisplayString() + } + if mgtr.eventsStreamer != nil { + streamerBacklog = len(mgtr.eventsStreamer.eventsChannel) + if mgtr.eventsStreamer.binlogReader != nil { + // Secondary signal: streamer's latest source position. + streamerCoords = mgtr.eventsStreamer.GetCurrentBinlogCoordinates() + } + } + applyInFlight := atomic.LoadInt64(&mgtr.applyEventsInFlight) + // Normal completion path: applier reached drain, both queues are empty, + // and no apply handler is still running. + if drainReached && applyBacklog == 0 && streamerBacklog == 0 && applyInFlight == 0 { + mgtr.migrationContext.Log.Infof("T3: drain complete; applier caught up to drain GTID") + return nil + } + // Fallback for non-DML tail: GTID can advance due to unrelated/non-row events, + // so applier may stop moving while streamer has already crossed drain. + if applyInFlight == 0 && moveTablesDrainProvenByStreamerProgress(drainGTID, streamerCoords, applyBacklog, streamerBacklog) { + mgtr.migrationContext.Log.Infof("T3: drain complete via streamer frontier (non-DML tail after T2)") + return nil + } + if drainReached { + mgtr.migrationContext.Log.Debugf("T3: drain GTID reached but backlog remains (apply=%d, streamer=%d, in_flight=%d)", applyBacklog, streamerBacklog, applyInFlight) + } else { + mgtr.migrationContext.Log.Debugf("T3: applier still behind drain GTID, polling (applier=%s drain=%s in_flight=%d)", applierDisplay, drainGTID.DisplayString(), applyInFlight) + } + select { + case <-drainCtx.Done(): + streamerDisplay := "" + if streamerCoords != nil { + streamerDisplay = streamerCoords.DisplayString() + } + return fmt.Errorf("drain poll timed out after %s: applier did not catch up to drain GTID (applier=%s drain=%s streamer=%s apply_backlog=%d streamer_backlog=%d in_flight=%d)", + drainTimeout, applierDisplay, drainGTID.DisplayString(), streamerDisplay, applyBacklog, streamerBacklog, applyInFlight) + case <-ticker.C: + } + } +} + +func (mgtr *Migrator) resumeMoveTablesCutOverFromCheckpoint(chk *Checkpoint) error { + if chk == nil || !chk.MoveTablesCutOverStarted || chk.MoveTablesCutOverDrainGTID == nil || chk.MoveTablesCutOverDrainGTID.IsEmpty() { + return errors.New("checkpoint does not contain move-tables cutover resume state") + } + if chk.LastTrxCoords != nil && !chk.LastTrxCoords.IsEmpty() { + mgtr.applier.CurrentCoordinatesMutex.Lock() + mgtr.applier.CurrentCoordinates = chk.LastTrxCoords.Clone() + mgtr.applier.CurrentCoordinatesMutex.Unlock() + } + mgtr.migrationContext.Log.Infof("Resuming move-tables cutover from checkpoint at coords=%+v drain_gtid=%s", + chk.LastTrxCoords, chk.MoveTablesCutOverDrainGTID.DisplayString()) + if err := mgtr.drainMoveTablesCutOver(chk.MoveTablesCutOverDrainGTID); err != nil { + return err + } + if mgtr.migrationContext.Checkpoint { + if err := mgtr.persistMoveTablesCutOverCheckpoint(chk.MoveTablesCutOverDrainGTID, true); err != nil { + mgtr.migrationContext.Log.Warningf("failed to checkpoint drained move-tables cutover: %+v", err) + } + } + atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1) + mgtr.migrationContext.Log.Debugf("T4: CutOverCompleteFlag set") + if err := mgtr.hooksExecutor.OnSuccess(false); err != nil { + return fmt.Errorf("on-success hook failed: %w", err) + } + return nil +} + func (mgtr *Migrator) MoveTables() (err error) { mgtr.migrationContext.Log.Infof("Moving tables %v from %s to %s (%s)", mgtr.migrationContext.MoveTables.TableNames, @@ -833,18 +1011,100 @@ func (mgtr *Migrator) MoveTables() (err error) { // so we don't leave things hanging around defer mgtr.teardown() + if mgtr.migrationContext.Checkpoint && mgtr.migrationContext.Resume { + mgtr.migrationContext.ApplierConnectionConfig = mgtr.migrationContext.MoveTables.ConnectionConfig + mgtr.applier = NewApplier(mgtr.migrationContext) + if err := mgtr.applier.InitDBConnections(); err != nil { + return err + } + cutoverResumeCheckpoint, err := mgtr.applier.ReadMoveTablesCutOverCheckpoint() + if err != nil && !errors.Is(err, ErrNoCheckpointFound) { + return err + } + if cutoverResumeCheckpoint != nil && cutoverResumeCheckpoint.MoveTablesCutOverStarted && cutoverResumeCheckpoint.MoveTablesCutOverDrainGTID != nil && !cutoverResumeCheckpoint.MoveTablesCutOverDrainGTID.IsEmpty() { + mgtr.migrationContext.InitialStreamerCoords = cutoverResumeCheckpoint.LastTrxCoords + mgtr.migrationContext.Iteration = cutoverResumeCheckpoint.Iteration + atomic.StoreInt64(&mgtr.migrationContext.TotalRowsCopied, cutoverResumeCheckpoint.RowsCopied) + atomic.StoreInt64(&mgtr.migrationContext.TotalDMLEventsApplied, cutoverResumeCheckpoint.DMLApplied) + if err := mgtr.hydrateMoveTablesStateFromTarget(); err != nil { + return fmt.Errorf("failed to hydrate move-tables resume state from target: %w", err) + } + if err := mgtr.createFlagFiles(); err != nil { + return err + } + if err := mgtr.initiateStreaming(); err != nil { + return err + } + if err := mgtr.applier.prepareQueries(); err != nil { + return err + } + if err := mgtr.hooksExecutor.OnValidated(); err != nil { + return err + } + if err := mgtr.initiateServer(); err != nil { + return err + } + defer mgtr.server.RemoveSocketFile() + if err := mgtr.addDMLEventsListener(); err != nil { + return err + } + mgtr.initiateThrottler() + go func() { + if err := mgtr.executeWriteFuncs(); err != nil { + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + } + }() + // Do not initiate status ticker in cutover resume path: inspector is not initialized, + // and we're only doing drain polling + hooks before exit (no row copy to monitor). + if err := mgtr.resumeMoveTablesCutOverFromCheckpoint(cutoverResumeCheckpoint); err != nil { + return err + } + if err := mgtr.finalCleanup(); err != nil { + return nil + } + mgtr.migrationContext.Log.Infof("Done moving tables %v from %s to %s (%s)", + mgtr.migrationContext.MoveTables.TableNames, sql.EscapeName(mgtr.migrationContext.DatabaseName), + sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), mgtr.migrationContext.MoveTables.TargetHost) + if err := mgtr.checkAbort(); err != nil { + return err + } + return nil + } + // Do not teardown this preflight applier on the miss path. Its DB handles + // come from the shared connection cache keyed by migration UUID, and + // closing them here would poison the later inspector/applier init path + // with "sql: database is closed". + mgtr.applier = nil + } + if err := mgtr.initiateInspector(); err != nil { return err } if err := mgtr.checkAbort(); err != nil { return err } + mgtr.prepareMoveTablesCopyState() if err := mgtr.initiateApplier(); err != nil { return err } if err := mgtr.checkAbort(); err != nil { return err } + if mgtr.migrationContext.Checkpoint && mgtr.migrationContext.Resume { + lastCheckpoint, err := mgtr.applier.ReadLastCheckpoint() + if err != nil { + return mgtr.migrationContext.Log.Errorf("no checkpoint found, unable to resume: %+v", err) + } + mgtr.migrationContext.Log.Infof("Resuming move-tables from checkpoint coords=%+v range_min=%+v range_max=%+v iteration=%d", + lastCheckpoint.LastTrxCoords, lastCheckpoint.IterationRangeMin.String(), lastCheckpoint.IterationRangeMax.String(), lastCheckpoint.Iteration) + + mgtr.migrationContext.MigrationIterationRangeMinValues = lastCheckpoint.IterationRangeMin + mgtr.migrationContext.MigrationIterationRangeMaxValues = lastCheckpoint.IterationRangeMax + mgtr.migrationContext.Iteration = lastCheckpoint.Iteration + atomic.StoreInt64(&mgtr.migrationContext.TotalRowsCopied, lastCheckpoint.RowsCopied) + atomic.StoreInt64(&mgtr.migrationContext.TotalDMLEventsApplied, lastCheckpoint.DMLApplied) + mgtr.migrationContext.InitialStreamerCoords = lastCheckpoint.LastTrxCoords + } if err := mgtr.createFlagFiles(); err != nil { return err } @@ -858,12 +1118,15 @@ func (mgtr *Migrator) MoveTables() (err error) { return err } - mgtr.prepareMoveTablesCopyState() - // this function assumes that the unique key constraint has been set. if err := mgtr.applier.prepareQueries(); err != nil { return err } + if mgtr.migrationContext.Checkpoint && !mgtr.migrationContext.Resume { + if err := mgtr.applier.CreateCheckpointTable(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create checkpoint table, see further error details") + } + } // Validation complete! Run on-validated hook. if err := mgtr.hooksExecutor.OnValidated(); err != nil { @@ -900,6 +1163,9 @@ func (mgtr *Migrator) MoveTables() (err error) { go mgtr.iterateChunks() mgtr.migrationContext.MarkRowCopyStartTime() go mgtr.initiateStatus() + if mgtr.migrationContext.Checkpoint { + go mgtr.checkpointLoop() + } mgtr.migrationContext.Log.Debugf("Operating until row copy is complete") mgtr.consumeRowCopyComplete() @@ -937,15 +1203,12 @@ func (mgtr *Migrator) MoveTables() (err error) { // atomicCutOver, waitForEventsUpToLock, heartbeat-lag) was built on a // single-server assumption that no longer holds when the applier writes target // and the streamer reads source. Each is replaced or dropped here. -// -// Crash safety (persisting the drain GTID before T3) is #8210. Enriched hook -// env vars (GH_OST_DRAIN_GTID, GH_OST_TARGET_*) are #8211. Target-side -// throttling is #8212. None of those are wired here. func (mgtr *Migrator) moveTablesCutOver() (err error) { if mgtr.migrationContext.Noop { mgtr.migrationContext.Log.Debugf("Noop operation; not really moving tables") return nil } + defer atomic.StoreInt64(&mgtr.migrationContext.InCutOverCriticalSectionFlag, 0) // ----- Postpone gate (precedes T0) ----- // Mirrors standard cutOver()'s sleepWhileTrue postpone structure but DROPS the @@ -979,6 +1242,9 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) { atomic.StoreInt64(&mgtr.migrationContext.IsPostponingCutOver, 0) mgtr.migrationContext.Log.Debugf("checking for cut-over postpone: complete") + // Disables throttling and background checkpoint loop + atomic.StoreInt64(&mgtr.migrationContext.InCutOverCriticalSectionFlag, 1) + // ----- T0: on-before-cut-over hook ----- // Non-zero hook exit aborts cutover BEFORE any source DDL fires. if err := mgtr.hooksExecutor.OnBeforeCutOver(); err != nil { @@ -995,7 +1261,8 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) { // No retry on the RENAME: it is not idempotent — a partial success leaves // the table already renamed and a retry would fail. The operator re-runs // the whole hook chain on failure. - pinnedConn, err := mgtr.inspector.db.Conn(context.Background()) + cutOverCtx := mgtr.migrationContext.GetContext() + pinnedConn, err := mgtr.inspector.db.Conn(cutOverCtx) if err != nil { return fmt.Errorf("failed to pin connection for T1/T2: %w", err) } @@ -1008,7 +1275,7 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) { sql.EscapeName(sourceDB), sql.EscapeName(sourceTable), sql.EscapeName(sourceDB), sql.EscapeName(delTable)) mgtr.migrationContext.Log.Infof("T1: renaming source table: %s", renameQuery) - if _, err := pinnedConn.ExecContext(context.Background(), renameQuery); err != nil { + if _, err := pinnedConn.ExecContext(cutOverCtx, renameQuery); err != nil { return fmt.Errorf("RENAME failed: %w", err) } @@ -1016,7 +1283,7 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) { // @@GLOBAL scope is explicit so the intent is unambiguous in the SQL itself. // Design: https://github.com/github/gh-ost-tablemove-poc/blob/9dc6df75c4c88ff473906a497836c7518f5614ec/design/coop_cutover.md#32-correctness-verification-for-p4 var drainGTIDStr string - if err := pinnedConn.QueryRowContext(context.Background(), "select @@gtid_executed").Scan(&drainGTIDStr); err != nil { + if err := pinnedConn.QueryRowContext(cutOverCtx, "select @@global.gtid_executed").Scan(&drainGTIDStr); err != nil { return fmt.Errorf("drain GTID capture failed: %w", err) } drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr) @@ -1024,46 +1291,18 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) { return fmt.Errorf("drain GTID parse failed: %w", err) } mgtr.migrationContext.Log.Infof("T2: captured drain GTID: %s", drainGTID.DisplayString()) - - // ----- T3: drain poll ----- - // Wait until applier.CurrentCoordinates catches up to drainGTID. The drain - // is complete when the applier's coords are not strictly smaller than the - // drain target (i.e. the applier contains every GTID in drainGTID). Reads - // of CurrentCoordinates hold the mutex per applier.go:75. Per-iteration - // logging is Debug only to avoid spamming Info on a hot loop. - drainTimeout := time.Duration(mgtr.migrationContext.CutOverLockTimeoutSeconds) * time.Second - mgtr.migrationContext.Log.Infof("T3: draining applier to drain GTID (timeout %s, poll %s)", - drainTimeout, moveTablesCutOverDrainPollInterval) - drainCtx, cancel := context.WithTimeout(context.Background(), drainTimeout) - defer cancel() - ticker := time.NewTicker(moveTablesCutOverDrainPollInterval) - defer ticker.Stop() - for { - if err := mgtr.checkAbort(); err != nil { - return err - } - mgtr.applier.CurrentCoordinatesMutex.Lock() - applierCoords := mgtr.applier.CurrentCoordinates - mgtr.applier.CurrentCoordinatesMutex.Unlock() - applyBacklog := len(mgtr.applyEventsQueue) - streamerBacklog := 0 - if mgtr.eventsStreamer != nil { - streamerBacklog = len(mgtr.eventsStreamer.eventsChannel) - } - if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) && applyBacklog == 0 && streamerBacklog == 0 { - mgtr.migrationContext.Log.Infof("T3: drain complete; applier caught up to drain GTID") - break - } - if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) { - mgtr.migrationContext.Log.Debugf("T3: drain GTID reached but backlog remains (apply=%d, streamer=%d)", applyBacklog, streamerBacklog) - } else { - mgtr.migrationContext.Log.Debugf("T3: applier still behind drain GTID, polling") + if mgtr.migrationContext.Checkpoint { + if err := mgtr.persistMoveTablesCutOverCheckpoint(drainGTID, false); err != nil { + return fmt.Errorf("failed to persist move-tables cutover checkpoint: %w", err) } - select { - case <-drainCtx.Done(): - return fmt.Errorf("drain poll timed out after %s: applier did not catch up to drain GTID", drainTimeout) - case <-ticker.C: - // next iteration + } + + if err := mgtr.drainMoveTablesCutOver(drainGTID); err != nil { + return err + } + if mgtr.migrationContext.Checkpoint { + if err := mgtr.persistMoveTablesCutOverCheckpoint(drainGTID, true); err != nil { + mgtr.migrationContext.Log.Warningf("failed to checkpoint drained move-tables cutover: %+v", err) } } @@ -1833,13 +2072,20 @@ func (mgtr *Migrator) initiateApplier() error { } if mgtr.migrationContext.IsMoveTablesMode() { - createTableStatement, err := mgtr.inspector.showCreateTable(mgtr.migrationContext.MoveTables.TableNames[0]) - if err != nil { - return fmt.Errorf("failed to fetch create table statement: %w", err) - } - if err := mgtr.applier.CreateTargetTable(createTableStatement); err != nil { - mgtr.migrationContext.Log.Errorf("unable to create target table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") - return err + if !mgtr.migrationContext.Resume { + createTableStatement, err := mgtr.inspector.showCreateTable(mgtr.migrationContext.MoveTables.TableNames[0]) + if err != nil { + return fmt.Errorf("failed to fetch create table statement: %w", err) + } + if err := mgtr.applier.CreateTargetTable(createTableStatement); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create target table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") + return err + } + } else { + mgtr.migrationContext.Log.Infof("Resuming move-tables; reusing existing target table %s.%s", + sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), + sql.EscapeName(mgtr.migrationContext.GetTargetTableName()), + ) } } else { if mgtr.migrationContext.Revert { @@ -2010,6 +2256,9 @@ func (mgtr *Migrator) iterateChunks() error { } func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { + atomic.AddInt64(&mgtr.applyEventsInFlight, 1) + defer atomic.AddInt64(&mgtr.applyEventsInFlight, -1) + handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error { if eventStruct.writeFunc != nil { if err := mgtr.retryOperation(*eventStruct.writeFunc); err != nil { @@ -2096,6 +2345,17 @@ func (mgtr *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { mgtr.applier.CurrentCoordinatesMutex.Unlock() return chk, err } + // In move-tables mode we do not emit heartbeat rows into _ghc, so + // CurrentCoordinates may not advance while the system is otherwise idle. + // If there is no backlog in either queue, it is safe to treat the current + // streamer coordinates as applied for checkpointing purposes. + if mgtr.migrationContext.IsMoveTablesMode() && len(mgtr.applyEventsQueue) == 0 && (mgtr.eventsStreamer == nil || len(mgtr.eventsStreamer.eventsChannel) == 0) { + mgtr.applier.CurrentCoordinates = coords.Clone() + id, err := mgtr.applier.WriteCheckpoint(chk) + chk.Id = id + mgtr.applier.CurrentCoordinatesMutex.Unlock() + return chk, err + } mgtr.applier.CurrentCoordinatesMutex.Unlock() time.Sleep(500 * time.Millisecond) } @@ -2265,9 +2525,17 @@ func (mgtr *Migrator) finalCleanup() error { } if mgtr.migrationContext.IsMoveTablesMode() { + if mgtr.migrationContext.Checkpoint { + if mgtr.migrationContext.OkToDropTable { + if err := mgtr.retryOperation(mgtr.applier.DropCheckpointTable); err != nil { + return err + } + } else if !mgtr.migrationContext.Noop { + mgtr.migrationContext.Log.Infof("Am not dropping checkpoint table without `--ok-to-drop-table`. To drop the checkpoint table, issue:") + mgtr.migrationContext.Log.Infof("-- drop table %s.%s", sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), sql.EscapeName(mgtr.migrationContext.GetCheckpointTableName())) + } + } // for move-tables mode, we're done at this point - // TODO(zacharysierakowski): when we add the checkpoint table in for 1.6, make sure we cleanup - // the checkpoint table here first before returning (looks like that's a few lines below changelog table cleanup) return nil } diff --git a/go/logic/migrator_move_tables_cutover_test.go b/go/logic/migrator_move_tables_cutover_test.go index 22426be7d..230e3d3a2 100644 --- a/go/logic/migrator_move_tables_cutover_test.go +++ b/go/logic/migrator_move_tables_cutover_test.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "strings" + "sync" "sync/atomic" "testing" "time" @@ -82,6 +83,21 @@ func TestMoveTablesCutOver_OnBeforeCutOverHookAbortsBeforeRename(t *testing.T) { "post-state: only the failing T0 hook fires; no OnSuccess, no OnBeginPostponed") } +type onSuccessCheckHooks struct { + *recordingHooks + onSuccessCheck func() error +} + +func (h *onSuccessCheckHooks) OnSuccess(bool) error { + if err := h.record("OnSuccess"); err != nil { + return err + } + if h.onSuccessCheck != nil { + return h.onSuccessCheck() + } + return nil +} + // TestMoveTablesCutOver_PostponeGateFiresOnBeginPostponedOnce maps to the // postpone-gate decision in #8209-implement-protocol (keep OnBeginPostponed // firing logic with the same once-per-cutover semantics as standard cutOver). @@ -131,6 +147,43 @@ func TestMoveTablesCutOver_PostponeGateFiresOnBeginPostponedOnce(t *testing.T) { "post-state: hook failure must leave CutOverCompleteFlag unset") } +// TestResumeMoveTablesCutOverFromCheckpointAlreadyDrained verifies the crash- +// safe resume branch skips T1 entirely and proceeds directly to T5 when the +// persisted checkpoint already shows a drain-satisfied position. +func TestResumeMoveTablesCutOverFromCheckpointAlreadyDrained(t *testing.T) { + var calls []string + fakeHooks := &recordingHooks{name: "fake", calls: &calls} + + ctx := base.NewMigrationContext() + ctx.Hooks = fakeHooks + ctx.Checkpoint = false + + m := NewMigrator(ctx, "test") + m.applier = NewApplier(ctx) + + drainGTID, err := mysql.NewGTIDBinlogCoordinates("11111111-1111-1111-1111-111111111111:1-10") + require.NoError(t, err) + + chk := &Checkpoint{ + LastTrxCoords: drainGTID, + MoveTablesCutOverStarted: true, + MoveTablesCutOverDrainGTID: drainGTID, + } + + require.Equal(t, int64(0), atomic.LoadInt64(&ctx.CutOverCompleteFlag), "pre-state: flag must be 0") + require.Empty(t, calls, "pre-state: no hooks recorded") + + require.NoError(t, m.resumeMoveTablesCutOverFromCheckpoint(chk)) + + require.Equal(t, int64(1), atomic.LoadInt64(&ctx.CutOverCompleteFlag), + "post-state: resume path must set CutOverCompleteFlag before exiting") + require.Equal(t, []string{"fake:OnSuccess"}, calls, + "post-state: resume path should jump directly to T5 without rerunning T0/T1") + if m.applier.CurrentCoordinates != nil { + require.Equal(t, drainGTID.String(), m.applier.CurrentCoordinates.String()) + } +} + // ----------------------------------------------------------------------------- // Integration tests - real MySQL via testcontainers, exercise T1/T2/T3. // @@ -200,7 +253,7 @@ func (s *MoveTablesCutOverSuite) containingDrainGTID() *mysql.GTIDBinlogCoordina // buildMigrator wires a Migrator with the test container's *sql.DB pinned to // inspector.db and a fresh Applier. initialCoords may be nil for the drain- // timeout case. -func (s *MoveTablesCutOverSuite) buildMigrator(fakeHooks *recordingHooks, initialCoords mysql.BinlogCoordinates) (*Migrator, *base.MigrationContext) { +func (s *MoveTablesCutOverSuite) buildMigrator(fakeHooks base.Hooks, initialCoords mysql.BinlogCoordinates) (*Migrator, *base.MigrationContext) { ctx := context.Background() connectionConfig, err := getTestConnectionConfig(ctx, s.mysqlContainer) s.Require().NoError(err) @@ -358,6 +411,98 @@ func (s *MoveTablesCutOverSuite) TestDrainWaitsForQueuedDML() { "post-state: only T0 fires before the drain loop times out") } +// TestDrainWaitsForInFlightApplyEvent ensures T3 does not complete while an +// apply handler is still running. The blocked handler simulates the last source +// DML not yet landing on the target. If T3 exits early, OnSuccess observes the +// target missing that row and fails immediately. +func (s *MoveTablesCutOverSuite) TestDrainWaitsForInFlightApplyEvent() { + ctx := context.Background() + _, err := s.db.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", testMysqlDatabaseOther)) + s.Require().NoError(err) + _, err = s.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY)", getTestTableName())) + s.Require().NoError(err) + _, err = s.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY)", getTestOtherTableName())) + s.Require().NoError(err) + s.T().Cleanup(func() { + _, _ = s.db.ExecContext(context.Background(), "DROP TABLE IF EXISTS "+getTestOtherTableName()) + }) + + drainGTID := s.containingDrainGTID() + const pendingID = 42 + raceErr := errors.New("on-success observed target missing in-flight apply") + + origPoll := moveTablesCutOverDrainPollInterval + moveTablesCutOverDrainPollInterval = 50 * time.Millisecond + s.T().Cleanup(func() { + moveTablesCutOverDrainPollInterval = origPoll + }) + + var calls []string + fakeHooks := &onSuccessCheckHooks{ + recordingHooks: &recordingHooks{name: "fake", calls: &calls}, + onSuccessCheck: func() error { + var count int + query := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE id = ?", getTestOtherTableName()) + if err := s.db.QueryRowContext(context.Background(), query, pendingID).Scan(&count); err != nil { + return err + } + if count == 0 { + return raceErr + } + return nil + }, + } + m, mc := s.buildMigrator(fakeHooks, drainGTID) + mc.CutOverLockTimeoutSeconds = 1 + m.applier.CurrentCoordinatesMutex.Lock() + m.applier.CurrentCoordinates = drainGTID + m.applier.CurrentCoordinatesMutex.Unlock() + + started := make(chan struct{}) + release := make(chan struct{}) + var startedOnce sync.Once + var releaseOnce sync.Once + releaseApply := func() { + releaseOnce.Do(func() { + close(release) + }) + } + s.T().Cleanup(releaseApply) + blockApply := tableWriteFunc(func() error { + startedOnce.Do(func() { + close(started) + }) + <-release + _, err := s.db.ExecContext(context.Background(), fmt.Sprintf("INSERT INTO %s VALUES (?)", getTestOtherTableName()), pendingID) + return err + }) + go func() { + <-started + time.Sleep(200 * time.Millisecond) + releaseApply() + }() + go func() { + _ = m.onApplyEventStruct(newApplyEventStructByFunc(&blockApply)) + }() + + select { + case <-started: + case <-time.After(time.Second): + s.Require().FailNow("blocked apply event never started") + } + + s.Require().Equal(int64(0), atomic.LoadInt64(&mc.CutOverCompleteFlag), "pre-state: flag must be 0") + err = m.moveTablesCutOver() + s.Require().NoError(err) + + var count int + query := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE id = ?", getTestOtherTableName()) + s.Require().NoError(s.db.QueryRowContext(context.Background(), query, pendingID).Scan(&count)) + s.Require().Equal(1, count, "post-state: target must contain the pending row by the time cutover succeeds") + s.Require().Equal([]string{"fake:OnBeforeCutOver", "fake:OnSuccess"}, calls, + "post-state: cutover should only reach OnSuccess after the target row is present") +} + func TestMoveTablesCutOver(t *testing.T) { if testing.Short() { t.Skip("skipping integration suite in short mode") diff --git a/go/sql/builder.go b/go/sql/builder.go index 1c3c612fa..3f4c375ef 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -120,11 +120,12 @@ func BuildEqualsPreparedComparison(columns []string) (result string, err error) // It holds the prepared query statement so it doesn't need to be recreated every time. type CheckpointInsertQueryBuilder struct { - uniqueKeyColumns *ColumnList - preparedStatement string + uniqueKeyColumns *ColumnList + preparedStatement string + includeMoveTablesCutOverColumns bool } -func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns *ColumnList) (*CheckpointInsertQueryBuilder, error) { +func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns *ColumnList, includeMoveTablesCutOverColumns bool) (*CheckpointInsertQueryBuilder, error) { if uniqueKeyColumns.Len() == 0 { return nil, fmt.Errorf("got 0 columns in BuildSetCheckpointInsertQuery") } @@ -139,26 +140,48 @@ func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns } databaseName = EscapeName(databaseName) tableName = EscapeName(tableName) - stmt := fmt.Sprintf(` - insert /* gh-ost */ - into %s.%s - (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, - gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, - %s, %s) - values - (unix_timestamp(now()), ?, ?, - ?, ?, ?, - %s, %s)`, - databaseName, tableName, - strings.Join(minUniqueColNames, ", "), - strings.Join(maxUniqueColNames, ", "), - strings.Join(values, ", "), - strings.Join(values, ", "), - ) - b := &CheckpointInsertQueryBuilder{ - uniqueKeyColumns: uniqueKeyColumns, - preparedStatement: stmt, + uniqueKeyColumns: uniqueKeyColumns, + preparedStatement: func() string { + if includeMoveTablesCutOverColumns { + return fmt.Sprintf(` + insert /* gh-ost */ + into %s.%s + (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, + gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, + gh_ost_move_tables_cutover_started, gh_ost_move_tables_drain_gtid, + %s, %s) + values + (unix_timestamp(now()), ?, ?, + ?, ?, ?, + ?, ?, + %s, %s)`, + databaseName, tableName, + strings.Join(minUniqueColNames, ", "), + strings.Join(maxUniqueColNames, ", "), + strings.Join(values, ", "), + strings.Join(values, ", "), + ) + } + + return fmt.Sprintf(` + insert /* gh-ost */ + into %s.%s + (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, + gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, + %s, %s) + values + (unix_timestamp(now()), ?, ?, + ?, ?, ?, + %s, %s)`, + databaseName, tableName, + strings.Join(minUniqueColNames, ", "), + strings.Join(maxUniqueColNames, ", "), + strings.Join(values, ", "), + strings.Join(values, ", "), + ) + }(), + includeMoveTablesCutOverColumns: includeMoveTablesCutOverColumns, } return b, nil } diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 0fcf31441..38b5043ab 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -1347,7 +1347,7 @@ func TestCheckpointQueryBuilder(t *testing.T) { tableName := "_tbl_ghk" valueArgs := []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)} uniqueKeyColumns := NewColumnList([]string{"name", "position", "my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长很长很长"}) - builder, err := NewCheckpointQueryBuilder(databaseName, tableName, uniqueKeyColumns) + builder, err := NewCheckpointQueryBuilder(databaseName, tableName, uniqueKeyColumns, false) require.NoError(t, err) query, uniqueKeyArgs, err := builder.BuildQuery(valueArgs) require.NoError(t, err) @@ -1366,3 +1366,30 @@ func TestCheckpointQueryBuilder(t *testing.T) { require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)}, uniqueKeyArgs) } + +func TestMoveTablesCheckpointQueryBuilder(t *testing.T) { + databaseName := "mydb" + tableName := "_tbl_ghk" + valueArgs := []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)} + uniqueKeyColumns := NewColumnList([]string{"name", "position", "my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长很长很长"}) + builder, err := NewCheckpointQueryBuilder(databaseName, tableName, uniqueKeyColumns, true) + require.NoError(t, err) + query, uniqueKeyArgs, err := builder.BuildQuery(valueArgs) + require.NoError(t, err) + expected := ` + insert /* gh-ost */ into mydb._tbl_ghk + (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, + gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, + gh_ost_move_tables_cutover_started, gh_ost_move_tables_drain_gtid, + name_min, position_min, my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长_min, + name_max, position_max, my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长_max) + values + (unix_timestamp(now()), ?, ?, + ?, ?, ?, + ?, ?, + ?, ?, ?, + ?, ?, ?) + ` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)}, uniqueKeyArgs) +} diff --git a/script/move-tables/README.md b/script/move-tables/README.md index db57f750a..398ec7517 100644 --- a/script/move-tables/README.md +++ b/script/move-tables/README.md @@ -24,10 +24,37 @@ script/build --cli Run gh-ost to move tables: ```bash -./script/build --cli; ./bin/gh-ost --move-tables=gh_ost_test --host=localhost --port=3307 --user root --password opensesame --database=gh_ost_test_db --target-host=localhost --target-port=3309 --target-user root --target-password opensesame --target-database=gh_ost_test_db --postpone-cut-over-flag-file=/tmp/ghost-move-tables.postpone.flag --execute --verbose +./script/build --cli; ./bin/gh-ost --move-tables=gh_ost_test --host=localhost --port=3307 --user root --password opensesame --database=gh_ost_test_db --target-host=localhost --target-port=3309 --target-user root --target-password opensesame --target-database=gh_ost_test_db --postpone-cut-over-flag-file=/tmp/ghost-move-tables.postpone.flag --execute --verbose --checkpoint --checkpoint-seconds 10 ``` Note: replicas in this local topology are configured with `read_only=ON` and `super_read_only=ON`. If you point `--host` at `mysql-source-replica` (3308), the cutover `RENAME TABLE` step will fail by design. Use source primary (3307) as the inspected host when you want cutover to rename on source. + +Start continuous inserts against the source. +```bash +script/move-tables/insert-source-primary-loop +``` + +Check the target - it should have the initial data from the source and should be receiving the new data. +```bash +script/move-tables/mysql-target-primary -D gh_ost_test_db -e "SELECT * FROM gh_ost_test;" +``` + +Remove the cutover flag file. +```bash +rm /tmp/ghost-move-tables.postpone.flag +``` + +You'll see the continuous inserts will stop because of the table rename. + +Check the source - table has been renamed. +```bash +script/move-tables/mysql-source-primary -D gh_ost_test_db -e "SELECT * FROM _gh_ost_test_del;" +``` + +Check the target has the same set of data. +```bash +script/move-tables/mysql-target-primary -D gh_ost_test_db -e "SELECT * FROM gh_ost_test;" +``` \ No newline at end of file diff --git a/script/move-tables/insert-source-primary-loop b/script/move-tables/insert-source-primary-loop index c5571ff1c..d2a0afddc 100755 --- a/script/move-tables/insert-source-primary-loop +++ b/script/move-tables/insert-source-primary-loop @@ -3,28 +3,45 @@ set -euo pipefail # Continuously insert new rows into gh_ost_test on source primary. # Usage: -# script/move-tables/insert-source-primary-loop [start_column1] [sleep_seconds] +# script/move-tables/insert-source-primary-loop [start_column1] [sleep_seconds] [rows_per_batch] # Example: -# script/move-tables/insert-source-primary-loop 100000 0.2 +# script/move-tables/insert-source-primary-loop 100000 0.2 1 +# Fast example: +# script/move-tables/insert-source-primary-loop 100000 0 50 start_i="${1:-100000}" delay="${2:-0.2}" +rows_per_batch="${3:-1}" i="$start_i" echo "Starting continuous inserts on source primary. Press Ctrl+C to stop." -echo "start_column1=$start_i sleep_seconds=$delay" +echo "start_column1=$start_i sleep_seconds=$delay rows_per_batch=$rows_per_batch" trap 'echo; echo "Stopped."; exit 0' INT TERM while true; do ts="$(date +%s)" + values="" + batch_start="$i" + + for ((n=0; n/dev/null || true echo "Removing containers..." -docker rm -f mysql-source-replica mysql-source-primary mysql-target-replica mysql-target-primary 2>/dev/null || true \ No newline at end of file +docker rm -f mysql-source-replica mysql-source-primary mysql-target-replica mysql-target-primary 2>/dev/null || true + +echo "Cleaning up Docker resources..." +docker system prune -f +docker volume prune -f \ No newline at end of file