-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Move tables 1.6 crash safe resume #1708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
eb60db0
006f9ce
2bb2d54
ee8974a
e850f0a
c57d247
8e49eb7
0e3de73
f032927
3f4059a
30ecb11
2679312
169f377
96aab08
48e6920
b7c087d
b46fc5a
0112669
4a5a826
692e834
5a6932e
4396b16
396ae07
f5273fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -110,6 +110,35 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { | |||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| func (apl *Applier) checkpointDB() *gosql.DB { | ||||||||||||
| if apl.migrationContext.IsMoveTablesMode() && apl.moveTablesTargetDB != nil { | ||||||||||||
| return apl.moveTablesTargetDB | ||||||||||||
| } | ||||||||||||
| return apl.db | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| func (apl *Applier) checkpointDatabaseName() string { | ||||||||||||
| if apl.migrationContext.IsMoveTablesMode() { | ||||||||||||
| return apl.migrationContext.GetTargetDatabaseName() | ||||||||||||
| } | ||||||||||||
| return apl.migrationContext.DatabaseName | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| func (apl *Applier) checkpointDrainGTIDString(chk *Checkpoint) string { | ||||||||||||
| if chk == nil || chk.MoveTablesCutOverDrainGTID == nil || chk.MoveTablesCutOverDrainGTID.IsEmpty() { | ||||||||||||
| return "" | ||||||||||||
| } | ||||||||||||
| return chk.MoveTablesCutOverDrainGTID.String() | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| func (apl *Applier) checkpointRangeColumnNames() (minColumnNames []string, maxColumnNames []string) { | ||||||||||||
| for _, col := range apl.migrationContext.UniqueKey.Columns.Columns() { | ||||||||||||
| minColumnNames = append(minColumnNames, sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4)+"_min") | ||||||||||||
| maxColumnNames = append(maxColumnNames, sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4)+"_max") | ||||||||||||
| } | ||||||||||||
| return minColumnNames, maxColumnNames | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // compileMigrationKeyWarningRegex compiles a regex pattern that matches duplicate key warnings | ||||||||||||
| // for the migration's unique key. Duplicate warnings are formatted differently across MySQL versions, | ||||||||||||
| // hence the optional table name prefix. Metacharacters in table/index names are escaped to avoid | ||||||||||||
|
|
@@ -358,9 +387,10 @@ func (apl *Applier) prepareQueries() (err error) { | |||||||||||
| } | ||||||||||||
| if apl.migrationContext.Checkpoint { | ||||||||||||
| if apl.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder( | ||||||||||||
| apl.migrationContext.DatabaseName, | ||||||||||||
| apl.checkpointDatabaseName(), | ||||||||||||
| apl.migrationContext.GetCheckpointTableName(), | ||||||||||||
| &apl.migrationContext.UniqueKey.Columns, | ||||||||||||
| apl.migrationContext.IsMoveTablesMode(), | ||||||||||||
| ); err != nil { | ||||||||||||
| return err | ||||||||||||
| } | ||||||||||||
|
|
@@ -781,6 +811,12 @@ func (apl *Applier) CreateCheckpointTable() error { | |||||||||||
| "`gh_ost_dml_applied` bigint", | ||||||||||||
| "`gh_ost_is_cutover` tinyint(1) DEFAULT '0'", | ||||||||||||
| } | ||||||||||||
| if apl.migrationContext.IsMoveTablesMode() { | ||||||||||||
| colDefs = append(colDefs, | ||||||||||||
| "`gh_ost_move_tables_cutover_started` tinyint(1) DEFAULT '0'", | ||||||||||||
| "`gh_ost_move_tables_drain_gtid` text charset ascii", | ||||||||||||
| ) | ||||||||||||
| } | ||||||||||||
| for _, col := range apl.migrationContext.UniqueKey.Columns.Columns() { | ||||||||||||
| if col.MySQLType == "" { | ||||||||||||
| return fmt.Errorf("column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name)) | ||||||||||||
|
|
@@ -797,12 +833,12 @@ func (apl *Applier) CreateCheckpointTable() error { | |||||||||||
| } | ||||||||||||
|
|
||||||||||||
| query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)", | ||||||||||||
| sql.EscapeName(apl.migrationContext.DatabaseName), | ||||||||||||
| sql.EscapeName(apl.checkpointDatabaseName()), | ||||||||||||
| sql.EscapeName(apl.migrationContext.GetCheckpointTableName()), | ||||||||||||
| strings.Join(colDefs, ",\n "), | ||||||||||||
| ) | ||||||||||||
| apl.migrationContext.Log.Infof("Created checkpoint table") | ||||||||||||
| if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil { | ||||||||||||
| if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil { | ||||||||||||
| return err | ||||||||||||
| } | ||||||||||||
| return nil | ||||||||||||
|
|
@@ -811,14 +847,14 @@ func (apl *Applier) CreateCheckpointTable() error { | |||||||||||
| // dropTable drops a given table on the applied host | ||||||||||||
| func (apl *Applier) dropTable(tableName string) error { | ||||||||||||
| query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`, | ||||||||||||
| sql.EscapeName(apl.migrationContext.DatabaseName), | ||||||||||||
| sql.EscapeName(apl.checkpointDatabaseName()), | ||||||||||||
| sql.EscapeName(tableName), | ||||||||||||
| ) | ||||||||||||
| apl.migrationContext.Log.Infof("Dropping table %s.%s", | ||||||||||||
| sql.EscapeName(apl.migrationContext.DatabaseName), | ||||||||||||
| sql.EscapeName(apl.checkpointDatabaseName()), | ||||||||||||
| sql.EscapeName(tableName), | ||||||||||||
| ) | ||||||||||||
| if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil { | ||||||||||||
| if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil { | ||||||||||||
| return err | ||||||||||||
| } | ||||||||||||
| apl.migrationContext.Log.Infof("Table dropped") | ||||||||||||
|
|
@@ -983,24 +1019,51 @@ func (apl *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { | |||||||||||
| return insertId, err | ||||||||||||
| } | ||||||||||||
| args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied, chk.IsCutover) | ||||||||||||
| if apl.migrationContext.IsMoveTablesMode() { | ||||||||||||
| args = append(args, chk.MoveTablesCutOverStarted, apl.checkpointDrainGTIDString(chk)) | ||||||||||||
| } | ||||||||||||
| args = append(args, uniqueKeyArgs...) | ||||||||||||
| res, err := apl.db.Exec(query, args...) | ||||||||||||
| res, err := apl.checkpointDB().Exec(query, args...) | ||||||||||||
| if err != nil { | ||||||||||||
| return insertId, err | ||||||||||||
| } | ||||||||||||
| return res.LastInsertId() | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) { | ||||||||||||
| row := apl.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetCheckpointTableName()))) | ||||||||||||
| minColumnNames, maxColumnNames := apl.checkpointRangeColumnNames() | ||||||||||||
| selectColumns := []string{ | ||||||||||||
| "gh_ost_chk_id", | ||||||||||||
| "gh_ost_chk_timestamp", | ||||||||||||
| "gh_ost_chk_coords", | ||||||||||||
| "gh_ost_chk_iteration", | ||||||||||||
| "gh_ost_rows_copied", | ||||||||||||
| "gh_ost_dml_applied", | ||||||||||||
| "gh_ost_is_cutover", | ||||||||||||
| } | ||||||||||||
| if apl.migrationContext.IsMoveTablesMode() { | ||||||||||||
| selectColumns = append(selectColumns, "gh_ost_move_tables_cutover_started", "gh_ost_move_tables_drain_gtid") | ||||||||||||
| } | ||||||||||||
| selectColumns = append(selectColumns, minColumnNames...) | ||||||||||||
| selectColumns = append(selectColumns, maxColumnNames...) | ||||||||||||
|
|
||||||||||||
| row := apl.checkpointDB().QueryRow(fmt.Sprintf( | ||||||||||||
| `select /* gh-ost */ %s from %s.%s order by gh_ost_chk_id desc limit 1`, | ||||||||||||
| strings.Join(selectColumns, ", "), | ||||||||||||
| sql.EscapeName(apl.checkpointDatabaseName()), | ||||||||||||
| sql.EscapeName(apl.migrationContext.GetCheckpointTableName()), | ||||||||||||
| )) | ||||||||||||
| chk := &Checkpoint{ | ||||||||||||
| IterationRangeMin: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()), | ||||||||||||
| IterationRangeMax: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()), | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| var coordStr string | ||||||||||||
| var coordStr, drainGTIDStr string | ||||||||||||
| var timestamp int64 | ||||||||||||
| ptrs := []interface{}{&chk.Id, ×tamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover} | ||||||||||||
| if apl.migrationContext.IsMoveTablesMode() { | ||||||||||||
| ptrs = append(ptrs, &chk.MoveTablesCutOverStarted, &drainGTIDStr) | ||||||||||||
| } | ||||||||||||
| ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...) | ||||||||||||
| ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...) | ||||||||||||
| err := row.Scan(ptrs...) | ||||||||||||
|
|
@@ -1024,6 +1087,51 @@ func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) { | |||||||||||
| } | ||||||||||||
| chk.LastTrxCoords = fileCoords | ||||||||||||
| } | ||||||||||||
| if apl.migrationContext.IsMoveTablesMode() && drainGTIDStr != "" { | ||||||||||||
| drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr) | ||||||||||||
| if err != nil { | ||||||||||||
| return nil, err | ||||||||||||
| } | ||||||||||||
| chk.MoveTablesCutOverDrainGTID = drainGTID | ||||||||||||
| } | ||||||||||||
| return chk, nil | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| func (apl *Applier) ReadMoveTablesCutOverCheckpoint() (*Checkpoint, error) { | ||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [question] why can't we re-use I assume there's something subtle I'm missing 🤔
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we technically could have. There might have been a reason at some point during my commit history but I agree it looks like it would be safe to use Lines 1020 to 1024 in f5273fa
|
||||||||||||
| row := apl.checkpointDB().QueryRow(fmt.Sprintf(`select /* gh-ost */ gh_ost_chk_id, gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, gh_ost_move_tables_cutover_started, gh_ost_move_tables_drain_gtid from %s.%s where gh_ost_move_tables_cutover_started = 1 and gh_ost_move_tables_drain_gtid is not null and gh_ost_move_tables_drain_gtid != '' order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(apl.migrationContext.GetCheckpointTableName()))) | ||||||||||||
| chk := &Checkpoint{} | ||||||||||||
| var coordStr, drainGTIDStr string | ||||||||||||
| var timestamp int64 | ||||||||||||
| err := row.Scan(&chk.Id, ×tamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover, &chk.MoveTablesCutOverStarted, &drainGTIDStr) | ||||||||||||
| if err != nil { | ||||||||||||
| if errors.Is(err, gosql.ErrNoRows) { | ||||||||||||
| return nil, ErrNoCheckpointFound | ||||||||||||
| } | ||||||||||||
| return nil, err | ||||||||||||
| } | ||||||||||||
| chk.Timestamp = time.Unix(timestamp, 0) | ||||||||||||
| if coordStr != "" { | ||||||||||||
| if apl.migrationContext.UseGTIDs { | ||||||||||||
| coords, err := mysql.NewGTIDBinlogCoordinates(coordStr) | ||||||||||||
| if err != nil { | ||||||||||||
| return nil, err | ||||||||||||
| } | ||||||||||||
| chk.LastTrxCoords = coords | ||||||||||||
| } else { | ||||||||||||
| coords, err := mysql.ParseFileBinlogCoordinates(coordStr) | ||||||||||||
| if err != nil { | ||||||||||||
| return nil, err | ||||||||||||
| } | ||||||||||||
| chk.LastTrxCoords = coords | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| if drainGTIDStr != "" { | ||||||||||||
| drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr) | ||||||||||||
| if err != nil { | ||||||||||||
| return nil, err | ||||||||||||
| } | ||||||||||||
| chk.MoveTablesCutOverDrainGTID = drainGTID | ||||||||||||
| } | ||||||||||||
| return chk, nil | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose to not fail the request without these, and just enable them. Could also change it to a
Fataland force the operator to rerun with the required flags. But that feels kinda silly if they are always required.