Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/pgwatch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ func main() {

// check if some sub-command was executed and exit
if opts.CommandCompleted {
if err != nil {
var upgErr *cmdopts.ErrUpgradeNotSupported
if errors.As(err, &upgErr) {
log.GetLogger(mainCtx).Warnf(
"[%s] configuration storage does not support upgrade, skipping",
upgErr.Target,
)
exitCode.Store(cmdopts.ExitCodeOK)
return
}

log.GetLogger(mainCtx).Error(err)
exitCode.Store(opts.ExitCode)
return
}

exitCode.Store(opts.ExitCode)
return
}
Expand Down
20 changes: 11 additions & 9 deletions cmd/pgwatch/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@ import "fmt"

// version output variables
var (
commit = "unknown"
version = "unknown"
date = "unknown"
dbapi = "00824"
commit = "unknown"
version = "unknown"
date = "unknown"
configSchema = "00824"
sinkSchema = "01110"
)

func printVersion() {
fmt.Printf(`
Version info:
Version: %s
DB Schema: %s
Git Commit: %s
Built: %s
Version: %s
ConfigSchema: %s
SinkSchema: %s
Git Commit: %s
Built: %s

`, version, dbapi, commit, date)
`, version, configSchema, sinkSchema, commit, date)
}
80 changes: 53 additions & 27 deletions internal/cmdopts/cmdconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,46 +85,72 @@ type ConfigUpgradeCommand struct {
// Execute upgrades the configuration schema.
func (cmd *ConfigUpgradeCommand) Execute([]string) (err error) {
opts := cmd.owner
if err = opts.ValidateConfig(); err != nil {
return
}
ctx := context.Background()
// Upgrade metrics/sources configuration if it's postgres
if opts.IsPgConnStr(opts.Metrics.Metrics) && opts.IsPgConnStr(opts.Sources.Sources) {
err = opts.InitMetricReader(ctx)
if err != nil {
upgraded := false

// ---- Metrics upgrade ----
if opts.Metrics.Metrics != "" {
if !opts.IsPgConnStr(opts.Metrics.Metrics) {
return &ErrUpgradeNotSupported{
Target: "metrics.yaml",
}
}
if err = opts.InitMetricReader(ctx); err != nil {
opts.CompleteCommand(ExitCodeConfigError)
return
}
m, ok := opts.MetricsReaderWriter.(metrics.Migrator)
if !ok {
return errors.New("metrics backend does not implement migrator")
}
if err = m.Migrate(); err != nil {
opts.CompleteCommand(ExitCodeConfigError)
return
}
if m, ok := opts.MetricsReaderWriter.(metrics.Migrator); ok {
err = m.Migrate()
if err != nil {
opts.CompleteCommand(ExitCodeConfigError)
return
upgraded = true
}

// ---- Sources upgrade ----
if opts.Sources.Sources != "" {
if !opts.IsPgConnStr(opts.Sources.Sources) {
return &ErrUpgradeNotSupported{
Target: "metrics.yaml",
}
}
} else {
opts.CompleteCommand(ExitCodeConfigError)
return errors.New("configuration storage does not support upgrade")
if err = opts.InitSourceReader(ctx); err != nil {
opts.CompleteCommand(ExitCodeConfigError)
return
}
m, ok := opts.SourcesReaderWriter.(metrics.Migrator)
if !ok {
return errors.New("sources backend does not implement migrator")
}
if err = m.Migrate(); err != nil {
opts.CompleteCommand(ExitCodeConfigError)
return
}
upgraded = true
}
// Upgrade sinks configuration if it's postgres

// ---- Sinks upgrade ----
if len(opts.Sinks.Sinks) > 0 {
err = opts.InitSinkWriter(ctx)
if err != nil {
if err = opts.InitSinkWriter(ctx); err != nil {
opts.CompleteCommand(ExitCodeConfigError)
return
}
if m, ok := opts.SinksWriter.(metrics.Migrator); ok {
err = m.Migrate()
if err != nil {
opts.CompleteCommand(ExitCodeConfigError)
return
}
} else {
m, ok := opts.SinksWriter.(metrics.Migrator)
if !ok {
return errors.New("sinks backend does not implement migrator")
}
if err = m.Migrate(); err != nil {
opts.CompleteCommand(ExitCodeConfigError)
return errors.New("sink storage does not support upgrade")
return
}
upgraded = true
}
if !upgraded {
return errors.New("nothing to upgrade: please specify --metrics, --sources, or --sink")
}
opts.CompleteCommand(ExitCodeOK)
return
return nil
}
7 changes: 3 additions & 4 deletions internal/cmdopts/cmdconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestConfigInitCommand_Execute(t *testing.T) {
func TestConfigUpgradeCommand_Execute(t *testing.T) {
a := assert.New(t)

t.Run("sources and metrics are empty", func(*testing.T) {
t.Run("no upgrade target specified", func(*testing.T) {
os.Args = []string{0: "config_test", "config", "upgrade"}
_, err := New(io.Discard)
a.Error(err)
Expand All @@ -85,8 +85,7 @@ func TestConfigUpgradeCommand_Execute(t *testing.T) {
os.Args = []string{0: "config_test", "--metrics=" + fname, "config", "upgrade"}
c, err := New(io.Discard)
a.Error(err)
a.True(c.CommandCompleted)
a.Equal(ExitCodeConfigError, c.ExitCode)
a.False(c.CommandCompleted)
})

}
Expand Down Expand Up @@ -310,7 +309,7 @@ func TestConfigUpgradeCommand_Errors(t *testing.T) {
cmd := ConfigUpgradeCommand{owner: opts}
err := cmd.Execute(nil)
a.Error(err)
a.ErrorContains(err, "does not support upgrade")

})

t.Run("init metrics reader fails", func(*testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions internal/cmdopts/cmdoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ func (c *Options) NeedsSchemaUpgrade() (upgrade bool, err error) {
}

// ValidateConfig checks if the configuration is valid.
// Configuration database can be specified for one of the --sources or --metrics.
// Configuration database can be specified for one of the --sources or --metrics or --sink.
// If one is specified, the other one is set to the same value.
func (c *Options) ValidateConfig() error {
if len(c.Sources.Sources)+len(c.Metrics.Metrics) == 0 {
return errors.New("both --sources and --metrics are empty")
if len(c.Sources.Sources) == 0 && len(c.Metrics.Metrics) == 0 && len(c.Sinks.Sinks) == 0 {
return errors.New("at least one of --sources, --metrics, or --sink must be provided")
}
switch { // if specified configuration database, use it for both sources and metrics
case c.Sources.Sources == "" && c.IsPgConnStr(c.Metrics.Metrics):
Expand Down
11 changes: 11 additions & 0 deletions internal/cmdopts/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package cmdopts

// ErrUpgradeNotSupported is returned when a config backend
// does not support schema upgrades (e.g. YAML files).
type ErrUpgradeNotSupported struct {
Target string // sources.yaml / metrics.yaml / sinks
}

func (e *ErrUpgradeNotSupported) Error() string {
return "configuration storage does not support upgrade"
}
Loading