Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
cmd/rdsync/rdsync
tests/logs/
.idea
.DS_Store
15 changes: 10 additions & 5 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@ import (

// App is main application structure
type App struct {
dcs dcs.DCS
dcsDivergeTime time.Time
replFailTime time.Time
critical atomic.Value
ctx context.Context
nodeFailTime map[string]time.Time
dcs dcs.DCS
config *config.Config
splitTime map[string]time.Time
dcsDivergeTime time.Time
replFailTime time.Time
logger *slog.Logger
config *config.Config
nodeFailTime map[string]time.Time
shard *valkey.Shard
cache *valkey.SentiCacheNode
daemonLock *flock.Flock
timings *TimingReporter
mode appMode
aofMode aofMode
state appState
Expand Down Expand Up @@ -95,6 +96,7 @@ func NewApp(configFile, logLevel string) (*App, error) {
config: conf,
}
app.critical.Store(false)
app.timings = newTimingReporter(conf, logger)
return app, nil
}

Expand Down Expand Up @@ -130,6 +132,9 @@ func (app *App) unlockDaemonFile() {
func (app *App) Run() int {
app.lockDaemonFile()
defer app.unlockDaemonFile()

defer app.timings.Close()

err := app.connectDCS()
if err != nil {
app.logger.Error("Unable to connect to dcs", slog.Any("error", err))
Expand Down
130 changes: 130 additions & 0 deletions internal/app/event_reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package app

import (
"context"
"fmt"
"log/slog"
"os/exec"
"strings"
"time"

"github.com/yandex/rdsync/internal/config"
)

// timingEvent represents a single timing event to be reported
type timingEvent struct {
eventType string
duration time.Duration
}

// TimingReporter handles reporting event durations to an external program
type TimingReporter struct {
ctx context.Context
logger *slog.Logger
events chan timingEvent
cancel context.CancelFunc
command string
argsFmt []string
}

func newTimingReporter(conf *config.Config, logger *slog.Logger) *TimingReporter {
if conf.EventTimingNotifyCommand == "" {
return nil
}

ctx, cancel := context.WithCancel(context.Background())
r := &TimingReporter{
command: conf.EventTimingNotifyCommand,
argsFmt: conf.EventTimingNotifyArgs,
logger: logger,
events: make(chan timingEvent, 64), // buffered channel to prevent blocking
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

64 is reused later in test. Why isn't it a constant?

ctx: ctx,
cancel: cancel,
}

// Start worker goroutine to process events
go r.worker()

return r
}

// reportTiming sends an event duration to the external program asynchronously.
// If the reporter is nil (not configured), this is a no-op.
// Never blocks the caller — uses a buffered channel.
func (r *TimingReporter) reportTiming(eventType string, duration time.Duration) {
if r == nil {
return
}

// Non-blocking send - drop event if channel is full
select {
case r.events <- timingEvent{eventType: eventType, duration: duration}:
default:
r.logger.Warn("Timing reporter: event channel full, dropping event",
slog.String("event", eventType))
}
}

// Close shuts down the reporter and waits for pending events to be processed
func (r *TimingReporter) Close() {
if r == nil {
return
}

// Signal worker to stop accepting new events
r.cancel()

// Close the channel to signal worker to finish processing
close(r.events)
}

// worker processes events from the channel
func (r *TimingReporter) worker() {
for {
select {
case <-r.ctx.Done():
// Drain remaining events before exiting
for event := range r.events {
r.send(event.eventType, event.duration)
}
return
case event, ok := <-r.events:
if !ok {
return
}
r.send(event.eventType, event.duration)
}
}
}

func (r *TimingReporter) send(eventType string, duration time.Duration) {
// Build placeholder map
replacements := map[string]string{
"{event}": eventType,
"{duration_ms}": fmt.Sprintf("%d", duration.Milliseconds()),
}

// Apply replacements to each argument
args := make([]string, len(r.argsFmt))
for i, argTemplate := range r.argsFmt {
arg := argTemplate
for placeholder, value := range replacements {
arg = strings.ReplaceAll(arg, placeholder, value)
}
args[i] = arg
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

cmd := exec.CommandContext(ctx, r.command, args...)
output, err := cmd.CombinedOutput()
if err != nil {
r.logger.Warn("Timing reporter: external command failed",
slog.String("event", eventType),
slog.Any("error", err),
slog.String("output", string(output)),
slog.String("command", r.command),
slog.Any("args", args))
}
}
88 changes: 88 additions & 0 deletions internal/app/event_reporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package app

import (
"log/slog"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/yandex/rdsync/internal/config"
)

func TestNewTimingReporterNil(t *testing.T) {
conf := &config.Config{
EventTimingNotifyCommand: "", // empty command
EventTimingNotifyArgs: []string{"{event}", "{duration_ms}"},
}
logger := slog.Default()

reporter := newTimingReporter(conf, logger)
require.Nil(t, reporter, "newTimingReporter should return nil when EventTimingNotifyCommand is empty")
}

func TestReportTimingNilSafe(t *testing.T) {
var reporter *TimingReporter // nil reporter

// Should not panic when calling reportTiming on nil reporter
require.NotPanics(t, func() {
reporter.reportTiming("test_event", 100*time.Millisecond)
}, "reportTiming should be nil-safe and not panic")

// Should not panic when calling Close on nil reporter
require.NotPanics(t, func() {
reporter.Close()
}, "Close should be nil-safe and not panic")
}

func TestReportTimingSendsEvent(t *testing.T) {
conf := &config.Config{
EventTimingNotifyCommand: "true", // command that always succeeds
EventTimingNotifyArgs: []string{"{event}", "{duration_ms}"},
}
logger := slog.Default()

reporter := newTimingReporter(conf, logger)
require.NotNil(t, reporter)
defer reporter.Close()

// Send an event
reporter.reportTiming("test_event", 150*time.Millisecond)

// Give worker time to process the event
time.Sleep(100 * time.Millisecond)

// If we reach here without panic or deadlock, the test passes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure this test is meaningful? We don't even check that the command was executed.

}

func TestReportTimingChannelFull(t *testing.T) {
conf := &config.Config{
EventTimingNotifyCommand: "sleep",
EventTimingNotifyArgs: []string{"10"}, // slow command to keep worker busy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sleep in linux could sleep for infinity.

}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))

reporter := newTimingReporter(conf, logger)
require.NotNil(t, reporter)
defer reporter.Close()

// Fill the channel with 64 events (the buffer size)
for i := 0; i < 64; i++ {
reporter.reportTiming("fill_event", time.Duration(i)*time.Millisecond)
}

// Try to send one more event - it should not block (will be dropped)
done := make(chan bool, 1)
go func() {
reporter.reportTiming("overflow_event", 999*time.Millisecond)
done <- true
}()

// Wait with timeout to ensure it doesn't block
select {
case <-done:
// Success - the call returned immediately (event was dropped)
case <-time.After(1 * time.Second):
t.Fatal("reportTiming blocked when channel was full - should have dropped the event")
}
}
10 changes: 10 additions & 0 deletions internal/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func (app *App) stateManager() appState {
if app.nodeFailTime[master].IsZero() {
app.nodeFailTime[master] = time.Now()
}
// Report master unavailability duration when failover is approved
if failTime, ok := app.nodeFailTime[master]; ok {
dur := time.Since(failTime)
app.timings.reportTiming("master_unavailable", dur)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should move all such strings into constants to ease the maintenance (imagine that we need to rename the string).

}
err = app.approveFailover(shardState, activeNodes, master)
if err == nil {
app.logger.Info("Failover approved")
Expand Down Expand Up @@ -218,6 +223,11 @@ func (app *App) stateManager() appState {
}
return stateCandidate
}
// Report master unavailability duration when master recovers
if failTime, ok := app.nodeFailTime[master]; ok {
dur := time.Since(failTime)
app.timings.reportTiming("master_unavailable", dur)
}
delete(app.nodeFailTime, master)
delete(app.splitTime, master)
app.repairShard(shardState, activeNodes, master)
Expand Down
10 changes: 10 additions & 0 deletions internal/app/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ func (app *App) repairLocalNode(master string) bool {
}
}
} else if !offline {
// Report node_offline duration when node comes back online naturally
if failTime, ok := app.nodeFailTime[local.FQDN()]; ok {
dur := time.Since(failTime)
app.timings.reportTiming("node_offline", dur)
}
delete(app.nodeFailTime, local.FQDN())
}

Expand Down Expand Up @@ -312,6 +317,11 @@ func (app *App) repairLocalNode(master string) bool {
return false
}
}
// Report node_offline duration when node is brought back online after repair
if failTime, ok := app.nodeFailTime[local.FQDN()]; ok {
dur := time.Since(failTime)
app.timings.reportTiming("node_offline", dur)
}
err = local.SetOnline(app.ctx)
if err != nil {
app.logger.Error("Unable to set local node online", slog.Any("error", err))
Expand Down
17 changes: 17 additions & 0 deletions internal/app/switchover.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ func (app *App) failSwitchover(switchover *Switchover, err error) error {
switchover.Result.Ok = false
switchover.Result.Error = err.Error()
switchover.Result.FinishedAt = time.Now()

// Report failure timing
dur := time.Since(switchover.StartedAt)
eventName := "switchover_failed"
if switchover.Cause == CauseAuto {
eventName = "failover_failed"
}
app.timings.reportTiming(eventName, dur)

return app.dcs.Set(pathCurrentSwitch, switchover)
}

Expand Down Expand Up @@ -99,6 +108,14 @@ func (app *App) finishSwitchover(switchover *Switchover, switchErr error) error

if switchErr != nil {
switchover.Result.Error = switchErr.Error()
} else {
// Report success timing
dur := time.Since(switchover.StartedAt)
eventName := "switchover_complete"
if switchover.Cause == CauseAuto {
eventName = "failover_complete"
}
app.timings.reportTiming(eventName, dur)
}

err := app.dcs.Delete(pathCurrentSwitch)
Expand Down
Loading
Loading