Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 81 additions & 25 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,31 +177,38 @@ type MigrationContext struct {
CutOverType CutOver
ReplicaServerId uint

Hostname string
AssumeMasterHostname string
ApplierTimeZone string
ApplierWaitTimeout int64
TableEngine string
RowsEstimate int64
RowsDeltaEstimate int64
UsedRowsEstimateMethod RowsEstimateMethod
HasSuperPrivilege bool
OriginalBinlogFormat string
OriginalBinlogRowImage string
InspectorConnectionConfig *mysql.ConnectionConfig
InspectorMySQLVersion string
ApplierConnectionConfig *mysql.ConnectionConfig
ApplierMySQLVersion string
StartTime time.Time
RowCopyStartTime time.Time
RowCopyEndTime time.Time
LockTablesStartTime time.Time
RenameTablesStartTime time.Time
RenameTablesEndTime time.Time
pointOfInterestTime time.Time
pointOfInterestTimeMutex *sync.Mutex
lastHeartbeatOnChangelogTime time.Time
lastHeartbeatOnChangelogMutex *sync.Mutex
Hostname string
AssumeMasterHostname string
ApplierTimeZone string
ApplierWaitTimeout int64
TableEngine string
RowsEstimate int64
RowsDeltaEstimate int64
UsedRowsEstimateMethod RowsEstimateMethod
HasSuperPrivilege bool
OriginalBinlogFormat string
OriginalBinlogRowImage string
InspectorConnectionConfig *mysql.ConnectionConfig
InspectorMySQLVersion string
ApplierConnectionConfig *mysql.ConnectionConfig
ApplierMySQLVersion string
StartTime time.Time
RowCopyStartTime time.Time
RowCopyEndTime time.Time
LockTablesStartTime time.Time
RenameTablesStartTime time.Time
RenameTablesEndTime time.Time
pointOfInterestTime time.Time
pointOfInterestTimeMutex *sync.Mutex
lastHeartbeatOnChangelogTime time.Time
lastHeartbeatOnChangelogMutex *sync.Mutex
// lastAppliedBinlogEventTime is the binlog-header timestamp of the last event
// applied to the target in move-tables mode. lastBinlogEventStreamedTime is the
// wall-clock time the streamer last delivered an event for the moved table.
// Together they drive the move-tables writer-lag metric (see GetBinlogWriterLag).
lastAppliedBinlogEventTime time.Time
lastBinlogEventStreamedTime time.Time
binlogWriterLagMutex *sync.Mutex
CurrentLag int64
currentProgress uint64
etaNanoseonds int64
Expand Down Expand Up @@ -358,6 +365,7 @@ func NewMigrationContext() *MigrationContext {
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
binlogWriterLagMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
ctx: ctx,
Expand Down Expand Up @@ -737,6 +745,54 @@ func (mctx *MigrationContext) GetLastHeartbeatOnChangelogTime() time.Time {
return mctx.lastHeartbeatOnChangelogTime
}

// UpdateLastAppliedBinlogEventTime records the binlog-header timestamp of the
// last event successfully applied to the target. Used by move-tables mode to
// derive writer lag.
func (mctx *MigrationContext) UpdateLastAppliedBinlogEventTime(t time.Time) {
mctx.binlogWriterLagMutex.Lock()
defer mctx.binlogWriterLagMutex.Unlock()

mctx.lastAppliedBinlogEventTime = t
}

// MarkBinlogEventStreamed records that the streamer just delivered an event for
// the moved table. Used to distinguish "falling behind" from "source is idle".
func (mctx *MigrationContext) MarkBinlogEventStreamed() {
mctx.binlogWriterLagMutex.Lock()
defer mctx.binlogWriterLagMutex.Unlock()

mctx.lastBinlogEventStreamedTime = time.Now()
}

// BumpBinlogWriterLagIfIdle treats prolonged streamer silence as "caught up":
// if no event has been streamed for the moved table within idleThreshold, the
// last-applied timestamp is advanced to now so writer lag does not climb forever
// while the source is quiet.
func (mctx *MigrationContext) BumpBinlogWriterLagIfIdle(idleThreshold time.Duration) {
mctx.binlogWriterLagMutex.Lock()
defer mctx.binlogWriterLagMutex.Unlock()

if mctx.lastBinlogEventStreamedTime.IsZero() || time.Since(mctx.lastBinlogEventStreamedTime) >= idleThreshold {
mctx.lastAppliedBinlogEventTime = time.Now()
}
}

// GetBinlogWriterLag returns now - last applied event timestamp, the move-tables
// writer lag. It returns 0 before any event has been applied.
func (mctx *MigrationContext) GetBinlogWriterLag() time.Duration {
mctx.binlogWriterLagMutex.Lock()
defer mctx.binlogWriterLagMutex.Unlock()

if mctx.lastAppliedBinlogEventTime.IsZero() {
return 0
}
lag := time.Since(mctx.lastAppliedBinlogEventTime)
if lag < 0 {
return 0
}
return lag
}

func (mctx *MigrationContext) SetHeartbeatIntervalMilliseconds(heartbeatIntervalMilliseconds int64) {
if heartbeatIntervalMilliseconds < 100 {
heartbeatIntervalMilliseconds = 100
Expand Down
7 changes: 6 additions & 1 deletion go/binlog/binlog_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ package binlog

import (
"fmt"
"time"

"github.com/github/gh-ost/go/mysql"
)

// BinlogEntry describes an entry in the binary log
type BinlogEntry struct {
Coordinates mysql.BinlogCoordinates
DmlEvent *BinlogDMLEvent
// Timestamp is the wall-clock time recorded in the binlog event header of the
// event that produced this entry. It is used in move-tables mode to measure
// writer lag (now - last applied event timestamp).
Timestamp time.Time
DmlEvent *BinlogDMLEvent
}

// NewBinlogEntryAt creates an empty, ready to go BinlogEntry object
Expand Down
1 change: 1 addition & 0 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent
continue
}
binlogEntry := NewBinlogEntryAt(currentCoords)
binlogEntry.Timestamp = time.Unix(int64(ev.Header.Timestamp), 0)
binlogEntry.DmlEvent = NewBinlogDMLEvent(
string(rowsEvent.Table.Schema),
string(rowsEvent.Table.Table),
Expand Down
12 changes: 11 additions & 1 deletion go/logic/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,17 @@ func (he *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) []s
env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", he.migrationContext.Hostname))
env = append(env, fmt.Sprintf("GH_OST_TARGET_HOST=%s", he.migrationContext.GetTargetHostname()))
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", he.migrationContext.GetCurrentLagDuration().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", he.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds()))
// In move-tables mode there is no changelog heartbeat; writer lag (now - last
// applied binlog event timestamp) replaces the heartbeat-derived lag. Re-point
// GH_OST_HEARTBEAT_LAG at it so existing hooks keep seeing a meaningful value,
// and also expose it explicitly as GH_OST_BINLOG_WRITER_LAG_SECONDS.
heartbeatLagSeconds := he.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds()
binlogWriterLagSeconds := he.migrationContext.GetBinlogWriterLag().Seconds()
if he.migrationContext.IsMoveTablesMode() {
heartbeatLagSeconds = binlogWriterLagSeconds
}
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", heartbeatLagSeconds))
env = append(env, fmt.Sprintf("GH_OST_BINLOG_WRITER_LAG_SECONDS=%f", binlogWriterLagSeconds))
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", he.migrationContext.GetProgressPct()))
env = append(env, fmt.Sprintf("GH_OST_ETA_SECONDS=%d", he.migrationContext.GetETASeconds()))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", he.migrationContext.HooksHintMessage))
Expand Down
60 changes: 53 additions & 7 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ type lockProcessedStruct struct {
}

type applyEventStruct struct {
writeFunc *tableWriteFunc
dmlEvent *binlog.BinlogDMLEvent
coords mysql.BinlogCoordinates
writeFunc *tableWriteFunc
dmlEvent *binlog.BinlogDMLEvent
coords mysql.BinlogCoordinates
eventTimestamp time.Time
}

func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct {
Expand All @@ -66,7 +67,7 @@ func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct {
}

func newApplyEventStructByDML(dmlEntry *binlog.BinlogEntry) *applyEventStruct {
result := &applyEventStruct{dmlEvent: dmlEntry.DmlEvent, coords: dmlEntry.Coordinates}
result := &applyEventStruct{dmlEvent: dmlEntry.DmlEvent, coords: dmlEntry.Coordinates, eventTimestamp: dmlEntry.Timestamp}
return result
}

Expand Down Expand Up @@ -2170,14 +2171,28 @@ func (mgtr *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {

currentBinlogCoordinates := mgtr.eventsStreamer.GetCurrentBinlogCoordinates()

status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s",
// Lag reporting differs by mode. Standard mode reports the inspected replica
// "Lag" (changelog heartbeat) plus "HeartbeatLag". Move-tables mode has no
// changelog heartbeat, and the source-side replication "Lag" is meaningless
// (writes go to the target, so migration-induced replica lag appears on target
// replicas and is handled separately by throttling). The "Lag" field is
// therefore dropped and writer lag (now - last applied binlog event timestamp)
// is reported as "WriterLag" instead.
lagStatus := fmt.Sprintf("Lag: %.2fs, HeartbeatLag: %.2fs",
mgtr.migrationContext.GetCurrentLagDuration().Seconds(),
mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(),
)
if mgtr.migrationContext.IsMoveTablesMode() {
lagStatus = fmt.Sprintf("WriterLag: %.2fs", mgtr.migrationContext.GetBinlogWriterLag().Seconds())
}

status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; %s, State: %s; ETA: %s",
totalRowsCopied, rowsEstimate, progressPct,
atomic.LoadInt64(&mgtr.migrationContext.TotalDMLEventsApplied),
len(mgtr.applyEventsQueue), cap(mgtr.applyEventsQueue),
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(mgtr.migrationContext.ElapsedRowCopyTime()),
currentBinlogCoordinates.DisplayString(),
mgtr.migrationContext.GetCurrentLagDuration().Seconds(),
mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(),
lagStatus,
state,
eta,
)
Expand Down Expand Up @@ -2242,6 +2257,24 @@ func (mgtr *Migrator) initiateStreaming() error {
mgtr.migrationContext.SetRecentBinlogCoordinates(mgtr.eventsStreamer.GetCurrentBinlogCoordinates())
}
}()

// In move-tables mode there is no changelog heartbeat. Writer lag is derived
// from binlog-header timestamps of applied events; when the streamer has been
// silent for the heartbeat interval, treat that silence as "caught up" and bump
// the last-applied timestamp to now so lag does not climb forever while idle.
if mgtr.migrationContext.IsMoveTablesMode() {
go func() {
interval := time.Duration(mgtr.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
if atomic.LoadInt64(&mgtr.finishedMigrating) > 0 {
return
}
mgtr.migrationContext.BumpBinlogWriterLagIfIdle(interval)
}
}()
}
return nil
}

Expand All @@ -2258,6 +2291,11 @@ func (mgtr *Migrator) addDMLEventsListener() error {
mgtr.migrationContext.DatabaseName,
originalTableName,
func(dmlEntry *binlog.BinlogEntry) error {
// Record that the streamer just delivered an event for the moved table,
// so the idle-bump rule can tell "falling behind" from "source is quiet".
if mgtr.migrationContext.IsMoveTablesMode() {
mgtr.migrationContext.MarkBinlogEventStreamed()
}
// Use helper to prevent deadlock if buffer fills and executeWriteFuncs exits
// This is critical because this callback blocks the event streamer
return base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.applyEventsQueue, newApplyEventStructByDML(dmlEntry))
Expand Down Expand Up @@ -2492,6 +2530,7 @@ func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
if eventStruct.dmlEvent != nil {
dmlEvents := [](*binlog.BinlogDMLEvent){}
dmlEvents = append(dmlEvents, eventStruct.dmlEvent)
lastEventTimestamp := eventStruct.eventTimestamp
var nonDmlStructToApply *applyEventStruct

availableEvents := len(mgtr.applyEventsQueue)
Expand All @@ -2509,6 +2548,7 @@ func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
break
}
dmlEvents = append(dmlEvents, additionalStruct.dmlEvent)
lastEventTimestamp = additionalStruct.eventTimestamp
}
// Create a task to apply the DML event; this will be execute by executeWriteFuncs()
var applyEventFunc tableWriteFunc = func() error {
Expand All @@ -2522,6 +2562,12 @@ func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
mgtr.applier.CurrentCoordinates = eventStruct.coords
mgtr.applier.CurrentCoordinatesMutex.Unlock()

// In move-tables mode there is no changelog heartbeat; writer lag is derived
// from the binlog-header timestamp of the last event we just applied.
if mgtr.migrationContext.IsMoveTablesMode() && !lastEventTimestamp.IsZero() {
mgtr.migrationContext.UpdateLastAppliedBinlogEventTime(lastEventTimestamp)
}

if nonDmlStructToApply != nil {
// We pulled DML events from the queue, and then we hit a non-DML event. Wait!
// We need to handle it!
Expand Down
6 changes: 3 additions & 3 deletions script/move-tables/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ script/move-tables/setup

Verify data is present in the source cluster.
```bash
script/move-tables/mysql-source-primary -D gh_ost_test_db -e "SELECT * FROM gh_ost_test;"
script/move-tables/mysql-source-primary -D test -e "SELECT * FROM gh_ost_test;"
```

Verify the empty database is present in the target cluster.
```bash
script/move-tables/mysql-target-primary -D gh_ost_test_db -e "SHOW TABLES;"
script/move-tables/mysql-target-primary -D test -e "SHOW TABLES;"
```

### Testing `gh-ost`
Expand All @@ -24,7 +24,7 @@ script/build --cli

Run gh-ost to move tables:
```bash
./script/build --cli; ./bin/gh-ost --move-tables=gh_ost_test --host=localhost --port=3308 --user root --password opensesame --database=gh_ost_test_db --target-host=localhost --target-port=3309 --target-user root --target-password opensesame --target-database=gh_ost_test_db --postpone-cut-over-flag-file=/tmp/ghost-move-tables.postpone.flag --execute --verbose --checkpoint --checkpoint-seconds 10 --initially-drop-socket-file
./script/build --cli; ./bin/gh-ost --move-tables=gh_ost_test --host=localhost --port=3308 --user root --password opensesame --database=test --target-host=localhost --target-port=3309 --target-user root --target-password opensesame --target-database=test --postpone-cut-over-flag-file=/tmp/ghost-move-tables.postpone.flag --execute --verbose --checkpoint --checkpoint-seconds 10 --initially-drop-socket-file
```

Start continuous inserts against the source.
Expand Down
2 changes: 1 addition & 1 deletion script/move-tables/insert-source-primary-loop
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ start_i="${1:-100000}"
delay="${2:-0.2}"
rows_per_batch="${3:-1}"
i="$start_i"
DATABASE="${DATABASE:-gh_ost_test_db}
DATABASE="${DATABASE:-test}"

echo "Starting continuous inserts on source primary. Press Ctrl+C to stop."
echo "start_column1=$start_i sleep_seconds=$delay rows_per_batch=$rows_per_batch"
Expand Down
4 changes: 2 additions & 2 deletions script/move-tables/reset
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ set -euo pipefail

GH_OST_ROOT=$(git rev-parse --show-toplevel)
SCRIPT_PATH="${GH_OST_ROOT}/script/move-tables"
DATABASE_NAME="${GH_OST_TEST_DB:-gh_ost_test_db}"
DATABASE_NAME="${GH_OST_TEST_DB:-test}"

# Reset source table state regardless of whether cutover renamed it.
${SCRIPT_PATH}/mysql-source-primary -D "${DATABASE_NAME}" -e "DROP TABLE IF EXISTS _gh_ost_test_del, gh_ost_test;"

# Recreate and seed source table data, same fixture as setup uses.
${SCRIPT_PATH}/mysql-source-primary -D "${DATABASE_NAME}" < "${GH_OST_ROOT}/localtests/move-tables/create.sql"
${SCRIPT_PATH}/mysql-source-primary -D "${DATABASE_NAME}" < "${GH_OST_ROOT}/localtests/move-tables/single/create.sql"

${SCRIPT_PATH}/mysql-target-primary -D "${DATABASE_NAME}" -e "DROP TABLE IF EXISTS gh_ost_test, _gh_ost_test_ghk;"

Expand Down
Loading