diff --git a/.gitignore b/.gitignore index 0e32eb6..3f6b8fd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ cmd/rdsync/rdsync tests/logs/ .idea +.DS_Store diff --git a/internal/app/app.go b/internal/app/app.go index a460dd2..029293a 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -19,18 +19,19 @@ import ( // App is main application structure type App struct { - dcs dcs.DCS + dcsDivergeTime time.Time + replFailTime time.Time critical atomic.Value ctx context.Context - nodeFailTime map[string]time.Time + dcs dcs.DCS + config *config.Config splitTime map[string]time.Time - dcsDivergeTime time.Time - replFailTime time.Time logger *slog.Logger - config *config.Config + nodeFailTime map[string]time.Time shard *valkey.Shard cache *valkey.SentiCacheNode daemonLock *flock.Flock + timings *TimingReporter mode appMode aofMode aofMode state appState @@ -95,6 +96,7 @@ func NewApp(configFile, logLevel string) (*App, error) { config: conf, } app.critical.Store(false) + app.timings = newTimingReporter(conf, logger) return app, nil } @@ -130,6 +132,9 @@ func (app *App) unlockDaemonFile() { func (app *App) Run() int { app.lockDaemonFile() defer app.unlockDaemonFile() + + defer app.timings.Close() + err := app.connectDCS() if err != nil { app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) diff --git a/internal/app/event_reporter.go b/internal/app/event_reporter.go new file mode 100644 index 0000000..a8a1513 --- /dev/null +++ b/internal/app/event_reporter.go @@ -0,0 +1,130 @@ +package app + +import ( + "context" + "fmt" + "log/slog" + "os/exec" + "strings" + "time" + + "github.com/yandex/rdsync/internal/config" +) + +// timingEvent represents a single timing event to be reported +type timingEvent struct { + eventType string + duration time.Duration +} + +// TimingReporter handles reporting event durations to an external program +type TimingReporter struct { + ctx context.Context + logger *slog.Logger + events chan timingEvent + cancel context.CancelFunc + command string + argsFmt []string +} + +func newTimingReporter(conf *config.Config, logger *slog.Logger) *TimingReporter { + if conf.EventTimingNotifyCommand == "" { + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + r := &TimingReporter{ + command: conf.EventTimingNotifyCommand, + argsFmt: conf.EventTimingNotifyArgs, + logger: logger, + events: make(chan timingEvent, 64), // buffered channel to prevent blocking + ctx: ctx, + cancel: cancel, + } + + // Start worker goroutine to process events + go r.worker() + + return r +} + +// reportTiming sends an event duration to the external program asynchronously. +// If the reporter is nil (not configured), this is a no-op. +// Never blocks the caller — uses a buffered channel. +func (r *TimingReporter) reportTiming(eventType string, duration time.Duration) { + if r == nil { + return + } + + // Non-blocking send - drop event if channel is full + select { + case r.events <- timingEvent{eventType: eventType, duration: duration}: + default: + r.logger.Warn("Timing reporter: event channel full, dropping event", + slog.String("event", eventType)) + } +} + +// Close shuts down the reporter and waits for pending events to be processed +func (r *TimingReporter) Close() { + if r == nil { + return + } + + // Signal worker to stop accepting new events + r.cancel() + + // Close the channel to signal worker to finish processing + close(r.events) +} + +// worker processes events from the channel +func (r *TimingReporter) worker() { + for { + select { + case <-r.ctx.Done(): + // Drain remaining events before exiting + for event := range r.events { + r.send(event.eventType, event.duration) + } + return + case event, ok := <-r.events: + if !ok { + return + } + r.send(event.eventType, event.duration) + } + } +} + +func (r *TimingReporter) send(eventType string, duration time.Duration) { + // Build placeholder map + replacements := map[string]string{ + "{event}": eventType, + "{duration_ms}": fmt.Sprintf("%d", duration.Milliseconds()), + } + + // Apply replacements to each argument + args := make([]string, len(r.argsFmt)) + for i, argTemplate := range r.argsFmt { + arg := argTemplate + for placeholder, value := range replacements { + arg = strings.ReplaceAll(arg, placeholder, value) + } + args[i] = arg + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cmd := exec.CommandContext(ctx, r.command, args...) + output, err := cmd.CombinedOutput() + if err != nil { + r.logger.Warn("Timing reporter: external command failed", + slog.String("event", eventType), + slog.Any("error", err), + slog.String("output", string(output)), + slog.String("command", r.command), + slog.Any("args", args)) + } +} diff --git a/internal/app/event_reporter_test.go b/internal/app/event_reporter_test.go new file mode 100644 index 0000000..a8b5d0a --- /dev/null +++ b/internal/app/event_reporter_test.go @@ -0,0 +1,88 @@ +package app + +import ( + "log/slog" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/yandex/rdsync/internal/config" +) + +func TestNewTimingReporterNil(t *testing.T) { + conf := &config.Config{ + EventTimingNotifyCommand: "", // empty command + EventTimingNotifyArgs: []string{"{event}", "{duration_ms}"}, + } + logger := slog.Default() + + reporter := newTimingReporter(conf, logger) + require.Nil(t, reporter, "newTimingReporter should return nil when EventTimingNotifyCommand is empty") +} + +func TestReportTimingNilSafe(t *testing.T) { + var reporter *TimingReporter // nil reporter + + // Should not panic when calling reportTiming on nil reporter + require.NotPanics(t, func() { + reporter.reportTiming("test_event", 100*time.Millisecond) + }, "reportTiming should be nil-safe and not panic") + + // Should not panic when calling Close on nil reporter + require.NotPanics(t, func() { + reporter.Close() + }, "Close should be nil-safe and not panic") +} + +func TestReportTimingSendsEvent(t *testing.T) { + conf := &config.Config{ + EventTimingNotifyCommand: "true", // command that always succeeds + EventTimingNotifyArgs: []string{"{event}", "{duration_ms}"}, + } + logger := slog.Default() + + reporter := newTimingReporter(conf, logger) + require.NotNil(t, reporter) + defer reporter.Close() + + // Send an event + reporter.reportTiming("test_event", 150*time.Millisecond) + + // Give worker time to process the event + time.Sleep(100 * time.Millisecond) + + // If we reach here without panic or deadlock, the test passes +} + +func TestReportTimingChannelFull(t *testing.T) { + conf := &config.Config{ + EventTimingNotifyCommand: "sleep", + EventTimingNotifyArgs: []string{"10"}, // slow command to keep worker busy + } + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + + reporter := newTimingReporter(conf, logger) + require.NotNil(t, reporter) + defer reporter.Close() + + // Fill the channel with 64 events (the buffer size) + for i := 0; i < 64; i++ { + reporter.reportTiming("fill_event", time.Duration(i)*time.Millisecond) + } + + // Try to send one more event - it should not block (will be dropped) + done := make(chan bool, 1) + go func() { + reporter.reportTiming("overflow_event", 999*time.Millisecond) + done <- true + }() + + // Wait with timeout to ensure it doesn't block + select { + case <-done: + // Success - the call returned immediately (event was dropped) + case <-time.After(1 * time.Second): + t.Fatal("reportTiming blocked when channel was full - should have dropped the event") + } +} diff --git a/internal/app/manager.go b/internal/app/manager.go index 865c370..975e3fc 100644 --- a/internal/app/manager.go +++ b/internal/app/manager.go @@ -143,6 +143,11 @@ func (app *App) stateManager() appState { if app.nodeFailTime[master].IsZero() { app.nodeFailTime[master] = time.Now() } + // Report master unavailability duration when failover is approved + if failTime, ok := app.nodeFailTime[master]; ok { + dur := time.Since(failTime) + app.timings.reportTiming("master_unavailable", dur) + } err = app.approveFailover(shardState, activeNodes, master) if err == nil { app.logger.Info("Failover approved") @@ -218,6 +223,11 @@ func (app *App) stateManager() appState { } return stateCandidate } + // Report master unavailability duration when master recovers + if failTime, ok := app.nodeFailTime[master]; ok { + dur := time.Since(failTime) + app.timings.reportTiming("master_unavailable", dur) + } delete(app.nodeFailTime, master) delete(app.splitTime, master) app.repairShard(shardState, activeNodes, master) diff --git a/internal/app/repair.go b/internal/app/repair.go index 6eb131e..6e6dcbd 100644 --- a/internal/app/repair.go +++ b/internal/app/repair.go @@ -217,6 +217,11 @@ func (app *App) repairLocalNode(master string) bool { } } } else if !offline { + // Report node_offline duration when node comes back online naturally + if failTime, ok := app.nodeFailTime[local.FQDN()]; ok { + dur := time.Since(failTime) + app.timings.reportTiming("node_offline", dur) + } delete(app.nodeFailTime, local.FQDN()) } @@ -312,6 +317,11 @@ func (app *App) repairLocalNode(master string) bool { return false } } + // Report node_offline duration when node is brought back online after repair + if failTime, ok := app.nodeFailTime[local.FQDN()]; ok { + dur := time.Since(failTime) + app.timings.reportTiming("node_offline", dur) + } err = local.SetOnline(app.ctx) if err != nil { app.logger.Error("Unable to set local node online", slog.Any("error", err)) diff --git a/internal/app/switchover.go b/internal/app/switchover.go index 49f479e..dd254f8 100644 --- a/internal/app/switchover.go +++ b/internal/app/switchover.go @@ -70,6 +70,15 @@ func (app *App) failSwitchover(switchover *Switchover, err error) error { switchover.Result.Ok = false switchover.Result.Error = err.Error() switchover.Result.FinishedAt = time.Now() + + // Report failure timing + dur := time.Since(switchover.StartedAt) + eventName := "switchover_failed" + if switchover.Cause == CauseAuto { + eventName = "failover_failed" + } + app.timings.reportTiming(eventName, dur) + return app.dcs.Set(pathCurrentSwitch, switchover) } @@ -99,6 +108,14 @@ func (app *App) finishSwitchover(switchover *Switchover, switchErr error) error if switchErr != nil { switchover.Result.Error = switchErr.Error() + } else { + // Report success timing + dur := time.Since(switchover.StartedAt) + eventName := "switchover_complete" + if switchover.Cause == CauseAuto { + eventName = "failover_complete" + } + app.timings.reportTiming(eventName, dur) } err := app.dcs.Delete(pathCurrentSwitch) diff --git a/internal/config/config.go b/internal/config/config.go index a13fd99..507e48e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -62,23 +62,25 @@ type SentinelModeConfig struct { // Config contains rdsync application configuration type Config struct { - Mode string `yaml:"mode"` - InfoFile string `yaml:"info_file"` - Hostname string `yaml:"hostname"` - LogLevel string `yaml:"loglevel"` - AofMode string `yaml:"aof_mode"` - MaintenanceFile string `yaml:"maintenance_file"` - DaemonLockFile string `yaml:"daemon_lock_file"` - PprofAddr string `yaml:"pprof_addr"` - SentinelMode SentinelModeConfig `yaml:"sentinel_mode"` - Zookeeper dcs.ZookeeperConfig `yaml:"zookeeper"` - Valkey ValkeyConfig `yaml:"valkey"` - HealthCheckInterval time.Duration `yaml:"healthcheck_interval"` - InfoFileHandlerInterval time.Duration `yaml:"info_file_handler_interval"` - InactivationDelay time.Duration `yaml:"inactivation_delay"` - DcsWaitTimeout time.Duration `yaml:"dcs_wait_timeout"` - TickInterval time.Duration `yaml:"tick_interval"` - PingStable int `yaml:"ping_stable"` + Mode string `yaml:"mode"` + InfoFile string `yaml:"info_file"` + Hostname string `yaml:"hostname"` + LogLevel string `yaml:"loglevel"` + AofMode string `yaml:"aof_mode"` + MaintenanceFile string `yaml:"maintenance_file"` + DaemonLockFile string `yaml:"daemon_lock_file"` + PprofAddr string `yaml:"pprof_addr"` + EventTimingNotifyCommand string `config:"event_timing_notify_command" yaml:"event_timing_notify_command"` + EventTimingNotifyArgs []string `config:"event_timing_notify_args" yaml:"event_timing_notify_args"` + SentinelMode SentinelModeConfig `yaml:"sentinel_mode"` + Zookeeper dcs.ZookeeperConfig `yaml:"zookeeper"` + Valkey ValkeyConfig `yaml:"valkey"` + HealthCheckInterval time.Duration `yaml:"healthcheck_interval"` + InfoFileHandlerInterval time.Duration `yaml:"info_file_handler_interval"` + InactivationDelay time.Duration `yaml:"inactivation_delay"` + DcsWaitTimeout time.Duration `yaml:"dcs_wait_timeout"` + TickInterval time.Duration `yaml:"tick_interval"` + PingStable int `yaml:"ping_stable"` } // DefaultValkeyConfig returns default configuration for valkey connection info and params @@ -159,23 +161,25 @@ func DefaultConfig() (Config, error) { return Config{}, err } config := Config{ - AofMode: "Unspecified", - LogLevel: "Info", - Hostname: hostname, - Mode: "Sentinel", - InfoFile: "/var/run/rdsync/rdsync.info", - DaemonLockFile: "/var/run/rdsync/rdsync.lock", - MaintenanceFile: "/var/run/rdsync/rdsync.maintenance", - PingStable: 3, - TickInterval: 5 * time.Second, - InactivationDelay: 30 * time.Second, - HealthCheckInterval: 5 * time.Second, - InfoFileHandlerInterval: 30 * time.Second, - PprofAddr: "", - Zookeeper: zkConfig, - DcsWaitTimeout: 10 * time.Second, - Valkey: DefaultValkeyConfig(), - SentinelMode: sentinelConf, + AofMode: "Unspecified", + LogLevel: "Info", + Hostname: hostname, + Mode: "Sentinel", + InfoFile: "/var/run/rdsync/rdsync.info", + DaemonLockFile: "/var/run/rdsync/rdsync.lock", + MaintenanceFile: "/var/run/rdsync/rdsync.maintenance", + EventTimingNotifyCommand: "", + EventTimingNotifyArgs: []string{"{event}", "{duration_ms}"}, + PingStable: 3, + TickInterval: 5 * time.Second, + InactivationDelay: 30 * time.Second, + HealthCheckInterval: 5 * time.Second, + InfoFileHandlerInterval: 30 * time.Second, + PprofAddr: "", + Zookeeper: zkConfig, + DcsWaitTimeout: 10 * time.Second, + Valkey: DefaultValkeyConfig(), + SentinelMode: sentinelConf, } return config, nil } diff --git a/tests/features/02_cluster_switchover_from.feature b/tests/features/02_cluster_switchover_from.feature index d238a70..33457b8 100644 --- a/tests/features/02_cluster_switchover_from.feature +++ b/tests/features/02_cluster_switchover_from.feature @@ -24,6 +24,10 @@ Feature: Cluster mode switchover from old master "is_master": false } """ + And I run command on hosts "valkey1,valkey2,valkey3" + """ + rm -f /var/log/rdsync_events.log + """ When I run command on host "valkey1" """ rdsync switch --from valkey1 @@ -48,6 +52,10 @@ Feature: Cluster mode switchover from old master When I wait for "30" seconds Then path "/var/lib/valkey/appendonlydir" exists on "valkey1" Then path "/var/lib/valkey/appendonlydir" does not exist on "{{.new_master}}" + And file "/var/log/rdsync_events.log" on any of hosts "valkey1,valkey2,valkey3" should match regexp within "30" seconds + """ + switchover_complete \d+ + """ Scenario: Cluster mode switchover (from) with unhealthy replicas is rejected Given clustered shard is up and running diff --git a/tests/features/02_sentinel_switchover_from.feature b/tests/features/02_sentinel_switchover_from.feature index 8bde22d..ae1651b 100644 --- a/tests/features/02_sentinel_switchover_from.feature +++ b/tests/features/02_sentinel_switchover_from.feature @@ -24,6 +24,10 @@ Feature: Sentinel mode switchover from old master "is_master": false } """ + And I run command on hosts "valkey1,valkey2,valkey3" + """ + rm -f /var/log/rdsync_events.log + """ When I run command on host "valkey1" """ rdsync switch --from valkey1 @@ -51,6 +55,10 @@ Feature: Sentinel mode switchover from old master When I wait for "30" seconds Then path "/var/lib/valkey/appendonlydir" exists on "valkey1" Then path "/var/lib/valkey/appendonlydir" does not exist on "{{.new_master}}" + And file "/var/log/rdsync_events.log" on any of hosts "valkey1,valkey2,valkey3" should match regexp within "30" seconds + """ + switchover_complete \d+ + """ Scenario: Sentinel mode switchover with unhealthy replicas is rejected Given sentinel shard is up and running diff --git a/tests/features/04_cluster_failover.feature b/tests/features/04_cluster_failover.feature index ddfab9d..d0fe113 100644 --- a/tests/features/04_cluster_failover.feature +++ b/tests/features/04_cluster_failover.feature @@ -24,6 +24,10 @@ Feature: Cluster mode failover from dead master "is_master": false } """ + And I run command on hosts "valkey1,valkey2,valkey3" + """ + rm -f /var/log/rdsync_events.log + """ When host "valkey1" is stopped Then valkey host "valkey1" should become unavailable within "10" seconds And zookeeper node "/test/manager" should match regexp within "30" seconds @@ -43,6 +47,10 @@ Feature: Cluster mode failover from dead master When I get zookeeper node "/test/master" And I save zookeeper query result as "new_master" Then valkey host "{{.new_master}}" should be master + And file "/var/log/rdsync_events.log" on any of hosts "valkey1,valkey2,valkey3" should match regexp within "30" seconds + """ + failover_complete \d+ + """ When host "valkey1" is started Then valkey host "valkey1" should become available within "20" seconds And valkey host "valkey1" should become replica of "{{.new_master}}" within "30" seconds diff --git a/tests/features/04_sentinel_failover.feature b/tests/features/04_sentinel_failover.feature index 2d5b9bb..18862c6 100644 --- a/tests/features/04_sentinel_failover.feature +++ b/tests/features/04_sentinel_failover.feature @@ -24,6 +24,10 @@ Feature: Sentinel mode failover from dead master "is_master": false } """ + And I run command on hosts "valkey1,valkey2,valkey3" + """ + rm -f /var/log/rdsync_events.log + """ When host "valkey1" is stopped Then valkey host "valkey1" should become unavailable within "10" seconds And zookeeper node "/test/manager" should match regexp within "30" seconds @@ -43,6 +47,10 @@ Feature: Sentinel mode failover from dead master When I get zookeeper node "/test/master" And I save zookeeper query result as "new_master" Then valkey host "{{.new_master}}" should be master + And file "/var/log/rdsync_events.log" on any of hosts "valkey1,valkey2,valkey3" should match regexp within "30" seconds + """ + failover_complete \d+ + """ When host "valkey1" is started Then valkey host "valkey1" should become available within "20" seconds And valkey host "valkey1" should become replica of "{{.new_master}}" within "30" seconds diff --git a/tests/images/valkey/rdsync_cluster.yaml b/tests/images/valkey/rdsync_cluster.yaml index 7c30d0f..9a8a0ea 100644 --- a/tests/images/valkey/rdsync_cluster.yaml +++ b/tests/images/valkey/rdsync_cluster.yaml @@ -5,6 +5,8 @@ pprof_addr: ":8081" info_file: /var/run/rdsync.info maintenance_file: /var/run/rdsync.maintenance daemon_lock_file: /var/run/rdsync.lock +event_timing_notify_command: "sh" +event_timing_notify_args: ["-c", "echo {event} {duration_ms} >> /var/log/rdsync_events.log"] valkey: auth_password: functestpassword restart_command: supervisorctl restart valkey diff --git a/tests/images/valkey/rdsync_sentinel.yaml b/tests/images/valkey/rdsync_sentinel.yaml index eea26af..082d3db 100644 --- a/tests/images/valkey/rdsync_sentinel.yaml +++ b/tests/images/valkey/rdsync_sentinel.yaml @@ -5,6 +5,8 @@ pprof_addr: ":8081" info_file: /var/run/rdsync.info maintenance_file: /var/run/rdsync.maintenance daemon_lock_file: /var/run/rdsync.lock +event_timing_notify_command: "sh" +event_timing_notify_args: ["-c", "echo {event} {duration_ms} >> /var/log/rdsync_events.log"] valkey: auth_password: functestpassword restart_command: supervisorctl restart valkey diff --git a/tests/rdsync_test.go b/tests/rdsync_test.go index 2f00e67..5c30d9a 100644 --- a/tests/rdsync_test.go +++ b/tests/rdsync_test.go @@ -685,6 +685,23 @@ func (tctx *testContext) stepIRunCommandOnHost(host string, body *godog.DocStrin return err } +func (tctx *testContext) stepIRunCommandOnHosts(hostsStr string, body *godog.DocString) error { + cmd := strings.TrimSpace(body.Content) + hosts := strings.Split(hostsStr, ",") + for i := range hosts { + hosts[i] = strings.TrimSpace(hosts[i]) + } + + for _, host := range hosts { + var err error + tctx.commandRetcode, tctx.commandOutput, err = tctx.composer.RunCommand(host, cmd, 10*time.Second) + if err != nil { + return fmt.Errorf("failed to run command on host %s: %s", host, err) + } + } + return nil +} + func (tctx *testContext) stepIRunAsyncCommandOnHost(host string, body *godog.DocString) error { cmd := strings.TrimSpace(body.Content) return tctx.composer.RunAsyncCommand(host, cmd) @@ -1052,6 +1069,60 @@ func (tctx *testContext) stepInfoFileOnHostMatch(filepath, host, matcher string, return err } +func (tctx *testContext) stepFileOnHostsShouldMatchRegexpWithin(filepath, hostsStr string, timeout int, body *godog.DocString) error { + pattern := strings.TrimSpace(body.Content) + matcher, err := matchers.GetMatcher("regexp") + if err != nil { + return err + } + + hosts := strings.Split(hostsStr, ",") + for i := range hosts { + hosts[i] = strings.TrimSpace(hosts[i]) + } + + var lastError error + var outputs map[string]string + found := false + + testutil.Retry(func() bool { + outputs = make(map[string]string) + + for _, host := range hosts { + cmd := fmt.Sprintf("cat '%s'", filepath) + _, output, cmdErr := tctx.composer.RunCommand(host, cmd, 10*time.Second) + if cmdErr != nil { + outputs[host] = fmt.Sprintf("error: %s", cmdErr) + continue + } + outputs[host] = output + + // Check if this host's output matches + if matchErr := matcher(output, pattern); matchErr == nil { + found = true + return true // Success! Found a match + } + } + return false // No match yet, keep retrying + }, time.Duration(timeout)*time.Second, time.Second) + + if found { + return nil + } + + // If we get here, no host matched within the timeout + var details strings.Builder + details.WriteString(fmt.Sprintf("file %s did not match pattern on any host after %d seconds.\n", filepath, timeout)) + details.WriteString("Pattern:\n") + details.WriteString(fmt.Sprintf(" %s\n", pattern)) + details.WriteString("Outputs from each host:\n") + for host, output := range outputs { + details.WriteString(fmt.Sprintf(" %s: %s\n", host, output)) + } + lastError = fmt.Errorf("%s", details.String()) + return lastError +} + func InitializeScenario(s *godog.ScenarioContext) { tctx, err := newTestContext() if err != nil { @@ -1115,6 +1186,7 @@ func InitializeScenario(s *godog.ScenarioContext) { // command execution s.Step(`^I run command on host "([^"]*)"$`, tctx.stepIRunCommandOnHost) + s.Step(`^I run command on hosts "([^"]*)"$`, tctx.stepIRunCommandOnHosts) s.Step(`^I run command on host "([^"]*)" with timeout "(\d+)" seconds$`, tctx.stepIRunCommandOnHostWithTimeout) s.Step(`^I run async command on host "([^"]*)"$`, tctx.stepIRunAsyncCommandOnHost) s.Step(`^I run command on host "([^"]*)" until result match regexp "([^"]*)" with timeout "(\d+)" seconds$`, tctx.stepIRunCommandOnHostUntilResultMatch) @@ -1168,6 +1240,7 @@ func InitializeScenario(s *godog.ScenarioContext) { // misc s.Step(`^I wait for "(\d+)" seconds$`, tctx.stepIWaitFor) s.Step(`^info file "([^"]*)" on "([^"]*)" match (\w+)$`, tctx.stepInfoFileOnHostMatch) + s.Step(`^file "([^"]*)" on any of hosts "([^"]*)" should match regexp within "(\d+)" seconds$`, tctx.stepFileOnHostsShouldMatchRegexpWithin) } func TestRdsync(t *testing.T) {