From 16b227143304754a409c3756e27a1745a03805ba Mon Sep 17 00:00:00 2001 From: Zach Sierakowski Date: Mon, 22 Jun 2026 17:09:28 +0000 Subject: [PATCH 1/2] misc broken local test things from pulling in integration tests changes --- script/move-tables/README.md | 6 +++--- script/move-tables/insert-source-primary-loop | 2 +- script/move-tables/reset | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/script/move-tables/README.md b/script/move-tables/README.md index 07d1037a7..c582eefc2 100644 --- a/script/move-tables/README.md +++ b/script/move-tables/README.md @@ -7,12 +7,12 @@ script/move-tables/setup Verify data is present in the source cluster. ```bash -script/move-tables/mysql-source-primary -D gh_ost_test_db -e "SELECT * FROM gh_ost_test;" +script/move-tables/mysql-source-primary -D test -e "SELECT * FROM gh_ost_test;" ``` Verify the empty database is present in the target cluster. ```bash -script/move-tables/mysql-target-primary -D gh_ost_test_db -e "SHOW TABLES;" +script/move-tables/mysql-target-primary -D test -e "SHOW TABLES;" ``` ### Testing `gh-ost` @@ -24,7 +24,7 @@ script/build --cli Run gh-ost to move tables: ```bash -./script/build --cli; ./bin/gh-ost --move-tables=gh_ost_test --host=localhost --port=3308 --user root --password opensesame --database=gh_ost_test_db --target-host=localhost --target-port=3309 --target-user root --target-password opensesame --target-database=gh_ost_test_db --postpone-cut-over-flag-file=/tmp/ghost-move-tables.postpone.flag --execute --verbose --checkpoint --checkpoint-seconds 10 --initially-drop-socket-file +./script/build --cli; ./bin/gh-ost --move-tables=gh_ost_test --host=localhost --port=3308 --user root --password opensesame --database=test --target-host=localhost --target-port=3309 --target-user root --target-password opensesame --target-database=test --postpone-cut-over-flag-file=/tmp/ghost-move-tables.postpone.flag --execute --verbose --checkpoint --checkpoint-seconds 10 --initially-drop-socket-file ``` Start continuous inserts against the source. diff --git a/script/move-tables/insert-source-primary-loop b/script/move-tables/insert-source-primary-loop index 490442ec4..1147e55f1 100755 --- a/script/move-tables/insert-source-primary-loop +++ b/script/move-tables/insert-source-primary-loop @@ -13,7 +13,7 @@ start_i="${1:-100000}" delay="${2:-0.2}" rows_per_batch="${3:-1}" i="$start_i" -DATABASE="${DATABASE:-gh_ost_test_db} +DATABASE="${DATABASE:-test}" echo "Starting continuous inserts on source primary. Press Ctrl+C to stop." echo "start_column1=$start_i sleep_seconds=$delay rows_per_batch=$rows_per_batch" diff --git a/script/move-tables/reset b/script/move-tables/reset index 679e7aa78..a9a354869 100755 --- a/script/move-tables/reset +++ b/script/move-tables/reset @@ -4,13 +4,13 @@ set -euo pipefail GH_OST_ROOT=$(git rev-parse --show-toplevel) SCRIPT_PATH="${GH_OST_ROOT}/script/move-tables" -DATABASE_NAME="${GH_OST_TEST_DB:-gh_ost_test_db}" +DATABASE_NAME="${GH_OST_TEST_DB:-test}" # Reset source table state regardless of whether cutover renamed it. ${SCRIPT_PATH}/mysql-source-primary -D "${DATABASE_NAME}" -e "DROP TABLE IF EXISTS _gh_ost_test_del, gh_ost_test;" # Recreate and seed source table data, same fixture as setup uses. -${SCRIPT_PATH}/mysql-source-primary -D "${DATABASE_NAME}" < "${GH_OST_ROOT}/localtests/move-tables/create.sql" +${SCRIPT_PATH}/mysql-source-primary -D "${DATABASE_NAME}" < "${GH_OST_ROOT}/localtests/move-tables/single/create.sql" ${SCRIPT_PATH}/mysql-target-primary -D "${DATABASE_NAME}" -e "DROP TABLE IF EXISTS gh_ost_test, _gh_ost_test_ghk;" From 04ef676c57c9f5fc8a3de76cceff4d384744c088 Mon Sep 17 00:00:00 2001 From: Zach Sierakowski Date: Mon, 22 Jun 2026 17:09:56 +0000 Subject: [PATCH 2/2] Fix up print status so all fields are accurate, including lag measurement --- go/base/context.go | 106 +++++++++++++++++++++++++++--------- go/binlog/binlog_entry.go | 7 ++- go/binlog/gomysql_reader.go | 1 + go/logic/hooks.go | 12 +++- go/logic/migrator.go | 60 +++++++++++++++++--- 5 files changed, 152 insertions(+), 34 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 747d23d01..cccf44f33 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -177,31 +177,38 @@ type MigrationContext struct { CutOverType CutOver ReplicaServerId uint - Hostname string - AssumeMasterHostname string - ApplierTimeZone string - ApplierWaitTimeout int64 - TableEngine string - RowsEstimate int64 - RowsDeltaEstimate int64 - UsedRowsEstimateMethod RowsEstimateMethod - HasSuperPrivilege bool - OriginalBinlogFormat string - OriginalBinlogRowImage string - InspectorConnectionConfig *mysql.ConnectionConfig - InspectorMySQLVersion string - ApplierConnectionConfig *mysql.ConnectionConfig - ApplierMySQLVersion string - StartTime time.Time - RowCopyStartTime time.Time - RowCopyEndTime time.Time - LockTablesStartTime time.Time - RenameTablesStartTime time.Time - RenameTablesEndTime time.Time - pointOfInterestTime time.Time - pointOfInterestTimeMutex *sync.Mutex - lastHeartbeatOnChangelogTime time.Time - lastHeartbeatOnChangelogMutex *sync.Mutex + Hostname string + AssumeMasterHostname string + ApplierTimeZone string + ApplierWaitTimeout int64 + TableEngine string + RowsEstimate int64 + RowsDeltaEstimate int64 + UsedRowsEstimateMethod RowsEstimateMethod + HasSuperPrivilege bool + OriginalBinlogFormat string + OriginalBinlogRowImage string + InspectorConnectionConfig *mysql.ConnectionConfig + InspectorMySQLVersion string + ApplierConnectionConfig *mysql.ConnectionConfig + ApplierMySQLVersion string + StartTime time.Time + RowCopyStartTime time.Time + RowCopyEndTime time.Time + LockTablesStartTime time.Time + RenameTablesStartTime time.Time + RenameTablesEndTime time.Time + pointOfInterestTime time.Time + pointOfInterestTimeMutex *sync.Mutex + lastHeartbeatOnChangelogTime time.Time + lastHeartbeatOnChangelogMutex *sync.Mutex + // lastAppliedBinlogEventTime is the binlog-header timestamp of the last event + // applied to the target in move-tables mode. lastBinlogEventStreamedTime is the + // wall-clock time the streamer last delivered an event for the moved table. + // Together they drive the move-tables writer-lag metric (see GetBinlogWriterLag). + lastAppliedBinlogEventTime time.Time + lastBinlogEventStreamedTime time.Time + binlogWriterLagMutex *sync.Mutex CurrentLag int64 currentProgress uint64 etaNanoseonds int64 @@ -358,6 +365,7 @@ func NewMigrationContext() *MigrationContext { configMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{}, lastHeartbeatOnChangelogMutex: &sync.Mutex{}, + binlogWriterLagMutex: &sync.Mutex{}, ColumnRenameMap: make(map[string]string), PanicAbort: make(chan error), ctx: ctx, @@ -737,6 +745,54 @@ func (mctx *MigrationContext) GetLastHeartbeatOnChangelogTime() time.Time { return mctx.lastHeartbeatOnChangelogTime } +// UpdateLastAppliedBinlogEventTime records the binlog-header timestamp of the +// last event successfully applied to the target. Used by move-tables mode to +// derive writer lag. +func (mctx *MigrationContext) UpdateLastAppliedBinlogEventTime(t time.Time) { + mctx.binlogWriterLagMutex.Lock() + defer mctx.binlogWriterLagMutex.Unlock() + + mctx.lastAppliedBinlogEventTime = t +} + +// MarkBinlogEventStreamed records that the streamer just delivered an event for +// the moved table. Used to distinguish "falling behind" from "source is idle". +func (mctx *MigrationContext) MarkBinlogEventStreamed() { + mctx.binlogWriterLagMutex.Lock() + defer mctx.binlogWriterLagMutex.Unlock() + + mctx.lastBinlogEventStreamedTime = time.Now() +} + +// BumpBinlogWriterLagIfIdle treats prolonged streamer silence as "caught up": +// if no event has been streamed for the moved table within idleThreshold, the +// last-applied timestamp is advanced to now so writer lag does not climb forever +// while the source is quiet. +func (mctx *MigrationContext) BumpBinlogWriterLagIfIdle(idleThreshold time.Duration) { + mctx.binlogWriterLagMutex.Lock() + defer mctx.binlogWriterLagMutex.Unlock() + + if mctx.lastBinlogEventStreamedTime.IsZero() || time.Since(mctx.lastBinlogEventStreamedTime) >= idleThreshold { + mctx.lastAppliedBinlogEventTime = time.Now() + } +} + +// GetBinlogWriterLag returns now - last applied event timestamp, the move-tables +// writer lag. It returns 0 before any event has been applied. +func (mctx *MigrationContext) GetBinlogWriterLag() time.Duration { + mctx.binlogWriterLagMutex.Lock() + defer mctx.binlogWriterLagMutex.Unlock() + + if mctx.lastAppliedBinlogEventTime.IsZero() { + return 0 + } + lag := time.Since(mctx.lastAppliedBinlogEventTime) + if lag < 0 { + return 0 + } + return lag +} + func (mctx *MigrationContext) SetHeartbeatIntervalMilliseconds(heartbeatIntervalMilliseconds int64) { if heartbeatIntervalMilliseconds < 100 { heartbeatIntervalMilliseconds = 100 diff --git a/go/binlog/binlog_entry.go b/go/binlog/binlog_entry.go index 7620281d2..c1216054c 100644 --- a/go/binlog/binlog_entry.go +++ b/go/binlog/binlog_entry.go @@ -7,6 +7,7 @@ package binlog import ( "fmt" + "time" "github.com/github/gh-ost/go/mysql" ) @@ -14,7 +15,11 @@ import ( // BinlogEntry describes an entry in the binary log type BinlogEntry struct { Coordinates mysql.BinlogCoordinates - DmlEvent *BinlogDMLEvent + // Timestamp is the wall-clock time recorded in the binlog event header of the + // event that produced this entry. It is used in move-tables mode to measure + // writer lag (now - last applied event timestamp). + Timestamp time.Time + DmlEvent *BinlogDMLEvent } // NewBinlogEntryAt creates an empty, ready to go BinlogEntry object diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 06203be90..e0667daf4 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -99,6 +99,7 @@ func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent continue } binlogEntry := NewBinlogEntryAt(currentCoords) + binlogEntry.Timestamp = time.Unix(int64(ev.Header.Timestamp), 0) binlogEntry.DmlEvent = NewBinlogDMLEvent( string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table), diff --git a/go/logic/hooks.go b/go/logic/hooks.go index ceec4b6b9..2a48a24be 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -235,7 +235,17 @@ func (he *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) []s env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", he.migrationContext.Hostname)) env = append(env, fmt.Sprintf("GH_OST_TARGET_HOST=%s", he.migrationContext.GetTargetHostname())) env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", he.migrationContext.GetCurrentLagDuration().Seconds())) - env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", he.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds())) + // In move-tables mode there is no changelog heartbeat; writer lag (now - last + // applied binlog event timestamp) replaces the heartbeat-derived lag. Re-point + // GH_OST_HEARTBEAT_LAG at it so existing hooks keep seeing a meaningful value, + // and also expose it explicitly as GH_OST_BINLOG_WRITER_LAG_SECONDS. + heartbeatLagSeconds := he.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds() + binlogWriterLagSeconds := he.migrationContext.GetBinlogWriterLag().Seconds() + if he.migrationContext.IsMoveTablesMode() { + heartbeatLagSeconds = binlogWriterLagSeconds + } + env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", heartbeatLagSeconds)) + env = append(env, fmt.Sprintf("GH_OST_BINLOG_WRITER_LAG_SECONDS=%f", binlogWriterLagSeconds)) env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", he.migrationContext.GetProgressPct())) env = append(env, fmt.Sprintf("GH_OST_ETA_SECONDS=%d", he.migrationContext.GetETASeconds())) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", he.migrationContext.HooksHintMessage)) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index f3658b397..0bae5e5a4 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -55,9 +55,10 @@ type lockProcessedStruct struct { } type applyEventStruct struct { - writeFunc *tableWriteFunc - dmlEvent *binlog.BinlogDMLEvent - coords mysql.BinlogCoordinates + writeFunc *tableWriteFunc + dmlEvent *binlog.BinlogDMLEvent + coords mysql.BinlogCoordinates + eventTimestamp time.Time } func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct { @@ -66,7 +67,7 @@ func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct { } func newApplyEventStructByDML(dmlEntry *binlog.BinlogEntry) *applyEventStruct { - result := &applyEventStruct{dmlEvent: dmlEntry.DmlEvent, coords: dmlEntry.Coordinates} + result := &applyEventStruct{dmlEvent: dmlEntry.DmlEvent, coords: dmlEntry.Coordinates, eventTimestamp: dmlEntry.Timestamp} return result } @@ -2170,14 +2171,28 @@ func (mgtr *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := mgtr.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", + // Lag reporting differs by mode. Standard mode reports the inspected replica + // "Lag" (changelog heartbeat) plus "HeartbeatLag". Move-tables mode has no + // changelog heartbeat, and the source-side replication "Lag" is meaningless + // (writes go to the target, so migration-induced replica lag appears on target + // replicas and is handled separately by throttling). The "Lag" field is + // therefore dropped and writer lag (now - last applied binlog event timestamp) + // is reported as "WriterLag" instead. + lagStatus := fmt.Sprintf("Lag: %.2fs, HeartbeatLag: %.2fs", + mgtr.migrationContext.GetCurrentLagDuration().Seconds(), + mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(), + ) + if mgtr.migrationContext.IsMoveTablesMode() { + lagStatus = fmt.Sprintf("WriterLag: %.2fs", mgtr.migrationContext.GetBinlogWriterLag().Seconds()) + } + + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; %s, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&mgtr.migrationContext.TotalDMLEventsApplied), len(mgtr.applyEventsQueue), cap(mgtr.applyEventsQueue), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(mgtr.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates.DisplayString(), - mgtr.migrationContext.GetCurrentLagDuration().Seconds(), - mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(), + lagStatus, state, eta, ) @@ -2242,6 +2257,24 @@ func (mgtr *Migrator) initiateStreaming() error { mgtr.migrationContext.SetRecentBinlogCoordinates(mgtr.eventsStreamer.GetCurrentBinlogCoordinates()) } }() + + // In move-tables mode there is no changelog heartbeat. Writer lag is derived + // from binlog-header timestamps of applied events; when the streamer has been + // silent for the heartbeat interval, treat that silence as "caught up" and bump + // the last-applied timestamp to now so lag does not climb forever while idle. + if mgtr.migrationContext.IsMoveTablesMode() { + go func() { + interval := time.Duration(mgtr.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond + ticker := time.NewTicker(interval) + defer ticker.Stop() + for range ticker.C { + if atomic.LoadInt64(&mgtr.finishedMigrating) > 0 { + return + } + mgtr.migrationContext.BumpBinlogWriterLagIfIdle(interval) + } + }() + } return nil } @@ -2258,6 +2291,11 @@ func (mgtr *Migrator) addDMLEventsListener() error { mgtr.migrationContext.DatabaseName, originalTableName, func(dmlEntry *binlog.BinlogEntry) error { + // Record that the streamer just delivered an event for the moved table, + // so the idle-bump rule can tell "falling behind" from "source is quiet". + if mgtr.migrationContext.IsMoveTablesMode() { + mgtr.migrationContext.MarkBinlogEventStreamed() + } // Use helper to prevent deadlock if buffer fills and executeWriteFuncs exits // This is critical because this callback blocks the event streamer return base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.applyEventsQueue, newApplyEventStructByDML(dmlEntry)) @@ -2492,6 +2530,7 @@ func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { if eventStruct.dmlEvent != nil { dmlEvents := [](*binlog.BinlogDMLEvent){} dmlEvents = append(dmlEvents, eventStruct.dmlEvent) + lastEventTimestamp := eventStruct.eventTimestamp var nonDmlStructToApply *applyEventStruct availableEvents := len(mgtr.applyEventsQueue) @@ -2509,6 +2548,7 @@ func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { break } dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) + lastEventTimestamp = additionalStruct.eventTimestamp } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { @@ -2522,6 +2562,12 @@ func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { mgtr.applier.CurrentCoordinates = eventStruct.coords mgtr.applier.CurrentCoordinatesMutex.Unlock() + // In move-tables mode there is no changelog heartbeat; writer lag is derived + // from the binlog-header timestamp of the last event we just applied. + if mgtr.migrationContext.IsMoveTablesMode() && !lastEventTimestamp.IsZero() { + mgtr.migrationContext.UpdateLastAppliedBinlogEventTime(lastEventTimestamp) + } + if nonDmlStructToApply != nil { // We pulled DML events from the queue, and then we hit a non-DML event. Wait! // We need to handle it!