diff --git a/README.md b/README.md index d32cf46c1..ca81b6498 100644 --- a/README.md +++ b/README.md @@ -384,10 +384,40 @@ or read them, and treats sidecars as untrusted structured input -- see Push session data to a shared PostgreSQL instance for team dashboards: ```bash -agentsview pg push # push local data to PG -agentsview pg serve # serve web UI from PG (read-only) +agentsview pg push # push local data to the default PG target +agentsview pg push archive # push to one named PG target +agentsview pg push --all # push every configured PG target sequentially +agentsview pg status # show status for the default PG target +agentsview pg status archive # show status for one named PG target +agentsview pg status --all # show status for every configured PG target +agentsview pg serve # serve web UI from the default PG target (read-only) ``` +Single-target configs still use the legacy `[pg]` block. To manage more than one +PostgreSQL destination, define named `[pg.NAME]` blocks and set `default_pg` +when more than one target exists: + +```toml +default_pg = "work" + +[pg.work] +url = "postgres://user:pass@work-db/agentsview" +machine_name = "laptop" + +[pg.archive] +url = "postgres://user:pass@archive-db/agentsview" +machine_name = "laptop-archive" +exclude_projects = ["scratch"] +``` + +Named target names are normalized case-insensitively. `all`, `local`, and the +legacy `[pg]` field names `url`, `schema`, `machine_name`, `allow_insecure`, +`projects`, and `exclude_projects` cannot be used for `[pg.NAME]`. + +`AGENTSVIEW_PG_URL`, `AGENTSVIEW_PG_SCHEMA`, and `AGENTSVIEW_PG_MACHINE` still +work, but in named-target mode they apply only to the effective default target. +They do not rewrite every named `[pg.NAME]` entry. + ### Automatic push (background service) To keep a shared PostgreSQL database current without running `pg push` by hand, @@ -396,10 +426,15 @@ after new sessions are recorded, with a periodic floor as a safety net: ```bash agentsview pg push --watch # foreground, Ctrl-C to stop +agentsview pg push archive --watch # watch one named PG target agentsview pg push --watch --debounce 1m # custom coalesce window agentsview pg push --watch --interval 5m # custom floor interval ``` +`--watch` follows the default PG target unless you pass one target name. +`--all --watch` is rejected; multi-target background watch remains out of scope +for now. + The daemon reads the same `[pg]` config as `pg push`, so the PostgreSQL DSN must be set in your config file (or an environment variable it expands). Protect the config file, since it holds credentials: @@ -418,6 +453,10 @@ agentsview pg service logs -f # follow the service log agentsview pg service uninstall # stop and remove ``` +`pg serve` and `pg service` always use the effective default PG target. In +named-target mode, set `default_pg` to choose which target those long-running +commands use. + **Linux headless machines:** systemd `--user` services stop at logout and do not start at boot unless lingering is enabled for your user. `install` detects this and prints the command; you can also run it yourself: diff --git a/cmd/agentsview/archive_write_backend.go b/cmd/agentsview/archive_write_backend.go index 148440263..8595fc19d 100644 --- a/cmd/agentsview/archive_write_backend.go +++ b/cmd/agentsview/archive_write_backend.go @@ -19,7 +19,7 @@ import ( type archiveWriteBackend interface { PGPush( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, excludeProjects []string, @@ -33,7 +33,7 @@ type archiveWriteBackend interface { ) (duckdbsync.PushResult, error) PGPushWatch( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, excludeProjects []string, @@ -87,7 +87,7 @@ type daemonArchiveWriteBackend struct { func (b daemonArchiveWriteBackend) PGPush( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, excludeProjects []string, @@ -95,10 +95,12 @@ func (b daemonArchiveWriteBackend) PGPush( return postDaemonPush[postgres.PushResult]( ctx, b.tr, b.appCfg.AuthToken, "/api/v1/push/pg", daemonPushRequest{ - Full: cfg.Full, - Projects: projects, - ExcludeProjects: excludeProjects, - PG: &pgCfg, + Full: cfg.Full, + Projects: projects, + ExcludeProjects: excludeProjects, + PG: &target.PG, + SyncStateTarget: target.SyncStateTarget, + MigrateLegacySyncState: target.MigrateLegacySyncState, }, ) } @@ -141,7 +143,7 @@ func absolutizeDuckDBPath( func (b daemonArchiveWriteBackend) PGPushWatch( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, exclude []string, @@ -170,7 +172,7 @@ func (b daemonArchiveWriteBackend) PGPushWatch( } defer cleanup() res, err := backend.PGPush( - pctx, pgCfg, pushCfg, projects, exclude, + pctx, target, pushCfg, projects, exclude, ) if err != nil { return err @@ -213,7 +215,7 @@ type localArchiveWriteBackend struct { func (b *localArchiveWriteBackend) PGPush( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, excludeProjects []string, @@ -228,12 +230,9 @@ func (b *localArchiveWriteBackend) PGPush( connectStart := time.Now() applyClassifierConfig(b.appCfg) ps, err := postgres.New( - pgCfg.URL, pgCfg.Schema, b.database, - pgCfg.MachineName, pgCfg.AllowInsecure, - postgres.SyncOptions{ - Projects: projects, - ExcludeProjects: excludeProjects, - }, + target.PG.URL, target.PG.Schema, b.database, + target.PG.MachineName, target.PG.AllowInsecure, + target.syncOptions(projects, excludeProjects), ) if err != nil { return postgres.PushResult{}, err @@ -324,7 +323,7 @@ func (b *localArchiveWriteBackend) DuckDBPush( func (b *localArchiveWriteBackend) PGPushWatch( ctx context.Context, - pgCfg config.PGConfig, + target pgTargetSelection, cfg PGPushConfig, projects []string, exclude []string, @@ -367,12 +366,9 @@ func (b *localArchiveWriteBackend) PGPushWatch( connect: func() (pgTarget, error) { applyClassifierConfig(b.appCfg) s, cErr := postgres.New( - pgCfg.URL, pgCfg.Schema, b.database, - pgCfg.MachineName, pgCfg.AllowInsecure, - postgres.SyncOptions{ - Projects: projects, - ExcludeProjects: exclude, - }, + target.PG.URL, target.PG.Schema, b.database, + target.PG.MachineName, target.PG.AllowInsecure, + target.syncOptions(projects, exclude), ) if cErr != nil { return nil, cErr @@ -385,7 +381,7 @@ func (b *localArchiveWriteBackend) PGPushWatch( fmt.Printf( "agentsview pg watch: pushing to PostgreSQL as %q "+ "(debounce %s, floor %s)\n", - pgCfg.MachineName, debounce, interval, + target.PG.MachineName, debounce, interval, ) if err := pusher.push(ctx, reasonStartup, didResync); err != nil { diff --git a/cmd/agentsview/archive_write_backend_test.go b/cmd/agentsview/archive_write_backend_test.go index 46259fcfc..ef39b471d 100644 --- a/cmd/agentsview/archive_write_backend_test.go +++ b/cmd/agentsview/archive_write_backend_test.go @@ -18,7 +18,7 @@ func TestLocalArchiveWriteBackendPGPushStopsAfterCanceledLocalSync(t *testing.T) testLocalArchivePushStopsAfterCanceledSync(t, func(backend *localArchiveWriteBackend, ctx context.Context) error { _, err := backend.PGPush( - ctx, config.PGConfig{}, PGPushConfig{}, nil, nil, + ctx, pgTargetSelection{}, PGPushConfig{}, nil, nil, ) return err }) @@ -58,7 +58,7 @@ func TestLocalArchiveWriteBackendPGPushWatchCanceledStartupIsClean(t *testing.T) err := backend.PGPushWatch( canceledContext(), - config.PGConfig{}, + pgTargetSelection{}, PGPushConfig{}, nil, nil, diff --git a/cmd/agentsview/classifier.go b/cmd/agentsview/classifier.go index 9bbb3ce70..e80397ea6 100644 --- a/cmd/agentsview/classifier.go +++ b/cmd/agentsview/classifier.go @@ -137,7 +137,7 @@ func runClassifierRebuild( } if pgCfg.URL == "" { return errors.New( - "pg url not configured; set AGENTSVIEW_PG_URL or [pg].url", + "pg url not configured; set AGENTSVIEW_PG_URL, use a legacy [pg].url, or configure default_pg with named [pg.NAME] targets", ) } if err := clearPGClassifierHash(ctx, cfg, pgCfg); err != nil { diff --git a/cmd/agentsview/classifier_wiring_test.go b/cmd/agentsview/classifier_wiring_test.go index 655941771..e6bcf11e7 100644 --- a/cmd/agentsview/classifier_wiring_test.go +++ b/cmd/agentsview/classifier_wiring_test.go @@ -32,6 +32,11 @@ var triggerCalls = map[string]struct{}{ const wiringHelper = "applyClassifierConfig" +var inheritedWiringFuncs = map[string]struct{}{ + "runPGPushTarget": {}, + "runPGStatusTarget": {}, +} + // TestEveryStoreOpenPathIsWired enforces the rule documented // in the design spec: every code path in cmd/agentsview that // opens or initializes a store must first call @@ -83,6 +88,9 @@ func scanFile( if fn.Body == nil { return true } + if _, ok := inheritedWiringFuncs[fn.Name.Name]; ok { + return true + } if v := checkBody( fset, fn.Body, funcLabel(fset, fn), ); v != "" { diff --git a/cmd/agentsview/cli.go b/cmd/agentsview/cli.go index 869f80442..b26f36c4d 100644 --- a/cmd/agentsview/cli.go +++ b/cmd/agentsview/cli.go @@ -531,22 +531,40 @@ func newPGCommand() *cobra.Command { func newPGPushCommand() *cobra.Command { var cfg PGPushConfig cmd := &cobra.Command{ - Use: "push", + Use: "push [target]", Short: "Push local data to PostgreSQL", SilenceUsage: true, - Args: cobra.NoArgs, - Run: func(cmd *cobra.Command, args []string) { + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + targetName := "" + if len(args) == 1 { + targetName = args[0] + } + if cfg.AllTargets && cfg.Watch { + return fmt.Errorf( + "pg push --watch: %w", + fmt.Errorf( + "--all cannot be combined with --watch", + ), + ) + } if cfg.Watch { - runPGPushWatch(cfg) - return + if err := runPGPushWatch(cfg, targetName); err != nil { + return fmt.Errorf("pg push --watch: %w", err) + } + return nil } if cmd.Flags().Changed("debounce") || cmd.Flags().Changed("interval") { fmt.Fprintln(os.Stderr, "warning: --debounce and --interval have no effect without --watch") } - runPGPush(cfg) + if err := runPGPush(cfg, targetName); err != nil { + return fmt.Errorf("pg push: %w", err) + } + return nil }, } + cmd.Flags().BoolVar(&cfg.AllTargets, "all", false, "Push every configured PG target sequentially") cmd.Flags().BoolVar(&cfg.Full, "full", false, "Force full local resync and PG push") cmd.Flags().StringVar(&cfg.ProjectsFlag, "projects", "", "Comma-separated list of projects to push (inclusive)") cmd.Flags().StringVar(&cfg.ExcludeProjects, "exclude-projects", "", "Comma-separated list of projects to exclude from push") @@ -558,15 +576,25 @@ func newPGPushCommand() *cobra.Command { } func newPGStatusCommand() *cobra.Command { - return &cobra.Command{ - Use: "status", + var allTargets bool + cmd := &cobra.Command{ + Use: "status [target]", Short: "Show PG sync status", SilenceUsage: true, - Args: cobra.NoArgs, - Run: func(cmd *cobra.Command, args []string) { - runPGStatus() + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + targetName := "" + if len(args) == 1 { + targetName = args[0] + } + if err := runPGStatus(targetName, allTargets); err != nil { + return fmt.Errorf("pg status: %w", err) + } + return nil }, } + cmd.Flags().BoolVar(&allTargets, "all", false, "Show status for every configured PG target") + return cmd } func newPGServeCommand() *cobra.Command { diff --git a/cmd/agentsview/daemon_push.go b/cmd/agentsview/daemon_push.go index 97924730a..b80c6c892 100644 --- a/cmd/agentsview/daemon_push.go +++ b/cmd/agentsview/daemon_push.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -13,11 +14,13 @@ import ( ) type daemonPushRequest struct { - Full bool `json:"full"` - Projects []string `json:"projects,omitempty"` - ExcludeProjects []string `json:"exclude_projects,omitempty"` - PG *config.PGConfig `json:"pg,omitempty"` - DuckDB *config.DuckDBConfig `json:"duckdb,omitempty"` + Full bool `json:"full"` + Projects []string `json:"projects,omitempty"` + ExcludeProjects []string `json:"exclude_projects,omitempty"` + PG *config.PGConfig `json:"pg,omitempty"` + DuckDB *config.DuckDBConfig `json:"duckdb,omitempty"` + SyncStateTarget string `json:"sync_state_target,omitempty"` + MigrateLegacySyncState bool `json:"migrate_legacy_sync_state,omitempty"` } func postDaemonPush[T any]( @@ -51,6 +54,13 @@ func postDaemonPush[T any]( defer resp.Body.Close() if resp.StatusCode != http.StatusOK { msg, _ := io.ReadAll(resp.Body) + var apiErr struct { + Error string `json:"error"` + } + if err := json.Unmarshal(msg, &apiErr); err == nil && + apiErr.Error != "" { + return zero, errors.New(apiErr.Error) + } return zero, fmt.Errorf( "HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(msg)), ) diff --git a/cmd/agentsview/pg.go b/cmd/agentsview/pg.go index dadec4536..2bf2197f8 100644 --- a/cmd/agentsview/pg.go +++ b/cmd/agentsview/pg.go @@ -14,12 +14,14 @@ import ( "github.com/spf13/cobra" "go.kenn.io/agentsview/internal/config" + "go.kenn.io/agentsview/internal/db" "go.kenn.io/agentsview/internal/postgres" "go.kenn.io/agentsview/internal/server" ) type PGPushConfig struct { Full bool + AllTargets bool ProjectsFlag string ExcludeProjects string AllProjects bool @@ -28,27 +30,52 @@ type PGPushConfig struct { Interval time.Duration } -func runPGPush(cfg PGPushConfig) { - appCfg, err := config.LoadMinimal() - if err != nil { - log.Fatalf("loading config: %v", err) +type pgTargetSelection struct { + Name string + PG config.PGConfig + IsDefault bool + SyncStateTarget string + MigrateLegacySyncState bool +} + +func (s pgTargetSelection) label() string { + if s.Name == "" { + return "default" } - if err := os.MkdirAll(appCfg.DataDir, 0o755); err != nil { - log.Fatalf("creating data dir: %v", err) + if s.IsDefault { + return s.Name + " (default)" } - setupLogFile(appCfg.DataDir) + return s.Name +} - pgCfg, err := appCfg.ResolvePG() +func (s pgTargetSelection) syncOptions( + projects, excludeProjects []string, +) postgres.SyncOptions { + return postgres.SyncOptions{ + Projects: projects, + ExcludeProjects: excludeProjects, + SyncStateTarget: s.SyncStateTarget, + MigrateLegacySyncState: s.MigrateLegacySyncState, + } +} + +func runPGPush( + cfg PGPushConfig, targetName string, +) error { + appCfg, err := config.LoadMinimal() if err != nil { - fatal("pg push: %v", err) + return fmt.Errorf("loading config: %w", err) } - if pgCfg.URL == "" { - fatal("pg push: url not configured") + if err := os.MkdirAll(appCfg.DataDir, 0o755); err != nil { + return fmt.Errorf("creating data dir: %w", err) } + setupLogFile(appCfg.DataDir) - projects, excludeProjects, err := resolvePushProjects(pgCfg, cfg) + targets, err := resolvePGTargetSelections( + appCfg, targetName, cfg.AllTargets, + ) if err != nil { - fatal("pg push: %v", err) + return err } applyClassifierConfig(appCfg) @@ -59,21 +86,78 @@ func runPGPush(cfg PGPushConfig) { backend, cleanup, err := resolveArchiveWriteBackend(ctx, appCfg) if err != nil { - fatal("opening writer: %v", err) + return fmt.Errorf("opening writer: %w", err) } defer cleanup() + var failures []string + for i, target := range targets { + if len(targets) > 1 || target.Name != "" { + if i > 0 { + fmt.Println() + } + fmt.Printf("Target: %s\n", target.label()) + } + if err := runPGPushTarget( + ctx, backend, appCfg, cfg, target, + ); err != nil { + if len(targets) == 1 { + return err + } + failures = append( + failures, + fmt.Sprintf("%s: %v", target.label(), err), + ) + fmt.Fprintf( + os.Stderr, + "warning: pg push target %s failed: %v\n", + target.label(), err, + ) + } + } + if len(failures) > 0 { + return fmt.Errorf( + "%d pg target(s) failed: %s", + len(failures), + strings.Join(failures, "; "), + ) + } + return nil +} + +func runPGPushTarget( + ctx context.Context, + backend archiveWriteBackend, + appCfg config.Config, + cfg PGPushConfig, + target pgTargetSelection, +) error { + target, err := resolvePGTargetConfig(appCfg, target) + if err != nil { + return err + } + if target.PG.URL == "" { + return fmt.Errorf("url not configured") + } + + projects, excludeProjects, err := resolvePushProjects( + target.PG, cfg, + ) + if err != nil { + return err + } + result, err := backend.PGPush( - ctx, pgCfg, cfg, projects, excludeProjects, + ctx, target, cfg, projects, excludeProjects, ) if err != nil { - fatal("pg push: %v", err) + return err } writePGPushSummary(os.Stdout, result) if result.Errors > 0 { - fatal("pg push: %d session(s) failed", - result.Errors) + return fmt.Errorf("%d session(s) failed", result.Errors) } + return nil } func printPGPushProgress(p postgres.PushProgress) { @@ -118,55 +202,121 @@ func writePGPushSummary(w io.Writer, result postgres.PushResult) { ) } -func runPGStatus() { +func runPGStatus( + targetName string, + allTargets bool, +) error { appCfg, err := config.LoadMinimal() if err != nil { - log.Fatalf("loading config: %v", err) + return fmt.Errorf("loading config: %w", err) } if err := os.MkdirAll(appCfg.DataDir, 0o755); err != nil { - log.Fatalf("creating data dir: %v", err) + return fmt.Errorf("creating data dir: %w", err) } setupLogFile(appCfg.DataDir) + targets, err := resolvePGTargetSelections( + appCfg, targetName, allTargets, + ) + if err != nil { + return err + } + applyClassifierConfig(appCfg) database, err := openReadOnlyDB(appCfg) if err != nil { - fatal("opening database: %v", err) + log.Printf( + "warning: reading local pg status watermark: %v", + err, + ) + database = nil + } + if database != nil { + defer database.Close() } - defer database.Close() - pgCfg, err := appCfg.ResolvePG() - if err != nil { - fatal("pg status: %v", err) + var failures []string + for i, target := range targets { + if len(targets) > 1 || target.Name != "" { + if i > 0 { + fmt.Println() + } + fmt.Printf("Target: %s\n", target.label()) + } + if err := runPGStatusTarget(database, appCfg, target); err != nil { + if len(targets) == 1 { + return err + } + failures = append( + failures, + fmt.Sprintf("%s: %v", target.label(), err), + ) + fmt.Fprintf( + os.Stderr, + "warning: pg status target %s failed: %v\n", + target.label(), err, + ) + } } - if pgCfg.URL == "" { - fatal("pg status: url not configured") + if len(failures) > 0 { + return fmt.Errorf( + "%d pg target(s) failed: %s", + len(failures), + strings.Join(failures, "; "), + ) } + return nil +} - ps, err := postgres.New( - pgCfg.URL, pgCfg.Schema, database, - pgCfg.MachineName, pgCfg.AllowInsecure, - postgres.SyncOptions{}, - ) +func runPGStatusTarget( + database *db.DB, + appCfg config.Config, + target pgTargetSelection, +) error { + target, err := resolvePGTargetConfig(appCfg, target) if err != nil { - fatal("pg status: %v", err) + return err + } + if target.PG.URL == "" { + return fmt.Errorf("url not configured") } - defer ps.Close() ctx, stop := signal.NotifyContext( context.Background(), os.Interrupt, ) defer stop() - status, err := ps.Status(ctx) + lastPush := "" + if database != nil { + lastPush, err = postgres.ReadLastPushAt( + database, + target.SyncStateTarget, + target.MigrateLegacySyncState, + ) + if err != nil { + log.Printf( + "warning: reading last_push_at: %v", err, + ) + lastPush = "" + } + } + status, err := postgres.ReadStatus( + ctx, + target.PG.URL, + target.PG.Schema, + target.PG.MachineName, + target.PG.AllowInsecure, + lastPush, + ) if err != nil { - fatal("pg status: %v", err) + return err } fmt.Printf("Machine: %s\n", status.Machine) fmt.Printf("Last push: %s\n", valueOrNever(status.LastPushAt)) fmt.Printf("PG sessions: %d\n", status.PGSessions) fmt.Printf("PG messages: %d\n", status.PGMessages) + return nil } func loadPGServeConfig(cmd *cobra.Command) (config.Config, string, error) { @@ -186,7 +336,6 @@ func loadPGServeConfig(cmd *cobra.Command) (config.Config, string, error) { func runPGServe(appCfg config.Config, basePath string) { setupLogFile(appCfg.DataDir) - // Generate auth token when auth is explicitly required. if appCfg.RequireAuth { if err := appCfg.EnsureAuthToken(); err != nil { fatal("pg serve: generating auth token: %v", err) @@ -224,12 +373,6 @@ func runPGServe(appCfg config.Config, basePath string) { ) defer stop() - // Attempt to apply any missing schema migrations before - // the compatibility check. This handles upgrades (e.g. - // new tables like tool_result_events) without requiring a - // manual schema drop. If the PG role is read-only the - // migration is skipped and the compat check reports what - // is missing. if err := postgres.EnsureSchema( ctx, store.DB(), pgCfg.Schema, ); err != nil { @@ -328,10 +471,6 @@ func runPGServe(appCfg config.Config, basePath string) { } } -// resolvePushProjects merges configured project filters with CLI -// flag overrides. A CLI include or exclude flag fully replaces the -// configured lists; --all-projects clears both. Include and exclude -// are mutually exclusive. func resolvePushProjects( pgCfg config.PGConfig, cfg PGPushConfig, ) (projects, exclude []string, err error) { @@ -369,8 +508,81 @@ func resolvePushProjects( return projects, exclude, nil } -// splitProjectList splits a comma-separated string into trimmed, -// non-empty project names. +func resolvePGTargetSelections( + appCfg config.Config, + targetName string, + allTargets bool, +) ([]pgTargetSelection, error) { + if allTargets && strings.TrimSpace(targetName) != "" { + return nil, fmt.Errorf( + "target name cannot be combined with --all", + ) + } + if len(appCfg.PGTargets) == 0 { + if strings.TrimSpace(targetName) != "" { + return nil, fmt.Errorf( + "pg target %q is not configured; config uses a single legacy [pg] block", + targetName, + ) + } + return []pgTargetSelection{{ + IsDefault: true, + }}, nil + } + names, defaultName, err := appCfg.PGTargetNames() + if err != nil { + return nil, err + } + selections := make([]pgTargetSelection, 0, len(names)) + for _, name := range names { + selection := pgTargetSelection{ + Name: name, + IsDefault: name == defaultName, + SyncStateTarget: name, + MigrateLegacySyncState: name == defaultName, + } + selections = append(selections, selection) + } + if allTargets { + return selections, nil + } + normalizedTarget := strings.TrimSpace( + strings.ToLower(targetName), + ) + if normalizedTarget == "" { + return selections[:1], nil + } + for _, target := range selections { + if target.Name == normalizedTarget { + return []pgTargetSelection{target}, nil + } + } + return nil, fmt.Errorf( + "pg target %q is not configured", + targetName, + ) +} + +func resolvePGTargetConfig( + appCfg config.Config, + target pgTargetSelection, +) (pgTargetSelection, error) { + var ( + pgCfg config.PGConfig + err error + ) + if target.Name == "" { + pgCfg, err = appCfg.ResolvePG() + } else { + pgCfg, err = appCfg.ResolvePGTarget(target.Name) + } + if err != nil { + return pgTargetSelection{}, err + } + target.PG = pgCfg + return target, nil +} + func splitProjectList(s string) []string { parts := strings.Split(s, ",") out := make([]string, 0, len(parts)) diff --git a/cmd/agentsview/pg_service.go b/cmd/agentsview/pg_service.go index 148e9904c..7705e6d2b 100644 --- a/cmd/agentsview/pg_service.go +++ b/cmd/agentsview/pg_service.go @@ -12,6 +12,7 @@ import ( "github.com/spf13/cobra" "go.kenn.io/agentsview/internal/config" + "go.kenn.io/agentsview/internal/postgres" ) func newPGServiceCommand() *cobra.Command { @@ -174,22 +175,51 @@ func runServiceStatus() { } ctx := context.Background() out, _ := mgr.status(ctx) - fmt.Print(out) - if out != "" && !strings.HasSuffix(out, "\n") { - fmt.Println() - } // Show the last successful push time from local sync state. appCfg := loadServiceConfig() database, derr := openReadOnlyDB(appCfg) if derr != nil { + writeServiceStatus(os.Stdout, out, "", false) return } defer database.Close() - lastPush, gerr := database.GetSyncState("last_push_at") + lastPush, gerr := readServiceLastPush(appCfg, database) if gerr != nil { + writeServiceStatus(os.Stdout, out, "", false) + return + } + writeServiceStatus(os.Stdout, out, lastPush, true) +} + +func writeServiceStatus( + out io.Writer, + serviceOut, lastPush string, + lastPushAvailable bool, +) { + fmt.Fprint(out, serviceOut) + if serviceOut != "" && !strings.HasSuffix(serviceOut, "\n") { + fmt.Fprintln(out) + } + if !lastPushAvailable { return } - fmt.Printf("Last push: %s\n", valueOrNever(lastPush)) + fmt.Fprintf(out, "Last push: %s\n", valueOrNever(lastPush)) +} + +func readServiceLastPush( + appCfg config.Config, + database postgres.SyncStateStore, +) (string, error) { + targets, err := resolvePGTargetSelections(appCfg, "", false) + if err != nil { + return "", err + } + target := targets[0] + return postgres.ReadLastPushAt( + database, + target.SyncStateTarget, + target.MigrateLegacySyncState, + ) } func runServiceSimple(action string) { diff --git a/cmd/agentsview/pg_service_manager.go b/cmd/agentsview/pg_service_manager.go index 4ae2a8e01..8b8f5aec2 100644 --- a/cmd/agentsview/pg_service_manager.go +++ b/cmd/agentsview/pg_service_manager.go @@ -26,7 +26,8 @@ func rejectEnvDependentServicePGURL(rawURL string) error { if os.Getenv("AGENTSVIEW_PG_URL") != "" { return fmt.Errorf( "AGENTSVIEW_PG_URL is set; pg service install requires a " + - "literal pg.url in config.toml because background " + + "literal PostgreSQL URL in config.toml, either " + + "legacy [pg].url or the default_pg-selected [pg.NAME].url, because background " + "services do not inherit your shell environment", ) } @@ -35,7 +36,8 @@ func rejectEnvDependentServicePGURL(rawURL string) error { if config.IsEnvDependentURL(rawURL) { return fmt.Errorf( "pg.url uses environment variable expansion; pg service " + - "install requires a literal pg.url in config.toml because " + + "install requires a literal PostgreSQL URL in config.toml, either " + + "legacy [pg].url or the default_pg-selected [pg.NAME].url, because " + "background services do not inherit your shell environment", ) } @@ -113,7 +115,11 @@ func isUnsafeServiceRune(r rune) bool { // build a spec when the PG URL is not resolvable so the service is // only ever created in a working state. func buildServiceSpec(appCfg config.Config) (serviceSpec, error) { - if err := rejectEnvDependentServicePGURL(appCfg.PG.URL); err != nil { + rawPG, err := appCfg.RawPGTarget("") + if err != nil { + return serviceSpec{}, err + } + if err := rejectEnvDependentServicePGURL(rawPG.URL); err != nil { return serviceSpec{}, err } pgCfg, err := appCfg.ResolvePG() @@ -122,7 +128,7 @@ func buildServiceSpec(appCfg config.Config) (serviceSpec, error) { } if pgCfg.URL == "" { return serviceSpec{}, fmt.Errorf( - "pg.url not configured; set it before installing the service", + "pg url not configured; configure a legacy [pg].url or the default_pg-selected [pg.NAME].url before installing the service", ) } exe, err := os.Executable() diff --git a/cmd/agentsview/pg_service_test.go b/cmd/agentsview/pg_service_test.go index 24f9b2493..5f27eccb0 100644 --- a/cmd/agentsview/pg_service_test.go +++ b/cmd/agentsview/pg_service_test.go @@ -10,12 +10,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.kenn.io/agentsview/internal/config" + "go.kenn.io/agentsview/internal/db" ) func TestBuildServiceSpec_RequiresURL(t *testing.T) { t.Setenv("AGENTSVIEW_PG_URL", "") _, err := buildServiceSpec(config.Config{}) require.Error(t, err, "expected error when pg.url is not configured") + assert.Contains(t, err.Error(), "default_pg-selected [pg.NAME].url") } func TestBuildServiceSpec_PopulatesFields(t *testing.T) { @@ -42,6 +44,30 @@ func TestBuildServiceSpec_PopulatesFields(t *testing.T) { } } +func TestBuildServiceSpec_UsesNamedDefaultTarget(t *testing.T) { + t.Setenv("AGENTSVIEW_PG_URL", "") + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + dataDir := t.TempDir() + spec, err := buildServiceSpec(config.Config{ + DataDir: dataDir, + DefaultPG: "archive", + PGTargets: map[string]config.PGConfig{ + "work": { + URL: "${BROKEN_WORK_TARGET}", + MachineName: "workbox", + }, + "archive": { + URL: "postgres://u:p@localhost/archive?sslmode=disable", + MachineName: "archivebox", + }, + }, + }) + require.NoError(t, err) + assert.Equal(t, dataDir, spec.DataDir) + assert.Equal(t, filepath.Join(dataDir, "pg-watch.log"), spec.LogPath) + assert.NotEmpty(t, spec.BinPath) +} + func TestBuildServiceSpec_RejectsEnvPGURL(t *testing.T) { t.Setenv("AGENTSVIEW_PG_URL", "postgres://from-env") _, err := buildServiceSpec(config.Config{ @@ -53,7 +79,8 @@ func TestBuildServiceSpec_RejectsEnvPGURL(t *testing.T) { }) require.Error(t, err) assert.Contains(t, err.Error(), "AGENTSVIEW_PG_URL") - assert.Contains(t, err.Error(), "literal pg.url") + assert.Contains(t, err.Error(), "literal PostgreSQL URL") + assert.Contains(t, err.Error(), "default_pg-selected [pg.NAME].url") } func TestBuildServiceSpec_RejectsExpandedPGURL(t *testing.T) { @@ -68,6 +95,7 @@ func TestBuildServiceSpec_RejectsExpandedPGURL(t *testing.T) { }) require.Error(t, err) assert.Contains(t, err.Error(), "environment variable expansion") + assert.Contains(t, err.Error(), "literal PostgreSQL URL") _, err = buildServiceSpec(config.Config{ DataDir: t.TempDir(), @@ -78,6 +106,7 @@ func TestBuildServiceSpec_RejectsExpandedPGURL(t *testing.T) { }) require.Error(t, err) assert.Contains(t, err.Error(), "environment variable expansion") + assert.Contains(t, err.Error(), "default_pg-selected [pg.NAME].url") } func TestValidateServiceSpec_RejectsUnsafeChars(t *testing.T) { @@ -183,6 +212,78 @@ func TestWarnUninheritedServiceEnv(t *testing.T) { assert.Contains(t, out, "config.toml") } +func TestReadServiceLastPush_UsesDefaultTargetScope(t *testing.T) { + local, err := db.Open(filepath.Join(t.TempDir(), "local.db")) + require.NoError(t, err) + defer local.Close() + + require.NoError(t, local.SetSyncState( + "last_push_at:work", + "2026-03-11T12:34:56.123Z", + )) + + lastPush, err := readServiceLastPush(config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": {URL: "postgres://work"}, + }, + }, local) + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", lastPush) +} + +func TestReadServiceLastPush_ReadsLegacyDefaultStateWithoutMigration(t *testing.T) { + local, err := db.Open(filepath.Join(t.TempDir(), "local.db")) + require.NoError(t, err) + defer local.Close() + + require.NoError(t, local.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123Z", + )) + + lastPush, err := readServiceLastPush(config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": {URL: "postgres://work"}, + }, + }, local) + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", lastPush) + + legacyValue, err := local.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", legacyValue) + + scopedValue, err := local.GetSyncState("last_push_at:work") + require.NoError(t, err) + assert.Empty(t, scopedValue) +} + +func TestWriteServiceStatus_AppendsScopedLastPush(t *testing.T) { + var out strings.Builder + writeServiceStatus( + &out, "Service is active", + "2026-03-11T12:34:56.123Z", true, + ) + assert.Equal(t, + "Service is active\nLast push: 2026-03-11T12:34:56.123Z\n", + out.String(), + ) +} + +func TestWriteServiceStatus_PrintsNeverWhenNoPushCompleted(t *testing.T) { + var out strings.Builder + writeServiceStatus(&out, "Service is active\n", "", true) + assert.Equal(t, "Service is active\nLast push: never\n", out.String()) +} + +func TestWriteServiceStatus_SkipsLastPushWhenUnavailable(t *testing.T) { + var out strings.Builder + writeServiceStatus(&out, "Service is active\n", "", false) + assert.Equal(t, "Service is active\n", out.String()) +} + // recordingRunner captures shell-out calls for assertions. type recordingRunner struct { calls [][]string diff --git a/cmd/agentsview/pg_test.go b/cmd/agentsview/pg_test.go index adb8856fa..9e6a3a2ae 100644 --- a/cmd/agentsview/pg_test.go +++ b/cmd/agentsview/pg_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "log" "os" "os/exec" "path/filepath" @@ -12,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.kenn.io/agentsview/internal/config" + "go.kenn.io/agentsview/internal/parser" "go.kenn.io/agentsview/internal/postgres" ) @@ -24,6 +26,36 @@ func loadPGServeConfigForTest(t *testing.T, args ...string) (config.Config, stri return loadPGServeConfig(cmd) } +func restoreTestLogger(t *testing.T) { + t.Helper() + oldWriter := log.Writer() + t.Cleanup(func() { + if file, ok := log.Writer().(*os.File); ok && file != os.Stderr && file != os.Stdout { + _ = file.Close() + } + log.SetOutput(oldWriter) + }) +} + +func clearConfiguredAgentEnvVars(t *testing.T) { + t.Helper() + for _, def := range parser.Registry { + if def.EnvVar != "" { + t.Setenv(def.EnvVar, "") + } + } +} + +func isolateDefaultAgentDirs(t *testing.T, root string) { + t.Helper() + t.Setenv("HOME", root) + t.Setenv("USERPROFILE", root) + t.Setenv("APPDATA", root) + t.Setenv("LOCALAPPDATA", root) + t.Setenv("HOMEDRIVE", filepath.VolumeName(root)) + t.Setenv("HOMEPATH", `\`) +} + func TestLoadPGServeConfigDoesNotInheritServeProxySettings(t *testing.T) { dataDir := testDataDir(t) @@ -107,6 +139,184 @@ func TestPGServeConfigAcceptsManagedCaddyFlags(t *testing.T) { assert.Empty(t, basePath, "basePath should be empty") } +func TestRunPGPush_IgnoresBrokenUnselectedTarget(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + t.Setenv("AGENTSVIEW_NO_DAEMON", "1") + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + writeTestConfig(t, dataDir, ` +default_pg = "archive" + +[pg.work] +url = "${BROKEN_WORK_TARGET}" +machine_name = "workbox" + +[pg.archive] +url = "postgres://archive" +`) + + var err error + out := captureStdout(t, func() { + err = runPGPush(PGPushConfig{}, "archive") + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "pg connection to archive permits plaintext") + assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") + assert.NotContains(t, err.Error(), "BROKEN_WORK_TARGET") + assert.Contains(t, out, "Target: archive") +} + +func TestRunPGStatus_IgnoresBrokenUnselectedTarget(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + writeTestConfig(t, dataDir, ` +default_pg = "archive" + +[pg.work] +url = "${BROKEN_WORK_TARGET}" +machine_name = "workbox" + +[pg.archive] +url = "postgres://archive" +`) + + err := runPGStatus("archive", false) + require.Error(t, err) + assert.Contains(t, err.Error(), "pg connection to archive permits plaintext") + assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") + assert.NotContains(t, err.Error(), "BROKEN_WORK_TARGET") + assert.NoFileExists(t, filepath.Join(dataDir, "sessions.db")) +} + +func TestRunPGStatus_IgnoresUnreadableLocalWatermark(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + writeTestConfig(t, dataDir, ` +default_pg = "archive" + +[pg.archive] +url = "postgres://archive" +`) + require.NoError(t, os.WriteFile( + filepath.Join(dataDir, "sessions.db"), + nil, + 0o600, + )) + + err := runPGStatus("archive", false) + require.Error(t, err) + assert.Contains(t, err.Error(), "pg connection to archive permits plaintext") + assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") + assert.NotContains(t, err.Error(), "opening database") + assert.NotContains(t, err.Error(), "sessions.db is empty") +} + +func TestRunPGPushAll_AggregatesTargetFailures(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + t.Setenv("AGENTSVIEW_NO_DAEMON", "1") + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + writeTestConfig(t, dataDir, ` +default_pg = "work" + +[pg.work] +url = "${BROKEN_WORK_TARGET}" +machine_name = "workbox" + +[pg.archive] +url = "postgres://archive" +`) + + err := runPGPush(PGPushConfig{AllTargets: true}, "") + require.Error(t, err) + assert.Contains(t, err.Error(), "2 pg target(s) failed") + assert.Contains(t, err.Error(), "work (default): expanding url: environment variable(s) not set: BROKEN_WORK_TARGET") + assert.Contains(t, err.Error(), "archive: pg connection to archive permits plaintext") + assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") +} + +func TestRunPGStatusAll_AggregatesTargetFailures(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + writeTestConfig(t, dataDir, ` +default_pg = "work" + +[pg.work] +url = "${BROKEN_WORK_TARGET}" +machine_name = "workbox" + +[pg.archive] +url = "postgres://archive" +`) + + err := runPGStatus("", true) + require.Error(t, err) + assert.Contains(t, err.Error(), "2 pg target(s) failed") + assert.Contains(t, err.Error(), "work (default): expanding url: environment variable(s) not set: BROKEN_WORK_TARGET") + assert.Contains(t, err.Error(), "archive: pg connection to archive permits plaintext") + assert.Contains(t, err.Error(), "allow_insecure = true under [pg] or [pg.NAME]") +} + +func TestPGPushCommandPrefixesErrors(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + t.Setenv("AGENTSVIEW_NO_DAEMON", "1") + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + writeTestConfig(t, dataDir, ` +default_pg = "archive" + +[pg.archive] +url = "postgres://archive" +`) + + _, err := executeCommand(newRootCommand(), "pg", "push", "archive") + require.Error(t, err) + assert.Contains(t, err.Error(), "pg push: pg connection to archive permits plaintext") +} + +func TestPGPushWatchCommandPrefixesErrors(t *testing.T) { + _, err := executeCommand(newRootCommand(), "pg", "push", "--watch", "--all") + require.Error(t, err) + assert.Contains(t, err.Error(), "pg push --watch: --all cannot be combined with --watch") +} + +func TestPGStatusCommandPrefixesErrors(t *testing.T) { + dataDir := t.TempDir() + t.Setenv("AGENTSVIEW_DATA_DIR", dataDir) + clearConfiguredAgentEnvVars(t) + isolateDefaultAgentDirs(t, dataDir) + restoreTestLogger(t) + writeTestConfig(t, dataDir, ` +default_pg = "archive" + +[pg.archive] +url = "postgres://archive" +`) + + _, err := executeCommand(newRootCommand(), "pg", "status", "archive") + require.Error(t, err) + assert.Contains(t, err.Error(), "pg status: pg connection to archive permits plaintext") +} + func TestRunPGServeRejectsInvalidManagedCaddyConfigBeforePGSetup(t *testing.T) { dataDir := t.TempDir() diff --git a/cmd/agentsview/pg_watch.go b/cmd/agentsview/pg_watch.go index 38ab763aa..664d946e6 100644 --- a/cmd/agentsview/pg_watch.go +++ b/cmd/agentsview/pg_watch.go @@ -110,21 +110,34 @@ func (p *pgPusher) reset() { // resolveWatchTargets validates PG config and resolves the project // filters for a watch run. func resolveWatchTargets( - appCfg config.Config, cfg PGPushConfig, -) (pgCfg config.PGConfig, projects, exclude []string, err error) { - pgCfg, err = appCfg.ResolvePG() + appCfg config.Config, + cfg PGPushConfig, + targetName string, +) ( + target pgTargetSelection, + projects, exclude []string, + err error, +) { + targets, err := resolvePGTargetSelections( + appCfg, targetName, false, + ) + if err != nil { + return pgTargetSelection{}, nil, nil, err + } + target = targets[0] + target, err = resolvePGTargetConfig(appCfg, target) if err != nil { - return config.PGConfig{}, nil, nil, err + return pgTargetSelection{}, nil, nil, err } - if pgCfg.URL == "" { - return config.PGConfig{}, nil, nil, + if target.PG.URL == "" { + return pgTargetSelection{}, nil, nil, fmt.Errorf("url not configured") } - projects, exclude, err = resolvePushProjects(pgCfg, cfg) + projects, exclude, err = resolvePushProjects(target.PG, cfg) if err != nil { - return config.PGConfig{}, nil, nil, err + return pgTargetSelection{}, nil, nil, err } - return pgCfg, projects, exclude, nil + return target, projects, exclude, nil } const ( @@ -135,19 +148,24 @@ const ( // runPGPushWatch runs the long-lived auto-push daemon: an initial // catch-up push, then pushes triggered by file changes (debounced) // and a periodic floor tick, until interrupted. -func runPGPushWatch(cfg PGPushConfig) { +func runPGPushWatch( + cfg PGPushConfig, + targetName string, +) error { appCfg, err := config.LoadMinimal() if err != nil { - log.Fatalf("loading config: %v", err) + return fmt.Errorf("loading config: %w", err) } if err := os.MkdirAll(appCfg.DataDir, 0o755); err != nil { - log.Fatalf("creating data dir: %v", err) + return fmt.Errorf("creating data dir: %w", err) } setupLogFileNamed(appCfg.DataDir, "pg-watch.log") - pgCfg, projects, exclude, err := resolveWatchTargets(appCfg, cfg) + target, projects, exclude, err := resolveWatchTargets( + appCfg, cfg, targetName, + ) if err != nil { - fatal("pg push --watch: %v", err) + return err } debounce := cfg.Debounce @@ -165,15 +183,15 @@ func runPGPushWatch(cfg PGPushConfig) { Prefix: "pg-watch", }).LockPath() if err != nil { - fatal("pg push --watch: %v", err) + return err } lock := flock.New(lockPath) locked, err := lock.TryLock() if err != nil { - fatal("pg push --watch: locking %s: %v", lockPath, err) + return fmt.Errorf("locking %s: %w", lockPath, err) } if !locked { - fatal("pg push --watch: already locked (%s)", lockPath) + return fmt.Errorf("already locked (%s)", lockPath) } defer func() { if rerr := lock.Unlock(); rerr != nil { @@ -188,17 +206,18 @@ func runPGPushWatch(cfg PGPushConfig) { log.Printf( "pg watch: starting (machine=%q debounce=%s interval=%s)", - pgCfg.MachineName, debounce, interval, + target.PG.MachineName, debounce, interval, ) backend, cleanup, err := resolveArchiveWriteBackend(ctx, appCfg) if err != nil { - fatal("opening writer: %v", err) + return fmt.Errorf("opening writer: %w", err) } defer cleanup() if err := backend.PGPushWatch( - ctx, pgCfg, cfg, projects, exclude, debounce, interval, + ctx, target, cfg, projects, exclude, debounce, interval, ); err != nil { - fatal("pg push --watch: %v", err) + return err } + return nil } diff --git a/cmd/agentsview/pg_watch_test.go b/cmd/agentsview/pg_watch_test.go index a03876e2e..932506869 100644 --- a/cmd/agentsview/pg_watch_test.go +++ b/cmd/agentsview/pg_watch_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "net/http" + "os" "path/filepath" "testing" "time" @@ -82,6 +83,8 @@ func TestArchiveWriteBackendPGPushPostsToDaemon(t *testing.T) { assert.Equal(t, "mirror", req.PG.Schema) assert.Equal(t, "laptop", req.PG.MachineName) assert.True(t, req.PG.AllowInsecure) + assert.Equal(t, "work", req.SyncStateTarget) + assert.True(t, req.MigrateLegacySyncState) writeTestJSON(t, w, postgres.PushResult{ SessionsPushed: 2, MessagesPushed: 3, @@ -94,11 +97,15 @@ func TestArchiveWriteBackendPGPushPostsToDaemon(t *testing.T) { ) result, err := backend.PGPush( context.Background(), - config.PGConfig{ - URL: "postgres://user:pass@host/db", - Schema: "mirror", - MachineName: "laptop", - AllowInsecure: true, + pgTargetSelection{ + PG: config.PGConfig{ + URL: "postgres://user:pass@host/db", + Schema: "mirror", + MachineName: "laptop", + AllowInsecure: true, + }, + SyncStateTarget: "work", + MigrateLegacySyncState: true, }, PGPushConfig{Full: true}, []string{"a"}, @@ -162,7 +169,11 @@ func TestArchiveWriteBackendPGPushWatchReResolvesDaemon(t *testing.T) { ) err := backend.PGPushWatch( ctx, - config.PGConfig{URL: "postgres://user:pass@host/db"}, + pgTargetSelection{ + PG: config.PGConfig{ + URL: "postgres://user:pass@host/db", + }, + }, PGPushConfig{}, nil, nil, @@ -306,7 +317,9 @@ func TestPgPusher_LogsSkippedConflicts(t *testing.T) { func TestResolveWatchTargets_ErrorsOnEmptyURL(t *testing.T) { appCfg := config.Config{} // no PG URL - _, _, _, err := resolveWatchTargets(appCfg, PGPushConfig{}) + _, _, _, err := resolveWatchTargets( + appCfg, PGPushConfig{}, "", + ) require.Error(t, err, "expected error when url not configured") } @@ -317,10 +330,139 @@ func TestResolveWatchTargets_ResolvesProjects(t *testing.T) { MachineName: "box1", }, } - pg, inc, _, err := resolveWatchTargets( - appCfg, PGPushConfig{ProjectsFlag: "a,b"}, + target, inc, _, err := resolveWatchTargets( + appCfg, PGPushConfig{ProjectsFlag: "a,b"}, "", ) require.NoError(t, err) - assert.NotEmpty(t, pg.URL, "expected resolved URL") + assert.NotEmpty(t, target.PG.URL, "expected resolved URL") assert.Equal(t, []string{"a", "b"}, inc) } + +func TestResolveWatchTargets_IgnoresBrokenUnselectedTarget(t *testing.T) { + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + appCfg := config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": { + URL: "${BROKEN_WORK_TARGET}", + MachineName: "workbox", + }, + "archive": { + URL: "postgres://archive", + MachineName: "archivebox", + }, + }, + } + + target, _, _, err := resolveWatchTargets( + appCfg, PGPushConfig{}, "archive", + ) + require.NoError(t, err) + assert.Equal(t, "archive", target.Name) + assert.Equal(t, "postgres://archive", target.PG.URL) +} + +func TestResolvePGTargetSelections_DefaultAndAll(t *testing.T) { + appCfg := config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": {URL: "postgres://work", MachineName: "workbox"}, + "archive": {URL: "postgres://archive", MachineName: "archivebox"}, + }, + } + + defaultTarget, err := resolvePGTargetSelections( + appCfg, "", false, + ) + require.NoError(t, err) + require.Len(t, defaultTarget, 1) + assert.Equal(t, "work", defaultTarget[0].Name) + assert.True(t, defaultTarget[0].IsDefault) + assert.Equal(t, "work", defaultTarget[0].SyncStateTarget) + assert.True(t, defaultTarget[0].MigrateLegacySyncState) + assert.Empty(t, defaultTarget[0].PG.URL) + + allTargets, err := resolvePGTargetSelections( + appCfg, "", true, + ) + require.NoError(t, err) + require.Len(t, allTargets, 2) + assert.Equal(t, "work", allTargets[0].Name) + assert.Equal(t, "archive", allTargets[1].Name) +} + +func TestResolvePGTargetConfig_IgnoresBrokenUnselectedTarget(t *testing.T) { + restoreUnsetEnv(t, "BROKEN_WORK_TARGET") + appCfg := config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": { + URL: "${BROKEN_WORK_TARGET}", + MachineName: "workbox", + }, + "archive": { + URL: "postgres://archive", + MachineName: "archivebox", + }, + }, + } + + target, err := resolvePGTargetConfig( + appCfg, + pgTargetSelection{Name: "archive"}, + ) + require.NoError(t, err) + assert.Equal(t, "postgres://archive", target.PG.URL) +} + +func restoreUnsetEnv(t *testing.T, name string) { + t.Helper() + oldValue, hadValue := os.LookupEnv(name) + require.NoError(t, os.Unsetenv(name)) + t.Cleanup(func() { + if hadValue { + require.NoError(t, os.Setenv(name, oldValue)) + return + } + require.NoError(t, os.Unsetenv(name)) + }) +} + +func TestResolvePGTargetSelections_RejectsLegacyNamedLookup(t *testing.T) { + appCfg := config.Config{ + PG: config.PGConfig{ + URL: "postgres://legacy", + MachineName: "legacybox", + }, + } + + _, err := resolvePGTargetSelections( + appCfg, "archive", false, + ) + require.Error(t, err) + assert.Contains(t, err.Error(), "single legacy [pg] block") +} + +func TestResolvePGTargetSelections_RejectsTargetWithAll(t *testing.T) { + appCfg := config.Config{ + DefaultPG: "work", + PGTargets: map[string]config.PGConfig{ + "work": {URL: "postgres://work"}, + }, + } + + _, err := resolvePGTargetSelections( + appCfg, "work", true, + ) + require.Error(t, err) + assert.Contains(t, err.Error(), "cannot be combined with --all") +} + +func TestNewPGPushCommandRejectsAllWatch(t *testing.T) { + cmd := newPGPushCommand() + cmd.SetArgs([]string{"--all", "--watch"}) + + err := cmd.Execute() + require.Error(t, err) + assert.Contains(t, err.Error(), "--all cannot be combined with --watch") +} diff --git a/cmd/agentsview/session.go b/cmd/agentsview/session.go index 82f2686e2..b830f21e5 100644 --- a/cmd/agentsview/session.go +++ b/cmd/agentsview/session.go @@ -181,7 +181,7 @@ func resolvePGReadConfig( } if pgCfg.URL == "" { return config.PGConfig{}, false, errors.New( - "pg url not configured; set AGENTSVIEW_PG_URL or [pg].url", + "pg url not configured; set AGENTSVIEW_PG_URL, use a legacy [pg].url, or configure default_pg with named [pg.NAME] targets", ) } return pgCfg, true, nil diff --git a/docs/commands.md b/docs/commands.md index 6c4982599..759500824 100644 --- a/docs/commands.md +++ b/docs/commands.md @@ -397,7 +397,7 @@ Sync sessions from local SQLite to PostgreSQL. See [PostgreSQL Sync](/pg-sync/) for full documentation. ```bash -agentsview pg push [flags] +agentsview pg push [target] [flags] ``` | Flag | Default | Description | @@ -406,6 +406,7 @@ agentsview pg push [flags] | `--projects` | | Comma-separated projects to push (inclusive) | | `--exclude-projects` | | Comma-separated projects to exclude from push | | `--all-projects` | `false` | Ignore configured project filters for this run | +| `--all` | `false` | Push every configured PostgreSQL target sequentially | | `--watch` | `false` | Run continuously, pushing on change plus a periodic floor | | `--debounce` | `30s` | Coalesce window after a change before pushing (`--watch` only) | | `--interval` | `15m` | Periodic floor push interval (`--watch` only) | @@ -420,7 +421,7 @@ for details on how filtering interacts with the push watermark. Show PostgreSQL sync status. ```bash -agentsview pg status +agentsview pg status [target] [--all] ``` --- @@ -688,7 +689,7 @@ target an explicit running daemon, `AGENTSVIEW_SERVER_TOKEN` or `--server-token-file ` when that daemon requires auth, or `--pg` to read from configured PostgreSQL. -`AGENTSVIEW_PG_URL` and `[pg].url` are sync configuration only; they +`AGENTSVIEW_PG_URL`, a legacy `[pg].url`, or the effective default target from `default_pg` plus `[pg.NAME]` are sync configuration only; they do not change the default read path. Read commands use local SQLite unless `--pg` is supplied, in which case they fail fast if no connection URL is available. Mutating commands such as `session sync` diff --git a/docs/pg-sync.md b/docs/pg-sync.md index a6c307efa..0af38767b 100644 --- a/docs/pg-sync.md +++ b/docs/pg-sync.md @@ -30,6 +30,12 @@ The `machine_name` identifies which machine pushed each session. It defaults to the system hostname if omitted. It must not be `"local"` (reserved for the local SQLite sentinel). +For multiple PostgreSQL destinations, use named `[pg.NAME]` blocks and +`default_pg` instead of the legacy single `[pg]` block. Named target names +are normalized case-insensitively, and `all`, `local`, plus the legacy `[pg]` +field names `url`, `schema`, `machine_name`, `allow_insecure`, `projects`, and +`exclude_projects` are unavailable as `[pg.NAME]` names. + ### 2. Push Sessions ```bash @@ -71,11 +77,12 @@ entirely by PostgreSQL. No local SQLite, file watching, or uploads Sync sessions from the local SQLite database to PostgreSQL. ```bash -agentsview pg push [flags] +agentsview pg push [target] [flags] ``` | Flag | Default | Description | |------|---------|-------------| +| `--all` | `false` | Push every configured PG target sequentially | | `--full` | `false` | Force full local resync and re-push, bypassing change detection | | `--projects` | | Comma-separated projects to push (inclusive) | | `--exclude-projects` | | Comma-separated projects to exclude | @@ -88,6 +95,11 @@ Without `--watch`, push is on-demand — run it whenever you want to sync. With `--watch`, the command stays in the foreground and keeps pushing until interrupted. +When no target is passed, `pg push` uses the effective default target. +Pass one named target explicitly to push just that destination, or use +`--all` to fan out across every configured target. `--all --watch` is +rejected. + **What happens on push:** 1. Runs a local sync to pick up any new or modified session files @@ -138,8 +150,9 @@ Operational details: bounded final flush. - Logs are written to `pg-watch.log` under the AgentsView data directory. -- The watcher uses the same `[pg]` config, machine name, project - filters, classifier settings, and +- The watcher uses the selected PostgreSQL target, or the + `default_pg` target when no name is passed, along with the same + machine name, project filters, classifier settings, and `result_content_blocked_categories` behavior as one-shot `pg push`. @@ -214,9 +227,14 @@ filter). Show the current sync state. ```bash -agentsview pg status +agentsview pg status [target] +agentsview pg status --all ``` +Without a target name, `pg status` uses the effective default target. +Pass one named target explicitly to inspect that destination, or use +`--all` to print every configured target sequentially. + Output: ``` @@ -259,8 +277,9 @@ The generated unit runs `agentsview pg push --watch`, pins and writes logs to `~/.agentsview/pg-watch.log` unless you changed the data directory. -`install` requires a literal `[pg].url` in -`~/.agentsview/config.toml`. It intentionally rejects +`install` requires a literal PostgreSQL URL in the effective default target of +`~/.agentsview/config.toml`, either the legacy `[pg].url` or the target selected +by `default_pg` from named `[pg.NAME]` blocks. It intentionally rejects `AGENTSVIEW_PG_URL` and environment-expanded URLs such as `${PG_PASSWORD}` because background services do not inherit your interactive shell environment. Other session-directory environment @@ -396,7 +415,7 @@ filter in the sidebar to show sessions from specific machines. ## Configuration -All PostgreSQL settings live in the `[pg]` section of +Single-target PostgreSQL settings can live in the legacy `[pg]` section of `~/.agentsview/config.toml`: ```toml @@ -416,6 +435,29 @@ allow_insecure = false | `projects` | | Array of project names to include in push | | `exclude_projects` | | Array of project names to exclude from push | +To manage more than one PostgreSQL destination, define named `[pg.NAME]` blocks +and select the effective default target with `default_pg`: + +```toml +default_pg = "work" + +[pg.work] +url = "postgres://user:pass@work-db:5432/agentsview?sslmode=require" +machine_name = "my-laptop" + +[pg.archive] +url = "postgres://user:pass@archive-db:5432/agentsview?sslmode=require" +machine_name = "my-laptop-archive" +exclude_projects = ["scratch"] +``` + +`agentsview pg push` and `agentsview pg status` use the effective default target +when no target name is passed, accept one target name explicitly, and also +support `--all` for sequential multi-target runs. `agentsview pg push --watch` +follows the effective default target unless you pass one named target +explicitly. `agentsview pg serve` and `agentsview pg service` stay on the +effective default target in this release, and `--all --watch` is rejected. + !!! warning The `url` field is required for all `pg` commands. If it contains credentials, ensure `config.toml` has restricted @@ -437,7 +479,9 @@ url = "postgres://${PG_USER}:${PG_PASSWORD}@host:5432/dbname?sslmode=require" ### Environment Variables PostgreSQL settings can also be configured via environment -variables, which override `config.toml` values: +variables. In legacy single-target mode they override the `[pg]` +values. In named-target mode they apply only to the effective default +target: | Variable | Description | |----------|-------------| diff --git a/internal/config/config.go b/internal/config/config.go index a6fa9909a..7559d9cd4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,7 @@ import ( "path/filepath" "regexp" "slices" + "sort" "strconv" "strings" "sync" @@ -66,6 +67,29 @@ type PGConfig struct { ExcludeProjects []string `toml:"exclude_projects" json:"exclude_projects,omitempty"` } +type pgEnvOverrides struct { + URL string + Schema string + MachineName string +} + +// ResolvedPGTarget is one PostgreSQL target after target selection, +// defaulting, and default-target env overrides are applied. +type ResolvedPGTarget struct { + Name string + Config PGConfig + IsDefault bool +} + +var pgConfigKeys = map[string]struct{}{ + "url": {}, + "schema": {}, + "machine_name": {}, + "allow_insecure": {}, + "projects": {}, + "exclude_projects": {}, +} + // DuckDBConfig holds DuckDB mirror and Quack connection settings. type DuckDBConfig struct { Path string `toml:"path" json:"path"` @@ -125,6 +149,8 @@ type Config struct { DisableUpdateCheck bool `json:"disable_update_check" toml:"disable_update_check"` NoSync bool `json:"-" toml:"-"` PG PGConfig `json:"pg,omitempty" toml:"pg"` + DefaultPG string `json:"default_pg,omitempty" toml:"default_pg"` + PGTargets map[string]PGConfig `json:"-" toml:"-"` DuckDB DuckDBConfig `json:"duckdb,omitempty" toml:"duckdb"` Automated AutomatedConfig `json:"automated,omitempty" toml:"automated"` Agent map[string]AgentConfig `json:"agent,omitempty" toml:"agent"` @@ -160,6 +186,8 @@ type Config struct { // Used to prevent auto-bind to 0.0.0.0 when the user // explicitly requested a specific host. HostExplicit bool `json:"-" toml:"-"` + + pgEnvOverrides pgEnvOverrides } type dirSource int @@ -532,6 +560,7 @@ func (c *Config) applyConfigTOML(data string) error { RequireAuth bool `toml:"require_auth"` RemoteAccess bool `toml:"remote_access"` DisableUpdateCheck bool `toml:"disable_update_check"` + DefaultPG string `toml:"default_pg"` PG PGConfig `toml:"pg"` DuckDB DuckDBConfig `toml:"duckdb"` Automated AutomatedConfig `toml:"automated"` @@ -544,6 +573,10 @@ func (c *Config) applyConfigTOML(data string) error { if err != nil { return fmt.Errorf("parsing config: %w", err) } + var raw map[string]any + if _, err := toml.Decode(data, &raw); err != nil { + return fmt.Errorf("parsing config raw: %w", err) + } if file.GithubToken != "" { c.GithubToken = file.GithubToken } @@ -576,25 +609,36 @@ func (c *Config) applyConfigTOML(data string) error { } c.RequireAuth = file.RequireAuth || file.RemoteAccess c.DisableUpdateCheck = file.DisableUpdateCheck - // Merge pg field-by-field so env vars override only - // the fields they set, preserving config-file settings. - if file.PG.URL != "" && c.PG.URL == "" { - c.PG.URL = file.PG.URL - } - if file.PG.Schema != "" && c.PG.Schema == "" { - c.PG.Schema = file.PG.Schema - } - if file.PG.MachineName != "" && c.PG.MachineName == "" { - c.PG.MachineName = file.PG.MachineName + if meta.IsDefined("default_pg") { + c.DefaultPG = normalizePGTargetName(file.DefaultPG) } - if file.PG.AllowInsecure { - c.PG.AllowInsecure = true - } - if file.PG.Projects != nil && c.PG.Projects == nil { - c.PG.Projects = file.PG.Projects + legacyPG, namedPG, err := parsePGConfigSection(raw["pg"]) + if err != nil { + return fmt.Errorf("pg: %w", err) } - if file.PG.ExcludeProjects != nil && c.PG.ExcludeProjects == nil { - c.PG.ExcludeProjects = file.PG.ExcludeProjects + if len(namedPG) > 0 { + c.PG = PGConfig{} + c.PGTargets = namedPG + } else { + c.PGTargets = nil + if legacyPG.URL != "" { + c.PG.URL = legacyPG.URL + } + if legacyPG.Schema != "" { + c.PG.Schema = legacyPG.Schema + } + if legacyPG.MachineName != "" { + c.PG.MachineName = legacyPG.MachineName + } + if legacyPG.AllowInsecure { + c.PG.AllowInsecure = true + } + if legacyPG.Projects != nil { + c.PG.Projects = legacyPG.Projects + } + if legacyPG.ExcludeProjects != nil { + c.PG.ExcludeProjects = legacyPG.ExcludeProjects + } } // Merge duckdb field-by-field so env vars override only // the fields they set, preserving config-file settings. @@ -658,10 +702,6 @@ func (c *Config) applyConfigTOML(data string) error { // Parse config-file dir arrays for agents that have a // ConfigKey. Only apply when not already set by env var. - var raw map[string]any - if _, err := toml.Decode(data, &raw); err != nil { - return fmt.Errorf("parsing config raw: %w", err) - } for _, def := range parser.Registry { if def.ConfigKey == "" { continue @@ -795,13 +835,13 @@ func (c *Config) loadEnv() { c.DataDir = v } if v := os.Getenv("AGENTSVIEW_PG_URL"); v != "" { - c.PG.URL = v + c.pgEnvOverrides.URL = v } if v := os.Getenv("AGENTSVIEW_PG_SCHEMA"); v != "" { - c.PG.Schema = v + c.pgEnvOverrides.Schema = v } if v := os.Getenv("AGENTSVIEW_PG_MACHINE"); v != "" { - c.PG.MachineName = v + c.pgEnvOverrides.MachineName = v } if v := os.Getenv("AGENTSVIEW_DUCKDB_PATH"); v != "" { c.DuckDB.Path = v @@ -1306,6 +1346,109 @@ func hostLiteral(host string) string { return host } +func normalizePGTargetName(name string) string { + return strings.TrimSpace(strings.ToLower(name)) +} + +func isReservedPGTargetName(name string) bool { + switch normalizePGTargetName(name) { + case "all", "local": + return true + default: + return false + } +} + +func decodePGConfigMap(raw map[string]any) (PGConfig, error) { + var buf bytes.Buffer + if err := toml.NewEncoder(&buf).Encode(raw); err != nil { + return PGConfig{}, fmt.Errorf("encoding pg config: %w", err) + } + var cfg PGConfig + if _, err := toml.Decode(buf.String(), &cfg); err != nil { + return PGConfig{}, fmt.Errorf("decoding pg config: %w", err) + } + return cfg, nil +} + +func parsePGConfigSection(value any) (PGConfig, map[string]PGConfig, error) { + if value == nil { + return PGConfig{}, nil, nil + } + section, ok := value.(map[string]any) + if !ok { + return PGConfig{}, nil, fmt.Errorf("expected [pg] to be a table") + } + hasLegacyFields := false + hasNamedTargets := false + legacyRaw := make(map[string]any) + namedTargets := make(map[string]PGConfig) + seenNames := make(map[string]string) + for rawName, rawValue := range section { + name := normalizePGTargetName(rawName) + if _, ok := pgConfigKeys[name]; ok { + if _, nested := rawValue.(map[string]any); nested { + return PGConfig{}, nil, fmt.Errorf( + "[pg].%s must be a scalar or array field, not a nested table", + rawName, + ) + } + hasLegacyFields = true + legacyRaw[rawName] = rawValue + continue + } + targetRaw, ok := rawValue.(map[string]any) + if !ok { + return PGConfig{}, nil, fmt.Errorf( + "[pg].%s must be a named target table", + rawName, + ) + } + hasNamedTargets = true + if name == "" { + return PGConfig{}, nil, fmt.Errorf( + "named PG targets must not be blank", + ) + } + if isReservedPGTargetName(name) { + return PGConfig{}, nil, fmt.Errorf( + "named PG target %q is reserved", + name, + ) + } + if prev, exists := seenNames[name]; exists { + return PGConfig{}, nil, fmt.Errorf( + "named PG targets %q and %q normalize to the same name %q", + prev, rawName, name, + ) + } + seenNames[name] = rawName + targetCfg, err := decodePGConfigMap(targetRaw) + if err != nil { + return PGConfig{}, nil, fmt.Errorf( + "[pg].%s: %w", rawName, err, + ) + } + namedTargets[name] = targetCfg + } + if hasLegacyFields && hasNamedTargets { + return PGConfig{}, nil, fmt.Errorf( + "cannot mix legacy [pg] fields with named [pg.NAME] targets", + ) + } + if hasLegacyFields { + legacyCfg, err := decodePGConfigMap(legacyRaw) + if err != nil { + return PGConfig{}, nil, err + } + return legacyCfg, nil, nil + } + if hasNamedTargets { + return PGConfig{}, namedTargets, nil + } + return PGConfig{}, nil, nil +} + // ResolveDataDir returns the effective data directory by applying // defaults and environment overrides, without reading any files. // Use this to determine where migration should target before @@ -1321,10 +1464,114 @@ func ResolveDataDir() (string, error) { return cfg.DataDir, nil } -// ResolvePG returns a copy of PG config with defaults applied -// and environment variables expanded in URL. -func (c *Config) ResolvePG() (PGConfig, error) { - pg := c.PG +// DefaultPGTargetName returns the effective named PG target for this config. +func (c *Config) DefaultPGTargetName() (string, error) { + if len(c.PGTargets) == 0 { + if c.DefaultPG != "" { + return "", fmt.Errorf( + "default_pg requires named [pg.NAME] targets", + ) + } + return "", nil + } + if c.DefaultPG != "" { + if _, ok := c.PGTargets[c.DefaultPG]; !ok { + return "", fmt.Errorf( + "default_pg %q does not match any named [pg.NAME] target", + c.DefaultPG, + ) + } + return c.DefaultPG, nil + } + if len(c.PGTargets) == 1 { + for name := range c.PGTargets { + return name, nil + } + } + return "", fmt.Errorf( + "default_pg is required when more than one [pg.NAME] target is defined", + ) +} + +func (c *Config) validatePGTargets() error { + _, err := c.DefaultPGTargetName() + return err +} + +func (c *Config) PGTargetNames() ([]string, string, error) { + if err := c.validatePGTargets(); err != nil { + return nil, "", err + } + if len(c.PGTargets) == 0 { + return nil, "", nil + } + defaultName, err := c.DefaultPGTargetName() + if err != nil { + return nil, "", err + } + names := make([]string, 0, len(c.PGTargets)) + for name := range c.PGTargets { + names = append(names, name) + } + sort.Slice(names, func(i, j int) bool { + if names[i] == defaultName { + return true + } + if names[j] == defaultName { + return false + } + return names[i] < names[j] + }) + return names, defaultName, nil +} + +// RawPGTarget returns the configured PG target before env expansion and +// default-field synthesis. An empty name selects the effective default target. +func (c *Config) RawPGTarget(name string) (PGConfig, error) { + if err := c.validatePGTargets(); err != nil { + return PGConfig{}, err + } + targetName := normalizePGTargetName(name) + if len(c.PGTargets) == 0 { + if targetName != "" { + return PGConfig{}, fmt.Errorf( + "pg target %q is not configured; config uses a single legacy [pg] block", + name, + ) + } + return c.PG, nil + } + if targetName == "" { + var err error + targetName, err = c.DefaultPGTargetName() + if err != nil { + return PGConfig{}, err + } + } + targetCfg, ok := c.PGTargets[targetName] + if !ok { + return PGConfig{}, fmt.Errorf( + "pg target %q is not configured", + targetName, + ) + } + return targetCfg, nil +} + +func (c *Config) resolvePGConfig( + pg PGConfig, applyDefaultEnv bool, +) (PGConfig, error) { + if applyDefaultEnv { + if c.pgEnvOverrides.URL != "" { + pg.URL = c.pgEnvOverrides.URL + } + if c.pgEnvOverrides.Schema != "" { + pg.Schema = c.pgEnvOverrides.Schema + } + if c.pgEnvOverrides.MachineName != "" { + pg.MachineName = c.pgEnvOverrides.MachineName + } + } if pg.URL != "" { expanded, err := expandBracedEnv(pg.URL) if err != nil { @@ -1345,6 +1592,83 @@ func (c *Config) ResolvePG() (PGConfig, error) { return pg, nil } +// ResolvePG returns the effective default PG target with defaults applied +// and environment variables expanded in URL. +func (c *Config) ResolvePG() (PGConfig, error) { + return c.ResolvePGTarget("") +} + +// ResolvePGTarget resolves one named PG target, or the effective default +// target when name is empty. In legacy single-target mode, only the empty +// name is valid. +func (c *Config) ResolvePGTarget(name string) (PGConfig, error) { + if err := c.validatePGTargets(); err != nil { + return PGConfig{}, err + } + targetName := normalizePGTargetName(name) + if len(c.PGTargets) == 0 { + if targetName != "" { + return PGConfig{}, fmt.Errorf( + "pg target %q is not configured; config uses a single legacy [pg] block", + name, + ) + } + return c.resolvePGConfig(c.PG, true) + } + defaultName, err := c.DefaultPGTargetName() + if err != nil { + return PGConfig{}, err + } + if targetName == "" { + targetName = defaultName + } + targetCfg, ok := c.PGTargets[targetName] + if !ok { + return PGConfig{}, fmt.Errorf( + "pg target %q is not configured", + targetName, + ) + } + return c.resolvePGConfig(targetCfg, targetName == defaultName) +} + +// ResolvePGTargets resolves every configured PG target. Legacy single-target +// mode returns one unnamed default target. +func (c *Config) ResolvePGTargets() ([]ResolvedPGTarget, error) { + if err := c.validatePGTargets(); err != nil { + return nil, err + } + if len(c.PGTargets) == 0 { + pg, err := c.resolvePGConfig(c.PG, true) + if err != nil { + return nil, err + } + return []ResolvedPGTarget{{ + Config: pg, + IsDefault: true, + }}, nil + } + names, defaultName, err := c.PGTargetNames() + if err != nil { + return nil, err + } + targets := make([]ResolvedPGTarget, 0, len(names)) + for _, name := range names { + targetCfg, err := c.resolvePGConfig( + c.PGTargets[name], name == defaultName, + ) + if err != nil { + return nil, err + } + targets = append(targets, ResolvedPGTarget{ + Name: name, + Config: targetCfg, + IsDefault: name == defaultName, + }) + } + return targets, nil +} + // ResolveDuckDB returns a copy of DuckDB config with defaults applied // and environment variables expanded in path, URL, and token. func (c *Config) ResolveDuckDB() (DuckDBConfig, error) { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index aaa5e9a45..3938b336d 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -861,8 +861,182 @@ func TestLoadFile_PGConfig(t *testing.T) { cfg := f.LoadMinimal(t) - assert.Equal(t, tt.want.URL, cfg.PG.URL) - assert.Equal(t, tt.want.MachineName, cfg.PG.MachineName) + resolved, err := cfg.ResolvePG() + require.NoError(t, err) + + assert.Equal(t, tt.want.URL, resolved.URL) + if tt.want.MachineName == "" { + assert.NotEmpty(t, resolved.MachineName) + } else { + assert.Equal(t, tt.want.MachineName, resolved.MachineName) + } + }) + } +} + +func TestResolvePGTarget_NamedTargets(t *testing.T) { + cfg := Config{ + DefaultPG: "work", + PGTargets: map[string]PGConfig{ + "work": { + URL: "postgres://work", + MachineName: "workbox", + }, + "archive": { + URL: "postgres://archive", + MachineName: "archivebox", + }, + }, + pgEnvOverrides: pgEnvOverrides{ + URL: "postgres://env-default", + MachineName: "envbox", + }, + } + + defaultTarget, err := cfg.ResolvePG() + require.NoError(t, err) + assert.Equal(t, "postgres://env-default", defaultTarget.URL) + assert.Equal(t, "envbox", defaultTarget.MachineName) + + archiveTarget, err := cfg.ResolvePGTarget("archive") + require.NoError(t, err) + assert.Equal(t, "postgres://archive", archiveTarget.URL) + assert.Equal(t, "archivebox", archiveTarget.MachineName) +} + +func TestResolvePGTargets_DefaultFirst(t *testing.T) { + cfg := Config{ + DefaultPG: "work", + PGTargets: map[string]PGConfig{ + "archive": {URL: "postgres://archive"}, + "work": {URL: "postgres://work"}, + }, + } + + targets, err := cfg.ResolvePGTargets() + require.NoError(t, err) + require.Len(t, targets, 2) + assert.Equal(t, "work", targets[0].Name) + assert.True(t, targets[0].IsDefault) + assert.Equal(t, "archive", targets[1].Name) + assert.False(t, targets[1].IsDefault) +} + +func TestResolvePGTargets_OneNamedTargetWithoutDefault(t *testing.T) { + cfg := Config{ + PGTargets: map[string]PGConfig{ + "work": {URL: "postgres://work"}, + }, + } + + targets, err := cfg.ResolvePGTargets() + require.NoError(t, err) + require.Len(t, targets, 1) + assert.Equal(t, "work", targets[0].Name) + assert.True(t, targets[0].IsDefault) +} + +func TestResolvePGTargets_MultipleNamedTargetsRequireDefault(t *testing.T) { + cfg := Config{ + PGTargets: map[string]PGConfig{ + "work": {URL: "postgres://work"}, + "archive": {URL: "postgres://archive"}, + }, + } + + _, err := cfg.ResolvePGTargets() + require.Error(t, err) + assert.Contains(t, err.Error(), "default_pg is required") +} + +func TestLoadMinimal_DefersNamedPGValidationForNonPGCommands(t *testing.T) { + dir := setupTestEnv(t) + path := filepath.Join(dir, configFileName) + data := []byte(` +default_pg = "missing" + +[pg.work] +url = "postgres://work" +`) + require.NoError(t, os.WriteFile(path, data, 0o600)) + + cfg, err := LoadMinimal() + require.NoError(t, err) + + _, err = cfg.ResolvePG() + require.Error(t, err) + assert.Contains(t, err.Error(), `default_pg "missing" does not match any named [pg.NAME] target`) +} + +func TestLoadFile_PGMixedLegacyAndNamedTargetsFails(t *testing.T) { + dir := setupTestEnv(t) + path := filepath.Join(dir, configFileName) + data := []byte(` +[pg] +url = "postgres://legacy" + +[pg.archive] +url = "postgres://archive" +`) + require.NoError(t, os.WriteFile(path, data, 0o600)) + + _, err := LoadMinimal() + require.Error(t, err) + assert.Contains(t, err.Error(), "cannot mix legacy [pg] fields with named [pg.NAME] targets") +} + +func TestLoadFile_PGNamedTargetValidationErrors(t *testing.T) { + tests := []struct { + name string + toml string + wantErr string + }{ + { + name: "reserved all target", + toml: ` +[pg.all] +url = "postgres://all" +`, + wantErr: `named PG target "all" is reserved`, + }, + { + name: "reserved local target", + toml: ` +[pg.local] +url = "postgres://local" +`, + wantErr: `named PG target "local" is reserved`, + }, + { + name: "duplicate normalized target names", + toml: ` +[pg.Work] +url = "postgres://work" + +[pg.work] +url = "postgres://work2" +`, + wantErr: `normalize to the same name "work"`, + }, + { + name: "named target must be table", + toml: ` +[pg] +archive = "postgres://archive" +`, + wantErr: `[pg].archive must be a named target table`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dir := setupTestEnv(t) + path := filepath.Join(dir, configFileName) + require.NoError(t, os.WriteFile(path, []byte(tt.toml), 0o600)) + + _, err := LoadMinimal() + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErr) }) } } diff --git a/internal/config/pg_target_case_test.go b/internal/config/pg_target_case_test.go new file mode 100644 index 000000000..c63b16a18 --- /dev/null +++ b/internal/config/pg_target_case_test.go @@ -0,0 +1,39 @@ +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLoadFile_PGLegacyFieldNamesRejectNestedTablesCaseInsensitive(t *testing.T) { + tests := []struct { + name string + field string + wantErr string + }{ + { + name: "url", + field: "URL", + wantErr: "[pg].URL must be a scalar or array field, not a nested table", + }, + { + name: "schema", + field: "Schema", + wantErr: "[pg].Schema must be a scalar or array field, not a nested table", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, _, err := parsePGConfigSection(map[string]any{ + tt.field: map[string]any{ + "url": "postgres://nested", + }, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErr) + }) + } +} diff --git a/internal/parser/gemini_parser_test.go b/internal/parser/gemini_parser_test.go index bb0b64ed4..547f80d86 100644 --- a/internal/parser/gemini_parser_test.go +++ b/internal/parser/gemini_parser_test.go @@ -563,4 +563,4 @@ func TestParseGeminiSession_ContextTokensDelta(t *testing.T) { assert.False(t, msgs[1].HasContextTokens) assert.Equal(t, 0, msgs[1].ContextTokens) }) -} \ No newline at end of file +} diff --git a/internal/postgres/connect.go b/internal/postgres/connect.go index 8068cb553..94828d777 100644 --- a/internal/postgres/connect.go +++ b/internal/postgres/connect.go @@ -49,7 +49,7 @@ func CheckSSL(dsn string) error { "pg connection to %s permits plaintext; "+ "set sslmode=require (or verify-full) "+ "for non-local hosts, "+ - "or set allow_insecure = true under [pg] "+ + "or set allow_insecure = true under [pg] or [pg.NAME] "+ "in config to override", cfg.Host, ) diff --git a/internal/postgres/push.go b/internal/postgres/push.go index f1059bedc..afacf5c28 100644 --- a/internal/postgres/push.go +++ b/internal/postgres/push.go @@ -35,14 +35,6 @@ const ( var errSessionOwnershipConflict = errors.New("session ownership conflict") -// syncStateStore abstracts sync state read/write operations on the -// local database. Used by push boundary state helpers. -type syncStateStore interface { - GetSyncState(key string) (string, error) - SetSyncState(key, value string) error - GetOrCreateSyncState(key, defaultValue string) (string, error) -} - type pushBoundaryState struct { Cutoff string `json:"cutoff"` Fingerprints map[string]string `json:"fingerprints"` @@ -75,6 +67,7 @@ func (s *Sync) Push( ) (PushResult, error) { start := time.Now() var result PushResult + state := s.effectiveSyncState() if err := CheckDataVersionCompat(ctx, s.pg); err != nil { return result, err @@ -84,13 +77,13 @@ func (s *Sync) Push( return result, err } - lastPush, err := s.local.GetSyncState("last_push_at") + lastPush, err := state.GetSyncState("last_push_at") if err != nil { return result, fmt.Errorf( "reading last_push_at: %w", err, ) } - storedTargetFingerprint, err := s.local.GetSyncState( + storedTargetFingerprint, err := state.GetSyncState( lastPushTargetFingerprintKey, ) if err != nil { @@ -99,7 +92,7 @@ func (s *Sync) Push( lastPushTargetFingerprintKey, err, ) } - boundaryState, err := s.local.GetSyncState( + boundaryState, err := state.GetSyncState( lastPushBoundaryStateKey, ) if err != nil { @@ -119,7 +112,7 @@ func (s *Sync) Push( "pgsync: %s; clearing local push watermark state", reason, ) - if err := clearPushState(s.local); err != nil { + if err := clearPushState(state); err != nil { return result, err } lastPush = "" @@ -154,7 +147,7 @@ func (s *Sync) Push( // watermark and boundary state so the next // unfiltered push also starts from scratch. if s.isFiltered() && !pushStateCleared { - if err := clearPushState(s.local); err != nil { + if err := clearPushState(state); err != nil { return result, err } } @@ -185,7 +178,7 @@ func (s *Sync) Push( // watermark and boundary state so the next // unfiltered push also starts from scratch. if s.isFiltered() && !pushStateCleared { - if err := clearPushState(s.local); err != nil { + if err := clearPushState(state); err != nil { return result, err } } @@ -218,7 +211,7 @@ func (s *Sync) Push( if !full { var bErr error priorFingerprints, _, _, bErr = readBoundaryAndFingerprints( - s.local, lastPush, + state, lastPush, ) if bErr != nil { return result, bErr @@ -300,21 +293,21 @@ func (s *Sync) Push( boundaryKey = cutoff } if err := writePushBoundaryState( - s.local, boundaryKey, sessions, + state, boundaryKey, sessions, priorFingerprints, sessionFingerprints, ); err != nil { return result, err } } else { if err := finalizePushState( - s.local, cutoff, sessions, nil, + state, cutoff, sessions, nil, sessionFingerprints, ); err != nil { return result, err } } if err := persistPushTargetFingerprint( - s.local, s.targetFingerprint, + state, s.targetFingerprint, ); err != nil { return result, err } @@ -390,7 +383,7 @@ func (s *Sync) Push( boundaryKey = cutoff } if err := writePushBoundaryState( - s.local, boundaryKey, pushed, + state, boundaryKey, pushed, priorFingerprints, sessionFingerprints, ); err != nil { return result, err @@ -409,14 +402,14 @@ func (s *Sync) Push( mergedFingerprints = priorFingerprints } if err := finalizePushState( - s.local, finalizeCutoff, pushed, + state, finalizeCutoff, pushed, mergedFingerprints, sessionFingerprints, ); err != nil { return result, err } } if err := persistPushTargetFingerprint( - s.local, s.targetFingerprint, + state, s.targetFingerprint, ); err != nil { return result, err } @@ -583,7 +576,11 @@ func normalizePushMarkerMachineAliases( // name, so a machine rename keeps the same marker, and unique per local DB, so // a different host pushing to the same PG cannot mask this host's reset. func (s *Sync) pushMarkerID() (string, error) { - id, err := s.local.GetSyncState(pushMarkerIDStateKey) + state := s.local + if state == nil { + return "", fmt.Errorf("local db is required") + } + id, err := state.GetSyncState(pushMarkerIDStateKey) if err != nil { return "", fmt.Errorf("reading push marker id: %w", err) } @@ -595,7 +592,9 @@ func (s *Sync) pushMarkerID() (string, error) { return "", fmt.Errorf("generating push marker id: %w", err) } id = hex.EncodeToString(buf) - storedID, err := s.local.GetOrCreateSyncState(pushMarkerIDStateKey, id) + storedID, err := state.GetOrCreateSyncState( + pushMarkerIDStateKey, id, + ) if err != nil { return "", fmt.Errorf("persisting push marker id: %w", err) } @@ -2621,7 +2620,7 @@ func (s *Sync) normalizeSyncTimestamps( } s.schemaDone = true } - return NormalizeLocalSyncStateTimestamps(s.local) + return NormalizeLocalSyncStateTimestamps(s.effectiveSyncState()) } // sanitizePG strips null bytes and replaces invalid UTF-8 diff --git a/internal/postgres/push_test.go b/internal/postgres/push_test.go index e5921bb49..356768610 100644 --- a/internal/postgres/push_test.go +++ b/internal/postgres/push_test.go @@ -90,6 +90,41 @@ func TestPushMarkerIDReturnsInsertWinner(t *testing.T) { assert.Equal(t, "winner-marker", stored) } +func TestPushMarkerIDUsesUnscopedStateAcrossNamedTargets(t *testing.T) { + local, err := db.Open(filepath.Join(t.TempDir(), "local.db")) + require.NoError(t, err, "db.Open") + defer local.Close() + + workSync := &Sync{ + local: local, + syncState: newScopedSyncStateStore(local, "work", true), + } + archiveSync := &Sync{ + local: local, + syncState: newScopedSyncStateStore(local, "archive", false), + } + + workMarker, err := workSync.pushMarkerID() + require.NoError(t, err, "work pushMarkerID") + archiveMarker, err := archiveSync.pushMarkerID() + require.NoError(t, err, "archive pushMarkerID") + + assert.Equal(t, workMarker, archiveMarker) + + stored, err := local.GetSyncState(pushMarkerIDStateKey) + require.NoError(t, err, "GetSyncState") + assert.Equal(t, workMarker, stored) + + for _, key := range []string{ + pushMarkerIDStateKey + ":work", + pushMarkerIDStateKey + ":archive", + } { + value, err := local.GetSyncState(key) + require.NoError(t, err, "GetSyncState %s", key) + assert.Empty(t, value) + } +} + func TestReadPushBoundaryStateValidity(t *testing.T) { const cutoff = "2026-03-11T12:34:56.123Z" diff --git a/internal/postgres/sync.go b/internal/postgres/sync.go index 9f6241aa3..86304d11d 100644 --- a/internal/postgres/sync.go +++ b/internal/postgres/sync.go @@ -11,6 +11,115 @@ import ( "go.kenn.io/agentsview/internal/db" ) +type syncStateStore = SyncStateStore + +type scopedSyncStateStore struct { + base syncStateStore + scope string + migrateLegacy bool + migrateOnce sync.Once + migrateErr error +} + +func newScopedSyncStateStore( + base syncStateStore, + scope string, + migrateLegacy bool, +) *scopedSyncStateStore { + return &scopedSyncStateStore{ + base: base, + scope: scope, + migrateLegacy: migrateLegacy, + } +} + +func (s *scopedSyncStateStore) scopedKey(key string) string { + if s.scope == "" { + return key + } + return key + ":" + s.scope +} + +func (s *scopedSyncStateStore) ensureMigration() error { + if s.scope == "" || !s.migrateLegacy { + return nil + } + s.migrateOnce.Do(func() { + for _, key := range []string{ + "last_push_at", + lastPushBoundaryStateKey, + lastPushTargetFingerprintKey, + } { + scopedKey := s.scopedKey(key) + scopedValue, err := s.base.GetSyncState(scopedKey) + if err != nil { + s.migrateErr = fmt.Errorf( + "reading %s during PG sync-state migration: %w", + scopedKey, err, + ) + return + } + legacyValue, err := s.base.GetSyncState(key) + if err != nil { + s.migrateErr = fmt.Errorf( + "reading legacy %s during PG sync-state migration: %w", + key, err, + ) + return + } + if legacyValue == "" { + continue + } + if scopedValue == "" { + if err := s.base.SetSyncState( + scopedKey, legacyValue, + ); err != nil { + s.migrateErr = fmt.Errorf( + "writing %s during PG sync-state migration: %w", + scopedKey, err, + ) + return + } + } + if err := s.base.SetSyncState(key, ""); err != nil { + s.migrateErr = fmt.Errorf( + "clearing legacy %s during PG sync-state migration: %w", + key, err, + ) + return + } + } + }) + return s.migrateErr +} + +func (s *scopedSyncStateStore) GetSyncState(key string) (string, error) { + if err := s.ensureMigration(); err != nil { + return "", err + } + return s.base.GetSyncState(s.scopedKey(key)) +} + +func (s *scopedSyncStateStore) SetSyncState( + key, value string, +) error { + if err := s.ensureMigration(); err != nil { + return err + } + return s.base.SetSyncState(s.scopedKey(key), value) +} + +func (s *scopedSyncStateStore) GetOrCreateSyncState( + key, defaultValue string, +) (string, error) { + if err := s.ensureMigration(); err != nil { + return "", err + } + return s.base.GetOrCreateSyncState( + s.scopedKey(key), defaultValue, + ) +} + // isUndefinedTable returns true when the error indicates the // queried relation does not exist (PG SQLSTATE 42P01). We match // only the SQLSTATE code to avoid false positives from other @@ -34,11 +143,14 @@ func isUndefinedColumn(err error) bool { // Sync manages push-only sync from local SQLite to a remote // PostgreSQL database. type Sync struct { - pg *sql.DB - local *db.DB - machine string - schema string - targetFingerprint string + pg *sql.DB + local *db.DB + syncState syncStateStore + machine string + schema string + targetFingerprint string + syncStateTarget string + migrateLegacySyncState bool // Project filtering for push scope. projects []string @@ -51,6 +163,13 @@ type Sync struct { schemaDone bool } +func (s *Sync) effectiveSyncState() syncStateStore { + if s.syncState != nil { + return s.syncState + } + return s.local +} + // SyncOptions holds optional configuration for a Sync instance. type SyncOptions struct { // Projects limits push scope to these project names. @@ -59,6 +178,11 @@ type SyncOptions struct { // ExcludeProjects excludes these project names from push. // Mutually exclusive with Projects. ExcludeProjects []string + // SyncStateTarget scopes per-target push watermarks and fingerprints. + SyncStateTarget string + // MigrateLegacySyncState moves unsuffixed legacy sync-state keys into the + // named default target the first time that target runs. + MigrateLegacySyncState bool } // New creates a Sync instance and verifies the PG connection. @@ -103,13 +227,20 @@ func New( } return &Sync{ - pg: pg, - local: local, - machine: machine, - schema: schema, - targetFingerprint: targetFingerprint, - projects: opts.Projects, - excludeProjects: opts.ExcludeProjects, + pg: pg, + local: local, + syncState: newScopedSyncStateStore( + local, + opts.SyncStateTarget, + opts.MigrateLegacySyncState, + ), + machine: machine, + schema: schema, + targetFingerprint: targetFingerprint, + syncStateTarget: opts.SyncStateTarget, + migrateLegacySyncState: opts.MigrateLegacySyncState, + projects: opts.Projects, + excludeProjects: opts.ExcludeProjects, }, nil } @@ -157,7 +288,9 @@ func (s *Sync) EnsureSchema(ctx context.Context) error { func (s *Sync) Status( ctx context.Context, ) (SyncStatus, error) { - lastPush, err := s.local.GetSyncState("last_push_at") + lastPush, err := ReadLastPushAt( + s.local, s.syncStateTarget, s.migrateLegacySyncState, + ) if err != nil { log.Printf( "warning: reading last_push_at: %v", err, @@ -165,14 +298,51 @@ func (s *Sync) Status( lastPush = "" } + return readStatus(ctx, s.pg, s.machine, lastPush) +} + +// ReadStatus reads PostgreSQL status without requiring a local SQLite sync +// handle. Callers pass any local last-push watermark they want displayed. +func ReadStatus( + ctx context.Context, + pgURL, schema, machine string, + allowInsecure bool, + lastPush string, +) (SyncStatus, error) { + if machine == "" { + return SyncStatus{}, fmt.Errorf( + "machine name must not be empty", + ) + } + if machine == "local" { + return SyncStatus{}, fmt.Errorf( + "machine name %q is reserved; "+ + "choose a different pg.machine_name", + machine, + ) + } + pg, err := Open(pgURL, schema, allowInsecure) + if err != nil { + return SyncStatus{}, err + } + defer pg.Close() + return readStatus(ctx, pg, machine, lastPush) +} + +func readStatus( + ctx context.Context, + pg *sql.DB, + machine string, + lastPush string, +) (SyncStatus, error) { var pgSessions int - err = s.pg.QueryRowContext(ctx, + err := pg.QueryRowContext(ctx, "SELECT COUNT(*) FROM sessions", ).Scan(&pgSessions) if err != nil { if isUndefinedTable(err) { return SyncStatus{ - Machine: s.machine, + Machine: machine, LastPushAt: lastPush, }, nil } @@ -182,13 +352,13 @@ func (s *Sync) Status( } var pgMessages int - err = s.pg.QueryRowContext(ctx, + err = pg.QueryRowContext(ctx, "SELECT COUNT(*) FROM messages", ).Scan(&pgMessages) if err != nil { if isUndefinedTable(err) { return SyncStatus{ - Machine: s.machine, + Machine: machine, LastPushAt: lastPush, PGSessions: pgSessions, }, nil @@ -199,13 +369,39 @@ func (s *Sync) Status( } return SyncStatus{ - Machine: s.machine, + Machine: machine, LastPushAt: lastPush, PGSessions: pgSessions, PGMessages: pgMessages, }, nil } +func ReadLastPushAt( + local SyncStateStore, + target string, + migrateLegacy bool, +) (string, error) { + if local == nil { + return "", fmt.Errorf("local sync state is required") + } + if target == "" { + return local.GetSyncState("last_push_at") + } + store := newScopedSyncStateStore( + local, + target, + false, + ) + lastPush, err := store.GetSyncState("last_push_at") + if err != nil { + return "", err + } + if lastPush != "" || !migrateLegacy { + return lastPush, nil + } + return local.GetSyncState("last_push_at") +} + // SyncStatus holds summary information about the sync state. type SyncStatus struct { Machine string `json:"machine"` diff --git a/internal/postgres/sync_test.go b/internal/postgres/sync_test.go index 54f6a0674..5f6ac34ca 100644 --- a/internal/postgres/sync_test.go +++ b/internal/postgres/sync_test.go @@ -16,14 +16,6 @@ import ( "go.kenn.io/agentsview/internal/db" ) -func testDB(t *testing.T) *db.DB { - t.Helper() - d, err := db.Open(t.TempDir() + "/test.db") - require.NoError(t, err, "opening test db") - t.Cleanup(func() { d.Close() }) - return d -} - func cleanPGSchema(t *testing.T, pgURL string) { t.Helper() pg, err := sql.Open("pgx", pgURL) @@ -72,6 +64,87 @@ func TestEnsureSchemaIdempotent(t *testing.T) { } } +func TestSyncEffectiveSyncStateFallsBackToLocalDB(t *testing.T) { + local := testDB(t) + require.NoError(t, local.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123456789Z", + )) + + sync := &Sync{local: local} + require.NoError(t, NormalizeLocalSyncStateTimestamps( + sync.effectiveSyncState(), + )) + + got, err := sync.effectiveSyncState().GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", got) +} + +func TestSyncScopedStateUsesTargetKeys(t *testing.T) { + pgURL := testPGURL(t) + cleanPGSchema(t, pgURL) + t.Cleanup(func() { cleanPGSchema(t, pgURL) }) + + local := testDB(t) + ps, err := New( + pgURL, "agentsview", local, + "test-machine", true, + SyncOptions{ + SyncStateTarget: "work", + MigrateLegacySyncState: true, + }, + ) + require.NoError(t, err, "creating sync") + defer ps.Close() + + ctx := context.Background() + require.NoError(t, ps.EnsureSchema(ctx), "ensure schema") + + started := "2026-03-11T12:00:00Z" + require.NoError(t, local.UpsertSession(db.Session{ + ID: "sess-scoped-001", + Project: "test-project", + Machine: "local", + Agent: "claude", + StartedAt: &started, + MessageCount: 1, + }), "upsert session") + require.NoError(t, local.InsertMessages([]db.Message{{ + SessionID: "sess-scoped-001", + Ordinal: 0, + Role: "user", + Content: "hello", + }}), "insert message") + + _, err = ps.Push(ctx, false, nil) + require.NoError(t, err, "push") + + scopedLastPush, err := local.GetSyncState("last_push_at:work") + require.NoError(t, err) + assert.NotEmpty(t, scopedLastPush) + + legacyLastPush, err := local.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Empty(t, legacyLastPush) + + scopedBoundary, err := local.GetSyncState( + lastPushBoundaryStateKey + ":work", + ) + require.NoError(t, err) + assert.NotEmpty(t, scopedBoundary) + + scopedFingerprint, err := local.GetSyncState( + lastPushTargetFingerprintKey + ":work", + ) + require.NoError(t, err) + assert.NotEmpty(t, scopedFingerprint) + + status, err := ps.Status(ctx) + require.NoError(t, err, "status") + assert.Equal(t, scopedLastPush, status.LastPushAt) +} + func TestEnsureSchemaMigratesLegacySchema(t *testing.T) { pgURL := testPGURL(t) cleanPGSchema(t, pgURL) diff --git a/internal/postgres/sync_unit_test.go b/internal/postgres/sync_unit_test.go index ef21d7454..6f65408ef 100644 --- a/internal/postgres/sync_unit_test.go +++ b/internal/postgres/sync_unit_test.go @@ -5,8 +5,19 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.kenn.io/agentsview/internal/db" ) +func testDB(t *testing.T) *db.DB { + t.Helper() + d, err := db.Open(t.TempDir() + "/test.db") + require.NoError(t, err, "opening test db") + t.Cleanup(func() { d.Close() }) + return d +} + func TestIsUndefinedTable(t *testing.T) { tests := []struct { name string @@ -46,3 +57,87 @@ func TestIsUndefinedTable(t *testing.T) { }) } } + +func TestScopedSyncStateStoreMigratesLegacyState(t *testing.T) { + local := testDB(t) + + require.NoError(t, local.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123Z", + )) + require.NoError(t, local.SetSyncState( + lastPushBoundaryStateKey, + `{"cutoff":"2026-03-11T12:34:56.123Z"}`, + )) + require.NoError(t, local.SetSyncState( + lastPushTargetFingerprintKey, + "fingerprint-a", + )) + require.NoError(t, local.SetSyncState( + pushMarkerIDStateKey, + "marker-a", + )) + + store := newScopedSyncStateStore(local, "work", true) + + lastPush, err := store.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", lastPush) + + for _, key := range []string{ + "last_push_at", + lastPushBoundaryStateKey, + lastPushTargetFingerprintKey, + } { + legacyValue, err := local.GetSyncState(key) + require.NoError(t, err) + assert.Empty(t, legacyValue) + + scopedValue, err := local.GetSyncState(key + ":work") + require.NoError(t, err) + assert.NotEmpty(t, scopedValue) + } + + legacyMarker, err := local.GetSyncState(pushMarkerIDStateKey) + require.NoError(t, err) + assert.Equal(t, "marker-a", legacyMarker) + + scopedMarker, err := local.GetSyncState( + pushMarkerIDStateKey + ":work", + ) + require.NoError(t, err) + assert.Empty(t, scopedMarker) +} + +func TestScopedSyncStateStoreNonDefaultTargetDoesNotMigrateLegacyState(t *testing.T) { + local := testDB(t) + + require.NoError(t, local.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123Z", + )) + + store := newScopedSyncStateStore(local, "archive", false) + + got, err := store.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Empty(t, got) + + legacyValue, err := local.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", legacyValue) +} + +func TestScopedSyncStateStoreLegacyModeUsesUnscopedKeys(t *testing.T) { + local := testDB(t) + store := newScopedSyncStateStore(local, "", false) + + require.NoError(t, store.SetSyncState( + "last_push_at", + "2026-03-11T12:34:56.123Z", + )) + + got, err := local.GetSyncState("last_push_at") + require.NoError(t, err) + assert.Equal(t, "2026-03-11T12:34:56.123Z", got) +} diff --git a/internal/postgres/time.go b/internal/postgres/time.go index 19b16a548..a40e2e869 100644 --- a/internal/postgres/time.go +++ b/internal/postgres/time.go @@ -99,6 +99,7 @@ func PreviousLocalSyncTimestamp( type SyncStateStore interface { GetSyncState(key string) (string, error) SetSyncState(key, value string) error + GetOrCreateSyncState(key, defaultValue string) (string, error) } // NormalizeLocalSyncStateTimestamps normalizes the last_push_at diff --git a/internal/server/huma_routes_push.go b/internal/server/huma_routes_push.go index 42c7cf3f0..fc5fb8b61 100644 --- a/internal/server/huma_routes_push.go +++ b/internal/server/huma_routes_push.go @@ -28,11 +28,13 @@ type daemonPushInput struct { } type daemonPushRequest struct { - Full bool `json:"full"` - Projects []string `json:"projects,omitempty"` - ExcludeProjects []string `json:"exclude_projects,omitempty"` - PG *config.PGConfig `json:"pg,omitempty"` - DuckDB *config.DuckDBConfig `json:"duckdb,omitempty"` + Full bool `json:"full"` + Projects []string `json:"projects,omitempty"` + ExcludeProjects []string `json:"exclude_projects,omitempty"` + PG *config.PGConfig `json:"pg,omitempty"` + DuckDB *config.DuckDBConfig `json:"duckdb,omitempty"` + SyncStateTarget string `json:"sync_state_target,omitempty"` + MigrateLegacySyncState bool `json:"migrate_legacy_sync_state,omitempty"` } type pgPushOutput struct { @@ -94,8 +96,10 @@ func (s *Server) humaPGPush( pgCfg.URL, pgCfg.Schema, local, pgCfg.MachineName, pgCfg.AllowInsecure, postgres.SyncOptions{ - Projects: in.Body.Projects, - ExcludeProjects: in.Body.ExcludeProjects, + Projects: in.Body.Projects, + ExcludeProjects: in.Body.ExcludeProjects, + SyncStateTarget: in.Body.SyncStateTarget, + MigrateLegacySyncState: in.Body.MigrateLegacySyncState, }, ) if err != nil {