-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Move-tables. Multi-table #1726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move-tables. Multi-table #1726
Changes from all commits
d6ab6c8
43d298b
7778d74
f9ad349
aadb5ee
7e18a4e
d4ab1bf
a85e5f6
79394c2
dd8b581
a4fe2d2
43db172
42ce923
b133029
c6ca7ef
ec1671d
a52ebf7
19002c0
dc1d943
d38bf67
56a6c4a
6a3055a
d378a17
cdebc6c
dd6d32a
b203557
b68a807
cb1f359
a3a8f09
9e752b8
69e1110
8738524
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,10 +7,13 @@ package base | |
|
|
||
| import ( | ||
| "context" | ||
| "crypto/sha256" | ||
| "encoding/hex" | ||
| "fmt" | ||
| "math" | ||
| "os" | ||
| "regexp" | ||
| "sort" | ||
| "strings" | ||
| "sync" | ||
| "sync/atomic" | ||
|
|
@@ -76,6 +79,139 @@ func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleRea | |
| } | ||
| } | ||
|
|
||
| // MoveTable holds the per-table runtime state for a single table within a | ||
| // move-tables run. In move-tables mode the surrounding plumbing (one binlog | ||
| // stream, one applier connection, one throttler, one hooks executor) stays | ||
| // singular, but every migrated table carries its own schema, unique key, | ||
| // iteration progress, and counters keyed by table name. | ||
| // | ||
| // The range/iteration fields are guarded by the per-table rangeMutex. The | ||
| // applier-wide "current applied source coordinates" mutex stays single — there | ||
| // is one applied stream feeding all tables. | ||
| type MoveTable struct { | ||
| // Identity. | ||
| SourceDatabaseName string | ||
| SourceTableName string | ||
| TargetDatabaseName string | ||
| TargetTableName string | ||
|
zacharysierakowski marked this conversation as resolved.
|
||
|
|
||
| // CreateTableStatement is the captured `SHOW CREATE TABLE` from the source, | ||
| // used to (re)create the table on the target. | ||
| CreateTableStatement string | ||
|
|
||
| // Schema, captured from the source (or from the target, on resume). In | ||
| // move-tables mode source and target schemas match, so the shared columns are | ||
| // identical to the original columns. | ||
| OriginalTableColumns *sql.ColumnList | ||
| OriginalTableVirtualColumns *sql.ColumnList | ||
| OriginalTableUniqueKeys [](*sql.UniqueKey) | ||
| UniqueKey *sql.UniqueKey | ||
| SharedColumns *sql.ColumnList | ||
| MappedSharedColumns *sql.ColumnList | ||
|
|
||
| // RowsEstimate is the estimated row count for this table. | ||
| RowsEstimate int64 | ||
|
|
||
| // Iteration / range state. Guarded by rangeMutex (except Iteration, which is | ||
| // accessed atomically so status readers don't need the lock). | ||
| MigrationRangeMinValues *sql.ColumnValues | ||
| MigrationRangeMaxValues *sql.ColumnValues | ||
| MigrationIterationRangeMinValues *sql.ColumnValues | ||
| MigrationIterationRangeMaxValues *sql.ColumnValues | ||
| Iteration int64 | ||
|
|
||
| // LastIterationRange* record the last successfully-copied chunk range, used | ||
| // for checkpointing. Guarded by rangeMutex. | ||
| LastIterationRangeMinValues *sql.ColumnValues | ||
| LastIterationRangeMaxValues *sql.ColumnValues | ||
|
|
||
| // RowsCopied is the number of rows copied for this table (accessed atomically). | ||
| RowsCopied int64 | ||
|
|
||
| // rowCopyComplete is set (1) once this table's row copy finishes. The | ||
| // on-row-copy-complete hook and the cutover only proceed once every table is | ||
| // complete. Accessed atomically. | ||
| rowCopyComplete int64 | ||
|
|
||
| // rangeMutex guards this table's range/iteration fields. | ||
| rangeMutex sync.Mutex | ||
| } | ||
|
|
||
| // GetIteration returns the table's current iteration counter. | ||
| func (mt *MoveTable) GetIteration() int64 { | ||
| return atomic.LoadInt64(&mt.Iteration) | ||
| } | ||
|
|
||
| // IncrementIteration advances the table's iteration counter by one. | ||
| func (mt *MoveTable) IncrementIteration() { | ||
| atomic.AddInt64(&mt.Iteration, 1) | ||
| } | ||
|
|
||
| // SetNextIterationRangeMinValues advances the iteration window: the next chunk's | ||
| // min becomes the previous chunk's max (or the table min for the first chunk). | ||
| func (mt *MoveTable) SetNextIterationRangeMinValues() { | ||
| mt.rangeMutex.Lock() | ||
| defer mt.rangeMutex.Unlock() | ||
| mt.MigrationIterationRangeMinValues = mt.MigrationIterationRangeMaxValues | ||
| if mt.MigrationIterationRangeMinValues == nil { | ||
| mt.MigrationIterationRangeMinValues = mt.MigrationRangeMinValues | ||
| } | ||
| } | ||
|
|
||
| // IsRowCopyComplete reports whether this table has finished its row copy. | ||
| func (mt *MoveTable) IsRowCopyComplete() bool { | ||
| return atomic.LoadInt64(&mt.rowCopyComplete) > 0 | ||
| } | ||
|
|
||
| // SetRowCopyComplete marks this table's row copy as finished. | ||
| func (mt *MoveTable) SetRowCopyComplete() { | ||
| atomic.StoreInt64(&mt.rowCopyComplete, 1) | ||
| } | ||
|
|
||
| // RecordLastIterationRange stores the last successfully-copied chunk range for | ||
| // checkpointing. | ||
| func (mt *MoveTable) RecordLastIterationRange() { | ||
| mt.rangeMutex.Lock() | ||
| defer mt.rangeMutex.Unlock() | ||
| if mt.MigrationIterationRangeMinValues != nil && mt.MigrationIterationRangeMaxValues != nil { | ||
| mt.LastIterationRangeMinValues = mt.MigrationIterationRangeMinValues.Clone() | ||
| mt.LastIterationRangeMaxValues = mt.MigrationIterationRangeMaxValues.Clone() | ||
| } | ||
| } | ||
|
|
||
| // GetLastIterationRange returns clones of the last successfully-copied chunk | ||
| // range for checkpointing. Either value may be nil if no chunk has completed. | ||
| func (mt *MoveTable) GetLastIterationRange() (minValues, maxValues *sql.ColumnValues) { | ||
| mt.rangeMutex.Lock() | ||
| defer mt.rangeMutex.Unlock() | ||
| if mt.LastIterationRangeMinValues != nil { | ||
| minValues = mt.LastIterationRangeMinValues.Clone() | ||
| } | ||
| if mt.LastIterationRangeMaxValues != nil { | ||
| maxValues = mt.LastIterationRangeMaxValues.Clone() | ||
| } | ||
| return minValues, maxValues | ||
| } | ||
|
|
||
| // GetRowsCopied returns the number of rows copied for this table. | ||
| func (mt *MoveTable) GetRowsCopied() int64 { | ||
| return atomic.LoadInt64(&mt.RowsCopied) | ||
| } | ||
|
|
||
| // RestoreFromCheckpoint rehydrates this table's row-copy state from a resumed | ||
| // checkpoint: the next chunk starts at the last-copied range, and the iteration | ||
| // counter and rows-copied total are restored. | ||
| func (mt *MoveTable) RestoreFromCheckpoint(rangeMin, rangeMax *sql.ColumnValues, iteration, rowsCopied int64) { | ||
| mt.rangeMutex.Lock() | ||
| mt.MigrationIterationRangeMinValues = rangeMin | ||
| mt.MigrationIterationRangeMaxValues = rangeMax | ||
| mt.LastIterationRangeMinValues = rangeMin | ||
| mt.LastIterationRangeMaxValues = rangeMax | ||
| mt.rangeMutex.Unlock() | ||
| atomic.StoreInt64(&mt.Iteration, iteration) | ||
| atomic.StoreInt64(&mt.RowsCopied, rowsCopied) | ||
| } | ||
|
|
||
| // MigrationContext has the general, global state of migration. It is used by | ||
| // all components throughout the migration process. | ||
| type MigrationContext struct { | ||
|
|
@@ -285,12 +421,15 @@ type MigrationContext struct { | |
|
|
||
| // move tables: | ||
| MoveTables struct { | ||
| TableNames []string // List of table names to be moved. | ||
| TargetHost string // Target hostname for the move. This must be a primary/writable host. | ||
| TargetPort int // Target MySQL port for the move. | ||
| TargetUser string // Target username for the move. If not specified, it will default to the source user. | ||
| TargetPass string // Target password for the move. If not specified, it will default to the source password. | ||
| TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name. | ||
| TableNames []string // Ordered list of table names to be moved (order from --move-tables). Iteration is deterministic over this slice, never over the Tables map. | ||
| // Tables holds the per-table runtime state, keyed by source table name. | ||
| // Populated by InitMoveTableContainers() once per-table schema is known. | ||
| Tables map[string]*MoveTable | ||
| TargetHost string // Target hostname for the move. This must be a primary/writable host. | ||
| TargetPort int // Target MySQL port for the move. | ||
| TargetUser string // Target username for the move. If not specified, it will default to the source user. | ||
| TargetPass string // Target password for the move. If not specified, it will default to the source password. | ||
| TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name. | ||
|
|
||
| // AllowOnSourcePrimary opts in to running the move-tables read path (schema | ||
| // inspection, the full row copy, binlog streaming) directly against the | ||
|
|
@@ -418,6 +557,9 @@ func getSafeTableName(baseName string, suffix string) string { | |
| // GetGhostTableName generates the name of ghost table, based on original table name | ||
| // or a given table name | ||
| func (mctx *MigrationContext) GetGhostTableName() string { | ||
| if mctx.IsMoveTablesMode() { | ||
| panic("GetGhostTableName() must not be called in move-tables mode; there is no ghost table (the target keeps each migrated table's name)") | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I chose to add panics to function calls that I felt were dangerous enough if they were accidentally called by a codepath from […]. One thought - we could swap the raw panics for […] |
||
| } | ||
| if mctx.Revert { | ||
| // When reverting the "ghost" table is the _del table from the original migration. | ||
| return mctx.OldTableName | ||
|
|
@@ -429,15 +571,6 @@ func (mctx *MigrationContext) GetGhostTableName() string { | |
| } | ||
| } | ||
|
|
||
| // GetTargetTableName generates the name of the target table, based on original table name and | ||
| // the migration context (i.e. move-tables mode). | ||
| func (mctx *MigrationContext) GetTargetTableName() string { | ||
| if mctx.IsMoveTablesMode() { | ||
| return mctx.MoveTables.TableNames[0] | ||
| } | ||
| return mctx.GetGhostTableName() | ||
| } | ||
|
|
||
| // GetTargetDatabaseName fetches the name of the target database, which defaults to the original | ||
| // database name unless we're in move-tables mode. | ||
| func (mctx *MigrationContext) GetTargetDatabaseName() string { | ||
|
|
@@ -449,6 +582,9 @@ func (mctx *MigrationContext) GetTargetDatabaseName() string { | |
|
|
||
| // GetOldTableName generates the name of the "old" table, into which the original table is renamed. | ||
| func (mctx *MigrationContext) GetOldTableName() string { | ||
| if mctx.IsMoveTablesMode() { | ||
| panic("GetOldTableName() must not be called in move-tables mode; use MoveTableDelName(tableName) for each migrated table's `_<table>_del` rollback handle") | ||
| } | ||
| var tableName string | ||
| if mctx.ForceTmpTableName != "" { | ||
| tableName = mctx.ForceTmpTableName | ||
|
|
@@ -470,9 +606,29 @@ func (mctx *MigrationContext) GetOldTableName() string { | |
| return getSafeTableName(tableName, suffix) | ||
| } | ||
|
|
||
| // MoveTableDelName returns the `_<table>_del` rollback-handle table name for a | ||
| // specific migrated table in move-tables mode. It mirrors GetOldTableName but | ||
| // for an explicit table name, so a multi-table cutover can rename every source | ||
| // table in one atomic RENAME. Revert is disallowed in move-tables mode, so the | ||
| // suffix is always "del". | ||
| func (mctx *MigrationContext) MoveTableDelName(tableName string) string { | ||
| suffix := "del" | ||
| if mctx.TimestampOldTable { | ||
| t := mctx.StartTime | ||
| timestamp := fmt.Sprintf("%d%02d%02d%02d%02d%02d", | ||
| t.Year(), t.Month(), t.Day(), | ||
| t.Hour(), t.Minute(), t.Second()) | ||
| return getSafeTableName(tableName, fmt.Sprintf("%s_%s", timestamp, suffix)) | ||
| } | ||
| return getSafeTableName(tableName, suffix) | ||
| } | ||
|
|
||
| // GetChangelogTableName generates the name of changelog table, based on original table name | ||
| // or a given table name. | ||
| func (mctx *MigrationContext) GetChangelogTableName() string { | ||
| if mctx.IsMoveTablesMode() { | ||
| panic("GetChangelogTableName() must not be called in move-tables mode; there is no changelog table (§1.2)") | ||
| } | ||
| if mctx.ForceTmpTableName != "" { | ||
| return getSafeTableName(mctx.ForceTmpTableName, "ghc") | ||
| } else { | ||
|
|
@@ -484,15 +640,13 @@ func (mctx *MigrationContext) GetChangelogTableName() string { | |
| func (mctx *MigrationContext) GetCheckpointTableName() string { | ||
| if mctx.ForceTmpTableName != "" { | ||
| return getSafeTableName(mctx.ForceTmpTableName, "ghk") | ||
| } else { | ||
| return getSafeTableName(mctx.OriginalTableName, "ghk") | ||
| } | ||
| } | ||
|
|
||
| // GetVoluntaryLockName returns a name of a voluntary lock to be used throughout | ||
| // the swap-tables process. | ||
| func (mctx *MigrationContext) GetVoluntaryLockName() string { | ||
| return fmt.Sprintf("%s.%s.lock", mctx.DatabaseName, mctx.OriginalTableName) | ||
| if mctx.IsMoveTablesMode() { | ||
| // One checkpoint table per run, named from the set-derived run token so it | ||
| // does not depend on any single migrated table and is stable across resume. | ||
| return getSafeTableName("gho_"+mctx.MoveTablesRunToken(), "ghk") | ||
| } | ||
| return getSafeTableName(mctx.OriginalTableName, "ghk") | ||
| } | ||
|
|
||
| // RequiresBinlogFormatChange is `true` when the original binlog format isn't `ROW` | ||
|
|
@@ -1189,6 +1343,84 @@ func (mctx *MigrationContext) IsMoveTablesMode() bool { | |
| return len(mctx.MoveTables.TableNames) > 0 | ||
| } | ||
|
|
||
| // InitMoveTableContainers builds (or rebuilds) the per-table runtime containers | ||
| // from the ordered MoveTables.TableNames list. It is idempotent: tables already | ||
| // present in the map keep their existing container so callers may invoke it | ||
| // after partially populating state. Source and target table names match in | ||
| // move-tables mode; only the database may differ. | ||
| func (mctx *MigrationContext) InitMoveTableContainers() { | ||
| if mctx.MoveTables.Tables == nil { | ||
| mctx.MoveTables.Tables = make(map[string]*MoveTable, len(mctx.MoveTables.TableNames)) | ||
| } | ||
| for _, tableName := range mctx.MoveTables.TableNames { | ||
| if _, ok := mctx.MoveTables.Tables[tableName]; ok { | ||
| continue | ||
| } | ||
| mctx.MoveTables.Tables[tableName] = &MoveTable{ | ||
| SourceDatabaseName: mctx.DatabaseName, | ||
| SourceTableName: tableName, | ||
| TargetDatabaseName: mctx.GetTargetDatabaseName(), | ||
| TargetTableName: tableName, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // GetMoveTable returns the per-table container for the given source table name, | ||
| // or nil if it has not been initialized. | ||
| func (mctx *MigrationContext) GetMoveTable(tableName string) *MoveTable { | ||
| if mctx.MoveTables.Tables == nil { | ||
| return nil | ||
| } | ||
| return mctx.MoveTables.Tables[tableName] | ||
| } | ||
|
|
||
| // OrderedMoveTables returns the per-table containers in --move-tables order. | ||
| // Iteration must always use this deterministic order, never the Tables map's | ||
|
zacharysierakowski marked this conversation as resolved.
|
||
| // (random) iteration order. | ||
| func (mctx *MigrationContext) OrderedMoveTables() []*MoveTable { | ||
| tables := make([]*MoveTable, 0, len(mctx.MoveTables.TableNames)) | ||
| for _, tableName := range mctx.MoveTables.TableNames { | ||
| if mt := mctx.GetMoveTable(tableName); mt != nil { | ||
| tables = append(tables, mt) | ||
| } | ||
| } | ||
| return tables | ||
| } | ||
|
|
||
| // MoveTablesRunToken returns a short, stable identifier for a move-tables run, | ||
| // derived from the (sorted) set of migrated table names. It is: | ||
| // - deterministic: the same table set always yields the same token, so a | ||
| // resumed run finds the same run-wide artifacts (e.g. the checkpoint table). | ||
| // - order-independent: --move-tables=a,b and --move-tables=b,a match. | ||
| // - fixed-length: independent of how many tables are moved (so it never blows | ||
| // past identifier length limits the way a concatenation of names would). | ||
| // | ||
| // It is used to name run-wide singular artifacts (checkpoint table, applier | ||
| // advisory lock, serve socket) so they never depend on any single migrated | ||
| // table name. Returns "" outside move-tables mode. | ||
| func (mctx *MigrationContext) MoveTablesRunToken() string { | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the func that determines the unique identifier per run based on the table names. More details in the comment above but just wanted to flag it directly with a comment. A non-start alternative could have been concating all the table names, but that would make for a really long identifier in most cases. Another alternative could have been to require ForceTmpTableName in |
||
| if !mctx.IsMoveTablesMode() { | ||
| return "" | ||
| } | ||
| names := append([]string(nil), mctx.MoveTables.TableNames...) | ||
| sort.Strings(names) | ||
| // NUL separator: table names cannot contain it, so the join is unambiguous. | ||
| sum := sha256.Sum256([]byte(strings.Join(names, "\x00"))) | ||
| return hex.EncodeToString(sum[:6]) // 12 hex chars / 48 bits | ||
| } | ||
|
|
||
| // AllMoveTablesRowCopyComplete reports whether every migrated table has finished | ||
| // its row copy. The on-row-copy-complete hook and the cutover only proceed once | ||
| // this is true. | ||
| func (mctx *MigrationContext) AllMoveTablesRowCopyComplete() bool { | ||
| for _, mt := range mctx.OrderedMoveTables() { | ||
| if !mt.IsRowCopyComplete() { | ||
| return false | ||
| } | ||
| } | ||
| return true | ||
| } | ||
|
|
||
| // SendWithContext attempts to send a value to a channel, but returns early | ||
| // if the context is cancelled. This prevents goroutine deadlocks when the | ||
| // channel receiver has exited due to an error. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.