-
Notifications
You must be signed in to change notification settings - Fork 3
Add ability to log timings of some events... #248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
4d5e675
b0e2bda
e572d5d
f257da9
bcb1d88
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| cmd/rdsync/rdsync | ||
| tests/logs/ | ||
| .idea | ||
| .DS_Store |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,130 @@ | ||
| package app | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "log/slog" | ||
| "os/exec" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/yandex/rdsync/internal/config" | ||
| ) | ||
|
|
||
| // timingEvent represents a single timing event to be reported | ||
| type timingEvent struct { | ||
| eventType string | ||
| duration time.Duration | ||
| } | ||
|
|
||
| // TimingReporter handles reporting event durations to an external program | ||
| type TimingReporter struct { | ||
| ctx context.Context | ||
| logger *slog.Logger | ||
| events chan timingEvent | ||
| cancel context.CancelFunc | ||
| command string | ||
| argsFmt []string | ||
| } | ||
|
|
||
| func newTimingReporter(conf *config.Config, logger *slog.Logger) *TimingReporter { | ||
| if conf.EventTimingNotifyCommand == "" { | ||
| return nil | ||
| } | ||
|
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| r := &TimingReporter{ | ||
| command: conf.EventTimingNotifyCommand, | ||
| argsFmt: conf.EventTimingNotifyArgs, | ||
| logger: logger, | ||
| events: make(chan timingEvent, 64), // buffered channel to prevent blocking | ||
| ctx: ctx, | ||
| cancel: cancel, | ||
| } | ||
|
|
||
| // Start worker goroutine to process events | ||
| go r.worker() | ||
|
|
||
| return r | ||
| } | ||
|
|
||
| // reportTiming sends an event duration to the external program asynchronously. | ||
| // If the reporter is nil (not configured), this is a no-op. | ||
| // Never blocks the caller — uses a buffered channel. | ||
| func (r *TimingReporter) reportTiming(eventType string, duration time.Duration) { | ||
| if r == nil { | ||
| return | ||
| } | ||
|
|
||
| // Non-blocking send - drop event if channel is full | ||
| select { | ||
| case r.events <- timingEvent{eventType: eventType, duration: duration}: | ||
| default: | ||
| r.logger.Warn("Timing reporter: event channel full, dropping event", | ||
| slog.String("event", eventType)) | ||
| } | ||
| } | ||
|
|
||
| // Close shuts down the reporter and waits for pending events to be processed | ||
| func (r *TimingReporter) Close() { | ||
| if r == nil { | ||
| return | ||
| } | ||
|
|
||
| // Signal worker to stop accepting new events | ||
| r.cancel() | ||
|
|
||
| // Close the channel to signal worker to finish processing | ||
| close(r.events) | ||
| } | ||
|
|
||
| // worker processes events from the channel | ||
| func (r *TimingReporter) worker() { | ||
| for { | ||
| select { | ||
| case <-r.ctx.Done(): | ||
| // Drain remaining events before exiting | ||
| for event := range r.events { | ||
| r.send(event.eventType, event.duration) | ||
| } | ||
| return | ||
| case event, ok := <-r.events: | ||
| if !ok { | ||
| return | ||
| } | ||
| r.send(event.eventType, event.duration) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (r *TimingReporter) send(eventType string, duration time.Duration) { | ||
| // Build placeholder map | ||
| replacements := map[string]string{ | ||
| "{event}": eventType, | ||
| "{duration_ms}": fmt.Sprintf("%d", duration.Milliseconds()), | ||
| } | ||
|
|
||
| // Apply replacements to each argument | ||
| args := make([]string, len(r.argsFmt)) | ||
| for i, argTemplate := range r.argsFmt { | ||
| arg := argTemplate | ||
| for placeholder, value := range replacements { | ||
| arg = strings.ReplaceAll(arg, placeholder, value) | ||
| } | ||
| args[i] = arg | ||
| } | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | ||
| defer cancel() | ||
|
|
||
| cmd := exec.CommandContext(ctx, r.command, args...) | ||
| output, err := cmd.CombinedOutput() | ||
| if err != nil { | ||
| r.logger.Warn("Timing reporter: external command failed", | ||
| slog.String("event", eventType), | ||
| slog.Any("error", err), | ||
| slog.String("output", string(output)), | ||
| slog.String("command", r.command), | ||
| slog.Any("args", args)) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| package app | ||
|
|
||
| import ( | ||
| "log/slog" | ||
| "os" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| "github.com/yandex/rdsync/internal/config" | ||
| ) | ||
|
|
||
| func TestNewTimingReporterNil(t *testing.T) { | ||
| conf := &config.Config{ | ||
| EventTimingNotifyCommand: "", // empty command | ||
| EventTimingNotifyArgs: []string{"{event}", "{duration_ms}"}, | ||
| } | ||
| logger := slog.Default() | ||
|
|
||
| reporter := newTimingReporter(conf, logger) | ||
| require.Nil(t, reporter, "newTimingReporter should return nil when EventTimingNotifyCommand is empty") | ||
| } | ||
|
|
||
| func TestReportTimingNilSafe(t *testing.T) { | ||
| var reporter *TimingReporter // nil reporter | ||
|
|
||
| // Should not panic when calling reportTiming on nil reporter | ||
| require.NotPanics(t, func() { | ||
| reporter.reportTiming("test_event", 100*time.Millisecond) | ||
| }, "reportTiming should be nil-safe and not panic") | ||
|
|
||
| // Should not panic when calling Close on nil reporter | ||
| require.NotPanics(t, func() { | ||
| reporter.Close() | ||
| }, "Close should be nil-safe and not panic") | ||
| } | ||
|
|
||
| func TestReportTimingSendsEvent(t *testing.T) { | ||
| conf := &config.Config{ | ||
| EventTimingNotifyCommand: "true", // command that always succeeds | ||
| EventTimingNotifyArgs: []string{"{event}", "{duration_ms}"}, | ||
| } | ||
| logger := slog.Default() | ||
|
|
||
| reporter := newTimingReporter(conf, logger) | ||
| require.NotNil(t, reporter) | ||
| defer reporter.Close() | ||
|
|
||
| // Send an event | ||
| reporter.reportTiming("test_event", 150*time.Millisecond) | ||
|
|
||
| // Give worker time to process the event | ||
| time.Sleep(100 * time.Millisecond) | ||
|
|
||
| // If we reach here without panic or deadlock, the test passes | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sure this test is meaningful? We don't even check that the command was executed. |
||
| } | ||
|
|
||
| func TestReportTimingChannelFull(t *testing.T) { | ||
| conf := &config.Config{ | ||
| EventTimingNotifyCommand: "sleep", | ||
| EventTimingNotifyArgs: []string{"10"}, // slow command to keep worker busy | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sleep in linux could sleep for infinity. |
||
| } | ||
| logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) | ||
|
|
||
| reporter := newTimingReporter(conf, logger) | ||
| require.NotNil(t, reporter) | ||
| defer reporter.Close() | ||
|
|
||
| // Fill the channel with 64 events (the buffer size) | ||
| for i := 0; i < 64; i++ { | ||
| reporter.reportTiming("fill_event", time.Duration(i)*time.Millisecond) | ||
| } | ||
|
|
||
| // Try to send one more event - it should not block (will be dropped) | ||
| done := make(chan bool, 1) | ||
| go func() { | ||
| reporter.reportTiming("overflow_event", 999*time.Millisecond) | ||
| done <- true | ||
| }() | ||
|
|
||
| // Wait with timeout to ensure it doesn't block | ||
| select { | ||
| case <-done: | ||
| // Success - the call returned immediately (event was dropped) | ||
| case <-time.After(1 * time.Second): | ||
| t.Fatal("reportTiming blocked when channel was full - should have dropped the event") | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -143,6 +143,11 @@ func (app *App) stateManager() appState { | |
| if app.nodeFailTime[master].IsZero() { | ||
| app.nodeFailTime[master] = time.Now() | ||
| } | ||
| // Report master unavailability duration when failover is approved | ||
| if failTime, ok := app.nodeFailTime[master]; ok { | ||
| dur := time.Since(failTime) | ||
| app.timings.reportTiming("master_unavailable", dur) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably should move all such strings into constants to ease the maintenance (imagine that we need to rename the string). |
||
| } | ||
| err = app.approveFailover(shardState, activeNodes, master) | ||
| if err == nil { | ||
| app.logger.Info("Failover approved") | ||
|
|
@@ -218,6 +223,11 @@ func (app *App) stateManager() appState { | |
| } | ||
| return stateCandidate | ||
| } | ||
| // Report master unavailability duration when master recovers | ||
| if failTime, ok := app.nodeFailTime[master]; ok { | ||
| dur := time.Since(failTime) | ||
| app.timings.reportTiming("master_unavailable", dur) | ||
| } | ||
| delete(app.nodeFailTime, master) | ||
| delete(app.splitTime, master) | ||
| app.repairShard(shardState, activeNodes, master) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
64 is reused later in test. Why isn't it a constant?