Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions doc/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,30 @@ See also: [`resuming-migrations`](resume.md)

Controls how many rows `gh-ost` copies in each row-copy iteration. The default is `1000`; allowed range is `10` to `100000`.

### parallel-copy

`--parallel-copy` copies the table rows using multiple workers running concurrently. Binlog event application remains single-threaded, so event ordering is preserved. This is experimental and requires `--checkpoint`. Default is disabled.

Parallel copy requires checkpoints because parallel INSERTs can go out of sequential order and leave gaps in target table. gh-ost checkpoints are consistent and do not have gaps, so resuming from a checkpoint will make the target table consistent again.

Consistency is ensured by using
-INSERT IGNORE/SELECT FOR SHARE for row copy (current gh-ost behavior)
-DELETE/INSERT by DML applier when unique key change is detected (current gh-ost behavior)
-advanceFrontier call to ensure that checkpoints have no gaps (used by parallel-copy only)
Without all of those, consistency of parallel copy cannot be guaranteed (for example, row copy could insert row back after it was deleted). In existing serial mode, the problem is much easier because DML applier is never parallel with row copy.

### parallel-copy-workers

`--parallel-copy-workers` sets the number of parallel row-copy workers used when `--parallel-copy` is enabled. Has no effect otherwise. Default is 4; maximum is 64. The applier connection pool is sized to `parallel-copy-workers + 4`. Throughput gains tend to plateau past ~16 workers — so prefer the smallest value that meets your needs.

### parallel-copy-max-heartbeatlag-millis

`--parallel-copy-max-heartbeatlag-millis` sets a heartbeat-lag threshold (in milliseconds) used to throttle row-copy workers when `--parallel-copy` is enabled. Requires `--parallel-copy`. Default is `10000` (10 seconds).

gh-ost's heartbeat lag is used as a lightweight proxy for binlog applier lag. When heartbeat lag is below this threshold, all workers copy rows at full concurrency. When it exceeds the threshold, workers pause and yield, giving the binlog applier time to catch up. This preserves the same priority model as serial row-copy, where binlog event application is never starved by row-copy work. If the applier is consistently lagging, workers will spend most of their time paused and throughput approaches the serial baseline; when the table's DML rate is low and the applier is not lagging, all workers run freely and deliver the full benefit of parallelism.

HeartbeatLag is also the primary signal used by the cut-over gate: gh-ost will not attempt the table swap until HeartbeatLag drops below both `--max-lag-millis` and `--cut-over-lock-timeout-seconds`. Using it here as the parallel-copy throttle signal is therefore consistent with the most safety-critical decision point in the migration.

### conf

`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:
Expand Down
31 changes: 31 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ const (
HTTPStatusOK = 200
MaxEventsBatchSize = 1000
ETAUnknown = math.MinInt64

// MaxCopyWorkers is the hard upper bound on --parallel-copy-workers. The applier
// connection pool is sized to CopyWorkers + ParallelCopyConnHeadroom, so this
// also bounds connection usage and prevents exhausting the server's
// max_connections.
MaxCopyWorkers = 64
// RecommendedMaxCopyWorkers is a soft threshold above which we warn: the
// throughput benefit of more workers plateaus while load keeps growing.
RecommendedMaxCopyWorkers = 16
// ParallelCopyConnHeadroom is the number of connections added to the applier
// pool on top of CopyWorkers, to cover the DML event applier goroutine
// the status logger, the heartbeat goroutine, and the checkpoint loop.
ParallelCopyConnHeadroom = 4
)

var (
Expand Down Expand Up @@ -162,6 +175,10 @@ type MigrationContext struct {
Checkpoint bool
CheckpointIntervalSeconds int64

ParallelCopy bool
ParallelCopyWorkers int64
ParallelCopyMaxHeartbeatLagThresholdMillies int64

DropServeSocket bool
ServeSocketFile string
ServeTCPPort int64
Expand Down Expand Up @@ -275,6 +292,20 @@ type MigrationContext struct {
IsOpenMetadataLockInstruments bool

Log Logger

migrationLastInsertSQLWarningsMu sync.Mutex // unexported — only accessed via methods below
}

func (mctx *MigrationContext) SetLastInsertSQLWarnings(warnings []string) {
mctx.migrationLastInsertSQLWarningsMu.Lock()
defer mctx.migrationLastInsertSQLWarningsMu.Unlock()
mctx.MigrationLastInsertSQLWarnings = warnings
}

func (mctx *MigrationContext) GetLastInsertSQLWarnings() []string {
mctx.migrationLastInsertSQLWarningsMu.Lock()
defer mctx.migrationLastInsertSQLWarningsMu.Unlock()
return mctx.MigrationLastInsertSQLWarnings
}

type Logger interface {
Expand Down
20 changes: 20 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ func main() {
flag.BoolVar(&migrationContext.SkipPortValidation, "skip-port-validation", false, "Skip port validation for MySQL connections")
flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Enable migration checkpoints")
flag.Int64Var(&migrationContext.CheckpointIntervalSeconds, "checkpoint-seconds", 300, "The number of seconds between checkpoints")
flag.BoolVar(&migrationContext.ParallelCopy, "parallel-copy", false, "Copy table rows using multiple parallel workers. Experimental; requires --checkpoint")
flag.Int64Var(&migrationContext.ParallelCopyWorkers, "parallel-copy-workers", 4, "Number of parallel row-copy workers when --parallel-copy is enabled (max 64; gains tend to plateau past ~16)")
flag.Int64Var(&migrationContext.ParallelCopyMaxHeartbeatLagThresholdMillies, "parallel-copy-max-heartbeatlag-millis", 10000, "When >0 and --parallel-copy is enabled, row-copy workers pause when gh-ost's own binlog applier lag exceeds this threshold (ms), giving the binlog applier priority. 0 disables this check.")
flag.BoolVar(&migrationContext.Resume, "resume", false, "Attempt to resume migration from checkpoint")
flag.BoolVar(&migrationContext.Revert, "revert", false, "Attempt to revert completed migration")
flag.StringVar(&migrationContext.OldTableName, "old-table", "", "The name of the old table when using --revert, e.g. '_mytable_del'")
Expand Down Expand Up @@ -336,6 +339,23 @@ func main() {
if migrationContext.CheckpointIntervalSeconds < 10 {
migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10")
}
if migrationContext.ParallelCopy {
if !migrationContext.Checkpoint {
migrationContext.Log.Fatalf("--parallel-copy requires --checkpoint")
}
if migrationContext.ParallelCopyWorkers < 1 {
migrationContext.Log.Fatalf("--parallel-copy-workers should be >=1")
}
if migrationContext.ParallelCopyWorkers > base.MaxCopyWorkers {
migrationContext.Log.Fatalf("--parallel-copy-workers should be <=%d", base.MaxCopyWorkers)
}
if migrationContext.ParallelCopyWorkers > base.RecommendedMaxCopyWorkers {
migrationContext.Log.Warningf("--parallel-copy-workers=%d is high: throughput gains tend to plateau while load on the server grows. Consider <=%d.", migrationContext.ParallelCopyWorkers, base.RecommendedMaxCopyWorkers)
}
if migrationContext.ParallelCopyMaxHeartbeatLagThresholdMillies > 0 && migrationContext.ParallelCopyMaxHeartbeatLagThresholdMillies < 100 {
migrationContext.Log.Fatalf("--parallel-copy-max-heartbeatlag-millis should be >=100 or 0 (disabled)")
}
}
if migrationContext.CountTableRows && migrationContext.PanicOnWarnings {
migrationContext.Log.Warning("--exact-rowcount with --panic-on-warnings: row counts cannot be exact due to warning detection")
}
Expand Down
55 changes: 48 additions & 7 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func (apl *Applier) InitDBConnections() (err error) {
if apl.db, _, err = mysql.GetDB(apl.migrationContext.Uuid, uriWithMulti); err != nil {
return err
}
if apl.migrationContext.ParallelCopy {
poolSize := int(apl.migrationContext.ParallelCopyWorkers) + base.ParallelCopyConnHeadroom
apl.db.SetMaxOpenConns(poolSize)
apl.db.SetMaxIdleConns(poolSize)
}
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
if apl.singletonDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, singletonApplierUri); err != nil {
return err
Expand Down Expand Up @@ -1075,12 +1080,42 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool
return hasFurtherRange, nil
}

// iterationRange carries an explicit chunk range for ApplyIterationInsertQuery. It is
// passed only by the --parallel-copy path; serial callers pass nothing and the shared
// migrationContext.MigrationIterationRange* values are used instead.
type iterationRange struct {
min *sql.ColumnValues
max *sql.ColumnValues
includeRangeStart bool
}

// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
// data actually gets copied from original table.
func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
//
// Serial callers invoke it with no argument: the chunk range comes from the shared
// migrationContext.MigrationIterationRange* cursor.
// The --parallel-copy path passes an explicit range (captured under the boundary-scan
// mutex) so concurrent workers don't race on the shared cursor
func (apl *Applier) ApplyIterationInsertQuery(parallelRange ...*iterationRange) (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
startTime := time.Now()
chunkSize = atomic.LoadInt64(&apl.migrationContext.ChunkSize)

parallel := len(parallelRange) > 0 && parallelRange[0] != nil
var rangeMin, rangeMax *sql.ColumnValues
var includeRangeStartValues bool
if parallel {
// Use the explicitly-passed range. The parallel INSERT runs unlocked and
// concurrently, so it must NOT read the shared MigrationIterationRange* cursor:
// another worker is mutating it under parallelSelectMutex.
rangeMin = parallelRange[0].min
rangeMax = parallelRange[0].max
includeRangeStartValues = parallelRange[0].includeRangeStart
} else {
rangeMin = apl.migrationContext.MigrationIterationRangeMinValues
rangeMax = apl.migrationContext.MigrationIterationRangeMaxValues
includeRangeStartValues = apl.migrationContext.GetIteration() == 0
}

query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
apl.migrationContext.DatabaseName,
apl.migrationContext.OriginalTableName,
Expand All @@ -1089,9 +1124,9 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i
apl.migrationContext.MappedSharedColumns.Names(),
apl.migrationContext.UniqueKey.Name,
&apl.migrationContext.UniqueKey.Columns,
apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
apl.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
apl.migrationContext.GetIteration() == 0,
rangeMin.AbstractValues(),
rangeMax.AbstractValues(),
includeRangeStartValues,
apl.migrationContext.IsTransactionalTable(),
// TODO: Don't hardcode this
strings.HasPrefix(apl.migrationContext.ApplierMySQLVersion, "8."),
Expand Down Expand Up @@ -1149,7 +1184,13 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i
}
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
}
apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings
if parallel {
if len(sqlWarnings) > 0 {
apl.migrationContext.SetLastInsertSQLWarnings(sqlWarnings)
}
} else {
apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings
}
}

if err := tx.Commit(); err != nil {
Expand All @@ -1165,8 +1206,8 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i
duration = time.Since(startTime)
apl.migrationContext.Log.Debugf(
"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
apl.migrationContext.MigrationIterationRangeMinValues,
apl.migrationContext.MigrationIterationRangeMaxValues,
rangeMin,
rangeMax,
apl.migrationContext.GetIteration(),
chunkSize)
return chunkSize, rowsAffected, duration, nil
Expand Down
Loading