Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,40 @@ or read them, and treats sidecars as untrusted structured input -- see
Push session data to a shared PostgreSQL instance for team dashboards:

```bash
agentsview pg push # push local data to PG
agentsview pg serve # serve web UI from PG (read-only)
agentsview pg push # push local data to the default PG target
agentsview pg push archive # push to one named PG target
agentsview pg push --all # push every configured PG target sequentially
agentsview pg status # show status for the default PG target
agentsview pg status archive # show status for one named PG target
agentsview pg status --all # show status for every configured PG target
agentsview pg serve # serve web UI from the default PG target (read-only)
```

Single-target configs still use the legacy `[pg]` block. To manage more than one
PostgreSQL destination, define named `[pg.NAME]` blocks and set `default_pg`
when more than one target exists:

```toml
default_pg = "work"

[pg.work]
url = "postgres://user:pass@work-db/agentsview"
machine_name = "laptop"

[pg.archive]
url = "postgres://user:pass@archive-db/agentsview"
machine_name = "laptop-archive"
exclude_projects = ["scratch"]
```

Named target names are normalized case-insensitively. `all`, `local`, and the
legacy `[pg]` field names `url`, `schema`, `machine_name`, `allow_insecure`,
`projects`, and `exclude_projects` cannot be used for `[pg.NAME]`.

`AGENTSVIEW_PG_URL`, `AGENTSVIEW_PG_SCHEMA`, and `AGENTSVIEW_PG_MACHINE` still
work, but in named-target mode they apply only to the effective default target.
They do not rewrite every named `[pg.NAME]` entry.

### Automatic push (background service)

To keep a shared PostgreSQL database current without running `pg push` by hand,
Expand All @@ -396,10 +426,15 @@ after new sessions are recorded, with a periodic floor as a safety net:

```bash
agentsview pg push --watch # foreground, Ctrl-C to stop
agentsview pg push archive --watch # watch one named PG target
agentsview pg push --watch --debounce 1m # custom coalesce window
agentsview pg push --watch --interval 5m # custom floor interval
```

`--watch` follows the default PG target unless you pass one target name.
`--all --watch` is rejected; multi-target background watch remains out of scope
for now.

The daemon reads the same `[pg]` config as `pg push`, so the PostgreSQL DSN must
be set in your config file (or an environment variable it expands). Protect the
config file, since it holds credentials:
Expand All @@ -418,6 +453,10 @@ agentsview pg service logs -f # follow the service log
agentsview pg service uninstall # stop and remove
```

`pg serve` and `pg service` always use the effective default PG target. In
named-target mode, set `default_pg` to choose which target those long-running
commands use.

**Linux headless machines:** systemd `--user` services stop at logout and do not
start at boot unless lingering is enabled for your user. `install` detects this
and prints the command; you can also run it yourself:
Expand Down
44 changes: 20 additions & 24 deletions cmd/agentsview/archive_write_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
type archiveWriteBackend interface {
PGPush(
ctx context.Context,
pgCfg config.PGConfig,
target pgTargetSelection,
cfg PGPushConfig,
projects []string,
excludeProjects []string,
Expand All @@ -33,7 +33,7 @@ type archiveWriteBackend interface {
) (duckdbsync.PushResult, error)
PGPushWatch(
ctx context.Context,
pgCfg config.PGConfig,
target pgTargetSelection,
cfg PGPushConfig,
projects []string,
excludeProjects []string,
Expand Down Expand Up @@ -87,18 +87,20 @@ type daemonArchiveWriteBackend struct {

func (b daemonArchiveWriteBackend) PGPush(
ctx context.Context,
pgCfg config.PGConfig,
target pgTargetSelection,
cfg PGPushConfig,
projects []string,
excludeProjects []string,
) (postgres.PushResult, error) {
return postDaemonPush[postgres.PushResult](
ctx, b.tr, b.appCfg.AuthToken, "/api/v1/push/pg",
daemonPushRequest{
Full: cfg.Full,
Projects: projects,
ExcludeProjects: excludeProjects,
PG: &pgCfg,
Full: cfg.Full,
Projects: projects,
ExcludeProjects: excludeProjects,
PG: &target.PG,
SyncStateTarget: target.SyncStateTarget,
MigrateLegacySyncState: target.MigrateLegacySyncState,
},
)
}
Expand Down Expand Up @@ -141,7 +143,7 @@ func absolutizeDuckDBPath(

func (b daemonArchiveWriteBackend) PGPushWatch(
ctx context.Context,
pgCfg config.PGConfig,
target pgTargetSelection,
cfg PGPushConfig,
projects []string,
exclude []string,
Expand Down Expand Up @@ -170,7 +172,7 @@ func (b daemonArchiveWriteBackend) PGPushWatch(
}
defer cleanup()
res, err := backend.PGPush(
pctx, pgCfg, pushCfg, projects, exclude,
pctx, target, pushCfg, projects, exclude,
)
if err != nil {
return err
Expand Down Expand Up @@ -213,7 +215,7 @@ type localArchiveWriteBackend struct {

func (b *localArchiveWriteBackend) PGPush(
ctx context.Context,
pgCfg config.PGConfig,
target pgTargetSelection,
cfg PGPushConfig,
projects []string,
excludeProjects []string,
Expand All @@ -228,12 +230,9 @@ func (b *localArchiveWriteBackend) PGPush(
connectStart := time.Now()
applyClassifierConfig(b.appCfg)
ps, err := postgres.New(
pgCfg.URL, pgCfg.Schema, b.database,
pgCfg.MachineName, pgCfg.AllowInsecure,
postgres.SyncOptions{
Projects: projects,
ExcludeProjects: excludeProjects,
},
target.PG.URL, target.PG.Schema, b.database,
target.PG.MachineName, target.PG.AllowInsecure,
target.syncOptions(projects, excludeProjects),
)
if err != nil {
return postgres.PushResult{}, err
Expand Down Expand Up @@ -324,7 +323,7 @@ func (b *localArchiveWriteBackend) DuckDBPush(

func (b *localArchiveWriteBackend) PGPushWatch(
ctx context.Context,
pgCfg config.PGConfig,
target pgTargetSelection,
cfg PGPushConfig,
projects []string,
exclude []string,
Expand Down Expand Up @@ -367,12 +366,9 @@ func (b *localArchiveWriteBackend) PGPushWatch(
connect: func() (pgTarget, error) {
applyClassifierConfig(b.appCfg)
s, cErr := postgres.New(
pgCfg.URL, pgCfg.Schema, b.database,
pgCfg.MachineName, pgCfg.AllowInsecure,
postgres.SyncOptions{
Projects: projects,
ExcludeProjects: exclude,
},
target.PG.URL, target.PG.Schema, b.database,
target.PG.MachineName, target.PG.AllowInsecure,
target.syncOptions(projects, exclude),
)
if cErr != nil {
return nil, cErr
Expand All @@ -385,7 +381,7 @@ func (b *localArchiveWriteBackend) PGPushWatch(
fmt.Printf(
"agentsview pg watch: pushing to PostgreSQL as %q "+
"(debounce %s, floor %s)\n",
pgCfg.MachineName, debounce, interval,
target.PG.MachineName, debounce, interval,
)

if err := pusher.push(ctx, reasonStartup, didResync); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/agentsview/archive_write_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestLocalArchiveWriteBackendPGPushStopsAfterCanceledLocalSync(t *testing.T)
testLocalArchivePushStopsAfterCanceledSync(t,
func(backend *localArchiveWriteBackend, ctx context.Context) error {
_, err := backend.PGPush(
ctx, config.PGConfig{}, PGPushConfig{}, nil, nil,
ctx, pgTargetSelection{}, PGPushConfig{}, nil, nil,
)
return err
})
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestLocalArchiveWriteBackendPGPushWatchCanceledStartupIsClean(t *testing.T)

err := backend.PGPushWatch(
canceledContext(),
config.PGConfig{},
pgTargetSelection{},
PGPushConfig{},
nil,
nil,
Expand Down
2 changes: 1 addition & 1 deletion cmd/agentsview/classifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func runClassifierRebuild(
}
if pgCfg.URL == "" {
return errors.New(
"pg url not configured; set AGENTSVIEW_PG_URL or [pg].url",
"pg url not configured; set AGENTSVIEW_PG_URL, use a legacy [pg].url, or configure default_pg with named [pg.NAME] targets",
)
}
if err := clearPGClassifierHash(ctx, cfg, pgCfg); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions cmd/agentsview/classifier_wiring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ var triggerCalls = map[string]struct{}{

const wiringHelper = "applyClassifierConfig"

var inheritedWiringFuncs = map[string]struct{}{
"runPGPushTarget": {},
"runPGStatusTarget": {},
}

// TestEveryStoreOpenPathIsWired enforces the rule documented
// in the design spec: every code path in cmd/agentsview that
// opens or initializes a store must first call
Expand Down Expand Up @@ -83,6 +88,9 @@ func scanFile(
if fn.Body == nil {
return true
}
if _, ok := inheritedWiringFuncs[fn.Name.Name]; ok {
return true
}
if v := checkBody(
fset, fn.Body, funcLabel(fset, fn),
); v != "" {
Expand Down
50 changes: 39 additions & 11 deletions cmd/agentsview/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,22 +531,40 @@ func newPGCommand() *cobra.Command {
func newPGPushCommand() *cobra.Command {
var cfg PGPushConfig
cmd := &cobra.Command{
Use: "push",
Use: "push [target]",
Short: "Push local data to PostgreSQL",
SilenceUsage: true,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
Args: cobra.MaximumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
targetName := ""
if len(args) == 1 {
targetName = args[0]
}
if cfg.AllTargets && cfg.Watch {
return fmt.Errorf(
"pg push --watch: %w",
fmt.Errorf(
"--all cannot be combined with --watch",
),
)
}
if cfg.Watch {
runPGPushWatch(cfg)
return
if err := runPGPushWatch(cfg, targetName); err != nil {
return fmt.Errorf("pg push --watch: %w", err)
}
return nil
}
if cmd.Flags().Changed("debounce") || cmd.Flags().Changed("interval") {
fmt.Fprintln(os.Stderr,
"warning: --debounce and --interval have no effect without --watch")
}
runPGPush(cfg)
if err := runPGPush(cfg, targetName); err != nil {
return fmt.Errorf("pg push: %w", err)
}
return nil
},
}
cmd.Flags().BoolVar(&cfg.AllTargets, "all", false, "Push every configured PG target sequentially")
cmd.Flags().BoolVar(&cfg.Full, "full", false, "Force full local resync and PG push")
cmd.Flags().StringVar(&cfg.ProjectsFlag, "projects", "", "Comma-separated list of projects to push (inclusive)")
cmd.Flags().StringVar(&cfg.ExcludeProjects, "exclude-projects", "", "Comma-separated list of projects to exclude from push")
Expand All @@ -558,15 +576,25 @@ func newPGPushCommand() *cobra.Command {
}

func newPGStatusCommand() *cobra.Command {
return &cobra.Command{
Use: "status",
var allTargets bool
cmd := &cobra.Command{
Use: "status [target]",
Short: "Show PG sync status",
SilenceUsage: true,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
runPGStatus()
Args: cobra.MaximumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
targetName := ""
if len(args) == 1 {
targetName = args[0]
}
if err := runPGStatus(targetName, allTargets); err != nil {
return fmt.Errorf("pg status: %w", err)
}
return nil
},
}
cmd.Flags().BoolVar(&allTargets, "all", false, "Show status for every configured PG target")
return cmd
}

func newPGServeCommand() *cobra.Command {
Expand Down
20 changes: 15 additions & 5 deletions cmd/agentsview/daemon_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -13,11 +14,13 @@ import (
)

type daemonPushRequest struct {
Full bool `json:"full"`
Projects []string `json:"projects,omitempty"`
ExcludeProjects []string `json:"exclude_projects,omitempty"`
PG *config.PGConfig `json:"pg,omitempty"`
DuckDB *config.DuckDBConfig `json:"duckdb,omitempty"`
Full bool `json:"full"`
Projects []string `json:"projects,omitempty"`
ExcludeProjects []string `json:"exclude_projects,omitempty"`
PG *config.PGConfig `json:"pg,omitempty"`
DuckDB *config.DuckDBConfig `json:"duckdb,omitempty"`
SyncStateTarget string `json:"sync_state_target,omitempty"`
MigrateLegacySyncState bool `json:"migrate_legacy_sync_state,omitempty"`
}

func postDaemonPush[T any](
Expand Down Expand Up @@ -51,6 +54,13 @@ func postDaemonPush[T any](
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
msg, _ := io.ReadAll(resp.Body)
var apiErr struct {
Error string `json:"error"`
}
if err := json.Unmarshal(msg, &apiErr); err == nil &&
apiErr.Error != "" {
return zero, errors.New(apiErr.Error)
}
return zero, fmt.Errorf(
"HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(msg)),
)
Expand Down
Loading