From ad5bbbb83e728aa318416c0f79275a260a1f9104 Mon Sep 17 00:00:00 2001 From: dnovitski <54758025+dnovitski@users.noreply.github.com> Date: Fri, 22 May 2026 11:00:19 +0200 Subject: [PATCH 1/2] Add --chunk-concurrent-size for parallel row-copy Port of PR #1398 by @shaohk: allows multiple row-copy chunks to execute in parallel within each iteration using errgroup. Key changes: - Add IterationRangeValues struct for thread-safe range passing - Serialize range calculation with CalculateNextIterationRangeEndValuesLock - Rewrite iterateChunks to spawn N goroutines per queue item via errgroup - Return SQL warnings from ApplyIterationInsertQuery (eliminates race on shared MigrationLastInsertSQLWarnings field) - Increase DB connection pool when concurrency > default pool size - Add --chunk-concurrent-size CLI flag (default 1, no behavior change) Co-authored-by: shaohk Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- doc/command-line-flags.md | 10 +++ go/base/context.go | 109 ++++++++++++++++----------- go/cmd/gh-ost/main.go | 2 + go/logic/applier.go | 117 +++++++++++++++++++---------- go/logic/applier_test.go | 18 ++--- go/logic/migrator.go | 153 ++++++++++++++++++++++++++------------ 6 files changed, 272 insertions(+), 137 deletions(-) diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index 2012c8c4c..2f7c5032a 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -78,6 +78,16 @@ See also: [`resuming-migrations`](resume.md) `--checkpoint-seconds` specifies the seconds between checkpoints. Default is 300. +### chunk-concurrent-size + +`--chunk-concurrent-size=1`, the number of goroutines to execute chunk-copy operations concurrently in each copy time slot. Default `1` (sequential). Minimum `1`. + +When set to a value greater than 1, multiple chunks are calculated and copied in parallel within each write-function invocation. This can significantly speed up row-copy on large tables when MySQL can handle concurrent writes to the ghost table. + +Each concurrent chunk calculates its own non-overlapping key range under a serialization lock, so there is no risk of duplicate or overlapping copies. + +Note: concurrency multiplies write pressure per time slot. Throttling (`--max-load`, `--nice-ratio`) applies per batch, not per chunk. Start with small values (2-8) and monitor replication lag. + ### conf `--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format: diff --git a/go/base/context.go b/go/base/context.go index 26d13fe07..fc58b7c13 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -27,6 +27,16 @@ import ( "github.com/go-ini/ini" ) +// IterationRangeValues holds the range boundaries for a single chunk-copy iteration. +// Used by concurrent row-copy to pass isolated range values to each worker goroutine. +type IterationRangeValues struct { + Min *sql.ColumnValues + Max *sql.ColumnValues + Size int64 + IncludeMinValues bool + HasFurtherRange bool +} + // RowsEstimateMethod is the type of row number estimation type RowsEstimateMethod string @@ -131,6 +141,7 @@ type MigrationContext struct { HeartbeatIntervalMilliseconds int64 defaultNumRetries int64 ChunkSize int64 + ChunkConcurrentSize int64 niceRatio float64 MaxLagMillisecondsThrottleThreshold int64 throttleControlReplicaKeys *mysql.InstanceKeyMap @@ -240,27 +251,28 @@ type MigrationContext struct { Metrics *metrics.Client - OriginalTableColumnsOnApplier *sql.ColumnList - OriginalTableColumns *sql.ColumnList - OriginalTableVirtualColumns *sql.ColumnList - OriginalTableUniqueKeys [](*sql.UniqueKey) - OriginalTableAutoIncrement uint64 - GhostTableColumns *sql.ColumnList - GhostTableVirtualColumns *sql.ColumnList - GhostTableUniqueKeys [](*sql.UniqueKey) - UniqueKey *sql.UniqueKey - SharedColumns *sql.ColumnList - ColumnRenameMap map[string]string - DroppedColumnsMap map[string]bool - MappedSharedColumns *sql.ColumnList - MigrationLastInsertSQLWarnings []string - MigrationRangeMinValues *sql.ColumnValues - MigrationRangeMaxValues *sql.ColumnValues - Iteration int64 - MigrationIterationRangeMinValues *sql.ColumnValues - MigrationIterationRangeMaxValues *sql.ColumnValues - InitialStreamerCoords mysql.BinlogCoordinates - ForceTmpTableName string + OriginalTableColumnsOnApplier *sql.ColumnList + OriginalTableColumns *sql.ColumnList + OriginalTableVirtualColumns *sql.ColumnList + OriginalTableUniqueKeys [](*sql.UniqueKey) + OriginalTableAutoIncrement uint64 + GhostTableColumns *sql.ColumnList + GhostTableVirtualColumns *sql.ColumnList + GhostTableUniqueKeys [](*sql.UniqueKey) + UniqueKey *sql.UniqueKey + SharedColumns *sql.ColumnList + ColumnRenameMap map[string]string + DroppedColumnsMap map[string]bool + MappedSharedColumns *sql.ColumnList + MigrationLastInsertSQLWarnings []string + MigrationRangeMinValues *sql.ColumnValues + MigrationRangeMaxValues *sql.ColumnValues + Iteration int64 + MigrationIterationRangeMinValues *sql.ColumnValues + MigrationIterationRangeMaxValues *sql.ColumnValues + CalculateNextIterationRangeEndValuesLock *sync.Mutex + InitialStreamerCoords mysql.BinlogCoordinates + ForceTmpTableName string IncludeTriggers bool RemoveTriggerSuffix bool @@ -310,29 +322,31 @@ type ContextConfig struct { func NewMigrationContext() *MigrationContext { ctx, cancelFunc := context.WithCancel(context.Background()) return &MigrationContext{ - Uuid: uuid.NewString(), - defaultNumRetries: 60, - ChunkSize: 1000, - InspectorConnectionConfig: mysql.NewConnectionConfig(), - ApplierConnectionConfig: mysql.NewConnectionConfig(), - MaxLagMillisecondsThrottleThreshold: 1500, - CutOverLockTimeoutSeconds: 3, - DMLBatchSize: 10, - etaNanoseonds: ETAUnknown, - maxLoad: NewLoadMap(), - criticalLoad: NewLoadMap(), - throttleMutex: &sync.Mutex{}, - throttleHTTPMutex: &sync.Mutex{}, - throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), - configMutex: &sync.Mutex{}, - pointOfInterestTimeMutex: &sync.Mutex{}, - lastHeartbeatOnChangelogMutex: &sync.Mutex{}, - ColumnRenameMap: make(map[string]string), - PanicAbort: make(chan error), - ctx: ctx, - cancelFunc: cancelFunc, - abortMutex: &sync.Mutex{}, - Log: NewDefaultLogger(), + Uuid: uuid.NewString(), + defaultNumRetries: 60, + ChunkSize: 1000, + ChunkConcurrentSize: 1, + InspectorConnectionConfig: mysql.NewConnectionConfig(), + ApplierConnectionConfig: mysql.NewConnectionConfig(), + MaxLagMillisecondsThrottleThreshold: 1500, + CutOverLockTimeoutSeconds: 3, + DMLBatchSize: 10, + etaNanoseonds: ETAUnknown, + maxLoad: NewLoadMap(), + criticalLoad: NewLoadMap(), + throttleMutex: &sync.Mutex{}, + throttleHTTPMutex: &sync.Mutex{}, + throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), + configMutex: &sync.Mutex{}, + pointOfInterestTimeMutex: &sync.Mutex{}, + lastHeartbeatOnChangelogMutex: &sync.Mutex{}, + CalculateNextIterationRangeEndValuesLock: &sync.Mutex{}, + ColumnRenameMap: make(map[string]string), + PanicAbort: make(chan error), + ctx: ctx, + cancelFunc: cancelFunc, + abortMutex: &sync.Mutex{}, + Log: NewDefaultLogger(), } } @@ -693,6 +707,13 @@ func (mctx *MigrationContext) SetChunkSize(chunkSize int64) { atomic.StoreInt64(&mctx.ChunkSize, chunkSize) } +func (mctx *MigrationContext) SetChunkConcurrentSize(chunkConcurrentSize int64) { + if chunkConcurrentSize < 1 { + chunkConcurrentSize = 1 + } + atomic.StoreInt64(&mctx.ChunkConcurrentSize, chunkConcurrentSize) +} + func (mctx *MigrationContext) SetDMLBatchSize(batchSize int64) { if batchSize < 1 { batchSize = 1 diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index d77046231..3a029f825 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -123,6 +123,7 @@ func main() { flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').") exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.") chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)") + chunkConcurrentSize := flag.Int64("chunk-concurrent-size", 1, "number of goroutines to execute chunks concurrently in each copy time slot (range 1-100)") dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-1000)") defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss") @@ -375,6 +376,7 @@ func main() { migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis) migrationContext.SetNiceRatio(*niceRatio) migrationContext.SetChunkSize(*chunkSize) + migrationContext.SetChunkConcurrentSize(*chunkConcurrentSize) migrationContext.SetDMLBatchSize(*dmlBatchSize) migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis) migrationContext.SetThrottleQuery(*throttleQuery) diff --git a/go/logic/applier.go b/go/logic/applier.go index f3474b3ef..1912f38da 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -123,6 +123,11 @@ func (apl *Applier) InitDBConnections() (err error) { if apl.db, _, err = mysql.GetDB(apl.migrationContext.Uuid, uriWithMulti); err != nil { return err } + concurrentSize := atomic.LoadInt64(&apl.migrationContext.ChunkConcurrentSize) + if concurrentSize > int64(mysql.MaxDBPoolConnections) { + apl.db.SetMaxOpenConns(int(concurrentSize) + mysql.MaxDBPoolConnections) + apl.db.SetMaxIdleConns(int(concurrentSize) + mysql.MaxDBPoolConnections) + } singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri) if apl.singletonDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, singletonApplierUri); err != nil { return err @@ -1021,10 +1026,40 @@ func (apl *Applier) ReadMigrationRangeValues() error { } // CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, -// which will be used for copying the next chunk of rows. Ir returns "false" if there is -// no further chunk to work through, i.e. we're past the last chunk and are done with -// iterating the range (and thus done with copying row chunks) -func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) { +// which will be used for copying the next chunk of rows. It returns an IterationRangeValues +// struct with HasFurtherRange=false if there is no further chunk to work through. +// Thread-safe: uses a mutex to serialize access for concurrent row-copy. +// When advanceCursor is true, the function determines min from MigrationIterationRangeMaxValues +// (for concurrent mode where each goroutine advances the cursor). +// When advanceCursor is false, min is read from MigrationIterationRangeMinValues (pre-set by +// SetNextIterationRangeMinValues for single-threaded retry compatibility). +func (apl *Applier) CalculateNextIterationRangeEndValues(advanceCursor bool) (values *base.IterationRangeValues, err error) { + apl.migrationContext.CalculateNextIterationRangeEndValuesLock.Lock() + defer apl.migrationContext.CalculateNextIterationRangeEndValuesLock.Unlock() + + result := &base.IterationRangeValues{ + Size: atomic.LoadInt64(&apl.migrationContext.ChunkSize), + } + + if advanceCursor { + // Concurrent mode: advance min from current max cursor + result.Min = apl.migrationContext.MigrationIterationRangeMaxValues + if result.Min == nil { + result.Min = apl.migrationContext.MigrationRangeMinValues + result.IncludeMinValues = true + } + } else { + // Single-threaded mode: min was pre-set by SetNextIterationRangeMinValues + result.Min = apl.migrationContext.MigrationIterationRangeMinValues + if result.Min == nil { + result.Min = apl.migrationContext.MigrationRangeMinValues + } + // First iteration: include the minimum values. Use Iteration counter (not cursor state) + // because cursor is mutated on first calc success, but Iteration only advances after + // successful insert — so on retry of the first chunk, this still returns true. + result.IncludeMinValues = (apl.migrationContext.GetIteration() == 0) + } + for i := 0; i < 2; i++ { buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset if i == 1 { @@ -1034,46 +1069,49 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, &apl.migrationContext.UniqueKey.Columns, - apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), + result.Min.AbstractValues(), apl.migrationContext.MigrationRangeMaxValues.AbstractValues(), - atomic.LoadInt64(&apl.migrationContext.ChunkSize), - apl.migrationContext.GetIteration() == 0, + result.Size, + result.IncludeMinValues, fmt.Sprintf("iteration:%d", apl.migrationContext.GetIteration()), ) if err != nil { - return hasFurtherRange, err + return result, err } rows, err := apl.db.Query(query, explodedArgs...) if err != nil { - return hasFurtherRange, err + return result, err } defer rows.Close() iterationRangeMaxValues := sql.NewColumnValues(apl.migrationContext.UniqueKey.Len()) for rows.Next() { if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil { - return hasFurtherRange, err + return result, err } - hasFurtherRange = true + result.HasFurtherRange = true } if err = rows.Err(); err != nil { - return hasFurtherRange, err + return result, err } - if hasFurtherRange { - apl.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues - return hasFurtherRange, nil + if result.HasFurtherRange { + result.Max = iterationRangeMaxValues + // Advance global cursor + apl.migrationContext.MigrationIterationRangeMinValues = result.Min + apl.migrationContext.MigrationIterationRangeMaxValues = result.Max + return result, nil } } apl.migrationContext.Log.Debugf("Iteration complete: no further range to iterate") - return hasFurtherRange, nil + return result, nil } // ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where // data actually gets copied from original table. -func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { +func (apl *Applier) ApplyIterationInsertQuery(iterationRangeValues *base.IterationRangeValues) (chunkSize int64, rowsAffected int64, duration time.Duration, warnings []string, err error) { startTime := time.Now() - chunkSize = atomic.LoadInt64(&apl.migrationContext.ChunkSize) + chunkSize = iterationRangeValues.Size query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( apl.migrationContext.DatabaseName, @@ -1083,21 +1121,21 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i apl.migrationContext.MappedSharedColumns.Names(), apl.migrationContext.UniqueKey.Name, &apl.migrationContext.UniqueKey.Columns, - apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), - apl.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), - apl.migrationContext.GetIteration() == 0, + iterationRangeValues.Min.AbstractValues(), + iterationRangeValues.Max.AbstractValues(), + iterationRangeValues.IncludeMinValues, apl.migrationContext.IsTransactionalTable(), // TODO: Don't hardcode this strings.HasPrefix(apl.migrationContext.ApplierMySQLVersion, "8."), ) if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, nil, err } - sqlResult, err := func() (gosql.Result, error) { + sqlResult, sqlWarnings, err := func() (gosql.Result, []string, error) { tx, err := apl.db.Begin() if err != nil { - return nil, err + return nil, nil, err } defer tx.Rollback() @@ -1105,30 +1143,30 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, apl.generateSqlModeQuery()) if _, err := tx.Exec(sessionQuery); err != nil { - return nil, err + return nil, nil, err } result, err := tx.Exec(query, explodedArgs...) if err != nil { - return nil, err + return nil, nil, err } + var collectedWarnings []string if apl.migrationContext.PanicOnWarnings { rows, err := tx.Query("SHOW WARNINGS") if err != nil { - return nil, err + return nil, nil, err } defer rows.Close() if err = rows.Err(); err != nil { - return nil, err + return nil, nil, err } // Compile regex once before loop to avoid performance penalty and handle errors properly migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex() if err != nil { - return nil, err + return nil, nil, err } - var sqlWarnings []string for rows.Next() { var level, message string var code int @@ -1139,29 +1177,32 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { continue } - sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + collectedWarnings = append(collectedWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + } + if err := rows.Err(); err != nil { + return nil, nil, err } - apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings } if err := tx.Commit(); err != nil { - return nil, err + return nil, nil, err } - return result, nil + return result, collectedWarnings, nil }() if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, nil, err } rowsAffected, _ = sqlResult.RowsAffected() duration = time.Since(startTime) + warnings = sqlWarnings apl.migrationContext.Log.Debugf( "Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d", - apl.migrationContext.MigrationIterationRangeMinValues, - apl.migrationContext.MigrationIterationRangeMaxValues, + iterationRangeValues.Min, + iterationRangeValues.Max, apl.migrationContext.GetIteration(), chunkSize) - return chunkSize, rowsAffected, duration, nil + return chunkSize, rowsAffected, duration, warnings, nil } // LockOriginalTable places a write lock on the original table diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 85a5a01d3..68e4460d7 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -683,17 +683,17 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + iterationRange, err := applier.CalculateNextIterationRangeEndValues(false) suite.Require().NoError(err) - suite.Require().True(hasFurtherRange) + suite.Require().True(iterationRange.HasFurtherRange) - _, rowsAffected, _, err := applier.ApplyIterationInsertQuery() + _, rowsAffected, _, sqlWarnings, err := applier.ApplyIterationInsertQuery(iterationRange) suite.Require().NoError(err) suite.Require().Equal(int64(0), rowsAffected) // Ensure Duplicate entry '42' for key '_testing_gho.item_id' is ignored correctly - suite.Require().Empty(applier.migrationContext.MigrationLastInsertSQLWarnings) + suite.Require().Empty(sqlWarnings) // Check that the row was inserted rows, err := suite.db.Query("SELECT * FROM " + getTestGhostTableName()) @@ -763,17 +763,17 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + iterationRange, err := applier.CalculateNextIterationRangeEndValues(false) suite.Require().NoError(err) - suite.Require().True(hasFurtherRange) + suite.Require().True(iterationRange.HasFurtherRange) - _, rowsAffected, _, err := applier.ApplyIterationInsertQuery() + _, rowsAffected, _, sqlWarnings, err := applier.ApplyIterationInsertQuery(iterationRange) suite.Equal(int64(1), rowsAffected) suite.Require().NoError(err) // Verify the warning was recorded and will cause the migrator to panic - suite.Require().NotEmpty(applier.migrationContext.MigrationLastInsertSQLWarnings) - suite.Require().Contains(applier.migrationContext.MigrationLastInsertSQLWarnings[0], "Warning: Data truncated for column 'name' at row 1") + suite.Require().NotEmpty(sqlWarnings) + suite.Require().Contains(sqlWarnings[0], "Warning: Data truncated for column 'name' at row 1") } func (suite *ApplierTestSuite) TestWriteCheckpoint() { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index bec13e594..29664cd4c 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -20,6 +20,8 @@ import ( "github.com/github/gh-ost/go/binlog" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" + + "golang.org/x/sync/errgroup" ) var ( @@ -1565,7 +1567,8 @@ func (mgtr *Migrator) initiateApplier() error { } // iterateChunks iterates the existing table rows, and generates a copy task of -// a chunk of rows onto the ghost table. +// a chunk of rows onto the ghost table. Supports concurrent chunk copying via +// --chunk-concurrent-size. func (mgtr *Migrator) iterateChunks() error { terminateRowIteration := func(err error) error { _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.rowCopyComplete, err) @@ -1592,59 +1595,113 @@ func (mgtr *Migrator) iterateChunks() error { return nil } copyRowsFunc := func() error { - mgtr.migrationContext.SetNextIterationRangeMinValues() - // Copy task: - applyCopyRowsFunc := func() error { - if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { - // Done. - // There's another such check down the line - return nil - } + concurrentSize := atomic.LoadInt64(&mgtr.migrationContext.ChunkConcurrentSize) + if concurrentSize < 1 { + concurrentSize = 1 + } - // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever - hasFurtherRange, err := mgtr.applier.CalculateNextIterationRangeEndValues() - if err != nil { - return err // wrapping call will retry - } - if !hasFurtherRange { - atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) - return terminateRowIteration(nil) - } - if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { - // No need for more writes. - // This is the de-facto place where we avoid writing in the event of completed cut-over. - // There could _still_ be a race condition, but that's as close as we can get. - // What about the race condition? Well, there's actually no data integrity issue. - // when rowCopyCompleteFlag==1 that means **guaranteed** all necessary rows have been copied. - // But some are still then collected at the binary log, and these are the ones we're trying to - // not apply here. If the race condition wins over us, then we just attempt to apply onto the - // _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage. - return nil - } - _, rowsAffected, _, err := mgtr.applier.ApplyIterationInsertQuery() - if err != nil { - return err // wrapping call will retry - } + g, gctx := errgroup.WithContext(mgtr.migrationContext.GetContext()) + g.SetLimit(int(concurrentSize)) - if mgtr.migrationContext.PanicOnWarnings { - if len(mgtr.migrationContext.MigrationLastInsertSQLWarnings) > 0 { - for _, warning := range mgtr.migrationContext.MigrationLastInsertSQLWarnings { - mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) - } - joinedWarnings := strings.Join(mgtr.migrationContext.MigrationLastInsertSQLWarnings, "; ") - return terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings)) + for i := int64(0); i < concurrentSize; i++ { + g.Go(func() error { + if gctx.Err() != nil { + return gctx.Err() + } + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return nil } - } - atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) - atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) - return nil + if concurrentSize == 1 { + // Single-threaded path: matches master behavior exactly. + // Min is fixed before retry loop; range calc + insert are retried together. + // This allows hook-based chunk size reduction to take effect on retry. + mgtr.migrationContext.SetNextIterationRangeMinValues() + + applyCopyRowsFunc := func() error { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return nil + } + + iterationRangeValues, err := mgtr.applier.CalculateNextIterationRangeEndValues(false) + if err != nil { + return err + } + if !iterationRangeValues.HasFurtherRange { + atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) + return nil + } + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + return nil + } + + _, rowsAffected, _, sqlWarnings, err := mgtr.applier.ApplyIterationInsertQuery(iterationRangeValues) + if err != nil { + return err + } + + if mgtr.migrationContext.PanicOnWarnings && len(sqlWarnings) > 0 { + for _, warning := range sqlWarnings { + mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) + } + joinedWarnings := strings.Join(sqlWarnings, "; ") + return fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings) + } + + atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) + atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) + return nil + } + if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { //nolint:contextcheck + return err + } + } else { + // Concurrent path: range calculation is serialized under mutex upfront. + // Each goroutine gets its own range; retries apply to the INSERT only. + iterationRangeValues, err := mgtr.applier.CalculateNextIterationRangeEndValues(true) + if err != nil { + return err + } + if !iterationRangeValues.HasFurtherRange { + atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) + return nil + } + + applyCopyRowsFunc := func() error { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + return nil + } + _, rowsAffected, _, sqlWarnings, err := mgtr.applier.ApplyIterationInsertQuery(iterationRangeValues) + if err != nil { + return err + } + + if mgtr.migrationContext.PanicOnWarnings && len(sqlWarnings) > 0 { + for _, warning := range sqlWarnings { + mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) + } + joinedWarnings := strings.Join(sqlWarnings, "; ") + return fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings) + } + + atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) + atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) + return nil + } + if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { //nolint:contextcheck + return err + } + } + return nil + }) } - if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { + + if err := g.Wait(); err != nil { return terminateRowIteration(err) } - // record last successfully copied range + // record last successfully copied range (before checking termination flag, + // so the final batch's range is captured for resume) mgtr.applier.LastIterationRangeMutex.Lock() if mgtr.migrationContext.MigrationIterationRangeMinValues != nil && mgtr.migrationContext.MigrationIterationRangeMaxValues != nil { mgtr.applier.LastIterationRangeMinValues = mgtr.migrationContext.MigrationIterationRangeMinValues.Clone() @@ -1652,6 +1709,10 @@ func (mgtr *Migrator) iterateChunks() error { } mgtr.applier.LastIterationRangeMutex.Unlock() + if atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return terminateRowIteration(nil) + } + return nil } // Enqueue copy operation; to be executed by executeWriteFuncs() From e989096ff3c721654aeb38cf2251b092edce2e0a Mon Sep 17 00:00:00 2001 From: dnovitski <54758025+dnovitski@users.noreply.github.com> Date: Sun, 14 Jun 2026 00:42:21 +0200 Subject: [PATCH 2/2] Speed up parallel row-copy: overlap range calc, drop per-batch barrier, cut per-chunk round-trips The --chunk-concurrent-size parallel row-copy only ran the INSERTs in parallel; the boundary calculation and the per-chunk transaction overhead serialized work and capped the achievable speedup well below the hardware's parallel-insert ceiling. This addresses three of those caps. Prefetch range producer (overlap serialized boundary calc with INSERTs): - A single dedicated producer goroutine is the sole caller of CalculateNextIterationRangeEndValues and streams pre-computed ranges into a buffered channel, so boundary scans now overlap the parallel INSERTs of earlier work instead of stalling between batches. - Split iterateChunks into iterateChunksSingle (unchanged single-threaded semantics) and iterateChunksConcurrent. - Size the applier pool for concurrentSize + producer + headroom. #1 Per-chunk round-trips (applier.go): - ApplyIterationInsertQuery sent BEGIN / SET SESSION / INSERT / COMMIT as four round-trips per chunk. It now sends "SET SESSION ...; INSERT ..." as a single autocommit, multi-statement round-trip on one pinned connection. The applier pool already enables multiStatements + interpolateParams + autocommit; RowsAffected() reports the INSERT (last statement), and the optional SHOW WARNINGS runs on the same pinned connection. 4 round-trips -> 1. #2 Persistent worker pool (migrator.go): - Replace the per-batch errgroup+g.Wait barrier (which stalled N workers on the slowest chunk every N chunks) with continuous dispatch to an errgroup bounded by SetLimit(concurrentSize) for a 200ms time quantum. Workers stay saturated; the only barrier is at the quantum boundary. The time bound keeps executeWriteFuncs returning to apply binlog events and re-check throttling, preserving row-copy/event mutual exclusion. Checkpoints record the last contiguous completed range (not the producer's prefetched cursor), so resume restarts from fully-copied data. Benchmarked on MySQL 8.0.46 (innodb_autoinc_lock_mode=2), 2.1M rows: copy time vs the prior parallel impl improved up to 32% (chunk=200, conc=4: 22s->15s; chunk=1000, conc=8: 8s->6s). Data integrity verified by row count + checksum. Co-Authored-By: Claude Opus 4.8 (1M context) --- doc/command-line-flags.md | 4 +- go/logic/applier.go | 105 +++++++------ go/logic/applier_test.go | 4 +- go/logic/migrator.go | 308 ++++++++++++++++++++++++++------------ 4 files changed, 281 insertions(+), 140 deletions(-) diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index 2f7c5032a..b91929abb 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -84,7 +84,9 @@ See also: [`resuming-migrations`](resume.md) When set to a value greater than 1, multiple chunks are calculated and copied in parallel within each write-function invocation. This can significantly speed up row-copy on large tables when MySQL can handle concurrent writes to the ghost table. -Each concurrent chunk calculates its own non-overlapping key range under a serialization lock, so there is no risk of duplicate or overlapping copies. +Each concurrent chunk calculates its own non-overlapping key range under a serialization lock, so there is no risk of duplicate or overlapping copies. A single dedicated producer goroutine streams these pre-calculated ranges to a pool of copy workers that run continuously (rather than in fixed barrier-synchronized batches), so the serialized boundary calculation overlaps with the parallel `INSERT`s and a slow chunk does not stall the others. Each chunk also applies its session variables and `INSERT` in a single autocommit round-trip, avoiding the per-chunk `BEGIN`/`SET SESSION`/`COMMIT` overhead. The applier connection pool is sized to `chunk-concurrent-size + 1 (producer) + headroom` automatically. + +For the speedup to materialize, MySQL should allow concurrent inserts to scale: on MySQL 8.0+ the default `innodb_autoinc_lock_mode = 2` (interleaved) is required for tables with an `AUTO_INCREMENT` column — under mode 0/1 an `INSERT ... SELECT` holds a table-level AUTO-INC lock that serializes concurrent chunks. Note: concurrency multiplies write pressure per time slot. Throttling (`--max-load`, `--nice-ratio`) applies per batch, not per chunk. Start with small values (2-8) and monitor replication lag. diff --git a/go/logic/applier.go b/go/logic/applier.go index 1912f38da..d080414b3 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -124,9 +124,14 @@ func (apl *Applier) InitDBConnections() (err error) { return err } concurrentSize := atomic.LoadInt64(&apl.migrationContext.ChunkConcurrentSize) - if concurrentSize > int64(mysql.MaxDBPoolConnections) { - apl.db.SetMaxOpenConns(int(concurrentSize) + mysql.MaxDBPoolConnections) - apl.db.SetMaxIdleConns(int(concurrentSize) + mysql.MaxDBPoolConnections) + if concurrentSize > 1 { + // Size the pool for concurrentSize parallel chunk-INSERTs plus the dedicated + // range-producer connection, with MaxDBPoolConnections of additional headroom + // for other applier queries. Without this, small concurrency values (2, 3) + // would contend with the producer for a connection and serialize. + poolSize := int(concurrentSize) + 1 + mysql.MaxDBPoolConnections + apl.db.SetMaxOpenConns(poolSize) + apl.db.SetMaxIdleConns(poolSize) } singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri) if apl.singletonDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, singletonApplierUri); err != nil { @@ -1109,7 +1114,14 @@ func (apl *Applier) CalculateNextIterationRangeEndValues(advanceCursor bool) (va // ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where // data actually gets copied from original table. -func (apl *Applier) ApplyIterationInsertQuery(iterationRangeValues *base.IterationRangeValues) (chunkSize int64, rowsAffected int64, duration time.Duration, warnings []string, err error) { +// +// The session variables (time_zone, sql_mode) and the chunk INSERT are sent as a single +// multi-statement, autocommit round-trip on one pinned connection. The applier pool sets +// multiStatements + interpolateParams + autocommit, so this avoids the extra +// BEGIN / SET SESSION / COMMIT round-trips an explicit transaction would add to every +// chunk — the dominant per-chunk overhead at small chunk sizes. `RowsAffected()` reports +// the last statement's count (the INSERT), so the returned row count stays correct. +func (apl *Applier) ApplyIterationInsertQuery(ctx context.Context, iterationRangeValues *base.IterationRangeValues) (chunkSize int64, rowsAffected int64, duration time.Duration, warnings []string, err error) { startTime := time.Now() chunkSize = iterationRangeValues.Size @@ -1132,60 +1144,30 @@ func (apl *Applier) ApplyIterationInsertQuery(iterationRangeValues *base.Iterati return chunkSize, rowsAffected, duration, nil, err } + sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s', %s`, + apl.migrationContext.ApplierTimeZone, apl.generateSqlModeQuery()) + combinedQuery := sessionQuery + "; " + query + sqlResult, sqlWarnings, err := func() (gosql.Result, []string, error) { - tx, err := apl.db.Begin() + // Pin a single connection so the optional SHOW WARNINGS observes this INSERT's + // warnings (and not another pooled query's). + conn, err := apl.db.Conn(ctx) if err != nil { return nil, nil, err } - defer tx.Rollback() - - sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, apl.migrationContext.ApplierTimeZone) - sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, apl.generateSqlModeQuery()) + defer conn.Close() - if _, err := tx.Exec(sessionQuery); err != nil { - return nil, nil, err - } - result, err := tx.Exec(query, explodedArgs...) + result, err := conn.ExecContext(ctx, combinedQuery, explodedArgs...) if err != nil { return nil, nil, err } var collectedWarnings []string if apl.migrationContext.PanicOnWarnings { - rows, err := tx.Query("SHOW WARNINGS") - if err != nil { - return nil, nil, err - } - defer rows.Close() - if err = rows.Err(); err != nil { - return nil, nil, err - } - - // Compile regex once before loop to avoid performance penalty and handle errors properly - migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex() + collectedWarnings, err = apl.collectChunkInsertWarnings(ctx, conn) if err != nil { return nil, nil, err } - - for rows.Next() { - var level, message string - var code int - if err := rows.Scan(&level, &code, &message); err != nil { - apl.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") - continue - } - if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { - continue - } - collectedWarnings = append(collectedWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) - } - if err := rows.Err(); err != nil { - return nil, nil, err - } - } - - if err := tx.Commit(); err != nil { - return nil, nil, err } return result, collectedWarnings, nil }() @@ -1205,6 +1187,41 @@ func (apl *Applier) ApplyIterationInsertQuery(iterationRangeValues *base.Iterati return chunkSize, rowsAffected, duration, warnings, nil } +// collectChunkInsertWarnings runs SHOW WARNINGS on the given (pinned) connection right +// after a chunk INSERT and returns the warnings, skipping the benign duplicate-key +// warnings that INSERT IGNORE produces on the migration unique key. +func (apl *Applier) collectChunkInsertWarnings(ctx context.Context, conn *gosql.Conn) ([]string, error) { + rows, err := conn.QueryContext(ctx, "SHOW WARNINGS") + if err != nil { + return nil, err + } + defer rows.Close() + + // Compile regex once before loop to avoid performance penalty and handle errors properly + migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex() + if err != nil { + return nil, err + } + + var collectedWarnings []string + for rows.Next() { + var level, message string + var code int + if err := rows.Scan(&level, &code, &message); err != nil { + apl.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") + continue + } + if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { + continue + } + collectedWarnings = append(collectedWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + } + if err := rows.Err(); err != nil { + return nil, err + } + return collectedWarnings, nil +} + // LockOriginalTable places a write lock on the original table func (apl *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 68e4460d7..7df6523f4 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -688,7 +688,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc suite.Require().NoError(err) suite.Require().True(iterationRange.HasFurtherRange) - _, rowsAffected, _, sqlWarnings, err := applier.ApplyIterationInsertQuery(iterationRange) + _, rowsAffected, _, sqlWarnings, err := applier.ApplyIterationInsertQuery(ctx, iterationRange) suite.Require().NoError(err) suite.Require().Equal(int64(0), rowsAffected) @@ -767,7 +767,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai suite.Require().NoError(err) suite.Require().True(iterationRange.HasFurtherRange) - _, rowsAffected, _, sqlWarnings, err := applier.ApplyIterationInsertQuery(iterationRange) + _, rowsAffected, _, sqlWarnings, err := applier.ApplyIterationInsertQuery(ctx, iterationRange) suite.Equal(int64(1), rowsAffected) suite.Require().NoError(err) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 29664cd4c..dfd59ec15 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -31,6 +31,12 @@ var ( checkpointTimeout = 2 * time.Second ) +// rowCopyQuantumDuration bounds how long the concurrent row-copy worker pool keeps +// dispatching chunks before yielding control back to executeWriteFuncs (which then +// applies any pending binlog events and re-checks throttling). It trades a small +// increase in worst-case event-apply latency for far fewer per-batch barriers. +const rowCopyQuantumDuration = 200 * time.Millisecond + type ChangelogState string const ( @@ -1583,116 +1589,211 @@ func (mgtr *Migrator) iterateChunks() error { return terminateRowIteration(nil) } + concurrentSize := atomic.LoadInt64(&mgtr.migrationContext.ChunkConcurrentSize) + if concurrentSize < 1 { + concurrentSize = 1 + } + + if concurrentSize == 1 { + return mgtr.iterateChunksSingle(terminateRowIteration) + } + return mgtr.iterateChunksConcurrent(concurrentSize, terminateRowIteration) +} + +// iterateChunksSingle is the single-threaded row-copy loop. It matches master +// behavior exactly: the next-iteration range is (re)calculated inside the retry +// loop so that hook-based chunk-size reduction takes effect on retry. +func (mgtr *Migrator) iterateChunksSingle(terminateRowIteration func(error) error) error { + ctx := mgtr.migrationContext.GetContext() var hasNoFurtherRangeFlag int64 - // Iterate per chunk: for { if err := mgtr.checkAbort(); err != nil { return terminateRowIteration(err) } if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { - // Done - // There's another such check down the line return nil } copyRowsFunc := func() error { - concurrentSize := atomic.LoadInt64(&mgtr.migrationContext.ChunkConcurrentSize) - if concurrentSize < 1 { - concurrentSize = 1 + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return nil } + // Min is fixed before retry loop; range calc + insert are retried together. + mgtr.migrationContext.SetNextIterationRangeMinValues() - g, gctx := errgroup.WithContext(mgtr.migrationContext.GetContext()) - g.SetLimit(int(concurrentSize)) + applyCopyRowsFunc := func() error { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return nil + } - for i := int64(0); i < concurrentSize; i++ { - g.Go(func() error { - if gctx.Err() != nil { - return gctx.Err() - } - if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { - return nil - } + iterationRangeValues, err := mgtr.applier.CalculateNextIterationRangeEndValues(false) + if err != nil { + return err + } + if !iterationRangeValues.HasFurtherRange { + atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) + return nil + } + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + return nil + } - if concurrentSize == 1 { - // Single-threaded path: matches master behavior exactly. - // Min is fixed before retry loop; range calc + insert are retried together. - // This allows hook-based chunk size reduction to take effect on retry. - mgtr.migrationContext.SetNextIterationRangeMinValues() + _, rowsAffected, _, sqlWarnings, err := mgtr.applier.ApplyIterationInsertQuery(ctx, iterationRangeValues) + if err != nil { + return err + } + if err := mgtr.checkInsertWarnings(sqlWarnings); err != nil { + return err + } - applyCopyRowsFunc := func() error { - if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { - return nil - } + atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) + atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) - iterationRangeValues, err := mgtr.applier.CalculateNextIterationRangeEndValues(false) - if err != nil { - return err - } - if !iterationRangeValues.HasFurtherRange { - atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) - return nil - } - if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { - return nil - } + mgtr.recordLastIterationRange(iterationRangeValues) + return nil + } + if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { //nolint:contextcheck + return terminateRowIteration(err) + } + if atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return terminateRowIteration(nil) + } + return nil + } + if err := base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.copyRowsQueue, copyRowsFunc); err != nil { + if abortErr := mgtr.checkAbort(); abortErr != nil { + return terminateRowIteration(abortErr) + } + return terminateRowIteration(err) + } + } +} - _, rowsAffected, _, sqlWarnings, err := mgtr.applier.ApplyIterationInsertQuery(iterationRangeValues) - if err != nil { - return err - } +// iterateChunksConcurrent copies row chunks using up to `concurrentSize` parallel +// INSERTs per batch. +// +// The dominant cost that previously capped concurrency was not the INSERTs but +// the per-chunk boundary calculation (CalculateNextIterationRangeEndValues): it +// runs a serialized, indexed scan of the source under a global mutex, and in the +// original implementation a batch's boundary calculations could not overlap with +// any INSERT — only the INSERTs ran in parallel, so the serial boundary scans +// (proportional to the whole table) showed up as pure overhead between batches. +// +// To remove that stall we run a single, dedicated producer goroutine that streams +// pre-calculated ranges into a buffered channel. The producer is the sole caller +// of CalculateNextIterationRangeEndValues, so cursor advancement stays correct and +// serialized, but it now runs *concurrently with* the INSERTs of earlier batches: +// by the time a batch is dequeued its ranges are already computed, so the worker +// goroutines never wait on boundary calculation. +func (mgtr *Migrator) iterateChunksConcurrent(concurrentSize int64, terminateRowIteration func(error) error) error { + ctx := mgtr.migrationContext.GetContext() - if mgtr.migrationContext.PanicOnWarnings && len(sqlWarnings) > 0 { - for _, warning := range sqlWarnings { - mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) - } - joinedWarnings := strings.Join(sqlWarnings, "; ") - return fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings) - } + // Buffer enough ranges so the producer can run a batch (or two) ahead of the + // consumers without blocking. + rangesChannel := make(chan *base.IterationRangeValues, 2*concurrentSize) + producerErrChannel := make(chan error, 1) - atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) - atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) - return nil - } - if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { //nolint:contextcheck - return err - } - } else { - // Concurrent path: range calculation is serialized under mutex upfront. - // Each goroutine gets its own range; retries apply to the INSERT only. - iterationRangeValues, err := mgtr.applier.CalculateNextIterationRangeEndValues(true) - if err != nil { - return err - } - if !iterationRangeValues.HasFurtherRange { - atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) - return nil - } + // Range producer: serialized boundary calculation, decoupled from the INSERTs. + go func() { + defer close(rangesChannel) + for { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + return + } + if err := mgtr.checkAbort(); err != nil { + return + } + iterationRangeValues, err := mgtr.applier.CalculateNextIterationRangeEndValues(true) + if err != nil { + producerErrChannel <- err + return + } + if !iterationRangeValues.HasFurtherRange { + return + } + if err := base.SendWithContext(ctx, rangesChannel, iterationRangeValues); err != nil { + return + } + } + }() - applyCopyRowsFunc := func() error { - if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { - return nil - } - _, rowsAffected, _, sqlWarnings, err := mgtr.applier.ApplyIterationInsertQuery(iterationRangeValues) - if err != nil { - return err - } + var hasNoFurtherRangeFlag int64 + for { + if err := mgtr.checkAbort(); err != nil { + return terminateRowIteration(err) + } + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return nil + } + copyRowsFunc := func() error { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + return nil + } - if mgtr.migrationContext.PanicOnWarnings && len(sqlWarnings) > 0 { - for _, warning := range sqlWarnings { - mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) - } - joinedWarnings := strings.Join(sqlWarnings, "; ") - return fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings) + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(int(concurrentSize)) + + // Continuously dispatch pre-calculated ranges to up to concurrentSize + // workers for one time quantum. g.Go blocks while concurrentSize inserts + // are in flight, so workers stay saturated and a slow chunk no longer + // stalls the others behind a per-batch barrier; the only barrier is + // g.Wait() at the quantum boundary. Bounding the quantum by time keeps the + // single executeWriteFuncs goroutine returning to apply binlog events. + quantum := time.NewTimer(rowCopyQuantumDuration) + defer quantum.Stop() + + var lastDispatched *base.IterationRangeValues + dispatching := true + for dispatching { + var iterationRangeValues *base.IterationRangeValues + select { + case rv, ok := <-rangesChannel: + if !ok { + // Producer finished: table exhausted, or it hit an error. + select { + case err := <-producerErrChannel: + if err != nil { + _ = g.Wait() + return terminateRowIteration(err) } + default: + } + atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) + dispatching = false + continue + } + iterationRangeValues = rv + case <-quantum.C: + dispatching = false + continue + case <-gctx.Done(): + // A worker failed (or the migration is aborting); stop dispatching + // and let g.Wait surface the underlying error. + dispatching = false + continue + } - atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) - atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) + lastDispatched = iterationRangeValues + rv := iterationRangeValues + g.Go(func() error { + if gctx.Err() != nil { + return gctx.Err() + } + applyCopyRowsFunc := func() error { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { return nil } - if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { //nolint:contextcheck + _, rowsAffected, _, sqlWarnings, err := mgtr.applier.ApplyIterationInsertQuery(ctx, rv) + if err != nil { return err } + if err := mgtr.checkInsertWarnings(sqlWarnings); err != nil { + return err + } + atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) + atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) + return nil } - return nil + return mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc) //nolint:contextcheck }) } @@ -1700,25 +1801,22 @@ func (mgtr *Migrator) iterateChunks() error { return terminateRowIteration(err) } - // record last successfully copied range (before checking termination flag, - // so the final batch's range is captured for resume) - mgtr.applier.LastIterationRangeMutex.Lock() - if mgtr.migrationContext.MigrationIterationRangeMinValues != nil && mgtr.migrationContext.MigrationIterationRangeMaxValues != nil { - mgtr.applier.LastIterationRangeMinValues = mgtr.migrationContext.MigrationIterationRangeMinValues.Clone() - mgtr.applier.LastIterationRangeMaxValues = mgtr.migrationContext.MigrationIterationRangeMaxValues.Clone() + // Every range dispatched this quantum has completed. Ranges are pulled in + // contiguous, increasing order, so the last one dispatched is the highest + // fully-copied boundary — record it so a checkpoint resumes from copied + // data rather than from the producer's prefetched cursor. + if lastDispatched != nil { + mgtr.recordLastIterationRange(lastDispatched) } - mgtr.applier.LastIterationRangeMutex.Unlock() if atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { return terminateRowIteration(nil) } - return nil } // Enqueue copy operation; to be executed by executeWriteFuncs() // Use helper to prevent deadlock if executeWriteFuncs exits - if err := base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.copyRowsQueue, copyRowsFunc); err != nil { - // Context cancelled, check for abort and exit + if err := base.SendWithContext(ctx, mgtr.copyRowsQueue, copyRowsFunc); err != nil { if abortErr := mgtr.checkAbort(); abortErr != nil { return terminateRowIteration(abortErr) } @@ -1727,6 +1825,30 @@ func (mgtr *Migrator) iterateChunks() error { } } +// checkInsertWarnings turns SQL warnings collected from a chunk INSERT into a fatal +// error when --panic-on-warnings is set. +func (mgtr *Migrator) checkInsertWarnings(sqlWarnings []string) error { + if !mgtr.migrationContext.PanicOnWarnings || len(sqlWarnings) == 0 { + return nil + } + for _, warning := range sqlWarnings { + mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) + } + return fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", strings.Join(sqlWarnings, "; ")) +} + +// recordLastIterationRange stores the most recently *completed* chunk range, used +// by Checkpoint to resume from a fully-copied boundary. +func (mgtr *Migrator) recordLastIterationRange(iterationRangeValues *base.IterationRangeValues) { + if iterationRangeValues == nil || iterationRangeValues.Min == nil || iterationRangeValues.Max == nil { + return + } + mgtr.applier.LastIterationRangeMutex.Lock() + mgtr.applier.LastIterationRangeMinValues = iterationRangeValues.Min.Clone() + mgtr.applier.LastIterationRangeMaxValues = iterationRangeValues.Max.Clone() + mgtr.applier.LastIterationRangeMutex.Unlock() +} + func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error { if eventStruct.writeFunc != nil {