diff --git a/cmd/clickhouse/check_and_finalize.go b/cmd/clickhouse/check_and_finalize.go index 3c9d0cf..696ed46 100644 --- a/cmd/clickhouse/check_and_finalize.go +++ b/cmd/clickhouse/check_and_finalize.go @@ -117,7 +117,7 @@ func waitAndFinalize(appCtx *app.Context, chClient clickhouse.Interface, operati } } - if err := restore.WaitForAPIRestore(checkStatusFn, defaultPollInterval, defaultRestoreTimeout, appCtx.Logger); err != nil { + if err := restore.WaitForAPIRestore(checkStatusFn, defaultPollInterval, appCtx.JobTimeout, appCtx.Logger); err != nil { return err } diff --git a/cmd/root.go b/cmd/root.go index f2d7686..b03edea 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -27,6 +27,7 @@ func addBackupConfigFlags(cmd *cobra.Command) { cmd.PersistentFlags().StringVar(&flags.ConfigMapName, "configmap", "suse-observability-backup-config", "ConfigMap name containing backup configuration") cmd.PersistentFlags().StringVar(&flags.SecretName, "secret", "suse-observability-backup-config", "Secret name containing backup configuration") cmd.PersistentFlags().StringVarP(&flags.OutputFormat, "output", "o", "table", "Output format (table, json)") + cmd.PersistentFlags().IntVar(&flags.JobTimeout, "job-timeout", config.DefaultJobTimeoutMinutes, "Timeout in minutes for waiting on job/restore completion") _ = cmd.MarkPersistentFlagRequired("namespace") } diff --git a/cmd/settings/check_and_finalize.go b/cmd/settings/check_and_finalize.go index c923fb5..3ef3dea 100644 --- a/cmd/settings/check_and_finalize.go +++ b/cmd/settings/check_and_finalize.go @@ -54,5 +54,6 @@ func runCheckAndFinalize(appCtx *app.Context) error { CleanupPVC: false, WaitForJob: waitForJob, Log: appCtx.Logger, + Timeout: appCtx.JobTimeout, }) } diff --git a/cmd/settings/list.go b/cmd/settings/list.go index 265e0d5..9bf612c 100644 --- a/cmd/settings/list.go +++ b/cmd/settings/list.go @@ -185,7 +185,7 @@ func getBackupListFromPVC(appCtx *app.Context) ([]BackupFileInfo, error) { } }() - if err := restore.WaitForJobCompletion(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger); err != nil { + if err := restore.WaitForJobCompletion(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger, appCtx.JobTimeout); err != nil { return nil, err } diff --git a/cmd/settings/restore.go b/cmd/settings/restore.go index 2bd9fb5..2a744de 100644 --- a/cmd/settings/restore.go +++ b/cmd/settings/restore.go @@ -126,13 +126,13 @@ func runRestore(appCtx *app.Context) error { return nil } - return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger) + return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger, appCtx.JobTimeout) } // waitAndCleanupRestoreJob waits for job completion and cleans up resources -func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error { +func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, jobTimeout time.Duration) error { restore.PrintWaitingMessage(log, "settings", jobName, namespace) - return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, false) + return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, false, jobTimeout) } // getLatestBackup retrieves the most recent backup from all sources (S3 and PVC) diff --git a/cmd/stackgraph/check_and_finalize.go b/cmd/stackgraph/check_and_finalize.go index 2895eb0..ce959e1 100644 --- a/cmd/stackgraph/check_and_finalize.go +++ b/cmd/stackgraph/check_and_finalize.go @@ -54,5 +54,6 @@ func runCheckAndFinalize(appCtx *app.Context) error { CleanupPVC: true, WaitForJob: waitForJob, Log: appCtx.Logger, + Timeout: appCtx.JobTimeout, }) } diff --git a/cmd/stackgraph/restore.go b/cmd/stackgraph/restore.go index 1f61c42..2f8dcb8 100644 --- a/cmd/stackgraph/restore.go +++ b/cmd/stackgraph/restore.go @@ -133,13 +133,13 @@ func runRestore(appCtx *app.Context) error { return nil } - return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger) + return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger, appCtx.JobTimeout) } // waitAndCleanupRestoreJob waits for job completion and cleans up resources -func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error { +func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, jobTimeout time.Duration) error { restore.PrintWaitingMessage(log, "stackgraph", jobName, namespace) - return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, true) + return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, true, jobTimeout) } // getLatestBackup retrieves the most recent backup from S3 diff --git a/cmd/victoriametrics/check_and_finalize.go b/cmd/victoriametrics/check_and_finalize.go index a20b433..bb98ffa 100644 --- a/cmd/victoriametrics/check_and_finalize.go +++ b/cmd/victoriametrics/check_and_finalize.go @@ -54,5 +54,6 @@ func runCheckAndFinalize(appCtx *app.Context) error { CleanupPVC: false, WaitForJob: waitForJob, Log: appCtx.Logger, + Timeout: appCtx.JobTimeout, }) } diff --git a/cmd/victoriametrics/restore.go b/cmd/victoriametrics/restore.go index 231eb81..de69bcc 100644 --- a/cmd/victoriametrics/restore.go +++ b/cmd/victoriametrics/restore.go @@ -130,13 +130,13 @@ func runRestore(appCtx *app.Context) error { return nil } - return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger) + return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger, appCtx.JobTimeout) } // waitAndCleanupRestoreJob waits for job completion and cleans up resources -func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error { +func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, jobTimeout time.Duration) error { restore.PrintWaitingMessage(log, "victoria-metrics", jobName, namespace) - return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, false) + return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, false, jobTimeout) } // getLatestBackup retrieves the most recent backup from S3 diff --git a/internal/app/app.go b/internal/app/app.go index 7cd1d94..e5fe5e6 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "time" "github.com/stackvista/stackstate-backup-cli/internal/clients/clickhouse" "github.com/stackvista/stackstate-backup-cli/internal/clients/elasticsearch" @@ -16,15 +17,16 @@ import ( // Context holds all dependencies for cli commands type Context struct { - K8sClient *k8s.Client - Namespace string - S3Client s3.Interface - ESClient elasticsearch.Interface - CHClient clickhouse.Interface - Config *config.Config - Logger *logger.Logger - Formatter *output.Formatter - Context context.Context + K8sClient *k8s.Client + Namespace string + S3Client s3.Interface + ESClient elasticsearch.Interface + CHClient clickhouse.Interface + Config *config.Config + Logger *logger.Logger + Formatter *output.Formatter + Context context.Context + JobTimeout time.Duration } // NewContext creates production dependencies @@ -68,14 +70,15 @@ func NewContext(flags *config.CLIGlobalFlags) (*Context, error) { formatter := output.NewFormatter(os.Stdout, flags.OutputFormat) return &Context{ - K8sClient: k8sClient, - Namespace: flags.Namespace, - Config: cfg, - S3Client: s3Client, - ESClient: esClient, - CHClient: chClient, - Logger: logger.New(flags.Quiet, flags.Debug), - Formatter: formatter, - Context: context.Background(), + K8sClient: k8sClient, + Namespace: flags.Namespace, + Config: cfg, + S3Client: s3Client, + ESClient: esClient, + CHClient: chClient, + Logger: logger.New(flags.Quiet, flags.Debug), + Formatter: formatter, + Context: context.Background(), + JobTimeout: time.Duration(flags.JobTimeout) * time.Minute, }, nil } diff --git a/internal/foundation/config/config.go b/internal/foundation/config/config.go index 64e01d9..39430e2 100644 --- a/internal/foundation/config/config.go +++ b/internal/foundation/config/config.go @@ -341,6 +341,11 @@ func LoadConfig(clientset kubernetes.Interface, namespace, configMapName, secret return config, nil } +const ( + // DefaultJobTimeoutMinutes is the default timeout for job completion in minutes + DefaultJobTimeoutMinutes = 30 +) + type CLIGlobalFlags struct { Namespace string Kubeconfig string @@ -349,6 +354,7 @@ type CLIGlobalFlags struct { ConfigMapName string SecretName string OutputFormat string // table, json + JobTimeout int // job completion timeout in minutes } func NewCLIGlobalFlags() *CLIGlobalFlags { diff --git a/internal/orchestration/restore/finalize.go b/internal/orchestration/restore/finalize.go index ae3ba1f..9a61eea 100644 --- a/internal/orchestration/restore/finalize.go +++ b/internal/orchestration/restore/finalize.go @@ -2,6 +2,7 @@ package restore import ( "fmt" + "time" "github.com/stackvista/stackstate-backup-cli/internal/clients/k8s" "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" @@ -74,13 +75,14 @@ type WaitAndFinalizeParams struct { ScaleSelector string CleanupPVC bool Log *logger.Logger + Timeout time.Duration } // WaitAndFinalize waits for job completion and then cleans up func WaitAndFinalize(params WaitAndFinalizeParams) error { PrintWaitingMessage(params.Log, params.ServiceName, params.JobName, params.Namespace) - if err := WaitForJobCompletion(params.K8sClient, params.Namespace, params.JobName, params.Log); err != nil { + if err := WaitForJobCompletion(params.K8sClient, params.Namespace, params.JobName, params.Log, params.Timeout); err != nil { params.Log.Errorf("Job failed: %v", err) // Still cleanup even if failed params.Log.Println() @@ -113,6 +115,7 @@ type CheckAndFinalizeParams struct { CleanupPVC bool WaitForJob bool Log *logger.Logger + Timeout time.Duration } // CheckAndFinalize checks the status of a background restore job and cleans up resources @@ -157,6 +160,7 @@ func CheckAndFinalize(params CheckAndFinalizeParams) error { ScaleSelector: params.ScaleSelector, CleanupPVC: params.CleanupPVC, Log: params.Log, + Timeout: params.Timeout, }) } diff --git a/internal/orchestration/restore/job.go b/internal/orchestration/restore/job.go index 023f84b..b41bdf3 100644 --- a/internal/orchestration/restore/job.go +++ b/internal/orchestration/restore/job.go @@ -14,8 +14,45 @@ const ( ) // WaitForJobCompletion waits for a Kubernetes job to complete -func WaitForJobCompletion(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error { - timeout := time.After(defaultJobCompletionTimeout) +func WaitForJobCompletion(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, jobTimeout time.Duration) error { + if jobTimeout <= 0 { + jobTimeout = defaultJobCompletionTimeout + } + + // Initial check before starting the ticker + checkJob := func() (bool, error) { + job, err := k8sClient.GetJob(namespace, jobName) + if err != nil { + return false, fmt.Errorf("failed to get job status: %w", err) + } + + if job.Status.Succeeded > 0 { + return true, nil + } + + if job.Status.Failed > 0 { + // Get and print logs from failed job + log.Println() + log.Errorf("Job failed. Fetching logs...") + log.Println() + if err := PrintJobLogs(k8sClient, namespace, jobName, log); err != nil { + log.Warningf("Failed to fetch job logs: %v", err) + } + return false, fmt.Errorf("job failed") + } + + log.Debugf("Job status: Active=%d, Succeeded=%d, Failed=%d", + job.Status.Active, job.Status.Succeeded, job.Status.Failed) + return false, nil + } + + // Perform initial check + done, err := checkJob() + if err != nil || done { + return err + } + + timeout := time.After(jobTimeout) ticker := time.NewTicker(defaultJobStatusCheckInterval) defer ticker.Stop() @@ -24,28 +61,10 @@ func WaitForJobCompletion(k8sClient *k8s.Client, namespace, jobName string, log case <-timeout: return fmt.Errorf("timeout waiting for job to complete") case <-ticker.C: - job, err := k8sClient.GetJob(namespace, jobName) - if err != nil { - return fmt.Errorf("failed to get job status: %w", err) + done, err := checkJob() + if err != nil || done { + return err } - - if job.Status.Succeeded > 0 { - return nil - } - - if job.Status.Failed > 0 { - // Get and print logs from failed job - log.Println() - log.Errorf("Job failed. Fetching logs...") - log.Println() - if err := PrintJobLogs(k8sClient, namespace, jobName, log); err != nil { - log.Warningf("Failed to fetch job logs: %v", err) - } - return fmt.Errorf("job failed") - } - - log.Debugf("Job status: Active=%d, Succeeded=%d, Failed=%d", - job.Status.Active, job.Status.Succeeded, job.Status.Failed) } } } @@ -112,8 +131,8 @@ func PrintRunningJobStatus(log *logger.Logger, serviceName, jobName, namespace s } // WaitAndCleanup waits for job completion and cleans up resources -func WaitAndCleanup(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, cleanupPVC bool) error { - if err := WaitForJobCompletion(k8sClient, namespace, jobName, log); err != nil { +func WaitAndCleanup(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, cleanupPVC bool, jobTimeout time.Duration) error { + if err := WaitForJobCompletion(k8sClient, namespace, jobName, log, jobTimeout); err != nil { log.Errorf("Job failed: %v", err) log.Println() log.Infof("Cleanup commands:") diff --git a/internal/orchestration/restore/job_test.go b/internal/orchestration/restore/job_test.go new file mode 100644 index 0000000..0cd60ee --- /dev/null +++ b/internal/orchestration/restore/job_test.go @@ -0,0 +1,168 @@ +package restore + +import ( + "context" + "testing" + "time" + + "github.com/stackvista/stackstate-backup-cli/internal/clients/k8s" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +const ( + testNamespace = "test-ns" + testJobName = "test-job" +) + +// newTestJob creates a minimal batchv1.Job with the given status values. +func newTestJob(succeeded, failed int32) *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: testJobName, + Namespace: testNamespace, + }, + Status: batchv1.JobStatus{ + Succeeded: succeeded, + Failed: failed, + }, + } +} + +// updateJobStatus writes an updated status back via Update (not UpdateStatus) so that +// the fake client's object tracker reflects the change on the next Get call. +func updateJobStatus(t *testing.T, fakeClient *fake.Clientset, succeeded, failed int32) { + t.Helper() + job, err := fakeClient.BatchV1().Jobs(testNamespace).Get(context.Background(), testJobName, metav1.GetOptions{}) + require.NoError(t, err) + job.Status.Succeeded = succeeded + job.Status.Failed = failed + _, err = fakeClient.BatchV1().Jobs(testNamespace).Update(context.Background(), job, metav1.UpdateOptions{}) + require.NoError(t, err) +} + +// TestWaitForJobCompletion_Success verifies that WaitForJobCompletion returns nil when the +// job is already in a succeeded state on the first poll. +func TestWaitForJobCompletion_Success(t *testing.T) { + // Pre-populate with a job that has already succeeded + fakeClient := fake.NewSimpleClientset(newTestJob(1, 0)) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + err := WaitForJobCompletion(client, testNamespace, testJobName, log, 5*time.Second) + require.NoError(t, err) +} + +// TestWaitForJobCompletion_JobFailed verifies that WaitForJobCompletion returns an error +// when the job is already in a failed state. +func TestWaitForJobCompletion_JobFailed(t *testing.T) { + // Pre-populate with a job that has already failed + fakeClient := fake.NewSimpleClientset(newTestJob(0, 1)) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + err := WaitForJobCompletion(client, testNamespace, testJobName, log, 5*time.Second) + + require.Error(t, err) + assert.Contains(t, err.Error(), "job failed") +} + +// TestWaitForJobCompletion_Timeout verifies that WaitForJobCompletion returns a timeout +// error when the job remains pending for the full duration. +func TestWaitForJobCompletion_Timeout(t *testing.T) { + fakeClient := fake.NewSimpleClientset(newTestJob(0, 0)) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + // Very short timeout; job never transitions + err := WaitForJobCompletion(client, testNamespace, testJobName, log, 100*time.Millisecond) + + require.Error(t, err) + assert.Contains(t, err.Error(), "timeout waiting for job to complete") +} + +// TestWaitForJobCompletion_JobNotFound verifies that WaitForJobCompletion returns an error +// immediately when the job cannot be found (first Get returns 404). +func TestWaitForJobCompletion_JobNotFound(t *testing.T) { + // Empty fake client — no job pre-created + fakeClient := fake.NewSimpleClientset() + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + // The first poll will fail to find the job; give enough timeout so we're not racing + err := WaitForJobCompletion(client, testNamespace, testJobName, log, 5*time.Second) + + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to get job status") +} + +// TestWaitForJobCompletion_DefaultTimeout verifies the value of the fallback constant. +func TestWaitForJobCompletion_DefaultTimeout(t *testing.T) { + assert.Equal(t, 30*time.Minute, defaultJobCompletionTimeout) +} + +// TestWaitForJobCompletion_CustomTimeout verifies that a caller-supplied short timeout +// is respected: the function returns quickly and produces the right error. +func TestWaitForJobCompletion_CustomTimeout(t *testing.T) { + fakeClient := fake.NewSimpleClientset(newTestJob(0, 0)) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + customTimeout := 150 * time.Millisecond + start := time.Now() + + err := WaitForJobCompletion(client, testNamespace, testJobName, log, customTimeout) + elapsed := time.Since(start) + + require.Error(t, err) + assert.Contains(t, err.Error(), "timeout waiting for job to complete") + // Should have returned near customTimeout, well under 5 seconds + assert.Less(t, elapsed, 5*time.Second, "returned much later than the custom timeout") +} + +// TestWaitForJobCompletion_SuccessAfterPending verifies that the function keeps polling +// and detects a job that transitions to succeeded after a short delay. +func TestWaitForJobCompletion_SuccessAfterPending(t *testing.T) { + fakeClient := fake.NewSimpleClientset(newTestJob(0, 0)) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + // Mark the job as succeeded after one polling interval (10s) would have fired; + // but since we poll with a ticker the test just needs status set before the timeout. + // We set it in a goroutine shortly after the function starts. + go func() { + time.Sleep(200 * time.Millisecond) + updateJobStatus(t, fakeClient, 1, 0) + }() + + // Allow plenty of time — function should return early once the status is updated + err := WaitForJobCompletion(client, testNamespace, testJobName, log, 30*time.Second) + require.NoError(t, err) +} + +// TestWaitAndCleanup_Success verifies the full wait-and-cleanup path when the job +// is already in a succeeded state. +func TestWaitAndCleanup_Success(t *testing.T) { + fakeClient := fake.NewSimpleClientset(newTestJob(1, 0)) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + err := WaitAndCleanup(client, testNamespace, testJobName, log, false, 5*time.Second) + require.NoError(t, err) +} + +// TestWaitAndCleanup_Timeout verifies that WaitAndCleanup propagates a timeout error. +func TestWaitAndCleanup_Timeout(t *testing.T) { + fakeClient := fake.NewSimpleClientset(newTestJob(0, 0)) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + err := WaitAndCleanup(client, testNamespace, testJobName, log, false, 100*time.Millisecond) + + require.Error(t, err) + assert.Contains(t, err.Error(), "timeout waiting for job to complete") +}