Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d6ab6c8
wip
zacharysierakowski Jun 22, 2026
43d298b
Merge branch 'feature-move-tables' into multitable-spike
zacharysierakowski Jun 22, 2026
7778d74
stash
zacharysierakowski Jun 22, 2026
f9ad349
tidy printMigrationStatusHint
zacharysierakowski Jun 22, 2026
aadb5ee
three tables
zacharysierakowski Jun 22, 2026
7e18a4e
Merge branch 'lag-measurement-mods' into multitable-spike
zacharysierakowski Jun 22, 2026
d4ab1bf
Fix TableNames[0] refs and change checkpoint to row per table. Add ha…
zacharysierakowski Jun 22, 2026
a85e5f6
update GH_OST_TABLE_NAME
zacharysierakowski Jun 22, 2026
79394c2
misc
zacharysierakowski Jun 23, 2026
dd8b581
strong guards (some panics) against any calls to .OriginalTableName o…
zacharysierakowski Jun 23, 2026
a4fe2d2
remove dead code
zacharysierakowski Jun 23, 2026
43db172
guard read/write checkpoint funcs
zacharysierakowski Jun 23, 2026
42ce923
fix row closure lint
zacharysierakowski Jun 23, 2026
b133029
more guards on non-move-table specific funcs. fix up front table exis…
zacharysierakowski Jun 23, 2026
c6ca7ef
Merge branch 'chriskirkland/issues-8260-pt2' into multitable-spike
zacharysierakowski Jun 23, 2026
ec1671d
Merge branch 'chriskirkland/issues-8260-pt2' into multitable-spike
zacharysierakowski Jun 23, 2026
a52ebf7
fix existing test case that queried the checkpoint table
zacharysierakowski Jun 23, 2026
19002c0
merge conflict
zacharysierakowski Jun 23, 2026
dc1d943
rename func to be more clear for move-tables multi table drop
zacharysierakowski Jun 23, 2026
d38bf67
applier_test fixes
zacharysierakowski Jun 23, 2026
56a6c4a
test coverage fix, mising movetables.tableNames
zacharysierakowski Jun 23, 2026
6a3055a
failpoint merge conflict
zacharysierakowski Jun 23, 2026
d378a17
Merge branch 'chriskirkland/issues-8260-pt2' into multitable-spike
zacharysierakowski Jun 23, 2026
cdebc6c
fixed callsite of move-tables-panic-after-row-copy
zacharysierakowski Jun 23, 2026
dd6d32a
naming things
zacharysierakowski Jun 23, 2026
b203557
dynamic lookup _ghk
zacharysierakowski Jun 23, 2026
b68a807
couple straggling issues from e2e testing
zacharysierakowski Jun 23, 2026
cb1f359
Merge branch 'chriskirkland/issues-8260-pt2' into multitable-spike
zacharysierakowski Jun 23, 2026
a3a8f09
Merge branch 'feature-move-tables' into multitable-spike
zacharysierakowski Jun 24, 2026
9e752b8
added some unit tests for checkpoint
zacharysierakowski Jun 24, 2026
69e1110
integration tests
zacharysierakowski Jun 24, 2026
8738524
get rid of GetTargetTableName
zacharysierakowski Jun 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 255 additions & 23 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ package base

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"math"
"os"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -76,6 +79,139 @@ func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleRea
}
}

// MoveTable holds the per-table runtime state for a single table within a
Comment thread
zacharysierakowski marked this conversation as resolved.
// move-tables run. In move-tables mode the surrounding plumbing (one binlog
// stream, one applier connection, one throttler, one hooks executor) stays
// singular, but every migrated table carries its own schema, unique key,
// iteration progress, and counters keyed by table name.
//
// The range/iteration fields are guarded by the per-table rangeMutex. The
// applier-wide "current applied source coordinates" mutex stays single — there
// is one applied stream feeding all tables.
type MoveTable struct {
// Identity.
SourceDatabaseName string
SourceTableName string
TargetDatabaseName string
TargetTableName string
Comment thread
zacharysierakowski marked this conversation as resolved.

// CreateTableStatement is the captured `SHOW CREATE TABLE` from the source,
// used to (re)create the table on the target.
CreateTableStatement string

// Schema, captured from the source (or from the target, on resume). In
// move-tables mode source and target schemas match, so the shared columns are
// identical to the original columns.
OriginalTableColumns *sql.ColumnList
OriginalTableVirtualColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList
MappedSharedColumns *sql.ColumnList

// RowsEstimate is the estimated row count for this table.
RowsEstimate int64

// Iteration / range state. Guarded by rangeMutex (except Iteration, which is
// accessed atomically so status readers don't need the lock).
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
Iteration int64

// LastIterationRange* record the last successfully-copied chunk range, used
// for checkpointing. Guarded by rangeMutex.
LastIterationRangeMinValues *sql.ColumnValues
LastIterationRangeMaxValues *sql.ColumnValues

// RowsCopied is the number of rows copied for this table (accessed atomically).
RowsCopied int64

// rowCopyComplete is set (1) once this table's row copy finishes. The
// on-row-copy-complete hook and the cutover only proceed once every table is
// complete. Accessed atomically.
rowCopyComplete int64

// rangeMutex guards this table's range/iteration fields.
rangeMutex sync.Mutex
}

// GetIteration returns the table's current iteration counter.
func (mt *MoveTable) GetIteration() int64 {
return atomic.LoadInt64(&mt.Iteration)
}

// IncrementIteration advances the table's iteration counter by one.
func (mt *MoveTable) IncrementIteration() {
atomic.AddInt64(&mt.Iteration, 1)
}

// SetNextIterationRangeMinValues advances the iteration window: the next chunk's
// min becomes the previous chunk's max (or the table min for the first chunk).
func (mt *MoveTable) SetNextIterationRangeMinValues() {
mt.rangeMutex.Lock()
defer mt.rangeMutex.Unlock()
mt.MigrationIterationRangeMinValues = mt.MigrationIterationRangeMaxValues
if mt.MigrationIterationRangeMinValues == nil {
mt.MigrationIterationRangeMinValues = mt.MigrationRangeMinValues
}
}

// IsRowCopyComplete reports whether this table has finished its row copy.
func (mt *MoveTable) IsRowCopyComplete() bool {
return atomic.LoadInt64(&mt.rowCopyComplete) > 0
}

// SetRowCopyComplete marks this table's row copy as finished.
func (mt *MoveTable) SetRowCopyComplete() {
atomic.StoreInt64(&mt.rowCopyComplete, 1)
}

// RecordLastIterationRange stores the last successfully-copied chunk range for
// checkpointing.
func (mt *MoveTable) RecordLastIterationRange() {
mt.rangeMutex.Lock()
defer mt.rangeMutex.Unlock()
if mt.MigrationIterationRangeMinValues != nil && mt.MigrationIterationRangeMaxValues != nil {
mt.LastIterationRangeMinValues = mt.MigrationIterationRangeMinValues.Clone()
mt.LastIterationRangeMaxValues = mt.MigrationIterationRangeMaxValues.Clone()
}
}

// GetLastIterationRange returns clones of the last successfully-copied chunk
// range for checkpointing. Either value may be nil if no chunk has completed.
func (mt *MoveTable) GetLastIterationRange() (minValues, maxValues *sql.ColumnValues) {
mt.rangeMutex.Lock()
defer mt.rangeMutex.Unlock()
if mt.LastIterationRangeMinValues != nil {
minValues = mt.LastIterationRangeMinValues.Clone()
}
if mt.LastIterationRangeMaxValues != nil {
maxValues = mt.LastIterationRangeMaxValues.Clone()
}
return minValues, maxValues
}

// GetRowsCopied returns the number of rows copied for this table.
func (mt *MoveTable) GetRowsCopied() int64 {
return atomic.LoadInt64(&mt.RowsCopied)
}

// RestoreFromCheckpoint rehydrates this table's row-copy state from a resumed
// checkpoint: the next chunk starts at the last-copied range, and the iteration
// counter and rows-copied total are restored.
func (mt *MoveTable) RestoreFromCheckpoint(rangeMin, rangeMax *sql.ColumnValues, iteration, rowsCopied int64) {
mt.rangeMutex.Lock()
mt.MigrationIterationRangeMinValues = rangeMin
mt.MigrationIterationRangeMaxValues = rangeMax
mt.LastIterationRangeMinValues = rangeMin
mt.LastIterationRangeMaxValues = rangeMax
mt.rangeMutex.Unlock()
atomic.StoreInt64(&mt.Iteration, iteration)
atomic.StoreInt64(&mt.RowsCopied, rowsCopied)
}

// MigrationContext has the general, global state of migration. It is used by
// all components throughout the migration process.
type MigrationContext struct {
Expand Down Expand Up @@ -285,12 +421,15 @@ type MigrationContext struct {

// move tables:
MoveTables struct {
TableNames []string // List of table names to be moved.
TargetHost string // Target hostname for the move. This must be a primary/writable host.
TargetPort int // Target MySQL port for the move.
TargetUser string // Target username for the move. If not specified, it will default to the source user.
TargetPass string // Target password for the move. If not specified, it will default to the source password.
TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name.
TableNames []string // Ordered list of table names to be moved (order from --move-tables). Iteration is deterministic over this slice, never over the Tables map.
// Tables holds the per-table runtime state, keyed by source table name.
// Populated by InitMoveTableContainers() once per-table schema is known.
Tables map[string]*MoveTable
TargetHost string // Target hostname for the move. This must be a primary/writable host.
TargetPort int // Target MySQL port for the move.
TargetUser string // Target username for the move. If not specified, it will default to the source user.
TargetPass string // Target password for the move. If not specified, it will default to the source password.
TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name.

// AllowOnSourcePrimary opts in to running the move-tables read path (schema
// inspection, the full row copy, binlog streaming) directly against the
Expand Down Expand Up @@ -418,6 +557,9 @@ func getSafeTableName(baseName string, suffix string) string {
// GetGhostTableName generates the name of ghost table, based on original table name
// or a given table name
func (mctx *MigrationContext) GetGhostTableName() string {
if mctx.IsMoveTablesMode() {
panic("GetGhostTableName() must not be called in move-tables mode; there is no ghost table (the target keeps each migrated table's name)")

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to add panics to function calls that I felt were dangerous enough if they accidentally were called by a codepath from --move-tables. Happy to discuss alternatives, or ditching the panics altogether, but they gave me confidence while making the changes across this PR.

One thought - we could swap the raw panics for failpoint panics and only enable them for integration tests if we wanted to get fancier while removing intentional panics from the prod binary. But I don't know if that gains us that much.

}
if mctx.Revert {
// When reverting the "ghost" table is the _del table from the original migration.
return mctx.OldTableName
Expand All @@ -429,15 +571,6 @@ func (mctx *MigrationContext) GetGhostTableName() string {
}
}

// GetTargetTableName generates the name of the target table, based on original table name and
// the migration context (i.e. move-tables mode).
func (mctx *MigrationContext) GetTargetTableName() string {
if mctx.IsMoveTablesMode() {
return mctx.MoveTables.TableNames[0]
}
return mctx.GetGhostTableName()
}

// GetTargetDatabaseName fetches the name of the target database, which defaults to the original
// database name unless we're in move-tables mode.
func (mctx *MigrationContext) GetTargetDatabaseName() string {
Expand All @@ -449,6 +582,9 @@ func (mctx *MigrationContext) GetTargetDatabaseName() string {

// GetOldTableName generates the name of the "old" table, into which the original table is renamed.
func (mctx *MigrationContext) GetOldTableName() string {
if mctx.IsMoveTablesMode() {
panic("GetOldTableName() must not be called in move-tables mode; use MoveTableDelName(tableName) for each migrated table's `_<table>_del` rollback handle")
}
var tableName string
if mctx.ForceTmpTableName != "" {
tableName = mctx.ForceTmpTableName
Expand All @@ -470,9 +606,29 @@ func (mctx *MigrationContext) GetOldTableName() string {
return getSafeTableName(tableName, suffix)
}

// MoveTableDelName returns the `_<table>_del` rollback-handle table name for a
// specific migrated table in move-tables mode. It mirrors GetOldTableName but
// for an explicit table name, so a multi-table cutover can rename every source
// table in one atomic RENAME. Revert is disallowed in move-tables mode, so the
// suffix is always "del".
func (mctx *MigrationContext) MoveTableDelName(tableName string) string {
suffix := "del"
if mctx.TimestampOldTable {
t := mctx.StartTime
timestamp := fmt.Sprintf("%d%02d%02d%02d%02d%02d",
t.Year(), t.Month(), t.Day(),
t.Hour(), t.Minute(), t.Second())
return getSafeTableName(tableName, fmt.Sprintf("%s_%s", timestamp, suffix))
}
return getSafeTableName(tableName, suffix)
}

// GetChangelogTableName generates the name of changelog table, based on original table name
// or a given table name.
func (mctx *MigrationContext) GetChangelogTableName() string {
if mctx.IsMoveTablesMode() {
panic("GetChangelogTableName() must not be called in move-tables mode; there is no changelog table (§1.2)")
}
if mctx.ForceTmpTableName != "" {
return getSafeTableName(mctx.ForceTmpTableName, "ghc")
} else {
Expand All @@ -484,15 +640,13 @@ func (mctx *MigrationContext) GetChangelogTableName() string {
func (mctx *MigrationContext) GetCheckpointTableName() string {
if mctx.ForceTmpTableName != "" {
return getSafeTableName(mctx.ForceTmpTableName, "ghk")
} else {
return getSafeTableName(mctx.OriginalTableName, "ghk")
}
}

// GetVoluntaryLockName returns a name of a voluntary lock to be used throughout
// the swap-tables process.
func (mctx *MigrationContext) GetVoluntaryLockName() string {
return fmt.Sprintf("%s.%s.lock", mctx.DatabaseName, mctx.OriginalTableName)
if mctx.IsMoveTablesMode() {
// One checkpoint table per run, named from the set-derived run token so it
// does not depend on any single migrated table and is stable across resume.
return getSafeTableName("gho_"+mctx.MoveTablesRunToken(), "ghk")
}
return getSafeTableName(mctx.OriginalTableName, "ghk")
}

// RequiresBinlogFormatChange is `true` when the original binlog format isn't `ROW`
Expand Down Expand Up @@ -1189,6 +1343,84 @@ func (mctx *MigrationContext) IsMoveTablesMode() bool {
return len(mctx.MoveTables.TableNames) > 0
}

// InitMoveTableContainers builds (or rebuilds) the per-table runtime containers
// from the ordered MoveTables.TableNames list. It is idempotent: tables already
// present in the map keep their existing container so callers may invoke it
// after partially populating state. Source and target table names match in
// move-tables mode; only the database may differ.
func (mctx *MigrationContext) InitMoveTableContainers() {
if mctx.MoveTables.Tables == nil {
mctx.MoveTables.Tables = make(map[string]*MoveTable, len(mctx.MoveTables.TableNames))
}
for _, tableName := range mctx.MoveTables.TableNames {
if _, ok := mctx.MoveTables.Tables[tableName]; ok {
continue
}
mctx.MoveTables.Tables[tableName] = &MoveTable{
SourceDatabaseName: mctx.DatabaseName,
SourceTableName: tableName,
TargetDatabaseName: mctx.GetTargetDatabaseName(),
TargetTableName: tableName,
}
}
}

// GetMoveTable returns the per-table container for the given source table name,
// or nil if it has not been initialized.
func (mctx *MigrationContext) GetMoveTable(tableName string) *MoveTable {
if mctx.MoveTables.Tables == nil {
return nil
}
return mctx.MoveTables.Tables[tableName]
}

// OrderedMoveTables returns the per-table containers in --move-tables order.
// Iteration must always use this deterministic order, never the Tables map's
Comment thread
zacharysierakowski marked this conversation as resolved.
// (random) iteration order.
func (mctx *MigrationContext) OrderedMoveTables() []*MoveTable {
tables := make([]*MoveTable, 0, len(mctx.MoveTables.TableNames))
for _, tableName := range mctx.MoveTables.TableNames {
if mt := mctx.GetMoveTable(tableName); mt != nil {
tables = append(tables, mt)
}
}
return tables
}

// MoveTablesRunToken returns a short, stable identifier for a move-tables run,
// derived from the (sorted) set of migrated table names. It is:
// - deterministic: the same table set always yields the same token, so a
// resumed run finds the same run-wide artifacts (e.g. the checkpoint table).
// - order-independent: --move-tables=a,b and --move-tables=b,a match.
// - fixed-length: independent of how many tables are moved (so it never blows
// past identifier length limits the way a concatenation of names would).
//
// It is used to name run-wide singular artifacts (checkpoint table, applier
// advisory lock, serve socket) so they never depend on any single migrated
// table name. Returns "" outside move-tables mode.
func (mctx *MigrationContext) MoveTablesRunToken() string {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the func that determines the unique identifier per run based on the table names. More details in the comment above but just wanted to flag it directly with a comment.

A non-start alternative could have been concating all the table names, but that would make for a really long identifier in most cases.

Another alternative could have been to require ForceTmpTableName in --move-tables mode, so that the operator chooses what the identifier for the table is. We'd potentially need to extend that a bit for other callsites like the server sock file, etc.

if !mctx.IsMoveTablesMode() {
return ""
}
names := append([]string(nil), mctx.MoveTables.TableNames...)
sort.Strings(names)
// NUL separator: table names cannot contain it, so the join is unambiguous.
sum := sha256.Sum256([]byte(strings.Join(names, "\x00")))
return hex.EncodeToString(sum[:6]) // 12 hex chars / 48 bits
}

// AllMoveTablesRowCopyComplete reports whether every migrated table has finished
// its row copy. The on-row-copy-complete hook and the cutover only proceed once
// this is true.
func (mctx *MigrationContext) AllMoveTablesRowCopyComplete() bool {
for _, mt := range mctx.OrderedMoveTables() {
if !mt.IsRowCopyComplete() {
return false
}
}
return true
}

// SendWithContext attempts to send a value to a channel, but returns early
// if the context is cancelled. This prevents goroutine deadlocks when the
// channel receiver has exited due to an error.
Expand Down
38 changes: 38 additions & 0 deletions go/base/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,44 @@ func TestGetTableNames(t *testing.T) {
}
}

func TestMoveTableDelName(t *testing.T) {
context := NewMigrationContext()
// Per-table `_<table>_del` rollback handle, independent of any other table.
require.Equal(t, "_some_table_del", context.MoveTableDelName("some_table"))
require.Equal(t, "_other_del", context.MoveTableDelName("other"))

// Honors --timestamp-old-table like the single-table GetOldTableName does.
context.TimestampOldTable = true
longForm := "Jan 2, 2006 at 3:04pm (MST)"
context.StartTime, _ = time.Parse(longForm, "Feb 3, 2013 at 7:54pm (PST)")
require.Equal(t, "_some_table_20130203195400_del", context.MoveTableDelName("some_table"))
}

func TestMoveTablesRunToken(t *testing.T) {
// Empty outside move-tables mode.
require.Equal(t, "", NewMigrationContext().MoveTablesRunToken())

context := NewMigrationContext()
context.MoveTables.TableNames = []string{"a", "b", "c"}
token := context.MoveTablesRunToken()
// Fixed length, lowercase hex (12 chars / 48 bits).
require.Len(t, token, 12)
require.Regexp(t, "^[0-9a-f]{12}$", token)
// Deterministic: the same set always yields the same token (so a resumed run
// finds the same run-wide artifacts).
require.Equal(t, token, context.MoveTablesRunToken())

// Order-independent: --move-tables=a,b,c and =c,b,a match.
reordered := NewMigrationContext()
reordered.MoveTables.TableNames = []string{"c", "b", "a"}
require.Equal(t, token, reordered.MoveTablesRunToken())

// A different set yields a different token.
different := NewMigrationContext()
different.MoveTables.TableNames = []string{"a", "b", "d"}
require.NotEqual(t, token, different.MoveTablesRunToken())
}

func TestGetTriggerNames(t *testing.T) {
{
context := NewMigrationContext()
Expand Down
Loading
Loading