diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index d017c7c25..a231a8469 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -100,6 +100,30 @@ See also: [`resuming-migrations`](resume.md) Controls how many rows `gh-ost` copies in each row-copy iteration. The default is `1000`; allowed range is `10` to `100000`. +### parallel-copy + +`--parallel-copy` copies the table rows using multiple workers running concurrently. Binlog event application remains single-threaded, so event ordering is preserved. This is experimental and requires `--checkpoint`. Default is disabled. + +Parallel copy requires checkpoints because parallel INSERTs can go out of sequential order and leave gaps in target table. gh-ost checkpoints are consistent and do not have gaps, so resuming from a checkpoint will make the target table consistent again. + +Consistency is ensured by using +-INSERT IGNORE/SELECT FOR SHARE for row copy (current gh-ost behavior) +-DELETE/INSERT by DML applier when unique key change is detected (current gh-ost behavior) +-advanceFrontier call to ensure that checkpoints have no gaps (used by parallel-copy only) +Without all of those, consistency of parallel copy cannot be guaranteed (for example, row copy could insert row back after it was deleted). In existing serial mode, the problem is much easier because DML applier is never parallel with row copy. + +### parallel-copy-workers + +`--parallel-copy-workers` sets the number of parallel row-copy workers used when `--parallel-copy` is enabled. Has no effect otherwise. Default is 4; maximum is 64. The applier connection pool is sized to `parallel-copy-workers + 4`. Throughput gains tend to plateau past ~16 workers — so prefer the smallest value that meets your needs. + +### parallel-copy-max-heartbeatlag-millis + +`--parallel-copy-max-heartbeatlag-millis` sets a heartbeat-lag threshold (in milliseconds) used to throttle row-copy workers when `--parallel-copy` is enabled. Requires `--parallel-copy`. Default is `10000` (10 seconds). + +gh-ost's heartbeat lag is used as a lightweight proxy for binlog applier lag. When heartbeat lag is below this threshold, all workers copy rows at full concurrency. When it exceeds the threshold, workers pause and yield, giving the binlog applier time to catch up. This preserves the same priority model as serial row-copy, where binlog event application is never starved by row-copy work. If the applier is consistently lagging, workers will spend most of their time paused and throughput approaches the serial baseline; when the table's DML rate is low and the applier is not lagging, all workers run freely and deliver the full benefit of parallelism. + +HeartbeatLag is also the primary signal used by the cut-over gate: gh-ost will not attempt the table swap until HeartbeatLag drops below both `--max-lag-millis` and `--cut-over-lock-timeout-seconds`. Using it here as the parallel-copy throttle signal is therefore consistent with the most safety-critical decision point in the migration. + ### conf `--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format: diff --git a/go/base/context.go b/go/base/context.go index 4d0c3b09d..0c4fd183d 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -55,6 +55,19 @@ const ( HTTPStatusOK = 200 MaxEventsBatchSize = 1000 ETAUnknown = math.MinInt64 + + // MaxCopyWorkers is the hard upper bound on --parallel-copy-workers. The applier + // connection pool is sized to CopyWorkers + ParallelCopyConnHeadroom, so this + // also bounds connection usage and prevents exhausting the server's + // max_connections. + MaxCopyWorkers = 64 + // RecommendedMaxCopyWorkers is a soft threshold above which we warn: the + // throughput benefit of more workers plateaus while load keeps growing. + RecommendedMaxCopyWorkers = 16 + // ParallelCopyConnHeadroom is the number of connections added to the applier + // pool on top of CopyWorkers, to cover the DML event applier goroutine + // the status logger, the heartbeat goroutine, and the checkpoint loop. + ParallelCopyConnHeadroom = 4 ) var ( @@ -162,6 +175,10 @@ type MigrationContext struct { Checkpoint bool CheckpointIntervalSeconds int64 + ParallelCopy bool + ParallelCopyWorkers int64 + ParallelCopyMaxHeartbeatLagThresholdMillies int64 + DropServeSocket bool ServeSocketFile string ServeTCPPort int64 @@ -275,6 +292,20 @@ type MigrationContext struct { IsOpenMetadataLockInstruments bool Log Logger + + migrationLastInsertSQLWarningsMu sync.Mutex // unexported — only accessed via methods below +} + +func (mctx *MigrationContext) SetLastInsertSQLWarnings(warnings []string) { + mctx.migrationLastInsertSQLWarningsMu.Lock() + defer mctx.migrationLastInsertSQLWarningsMu.Unlock() + mctx.MigrationLastInsertSQLWarnings = warnings +} + +func (mctx *MigrationContext) GetLastInsertSQLWarnings() []string { + mctx.migrationLastInsertSQLWarningsMu.Lock() + defer mctx.migrationLastInsertSQLWarningsMu.Unlock() + return mctx.MigrationLastInsertSQLWarnings } type Logger interface { diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index d77046231..8322f654e 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -164,6 +164,9 @@ func main() { flag.BoolVar(&migrationContext.SkipPortValidation, "skip-port-validation", false, "Skip port validation for MySQL connections") flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Enable migration checkpoints") flag.Int64Var(&migrationContext.CheckpointIntervalSeconds, "checkpoint-seconds", 300, "The number of seconds between checkpoints") + flag.BoolVar(&migrationContext.ParallelCopy, "parallel-copy", false, "Copy table rows using multiple parallel workers. Experimental; requires --checkpoint") + flag.Int64Var(&migrationContext.ParallelCopyWorkers, "parallel-copy-workers", 4, "Number of parallel row-copy workers when --parallel-copy is enabled (max 64; gains tend to plateau past ~16)") + flag.Int64Var(&migrationContext.ParallelCopyMaxHeartbeatLagThresholdMillies, "parallel-copy-max-heartbeatlag-millis", 10000, "When >0 and --parallel-copy is enabled, row-copy workers pause when gh-ost's own binlog applier lag exceeds this threshold (ms), giving the binlog applier priority. 0 disables this check.") flag.BoolVar(&migrationContext.Resume, "resume", false, "Attempt to resume migration from checkpoint") flag.BoolVar(&migrationContext.Revert, "revert", false, "Attempt to revert completed migration") flag.StringVar(&migrationContext.OldTableName, "old-table", "", "The name of the old table when using --revert, e.g. '_mytable_del'") @@ -336,6 +339,23 @@ func main() { if migrationContext.CheckpointIntervalSeconds < 10 { migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10") } + if migrationContext.ParallelCopy { + if !migrationContext.Checkpoint { + migrationContext.Log.Fatalf("--parallel-copy requires --checkpoint") + } + if migrationContext.ParallelCopyWorkers < 1 { + migrationContext.Log.Fatalf("--parallel-copy-workers should be >=1") + } + if migrationContext.ParallelCopyWorkers > base.MaxCopyWorkers { + migrationContext.Log.Fatalf("--parallel-copy-workers should be <=%d", base.MaxCopyWorkers) + } + if migrationContext.ParallelCopyWorkers > base.RecommendedMaxCopyWorkers { + migrationContext.Log.Warningf("--parallel-copy-workers=%d is high: throughput gains tend to plateau while load on the server grows. Consider <=%d.", migrationContext.ParallelCopyWorkers, base.RecommendedMaxCopyWorkers) + } + if migrationContext.ParallelCopyMaxHeartbeatLagThresholdMillies > 0 && migrationContext.ParallelCopyMaxHeartbeatLagThresholdMillies < 100 { + migrationContext.Log.Fatalf("--parallel-copy-max-heartbeatlag-millis should be >=100 or 0 (disabled)") + } + } if migrationContext.CountTableRows && migrationContext.PanicOnWarnings { migrationContext.Log.Warning("--exact-rowcount with --panic-on-warnings: row counts cannot be exact due to warning detection") } diff --git a/go/logic/applier.go b/go/logic/applier.go index 50ad45ce2..ba7934a04 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -124,6 +124,11 @@ func (apl *Applier) InitDBConnections() (err error) { if apl.db, _, err = mysql.GetDB(apl.migrationContext.Uuid, uriWithMulti); err != nil { return err } + if apl.migrationContext.ParallelCopy { + poolSize := int(apl.migrationContext.ParallelCopyWorkers) + base.ParallelCopyConnHeadroom + apl.db.SetMaxOpenConns(poolSize) + apl.db.SetMaxIdleConns(poolSize) + } singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri) if apl.singletonDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, singletonApplierUri); err != nil { return err @@ -1075,12 +1080,42 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool return hasFurtherRange, nil } +// iterationRange carries an explicit chunk range for ApplyIterationInsertQuery. It is +// passed only by the --parallel-copy path; serial callers pass nothing and the shared +// migrationContext.MigrationIterationRange* values are used instead. +type iterationRange struct { + min *sql.ColumnValues + max *sql.ColumnValues + includeRangeStart bool +} + // ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where // data actually gets copied from original table. -func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { +// +// Serial callers invoke it with no argument: the chunk range comes from the shared +// migrationContext.MigrationIterationRange* cursor. +// The --parallel-copy path passes an explicit range (captured under the boundary-scan +// mutex) so concurrent workers don't race on the shared cursor +func (apl *Applier) ApplyIterationInsertQuery(parallelRange ...*iterationRange) (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { startTime := time.Now() chunkSize = atomic.LoadInt64(&apl.migrationContext.ChunkSize) + parallel := len(parallelRange) > 0 && parallelRange[0] != nil + var rangeMin, rangeMax *sql.ColumnValues + var includeRangeStartValues bool + if parallel { + // Use the explicitly-passed range. The parallel INSERT runs unlocked and + // concurrently, so it must NOT read the shared MigrationIterationRange* cursor: + // another worker is mutating it under parallelSelectMutex. + rangeMin = parallelRange[0].min + rangeMax = parallelRange[0].max + includeRangeStartValues = parallelRange[0].includeRangeStart + } else { + rangeMin = apl.migrationContext.MigrationIterationRangeMinValues + rangeMax = apl.migrationContext.MigrationIterationRangeMaxValues + includeRangeStartValues = apl.migrationContext.GetIteration() == 0 + } + query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, @@ -1089,9 +1124,9 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i apl.migrationContext.MappedSharedColumns.Names(), apl.migrationContext.UniqueKey.Name, &apl.migrationContext.UniqueKey.Columns, - apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), - apl.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), - apl.migrationContext.GetIteration() == 0, + rangeMin.AbstractValues(), + rangeMax.AbstractValues(), + includeRangeStartValues, apl.migrationContext.IsTransactionalTable(), // TODO: Don't hardcode this strings.HasPrefix(apl.migrationContext.ApplierMySQLVersion, "8."), @@ -1149,7 +1184,13 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i } sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) } - apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings + if parallel { + if len(sqlWarnings) > 0 { + apl.migrationContext.SetLastInsertSQLWarnings(sqlWarnings) + } + } else { + apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings + } } if err := tx.Commit(); err != nil { @@ -1165,8 +1206,8 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i duration = time.Since(startTime) apl.migrationContext.Log.Debugf( "Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d", - apl.migrationContext.MigrationIterationRangeMinValues, - apl.migrationContext.MigrationIterationRangeMaxValues, + rangeMin, + rangeMax, apl.migrationContext.GetIteration(), chunkSize) return chunkSize, rowsAffected, duration, nil diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 017a0c302..7267c9ec2 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -13,6 +13,7 @@ import ( "math" "os" "strings" + "sync" "sync/atomic" "time" @@ -101,6 +102,18 @@ type Migrator struct { applyEventsQueue chan *applyEventStruct finishedMigrating int64 + + // Parallel row-copy state (used only when --parallel-copy is enabled). + // parallelCopySelectMutex serializes the boundary-scan SELECTs so chunk ranges are + // produced sequentially; the chunk INSERTs then run in parallel across workers. + // parallelCopyFrontierMutex guards parallelCopyPending/parallelCopyNextCommit, the + // iteration-keyed table that advances the checkpoint frontier only over a + // contiguous prefix of committed chunks (see advanceFrontier in parallel_copy.go). + parallelCopySelectMutex sync.Mutex + parallelCopyFrontierMutex sync.Mutex + parallelCopyPending map[int64]*rangeResult + parallelCopyNextCommit int64 + parallelCopyDispatchSeq int64 // monotone dispatch counter; separate from Iteration so Iteration tracks commits } func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { @@ -619,13 +632,26 @@ func (mgtr *Migrator) Migrate() (err error) { if err := mgtr.hooksExecutor.OnBeforeRowCopy(); err != nil { return err } - go func() { - if err := mgtr.executeWriteFuncs(); err != nil { - // Send error to PanicAbort to trigger abort - _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) - } - }() - go mgtr.iterateChunks() + if mgtr.migrationContext.ParallelCopy { + // Row copy is produced by iterateChunks and consumed in parallel by workers (copyRowsParallel). + // Binlog DML is applied single-threaded by executeDMLWriteFuncs. + // it must not also pull copy tasks (the workers own copyRowsQueue), + // hence executeDMLWriteFuncs rather than executeWriteFuncs here. + go func() { + if err := mgtr.executeDMLWriteFuncs(); err != nil { + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + } + }() + go mgtr.copyRowsParallel() + } else { + go func() { + if err := mgtr.executeWriteFuncs(); err != nil { + // Send error to PanicAbort to trigger abort + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + } + }() + go mgtr.iterateChunks() + } mgtr.migrationContext.MarkRowCopyStartTime() go mgtr.initiateStatus() if mgtr.migrationContext.Checkpoint { @@ -1466,7 +1492,11 @@ func (mgtr *Migrator) printStatus(rule PrintStatusRule, snap migrationProgressSn return } - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", + throughputSuffix := "" + if mgtr.migrationContext.ParallelCopy { + throughputSuffix = fmt.Sprintf("; Throughput: %drows/sec", atomic.LoadInt64(&mgtr.migrationContext.EtaRowsPerSecond)) + } + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s%s", snap.totalRowsCopied, snap.rowsEstimate, snap.progressPct, snap.dmlApplied, snap.applyEventsBacklog, snap.applyEventsCapacity, @@ -1476,6 +1506,7 @@ func (mgtr *Migrator) printStatus(rule PrintStatusRule, snap migrationProgressSn snap.heartbeatLagSeconds, snap.state, snap.eta, + throughputSuffix, ) mgtr.applier.WriteChangelog( fmt.Sprintf("copy iteration %d at %d", mgtr.migrationContext.GetIteration(), time.Now().Unix()), @@ -1650,8 +1681,17 @@ func (mgtr *Migrator) iterateChunks() error { // There's another such check down the line return nil } + // --parallel-copy state, captured once per copyRowsFunc invocation under + // parallelSelectMutex so the chunk INSERT (which runs unlocked, in parallel) uses + // a stable range even as other workers advance the shared cursor, and so a retry + // re-inserts the same chunk rather than re-scanning a new range. + var parallelIteration int64 + var parallelRangeMin, parallelRangeMax *sql.ColumnValues + var parallelCaptured bool copyRowsFunc := func() error { - mgtr.migrationContext.SetNextIterationRangeMinValues() + if !mgtr.migrationContext.ParallelCopy { + mgtr.migrationContext.SetNextIterationRangeMinValues() + } // Copy task: applyCopyRowsFunc := func() error { if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { @@ -1660,14 +1700,28 @@ func (mgtr *Migrator) iterateChunks() error { return nil } - // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever - hasFurtherRange, err := mgtr.applier.CalculateNextIterationRangeEndValues() - if err != nil { - return err // wrapping call will retry - } - if !hasFurtherRange { - atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) - return terminateRowIteration(nil) + if mgtr.migrationContext.ParallelCopy { + // Serialize the boundary scan and capture this chunk's range once. A + // retry of this func (e.g. a transient INSERT error) then reuses the + // captured range instead of re-scanning, so the contiguous frontier in + // advanceFrontier is never left with an abandoned iteration. + if !parallelCaptured { + var stopErr error + parallelIteration, parallelRangeMin, parallelRangeMax, parallelCaptured, stopErr = mgtr.captureParallelChunkRange(&hasNoFurtherRangeFlag, terminateRowIteration) + if !parallelCaptured { + return stopErr + } + } + } else { + // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever + hasFurtherRange, err := mgtr.applier.CalculateNextIterationRangeEndValues() + if err != nil { + return err // wrapping call will retry + } + if !hasFurtherRange { + atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) + return terminateRowIteration(nil) + } } if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { // No need for more writes. @@ -1680,41 +1734,69 @@ func (mgtr *Migrator) iterateChunks() error { // _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage. return nil } - _, rowsAffected, _, err := mgtr.applier.ApplyIterationInsertQuery() + var rowsAffected int64 + var err error + if mgtr.migrationContext.ParallelCopy { + // Pass the captured range explicitly so the INSERT doesn't read the + // shared cursor; with --panic-on-warnings it returns warnings as an error. + _, rowsAffected, _, err = mgtr.applier.ApplyIterationInsertQuery( + &iterationRange{min: parallelRangeMin, max: parallelRangeMax, includeRangeStart: parallelIteration == 0}, + ) + } else { + _, rowsAffected, _, err = mgtr.applier.ApplyIterationInsertQuery() + } if err != nil { return err // wrapping call will retry } if mgtr.migrationContext.PanicOnWarnings { - if len(mgtr.migrationContext.MigrationLastInsertSQLWarnings) > 0 { - for _, warning := range mgtr.migrationContext.MigrationLastInsertSQLWarnings { + var warnings []string + if mgtr.migrationContext.ParallelCopy { + warnings = mgtr.migrationContext.GetLastInsertSQLWarnings() + } else { + warnings = mgtr.migrationContext.MigrationLastInsertSQLWarnings + } + if len(warnings) > 0 { + for _, warning := range warnings { mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) } - joinedWarnings := strings.Join(mgtr.migrationContext.MigrationLastInsertSQLWarnings, "; ") + joinedWarnings := strings.Join(warnings, "; ") return terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings)) } } - atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) - atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) + if mgtr.migrationContext.ParallelCopy { + // Fold the chunk into global state only once every earlier chunk is + // written, so the checkpoint frontier never skips a gap. + mgtr.advanceFrontier(parallelIteration, parallelRangeMin, parallelRangeMax, rowsAffected) + } else { + atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) + atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) + } return nil } if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { return terminateRowIteration(err) } - // record last successfully copied range - mgtr.applier.LastIterationRangeMutex.Lock() - if mgtr.migrationContext.MigrationIterationRangeMinValues != nil && mgtr.migrationContext.MigrationIterationRangeMaxValues != nil { - mgtr.applier.LastIterationRangeMinValues = mgtr.migrationContext.MigrationIterationRangeMinValues.Clone() - mgtr.applier.LastIterationRangeMaxValues = mgtr.migrationContext.MigrationIterationRangeMaxValues.Clone() + if !mgtr.migrationContext.ParallelCopy { + // record last successfully copied range (parallel records it in advanceFrontier) + mgtr.applier.LastIterationRangeMutex.Lock() + if mgtr.migrationContext.MigrationIterationRangeMinValues != nil && mgtr.migrationContext.MigrationIterationRangeMaxValues != nil { + mgtr.applier.LastIterationRangeMinValues = mgtr.migrationContext.MigrationIterationRangeMinValues.Clone() + mgtr.applier.LastIterationRangeMaxValues = mgtr.migrationContext.MigrationIterationRangeMaxValues.Clone() + } + mgtr.applier.LastIterationRangeMutex.Unlock() } - mgtr.applier.LastIterationRangeMutex.Unlock() return nil } - // Enqueue copy operation; to be executed by executeWriteFuncs() - // Use helper to prevent deadlock if executeWriteFuncs exits + + // Enqueue copy operation. In serial mode it is consumed by executeWriteFuncs(); in + // --parallel-copy mode it is consumed by one of the CopyWorkers workers started in + // copyRowsParallel(). The closure itself serializes its boundary SELECT under + // parallelSelectMutex, so the only thing parallelized is the chunk INSERT. + // Use helper to prevent deadlock if the consumer exits. if err := base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.copyRowsQueue, copyRowsFunc); err != nil { // Context cancelled, check for abort and exit if abortErr := mgtr.checkAbort(); abortErr != nil { @@ -1725,6 +1807,40 @@ func (mgtr *Migrator) iterateChunks() error { } } +// captureParallelChunkRange acquires parallelSelectMutex, advances the shared scan +// cursor, and captures the next chunk's range. Returns captured=true with the range +// values on success. Returns captured=false with nil err when no further work is needed +// (row copy already complete or table exhausted; in the latter case terminateRowIteration +// is called to signal the channel). Returns captured=false with non-nil err when the +// range scan itself fails; the caller should propagate it to trigger a retry. +func (mgtr *Migrator) captureParallelChunkRange(hasNoFurtherRangeFlag *int64, terminateRowIteration func(error) error) (iteration int64, rangeMin, rangeMax *sql.ColumnValues, captured bool, err error) { + mgtr.parallelCopySelectMutex.Lock() + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(hasNoFurtherRangeFlag) == 1 { + mgtr.parallelCopySelectMutex.Unlock() + return 0, nil, nil, false, nil + } + mgtr.migrationContext.SetNextIterationRangeMinValues() + hasFurtherRange, err := mgtr.applier.CalculateNextIterationRangeEndValues() + if err != nil { + mgtr.parallelCopySelectMutex.Unlock() + return 0, nil, nil, false, err + } + if !hasFurtherRange { + atomic.StoreInt64(hasNoFurtherRangeFlag, 1) + mgtr.parallelCopySelectMutex.Unlock() + return 0, nil, nil, false, terminateRowIteration(nil) + } + iteration = atomic.LoadInt64(&mgtr.parallelCopyDispatchSeq) + rangeMin = mgtr.migrationContext.MigrationIterationRangeMinValues.Clone() + rangeMax = mgtr.migrationContext.MigrationIterationRangeMaxValues.Clone() + // Advance the dispatch counter under the lock so the next worker + // scans the next chunk. Iteration is advanced at commit time in + // advanceFrontier so it stays consistent with TotalRowsCopied. + atomic.AddInt64(&mgtr.parallelCopyDispatchSeq, 1) + mgtr.parallelCopySelectMutex.Unlock() + return iteration, rangeMin, rangeMax, true, nil +} + func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error { if eventStruct.writeFunc != nil { diff --git a/go/logic/parallel_copy.go b/go/logic/parallel_copy.go new file mode 100644 index 000000000..435f9946c --- /dev/null +++ b/go/logic/parallel_copy.go @@ -0,0 +1,181 @@ +/* + Copyright 2025 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/sql" +) + +// rangeResult is the outcome of a single parallel chunk INSERT, held until its +// iteration can be committed contiguously (see advanceFrontier). +type rangeResult struct { + rangeMin *sql.ColumnValues + rangeMax *sql.ColumnValues + rowsAffected int64 +} + +// Blocks the calling goroutine until gh-ost's own binlog +// applier HeartbeatLag drops below ParallelCopyMaxHeartbeatLagThresholdMillies. It is a +// no-op when the threshold is zero (disabled), when no heartbeat has been seen yet, or +// when ctx is cancelled. +// +// HeartbeatLag (time since gh-ost last processed its own changelog heartbeat) is the +// right signal here: replica replication lag is already covered by the general throttle; +// this check specifically targets the case where the replica is fine but gh-ost's own +// binlog applier is falling behind due to pressure. +func (mgtr *Migrator) throttleOnHeartbeatLag(ctx context.Context) { + threshold := atomic.LoadInt64(&mgtr.migrationContext.ParallelCopyMaxHeartbeatLagThresholdMillies) + if threshold <= 0 { + return + } + for { + lastHeartbeat := mgtr.migrationContext.GetLastHeartbeatOnChangelogTime() + if lastHeartbeat.IsZero() { + return // no heartbeat received yet; don't block on stale zero value + } + if time.Since(lastHeartbeat) <= time.Duration(threshold)*time.Millisecond { + return + } + timer := time.NewTimer(250 * time.Millisecond) + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + } + } +} + +// copyRowsParallel is the --parallel-copy consumer side. iterateChunks remains the single +// producer, enqueuing copy-task closures onto copyRowsQueue exactly as in serial mode; this +// function starts CopyWorkers consumer goroutines that pull those closures and run them +// concurrently. Each closure serializes its own boundary SELECT under parallelSelectMutex +// (so chunk ranges are still produced sequentially) and only the chunk INSERT runs in +// parallel; advanceFrontier then commits global state in contiguous iteration order so a +// crash/resume with checkpoints never leaves un-copied holes. +// +// Completion: the producer (iterateChunks) returns once the scan is exhausted; we then stop +// the workers, wait for every in-flight INSERT to finish, and signal clean completion to +// rowCopyComplete exactly once. A fatal error in any closure is signaled by the closure +// itself (it holds iterateChunks' terminateRowIteration), which aborts the context; in that +// case we do not signal a (false) clean completion. +func (mgtr *Migrator) copyRowsParallel() { + mgtr.resetParallelState(mgtr.migrationContext.GetIteration()) + + workers := int(mgtr.migrationContext.ParallelCopyWorkers) + if workers < 1 { + workers = 1 + } + mgtr.migrationContext.Log.Infof("Parallel row copy: starting %d workers", workers) + + ctx := mgtr.migrationContext.GetContext() + workersDone := make(chan struct{}) + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case copyRowsFunc := <-mgtr.copyRowsQueue: + // Parallel-copy lag throttle: back off before the global throttle + // kicks in, giving the binlog DML applier priority over row copy. + mgtr.throttleOnHeartbeatLag(ctx) + // Throttle before issuing the chunk, mirroring executeWriteFuncs. + mgtr.throttler.throttle(nil) + copyRowsStartTime := time.Now() + // Errors are routed to rowCopyComplete inside the closure (via + // terminateRowIteration); the ctx.Done() case below then unwinds us. + if err := copyRowsFunc(); err != nil { + return // closure already signaled via terminateRowIteration; skip nice-ratio sleep + } + if niceRatio := mgtr.migrationContext.GetNiceRatio(); niceRatio > 0 { + copyRowsDuration := time.Since(copyRowsStartTime) + sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds()) + sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond + time.Sleep(sleepTime) + } + case <-workersDone: + return + case <-ctx.Done(): + return + } + } + }() + } + + // Run the producer to completion. It enqueues one closure per chunk onto copyRowsQueue + // (an unbuffered channel, so every closure is handed to a worker before the next is + // produced) and returns once the boundary scan is exhausted or the migration aborts. + _ = mgtr.iterateChunks() + + // Producer done: tell idle workers to stop, then wait for any in-flight INSERTs. + close(workersDone) + wg.Wait() + + if err := mgtr.checkAbort(); err != nil { + // The error was already signaled by the failing closure; don't signal completion. + return + } + + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 0 { + _ = base.SendWithContext(ctx, mgtr.rowCopyComplete, nil) + } +} + +// resetParallelState initializes the iteration-keyed pending table and the next-to-commit +// cursor for a parallel row-copy. +func (mgtr *Migrator) resetParallelState(firstIteration int64) { + mgtr.parallelCopyFrontierMutex.Lock() + defer mgtr.parallelCopyFrontierMutex.Unlock() + mgtr.parallelCopyPending = make(map[int64]*rangeResult) + mgtr.parallelCopyNextCommit = firstIteration + atomic.StoreInt64(&mgtr.parallelCopyDispatchSeq, firstIteration) +} + +// advanceFrontier records a just-completed chunk INSERT (keyed by its iteration) +// and then commits every chunk that is now contiguously complete, in iteration +// order. "Committing" a chunk means folding its rows into TotalRowsCopied and +// advancing the checkpoint frontier (LastIterationRange{Min,Max}Values) to that +// chunk's range. +// +// Because parallel workers finish out of order, a chunk that completes before its +// predecessors is parked in parallelPending and only committed once the gap below +// it is filled. This guarantees the persisted frontier never advances past an +// un-written chunk, so a crash/resume can re-copy forward from the frontier without +// leaving holes. +func (mgtr *Migrator) advanceFrontier(iteration int64, rangeMin, rangeMax *sql.ColumnValues, rowsAffected int64) { + mgtr.parallelCopyFrontierMutex.Lock() + defer mgtr.parallelCopyFrontierMutex.Unlock() + + mgtr.parallelCopyPending[iteration] = &rangeResult{ + rangeMin: rangeMin, + rangeMax: rangeMax, + rowsAffected: rowsAffected, + } + + for { + result, ok := mgtr.parallelCopyPending[mgtr.parallelCopyNextCommit] + if !ok { + break + } + delete(mgtr.parallelCopyPending, mgtr.parallelCopyNextCommit) + mgtr.parallelCopyNextCommit++ + atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) + atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, result.rowsAffected) + + mgtr.applier.LastIterationRangeMutex.Lock() + mgtr.applier.LastIterationRangeMinValues = result.rangeMin + mgtr.applier.LastIterationRangeMaxValues = result.rangeMax + mgtr.applier.LastIterationRangeMutex.Unlock() + } +} diff --git a/go/logic/parallel_copy_test.go b/go/logic/parallel_copy_test.go new file mode 100644 index 000000000..c03d98e4a --- /dev/null +++ b/go/logic/parallel_copy_test.go @@ -0,0 +1,121 @@ +/* + Copyright 2025 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/sql" +) + +// newFrontierTestMigrator builds a Migrator with just enough state to exercise +// advanceFrontier: a migration context (for TotalRowsCopied) and an applier (for the +// LastIterationRange* frontier + its mutex). No DB connection is needed. +func newFrontierTestMigrator(firstIteration int64) *Migrator { + migrationContext := base.NewMigrationContext() + migrationContext.Iteration = firstIteration + mgtr := &Migrator{ + migrationContext: migrationContext, + applier: &Applier{migrationContext: migrationContext}, + } + mgtr.resetParallelState(firstIteration) + return mgtr +} + +// distinctRange returns a fresh ColumnValues pointer so committed ranges can be told +// apart by identity in assertions. +func distinctRange() *sql.ColumnValues { + return sql.NewColumnValues(1) +} + +// TestFrontierInOrderCommit: chunks completing in iteration order each commit +// immediately, summing rows and advancing the frontier to the latest range. +func TestFrontierInOrderCommit(t *testing.T) { + mgtr := newFrontierTestMigrator(0) + r0, r1, r2 := distinctRange(), distinctRange(), distinctRange() + + mgtr.advanceFrontier(0, distinctRange(), r0, 10) + require.Equal(t, int64(10), mgtr.migrationContext.GetTotalRowsCopied()) + require.Same(t, r0, mgtr.applier.LastIterationRangeMaxValues) + + mgtr.advanceFrontier(1, distinctRange(), r1, 20) + mgtr.advanceFrontier(2, distinctRange(), r2, 30) + + require.Equal(t, int64(60), mgtr.migrationContext.GetTotalRowsCopied()) + require.Same(t, r2, mgtr.applier.LastIterationRangeMaxValues) + require.Equal(t, int64(3), mgtr.parallelCopyNextCommit) + require.Empty(t, mgtr.parallelCopyPending) +} + +// TestFrontierOutOfOrderHoldAndRelease: a later chunk that finishes before its +// predecessors is held until the gap below it is filled, then released in order. +func TestFrontierOutOfOrderHoldAndRelease(t *testing.T) { + mgtr := newFrontierTestMigrator(0) + r0, r1, r2 := distinctRange(), distinctRange(), distinctRange() + + // iteration 2 finishes first: must NOT advance the frontier or rows. + mgtr.advanceFrontier(2, distinctRange(), r2, 30) + require.Equal(t, int64(0), mgtr.migrationContext.GetTotalRowsCopied()) + require.Nil(t, mgtr.applier.LastIterationRangeMaxValues) + require.Equal(t, int64(0), mgtr.parallelCopyNextCommit) + require.Len(t, mgtr.parallelCopyPending, 1) + + // iteration 0 commits only itself (1 still missing). + mgtr.advanceFrontier(0, distinctRange(), r0, 10) + require.Equal(t, int64(10), mgtr.migrationContext.GetTotalRowsCopied()) + require.Same(t, r0, mgtr.applier.LastIterationRangeMaxValues) + require.Equal(t, int64(1), mgtr.parallelCopyNextCommit) + + // iteration 1 fills the gap and releases 1 then 2. + mgtr.advanceFrontier(1, distinctRange(), r1, 20) + require.Equal(t, int64(60), mgtr.migrationContext.GetTotalRowsCopied()) + require.Same(t, r2, mgtr.applier.LastIterationRangeMaxValues) + require.Equal(t, int64(3), mgtr.parallelCopyNextCommit) + require.Empty(t, mgtr.parallelCopyPending) +} + +// TestFrontierGapSafety: with a gap open, the persisted frontier must never reflect +// a chunk above the gap, even after several higher chunks complete. +func TestFrontierGapSafety(t *testing.T) { + mgtr := newFrontierTestMigrator(0) + r0 := distinctRange() + + mgtr.advanceFrontier(0, distinctRange(), r0, 10) // commits 0 + // 2,3,4 complete while 1 is still in flight: all held. + mgtr.advanceFrontier(2, distinctRange(), distinctRange(), 30) + mgtr.advanceFrontier(3, distinctRange(), distinctRange(), 40) + mgtr.advanceFrontier(4, distinctRange(), distinctRange(), 50) + + // Frontier and rows must still reflect only the contiguous prefix [0]. + require.Equal(t, int64(10), mgtr.migrationContext.GetTotalRowsCopied()) + require.Same(t, r0, mgtr.applier.LastIterationRangeMaxValues) + require.Equal(t, int64(1), mgtr.parallelCopyNextCommit) + require.Len(t, mgtr.parallelCopyPending, 3) +} + +// TestFrontierResumeBase: when resuming, the next-commit cursor starts at the +// checkpoint iteration, and contiguous commit works from that base. +func TestFrontierResumeBase(t *testing.T) { + mgtr := newFrontierTestMigrator(100) + r100, r101, r102 := distinctRange(), distinctRange(), distinctRange() + + mgtr.advanceFrontier(100, distinctRange(), r100, 5) + require.Equal(t, int64(101), mgtr.parallelCopyNextCommit) + require.Same(t, r100, mgtr.applier.LastIterationRangeMaxValues) + + // 102 out of order is held; 101 then releases 101 and 102. + mgtr.advanceFrontier(102, distinctRange(), r102, 7) + require.Same(t, r100, mgtr.applier.LastIterationRangeMaxValues) + mgtr.advanceFrontier(101, distinctRange(), r101, 6) + + require.Equal(t, int64(18), mgtr.migrationContext.GetTotalRowsCopied()) + require.Same(t, r102, mgtr.applier.LastIterationRangeMaxValues) + require.Equal(t, int64(103), mgtr.parallelCopyNextCommit) + require.Empty(t, mgtr.parallelCopyPending) +} diff --git a/localtests/parallel-copy-compound-pk/create.sql b/localtests/parallel-copy-compound-pk/create.sql new file mode 100644 index 000000000..5f671ae00 --- /dev/null +++ b/localtests/parallel-copy-compound-pk/create.sql @@ -0,0 +1,38 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + i int not null, + v varchar(128), + updated tinyint unsigned default 0, + primary key(id, v) +) auto_increment=1; + +insert into gh_ost_test values + (null, 11, 'eleven', 0),(null, 13, 'thirteen', 0), + (null, 17, 'seventeen', 0),(null, 19, 'nineteen', 0), + (null, 23, 'twenty three', 0),(null, 29, 'twenty nine', 0), + (null, 31, 'thirty one', 0),(null, 37, 'thirty seven', 0); + +-- Multiply rows so multiple chunks span the composite primary key and are +-- distributed across parallel workers at chunk-size=10. +insert into gh_ost_test (i, v) select i, v from gh_ost_test; +insert into gh_ost_test (i, v) select i, v from gh_ost_test; +insert into gh_ost_test (i, v) select i, v from gh_ost_test; +insert into gh_ost_test (i, v) select i, v from gh_ost_test; +insert into gh_ost_test (i, v) select i, v from gh_ost_test; + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into gh_ost_test values (null, 11, 'eleven', 0); + update gh_ost_test set updated = 1 where i = 11 order by id desc limit 1; + insert into gh_ost_test values (null, 23, 'twenty three', 0); + delete from gh_ost_test where i = 31 order by id desc limit 1; +end ;; diff --git a/localtests/parallel-copy-compound-pk/extra_args b/localtests/parallel-copy-compound-pk/extra_args new file mode 100644 index 000000000..bfc0210b7 --- /dev/null +++ b/localtests/parallel-copy-compound-pk/extra_args @@ -0,0 +1 @@ +--parallel-copy --parallel-copy-workers=4 \ No newline at end of file diff --git a/localtests/parallel-copy-resume/create.sql b/localtests/parallel-copy-resume/create.sql new file mode 100644 index 000000000..f243f9379 --- /dev/null +++ b/localtests/parallel-copy-resume/create.sql @@ -0,0 +1,46 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + i int not null, + color varchar(32), + updated tinyint unsigned default 0, + primary key(id) +) auto_increment=1; + +insert into gh_ost_test (i, color) values + (1,'red'),(2,'green'),(3,'blue'),(4,'orange'), + (5,'yellow'),(6,'gold'),(7,'silver'),(8,'pink'), + (9,'cyan'),(10,'magenta'),(11,'teal'),(12,'lime'), + (13,'navy'),(14,'olive'),(15,'maroon'),(16,'coral'); + +-- Grow to ~32k rows so that, combined with --nice-ratio in test.sh, the first +-- run spends well over one checkpoint interval (10s) in row-copy. This makes it +-- likely that a checkpoint lands mid-copy, so --resume exercises gap-filling +-- from a partial frontier. (The test stays correct either way; the deterministic +-- gap logic is covered by go/logic/parallel_test.go.) +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into gh_ost_test (i, color) values (101, 'concurrent'); + update gh_ost_test set updated = 1, color = 'updated' where i = 1 order by id desc limit 1; + delete from gh_ost_test where i = 2 order by id desc limit 1; +end ;; diff --git a/localtests/parallel-copy-resume/test.sh b/localtests/parallel-copy-resume/test.sh new file mode 100644 index 000000000..2fa018d82 --- /dev/null +++ b/localtests/parallel-copy-resume/test.sh @@ -0,0 +1,102 @@ +#!/bin/bash +# Custom test: kill a --parallel-copy migration mid-flight (after at least one +# checkpoint), then --resume it and verify the resulting ghost table matches the +# original. +# +# The test is robust to timing: if the first run happens to finish the row copy +# before we kill it, --resume simply has no gaps to fill and completes. Either +# way the ghost table must match the original. The deterministic gap-filling +# logic itself is covered by the unit tests in go/logic/parallel_test.go. + +table_name="gh_ost_test" +ghost_table_name="_gh_ost_test_gho" +postpone_flag_file=/tmp/gh-ost-test.parallel-resume.postpone +rm -f $postpone_flag_file + +# Connection/behaviour args shared by both runs. No --initially-drop-* flags +# (the resume run must keep the ghost/checkpoint tables produced by the first +# run) and no postpone flag (gh-ost re-creates a removed flag file at startup, +# which would re-postpone the resume run). +base_args="--user=gh-ost --password=gh-ost \ + --host=$replica_host --port=$replica_port \ + --assume-master-host=${master_host}:${master_port} \ + --database=test --table=${table_name} \ + --storage-engine=${storage_engine} --alter='engine=${storage_engine}' \ + --exact-rowcount --assume-rbr --skip-metadata-lock-check \ + --serve-socket-file=/tmp/gh-ost.test.sock --initially-drop-socket-file \ + --test-on-replica --default-retries=3 --chunk-size=10 \ + --checkpoint --checkpoint-seconds=10 --parallel-copy --parallel-copy-workers=4 \ + --verbose --debug --stack" + +cleanup_tables() { + gh-ost-test-mysql-master --default-character-set=utf8mb4 test -e " + drop table if exists _gh_ost_test_gho; + drop table if exists _gh_ost_test_ghc; + drop table if exists _gh_ost_test_ghk; + drop table if exists _gh_ost_test_del; + drop event if exists gh_ost_test; + drop table if exists gh_ost_test; + " +} + +echo >$test_logfile + +# --- First run: start, then simulate a crash after a checkpoint ------------- +# The postpone flag keeps the first run alive past row-copy so a checkpoint is +# guaranteed to land and the kill cannot race the cut-over (which would otherwise +# leave replication stopped). --nice-ratio slows the copy so the checkpoint is +# likely to land mid-copy, exercising resume-with-gaps. +touch $postpone_flag_file +first_cmd="GOTRACEBACK=crash $ghost_binary $base_args \ + --nice-ratio=5 --postpone-cut-over-flag-file=$postpone_flag_file \ + --initially-drop-ghost-table --initially-drop-old-table --execute" +echo "$first_cmd" >>$test_logfile +bash -c "$first_cmd" >>$test_logfile 2>&1 & +ghost_pid=$! + +# Wait for the first checkpoint, then kill -9 to simulate an uncatchable crash +# (no graceful table cleanup). +for i in {1..90}; do + grep -q "checkpoint success" $test_logfile && break + ps -p $ghost_pid >/dev/null 2>&1 || break + sleep 1 + echo_dot +done +kill -9 $ghost_pid 2>/dev/null +wait $ghost_pid 2>/dev/null +rm -f $postpone_flag_file + +if ! grep -q "checkpoint success" $test_logfile; then + echo + echo "WARN $test_name: first run never wrote a parallel checkpoint; skipping resume" + cleanup_tables + return 0 +fi + +# --- Resume run: must reuse the checkpoint and complete the migration ------- +resume_cmd="GOTRACEBACK=crash $ghost_binary $base_args --resume --execute" +echo "$resume_cmd" >>$test_logfile +timeout $test_timeout bash -c "$resume_cmd" >>$test_logfile 2>&1 +execution_result=$? +if [ $execution_result -ne 0 ]; then + echo + echo "ERROR $test_name: resume run failed (exit ${execution_result})" + print_log_excerpt + return 1 +fi + +# --- Validate: ghost table matches original --------------------------------- +# With --test-on-replica the rename is rolled back, so the ghost table remains +# for comparison. +gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "select * from ${table_name} order by id" -ss >$orig_content_output_file +gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "select * from ${ghost_table_name} order by id" -ss >$ghost_content_output_file +orig_checksum=$(cat $orig_content_output_file | md5sum) +ghost_checksum=$(cat $ghost_content_output_file | md5sum) +if [ "$orig_checksum" != "$ghost_checksum" ]; then + echo + echo "ERROR $test_name: checksum mismatch after resume" + diff $orig_content_output_file $ghost_content_output_file | head -40 + return 1 +fi +cleanup_tables +return 0 diff --git a/localtests/parallel-copy/create.sql b/localtests/parallel-copy/create.sql new file mode 100644 index 000000000..55d4374a1 --- /dev/null +++ b/localtests/parallel-copy/create.sql @@ -0,0 +1,35 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + i int not null, + color varchar(32), + updated tinyint unsigned default 0, + primary key(id) +) auto_increment=1; + +insert into gh_ost_test (i, color) values + (1,'red'),(2,'green'),(3,'blue'),(4,'orange'), + (5,'yellow'),(6,'gold'),(7,'silver'),(8,'pink'); + +-- Multiply to a few hundred rows so that, at chunk-size=10, many chunks are +-- distributed across the parallel workers. +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; +insert into gh_ost_test (i, color) select i, color from gh_ost_test; + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into gh_ost_test (i, color) values (101, 'concurrent'); + update gh_ost_test set updated = 1, color = 'updated' where i = 1 order by id desc limit 1; + delete from gh_ost_test where i = 2 order by id desc limit 1; +end ;; diff --git a/localtests/parallel-copy/extra_args b/localtests/parallel-copy/extra_args new file mode 100644 index 000000000..bfc0210b7 --- /dev/null +++ b/localtests/parallel-copy/extra_args @@ -0,0 +1 @@ +--parallel-copy --parallel-copy-workers=4 \ No newline at end of file