Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/clickhouse/check_and_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func waitAndFinalize(appCtx *app.Context, chClient clickhouse.Interface, operati
}
}

if err := restore.WaitForAPIRestore(checkStatusFn, defaultPollInterval, defaultRestoreTimeout, appCtx.Logger); err != nil {
if err := restore.WaitForAPIRestore(checkStatusFn, defaultPollInterval, appCtx.JobTimeout, appCtx.Logger); err != nil {
return err
}

Expand Down
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func addBackupConfigFlags(cmd *cobra.Command) {
cmd.PersistentFlags().StringVar(&flags.ConfigMapName, "configmap", "suse-observability-backup-config", "ConfigMap name containing backup configuration")
cmd.PersistentFlags().StringVar(&flags.SecretName, "secret", "suse-observability-backup-config", "Secret name containing backup configuration")
cmd.PersistentFlags().StringVarP(&flags.OutputFormat, "output", "o", "table", "Output format (table, json)")
cmd.PersistentFlags().IntVar(&flags.JobTimeout, "job-timeout", config.DefaultJobTimeoutMinutes, "Timeout in minutes for waiting on job/restore completion")
_ = cmd.MarkPersistentFlagRequired("namespace")
}

Expand Down
1 change: 1 addition & 0 deletions cmd/settings/check_and_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ func runCheckAndFinalize(appCtx *app.Context) error {
CleanupPVC: false,
WaitForJob: waitForJob,
Log: appCtx.Logger,
Timeout: appCtx.JobTimeout,
})
}
2 changes: 1 addition & 1 deletion cmd/settings/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func getBackupListFromPVC(appCtx *app.Context) ([]BackupFileInfo, error) {
}
}()

if err := restore.WaitForJobCompletion(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger); err != nil {
if err := restore.WaitForJobCompletion(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger, appCtx.JobTimeout); err != nil {
return nil, err
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/settings/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ func runRestore(appCtx *app.Context) error {
return nil
}

return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger)
return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger, appCtx.JobTimeout)
}

// waitAndCleanupRestoreJob waits for job completion and cleans up resources
func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error {
func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, jobTimeout time.Duration) error {
restore.PrintWaitingMessage(log, "settings", jobName, namespace)
return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, false)
return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, false, jobTimeout)
}

// getLatestBackup retrieves the most recent backup from all sources (S3 and PVC)
Expand Down
1 change: 1 addition & 0 deletions cmd/stackgraph/check_and_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ func runCheckAndFinalize(appCtx *app.Context) error {
CleanupPVC: true,
WaitForJob: waitForJob,
Log: appCtx.Logger,
Timeout: appCtx.JobTimeout,
})
}
6 changes: 3 additions & 3 deletions cmd/stackgraph/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,13 @@ func runRestore(appCtx *app.Context) error {
return nil
}

return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger)
return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger, appCtx.JobTimeout)
}

// waitAndCleanupRestoreJob waits for job completion and cleans up resources
func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error {
func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, jobTimeout time.Duration) error {
restore.PrintWaitingMessage(log, "stackgraph", jobName, namespace)
return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, true)
return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, true, jobTimeout)
}

// getLatestBackup retrieves the most recent backup from S3
Expand Down
1 change: 1 addition & 0 deletions cmd/victoriametrics/check_and_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ func runCheckAndFinalize(appCtx *app.Context) error {
CleanupPVC: false,
WaitForJob: waitForJob,
Log: appCtx.Logger,
Timeout: appCtx.JobTimeout,
})
}
6 changes: 3 additions & 3 deletions cmd/victoriametrics/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,13 @@ func runRestore(appCtx *app.Context) error {
return nil
}

return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger)
return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger, appCtx.JobTimeout)
}

// waitAndCleanupRestoreJob waits for job completion and cleans up resources
func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error {
func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, jobTimeout time.Duration) error {
restore.PrintWaitingMessage(log, "victoria-metrics", jobName, namespace)
return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, false)
return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, false, jobTimeout)
}

// getLatestBackup retrieves the most recent backup from S3
Expand Down
39 changes: 21 additions & 18 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"time"

"github.com/stackvista/stackstate-backup-cli/internal/clients/clickhouse"
"github.com/stackvista/stackstate-backup-cli/internal/clients/elasticsearch"
Expand All @@ -16,15 +17,16 @@ import (

// Context holds all dependencies for cli commands
type Context struct {
K8sClient *k8s.Client
Namespace string
S3Client s3.Interface
ESClient elasticsearch.Interface
CHClient clickhouse.Interface
Config *config.Config
Logger *logger.Logger
Formatter *output.Formatter
Context context.Context
K8sClient *k8s.Client
Namespace string
S3Client s3.Interface
ESClient elasticsearch.Interface
CHClient clickhouse.Interface
Config *config.Config
Logger *logger.Logger
Formatter *output.Formatter
Context context.Context
JobTimeout time.Duration
}

// NewContext creates production dependencies
Expand Down Expand Up @@ -68,14 +70,15 @@ func NewContext(flags *config.CLIGlobalFlags) (*Context, error) {
formatter := output.NewFormatter(os.Stdout, flags.OutputFormat)

return &Context{
K8sClient: k8sClient,
Namespace: flags.Namespace,
Config: cfg,
S3Client: s3Client,
ESClient: esClient,
CHClient: chClient,
Logger: logger.New(flags.Quiet, flags.Debug),
Formatter: formatter,
Context: context.Background(),
K8sClient: k8sClient,
Namespace: flags.Namespace,
Config: cfg,
S3Client: s3Client,
ESClient: esClient,
CHClient: chClient,
Logger: logger.New(flags.Quiet, flags.Debug),
Formatter: formatter,
Context: context.Background(),
JobTimeout: time.Duration(flags.JobTimeout) * time.Minute,
}, nil
}
6 changes: 6 additions & 0 deletions internal/foundation/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,11 @@ func LoadConfig(clientset kubernetes.Interface, namespace, configMapName, secret
return config, nil
}

const (
// DefaultJobTimeoutMinutes is the default value of the --job-timeout CLI
// flag: the number of minutes to wait for a job/restore to complete.
// The value is a plain minute count, not a time.Duration.
DefaultJobTimeoutMinutes = 30
)

type CLIGlobalFlags struct {
Namespace string
Kubeconfig string
Expand All @@ -349,6 +354,7 @@ type CLIGlobalFlags struct {
ConfigMapName string
SecretName string
OutputFormat string // table, json
JobTimeout int // job completion timeout in minutes
}

func NewCLIGlobalFlags() *CLIGlobalFlags {
Expand Down
6 changes: 5 additions & 1 deletion internal/orchestration/restore/finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package restore

import (
"fmt"
"time"

"github.com/stackvista/stackstate-backup-cli/internal/clients/k8s"
"github.com/stackvista/stackstate-backup-cli/internal/foundation/logger"
Expand Down Expand Up @@ -74,13 +75,14 @@ type WaitAndFinalizeParams struct {
ScaleSelector string
CleanupPVC bool
Log *logger.Logger
Timeout time.Duration
}

// WaitAndFinalize waits for job completion and then cleans up
func WaitAndFinalize(params WaitAndFinalizeParams) error {
PrintWaitingMessage(params.Log, params.ServiceName, params.JobName, params.Namespace)

if err := WaitForJobCompletion(params.K8sClient, params.Namespace, params.JobName, params.Log); err != nil {
if err := WaitForJobCompletion(params.K8sClient, params.Namespace, params.JobName, params.Log, params.Timeout); err != nil {
params.Log.Errorf("Job failed: %v", err)
// Still cleanup even if failed
params.Log.Println()
Expand Down Expand Up @@ -113,6 +115,7 @@ type CheckAndFinalizeParams struct {
CleanupPVC bool
WaitForJob bool
Log *logger.Logger
Timeout time.Duration
}

// CheckAndFinalize checks the status of a background restore job and cleans up resources
Expand Down Expand Up @@ -157,6 +160,7 @@ func CheckAndFinalize(params CheckAndFinalizeParams) error {
ScaleSelector: params.ScaleSelector,
CleanupPVC: params.CleanupPVC,
Log: params.Log,
Timeout: params.Timeout,
})
}

Expand Down
69 changes: 44 additions & 25 deletions internal/orchestration/restore/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,45 @@ const (
)

// WaitForJobCompletion waits for a Kubernetes job to complete
func WaitForJobCompletion(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error {
timeout := time.After(defaultJobCompletionTimeout)
func WaitForJobCompletion(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, jobTimeout time.Duration) error {
if jobTimeout <= 0 {
jobTimeout = defaultJobCompletionTimeout
}

// checkJob fetches the job once and classifies its current state. It is
// used both for the check made before the ticker starts and for every tick
// afterwards. It returns:
//   - (true, nil) when job.Status.Succeeded is non-zero;
//   - (false, err) when the status lookup fails or job.Status.Failed is
//     non-zero;
//   - (false, nil) when neither is the case yet, so the caller keeps waiting.
checkJob := func() (bool, error) {
job, err := k8sClient.GetJob(namespace, jobName)
if err != nil {
return false, fmt.Errorf("failed to get job status: %w", err)
}

if job.Status.Succeeded > 0 {
return true, nil
}

if job.Status.Failed > 0 {
// Print the failed job's logs before reporting the failure. This is
// best effort: if the logs cannot be fetched, only a warning is logged
// and the "job failed" error below is still returned.
log.Println()
log.Errorf("Job failed. Fetching logs...")
log.Println()
if err := PrintJobLogs(k8sClient, namespace, jobName, log); err != nil {
log.Warningf("Failed to fetch job logs: %v", err)
}
return false, fmt.Errorf("job failed")
}

// Neither succeeded nor failed yet: record the current counters at debug
// level and report "not done" without an error.
log.Debugf("Job status: Active=%d, Succeeded=%d, Failed=%d",
job.Status.Active, job.Status.Succeeded, job.Status.Failed)
return false, nil
}

// Perform initial check
done, err := checkJob()
if err != nil || done {
return err
}

timeout := time.After(jobTimeout)
ticker := time.NewTicker(defaultJobStatusCheckInterval)
defer ticker.Stop()

Expand All @@ -24,28 +61,10 @@ func WaitForJobCompletion(k8sClient *k8s.Client, namespace, jobName string, log
case <-timeout:
return fmt.Errorf("timeout waiting for job to complete")
case <-ticker.C:
job, err := k8sClient.GetJob(namespace, jobName)
if err != nil {
return fmt.Errorf("failed to get job status: %w", err)
done, err := checkJob()
if err != nil || done {
return err
}

if job.Status.Succeeded > 0 {
return nil
}

if job.Status.Failed > 0 {
// Get and print logs from failed job
log.Println()
log.Errorf("Job failed. Fetching logs...")
log.Println()
if err := PrintJobLogs(k8sClient, namespace, jobName, log); err != nil {
log.Warningf("Failed to fetch job logs: %v", err)
}
return fmt.Errorf("job failed")
}

log.Debugf("Job status: Active=%d, Succeeded=%d, Failed=%d",
job.Status.Active, job.Status.Succeeded, job.Status.Failed)
}
}
}
Expand Down Expand Up @@ -112,8 +131,8 @@ func PrintRunningJobStatus(log *logger.Logger, serviceName, jobName, namespace s
}

// WaitAndCleanup waits for job completion and cleans up resources
func WaitAndCleanup(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, cleanupPVC bool) error {
if err := WaitForJobCompletion(k8sClient, namespace, jobName, log); err != nil {
func WaitAndCleanup(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, cleanupPVC bool, jobTimeout time.Duration) error {
if err := WaitForJobCompletion(k8sClient, namespace, jobName, log, jobTimeout); err != nil {
log.Errorf("Job failed: %v", err)
log.Println()
log.Infof("Cleanup commands:")
Expand Down
Loading