-
Notifications
You must be signed in to change notification settings - Fork 6
feat: add notifier delivery runtime #58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
217f6b1
d4622fe
041c8c8
d39e8e0
f0c57ac
5c07e81
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,8 @@ import ( | |
| "path/filepath" | ||
| "strconv" | ||
| "time" | ||
|
|
||
| "github.com/aoagents/agent-orchestrator/backend/internal/ports" | ||
| ) | ||
|
|
||
| const ( | ||
|
|
@@ -50,6 +52,46 @@ type Config struct { | |
| // DataDir is the directory holding durable state (the SQLite database and | ||
| // the CDC JSONL log). It is created on first use by the storage layer. | ||
| DataDir string | ||
| // Notifications controls the central notifier runtime. The dashboard is the | ||
| // durable notifications table itself; desktop delivery is handed off to the | ||
| // AO Electron app via notification_deliveries rows. | ||
| Notifications NotificationConfig | ||
| } | ||
|
|
||
| // NotificationConfig contains the global notification settings used by the | ||
| // central notifier runtime. It intentionally starts global (not per-project) so | ||
| // the routing model can grow without changing lifecycle reactions. | ||
| type NotificationConfig struct { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. all our configs need to be stored in sqlite now. Can we add a new |
||
| Enabled bool | ||
| Dashboard DashboardNotificationConfig | ||
| Desktop DesktopNotificationConfig | ||
| Routing NotificationRoutingConfig | ||
| Retry NotificationRetryConfig | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's not add any retries before notifications are completely operational |
||
| } | ||
|
|
||
| type DashboardNotificationConfig struct { | ||
| Enabled bool | ||
| Limit int | ||
| } | ||
|
|
||
| type DesktopNotificationConfig struct { | ||
| Enabled bool | ||
| Priorities []ports.Priority | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's remove priorities altogether for now |
||
| SoundPriorities []ports.Priority | ||
| } | ||
|
|
||
| type NotificationRoutingConfig struct { | ||
| // Priorities maps notification priority to built-in route names. The | ||
| // notifier currently implements dashboard and desktop only. | ||
| Priorities map[ports.Priority][]string | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why don't we have a priority level instead of the config having a list of priorities Is there any case when a user will not want high priority notifs but want low priority notifs |
||
| } | ||
|
|
||
| type NotificationRetryConfig struct { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove all logic related to retries |
||
| MaxAttempts int | ||
| BaseDelay time.Duration | ||
| MaxDelay time.Duration | ||
| LeaseTTL time.Duration | ||
| BatchSize int | ||
| } | ||
|
|
||
| // Addr returns the host:port the HTTP server binds. It uses net.JoinHostPort so | ||
|
|
@@ -77,6 +119,7 @@ func Load() (Config, error) { | |
| Port: DefaultPort, | ||
| RequestTimeout: DefaultRequestTimeout, | ||
| ShutdownTimeout: DefaultShutdownTimeout, | ||
| Notifications: DefaultNotificationConfig(), | ||
| } | ||
|
|
||
| if raw := os.Getenv("AO_PORT"); raw != "" { | ||
|
|
@@ -121,6 +164,35 @@ func Load() (Config, error) { | |
| return cfg, nil | ||
| } | ||
|
|
||
| // DefaultNotificationConfig returns the safe zero-setup notification settings. | ||
| func DefaultNotificationConfig() NotificationConfig { | ||
| return NotificationConfig{ | ||
| Enabled: true, | ||
| Dashboard: DashboardNotificationConfig{ | ||
| Enabled: true, | ||
| Limit: 50, | ||
| }, | ||
| Desktop: DesktopNotificationConfig{ | ||
| Enabled: true, | ||
| Priorities: []ports.Priority{ports.PriorityUrgent, ports.PriorityAction}, | ||
| SoundPriorities: []ports.Priority{ports.PriorityUrgent}, | ||
| }, | ||
| Routing: NotificationRoutingConfig{Priorities: map[ports.Priority][]string{ | ||
| ports.PriorityUrgent: []string{"dashboard", "desktop"}, | ||
| ports.PriorityAction: []string{"dashboard", "desktop"}, | ||
| ports.PriorityWarning: []string{"dashboard"}, | ||
| ports.PriorityInfo: []string{"dashboard"}, | ||
| }}, | ||
| Retry: NotificationRetryConfig{ | ||
| MaxAttempts: 5, | ||
| BaseDelay: time.Second, | ||
| MaxDelay: 5 * time.Minute, | ||
| LeaseTTL: 30 * time.Second, | ||
| BatchSize: 50, | ||
| }, | ||
| } | ||
| } | ||
|
|
||
| // parsePositiveDuration rejects zero and negative durations: a zero | ||
| // RequestTimeout would expire every request instantly, and a non-positive | ||
| // ShutdownTimeout would defeat graceful shutdown. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| package daemon | ||
|
|
||
| import ( | ||
| "context" | ||
| "log/slog" | ||
|
|
||
| "github.com/aoagents/agent-orchestrator/backend/internal/config" | ||
| "github.com/aoagents/agent-orchestrator/backend/internal/notification" | ||
| "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" | ||
| ) | ||
|
|
||
| type notifierStack struct { | ||
| Manager *notification.Manager | ||
| done <-chan struct{} | ||
| } | ||
|
|
||
| func startNotifier(ctx context.Context, cfg config.Config, store *sqlite.Store, log *slog.Logger) *notifierStack { | ||
| mgr := notification.NewManager(store, notification.SettingsFromConfig(cfg), log) | ||
| done := mgr.Start(ctx) | ||
| return ¬ifierStack{Manager: mgr, done: done} | ||
| } | ||
|
|
||
| func (s *notifierStack) Stop() { | ||
| if s == nil || s.done == nil { | ||
| return | ||
| } | ||
| <-s.done | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| package integration | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "io" | ||
| "log/slog" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/aoagents/agent-orchestrator/backend/internal/config" | ||
| "github.com/aoagents/agent-orchestrator/backend/internal/domain" | ||
| "github.com/aoagents/agent-orchestrator/backend/internal/notification" | ||
| "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" | ||
| ) | ||
|
|
||
| func TestNotificationRuntimeRoutesDesktopEligiblePriorities(t *testing.T) { | ||
| t.Parallel() | ||
| ctx := context.Background() | ||
| store, err := sqlite.Open(t.TempDir()) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| defer store.Close() | ||
| seedProject(t, store, "mer") | ||
| rec, err := store.CreateSession(ctx, durableRecord("mer", "MER-55", "feat/notifier")) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| urgent := enqueueRuntimeNotification(t, store, rec, "urgent", "urgent") | ||
| action := enqueueRuntimeNotification(t, store, rec, "action", "action") | ||
| info := enqueueRuntimeNotification(t, store, rec, "info", "info") | ||
|
|
||
| mgr := notification.NewManager(store, notification.StaticSettings(config.DefaultNotificationConfig()), slog.New(slog.NewTextHandler(io.Discard, nil))) | ||
| routed, err := mgr.RoutePending(ctx, 50) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| if routed != 3 { | ||
| t.Fatalf("routed = %d, want 3", routed) | ||
| } | ||
|
|
||
| for _, ntf := range []domain.Notification{urgent, action} { | ||
| rows, err := store.ListDeliveries(ctx, sqlite.DeliveryFilter{NotificationID: string(ntf.ID), Limit: 10}) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| if len(rows) != 1 || rows[0].Sink != notification.SinkAOApp || rows[0].RouteName != notification.RouteDesktop { | ||
| t.Fatalf("%s should have one AO-app desktop delivery, got %+v", ntf.Priority, rows) | ||
| } | ||
| } | ||
| rows, err := store.ListDeliveries(ctx, sqlite.DeliveryFilter{NotificationID: string(info.ID), Limit: 10}) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| if len(rows) != 0 { | ||
| t.Fatalf("info should remain dashboard/read-model only, got deliveries %+v", rows) | ||
| } | ||
| } | ||
|
|
||
| func enqueueRuntimeNotification(t *testing.T, store *sqlite.Store, rec domain.SessionRecord, priority, dedupe string) domain.Notification { | ||
| t.Helper() | ||
| now := time.Now().UTC().Truncate(time.Second) | ||
| row, _, err := store.EnqueueNotification(context.Background(), domain.Notification{ | ||
| ProjectID: rec.ProjectID, | ||
| SessionID: rec.ID, | ||
| Source: "lifecycle", | ||
| EventType: "reaction.test", | ||
| SemanticType: "test." + priority, | ||
| Priority: priority, | ||
| Message: "test " + priority, | ||
| Payload: json.RawMessage(`{}`), | ||
| DedupeKey: "runtime-" + dedupe, | ||
| CreatedAt: now, | ||
| UpdatedAt: now, | ||
| }) | ||
| if err != nil { | ||
| t.Fatalf("enqueue notification: %v", err) | ||
| } | ||
| return row | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these sound like notification events. Can we decouple them from CDC?
For notifications we should simply create new entries in the notifications table.