Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions backend/internal/cdc/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
type EventType string

const (
EventSessionCreated EventType = "session_created"
EventSessionUpdated EventType = "session_updated"
EventPRCreated EventType = "pr_created"
EventPRUpdated EventType = "pr_updated"
EventPRCheckRecorded EventType = "pr_check_recorded"
EventNotificationCreated EventType = "notification_created"
EventNotificationUpdated EventType = "notification_updated"
EventSessionCreated EventType = "session_created"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these sound like notification events. Can we decouple them from CDC?
For notifications we should simply create new entries in the notifications table.

EventSessionUpdated EventType = "session_updated"
EventPRCreated EventType = "pr_created"
EventPRUpdated EventType = "pr_updated"
EventPRCheckRecorded EventType = "pr_check_recorded"
EventNotificationCreated EventType = "notification_created"
EventNotificationUpdated EventType = "notification_updated"
EventNotificationDeliveryCreated EventType = "notification_delivery_created"
EventNotificationDeliveryUpdated EventType = "notification_delivery_updated"
)

// Event is one CDC change read from change_log. Seq is the monotonic ordering +
Expand Down
72 changes: 72 additions & 0 deletions backend/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"path/filepath"
"strconv"
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

const (
Expand Down Expand Up @@ -50,6 +52,46 @@ type Config struct {
// DataDir is the directory holding durable state (the SQLite database and
// the CDC JSONL log). It is created on first use by the storage layer.
DataDir string
// Notifications controls the central notifier runtime. The dashboard is the
// durable notifications table itself; desktop delivery is handed off to the
// AO Electron app via notification_deliveries rows.
Notifications NotificationConfig
}

// NotificationConfig contains the global notification settings used by the
// central notifier runtime. It intentionally starts global (not per-project) so
// the routing model can grow without changing lifecycle reactions.
type NotificationConfig struct {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all our configs need to be stored in sqlite now.

Can we add a new settings table in sqlite instead and have a single boolean called enable_notifications

Enabled bool
Dashboard DashboardNotificationConfig
Desktop DesktopNotificationConfig
Routing NotificationRoutingConfig
Retry NotificationRetryConfig

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not add any retries before notifications are completely operational

}

type DashboardNotificationConfig struct {
Enabled bool
Limit int
}

type DesktopNotificationConfig struct {
Enabled bool
Priorities []ports.Priority

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove priorities altogether for now

SoundPriorities []ports.Priority
}

type NotificationRoutingConfig struct {
// Priorities maps notification priority to built-in route names. The
// notifier currently implements dashboard and desktop only.
Priorities map[ports.Priority][]string

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we have a priority level instead of the config having a list of priorities

Is there any case when a user will not want high priority notifs but want low priority notifs

}

type NotificationRetryConfig struct {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove all logic related to retries

MaxAttempts int
BaseDelay time.Duration
MaxDelay time.Duration
LeaseTTL time.Duration
BatchSize int
}

// Addr returns the host:port the HTTP server binds. It uses net.JoinHostPort so
Expand Down Expand Up @@ -77,6 +119,7 @@ func Load() (Config, error) {
Port: DefaultPort,
RequestTimeout: DefaultRequestTimeout,
ShutdownTimeout: DefaultShutdownTimeout,
Notifications: DefaultNotificationConfig(),
}

if raw := os.Getenv("AO_PORT"); raw != "" {
Expand Down Expand Up @@ -121,6 +164,35 @@ func Load() (Config, error) {
return cfg, nil
}

// DefaultNotificationConfig returns the safe zero-setup notification settings.
func DefaultNotificationConfig() NotificationConfig {
return NotificationConfig{
Enabled: true,
Dashboard: DashboardNotificationConfig{
Enabled: true,
Limit: 50,
},
Desktop: DesktopNotificationConfig{
Enabled: true,
Priorities: []ports.Priority{ports.PriorityUrgent, ports.PriorityAction},
SoundPriorities: []ports.Priority{ports.PriorityUrgent},
},
Routing: NotificationRoutingConfig{Priorities: map[ports.Priority][]string{
ports.PriorityUrgent: []string{"dashboard", "desktop"},
ports.PriorityAction: []string{"dashboard", "desktop"},
ports.PriorityWarning: []string{"dashboard"},
ports.PriorityInfo: []string{"dashboard"},
}},
Retry: NotificationRetryConfig{
MaxAttempts: 5,
BaseDelay: time.Second,
MaxDelay: 5 * time.Minute,
LeaseTTL: 30 * time.Second,
BatchSize: 50,
},
}
}

// parsePositiveDuration rejects zero and negative durations: a zero
// RequestTimeout would expire every request instantly, and a non-positive
// ShutdownTimeout would defeat graceful shutdown.
Expand Down
14 changes: 14 additions & 0 deletions backend/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package config
import (
"testing"
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

func TestLoadDefaults(t *testing.T) {
Expand Down Expand Up @@ -31,6 +33,18 @@ func TestLoadDefaults(t *testing.T) {
if cfg.RunFilePath == "" {
t.Error("RunFilePath is empty, want a resolved default path")
}
if !cfg.Notifications.Enabled || !cfg.Notifications.Dashboard.Enabled || !cfg.Notifications.Desktop.Enabled {
t.Fatalf("notification defaults should be enabled: %+v", cfg.Notifications)
}
if cfg.Notifications.Dashboard.Limit != 50 {
t.Fatalf("dashboard limit = %d, want 50", cfg.Notifications.Dashboard.Limit)
}
if got := cfg.Notifications.Routing.Priorities[ports.PriorityUrgent]; len(got) != 2 || got[0] != "dashboard" || got[1] != "desktop" {
t.Fatalf("urgent routes = %v, want dashboard+desktop", got)
}
if cfg.Notifications.Retry.MaxAttempts != 5 || cfg.Notifications.Retry.LeaseTTL != 30*time.Second {
t.Fatalf("retry defaults = %+v", cfg.Notifications.Retry)
}
}

func TestLoadOverrides(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions backend/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func Run() error {
return err
}

notifier := startNotifier(ctx, cfg, store, log)

// Terminal streaming: the tmux runtime supplies the PTY-attach command and
// liveness; the CDC broadcaster feeds the session-state channel. The manager
// is handed to httpd, which mounts it at /mux. Raw PTY bytes never flow
Expand All @@ -71,6 +73,11 @@ func Run() error {

srv, err := httpd.New(cfg, log, termMgr)
if err != nil {
stop()
notifier.Stop()
if cdcErr := cdcPipe.Stop(); cdcErr != nil {
log.Error("cdc pipeline shutdown", "err", cdcErr)
}
return err
}

Expand All @@ -79,6 +86,11 @@ func Run() error {
// trigger -> change_log -> poller -> broadcaster.
lcStack, err := startLifecycle(ctx, store, log)
if err != nil {
stop()
notifier.Stop()
if cdcErr := cdcPipe.Stop(); cdcErr != nil {
log.Error("cdc pipeline shutdown", "err", cdcErr)
}
return err
}

Expand All @@ -98,6 +110,7 @@ func Run() error {
// the LIFO trap (see comment after srv.Run), hence explicit.
stop()
lcStack.Stop()
notifier.Stop()
if cdcErr := cdcPipe.Stop(); cdcErr != nil {
log.Error("cdc pipeline shutdown", "err", cdcErr)
}
Expand All @@ -113,6 +126,7 @@ func Run() error {
// runs before the cancel — which would hang any non-signal exit path.
stop()
lcStack.Stop()
notifier.Stop()
if err := cdcPipe.Stop(); err != nil {
log.Error("cdc pipeline shutdown", "err", err)
}
Expand Down
28 changes: 28 additions & 0 deletions backend/internal/daemon/notifier_wiring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package daemon

import (
"context"
"log/slog"

"github.com/aoagents/agent-orchestrator/backend/internal/config"
"github.com/aoagents/agent-orchestrator/backend/internal/notification"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
)

type notifierStack struct {
Manager *notification.Manager
done <-chan struct{}
}

func startNotifier(ctx context.Context, cfg config.Config, store *sqlite.Store, log *slog.Logger) *notifierStack {
mgr := notification.NewManager(store, notification.SettingsFromConfig(cfg), log)
done := mgr.Start(ctx)
return &notifierStack{Manager: mgr, done: done}
}

func (s *notifierStack) Stop() {
if s == nil || s.done == nil {
return
}
<-s.done
}
1 change: 1 addition & 0 deletions backend/internal/domain/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Notification struct {
CauseKey string
ReadAt time.Time
ArchivedAt time.Time
RoutedAt time.Time
CreatedAt time.Time
UpdatedAt time.Time
}
Expand Down
82 changes: 82 additions & 0 deletions backend/internal/integration/notification_runtime_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package integration

import (
"context"
"encoding/json"
"io"
"log/slog"
"testing"
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/config"
"github.com/aoagents/agent-orchestrator/backend/internal/domain"
"github.com/aoagents/agent-orchestrator/backend/internal/notification"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
)

func TestNotificationRuntimeRoutesDesktopEligiblePriorities(t *testing.T) {
t.Parallel()
ctx := context.Background()
store, err := sqlite.Open(t.TempDir())
if err != nil {
t.Fatal(err)
}
defer store.Close()
seedProject(t, store, "mer")
rec, err := store.CreateSession(ctx, durableRecord("mer", "MER-55", "feat/notifier"))
if err != nil {
t.Fatal(err)
}

urgent := enqueueRuntimeNotification(t, store, rec, "urgent", "urgent")
action := enqueueRuntimeNotification(t, store, rec, "action", "action")
info := enqueueRuntimeNotification(t, store, rec, "info", "info")

mgr := notification.NewManager(store, notification.StaticSettings(config.DefaultNotificationConfig()), slog.New(slog.NewTextHandler(io.Discard, nil)))
routed, err := mgr.RoutePending(ctx, 50)
if err != nil {
t.Fatal(err)
}
if routed != 3 {
t.Fatalf("routed = %d, want 3", routed)
}

for _, ntf := range []domain.Notification{urgent, action} {
rows, err := store.ListDeliveries(ctx, sqlite.DeliveryFilter{NotificationID: string(ntf.ID), Limit: 10})
if err != nil {
t.Fatal(err)
}
if len(rows) != 1 || rows[0].Sink != notification.SinkAOApp || rows[0].RouteName != notification.RouteDesktop {
t.Fatalf("%s should have one AO-app desktop delivery, got %+v", ntf.Priority, rows)
}
}
rows, err := store.ListDeliveries(ctx, sqlite.DeliveryFilter{NotificationID: string(info.ID), Limit: 10})
if err != nil {
t.Fatal(err)
}
if len(rows) != 0 {
t.Fatalf("info should remain dashboard/read-model only, got deliveries %+v", rows)
}
}

func enqueueRuntimeNotification(t *testing.T, store *sqlite.Store, rec domain.SessionRecord, priority, dedupe string) domain.Notification {
t.Helper()
now := time.Now().UTC().Truncate(time.Second)
row, _, err := store.EnqueueNotification(context.Background(), domain.Notification{
ProjectID: rec.ProjectID,
SessionID: rec.ID,
Source: "lifecycle",
EventType: "reaction.test",
SemanticType: "test." + priority,
Priority: priority,
Message: "test " + priority,
Payload: json.RawMessage(`{}`),
DedupeKey: "runtime-" + dedupe,
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
t.Fatalf("enqueue notification: %v", err)
}
return row
}
Loading
Loading