From e12054f943f23cba7c704d60037fd06be3851f59 Mon Sep 17 00:00:00 2001 From: Rod Boev Date: Wed, 24 Jun 2026 11:35:54 -0500 Subject: [PATCH 1/4] feat: keep named PG targets from sharing push state Named PostgreSQL targets need separate push state, routing, service status, and validation so one configured target cannot accidentally inherit state or defaults from another. Includes: - docs(postgres): clarify named target routing (#517) - fix(postgres): isolate named target resolution (#517) - fix(postgres): align service install with named targets (#517) - test(postgres): prove selected-target routing and scoped state (#517) - test(postgres): cover aggregate and service status paths (#517) - refactor(postgres): centralize named target defaults (#517) - test(postgres): isolate command proofs from local state (#517) - Keep named PG target failures actionable - test(postgres): keep reserved target names explicit (#517) - Lock reserved PG target names into tests - chore(postgres): drop duplicate review-generated test file (#517) - Keep PG owner markers stable across named targets - Keep PG target docs aligned with the CLI - Keep empty PG service status from hiding healthy state - Keep named PG parsing case-stable - Keep the PG parser regression test honest - Keep reserved PG target names documented - Preserve direct Sync constructions after sync-state scoping - Keep named PG target validation scoped to PG flows --- README.md | 43 ++- cmd/agentsview/archive_write_backend.go | 44 +-- cmd/agentsview/archive_write_backend_test.go | 4 +- cmd/agentsview/classifier.go | 2 +- cmd/agentsview/classifier_wiring_test.go | 8 + cmd/agentsview/cli.go | 50 ++- cmd/agentsview/daemon_push.go | 20 +- cmd/agentsview/pg.go | 285 +++++++++++--- cmd/agentsview/pg_service.go | 42 +- cmd/agentsview/pg_service_manager.go | 14 +- cmd/agentsview/pg_service_test.go | 103 ++++- cmd/agentsview/pg_test.go | 176 +++++++++ cmd/agentsview/pg_watch.go | 61 ++- cmd/agentsview/pg_watch_test.go | 162 +++++++- cmd/agentsview/session.go | 2 +- docs/commands.md | 7 +- docs/pg-sync.md | 60 ++- internal/config/config.go | 380 +++++++++++++++++-- internal/config/config_test.go | 178 ++++++++- internal/config/pg_target_case_test.go | 39 ++ internal/parser/gemini_parser_test.go | 2 +- internal/postgres/connect.go | 2 +- internal/postgres/push.go | 47 ++- internal/postgres/push_test.go | 35 ++ internal/postgres/sync.go | 185 ++++++++- internal/postgres/sync_test.go | 165 ++++++++ internal/postgres/time.go | 1 + internal/server/huma_routes_push.go | 18 +- 28 files changed, 1915 insertions(+), 220 deletions(-) create mode 100644 internal/config/pg_target_case_test.go diff --git a/README.md b/README.md index d32cf46c1..ca81b6498 100644 --- a/README.md +++ b/README.md @@ -384,10 +384,40 @@ or read them, and treats sidecars as untrusted structured input -- see Push session data to a shared PostgreSQL instance for team dashboards: ```bash -agentsview pg push # push local data to PG -agentsview pg serve # serve web UI from PG (read-only) +agentsview pg push # push local data to the default PG target +agentsview pg push archive # push to one named PG target +agentsview pg push --all # push every configured PG target sequentially +agentsview pg status # show status for the default PG target +agentsview pg status archive # show status for one named PG target +agentsview pg status --all # show status for every configured PG target +agentsview pg serve # serve web UI from the default PG target (read-only) ``` +Single-target configs still use the legacy `[pg]` block. To manage more than one +PostgreSQL destination, define named `[pg.NAME]` blocks and set `default_pg` +when more than one target exists: + +```toml +default_pg = "work" + +[pg.work] +url = "postgres://user:pass@work-db/agentsview" +machine_name = "laptop" + +[pg.archive] +url = "postgres://user:pass@archive-db/agentsview" +machine_name = "laptop-archive" +exclude_projects = ["scratch"] +``` + +Named target names are normalized case-insensitively. `all`, `local`, and the +legacy `[pg]` field names `url`, `schema`, `machine_name`, `allow_insecure`, +`projects`, and `exclude_projects` cannot be used for `[pg.NAME]`. + +`AGENTSVIEW_PG_URL`, `AGENTSVIEW_PG_SCHEMA`, and `AGENTSVIEW_PG_MACHINE` still +work, but in named-target mode they apply only to the effective default target. +They do not rewrite every named `[pg.NAME]` entry. + ### Automatic push (background service) To keep a shared PostgreSQL database current without running `pg push` by hand, @@ -396,10 +426,15 @@ after new sessions are recorded, with a periodic floor as a safety net: ```bash agentsview pg push --watch # foreground, Ctrl-C to stop +agentsview pg push archive --watch # watch one named PG target agentsview pg push --watch --debounce 1m # custom coalesce window agentsview pg push --watch --interval 5m # custom floor interval ``` +`--watch` follows the default PG target unless you pass one target name. +`--all --watch` is rejected; multi-target background watch remains out of scope +for now. + The daemon reads the same `[pg]` config as `pg push`, so the PostgreSQL DSN must be set in your config file (or an environment variable it expands). Protect the config file, since it holds credentials: @@ -418,6 +453,10 @@ agentsview pg service logs -f # follow the service log agentsview pg service uninstall # stop and remove ``` +`pg serve` and `pg service` always use the effective default PG target. In +named-target mode, set `default_pg` to choose which target those long-running +commands use. + **Linux headless machines:** systemd `--user` services stop at logout and do not start at boot unless lingering is enabled for your user. `install` detects this and prints the command; you can also run it yourself: diff --git a/cmd/agentsview/archive_write_backend.go b/cmd/agentsview/archive_write_backend.go index 148440263..8595fc19d 100644 --- a/cmd/agentsview/archive_write_backend.go +++ b/cmd/agentsview/archive_write_backend.go @@ -19,7 +19,7 @@ import ( type archiveWriteBackend interface { PGPush( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, excludeProjects []string, @@ -33,7 +33,7 @@ type archiveWriteBackend interface { ) (duckdbsync.PushResult, error) PGPushWatch( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, excludeProjects []string, @@ -87,7 +87,7 @@ type daemonArchiveWriteBackend struct { func (b daemonArchiveWriteBackend) PGPush( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, excludeProjects []string, @@ -95,10 +95,12 @@ func (b daemonArchiveWriteBackend) PGPush( return postDaemonPush[postgres.PushResult]( ctx, b.tr, b.appCfg.AuthToken, "/api/v1/push/pg", daemonPushRequest{ - Full: cfg.Full, - Projects: projects, - ExcludeProjects: excludeProjects, - PG: &pgCfg, + Full: cfg.Full, + Projects: projects, + ExcludeProjects: excludeProjects, + PG: &target.PG, + SyncStateTarget: target.SyncStateTarget, + MigrateLegacySyncState: target.MigrateLegacySyncState, }, ) } @@ -141,7 +143,7 @@ func absolutizeDuckDBPath( func (b daemonArchiveWriteBackend) PGPushWatch( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, exclude []string, @@ -170,7 +172,7 @@ func (b daemonArchiveWriteBackend) PGPushWatch( } defer cleanup() res, err := backend.PGPush( - pctx, pgCfg, pushCfg, projects, exclude, + pctx, target, pushCfg, projects, exclude, ) if err != nil { return err @@ -213,7 +215,7 @@ type localArchiveWriteBackend struct { func (b *localArchiveWriteBackend) PGPush( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, excludeProjects []string, @@ -228,12 +230,9 @@ func (b *localArchiveWriteBackend) PGPush( connectStart := time.Now() applyClassifierConfig(b.appCfg) ps, err := postgres.New( - pgCfg.URL, pgCfg.Schema, b.database, - pgCfg.MachineName, pgCfg.AllowInsecure, - postgres.SyncOptions{ - Projects: projects, - ExcludeProjects: excludeProjects, - }, + target.PG.URL, target.PG.Schema, b.database, + target.PG.MachineName, target.PG.AllowInsecure, + target.syncOptions(projects, excludeProjects), ) if err != nil { return postgres.PushResult{}, err @@ -324,7 +323,7 @@ func (b *localArchiveWriteBackend) DuckDBPush( func (b *localArchiveWriteBackend) PGPushWatch( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, exclude []string, @@ -367,12 +366,9 @@ func (b *localArchiveWriteBackend) PGPushWatch( connect: func() (pgTarget, error) { applyClassifierConfig(b.appCfg) s, cErr := postgres.New( - pgCfg.URL, pgCfg.Schema, b.database, - pgCfg.MachineName, pgCfg.AllowInsecure, - postgres.SyncOptions{ - Projects: projects, - ExcludeProjects: exclude, - }, + target.PG.URL, target.PG.Schema, b.database, + target.PG.MachineName, target.PG.AllowInsecure, + target.syncOptions(projects, exclude), ) if cErr != nil { return nil, cErr @@ -385,7 +381,7 @@ func (b *localArchiveWriteBackend) PGPushWatch( fmt.Printf( "agentsview pg watch: pushing to PostgreSQL as %q "+ "(debounce %s, floor %s)\n", - pgCfg.MachineName, debounce, interval, + target.PG.MachineName, debounce, interval, ) if err := pusher.push(ctx, reasonStartup, didResync); err != nil { diff --git a/cmd/agentsview/archive_write_backend_test.go b/cmd/agentsview/archive_write_backend_test.go index 46259fcfc..ef39b471d 100644 --- a/cmd/agentsview/archive_write_backend_test.go +++ b/cmd/agentsview/archive_write_backend_test.go @@ -18,7 +18,7 @@ func TestLocalArchiveWriteBackendPGPushStopsAfterCanceledLocalSync(t *testing.T) testLocalArchivePushStopsAfterCanceledSync(t, func(backend *localArchiveWriteBackend, ctx context.Context) error { _, err := backend.PGPush( - ctx, config.PGConfig{}, PGPushConfig{}, nil, nil, + ctx, pgTargetSelection{}, PGPushConfig{}, nil, nil, ) return err }) @@ -58,7 +58,7 @@ func TestLocalArchiveWriteBackendPGPushWatchCanceledStartupIsClean(t *testing.T) err := backend.PGPushWatch( canceledContext(), - config.PGConfig{}, + pgTargetSelection{}, PGPushConfig{}, nil, nil, diff --git a/cmd/agentsview/classifier.go b/cmd/agentsview/classifier.go index 9bbb3ce70..e80397ea6 100644 --- a/cmd/agentsview/classifier.go +++ b/cmd/agentsview/classifier.go @@ -137,7 +137,7 @@ func runClassifierRebuild( } if pgCfg.URL == "" { return errors.New( - "pg url not configured; set AGENTSVIEW_PG_URL or [pg].url", + "pg url not configured; set AGENTSVIEW_PG_URL, use a legacy [pg].url, or configure default_pg with named [pg.NAME] targets", ) } if err := clearPGClassifierHash(ctx, cfg, pgCfg); err != nil { diff --git a/cmd/agentsview/classifier_wiring_test.go b/cmd/agentsview/classifier_wiring_test.go index 655941771..e6bcf11e7 100644 --- a/cmd/agentsview/classifier_wiring_test.go +++ b/cmd/agentsview/classifier_wiring_test.go @@ -32,6 +32,11 @@ var triggerCalls = map[string]struct{}{ const wiringHelper = "applyClassifierConfig" +var inheritedWiringFuncs = map[string]struct{}{ + "runPGPushTarget": {}, + "runPGStatusTarget": {}, +} + // TestEveryStoreOpenPathIsWired enforces the rule documented // in the design spec: every code path in cmd/agentsview that // opens or initializes a store must first call @@ -83,6 +88,9 @@ func scanFile( if fn.Body == nil { return true } + if _, ok := inheritedWiringFuncs[fn.Name.Name]; ok { + return true + } if v := checkBody( fset, fn.Body, funcLabel(fset, fn), ); v != "" { diff --git a/cmd/agentsview/cli.go b/cmd/agentsview/cli.go index 869f80442..b26f36c4d 100644 --- a/cmd/agentsview/cli.go +++ b/cmd/agentsview/cli.go @@ -531,22 +531,40 @@ func newPGCommand() *cobra.Command { func newPGPushCommand() *cobra.Command { var cfg PGPushConfig cmd := &cobra.Command{ - Use: "push", + Use: "push [target]", Short: "Push local data to PostgreSQL", SilenceUsage: true, - Args: cobra.NoArgs, - Run: func(cmd *cobra.Command, args []string) { + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + targetName := "" + if len(args) == 1 { + targetName = args[0] + } + if cfg.AllTargets && cfg.Watch { + return fmt.Errorf( + "pg push --watch: %w", + fmt.Errorf( + "--all cannot be combined with --watch", + ), + ) + } if cfg.Watch { - runPGPushWatch(cfg) - return + if err := runPGPushWatch(cfg, targetName); err != nil { + return fmt.Errorf("pg push --watch: %w", err) + } + return nil } if cmd.Flags().Changed("debounce") || cmd.Flags().Changed("interval") { fmt.Fprintln(os.Stderr, "warning: --debounce and --interval have no effect without --watch") } - runPGPush(cfg) + if err := runPGPush(cfg, targetName); err != nil { + return fmt.Errorf("pg push: %w", err) + } + return nil }, } + cmd.Flags().BoolVar(&cfg.AllTargets, "all", false, "Push every configured PG target sequentially") cmd.Flags().BoolVar(&cfg.Full, "full", false, "Force full local resync and PG push") cmd.Flags().StringVar(&cfg.ProjectsFlag, "projects", "", "Comma-separated list of projects to push (inclusive)") cmd.Flags().StringVar(&cfg.ExcludeProjects, "exclude-projects", "", "Comma-separated list of projects to exclude from push") @@ -558,15 +576,25 @@ func newPGPushCommand() *cobra.Command { } func newPGStatusCommand() *cobra.Command { - return &cobra.Command{ - Use: "status", + var allTargets bool + cmd := &cobra.Command{ + Use: "status [target]", Short: "Show PG sync status", SilenceUsage: true, - Args: cobra.NoArgs, - Run: func(cmd *cobra.Command, args []string) { - runPGStatus() + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + targetName := "" + if len(args) == 1 { + targetName = args[0] + } + if err := runPGStatus(targetName, allTargets); err != nil { + return fmt.Errorf("pg status: %w", err) + } + return nil }, } + cmd.Flags().BoolVar(&allTargets, "all", false, "Show status for every configured PG target") + return cmd } func newPGServeCommand() *cobra.Command { diff --git a/cmd/agentsview/daemon_push.go b/cmd/agentsview/daemon_push.go index 97924730a..b80c6c892 100644 --- a/cmd/agentsview/daemon_push.go +++ b/cmd/agentsview/daemon_push.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -13,11 +14,13 @@ import ( ) type daemonPushRequest struct { - Full bool `json:"full"` - Projects []string `json:"projects,omitempty"` - ExcludeProjects []string `json:"exclude_projects,omitempty"` - PG *config.PGConfig `json:"pg,omitempty"` - DuckDB *config.DuckDBConfig `json:"duckdb,omitempty"` + Full bool `json:"full"` + Projects []string `json:"projects,omitempty"` + ExcludeProjects []string `json:"exclude_projects,omitempty"` + PG *config.PGConfig `json:"pg,omitempty"` + DuckDB *config.DuckDBConfig `json:"duckdb,omitempty"` + SyncStateTarget string `json:"sync_state_target,omitempty"` + MigrateLegacySyncState bool `json:"migrate_legacy_sync_state,omitempty"` } func postDaemonPush[T any]( @@ -51,6 +54,13 @@ func postDaemonPush[T any]( defer resp.Body.Close() if resp.StatusCode != http.StatusOK { msg, _ := io.ReadAll(resp.Body) + var apiErr struct { + Error string `json:"error"` + } + if err := json.Unmarshal(msg, &apiErr); err == nil && + apiErr.Error != "" { + return zero, errors.New(apiErr.Error) + } return zero, fmt.Errorf( "HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(msg)), ) diff --git a/cmd/agentsview/pg.go b/cmd/agentsview/pg.go index dadec4536..5292f550f 100644 --- a/cmd/agentsview/pg.go +++ b/cmd/agentsview/pg.go @@ -14,12 +14,14 @@ import ( "github.com/spf13/cobra" "go.kenn.io/agentsview/internal/config" + "go.kenn.io/agentsview/internal/db" "go.kenn.io/agentsview/internal/postgres" "go.kenn.io/agentsview/internal/server" ) type PGPushConfig struct { Full bool + AllTargets bool ProjectsFlag string ExcludeProjects string AllProjects bool @@ -28,27 +30,52 @@ type PGPushConfig struct { Interval time.Duration } -func runPGPush(cfg PGPushConfig) { - appCfg, err := config.LoadMinimal() - if err != nil { - log.Fatalf("loading config: %v", err) +type pgTargetSelection struct { + Name string + PG config.PGConfig + IsDefault bool + SyncStateTarget string + MigrateLegacySyncState bool +} + +func (s pgTargetSelection) label() string { + if s.Name == "" { + return "default" } - if err := os.MkdirAll(appCfg.DataDir, 0o755); err != nil { - log.Fatalf("creating data dir: %v", err) + if s.IsDefault { + return s.Name + " (default)" } - setupLogFile(appCfg.DataDir) + return s.Name +} - pgCfg, err := appCfg.ResolvePG() +func (s pgTargetSelection) syncOptions( + projects, excludeProjects []string, +) postgres.SyncOptions { + return postgres.SyncOptions{ + Projects: projects, + ExcludeProjects: excludeProjects, + SyncStateTarget: s.SyncStateTarget, + MigrateLegacySyncState: s.MigrateLegacySyncState, + } +} + +func runPGPush( + cfg PGPushConfig, targetName string, +) error { + appCfg, err := config.LoadMinimal() if err != nil { - fatal("pg push: %v", err) + return fmt.Errorf("loading config: %w", err) } - if pgCfg.URL == "" { - fatal("pg push: url not configured") + if err := os.MkdirAll(appCfg.DataDir, 0o755); err != nil { + return fmt.Errorf("creating data dir: %w", err) } + setupLogFile(appCfg.DataDir) - projects, excludeProjects, err := resolvePushProjects(pgCfg, cfg) + targets, err := resolvePGTargetSelections( + appCfg, targetName, cfg.AllTargets, + ) if err != nil { - fatal("pg push: %v", err) + return err } applyClassifierConfig(appCfg) @@ -59,21 +86,78 @@ func runPGPush(cfg PGPushConfig) { backend, cleanup, err := resolveArchiveWriteBackend(ctx, appCfg) if err != nil { - fatal("opening writer: %v", err) + return fmt.Errorf("opening writer: %w", err) } defer cleanup() + var failures []string + for i, target := range targets { + if len(targets) > 1 { + if i > 0 { + fmt.Println() + } + fmt.Printf("Target: %s\n", target.label()) + } + if err := runPGPushTarget( + ctx, backend, appCfg, cfg, target, + ); err != nil { + if len(targets) == 1 { + return err + } + failures = append( + failures, + fmt.Sprintf("%s: %v", target.label(), err), + ) + fmt.Fprintf( + os.Stderr, + "warning: pg push target %s failed: %v\n", + target.label(), err, + ) + } + } + if len(failures) > 0 { + return fmt.Errorf( + "%d pg target(s) failed: %s", + len(failures), + strings.Join(failures, "; "), + ) + } + return nil +} + +func runPGPushTarget( + ctx context.Context, + backend archiveWriteBackend, + appCfg config.Config, + cfg PGPushConfig, + target pgTargetSelection, +) error { + target, err := resolvePGTargetConfig(appCfg, target) + if err != nil { + return err + } + if target.PG.URL == "" { + return fmt.Errorf("url not configured") + } + + projects, excludeProjects, err := resolvePushProjects( + target.PG, cfg, + ) + if err != nil { + return err + } + result, err := backend.PGPush( - ctx, pgCfg, cfg, projects, excludeProjects, + ctx, target, cfg, projects, excludeProjects, ) if err != nil { - fatal("pg push: %v", err) + return err } writePGPushSummary(os.Stdout, result) if result.Errors > 0 { - fatal("pg push: %d session(s) failed", - result.Errors) + return fmt.Errorf("%d session(s) failed", result.Errors) } + return nil } func printPGPushProgress(p postgres.PushProgress) { @@ -118,38 +202,86 @@ func writePGPushSummary(w io.Writer, result postgres.PushResult) { ) } -func runPGStatus() { +func runPGStatus( + targetName string, + allTargets bool, +) error { appCfg, err := config.LoadMinimal() if err != nil { - log.Fatalf("loading config: %v", err) + return fmt.Errorf("loading config: %w", err) } if err := os.MkdirAll(appCfg.DataDir, 0o755); err != nil { - log.Fatalf("creating data dir: %v", err) + return fmt.Errorf("creating data dir: %w", err) } setupLogFile(appCfg.DataDir) + targets, err := resolvePGTargetSelections( + appCfg, targetName, allTargets, + ) + if err != nil { + return err + } + applyClassifierConfig(appCfg) - database, err := openReadOnlyDB(appCfg) + database, err := openDB(appCfg) if err != nil { - fatal("opening database: %v", err) + return fmt.Errorf("opening database: %w", err) } defer database.Close() - pgCfg, err := appCfg.ResolvePG() + var failures []string + for i, target := range targets { + if len(targets) > 1 || target.Name != "" { + if i > 0 { + fmt.Println() + } + fmt.Printf("Target: %s\n", target.label()) + } + if err := runPGStatusTarget(database, appCfg, target); err != nil { + if len(targets) == 1 { + return err + } + failures = append( + failures, + fmt.Sprintf("%s: %v", target.label(), err), + ) + fmt.Fprintf( + os.Stderr, + "warning: pg status target %s failed: %v\n", + target.label(), err, + ) + } + } + if len(failures) > 0 { + return fmt.Errorf( + "%d pg target(s) failed: %s", + len(failures), + strings.Join(failures, "; "), + ) + } + return nil +} + +func runPGStatusTarget( + database *db.DB, + appCfg config.Config, + target pgTargetSelection, +) error { + target, err := resolvePGTargetConfig(appCfg, target) if err != nil { - fatal("pg status: %v", err) + return err } - if pgCfg.URL == "" { - fatal("pg status: url not configured") + if target.PG.URL == "" { + return fmt.Errorf("url not configured") } ps, err := postgres.New( - pgCfg.URL, pgCfg.Schema, database, - pgCfg.MachineName, pgCfg.AllowInsecure, - postgres.SyncOptions{}, + target.PG.URL, target.PG.Schema, database, + target.PG.MachineName, target.PG.AllowInsecure, + target.syncOptions(nil, nil), ) if err != nil { - fatal("pg status: %v", err) + return err } defer ps.Close() @@ -160,13 +292,14 @@ func runPGStatus() { status, err := ps.Status(ctx) if err != nil { - fatal("pg status: %v", err) + return err } fmt.Printf("Machine: %s\n", status.Machine) fmt.Printf("Last push: %s\n", valueOrNever(status.LastPushAt)) fmt.Printf("PG sessions: %d\n", status.PGSessions) fmt.Printf("PG messages: %d\n", status.PGMessages) + return nil } func loadPGServeConfig(cmd *cobra.Command) (config.Config, string, error) { @@ -186,7 +319,6 @@ func loadPGServeConfig(cmd *cobra.Command) (config.Config, string, error) { func runPGServe(appCfg config.Config, basePath string) { setupLogFile(appCfg.DataDir) - // Generate auth token when auth is explicitly required. if appCfg.RequireAuth { if err := appCfg.EnsureAuthToken(); err != nil { fatal("pg serve: generating auth token: %v", err) @@ -224,12 +356,6 @@ func runPGServe(appCfg config.Config, basePath string) { ) defer stop() - // Attempt to apply any missing schema migrations before - // the compatibility check. This handles upgrades (e.g. - // new tables like tool_result_events) without requiring a - // manual schema drop. If the PG role is read-only the - // migration is skipped and the compat check reports what - // is missing. if err := postgres.EnsureSchema( ctx, store.DB(), pgCfg.Schema, ); err != nil { @@ -328,10 +454,6 @@ func runPGServe(appCfg config.Config, basePath string) { } } -// resolvePushProjects merges configured project filters with CLI -// flag overrides. A CLI include or exclude flag fully replaces the -// configured lists; --all-projects clears both. Include and exclude -// are mutually exclusive. func resolvePushProjects( pgCfg config.PGConfig, cfg PGPushConfig, ) (projects, exclude []string, err error) { @@ -369,8 +491,81 @@ func resolvePushProjects( return projects, exclude, nil } -// splitProjectList splits a comma-separated string into trimmed, -// non-empty project names. +func resolvePGTargetSelections( + appCfg config.Config, + targetName string, + allTargets bool, +) ([]pgTargetSelection, error) { + if allTargets && strings.TrimSpace(targetName) != "" { + return nil, fmt.Errorf( + "target name cannot be combined with --all", + ) + } + if len(appCfg.PGTargets) == 0 { + if strings.TrimSpace(targetName) != "" { + return nil, fmt.Errorf( + "pg target %q is not configured; config uses a single legacy [pg] block", + targetName, + ) + } + return []pgTargetSelection{{ + IsDefault: true, + }}, nil + } + names, defaultName, err := appCfg.PGTargetNames() + if err != nil { + return nil, err + } + selections := make([]pgTargetSelection, 0, len(names)) + for _, name := range names { + selection := pgTargetSelection{ + Name: name, + IsDefault: name == defaultName, + SyncStateTarget: name, + MigrateLegacySyncState: name == defaultName, + } + selections = append(selections, selection) + } + if allTargets { + return selections, nil + } + normalizedTarget := strings.TrimSpace( + strings.ToLower(targetName), + ) + if normalizedTarget == "" { + return selections[:1], nil + } + for _, target := range selections { + if target.Name == normalizedTarget { + return []pgTargetSelection{target}, nil + } + } + return nil, fmt.Errorf( + "pg target %q is not configured", + targetName, + ) +} + +func resolvePGTargetConfig( + appCfg config.Config, + target pgTargetSelection, +) (pgTargetSelection, error) { + var ( + pgCfg config.PGConfig + err error + ) + if target.Name == "" { + pgCfg, err = appCfg.ResolvePG() + } else { + pgCfg, err = appCfg.ResolvePGTarget(target.Name) + } + if err != nil { + return pgTargetSelection{}, err + } + target.PG = pgCfg + return target, nil +} + func splitProjectList(s string) []string { parts := strings.Split(s, ",") out := make([]string, 0, len(parts)) diff --git a/cmd/agentsview/pg_service.go b/cmd/agentsview/pg_service.go index 148e9904c..7705e6d2b 100644 --- a/cmd/agentsview/pg_service.go +++ b/cmd/agentsview/pg_service.go @@ -12,6 +12,7 @@ import ( "github.com/spf13/cobra" "go.kenn.io/agentsview/internal/config" + "go.kenn.io/agentsview/internal/postgres" ) func newPGServiceCommand() *cobra.Command { @@ -174,22 +175,51 @@ func runServiceStatus() { } ctx := context.Background() out, _ := mgr.status(ctx) - fmt.Print(out) - if out != "" && !strings.HasSuffix(out, "\n") { - fmt.Println() - } // Show the last successful push time from local sync state. appCfg := loadServiceConfig() database, derr := openReadOnlyDB(appCfg) if derr != nil { + writeServiceStatus(os.Stdout, out, "", false) return } defer database.Close() - lastPush, gerr := database.GetSyncState("last_push_at") + lastPush, gerr := readServiceLastPush(appCfg, database) if gerr != nil { + writeServiceStatus(os.Stdout, out, "", false) + return + } + writeServiceStatus(os.Stdout, out, lastPush, true) +} + +func writeServiceStatus( + out io.Writer, + serviceOut, lastPush string, + lastPushAvailable bool, +) { + fmt.Fprint(out, serviceOut) + if serviceOut != "" && !strings.HasSuffix(serviceOut, "\n") { + fmt.Fprintln(out) + } + if !lastPushAvailable { return } - fmt.Printf("Last push: %s\n", valueOrNever(lastPush)) + fmt.Fprintf(out, "Last push: %s\n", valueOrNever(lastPush)) +} + +func readServiceLastPush( + appCfg config.Config, + database postgres.SyncStateStore, +) (string, error) { + targets, err := resolvePGTargetSelections(appCfg, "", false) + if err != nil { + return "", err + } + target := targets[0] + return postgres.ReadLastPushAt( + database, + target.SyncStateTarget, + target.MigrateLegacySyncState, + ) } func runServiceSimple(action string) { diff --git a/cmd/agentsview/pg_service_manager.go b/cmd/agentsview/pg_service_manager.go index 4ae2a8e01..8b8f5aec2 100644 --- a/cmd/agentsview/pg_service_manager.go +++ b/cmd/agentsview/pg_service_manager.go @@ -26,7 +26,8 @@ func rejectEnvDependentServicePGURL(rawURL string) error { if os.Getenv("AGENTSVIEW_PG_URL") != "" { return fmt.Errorf( "AGENTSVIEW_PG_URL is set; pg service install requires a " + - "literal pg.url in config.toml because background " + + "literal PostgreSQL URL in config.toml, either " + + "legacy [pg].url or the default_pg-selected [pg.NAME].url, because background " + "services do not inherit your shell environment", ) } @@ -35,7 +36,8 @@ func rejectEnvDependentServicePGURL(rawURL string) error { if config.IsEnvDependentURL(rawURL) { return fmt.Errorf( "pg.url uses environment variable expansion; pg service " + - "install requires a literal pg.url in config.toml because " + + "install requires a literal PostgreSQL URL in config.toml, either " + + "legacy [pg].url or the default_pg-selected [pg.NAME].url, because " + "background services do not inherit your shell environment", ) } @@ -113,7 +115,11 @@ func isUnsafeServiceRune(r rune) bool { // build a spec when the PG URL is not resolvable so the service is // only ever created in a working state. func buildServiceSpec(appCfg config.Config) (serviceSpec, error) { - if err := rejectEnvDependentServicePGURL(appCfg.PG.URL); err != nil { + rawPG, err := appCfg.RawPGTarget("") + if err != nil { + return serviceSpec{}, err + } + if err := rejectEnvDependentServicePGURL(rawPG.URL); err != nil { return serviceSpec{}, err } pgCfg, err := appCfg.ResolvePG() @@ -122,7 +128,7 @@ func buildServiceSpec(appCfg config.Config) (serviceSpec, error) { } if pgCfg.URL == "" { return serviceSpec{}, fmt.Errorf( - "pg.url not configured; set it before installing the service", + "pg url not configured; configure a legacy [pg].url or the default_pg-selected [pg.NAME].url before installing the service", ) } exe, err := os.Executable() diff --git a/cmd/agentsview/pg_service_test.go b/cmd/agentsview/pg_service_test.go index 24f9b2493..5f27eccb0 100644 --- a/cmd/agentsview/pg_service_test.go +++ b/cmd/agentsview/pg_service_test.go @@ -10,12 +10,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.kenn.io/agentsview/internal/config" + "go.kenn.io/agentsview/internal/db" ) func TestBuildServiceSpec_RequiresURL(t *testing.T) { t.Setenv("AGENTSVIEW_PG_URL", "") _, err := buildServiceSpec(config.Config{}) require.Error(t, err, "expected error when pg.url is not configured") + assert.Contains(t, err.Error(), "default_pg-selected [pg.NAME].url") } func TestBuildServiceSpec_PopulatesFields(t *testing.T) { @@ -42,6 +44,30 @@ func TestBuildServiceSpec_PopulatesFields(t *testing.T) { } } +func TestBuildServiceSpec_UsesNamedDefaultTarget(t *testing.T) { + t.Setenv("AGENTSVIEW_PG_URL", "") + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + dataDir := t.TempDir() + spec, err := buildServiceSpec(config.Config{ + DataDir: dataDir, + DefaultPG: "archive", + PGTargets: map[string]config.PGConfig{ + "work": { + URL: "${BROKEN_WORK_TARGET}", + MachineName: "workbox", + }, + "archive": { + URL: "postgres://u:p@localhost/archive?sslmode=disable", + MachineName: "archivebox", + }, + }, + }) + require.NoError(t, err) + assert.Equal(t, dataDir, spec.DataDir) + assert.Equal(t, filepath.Join(dataDir, "pg-watch.log"), spec.LogPath) + assert.NotEmpty(t, spec.BinPath) +} + func TestBuildServiceSpec_RejectsEnvPGURL(t *testing.T) { t.Setenv("AGENTSVIEW_PG_URL", "postgres://from-env") _, err := buildServiceSpec(config.Config{ @@ -53,7 +79,8 @@ func TestBuildServiceSpec_RejectsEnvPGURL(t *testing.T) { }) require.Error(t, err) assert.Contains(t, err.Error(), "AGENTSVIEW_PG_URL") - assert.Contains(t, err.Error(), "literal pg.url") + assert.Contains(t, err.Error(), "literal PostgreSQL URL") + assert.Contains(t, err.Error(), "default_pg-selected [pg.NAME].url") } func TestBuildServiceSpec_RejectsExpandedPGURL(t *testing.T) { @@ -68,6 +95,7 @@ func TestBuildServiceSpec_RejectsExpandedPGURL(t *testing.T) { }) require.Error(t, err) assert.Contains(t, err.Error(), "environment variable expansion") + assert.Contains(t, err.Error(), "literal PostgreSQL URL") _, err = buildServiceSpec(config.Config{ DataDir: t.TempDir(), @@ -78,6 +106,7 @@ func TestBuildServiceSpec_RejectsExpandedPGURL(t *testing.T) { }) require.Error(t, err) assert.Contains(t, err.Error(), "environment variable expansion") + assert.Contains(t, err.Error(), "default_pg-selected [pg.NAME].url") } func TestValidateServiceSpec_RejectsUnsafeChars(t *testing.T) { @@ -183,6 +212,78 @@ func TestWarnUninheritedServiceEnv(t *testing.T) { assert.Contains(t, out, "config.toml") } +func TestReadServiceLastPush_UsesDefaultTargetScope(t *testing.T) { + local, err := db.Open(filepath.Join(t.TempDir(), "local.db")) + require.NoError(t, err) + defer local.Close() + + require.NoError(t, local.SetSyncState( + "last_push_at:work", + "2026-03-11T12:34:56.123Z", + )) + + lastPush, err := readServiceLastPush(config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": {URL: "postgres://work"}, + }, + }, local) + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", lastPush) +} + +func TestReadServiceLastPush_ReadsLegacyDefaultStateWithoutMigration(t *testing.T) { + local, err := db.Open(filepath.Join(t.TempDir(), "local.db")) + require.NoError(t, err) + defer local.Close() + + require.NoError(t, local.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123Z", + )) + + lastPush, err := readServiceLastPush(config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": {URL: "postgres://work"}, + }, + }, local) + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", lastPush) + + legacyValue, err := local.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", legacyValue) + + scopedValue, err := local.GetSyncState("last_push_at:work") + require.NoError(t, err) + assert.Empty(t, scopedValue) +} + +func TestWriteServiceStatus_AppendsScopedLastPush(t *testing.T) { + var out strings.Builder + writeServiceStatus( + &out, "Service is active", + "2026-03-11T12:34:56.123Z", true, + ) + assert.Equal(t, + "Service is active\nLast push: 2026-03-11T12:34:56.123Z\n", + out.String(), + ) +} + +func TestWriteServiceStatus_PrintsNeverWhenNoPushCompleted(t *testing.T) { + var out strings.Builder + writeServiceStatus(&out, "Service is active\n", "", true) + assert.Equal(t, "Service is active\nLast push: never\n", out.String()) +} + +func TestWriteServiceStatus_SkipsLastPushWhenUnavailable(t *testing.T) { + var out strings.Builder + writeServiceStatus(&out, "Service is active\n", "", false) + assert.Equal(t, "Service is active\n", out.String()) +} + // recordingRunner captures shell-out calls for assertions. type recordingRunner struct { calls [][]string diff --git a/cmd/agentsview/pg_test.go b/cmd/agentsview/pg_test.go index adb8856fa..fb91753e5 100644 --- a/cmd/agentsview/pg_test.go +++ b/cmd/agentsview/pg_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "log" "os" "os/exec" "path/filepath" @@ -12,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.kenn.io/agentsview/internal/config" + "go.kenn.io/agentsview/internal/parser" "go.kenn.io/agentsview/internal/postgres" ) @@ -24,6 +26,36 @@ func loadPGServeConfigForTest(t *testing.T, args ...string) (config.Config, stri return loadPGServeConfig(cmd) } +func restoreTestLogger(t *testing.T) { + t.Helper() + oldWriter := log.Writer() + t.Cleanup(func() { + if file, ok := log.Writer().(*os.File); ok && file != os.Stderr && file != os.Stdout { + _ = file.Close() + } + log.SetOutput(oldWriter) + }) +} + +func clearConfiguredAgentEnvVars(t *testing.T) { + t.Helper() + for _, def := range parser.Registry { + if def.EnvVar != "" { + t.Setenv(def.EnvVar, "") + } + } +} + +func isolateDefaultAgentDirs(t *testing.T, root string) { + t.Helper() + t.Setenv("HOME", root) + t.Setenv("USERPROFILE", root) + t.Setenv("APPDATA", root) + t.Setenv("LOCALAPPDATA", root) + t.Setenv("HOMEDRIVE", filepath.VolumeName(root)) + t.Setenv("HOMEPATH", `\`) +} + func TestLoadPGServeConfigDoesNotInheritServeProxySettings(t *testing.T) { dataDir := testDataDir(t) @@ -107,6 +139,150 @@ func TestPGServeConfigAcceptsManagedCaddyFlags(t *testing.T) { assert.Empty(t, basePath, "basePath should be empty") } +func TestRunPGPush_IgnoresBrokenUnselectedTarget(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + writeTestConfig(t, dataDir, ` +default_pg = "archive" + +[pg.work] +url = "${BROKEN_WORK_TARGET}" +machine_name = "workbox" + +[pg.archive] +url = "postgres://archive" +`) + + err := runPGPush(PGPushConfig{}, "archive") + require.Error(t, err) + assert.Contains(t, err.Error(), "pg connection to archive permits plaintext") + assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") + assert.NotContains(t, err.Error(), "BROKEN_WORK_TARGET") +} + +func TestRunPGStatus_IgnoresBrokenUnselectedTarget(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + writeTestConfig(t, dataDir, ` +default_pg = "archive" + +[pg.work] +url = "${BROKEN_WORK_TARGET}" +machine_name = "workbox" + +[pg.archive] +url = "postgres://archive" +`) + + err := runPGStatus("archive", false) + require.Error(t, err) + assert.Contains(t, err.Error(), "pg connection to archive permits plaintext") + assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") + assert.NotContains(t, err.Error(), "BROKEN_WORK_TARGET") +} + +func TestRunPGPushAll_AggregatesTargetFailures(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + writeTestConfig(t, dataDir, ` +default_pg = "work" + +[pg.work] +url = "${BROKEN_WORK_TARGET}" +machine_name = "workbox" + +[pg.archive] +url = "postgres://archive" +`) + + err := runPGPush(PGPushConfig{AllTargets: true}, "") + require.Error(t, err) + assert.Contains(t, err.Error(), "2 pg target(s) failed") + assert.Contains(t, err.Error(), "work (default): expanding url: environment variable(s) not set: BROKEN_WORK_TARGET") + assert.Contains(t, err.Error(), "archive: pg connection to archive permits plaintext") + assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") +} + +func TestRunPGStatusAll_AggregatesTargetFailures(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + writeTestConfig(t, dataDir, ` +default_pg = "work" + +[pg.work] +url = "${BROKEN_WORK_TARGET}" +machine_name = "workbox" + +[pg.archive] +url = "postgres://archive" +`) + + err := runPGStatus("", true) + require.Error(t, err) + assert.Contains(t, err.Error(), "2 pg target(s) failed") + assert.Contains(t, err.Error(), "work (default): expanding url: environment variable(s) not set: BROKEN_WORK_TARGET") + assert.Contains(t, err.Error(), "archive: pg connection to archive permits plaintext") + assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") +} + +func TestPGPushCommandPrefixesErrors(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + writeTestConfig(t, dataDir, ` +default_pg = "archive" + +[pg.archive] +url = "postgres://archive" +`) + + _, err := executeCommand(newRootCommand(), "pg", "push", "archive") + require.Error(t, err) + assert.Contains(t, err.Error(), "pg push: pg connection to archive permits plaintext") +} + +func TestPGPushWatchCommandPrefixesErrors(t *testing.T) { + _, err := executeCommand(newRootCommand(), "pg", "push", "--watch", "--all") + require.Error(t, err) + assert.Contains(t, err.Error(), "pg push --watch: --all cannot be combined with --watch") +} + +func TestPGStatusCommandPrefixesErrors(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + writeTestConfig(t, dataDir, ` +default_pg = "archive" + +[pg.archive] +url = "postgres://archive" +`) + + _, err := executeCommand(newRootCommand(), "pg", "status", "archive") + require.Error(t, err) + assert.Contains(t, err.Error(), "pg status: pg connection to archive permits plaintext") +} + func TestRunPGServeRejectsInvalidManagedCaddyConfigBeforePGSetup(t *testing.T) { dataDir := t.TempDir() diff --git a/cmd/agentsview/pg_watch.go b/cmd/agentsview/pg_watch.go index 38ab763aa..664d946e6 100644 --- a/cmd/agentsview/pg_watch.go +++ b/cmd/agentsview/pg_watch.go @@ -110,21 +110,34 @@ func (p *pgPusher) reset() { // resolveWatchTargets validates PG config and resolves the project // filters for a watch run. func resolveWatchTargets( - appCfg config.Config, cfg PGPushConfig, -) (pgCfg config.PGConfig, projects, exclude []string, err error) { - pgCfg, err = appCfg.ResolvePG() + appCfg config.Config, + cfg PGPushConfig, + targetName string, +) ( + target pgTargetSelection, + projects, exclude []string, + err error, +) { + targets, err := resolvePGTargetSelections( + appCfg, targetName, false, + ) + if err != nil { + return pgTargetSelection{}, nil, nil, err + } + target = targets[0] + target, err = resolvePGTargetConfig(appCfg, target) if err != nil { - return config.PGConfig{}, nil, nil, err + return pgTargetSelection{}, nil, nil, err } - if pgCfg.URL == "" { - return config.PGConfig{}, nil, nil, + if target.PG.URL == "" { + return pgTargetSelection{}, nil, nil, fmt.Errorf("url not configured") } - projects, exclude, err = resolvePushProjects(pgCfg, cfg) + projects, exclude, err = resolvePushProjects(target.PG, cfg) if err != nil { - return config.PGConfig{}, nil, nil, err + return pgTargetSelection{}, nil, nil, err } - return pgCfg, projects, exclude, nil + return target, projects, exclude, nil } const ( @@ -135,19 +148,24 @@ const ( // runPGPushWatch runs the long-lived auto-push daemon: an initial // catch-up push, then pushes triggered by file changes (debounced) // and a periodic floor tick, until interrupted. -func runPGPushWatch(cfg PGPushConfig) { +func runPGPushWatch( + cfg PGPushConfig, + targetName string, +) error { appCfg, err := config.LoadMinimal() if err != nil { - log.Fatalf("loading config: %v", err) + return fmt.Errorf("loading config: %w", err) } if err := os.MkdirAll(appCfg.DataDir, 0o755); err != nil { - log.Fatalf("creating data dir: %v", err) + return fmt.Errorf("creating data dir: %w", err) } setupLogFileNamed(appCfg.DataDir, "pg-watch.log") - pgCfg, projects, exclude, err := resolveWatchTargets(appCfg, cfg) + target, projects, exclude, err := resolveWatchTargets( + appCfg, cfg, targetName, + ) if err != nil { - fatal("pg push --watch: %v", err) + return err } debounce := cfg.Debounce @@ -165,15 +183,15 @@ func runPGPushWatch(cfg PGPushConfig) { Prefix: "pg-watch", }).LockPath() if err != nil { - fatal("pg push --watch: %v", err) + return err } lock := flock.New(lockPath) locked, err := lock.TryLock() if err != nil { - fatal("pg push --watch: locking %s: %v", lockPath, err) + return fmt.Errorf("locking %s: %w", lockPath, err) } if !locked { - fatal("pg push --watch: already locked (%s)", lockPath) + return fmt.Errorf("already locked (%s)", lockPath) } defer func() { if rerr := lock.Unlock(); rerr != nil { @@ -188,17 +206,18 @@ func runPGPushWatch(cfg PGPushConfig) { log.Printf( "pg watch: starting (machine=%q debounce=%s interval=%s)", - pgCfg.MachineName, debounce, interval, + target.PG.MachineName, debounce, interval, ) backend, cleanup, err := resolveArchiveWriteBackend(ctx, appCfg) if err != nil { - fatal("opening writer: %v", err) + return fmt.Errorf("opening writer: %w", err) } defer cleanup() if err := backend.PGPushWatch( - ctx, pgCfg, cfg, projects, exclude, debounce, interval, + ctx, target, cfg, projects, exclude, debounce, interval, ); err != nil { - fatal("pg push --watch: %v", err) + return err } + return nil } diff --git a/cmd/agentsview/pg_watch_test.go b/cmd/agentsview/pg_watch_test.go index a03876e2e..932506869 100644 --- a/cmd/agentsview/pg_watch_test.go +++ b/cmd/agentsview/pg_watch_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "net/http" + "os" "path/filepath" "testing" "time" @@ -82,6 +83,8 @@ func TestArchiveWriteBackendPGPushPostsToDaemon(t *testing.T) { assert.Equal(t, "mirror", req.PG.Schema) assert.Equal(t, "laptop", req.PG.MachineName) assert.True(t, req.PG.AllowInsecure) + assert.Equal(t, "work", req.SyncStateTarget) + assert.True(t, req.MigrateLegacySyncState) writeTestJSON(t, w, postgres.PushResult{ SessionsPushed: 2, MessagesPushed: 3, @@ -94,11 +97,15 @@ func TestArchiveWriteBackendPGPushPostsToDaemon(t *testing.T) { ) result, err := backend.PGPush( context.Background(), - config.PGConfig{ - URL: "postgres://user:pass@host/db", - Schema: "mirror", - MachineName: "laptop", - AllowInsecure: true, + pgTargetSelection{ + PG: config.PGConfig{ + URL: "postgres://user:pass@host/db", + Schema: "mirror", + MachineName: "laptop", + AllowInsecure: true, + }, + SyncStateTarget: "work", + MigrateLegacySyncState: true, }, PGPushConfig{Full: true}, []string{"a"}, @@ -162,7 +169,11 @@ func TestArchiveWriteBackendPGPushWatchReResolvesDaemon(t *testing.T) { ) err := backend.PGPushWatch( ctx, - config.PGConfig{URL: "postgres://user:pass@host/db"}, + pgTargetSelection{ + PG: config.PGConfig{ + URL: "postgres://user:pass@host/db", + }, + }, PGPushConfig{}, nil, nil, @@ -306,7 +317,9 @@ func TestPgPusher_LogsSkippedConflicts(t *testing.T) { func TestResolveWatchTargets_ErrorsOnEmptyURL(t *testing.T) { appCfg := config.Config{} // no PG URL - _, _, _, err := resolveWatchTargets(appCfg, PGPushConfig{}) + _, _, _, err := resolveWatchTargets( + appCfg, PGPushConfig{}, "", + ) require.Error(t, err, "expected error when url not configured") } @@ -317,10 +330,139 @@ func TestResolveWatchTargets_ResolvesProjects(t *testing.T) { MachineName: "box1", }, } - pg, inc, _, err := resolveWatchTargets( - appCfg, PGPushConfig{ProjectsFlag: "a,b"}, + target, inc, _, err := resolveWatchTargets( + appCfg, PGPushConfig{ProjectsFlag: "a,b"}, "", ) require.NoError(t, err) - assert.NotEmpty(t, pg.URL, "expected resolved URL") + assert.NotEmpty(t, target.PG.URL, "expected resolved URL") assert.Equal(t, []string{"a", "b"}, inc) } + +func TestResolveWatchTargets_IgnoresBrokenUnselectedTarget(t *testing.T) { + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + appCfg := config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": { + URL: "${BROKEN_WORK_TARGET}", + MachineName: "workbox", + }, + "archive": { + URL: "postgres://archive", + MachineName: "archivebox", + }, + }, + } + + target, _, _, err := resolveWatchTargets( + appCfg, PGPushConfig{}, "archive", + ) + require.NoError(t, err) + assert.Equal(t, "archive", target.Name) + assert.Equal(t, "postgres://archive", target.PG.URL) +} + +func TestResolvePGTargetSelections_DefaultAndAll(t *testing.T) { + appCfg := config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": {URL: "postgres://work", MachineName: "workbox"}, + "archive": {URL: "postgres://archive", MachineName: "archivebox"}, + }, + } + + defaultTarget, err := resolvePGTargetSelections( + appCfg, "", false, + ) + require.NoError(t, err) + require.Len(t, defaultTarget, 1) + assert.Equal(t, "work", defaultTarget[0].Name) + assert.True(t, defaultTarget[0].IsDefault) + assert.Equal(t, "work", defaultTarget[0].SyncStateTarget) + assert.True(t, defaultTarget[0].MigrateLegacySyncState) + assert.Empty(t, defaultTarget[0].PG.URL) + + allTargets, err := resolvePGTargetSelections( + appCfg, "", true, + ) + require.NoError(t, err) + require.Len(t, allTargets, 2) + assert.Equal(t, "work", allTargets[0].Name) + assert.Equal(t, "archive", allTargets[1].Name) +} + +func TestResolvePGTargetConfig_IgnoresBrokenUnselectedTarget(t *testing.T) { + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + appCfg := config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": { + URL: "${BROKEN_WORK_TARGET}", + MachineName: "workbox", + }, + "archive": { + URL: "postgres://archive", + MachineName: "archivebox", + }, + }, + } + + target, err := resolvePGTargetConfig( + appCfg, + pgTargetSelection{Name: "archive"}, + ) + require.NoError(t, err) + assert.Equal(t, "postgres://archive", target.PG.URL) +} + +func restoreUnsetEnv(t *testing.T, name string) { + t.Helper() + oldValue, hadValue := os.LookupEnv(name) + require.NoError(t, os.Unsetenv(name)) + t.Cleanup(func() { + if hadValue { + require.NoError(t, os.Setenv(name, oldValue)) + return + } + require.NoError(t, os.Unsetenv(name)) + }) +} + +func TestResolvePGTargetSelections_RejectsLegacyNamedLookup(t *testing.T) { + appCfg := config.Config{ + PG: config.PGConfig{ + URL: "postgres://legacy", + MachineName: "legacybox", + }, + } + + _, err := resolvePGTargetSelections( + appCfg, "archive", false, + ) + require.Error(t, err) + assert.Contains(t, err.Error(), "single legacy [pg] block") +} + +func TestResolvePGTargetSelections_RejectsTargetWithAll(t *testing.T) { + appCfg := config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": {URL: "postgres://work"}, + }, + } + + _, err := resolvePGTargetSelections( + appCfg, "work", true, + ) + require.Error(t, err) + assert.Contains(t, err.Error(), "cannot be combined with --all") +} + +func TestNewPGPushCommandRejectsAllWatch(t *testing.T) { + cmd := newPGPushCommand() + cmd.SetArgs([]string{"--all", "--watch"}) + + err := cmd.Execute() + require.Error(t, err) + assert.Contains(t, err.Error(), "--all cannot be combined with --watch") +} diff --git a/cmd/agentsview/session.go b/cmd/agentsview/session.go index 82f2686e2..b830f21e5 100644 --- a/cmd/agentsview/session.go +++ b/cmd/agentsview/session.go @@ -181,7 +181,7 @@ func resolvePGReadConfig( } if pgCfg.URL == "" { return config.PGConfig{}, false, errors.New( - "pg url not configured; set AGENTSVIEW_PG_URL or [pg].url", + "pg url not configured; set AGENTSVIEW_PG_URL, use a legacy [pg].url, or configure default_pg with named [pg.NAME] targets", ) } return pgCfg, true, nil diff --git a/docs/commands.md b/docs/commands.md index 6c4982599..759500824 100644 --- a/docs/commands.md +++ b/docs/commands.md @@ -397,7 +397,7 @@ Sync sessions from local SQLite to PostgreSQL. See [PostgreSQL Sync](/pg-sync/) for full documentation. ```bash -agentsview pg push [flags] +agentsview pg push [target] [flags] ``` | Flag | Default | Description | @@ -406,6 +406,7 @@ agentsview pg push [flags] | `--projects` | | Comma-separated projects to push (inclusive) | | `--exclude-projects` | | Comma-separated projects to exclude from push | | `--all-projects` | `false` | Ignore configured project filters for this run | +| `--all` | `false` | Push every configured PostgreSQL target sequentially | | `--watch` | `false` | Run continuously, pushing on change plus a periodic floor | | `--debounce` | `30s` | Coalesce window after a change before pushing (`--watch` only) | | `--interval` | `15m` | Periodic floor push interval (`--watch` only) | @@ -420,7 +421,7 @@ for details on how filtering interacts with the push watermark. Show PostgreSQL sync status. ```bash -agentsview pg status +agentsview pg status [target] [--all] ``` --- @@ -688,7 +689,7 @@ target an explicit running daemon, `AGENTSVIEW_SERVER_TOKEN` or `--server-token-file ` when that daemon requires auth, or `--pg` to read from configured PostgreSQL. -`AGENTSVIEW_PG_URL` and `[pg].url` are sync configuration only; they +`AGENTSVIEW_PG_URL`, a legacy `[pg].url`, or the effective default target from `default_pg` plus `[pg.NAME]` are sync configuration only; they do not change the default read path. Read commands use local SQLite unless `--pg` is supplied, in which case they fail fast if no connection URL is available. Mutating commands such as `session sync` diff --git a/docs/pg-sync.md b/docs/pg-sync.md index a6c307efa..0af38767b 100644 --- a/docs/pg-sync.md +++ b/docs/pg-sync.md @@ -30,6 +30,12 @@ The `machine_name` identifies which machine pushed each session. It defaults to the system hostname if omitted. It must not be `"local"` (reserved for the local SQLite sentinel). +For multiple PostgreSQL destinations, use named `[pg.NAME]` blocks and +`default_pg` instead of the legacy single `[pg]` block. Named target names +are normalized case-insensitively, and `all`, `local`, plus the legacy `[pg]` +field names `url`, `schema`, `machine_name`, `allow_insecure`, `projects`, and +`exclude_projects` are unavailable as `[pg.NAME]` names. + ### 2. Push Sessions ```bash @@ -71,11 +77,12 @@ entirely by PostgreSQL. No local SQLite, file watching, or uploads Sync sessions from the local SQLite database to PostgreSQL. ```bash -agentsview pg push [flags] +agentsview pg push [target] [flags] ``` | Flag | Default | Description | |------|---------|-------------| +| `--all` | `false` | Push every configured PG target sequentially | | `--full` | `false` | Force full local resync and re-push, bypassing change detection | | `--projects` | | Comma-separated projects to push (inclusive) | | `--exclude-projects` | | Comma-separated projects to exclude | @@ -88,6 +95,11 @@ Without `--watch`, push is on-demand — run it whenever you want to sync. With `--watch`, the command stays in the foreground and keeps pushing until interrupted. +When no target is passed, `pg push` uses the effective default target. +Pass one named target explicitly to push just that destination, or use +`--all` to fan out across every configured target. `--all --watch` is +rejected. + **What happens on push:** 1. Runs a local sync to pick up any new or modified session files @@ -138,8 +150,9 @@ Operational details: bounded final flush. - Logs are written to `pg-watch.log` under the AgentsView data directory. -- The watcher uses the same `[pg]` config, machine name, project - filters, classifier settings, and +- The watcher uses the selected PostgreSQL target, or the + `default_pg` target when no name is passed, along with the same + machine name, project filters, classifier settings, and `result_content_blocked_categories` behavior as one-shot `pg push`. @@ -214,9 +227,14 @@ filter). Show the current sync state. ```bash -agentsview pg status +agentsview pg status [target] +agentsview pg status --all ``` +Without a target name, `pg status` uses the effective default target. +Pass one named target explicitly to inspect that destination, or use +`--all` to print every configured target sequentially. + Output: ``` @@ -259,8 +277,9 @@ The generated unit runs `agentsview pg push --watch`, pins and writes logs to `~/.agentsview/pg-watch.log` unless you changed the data directory. -`install` requires a literal `[pg].url` in -`~/.agentsview/config.toml`. It intentionally rejects +`install` requires a literal PostgreSQL URL in the effective default target of +`~/.agentsview/config.toml`, either the legacy `[pg].url` or the target selected +by `default_pg` from named `[pg.NAME]` blocks. It intentionally rejects `AGENTSVIEW_PG_URL` and environment-expanded URLs such as `${PG_PASSWORD}` because background services do not inherit your interactive shell environment. Other session-directory environment @@ -396,7 +415,7 @@ filter in the sidebar to show sessions from specific machines. ## Configuration -All PostgreSQL settings live in the `[pg]` section of +Single-target PostgreSQL settings can live in the legacy `[pg]` section of `~/.agentsview/config.toml`: ```toml @@ -416,6 +435,29 @@ allow_insecure = false | `projects` | | Array of project names to include in push | | `exclude_projects` | | Array of project names to exclude from push | +To manage more than one PostgreSQL destination, define named `[pg.NAME]` blocks +and select the effective default target with `default_pg`: + +```toml +default_pg = "work" + +[pg.work] +url = "postgres://user:pass@work-db:5432/agentsview?sslmode=require" +machine_name = "my-laptop" + +[pg.archive] +url = "postgres://user:pass@archive-db:5432/agentsview?sslmode=require" +machine_name = "my-laptop-archive" +exclude_projects = ["scratch"] +``` + +`agentsview pg push` and `agentsview pg status` use the effective default target +when no target name is passed, accept one target name explicitly, and also +support `--all` for sequential multi-target runs. `agentsview pg push --watch` +follows the effective default target unless you pass one named target +explicitly. `agentsview pg serve` and `agentsview pg service` stay on the +effective default target in this release, and `--all --watch` is rejected. + !!! warning The `url` field is required for all `pg` commands. If it contains credentials, ensure `config.toml` has restricted @@ -437,7 +479,9 @@ url = "postgres://${PG_USER}:${PG_PASSWORD}@host:5432/dbname?sslmode=require" ### Environment Variables PostgreSQL settings can also be configured via environment -variables, which override `config.toml` values: +variables. In legacy single-target mode they override the `[pg]` +values. In named-target mode they apply only to the effective default +target: | Variable | Description | |----------|-------------| diff --git a/internal/config/config.go b/internal/config/config.go index a6fa9909a..7559d9cd4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,7 @@ import ( "path/filepath" "regexp" "slices" + "sort" "strconv" "strings" "sync" @@ -66,6 +67,29 @@ type PGConfig struct { ExcludeProjects []string `toml:"exclude_projects" json:"exclude_projects,omitempty"` } +type pgEnvOverrides struct { + URL string + Schema string + MachineName string +} + +// ResolvedPGTarget is one PostgreSQL target after target selection, +// defaulting, and default-target env overrides are applied. +type ResolvedPGTarget struct { + Name string + Config PGConfig + IsDefault bool +} + +var pgConfigKeys = map[string]struct{}{ + "url": {}, + "schema": {}, + "machine_name": {}, + "allow_insecure": {}, + "projects": {}, + "exclude_projects": {}, +} + // DuckDBConfig holds DuckDB mirror and Quack connection settings. type DuckDBConfig struct { Path string `toml:"path" json:"path"` @@ -125,6 +149,8 @@ type Config struct { DisableUpdateCheck bool `json:"disable_update_check" toml:"disable_update_check"` NoSync bool `json:"-" toml:"-"` PG PGConfig `json:"pg,omitempty" toml:"pg"` + DefaultPG string `json:"default_pg,omitempty" toml:"default_pg"` + PGTargets map[string]PGConfig `json:"-" toml:"-"` DuckDB DuckDBConfig `json:"duckdb,omitempty" toml:"duckdb"` Automated AutomatedConfig `json:"automated,omitempty" toml:"automated"` Agent map[string]AgentConfig `json:"agent,omitempty" toml:"agent"` @@ -160,6 +186,8 @@ type Config struct { // Used to prevent auto-bind to 0.0.0.0 when the user // explicitly requested a specific host. HostExplicit bool `json:"-" toml:"-"` + + pgEnvOverrides pgEnvOverrides } type dirSource int @@ -532,6 +560,7 @@ func (c *Config) applyConfigTOML(data string) error { RequireAuth bool `toml:"require_auth"` RemoteAccess bool `toml:"remote_access"` DisableUpdateCheck bool `toml:"disable_update_check"` + DefaultPG string `toml:"default_pg"` PG PGConfig `toml:"pg"` DuckDB DuckDBConfig `toml:"duckdb"` Automated AutomatedConfig `toml:"automated"` @@ -544,6 +573,10 @@ func (c *Config) applyConfigTOML(data string) error { if err != nil { return fmt.Errorf("parsing config: %w", err) } + var raw map[string]any + if _, err := toml.Decode(data, &raw); err != nil { + return fmt.Errorf("parsing config raw: %w", err) + } if file.GithubToken != "" { c.GithubToken = file.GithubToken } @@ -576,25 +609,36 @@ func (c *Config) applyConfigTOML(data string) error { } c.RequireAuth = file.RequireAuth || file.RemoteAccess c.DisableUpdateCheck = file.DisableUpdateCheck - // Merge pg field-by-field so env vars override only - // the fields they set, preserving config-file settings. - if file.PG.URL != "" && c.PG.URL == "" { - c.PG.URL = file.PG.URL - } - if file.PG.Schema != "" && c.PG.Schema == "" { - c.PG.Schema = file.PG.Schema - } - if file.PG.MachineName != "" && c.PG.MachineName == "" { - c.PG.MachineName = file.PG.MachineName + if meta.IsDefined("default_pg") { + c.DefaultPG = normalizePGTargetName(file.DefaultPG) } - if file.PG.AllowInsecure { - c.PG.AllowInsecure = true - } - if file.PG.Projects != nil && c.PG.Projects == nil { - c.PG.Projects = file.PG.Projects + legacyPG, namedPG, err := parsePGConfigSection(raw["pg"]) + if err != nil { + return fmt.Errorf("pg: %w", err) } - if file.PG.ExcludeProjects != nil && c.PG.ExcludeProjects == nil { - c.PG.ExcludeProjects = file.PG.ExcludeProjects + if len(namedPG) > 0 { + c.PG = PGConfig{} + c.PGTargets = namedPG + } else { + c.PGTargets = nil + if legacyPG.URL != "" { + c.PG.URL = legacyPG.URL + } + if legacyPG.Schema != "" { + c.PG.Schema = legacyPG.Schema + } + if legacyPG.MachineName != "" { + c.PG.MachineName = legacyPG.MachineName + } + if legacyPG.AllowInsecure { + c.PG.AllowInsecure = true + } + if legacyPG.Projects != nil { + c.PG.Projects = legacyPG.Projects + } + if legacyPG.ExcludeProjects != nil { + c.PG.ExcludeProjects = legacyPG.ExcludeProjects + } } // Merge duckdb field-by-field so env vars override only // the fields they set, preserving config-file settings. @@ -658,10 +702,6 @@ func (c *Config) applyConfigTOML(data string) error { // Parse config-file dir arrays for agents that have a // ConfigKey. Only apply when not already set by env var. - var raw map[string]any - if _, err := toml.Decode(data, &raw); err != nil { - return fmt.Errorf("parsing config raw: %w", err) - } for _, def := range parser.Registry { if def.ConfigKey == "" { continue @@ -795,13 +835,13 @@ func (c *Config) loadEnv() { c.DataDir = v } if v := os.Getenv("AGENTSVIEW_PG_URL"); v != "" { - c.PG.URL = v + c.pgEnvOverrides.URL = v } if v := os.Getenv("AGENTSVIEW_PG_SCHEMA"); v != "" { - c.PG.Schema = v + c.pgEnvOverrides.Schema = v } if v := os.Getenv("AGENTSVIEW_PG_MACHINE"); v != "" { - c.PG.MachineName = v + c.pgEnvOverrides.MachineName = v } if v := os.Getenv("AGENTSVIEW_DUCKDB_PATH"); v != "" { c.DuckDB.Path = v @@ -1306,6 +1346,109 @@ func hostLiteral(host string) string { return host } +func normalizePGTargetName(name string) string { + return strings.TrimSpace(strings.ToLower(name)) +} + +func isReservedPGTargetName(name string) bool { + switch normalizePGTargetName(name) { + case "all", "local": + return true + default: + return false + } +} + +func decodePGConfigMap(raw map[string]any) (PGConfig, error) { + var buf bytes.Buffer + if err := toml.NewEncoder(&buf).Encode(raw); err != nil { + return PGConfig{}, fmt.Errorf("encoding pg config: %w", err) + } + var cfg PGConfig + if _, err := toml.Decode(buf.String(), &cfg); err != nil { + return PGConfig{}, fmt.Errorf("decoding pg config: %w", err) + } + return cfg, nil +} + +func parsePGConfigSection(value any) (PGConfig, map[string]PGConfig, error) { + if value == nil { + return PGConfig{}, nil, nil + } + section, ok := value.(map[string]any) + if !ok { + return PGConfig{}, nil, fmt.Errorf("expected [pg] to be a table") + } + hasLegacyFields := false + hasNamedTargets := false + legacyRaw := make(map[string]any) + namedTargets := make(map[string]PGConfig) + seenNames := make(map[string]string) + for rawName, rawValue := range section { + name := normalizePGTargetName(rawName) + if _, ok := pgConfigKeys[name]; ok { + if _, nested := rawValue.(map[string]any); nested { + return PGConfig{}, nil, fmt.Errorf( + "[pg].%s must be a scalar or array field, not a nested table", + rawName, + ) + } + hasLegacyFields = true + legacyRaw[rawName] = rawValue + continue + } + targetRaw, ok := rawValue.(map[string]any) + if !ok { + return PGConfig{}, nil, fmt.Errorf( + "[pg].%s must be a named target table", + rawName, + ) + } + hasNamedTargets = true + if name == "" { + return PGConfig{}, nil, fmt.Errorf( + "named PG targets must not be blank", + ) + } + if isReservedPGTargetName(name) { + return PGConfig{}, nil, fmt.Errorf( + "named PG target %q is reserved", + name, + ) + } + if prev, exists := seenNames[name]; exists { + return PGConfig{}, nil, fmt.Errorf( + "named PG targets %q and %q normalize to the same name %q", + prev, rawName, name, + ) + } + seenNames[name] = rawName + targetCfg, err := decodePGConfigMap(targetRaw) + if err != nil { + return PGConfig{}, nil, fmt.Errorf( + "[pg].%s: %w", rawName, err, + ) + } + namedTargets[name] = targetCfg + } + if hasLegacyFields && hasNamedTargets { + return PGConfig{}, nil, fmt.Errorf( + "cannot mix legacy [pg] fields with named [pg.NAME] targets", + ) + } + if hasLegacyFields { + legacyCfg, err := decodePGConfigMap(legacyRaw) + if err != nil { + return PGConfig{}, nil, err + } + return legacyCfg, nil, nil + } + if hasNamedTargets { + return PGConfig{}, namedTargets, nil + } + return PGConfig{}, nil, nil +} + // ResolveDataDir returns the effective data directory by applying // defaults and environment overrides, without reading any files. // Use this to determine where migration should target before @@ -1321,10 +1464,114 @@ func ResolveDataDir() (string, error) { return cfg.DataDir, nil } -// ResolvePG returns a copy of PG config with defaults applied -// and environment variables expanded in URL. -func (c *Config) ResolvePG() (PGConfig, error) { - pg := c.PG +// DefaultPGTargetName returns the effective named PG target for this config. +func (c *Config) DefaultPGTargetName() (string, error) { + if len(c.PGTargets) == 0 { + if c.DefaultPG != "" { + return "", fmt.Errorf( + "default_pg requires named [pg.NAME] targets", + ) + } + return "", nil + } + if c.DefaultPG != "" { + if _, ok := c.PGTargets[c.DefaultPG]; !ok { + return "", fmt.Errorf( + "default_pg %q does not match any named [pg.NAME] target", + c.DefaultPG, + ) + } + return c.DefaultPG, nil + } + if len(c.PGTargets) == 1 { + for name := range c.PGTargets { + return name, nil + } + } + return "", fmt.Errorf( + "default_pg is required when more than one [pg.NAME] target is defined", + ) +} + +func (c *Config) validatePGTargets() error { + _, err := c.DefaultPGTargetName() + return err +} + +func (c *Config) PGTargetNames() ([]string, string, error) { + if err := c.validatePGTargets(); err != nil { + return nil, "", err + } + if len(c.PGTargets) == 0 { + return nil, "", nil + } + defaultName, err := c.DefaultPGTargetName() + if err != nil { + return nil, "", err + } + names := make([]string, 0, len(c.PGTargets)) + for name := range c.PGTargets { + names = append(names, name) + } + sort.Slice(names, func(i, j int) bool { + if names[i] == defaultName { + return true + } + if names[j] == defaultName { + return false + } + return names[i] < names[j] + }) + return names, defaultName, nil +} + +// RawPGTarget returns the configured PG target before env expansion and +// default-field synthesis. An empty name selects the effective default target. +func (c *Config) RawPGTarget(name string) (PGConfig, error) { + if err := c.validatePGTargets(); err != nil { + return PGConfig{}, err + } + targetName := normalizePGTargetName(name) + if len(c.PGTargets) == 0 { + if targetName != "" { + return PGConfig{}, fmt.Errorf( + "pg target %q is not configured; config uses a single legacy [pg] block", + name, + ) + } + return c.PG, nil + } + if targetName == "" { + var err error + targetName, err = c.DefaultPGTargetName() + if err != nil { + return PGConfig{}, err + } + } + targetCfg, ok := c.PGTargets[targetName] + if !ok { + return PGConfig{}, fmt.Errorf( + "pg target %q is not configured", + targetName, + ) + } + return targetCfg, nil +} + +func (c *Config) resolvePGConfig( + pg PGConfig, applyDefaultEnv bool, +) (PGConfig, error) { + if applyDefaultEnv { + if c.pgEnvOverrides.URL != "" { + pg.URL = c.pgEnvOverrides.URL + } + if c.pgEnvOverrides.Schema != "" { + pg.Schema = c.pgEnvOverrides.Schema + } + if c.pgEnvOverrides.MachineName != "" { + pg.MachineName = c.pgEnvOverrides.MachineName + } + } if pg.URL != "" { expanded, err := expandBracedEnv(pg.URL) if err != nil { @@ -1345,6 +1592,83 @@ func (c *Config) ResolvePG() (PGConfig, error) { return pg, nil } +// ResolvePG returns the effective default PG target with defaults applied +// and environment variables expanded in URL. +func (c *Config) ResolvePG() (PGConfig, error) { + return c.ResolvePGTarget("") +} + +// ResolvePGTarget resolves one named PG target, or the effective default +// target when name is empty. In legacy single-target mode, only the empty +// name is valid. +func (c *Config) ResolvePGTarget(name string) (PGConfig, error) { + if err := c.validatePGTargets(); err != nil { + return PGConfig{}, err + } + targetName := normalizePGTargetName(name) + if len(c.PGTargets) == 0 { + if targetName != "" { + return PGConfig{}, fmt.Errorf( + "pg target %q is not configured; config uses a single legacy [pg] block", + name, + ) + } + return c.resolvePGConfig(c.PG, true) + } + defaultName, err := c.DefaultPGTargetName() + if err != nil { + return PGConfig{}, err + } + if targetName == "" { + targetName = defaultName + } + targetCfg, ok := c.PGTargets[targetName] + if !ok { + return PGConfig{}, fmt.Errorf( + "pg target %q is not configured", + targetName, + ) + } + return c.resolvePGConfig(targetCfg, targetName == defaultName) +} + +// ResolvePGTargets resolves every configured PG target. Legacy single-target +// mode returns one unnamed default target. +func (c *Config) ResolvePGTargets() ([]ResolvedPGTarget, error) { + if err := c.validatePGTargets(); err != nil { + return nil, err + } + if len(c.PGTargets) == 0 { + pg, err := c.resolvePGConfig(c.PG, true) + if err != nil { + return nil, err + } + return []ResolvedPGTarget{{ + Config: pg, + IsDefault: true, + }}, nil + } + names, defaultName, err := c.PGTargetNames() + if err != nil { + return nil, err + } + targets := make([]ResolvedPGTarget, 0, len(names)) + for _, name := range names { + targetCfg, err := c.resolvePGConfig( + c.PGTargets[name], name == defaultName, + ) + if err != nil { + return nil, err + } + targets = append(targets, ResolvedPGTarget{ + Name: name, + Config: targetCfg, + IsDefault: name == defaultName, + }) + } + return targets, nil +} + // ResolveDuckDB returns a copy of DuckDB config with defaults applied // and environment variables expanded in path, URL, and token. func (c *Config) ResolveDuckDB() (DuckDBConfig, error) { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index aaa5e9a45..3938b336d 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -861,8 +861,182 @@ func TestLoadFile_PGConfig(t *testing.T) { cfg := f.LoadMinimal(t) - assert.Equal(t, tt.want.URL, cfg.PG.URL) - assert.Equal(t, tt.want.MachineName, cfg.PG.MachineName) + resolved, err := cfg.ResolvePG() + require.NoError(t, err) + + assert.Equal(t, tt.want.URL, resolved.URL) + if tt.want.MachineName == "" { + assert.NotEmpty(t, resolved.MachineName) + } else { + assert.Equal(t, tt.want.MachineName, resolved.MachineName) + } + }) + } +} + +func TestResolvePGTarget_NamedTargets(t *testing.T) { + cfg := Config{ + DefaultPG: "work", + PGTargets: map[string]PGConfig{ + "work": { + URL: "postgres://work", + MachineName: "workbox", + }, + "archive": { + URL: "postgres://archive", + MachineName: "archivebox", + }, + }, + pgEnvOverrides: pgEnvOverrides{ + URL: "postgres://env-default", + MachineName: "envbox", + }, + } + + defaultTarget, err := cfg.ResolvePG() + require.NoError(t, err) + assert.Equal(t, "postgres://env-default", defaultTarget.URL) + assert.Equal(t, "envbox", defaultTarget.MachineName) + + archiveTarget, err := cfg.ResolvePGTarget("archive") + require.NoError(t, err) + assert.Equal(t, "postgres://archive", archiveTarget.URL) + assert.Equal(t, "archivebox", archiveTarget.MachineName) +} + +func TestResolvePGTargets_DefaultFirst(t *testing.T) { + cfg := Config{ + DefaultPG: "work", + PGTargets: map[string]PGConfig{ + "archive": {URL: "postgres://archive"}, + "work": {URL: "postgres://work"}, + }, + } + + targets, err := cfg.ResolvePGTargets() + require.NoError(t, err) + require.Len(t, targets, 2) + assert.Equal(t, "work", targets[0].Name) + assert.True(t, targets[0].IsDefault) + assert.Equal(t, "archive", targets[1].Name) + assert.False(t, targets[1].IsDefault) +} + +func TestResolvePGTargets_OneNamedTargetWithoutDefault(t *testing.T) { + cfg := Config{ + PGTargets: map[string]PGConfig{ + "work": {URL: "postgres://work"}, + }, + } + + targets, err := cfg.ResolvePGTargets() + require.NoError(t, err) + require.Len(t, targets, 1) + assert.Equal(t, "work", targets[0].Name) + assert.True(t, targets[0].IsDefault) +} + +func TestResolvePGTargets_MultipleNamedTargetsRequireDefault(t *testing.T) { + cfg := Config{ + PGTargets: map[string]PGConfig{ + "work": {URL: "postgres://work"}, + "archive": {URL: "postgres://archive"}, + }, + } + + _, err := cfg.ResolvePGTargets() + require.Error(t, err) + assert.Contains(t, err.Error(), "default_pg is required") +} + +func TestLoadMinimal_DefersNamedPGValidationForNonPGCommands(t *testing.T) { + dir := setupTestEnv(t) + path := filepath.Join(dir, configFileName) + data := []byte(` +default_pg = "missing" + +[pg.work] +url = "postgres://work" +`) + require.NoError(t, os.WriteFile(path, data, 0o600)) + + cfg, err := LoadMinimal() + require.NoError(t, err) + + _, err = cfg.ResolvePG() + require.Error(t, err) + assert.Contains(t, err.Error(), `default_pg "missing" does not match any named [pg.NAME] target`) +} + +func TestLoadFile_PGMixedLegacyAndNamedTargetsFails(t *testing.T) { + dir := setupTestEnv(t) + path := filepath.Join(dir, configFileName) + data := []byte(` +[pg] +url = "postgres://legacy" + +[pg.archive] +url = "postgres://archive" +`) + require.NoError(t, os.WriteFile(path, data, 0o600)) + + _, err := LoadMinimal() + require.Error(t, err) + assert.Contains(t, err.Error(), "cannot mix legacy [pg] fields with named [pg.NAME] targets") +} + +func TestLoadFile_PGNamedTargetValidationErrors(t *testing.T) { + tests := []struct { + name string + toml string + wantErr string + }{ + { + name: "reserved all target", + toml: ` +[pg.all] +url = "postgres://all" +`, + wantErr: `named PG target "all" is reserved`, + }, + { + name: "reserved local target", + toml: ` +[pg.local] +url = "postgres://local" +`, + wantErr: `named PG target "local" is reserved`, + }, + { + name: "duplicate normalized target names", + toml: ` +[pg.Work] +url = "postgres://work" + +[pg.work] +url = "postgres://work2" +`, + wantErr: `normalize to the same name "work"`, + }, + { + name: "named target must be table", + toml: ` +[pg] +archive = "postgres://archive" +`, + wantErr: `[pg].archive must be a named target table`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dir := setupTestEnv(t) + path := filepath.Join(dir, configFileName) + require.NoError(t, os.WriteFile(path, []byte(tt.toml), 0o600)) + + _, err := LoadMinimal() + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErr) }) } } diff --git a/internal/config/pg_target_case_test.go b/internal/config/pg_target_case_test.go new file mode 100644 index 000000000..c63b16a18 --- /dev/null +++ b/internal/config/pg_target_case_test.go @@ -0,0 +1,39 @@ +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLoadFile_PGLegacyFieldNamesRejectNestedTablesCaseInsensitive(t *testing.T) { + tests := []struct { + name string + field string + wantErr string + }{ + { + name: "url", + field: "URL", + wantErr: "[pg].URL must be a scalar or array field, not a nested table", + }, + { + name: "schema", + field: "Schema", + wantErr: "[pg].Schema must be a scalar or array field, not a nested table", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, _, err := parsePGConfigSection(map[string]any{ + tt.field: map[string]any{ + "url": "postgres://nested", + }, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErr) + }) + } +} diff --git a/internal/parser/gemini_parser_test.go b/internal/parser/gemini_parser_test.go index bb0b64ed4..547f80d86 100644 --- a/internal/parser/gemini_parser_test.go +++ b/internal/parser/gemini_parser_test.go @@ -563,4 +563,4 @@ func TestParseGeminiSession_ContextTokensDelta(t *testing.T) { assert.False(t, msgs[1].HasContextTokens) assert.Equal(t, 0, msgs[1].ContextTokens) }) -} \ No newline at end of file +} diff --git a/internal/postgres/connect.go b/internal/postgres/connect.go index 8068cb553..94828d777 100644 --- a/internal/postgres/connect.go +++ b/internal/postgres/connect.go @@ -49,7 +49,7 @@ func CheckSSL(dsn string) error { "pg connection to %s permits plaintext; "+ "set sslmode=require (or verify-full) "+ "for non-local hosts, "+ - "or set allow_insecure = true under [pg] "+ + "or set allow_insecure = true under [pg] or [pg.NAME] "+ "in config to override", cfg.Host, ) diff --git a/internal/postgres/push.go b/internal/postgres/push.go index f1059bedc..afacf5c28 100644 --- a/internal/postgres/push.go +++ b/internal/postgres/push.go @@ -35,14 +35,6 @@ const ( var errSessionOwnershipConflict = errors.New("session ownership conflict") -// syncStateStore abstracts sync state read/write operations on the -// local database. Used by push boundary state helpers. -type syncStateStore interface { - GetSyncState(key string) (string, error) - SetSyncState(key, value string) error - GetOrCreateSyncState(key, defaultValue string) (string, error) -} - type pushBoundaryState struct { Cutoff string `json:"cutoff"` Fingerprints map[string]string `json:"fingerprints"` @@ -75,6 +67,7 @@ func (s *Sync) Push( ) (PushResult, error) { start := time.Now() var result PushResult + state := s.effectiveSyncState() if err := CheckDataVersionCompat(ctx, s.pg); err != nil { return result, err @@ -84,13 +77,13 @@ func (s *Sync) Push( return result, err } - lastPush, err := s.local.GetSyncState("last_push_at") + lastPush, err := state.GetSyncState("last_push_at") if err != nil { return result, fmt.Errorf( "reading last_push_at: %w", err, ) } - storedTargetFingerprint, err := s.local.GetSyncState( + storedTargetFingerprint, err := state.GetSyncState( lastPushTargetFingerprintKey, ) if err != nil { @@ -99,7 +92,7 @@ func (s *Sync) Push( lastPushTargetFingerprintKey, err, ) } - boundaryState, err := s.local.GetSyncState( + boundaryState, err := state.GetSyncState( lastPushBoundaryStateKey, ) if err != nil { @@ -119,7 +112,7 @@ func (s *Sync) Push( "pgsync: %s; clearing local push watermark state", reason, ) - if err := clearPushState(s.local); err != nil { + if err := clearPushState(state); err != nil { return result, err } lastPush = "" @@ -154,7 +147,7 @@ func (s *Sync) Push( // watermark and boundary state so the next // unfiltered push also starts from scratch. if s.isFiltered() && !pushStateCleared { - if err := clearPushState(s.local); err != nil { + if err := clearPushState(state); err != nil { return result, err } } @@ -185,7 +178,7 @@ func (s *Sync) Push( // watermark and boundary state so the next // unfiltered push also starts from scratch. if s.isFiltered() && !pushStateCleared { - if err := clearPushState(s.local); err != nil { + if err := clearPushState(state); err != nil { return result, err } } @@ -218,7 +211,7 @@ func (s *Sync) Push( if !full { var bErr error priorFingerprints, _, _, bErr = readBoundaryAndFingerprints( - s.local, lastPush, + state, lastPush, ) if bErr != nil { return result, bErr @@ -300,21 +293,21 @@ func (s *Sync) Push( boundaryKey = cutoff } if err := writePushBoundaryState( - s.local, boundaryKey, sessions, + state, boundaryKey, sessions, priorFingerprints, sessionFingerprints, ); err != nil { return result, err } } else { if err := finalizePushState( - s.local, cutoff, sessions, nil, + state, cutoff, sessions, nil, sessionFingerprints, ); err != nil { return result, err } } if err := persistPushTargetFingerprint( - s.local, s.targetFingerprint, + state, s.targetFingerprint, ); err != nil { return result, err } @@ -390,7 +383,7 @@ func (s *Sync) Push( boundaryKey = cutoff } if err := writePushBoundaryState( - s.local, boundaryKey, pushed, + state, boundaryKey, pushed, priorFingerprints, sessionFingerprints, ); err != nil { return result, err @@ -409,14 +402,14 @@ func (s *Sync) Push( mergedFingerprints = priorFingerprints } if err := finalizePushState( - s.local, finalizeCutoff, pushed, + state, finalizeCutoff, pushed, mergedFingerprints, sessionFingerprints, ); err != nil { return result, err } } if err := persistPushTargetFingerprint( - s.local, s.targetFingerprint, + state, s.targetFingerprint, ); err != nil { return result, err } @@ -583,7 +576,11 @@ func normalizePushMarkerMachineAliases( // name, so a machine rename keeps the same marker, and unique per local DB, so // a different host pushing to the same PG cannot mask this host's reset. func (s *Sync) pushMarkerID() (string, error) { - id, err := s.local.GetSyncState(pushMarkerIDStateKey) + state := s.local + if state == nil { + return "", fmt.Errorf("local db is required") + } + id, err := state.GetSyncState(pushMarkerIDStateKey) if err != nil { return "", fmt.Errorf("reading push marker id: %w", err) } @@ -595,7 +592,9 @@ func (s *Sync) pushMarkerID() (string, error) { return "", fmt.Errorf("generating push marker id: %w", err) } id = hex.EncodeToString(buf) - storedID, err := s.local.GetOrCreateSyncState(pushMarkerIDStateKey, id) + storedID, err := state.GetOrCreateSyncState( + pushMarkerIDStateKey, id, + ) if err != nil { return "", fmt.Errorf("persisting push marker id: %w", err) } @@ -2621,7 +2620,7 @@ func (s *Sync) normalizeSyncTimestamps( } s.schemaDone = true } - return NormalizeLocalSyncStateTimestamps(s.local) + return NormalizeLocalSyncStateTimestamps(s.effectiveSyncState()) } // sanitizePG strips null bytes and replaces invalid UTF-8 diff --git a/internal/postgres/push_test.go b/internal/postgres/push_test.go index e5921bb49..356768610 100644 --- a/internal/postgres/push_test.go +++ b/internal/postgres/push_test.go @@ -90,6 +90,41 @@ func TestPushMarkerIDReturnsInsertWinner(t *testing.T) { assert.Equal(t, "winner-marker", stored) } +func TestPushMarkerIDUsesUnscopedStateAcrossNamedTargets(t *testing.T) { + local, err := db.Open(filepath.Join(t.TempDir(), "local.db")) + require.NoError(t, err, "db.Open") + defer local.Close() + + workSync := &Sync{ + local: local, + syncState: newScopedSyncStateStore(local, "work", true), + } + archiveSync := &Sync{ + local: local, + syncState: newScopedSyncStateStore(local, "archive", false), + } + + workMarker, err := workSync.pushMarkerID() + require.NoError(t, err, "work pushMarkerID") + archiveMarker, err := archiveSync.pushMarkerID() + require.NoError(t, err, "archive pushMarkerID") + + assert.Equal(t, workMarker, archiveMarker) + + stored, err := local.GetSyncState(pushMarkerIDStateKey) + require.NoError(t, err, "GetSyncState") + assert.Equal(t, workMarker, stored) + + for _, key := range []string{ + pushMarkerIDStateKey + ":work", + pushMarkerIDStateKey + ":archive", + } { + value, err := local.GetSyncState(key) + require.NoError(t, err, "GetSyncState %s", key) + assert.Empty(t, value) + } +} + func TestReadPushBoundaryStateValidity(t *testing.T) { const cutoff = "2026-03-11T12:34:56.123Z" diff --git a/internal/postgres/sync.go b/internal/postgres/sync.go index 9f6241aa3..78368a9f5 100644 --- a/internal/postgres/sync.go +++ b/internal/postgres/sync.go @@ -11,6 +11,115 @@ import ( "go.kenn.io/agentsview/internal/db" ) +type syncStateStore = SyncStateStore + +type scopedSyncStateStore struct { + base syncStateStore + scope string + migrateLegacy bool + migrateOnce sync.Once + migrateErr error +} + +func newScopedSyncStateStore( + base syncStateStore, + scope string, + migrateLegacy bool, +) *scopedSyncStateStore { + return &scopedSyncStateStore{ + base: base, + scope: scope, + migrateLegacy: migrateLegacy, + } +} + +func (s *scopedSyncStateStore) scopedKey(key string) string { + if s.scope == "" { + return key + } + return key + ":" + s.scope +} + +func (s *scopedSyncStateStore) ensureMigration() error { + if s.scope == "" || !s.migrateLegacy { + return nil + } + s.migrateOnce.Do(func() { + for _, key := range []string{ + "last_push_at", + lastPushBoundaryStateKey, + lastPushTargetFingerprintKey, + } { + scopedKey := s.scopedKey(key) + scopedValue, err := s.base.GetSyncState(scopedKey) + if err != nil { + s.migrateErr = fmt.Errorf( + "reading %s during PG sync-state migration: %w", + scopedKey, err, + ) + return + } + legacyValue, err := s.base.GetSyncState(key) + if err != nil { + s.migrateErr = fmt.Errorf( + "reading legacy %s during PG sync-state migration: %w", + key, err, + ) + return + } + if legacyValue == "" { + continue + } + if scopedValue == "" { + if err := s.base.SetSyncState( + scopedKey, legacyValue, + ); err != nil { + s.migrateErr = fmt.Errorf( + "writing %s during PG sync-state migration: %w", + scopedKey, err, + ) + return + } + } + if err := s.base.SetSyncState(key, ""); err != nil { + s.migrateErr = fmt.Errorf( + "clearing legacy %s during PG sync-state migration: %w", + key, err, + ) + return + } + } + }) + return s.migrateErr +} + +func (s *scopedSyncStateStore) GetSyncState(key string) (string, error) { + if err := s.ensureMigration(); err != nil { + return "", err + } + return s.base.GetSyncState(s.scopedKey(key)) +} + +func (s *scopedSyncStateStore) SetSyncState( + key, value string, +) error { + if err := s.ensureMigration(); err != nil { + return err + } + return s.base.SetSyncState(s.scopedKey(key), value) +} + +func (s *scopedSyncStateStore) GetOrCreateSyncState( + key, defaultValue string, +) (string, error) { + if err := s.ensureMigration(); err != nil { + return "", err + } + return s.base.GetOrCreateSyncState( + s.scopedKey(key), defaultValue, + ) +} + // isUndefinedTable returns true when the error indicates the // queried relation does not exist (PG SQLSTATE 42P01). We match // only the SQLSTATE code to avoid false positives from other @@ -34,11 +143,14 @@ func isUndefinedColumn(err error) bool { // Sync manages push-only sync from local SQLite to a remote // PostgreSQL database. type Sync struct { - pg *sql.DB - local *db.DB - machine string - schema string - targetFingerprint string + pg *sql.DB + local *db.DB + syncState syncStateStore + machine string + schema string + targetFingerprint string + syncStateTarget string + migrateLegacySyncState bool // Project filtering for push scope. projects []string @@ -51,6 +163,13 @@ type Sync struct { schemaDone bool } +func (s *Sync) effectiveSyncState() syncStateStore { + if s.syncState != nil { + return s.syncState + } + return s.local +} + // SyncOptions holds optional configuration for a Sync instance. type SyncOptions struct { // Projects limits push scope to these project names. @@ -59,6 +178,11 @@ type SyncOptions struct { // ExcludeProjects excludes these project names from push. // Mutually exclusive with Projects. ExcludeProjects []string + // SyncStateTarget scopes per-target push watermarks and fingerprints. + SyncStateTarget string + // MigrateLegacySyncState moves unsuffixed legacy sync-state keys into the + // named default target the first time that target runs. + MigrateLegacySyncState bool } // New creates a Sync instance and verifies the PG connection. @@ -103,13 +227,20 @@ func New( } return &Sync{ - pg: pg, - local: local, - machine: machine, - schema: schema, - targetFingerprint: targetFingerprint, - projects: opts.Projects, - excludeProjects: opts.ExcludeProjects, + pg: pg, + local: local, + syncState: newScopedSyncStateStore( + local, + opts.SyncStateTarget, + opts.MigrateLegacySyncState, + ), + machine: machine, + schema: schema, + targetFingerprint: targetFingerprint, + syncStateTarget: opts.SyncStateTarget, + migrateLegacySyncState: opts.MigrateLegacySyncState, + projects: opts.Projects, + excludeProjects: opts.ExcludeProjects, }, nil } @@ -157,7 +288,9 @@ func (s *Sync) EnsureSchema(ctx context.Context) error { func (s *Sync) Status( ctx context.Context, ) (SyncStatus, error) { - lastPush, err := s.local.GetSyncState("last_push_at") + lastPush, err := ReadLastPushAt( + s.local, s.syncStateTarget, s.migrateLegacySyncState, + ) if err != nil { log.Printf( "warning: reading last_push_at: %v", err, @@ -206,6 +339,32 @@ func (s *Sync) Status( }, nil } +func ReadLastPushAt( + local SyncStateStore, + target string, + migrateLegacy bool, +) (string, error) { + if local == nil { + return "", fmt.Errorf("local sync state is required") + } + if target == "" { + return local.GetSyncState("last_push_at") + } + store := newScopedSyncStateStore( + local, + target, + false, + ) + lastPush, err := store.GetSyncState("last_push_at") + if err != nil { + return "", err + } + if lastPush != "" || !migrateLegacy { + return lastPush, nil + } + return local.GetSyncState("last_push_at") +} + // SyncStatus holds summary information about the sync state. type SyncStatus struct { Machine string `json:"machine"` diff --git a/internal/postgres/sync_test.go b/internal/postgres/sync_test.go index 54f6a0674..278a377b2 100644 --- a/internal/postgres/sync_test.go +++ b/internal/postgres/sync_test.go @@ -72,6 +72,171 @@ func TestEnsureSchemaIdempotent(t *testing.T) { } } +func TestScopedSyncStateStoreMigratesLegacyState(t *testing.T) { + local := testDB(t) + + require.NoError(t, local.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123Z", + )) + require.NoError(t, local.SetSyncState( + lastPushBoundaryStateKey, + `{"cutoff":"2026-03-11T12:34:56.123Z"}`, + )) + require.NoError(t, local.SetSyncState( + lastPushTargetFingerprintKey, + "fingerprint-a", + )) + require.NoError(t, local.SetSyncState( + pushMarkerIDStateKey, + "marker-a", + )) + + store := newScopedSyncStateStore(local, "work", true) + + lastPush, err := store.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", lastPush) + + for _, key := range []string{ + "last_push_at", + lastPushBoundaryStateKey, + lastPushTargetFingerprintKey, + } { + legacyValue, err := local.GetSyncState(key) + require.NoError(t, err) + assert.Empty(t, legacyValue) + + scopedValue, err := local.GetSyncState(key + ":work") + require.NoError(t, err) + assert.NotEmpty(t, scopedValue) + } + + legacyMarker, err := local.GetSyncState(pushMarkerIDStateKey) + require.NoError(t, err) + assert.Equal(t, "marker-a", legacyMarker) + + scopedMarker, err := local.GetSyncState( + pushMarkerIDStateKey + ":work", + ) + require.NoError(t, err) + assert.Empty(t, scopedMarker) +} + +func TestScopedSyncStateStoreNonDefaultTargetDoesNotMigrateLegacyState(t *testing.T) { + local := testDB(t) + + require.NoError(t, local.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123Z", + )) + + store := newScopedSyncStateStore(local, "archive", false) + + got, err := store.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Empty(t, got) + + legacyValue, err := local.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", legacyValue) +} + +func TestScopedSyncStateStoreLegacyModeUsesUnscopedKeys(t *testing.T) { + local := testDB(t) + store := newScopedSyncStateStore(local, "", false) + + require.NoError(t, store.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123Z", + )) + + got, err := local.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", got) +} + +func TestSyncEffectiveSyncStateFallsBackToLocalDB(t *testing.T) { + local := testDB(t) + require.NoError(t, local.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123456789Z", + )) + + sync := &Sync{local: local} + require.NoError(t, NormalizeLocalSyncStateTimestamps( + sync.effectiveSyncState(), + )) + + got, err := sync.effectiveSyncState().GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", got) +} + +func TestSyncScopedStateUsesTargetKeys(t *testing.T) { + pgURL := testPGURL(t) + cleanPGSchema(t, pgURL) + t.Cleanup(func() { cleanPGSchema(t, pgURL) }) + + local := testDB(t) + ps, err := New( + pgURL, "agentsview", local, + "test-machine", true, + SyncOptions{ + SyncStateTarget: "work", + MigrateLegacySyncState: true, + }, + ) + require.NoError(t, err, "creating sync") + defer ps.Close() + + ctx := context.Background() + require.NoError(t, ps.EnsureSchema(ctx), "ensure schema") + + started := "2026-03-11T12:00:00Z" + require.NoError(t, local.UpsertSession(db.Session{ + ID: "sess-scoped-001", + Project: "test-project", + Machine: "local", + Agent: "claude", + StartedAt: &started, + MessageCount: 1, + }), "upsert session") + require.NoError(t, local.InsertMessages([]db.Message{{ + SessionID: "sess-scoped-001", + Ordinal: 0, + Role: "user", + Content: "hello", + }}), "insert message") + + _, err = ps.Push(ctx, false, nil) + require.NoError(t, err, "push") + + scopedLastPush, err := local.GetSyncState("last_push_at:work") + require.NoError(t, err) + assert.NotEmpty(t, scopedLastPush) + + legacyLastPush, err := local.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Empty(t, legacyLastPush) + + scopedBoundary, err := local.GetSyncState( + lastPushBoundaryStateKey + ":work", + ) + require.NoError(t, err) + assert.NotEmpty(t, scopedBoundary) + + scopedFingerprint, err := local.GetSyncState( + lastPushTargetFingerprintKey + ":work", + ) + require.NoError(t, err) + assert.NotEmpty(t, scopedFingerprint) + + status, err := ps.Status(ctx) + require.NoError(t, err, "status") + assert.Equal(t, scopedLastPush, status.LastPushAt) +} + func TestEnsureSchemaMigratesLegacySchema(t *testing.T) { pgURL := testPGURL(t) cleanPGSchema(t, pgURL) diff --git a/internal/postgres/time.go b/internal/postgres/time.go index 19b16a548..a40e2e869 100644 --- a/internal/postgres/time.go +++ b/internal/postgres/time.go @@ -99,6 +99,7 @@ func PreviousLocalSyncTimestamp( type SyncStateStore interface { GetSyncState(key string) (string, error) SetSyncState(key, value string) error + GetOrCreateSyncState(key, defaultValue string) (string, error) } // NormalizeLocalSyncStateTimestamps normalizes the last_push_at diff --git a/internal/server/huma_routes_push.go b/internal/server/huma_routes_push.go index 42c7cf3f0..fc5fb8b61 100644 --- a/internal/server/huma_routes_push.go +++ b/internal/server/huma_routes_push.go @@ -28,11 +28,13 @@ type daemonPushInput struct { } type daemonPushRequest struct { - Full bool `json:"full"` - Projects []string `json:"projects,omitempty"` - ExcludeProjects []string `json:"exclude_projects,omitempty"` - PG *config.PGConfig `json:"pg,omitempty"` - DuckDB *config.DuckDBConfig `json:"duckdb,omitempty"` + Full bool `json:"full"` + Projects []string `json:"projects,omitempty"` + ExcludeProjects []string `json:"exclude_projects,omitempty"` + PG *config.PGConfig `json:"pg,omitempty"` + DuckDB *config.DuckDBConfig `json:"duckdb,omitempty"` + SyncStateTarget string `json:"sync_state_target,omitempty"` + MigrateLegacySyncState bool `json:"migrate_legacy_sync_state,omitempty"` } type pgPushOutput struct { @@ -94,8 +96,10 @@ func (s *Server) humaPGPush( pgCfg.URL, pgCfg.Schema, local, pgCfg.MachineName, pgCfg.AllowInsecure, postgres.SyncOptions{ - Projects: in.Body.Projects, - ExcludeProjects: in.Body.ExcludeProjects, + Projects: in.Body.Projects, + ExcludeProjects: in.Body.ExcludeProjects, + SyncStateTarget: in.Body.SyncStateTarget, + MigrateLegacySyncState: in.Body.MigrateLegacySyncState, }, ) if err != nil { From 45c7b98cf08706e936a8fff566bd70bbd289b7ae Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 24 Jun 2026 11:54:48 -0500 Subject: [PATCH 2/4] test(postgres): keep scoped sync-state tests untagged The sync-state migration checks do not require a live PostgreSQL server, so keeping them behind the pgtest tag leaves the default test suite without coverage for the copy-then-clear safety path. Move the SQLite-only tests and helper into the untagged sync unit test file, while keeping the PostgreSQL integration coverage in the pgtest file. Also align single-target pg push output with pg status so explicitly selected targets are visible in command output. --- cmd/agentsview/pg.go | 2 +- cmd/agentsview/pg_test.go | 6 +- internal/postgres/sync_test.go | 92 ---------------------------- internal/postgres/sync_unit_test.go | 95 +++++++++++++++++++++++++++++ 4 files changed, 101 insertions(+), 94 deletions(-) diff --git a/cmd/agentsview/pg.go b/cmd/agentsview/pg.go index 5292f550f..fa537e90e 100644 --- a/cmd/agentsview/pg.go +++ b/cmd/agentsview/pg.go @@ -92,7 +92,7 @@ func runPGPush( var failures []string for i, target := range targets { - if len(targets) > 1 { + if len(targets) > 1 || target.Name != "" { if i > 0 { fmt.Println() } diff --git a/cmd/agentsview/pg_test.go b/cmd/agentsview/pg_test.go index fb91753e5..3c521e430 100644 --- a/cmd/agentsview/pg_test.go +++ b/cmd/agentsview/pg_test.go @@ -157,11 +157,15 @@ machine_name = "workbox" url = "postgres://archive" `) - err := runPGPush(PGPushConfig{}, "archive") + var err error + out := captureStdout(t, func() { + err = runPGPush(PGPushConfig{}, "archive") + }) require.Error(t, err) assert.Contains(t, err.Error(), "pg connection to archive permits plaintext") assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") assert.NotContains(t, err.Error(), "BROKEN_WORK_TARGET") + assert.Contains(t, out, "Target: archive") } func TestRunPGStatus_IgnoresBrokenUnselectedTarget(t *testing.T) { diff --git a/internal/postgres/sync_test.go b/internal/postgres/sync_test.go index 278a377b2..5f6ac34ca 100644 --- a/internal/postgres/sync_test.go +++ b/internal/postgres/sync_test.go @@ -16,14 +16,6 @@ import ( "go.kenn.io/agentsview/internal/db" ) -func testDB(t *testing.T) *db.DB { - t.Helper() - d, err := db.Open(t.TempDir() + "/test.db") - require.NoError(t, err, "opening test db") - t.Cleanup(func() { d.Close() }) - return d -} - func cleanPGSchema(t *testing.T, pgURL string) { t.Helper() pg, err := sql.Open("pgx", pgURL) @@ -72,90 +64,6 @@ func TestEnsureSchemaIdempotent(t *testing.T) { } } -func TestScopedSyncStateStoreMigratesLegacyState(t *testing.T) { - local := testDB(t) - - require.NoError(t, local.SetSyncState( - "last_push_at", - "2026-03-11T12:34:56.123Z", - )) - require.NoError(t, local.SetSyncState( - lastPushBoundaryStateKey, - `{"cutoff":"2026-03-11T12:34:56.123Z"}`, - )) - require.NoError(t, local.SetSyncState( - lastPushTargetFingerprintKey, - "fingerprint-a", - )) - require.NoError(t, local.SetSyncState( - pushMarkerIDStateKey, - "marker-a", - )) - - store := newScopedSyncStateStore(local, "work", true) - - lastPush, err := store.GetSyncState("last_push_at") - require.NoError(t, err) - assert.Equal(t, "2026-03-11T12:34:56.123Z", lastPush) - - for _, key := range []string{ - "last_push_at", - lastPushBoundaryStateKey, - lastPushTargetFingerprintKey, - } { - legacyValue, err := local.GetSyncState(key) - require.NoError(t, err) - assert.Empty(t, legacyValue) - - scopedValue, err := local.GetSyncState(key + ":work") - require.NoError(t, err) - assert.NotEmpty(t, scopedValue) - } - - legacyMarker, err := local.GetSyncState(pushMarkerIDStateKey) - require.NoError(t, err) - assert.Equal(t, "marker-a", legacyMarker) - - scopedMarker, err := local.GetSyncState( - pushMarkerIDStateKey + ":work", - ) - require.NoError(t, err) - assert.Empty(t, scopedMarker) -} - -func TestScopedSyncStateStoreNonDefaultTargetDoesNotMigrateLegacyState(t *testing.T) { - local := testDB(t) - - require.NoError(t, local.SetSyncState( - "last_push_at", - "2026-03-11T12:34:56.123Z", - )) - - store := newScopedSyncStateStore(local, "archive", false) - - got, err := store.GetSyncState("last_push_at") - require.NoError(t, err) - assert.Empty(t, got) - - legacyValue, err := local.GetSyncState("last_push_at") - require.NoError(t, err) - assert.Equal(t, "2026-03-11T12:34:56.123Z", legacyValue) -} - -func TestScopedSyncStateStoreLegacyModeUsesUnscopedKeys(t *testing.T) { - local := testDB(t) - store := newScopedSyncStateStore(local, "", false) - - require.NoError(t, store.SetSyncState( - "last_push_at", - "2026-03-11T12:34:56.123Z", - )) - - got, err := local.GetSyncState("last_push_at") - require.NoError(t, err) - assert.Equal(t, "2026-03-11T12:34:56.123Z", got) -} - func TestSyncEffectiveSyncStateFallsBackToLocalDB(t *testing.T) { local := testDB(t) require.NoError(t, local.SetSyncState( diff --git a/internal/postgres/sync_unit_test.go b/internal/postgres/sync_unit_test.go index ef21d7454..6f65408ef 100644 --- a/internal/postgres/sync_unit_test.go +++ b/internal/postgres/sync_unit_test.go @@ -5,8 +5,19 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.kenn.io/agentsview/internal/db" ) +func testDB(t *testing.T) *db.DB { + t.Helper() + d, err := db.Open(t.TempDir() + "/test.db") + require.NoError(t, err, "opening test db") + t.Cleanup(func() { d.Close() }) + return d +} + func TestIsUndefinedTable(t *testing.T) { tests := []struct { name string @@ -46,3 +57,87 @@ func TestIsUndefinedTable(t *testing.T) { }) } } + +func TestScopedSyncStateStoreMigratesLegacyState(t *testing.T) { + local := testDB(t) + + require.NoError(t, local.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123Z", + )) + require.NoError(t, local.SetSyncState( + lastPushBoundaryStateKey, + `{"cutoff":"2026-03-11T12:34:56.123Z"}`, + )) + require.NoError(t, local.SetSyncState( + lastPushTargetFingerprintKey, + "fingerprint-a", + )) + require.NoError(t, local.SetSyncState( + pushMarkerIDStateKey, + "marker-a", + )) + + store := newScopedSyncStateStore(local, "work", true) + + lastPush, err := store.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", lastPush) + + for _, key := range []string{ + "last_push_at", + lastPushBoundaryStateKey, + lastPushTargetFingerprintKey, + } { + legacyValue, err := local.GetSyncState(key) + require.NoError(t, err) + assert.Empty(t, legacyValue) + + scopedValue, err := local.GetSyncState(key + ":work") + require.NoError(t, err) + assert.NotEmpty(t, scopedValue) + } + + legacyMarker, err := local.GetSyncState(pushMarkerIDStateKey) + require.NoError(t, err) + assert.Equal(t, "marker-a", legacyMarker) + + scopedMarker, err := local.GetSyncState( + pushMarkerIDStateKey + ":work", + ) + require.NoError(t, err) + assert.Empty(t, scopedMarker) +} + +func TestScopedSyncStateStoreNonDefaultTargetDoesNotMigrateLegacyState(t *testing.T) { + local := testDB(t) + + require.NoError(t, local.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123Z", + )) + + store := newScopedSyncStateStore(local, "archive", false) + + got, err := store.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Empty(t, got) + + legacyValue, err := local.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", legacyValue) +} + +func TestScopedSyncStateStoreLegacyModeUsesUnscopedKeys(t *testing.T) { + local := testDB(t) + store := newScopedSyncStateStore(local, "", false) + + require.NoError(t, store.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123Z", + )) + + got, err := local.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", got) +} From 7c73a889f9a1985d9d6d569181453537b55385dc Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 24 Jun 2026 12:20:00 -0500 Subject: [PATCH 3/4] fix(postgres): keep pg status read-only PostgreSQL status should not create or migrate the local SQLite archive just to display PG counts. That read path only needs SQLite for an optional last-push watermark, so missing local state should produce an empty watermark rather than a writable open. The PG push validation tests also disable daemon autostart because they are asserting config and connection validation, not daemon lifecycle. This avoids Windows cleanup failures from background test processes holding the write-owner lock. --- cmd/agentsview/pg.go | 44 ++++++++++++++++++++++++------------ cmd/agentsview/pg_test.go | 4 ++++ internal/postgres/sync.go | 47 ++++++++++++++++++++++++++++++++++----- 3 files changed, 76 insertions(+), 19 deletions(-) diff --git a/cmd/agentsview/pg.go b/cmd/agentsview/pg.go index fa537e90e..de6f52854 100644 --- a/cmd/agentsview/pg.go +++ b/cmd/agentsview/pg.go @@ -223,11 +223,16 @@ func runPGStatus( } applyClassifierConfig(appCfg) - database, err := openDB(appCfg) + database, err := openReadOnlyDB(appCfg) if err != nil { - return fmt.Errorf("opening database: %w", err) + if !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("opening database: %w", err) + } + database = nil + } + if database != nil { + defer database.Close() } - defer database.Close() var failures []string for i, target := range targets { @@ -275,22 +280,33 @@ func runPGStatusTarget( return fmt.Errorf("url not configured") } - ps, err := postgres.New( - target.PG.URL, target.PG.Schema, database, - target.PG.MachineName, target.PG.AllowInsecure, - target.syncOptions(nil, nil), - ) - if err != nil { - return err - } - defer ps.Close() - ctx, stop := signal.NotifyContext( context.Background(), os.Interrupt, ) defer stop() - status, err := ps.Status(ctx) + lastPush := "" + if database != nil { + lastPush, err = postgres.ReadLastPushAt( + database, + target.SyncStateTarget, + target.MigrateLegacySyncState, + ) + if err != nil { + log.Printf( + "warning: reading last_push_at: %v", err, + ) + lastPush = "" + } + } + status, err := postgres.ReadStatus( + ctx, + target.PG.URL, + target.PG.Schema, + target.PG.MachineName, + target.PG.AllowInsecure, + lastPush, + ) if err != nil { return err } diff --git a/cmd/agentsview/pg_test.go b/cmd/agentsview/pg_test.go index 3c521e430..b6efb0ded 100644 --- a/cmd/agentsview/pg_test.go +++ b/cmd/agentsview/pg_test.go @@ -142,6 +142,7 @@ func TestPGServeConfigAcceptsManagedCaddyFlags(t *testing.T) { func TestRunPGPush_IgnoresBrokenUnselectedTarget(t *testing.T) { dataDir := t.TempDir() t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + t.Setenv("AGENTSVIEW_NO_DAEMON", "1") clearConfiguredAgentEnvVars(t) isolateDefaultAgentDirs(t, dataDir) restoreTestLogger(t) @@ -191,11 +192,13 @@ url = "postgres://archive" assert.Contains(t, err.Error(), "pg connection to archive permits plaintext") assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") assert.NotContains(t, err.Error(), "BROKEN_WORK_TARGET") + assert.NoFileExists(t, filepath.Join(dataDir, "sessions.db")) } func TestRunPGPushAll_AggregatesTargetFailures(t *testing.T) { dataDir := t.TempDir() t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + t.Setenv("AGENTSVIEW_NO_DAEMON", "1") clearConfiguredAgentEnvVars(t) isolateDefaultAgentDirs(t, dataDir) restoreTestLogger(t) @@ -248,6 +251,7 @@ url = "postgres://archive" func TestPGPushCommandPrefixesErrors(t *testing.T) { dataDir := t.TempDir() t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + t.Setenv("AGENTSVIEW_NO_DAEMON", "1") clearConfiguredAgentEnvVars(t) isolateDefaultAgentDirs(t, dataDir) restoreTestLogger(t) diff --git a/internal/postgres/sync.go b/internal/postgres/sync.go index 78368a9f5..86304d11d 100644 --- a/internal/postgres/sync.go +++ b/internal/postgres/sync.go @@ -298,14 +298,51 @@ func (s *Sync) Status( lastPush = "" } + return readStatus(ctx, s.pg, s.machine, lastPush) +} + +// ReadStatus reads PostgreSQL status without requiring a local SQLite sync +// handle. Callers pass any local last-push watermark they want displayed. +func ReadStatus( + ctx context.Context, + pgURL, schema, machine string, + allowInsecure bool, + lastPush string, +) (SyncStatus, error) { + if machine == "" { + return SyncStatus{}, fmt.Errorf( + "machine name must not be empty", + ) + } + if machine == "local" { + return SyncStatus{}, fmt.Errorf( + "machine name %q is reserved; "+ + "choose a different pg.machine_name", + machine, + ) + } + pg, err := Open(pgURL, schema, allowInsecure) + if err != nil { + return SyncStatus{}, err + } + defer pg.Close() + return readStatus(ctx, pg, machine, lastPush) +} + +func readStatus( + ctx context.Context, + pg *sql.DB, + machine string, + lastPush string, +) (SyncStatus, error) { var pgSessions int - err = s.pg.QueryRowContext(ctx, + err := pg.QueryRowContext(ctx, "SELECT COUNT(*) FROM sessions", ).Scan(&pgSessions) if err != nil { if isUndefinedTable(err) { return SyncStatus{ - Machine: s.machine, + Machine: machine, LastPushAt: lastPush, }, nil } @@ -315,13 +352,13 @@ func (s *Sync) Status( } var pgMessages int - err = s.pg.QueryRowContext(ctx, + err = pg.QueryRowContext(ctx, "SELECT COUNT(*) FROM messages", ).Scan(&pgMessages) if err != nil { if isUndefinedTable(err) { return SyncStatus{ - Machine: s.machine, + Machine: machine, LastPushAt: lastPush, PGSessions: pgSessions, }, nil @@ -332,7 +369,7 @@ func (s *Sync) Status( } return SyncStatus{ - Machine: s.machine, + Machine: machine, LastPushAt: lastPush, PGSessions: pgSessions, PGMessages: pgMessages, From 94357a94ba7d259f400194191d84ea1c6e64c5a1 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 24 Jun 2026 12:27:19 -0500 Subject: [PATCH 4/4] fix(postgres): tolerate missing pg status watermark PostgreSQL status treats local sync state as display-only. A stale, empty, or otherwise unreadable SQLite archive should not block PG connection checks or PG session/message counts. Keep local watermark reads best-effort and covered with an empty-archive regression so upgrade or partial-state scenarios still show PostgreSQL status. --- cmd/agentsview/pg.go | 7 ++++--- cmd/agentsview/pg_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/cmd/agentsview/pg.go b/cmd/agentsview/pg.go index de6f52854..2bf2197f8 100644 --- a/cmd/agentsview/pg.go +++ b/cmd/agentsview/pg.go @@ -225,9 +225,10 @@ func runPGStatus( applyClassifierConfig(appCfg) database, err := openReadOnlyDB(appCfg) if err != nil { - if !errors.Is(err, os.ErrNotExist) { - return fmt.Errorf("opening database: %w", err) - } + log.Printf( + "warning: reading local pg status watermark: %v", + err, + ) database = nil } if database != nil { diff --git a/cmd/agentsview/pg_test.go b/cmd/agentsview/pg_test.go index b6efb0ded..9e6a3a2ae 100644 --- a/cmd/agentsview/pg_test.go +++ b/cmd/agentsview/pg_test.go @@ -195,6 +195,32 @@ url = "postgres://archive" assert.NoFileExists(t, filepath.Join(dataDir, "sessions.db")) } +func TestRunPGStatus_IgnoresUnreadableLocalWatermark(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + writeTestConfig(t, dataDir, ` +default_pg = "archive" + +[pg.archive] +url = "postgres://archive" +`) + require.NoError(t, os.WriteFile( + filepath.Join(dataDir, "sessions.db"), + nil, + 0o600, + )) + + err := runPGStatus("archive", false) + require.Error(t, err) + assert.Contains(t, err.Error(), "pg connection to archive permits plaintext") + assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") + assert.NotContains(t, err.Error(), "opening database") + assert.NotContains(t, err.Error(), "sessions.db is empty") +} + func TestRunPGPushAll_AggregatesTargetFailures(t *testing.T) { dataDir := t.TempDir() t.Setenv("AGENTSVIEW_DATA_DIR", dataDir)