diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index 5c51505e5..5b377a80e 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -130,7 +130,7 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator {
 
 // sleepWhileTrue sleeps indefinitely until the given function returns 'false'
 // (or fails with error)
-func (mgtr *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
+func (mgtr *Migrator) sleepWhileTrue(stage string, operation func() (bool, error)) error {
 	for {
 		// Check for abort before continuing
 		if err := mgtr.checkAbort(); err != nil {
@@ -143,6 +143,7 @@ func (mgtr *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
 		if !shouldSleep {
 			return nil
 		}
+		metrics.RecordSleep(mgtr.migrationContext.Metrics, stage, time.Second)
 		time.Sleep(time.Second)
 	}
 }
@@ -166,7 +167,9 @@ func (mgtr *Migrator) retryOperation(operation func() error, notFatalHint ...boo
 	for i := 0; i < maxRetries; i++ {
 		if i != 0 {
 			// sleep after previous iteration
-			RetrySleepFn(1 * time.Second)
+			sleepDuration := 1 * time.Second
+			metrics.RecordSleep(mgtr.migrationContext.Metrics, "retry_backoff", sleepDuration)
+			RetrySleepFn(sleepDuration)
 		}
 		// Check for abort/context cancellation before each retry
 		if abortErr := mgtr.checkAbort(); abortErr != nil {
@@ -207,7 +210,9 @@ func (mgtr *Migrator) retryOperationWithExponentialBackoff(operation func() erro
 		)
 
 		if i != 0 {
-			RetrySleepFn(time.Duration(interval) * time.Second)
+			sleepDuration := time.Duration(interval) * time.Second
+			metrics.RecordSleep(mgtr.migrationContext.Metrics, "retry_backoff", sleepDuration)
+			RetrySleepFn(sleepDuration)
 		}
 		// Check for abort/context cancellation before each retry
 		if abortErr := mgtr.checkAbort(); abortErr != nil {
@@ -842,6 +847,7 @@ func (mgtr *Migrator) cutOver() (err error) {
 	mgtr.migrationContext.MarkPointOfInterest()
 	mgtr.migrationContext.Log.Debugf("checking for cut-over postpone")
 	if err := mgtr.sleepWhileTrue(
+		"cut_over_postpone",
 		func() (bool, error) {
 			heartbeatLag := mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog()
 			maxLagMillisecondsThrottle := time.Duration(atomic.LoadInt64(&mgtr.migrationContext.MaxLagMillisecondsThrottleThreshold)) * time.Millisecond
@@ -1761,7 +1767,9 @@ func (mgtr *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) {
 			return chk, err
 		}
 		mgtr.applier.CurrentCoordinatesMutex.Unlock()
-		time.Sleep(500 * time.Millisecond)
+		sleepDuration := 500 * time.Millisecond
+		metrics.RecordSleep(mgtr.migrationContext.Metrics, "replica_wait", sleepDuration)
+		time.Sleep(sleepDuration)
 	}
 }
 
@@ -1866,7 +1874,12 @@ func (mgtr *Migrator) executeWriteFuncs() error {
 							copyRowsDuration := time.Since(copyRowsStartTime)
 							sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds())
 							sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond
-							time.Sleep(sleepTime)
+							if sleepTime > 0 {
+								if sleepTime >= time.Millisecond {
+									metrics.RecordSleep(mgtr.migrationContext.Metrics, "chunk_throttle", sleepTime)
+								}
+								time.Sleep(sleepTime)
+							}
 						}
 					}
 				default:
diff --git a/go/metrics/emit.go b/go/metrics/emit.go
index 86dc97556..ed9c3a506 100644
--- a/go/metrics/emit.go
+++ b/go/metrics/emit.go
@@ -129,3 +129,17 @@ func EmitThrottleInterval(emit Emitter, duration time.Duration, reason string) {
 	emit.Histogram("throttle.duration_seconds", duration.Seconds(), tags...)
 	emit.Count("throttle.events_total", 1, tags...)
 }
+
+// RecordSleep emits per-stage sleep/wait metrics (namespace is applied by the client):
+// gh_ost.sleep.duration_milliseconds and gh_ost.sleep.total_milliseconds, both tagged by stage.
+// It is a no-op when emit is nil, stage is empty, or d is negative. Durations are
+// truncated to whole milliseconds, so a sub-millisecond d is reported as 0.
+func RecordSleep(emit Emitter, stage string, d time.Duration) {
+	if emit == nil || stage == "" || d < 0 {
+		return
+	}
+	tags := []string{"stage:" + stage}
+	milliseconds := d.Milliseconds()
+	emit.Histogram("sleep.duration_milliseconds", float64(milliseconds), tags...)
+	emit.Count("sleep.total_milliseconds", milliseconds, tags...)
+}
diff --git a/go/metrics/emit_test.go b/go/metrics/emit_test.go
index 695d809d9..48de64c8b 100644
--- a/go/metrics/emit_test.go
+++ b/go/metrics/emit_test.go
@@ -8,6 +8,7 @@ package metrics
 import (
 	"context"
 	"runtime"
+	"slices"
 	"testing"
 	"time"
 )
@@ -256,3 +257,87 @@ func TestEmitThrottleIntervalNilSafe(t *testing.T) {
 	EmitThrottleInterval(nil, time.Second, "test")
 	EmitThrottleInterval(&gaugeSpy{}, time.Second, "test")
 }
+
+// sleepSpy is an Emitter test double that records every Histogram and Count
+// call so tests can assert on the emitted names, values and tags.
+type sleepSpy struct {
+	histogramNames  []string
+	histogramValues []float64
+	histogramTags   [][]string
+	countNames      []string
+	countValues     []int64
+	countTags       [][]string
+}
+
+// Gauge discards its arguments.
+func (s *sleepSpy) Gauge(_ string, _ float64, _ ...string) {}
+
+// Histogram appends the call's name, value and tags to the spy.
+func (s *sleepSpy) Histogram(name string, value float64, tags ...string) {
+	s.histogramNames = append(s.histogramNames, name)
+	s.histogramValues = append(s.histogramValues, value)
+	s.histogramTags = append(s.histogramTags, tags)
+}
+
+// Count appends the call's name, value and tags to the spy.
+func (s *sleepSpy) Count(name string, value int64, tags ...string) {
+	s.countNames = append(s.countNames, name)
+	s.countValues = append(s.countValues, value)
+	s.countTags = append(s.countTags, tags)
+}
+
+func TestRecordSleep(t *testing.T) {
+	spy := &sleepSpy{}
+
+	RecordSleep(spy, "retry_backoff", 2*time.Second)
+
+	if len(spy.histogramNames) != 1 {
+		t.Fatalf("got %d histograms, want 1", len(spy.histogramNames))
+	}
+	if spy.histogramNames[0] != "sleep.duration_milliseconds" || spy.histogramValues[0] != 2000 {
+		t.Fatalf("got histogram %s=%v, want sleep.duration_milliseconds=2000", spy.histogramNames[0], spy.histogramValues[0])
+	}
+	if !slices.Equal(spy.histogramTags[0], []string{"stage:retry_backoff"}) {
+		t.Fatalf("got histogram tags %#v", spy.histogramTags[0])
+	}
+	if len(spy.countNames) != 1 {
+		t.Fatalf("got %d counts, want 1", len(spy.countNames))
+	}
+	if spy.countNames[0] != "sleep.total_milliseconds" || spy.countValues[0] != 2000 {
+		t.Fatalf("got count %s=%v, want sleep.total_milliseconds=2000", spy.countNames[0], spy.countValues[0])
+	}
+	if !slices.Equal(spy.countTags[0], []string{"stage:retry_backoff"}) {
+		t.Fatalf("got count tags %#v", spy.countTags[0])
+	}
+}
+
+func TestRecordSleepSubSecond(t *testing.T) {
+	spy := &sleepSpy{}
+
+	RecordSleep(spy, "replica_wait", 500*time.Millisecond)
+
+	// Guard the lengths first so a missing emission fails cleanly instead of
+	// panicking with an index-out-of-range.
+	if len(spy.histogramNames) != 1 || len(spy.countNames) != 1 {
+		t.Fatalf("got %d histograms and %d counts, want 1 of each", len(spy.histogramNames), len(spy.countNames))
+	}
+	if spy.histogramNames[0] != "sleep.duration_milliseconds" || spy.histogramValues[0] != 500 {
+		t.Fatalf("got histogram %s=%v, want sleep.duration_milliseconds=500", spy.histogramNames[0], spy.histogramValues[0])
+	}
+	if spy.countNames[0] != "sleep.total_milliseconds" || spy.countValues[0] != 500 {
+		t.Fatalf("got count %s=%v, want sleep.total_milliseconds=500", spy.countNames[0], spy.countValues[0])
+	}
+}
+
+func TestRecordSleepNilSafe(t *testing.T) {
+	// A nil emitter must be a no-op rather than a nil dereference.
+	RecordSleep(nil, "retry_backoff", time.Second)
+
+	// An empty stage or a negative duration must not emit anything.
+	spy := &sleepSpy{}
+	RecordSleep(spy, "", time.Second)
+	RecordSleep(spy, "retry_backoff", -time.Second)
+	if len(spy.histogramNames) != 0 || len(spy.countNames) != 0 {
+		t.Fatalf("got %d histograms and %d counts, want none", len(spy.histogramNames), len(spy.countNames))
+	}
+}