diff --git a/backend/internal/cdc/event.go b/backend/internal/cdc/event.go index 5d37f47e..35576cf6 100644 --- a/backend/internal/cdc/event.go +++ b/backend/internal/cdc/event.go @@ -18,13 +18,15 @@ import ( type EventType string const ( - EventSessionCreated EventType = "session_created" - EventSessionUpdated EventType = "session_updated" - EventPRCreated EventType = "pr_created" - EventPRUpdated EventType = "pr_updated" - EventPRCheckRecorded EventType = "pr_check_recorded" - EventNotificationCreated EventType = "notification_created" - EventNotificationUpdated EventType = "notification_updated" + EventSessionCreated EventType = "session_created" + EventSessionUpdated EventType = "session_updated" + EventPRCreated EventType = "pr_created" + EventPRUpdated EventType = "pr_updated" + EventPRCheckRecorded EventType = "pr_check_recorded" + EventNotificationCreated EventType = "notification_created" + EventNotificationUpdated EventType = "notification_updated" + EventNotificationDeliveryCreated EventType = "notification_delivery_created" + EventNotificationDeliveryUpdated EventType = "notification_delivery_updated" ) // Event is one CDC change read from change_log. Seq is the monotonic ordering + diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 719e7524..62eb41a2 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -11,6 +11,8 @@ import ( "path/filepath" "strconv" "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/ports" ) const ( @@ -50,6 +52,46 @@ type Config struct { // DataDir is the directory holding durable state (the SQLite database and // the CDC JSONL log). It is created on first use by the storage layer. DataDir string + // Notifications controls the central notifier runtime. The dashboard is the + // durable notifications table itself; desktop delivery is handed off to the + // AO Electron app via notification_deliveries rows. + Notifications NotificationConfig +} + +// NotificationConfig contains the global notification settings used by the +// central notifier runtime. It intentionally starts global (not per-project) so +// the routing model can grow without changing lifecycle reactions. +type NotificationConfig struct { + Enabled bool + Dashboard DashboardNotificationConfig + Desktop DesktopNotificationConfig + Routing NotificationRoutingConfig + Retry NotificationRetryConfig +} + +type DashboardNotificationConfig struct { + Enabled bool + Limit int +} + +type DesktopNotificationConfig struct { + Enabled bool + Priorities []ports.Priority + SoundPriorities []ports.Priority +} + +type NotificationRoutingConfig struct { + // Priorities maps notification priority to built-in route names. The + // notifier currently implements dashboard and desktop only. + Priorities map[ports.Priority][]string +} + +type NotificationRetryConfig struct { + MaxAttempts int + BaseDelay time.Duration + MaxDelay time.Duration + LeaseTTL time.Duration + BatchSize int } // Addr returns the host:port the HTTP server binds. It uses net.JoinHostPort so @@ -77,6 +119,7 @@ func Load() (Config, error) { Port: DefaultPort, RequestTimeout: DefaultRequestTimeout, ShutdownTimeout: DefaultShutdownTimeout, + Notifications: DefaultNotificationConfig(), } if raw := os.Getenv("AO_PORT"); raw != "" { @@ -121,6 +164,35 @@ func Load() (Config, error) { return cfg, nil } +// DefaultNotificationConfig returns the safe zero-setup notification settings. +func DefaultNotificationConfig() NotificationConfig { + return NotificationConfig{ + Enabled: true, + Dashboard: DashboardNotificationConfig{ + Enabled: true, + Limit: 50, + }, + Desktop: DesktopNotificationConfig{ + Enabled: true, + Priorities: []ports.Priority{ports.PriorityUrgent, ports.PriorityAction}, + SoundPriorities: []ports.Priority{ports.PriorityUrgent}, + }, + Routing: NotificationRoutingConfig{Priorities: map[ports.Priority][]string{ + ports.PriorityUrgent: []string{"dashboard", "desktop"}, + ports.PriorityAction: []string{"dashboard", "desktop"}, + ports.PriorityWarning: []string{"dashboard"}, + ports.PriorityInfo: []string{"dashboard"}, + }}, + Retry: NotificationRetryConfig{ + MaxAttempts: 5, + BaseDelay: time.Second, + MaxDelay: 5 * time.Minute, + LeaseTTL: 30 * time.Second, + BatchSize: 50, + }, + } +} + // parsePositiveDuration rejects zero and negative durations: a zero // RequestTimeout would expire every request instantly, and a non-positive // ShutdownTimeout would defeat graceful shutdown. diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go index dfcb5b8a..88f0d927 100644 --- a/backend/internal/config/config_test.go +++ b/backend/internal/config/config_test.go @@ -3,6 +3,8 @@ package config import ( "testing" "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/ports" ) func TestLoadDefaults(t *testing.T) { @@ -31,6 +33,18 @@ func TestLoadDefaults(t *testing.T) { if cfg.RunFilePath == "" { t.Error("RunFilePath is empty, want a resolved default path") } + if !cfg.Notifications.Enabled || !cfg.Notifications.Dashboard.Enabled || !cfg.Notifications.Desktop.Enabled { + t.Fatalf("notification defaults should be enabled: %+v", cfg.Notifications) + } + if cfg.Notifications.Dashboard.Limit != 50 { + t.Fatalf("dashboard limit = %d, want 50", cfg.Notifications.Dashboard.Limit) + } + if got := cfg.Notifications.Routing.Priorities[ports.PriorityUrgent]; len(got) != 2 || got[0] != "dashboard" || got[1] != "desktop" { + t.Fatalf("urgent routes = %v, want dashboard+desktop", got) + } + if cfg.Notifications.Retry.MaxAttempts != 5 || cfg.Notifications.Retry.LeaseTTL != 30*time.Second { + t.Fatalf("retry defaults = %+v", cfg.Notifications.Retry) + } } func TestLoadOverrides(t *testing.T) { diff --git a/backend/internal/daemon/daemon.go b/backend/internal/daemon/daemon.go index 556fe5f0..fd426e7c 100644 --- a/backend/internal/daemon/daemon.go +++ b/backend/internal/daemon/daemon.go @@ -61,6 +61,8 @@ func Run() error { return err } + notifier := startNotifier(ctx, cfg, store, log) + // Terminal streaming: the tmux runtime supplies the PTY-attach command and // liveness; the CDC broadcaster feeds the session-state channel. The manager // is handed to httpd, which mounts it at /mux. Raw PTY bytes never flow @@ -71,6 +73,11 @@ func Run() error { srv, err := httpd.New(cfg, log, termMgr) if err != nil { + stop() + notifier.Stop() + if cdcErr := cdcPipe.Stop(); cdcErr != nil { + log.Error("cdc pipeline shutdown", "err", cdcErr) + } return err } @@ -79,6 +86,11 @@ func Run() error { // trigger -> change_log -> poller -> broadcaster. lcStack, err := startLifecycle(ctx, store, log) if err != nil { + stop() + notifier.Stop() + if cdcErr := cdcPipe.Stop(); cdcErr != nil { + log.Error("cdc pipeline shutdown", "err", cdcErr) + } return err } @@ -98,6 +110,7 @@ func Run() error { // the LIFO trap (see comment after srv.Run), hence explicit. stop() lcStack.Stop() + notifier.Stop() if cdcErr := cdcPipe.Stop(); cdcErr != nil { log.Error("cdc pipeline shutdown", "err", cdcErr) } @@ -113,6 +126,7 @@ func Run() error { // runs before the cancel — which would hang any non-signal exit path. stop() lcStack.Stop() + notifier.Stop() if err := cdcPipe.Stop(); err != nil { log.Error("cdc pipeline shutdown", "err", err) } diff --git a/backend/internal/daemon/notifier_wiring.go b/backend/internal/daemon/notifier_wiring.go new file mode 100644 index 00000000..5864b1e9 --- /dev/null +++ b/backend/internal/daemon/notifier_wiring.go @@ -0,0 +1,28 @@ +package daemon + +import ( + "context" + "log/slog" + + "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/notification" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" +) + +type notifierStack struct { + Manager *notification.Manager + done <-chan struct{} +} + +func startNotifier(ctx context.Context, cfg config.Config, store *sqlite.Store, log *slog.Logger) *notifierStack { + mgr := notification.NewManager(store, notification.SettingsFromConfig(cfg), log) + done := mgr.Start(ctx) + return ¬ifierStack{Manager: mgr, done: done} +} + +func (s *notifierStack) Stop() { + if s == nil || s.done == nil { + return + } + <-s.done +} diff --git a/backend/internal/domain/notification.go b/backend/internal/domain/notification.go index 8c64c9bc..8af49550 100644 --- a/backend/internal/domain/notification.go +++ b/backend/internal/domain/notification.go @@ -27,6 +27,7 @@ type Notification struct { CauseKey string ReadAt time.Time ArchivedAt time.Time + RoutedAt time.Time CreatedAt time.Time UpdatedAt time.Time } diff --git a/backend/internal/integration/notification_runtime_test.go b/backend/internal/integration/notification_runtime_test.go new file mode 100644 index 00000000..c837bf29 --- /dev/null +++ b/backend/internal/integration/notification_runtime_test.go @@ -0,0 +1,82 @@ +package integration + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/notification" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" +) + +func TestNotificationRuntimeRoutesDesktopEligiblePriorities(t *testing.T) { + t.Parallel() + ctx := context.Background() + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatal(err) + } + defer store.Close() + seedProject(t, store, "mer") + rec, err := store.CreateSession(ctx, durableRecord("mer", "MER-55", "feat/notifier")) + if err != nil { + t.Fatal(err) + } + + urgent := enqueueRuntimeNotification(t, store, rec, "urgent", "urgent") + action := enqueueRuntimeNotification(t, store, rec, "action", "action") + info := enqueueRuntimeNotification(t, store, rec, "info", "info") + + mgr := notification.NewManager(store, notification.StaticSettings(config.DefaultNotificationConfig()), slog.New(slog.NewTextHandler(io.Discard, nil))) + routed, err := mgr.RoutePending(ctx, 50) + if err != nil { + t.Fatal(err) + } + if routed != 3 { + t.Fatalf("routed = %d, want 3", routed) + } + + for _, ntf := range []domain.Notification{urgent, action} { + rows, err := store.ListDeliveries(ctx, sqlite.DeliveryFilter{NotificationID: string(ntf.ID), Limit: 10}) + if err != nil { + t.Fatal(err) + } + if len(rows) != 1 || rows[0].Sink != notification.SinkAOApp || rows[0].RouteName != notification.RouteDesktop { + t.Fatalf("%s should have one AO-app desktop delivery, got %+v", ntf.Priority, rows) + } + } + rows, err := store.ListDeliveries(ctx, sqlite.DeliveryFilter{NotificationID: string(info.ID), Limit: 10}) + if err != nil { + t.Fatal(err) + } + if len(rows) != 0 { + t.Fatalf("info should remain dashboard/read-model only, got deliveries %+v", rows) + } +} + +func enqueueRuntimeNotification(t *testing.T, store *sqlite.Store, rec domain.SessionRecord, priority, dedupe string) domain.Notification { + t.Helper() + now := time.Now().UTC().Truncate(time.Second) + row, _, err := store.EnqueueNotification(context.Background(), domain.Notification{ + ProjectID: rec.ProjectID, + SessionID: rec.ID, + Source: "lifecycle", + EventType: "reaction.test", + SemanticType: "test." + priority, + Priority: priority, + Message: "test " + priority, + Payload: json.RawMessage(`{}`), + DedupeKey: "runtime-" + dedupe, + CreatedAt: now, + UpdatedAt: now, + }) + if err != nil { + t.Fatalf("enqueue notification: %v", err) + } + return row +} diff --git a/backend/internal/notification/delivery.go b/backend/internal/notification/delivery.go new file mode 100644 index 00000000..c7fa0311 --- /dev/null +++ b/backend/internal/notification/delivery.go @@ -0,0 +1,118 @@ +package notification + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +var ErrDeliveryUpdateConflict = errors.New("notification delivery update conflict") + +const ( + RouteDashboard = "dashboard" + RouteDesktop = "desktop" + + SinkAOApp = "ao-app" + SinkUnknown = "unknown" +) + +type DeliveryStatus string + +const ( + DeliveryQueued DeliveryStatus = "queued" + DeliveryLeased DeliveryStatus = "leased" + DeliverySent DeliveryStatus = "sent" + DeliveryRetryWait DeliveryStatus = "retry_wait" + DeliveryFailed DeliveryStatus = "failed" + DeliverySkipped DeliveryStatus = "skipped" + DeliveryCancelled DeliveryStatus = "cancelled" +) + +// DeliveryRow is the durable handoff state for one notification route. The +// backend creates AO-app rows; Electron claims them later and reports success or +// failure. External sinks can use the same shape in future issues. +type DeliveryRow struct { + ID string + NotificationID domain.NotificationID + NotificationSeq int64 + ProjectID domain.ProjectID + SessionID domain.SessionID + + RouteName string + Sink string + DestinationKey string + RequestJSON json.RawMessage + + Status DeliveryStatus + Attempts int + MaxAttempts int + NextAttemptAt time.Time + LeaseOwner string + // LeaseExpiresAt is zero when the row is not leased. + LeaseExpiresAt time.Time + + LastErrorCode string + LastError string + ExternalID string + + CreatedAt time.Time + UpdatedAt time.Time + DeliveredAt time.Time +} + +func NewDeliveryID() (string, error) { + var b [16]byte + if _, err := rand.Read(b[:]); err != nil { + return "", fmt.Errorf("generate delivery id: %w", err) + } + return "del_" + hex.EncodeToString(b[:]), nil +} + +func NormalizeDelivery(row DeliveryRow, now time.Time, maxAttempts int) (DeliveryRow, error) { + if row.ID == "" { + id, err := NewDeliveryID() + if err != nil { + return DeliveryRow{}, err + } + row.ID = id + } + if len(row.RequestJSON) == 0 { + row.RequestJSON = json.RawMessage(`{}`) + } + if !json.Valid(row.RequestJSON) { + return DeliveryRow{}, fmt.Errorf("invalid delivery request JSON for %s", row.ID) + } + if row.Status == "" { + row.Status = DeliveryQueued + } + if row.MaxAttempts <= 0 { + row.MaxAttempts = maxAttempts + } + if row.MaxAttempts <= 0 { + row.MaxAttempts = 1 + } + if row.NextAttemptAt.IsZero() { + row.NextAttemptAt = now + } + if row.CreatedAt.IsZero() { + row.CreatedAt = now + } + if row.UpdatedAt.IsZero() { + row.UpdatedAt = row.CreatedAt + } + return row, nil +} + +func TerminalStatus(s DeliveryStatus) bool { + switch s { + case DeliverySent, DeliveryFailed, DeliverySkipped, DeliveryCancelled: + return true + default: + return false + } +} diff --git a/backend/internal/notification/dispatcher.go b/backend/internal/notification/dispatcher.go new file mode 100644 index 00000000..19d0cbf5 --- /dev/null +++ b/backend/internal/notification/dispatcher.go @@ -0,0 +1,36 @@ +package notification + +import ( + "context" + "time" +) + +func startDispatcher(ctx context.Context, m *Manager) <-chan struct{} { + done := make(chan struct{}) + go func() { + defer close(done) + runDispatcherOnce(ctx, m) + + interval := m.interval + if interval <= 0 { + interval = time.Second + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + runDispatcherOnce(ctx, m) + } + } + }() + return done +} + +func runDispatcherOnce(ctx context.Context, m *Manager) { + if err := m.RunOnce(ctx); err != nil { + m.logger.ErrorContext(ctx, "notification dispatcher tick", "err", err) + } +} diff --git a/backend/internal/notification/dispatcher_test.go b/backend/internal/notification/dispatcher_test.go new file mode 100644 index 00000000..34b7b23e --- /dev/null +++ b/backend/internal/notification/dispatcher_test.go @@ -0,0 +1,170 @@ +package notification + +import ( + "context" + "errors" + "io" + "log/slog" + "sync" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +type fakeRuntimeStore struct { + mu sync.Mutex + unrouted []domain.Notification + deliveries []DeliveryRow + byID map[string]DeliveryRow + routed []domain.NotificationID + releases int + retryNext time.Time + failEnqueue map[domain.NotificationID]error +} + +func (f *fakeRuntimeStore) ListUnroutedNotifications(context.Context, int) ([]domain.Notification, error) { + f.mu.Lock() + defer f.mu.Unlock() + return append([]domain.Notification(nil), f.unrouted...), nil +} + +func (f *fakeRuntimeStore) MarkNotificationRouted(_ context.Context, id domain.NotificationID, _ time.Time) error { + f.mu.Lock() + defer f.mu.Unlock() + f.routed = append(f.routed, id) + return nil +} + +func (f *fakeRuntimeStore) EnqueueDelivery(_ context.Context, row DeliveryRow) (DeliveryRow, bool, error) { + f.mu.Lock() + defer f.mu.Unlock() + if err := f.failEnqueue[row.NotificationID]; err != nil { + return DeliveryRow{}, false, err + } + f.deliveries = append(f.deliveries, row) + return row, true, nil +} + +func (f *fakeRuntimeStore) GetDelivery(_ context.Context, id string) (DeliveryRow, bool, error) { + f.mu.Lock() + defer f.mu.Unlock() + row, ok := f.byID[id] + return row, ok, nil +} + +func (f *fakeRuntimeStore) ClaimDueDeliveries(context.Context, string, string, time.Time, int, time.Duration) ([]DeliveryRow, error) { + return nil, nil +} +func (f *fakeRuntimeStore) ReleaseExpiredDeliveryLeases(context.Context, time.Time) (int, error) { + f.mu.Lock() + defer f.mu.Unlock() + f.releases++ + return 0, nil +} +func (f *fakeRuntimeStore) MarkDeliverySent(context.Context, string, string, string, time.Time) error { + return nil +} +func (f *fakeRuntimeStore) MarkDeliveryRetry(_ context.Context, _ string, _ string, _ string, _ string, next time.Time, _ time.Time) error { + f.mu.Lock() + defer f.mu.Unlock() + f.retryNext = next + return nil +} +func (f *fakeRuntimeStore) MarkDeliveryFailed(context.Context, string, string, string, string, time.Time) error { + return nil +} +func (f *fakeRuntimeStore) MarkDeliverySkipped(context.Context, string, string, time.Time) error { + return nil +} + +func TestDispatcherStartReleasesAndStops(t *testing.T) { + store := &fakeRuntimeStore{} + mgr := NewManager(store, StaticSettings(config.DefaultNotificationConfig()), discardLogger()) + mgr.interval = 10 * time.Millisecond + ctx, cancel := context.WithCancel(context.Background()) + done := mgr.Start(ctx) + + deadline := time.After(time.Second) + for { + store.mu.Lock() + released := store.releases + store.mu.Unlock() + if released > 0 { + break + } + select { + case <-deadline: + t.Fatal("dispatcher did not run initial release") + case <-time.After(time.Millisecond): + } + } + cancel() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("dispatcher did not stop after context cancel") + } +} + +func TestRoutePendingDeliveryFailureDoesNotBlockOtherNotifications(t *testing.T) { + n1 := sampleDomainNotification("ntf_1", "urgent") + n2 := sampleDomainNotification("ntf_2", "urgent") + store := &fakeRuntimeStore{ + unrouted: []domain.Notification{n1, n2}, + failEnqueue: map[domain.NotificationID]error{n1.ID: errors.New("boom")}, + } + mgr := NewManager(store, StaticSettings(config.DefaultNotificationConfig()), discardLogger()) + + routed, err := mgr.RoutePending(context.Background(), 10) + if err == nil { + t.Fatal("RoutePending should return the first routing error") + } + if routed != 1 { + t.Fatalf("routed = %d, want one successful notification", routed) + } + store.mu.Lock() + defer store.mu.Unlock() + if len(store.routed) != 1 || store.routed[0] != n2.ID { + t.Fatalf("routed IDs = %v, want only %s", store.routed, n2.ID) + } +} + +func TestMarkDeliveryErrorUsesAttemptAwareBackoff(t *testing.T) { + now := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC) + cfg := config.DefaultNotificationConfig() + cfg.Retry.BaseDelay = time.Second + cfg.Retry.MaxDelay = time.Minute + store := &fakeRuntimeStore{byID: map[string]DeliveryRow{ + "del_1": {ID: "del_1", Attempts: 2, MaxAttempts: 5}, + }} + mgr := NewManager(store, StaticSettings(cfg), discardLogger()) + mgr.clock = func() time.Time { return now } + + if err := mgr.MarkDeliveryError(context.Background(), "del_1", "electron", "timeout", "timed out"); err != nil { + t.Fatal(err) + } + store.mu.Lock() + next := store.retryNext + store.mu.Unlock() + delay := next.Sub(now) + if delay < 3200*time.Millisecond || delay > 4800*time.Millisecond { + t.Fatalf("retry delay for third attempt = %s, want jittered 4s backoff", delay) + } +} + +func sampleDomainNotification(id domain.NotificationID, priority string) domain.Notification { + return domain.Notification{ + Seq: 1, + ID: id, + ProjectID: "ao", + SessionID: "ao-1", + Priority: priority, + Message: "hello", + } +} + +func discardLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} diff --git a/backend/internal/notification/enqueuer.go b/backend/internal/notification/enqueuer.go index 79e902bf..59538305 100644 --- a/backend/internal/notification/enqueuer.go +++ b/backend/internal/notification/enqueuer.go @@ -8,23 +8,23 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/ports" ) -// Store is the durable write-side used by the enqueuer. *sqlite.Store satisfies -// this interface. -type Store interface { +// EnqueueStore is the durable write-side used by the enqueuer. *sqlite.Store +// satisfies this interface. +type EnqueueStore interface { EnqueueNotification(ctx context.Context, row domain.Notification) (domain.Notification, bool, error) } // Enqueuer is a store-backed ports.Notifier. It does not deliver to external // sinks; it renders and persists the notification for later dashboard/app sinks. type Enqueuer struct { - store Store + store EnqueueStore renderer *Renderer logger *slog.Logger } var _ ports.Notifier = (*Enqueuer)(nil) -func NewEnqueuer(store Store, renderer *Renderer, logger *slog.Logger) *Enqueuer { +func NewEnqueuer(store EnqueueStore, renderer *Renderer, logger *slog.Logger) *Enqueuer { if logger == nil { logger = slog.Default() } diff --git a/backend/internal/notification/manager.go b/backend/internal/notification/manager.go new file mode 100644 index 00000000..235f893e --- /dev/null +++ b/backend/internal/notification/manager.go @@ -0,0 +1,157 @@ +package notification + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +type Manager struct { + store Store + settings SettingsProvider + clock func() time.Time + logger *slog.Logger + + interval time.Duration +} + +func NewManager(store Store, settings SettingsProvider, logger *slog.Logger) *Manager { + if logger == nil { + logger = slog.Default() + } + if settings == nil { + settings = StaticSettings(config.DefaultNotificationConfig()) + } + return &Manager{ + store: store, + settings: settings, + clock: time.Now, + logger: logger, + interval: time.Second, + } +} + +func (m *Manager) Start(ctx context.Context) <-chan struct{} { + return startDispatcher(ctx, m) +} + +// RunOnce performs one maintenance/routing pass. It is exposed for tests and +// for future API-triggered nudges; Start calls it on every dispatcher tick. +func (m *Manager) RunOnce(ctx context.Context) error { + settings := m.settings.Settings(ctx) + policy := RetryPolicyFromConfig(settings.Retry) + now := m.clock().UTC() + + if released, err := m.store.ReleaseExpiredDeliveryLeases(ctx, now); err != nil { + return err + } else if released > 0 { + m.logger.DebugContext(ctx, "notification delivery leases released", "count", released) + } + + _, err := m.RoutePending(ctx, policy.BatchSize) + return err +} + +func (m *Manager) RoutePending(ctx context.Context, limit int) (int, error) { + if limit <= 0 { + limit = RetryPolicyFromConfig(m.settings.Settings(ctx).Retry).BatchSize + } + rows, err := m.store.ListUnroutedNotifications(ctx, limit) + if err != nil { + return 0, err + } + var firstErr error + var routed int + for _, row := range rows { + if err := m.RouteNotification(ctx, row); err != nil { + m.logger.ErrorContext(ctx, "route notification", "notification", row.ID, "err", err) + if firstErr == nil { + firstErr = err + } else { + firstErr = errors.Join(firstErr, err) + } + continue + } + routed++ + } + return routed, firstErr +} + +func (m *Manager) RouteNotification(ctx context.Context, row domain.Notification) error { + settings := m.settings.Settings(ctx) + now := m.clock().UTC() + if !settings.Enabled || !row.ArchivedAt.IsZero() { + return m.store.MarkNotificationRouted(ctx, row.ID, now) + } + + decisions := ResolveRoutes(settings, ports.Priority(row.Priority)) + maxAttempts := RetryPolicyFromConfig(settings.Retry).MaxAttempts + for _, decision := range decisions { + if !decision.CreateDelivery { + continue + } + delivery := DeliveryRow{ + NotificationID: row.ID, + NotificationSeq: row.Seq, + ProjectID: row.ProjectID, + SessionID: row.SessionID, + RouteName: decision.RouteName, + Sink: decision.Sink, + DestinationKey: decision.DestinationKey, + Status: decision.Status, + MaxAttempts: maxAttempts, + NextAttemptAt: now, + CreatedAt: now, + UpdatedAt: now, + } + if delivery.Status == "" { + delivery.Status = DeliveryQueued + } + if decision.Reason != "" { + delivery.LastErrorCode = "route_skipped" + delivery.LastError = decision.Reason + } + if _, _, err := m.store.EnqueueDelivery(ctx, delivery); err != nil { + return err + } + } + return m.store.MarkNotificationRouted(ctx, row.ID, now) +} + +func (m *Manager) ClaimDesktopDeliveries(ctx context.Context, owner string, limit int) ([]DeliveryRow, error) { + settings := m.settings.Settings(ctx) + policy := RetryPolicyFromConfig(settings.Retry) + return m.store.ClaimDueDeliveries(ctx, SinkAOApp, owner, m.clock().UTC(), limit, policy.LeaseTTL) +} + +func (m *Manager) MarkDeliverySent(ctx context.Context, id, owner, externalID string) error { + return m.store.MarkDeliverySent(ctx, id, owner, externalID, m.clock().UTC()) +} + +func (m *Manager) MarkDeliveryError(ctx context.Context, id, owner, code, message string) error { + settings := m.settings.Settings(ctx) + policy := RetryPolicyFromConfig(settings.Retry) + now := m.clock().UTC() + // The store is the source of truth for attempts/max-attempt terminal + // handling. Permanent classification short-circuits to failed; otherwise we + // fetch the current delivery attempts and provide the attempt-aware next + // retry timestamp for retry_wait rows. + if ClassifyError(code) == ErrorPermanent { + return m.store.MarkDeliveryFailed(ctx, id, owner, code, message, now) + } + row, ok, err := m.store.GetDelivery(ctx, id) + if err != nil { + return err + } + if !ok { + return fmt.Errorf("notification delivery %s not found", id) + } + nextAttemptNo := row.Attempts + 1 + return m.store.MarkDeliveryRetry(ctx, id, owner, code, message, policy.NextAttemptAt(now, nextAttemptNo), now) +} diff --git a/backend/internal/notification/retry.go b/backend/internal/notification/retry.go new file mode 100644 index 00000000..72c54775 --- /dev/null +++ b/backend/internal/notification/retry.go @@ -0,0 +1,116 @@ +package notification + +import ( + crand "crypto/rand" + "encoding/binary" + "math" + "strings" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/config" +) + +const retryJitterFraction = 0.20 + +type RetryPolicy struct { + MaxAttempts int + BaseDelay time.Duration + MaxDelay time.Duration + LeaseTTL time.Duration + BatchSize int + Jitter float64 + RandFloat64 func() float64 +} + +func RetryPolicyFromConfig(cfg config.NotificationRetryConfig) RetryPolicy { + settings := NormalizeSettings(config.NotificationConfig{Enabled: true, Retry: cfg}) + return RetryPolicy{ + MaxAttempts: settings.Retry.MaxAttempts, + BaseDelay: settings.Retry.BaseDelay, + MaxDelay: settings.Retry.MaxDelay, + LeaseTTL: settings.Retry.LeaseTTL, + BatchSize: settings.Retry.BatchSize, + Jitter: retryJitterFraction, + RandFloat64: cryptoRandFloat64, + } +} + +func (p RetryPolicy) normalized() RetryPolicy { + cfg := config.NotificationRetryConfig{ + MaxAttempts: p.MaxAttempts, + BaseDelay: p.BaseDelay, + MaxDelay: p.MaxDelay, + LeaseTTL: p.LeaseTTL, + BatchSize: p.BatchSize, + } + out := RetryPolicyFromConfig(cfg) + if p.Jitter != 0 { + out.Jitter = p.Jitter + } + if p.RandFloat64 != nil { + out.RandFloat64 = p.RandFloat64 + } + return out +} + +// BackoffDelay returns exponential backoff for the already-recorded attempt +// count. attempt=1 returns the base delay; delays are capped before jitter. +func (p RetryPolicy) BackoffDelay(attempt int) time.Duration { + p = p.normalized() + if attempt < 1 { + attempt = 1 + } + mult := math.Pow(2, float64(attempt-1)) + delay := time.Duration(float64(p.BaseDelay) * mult) + if delay > p.MaxDelay || delay <= 0 { + delay = p.MaxDelay + } + if p.Jitter <= 0 { + return delay + } + randFloat := p.RandFloat64 + if randFloat == nil { + randFloat = cryptoRandFloat64 + } + // rand in [0,1) -> factor in [1-jitter, 1+jitter) + factor := 1 - p.Jitter + (2 * p.Jitter * randFloat()) + return time.Duration(float64(delay) * factor) +} + +func cryptoRandFloat64() float64 { + var b [8]byte + if _, err := crand.Read(b[:]); err != nil { + // Fall back to a time-derived value only if the OS CSPRNG fails. The + // fallback still avoids math/rand's deterministic process seed. + return float64(time.Now().UnixNano()&((1<<53)-1)) / float64(1<<53) + } + // Match math/rand.Float64's 53 bits of precision in [0,1). + return float64(binary.BigEndian.Uint64(b[:])>>11) / float64(1<<53) +} + +func (p RetryPolicy) NextAttemptAt(now time.Time, attempt int) time.Time { + return now.Add(p.BackoffDelay(attempt)) +} + +type ErrorClass string + +const ( + ErrorTransient ErrorClass = "transient" + ErrorPermanent ErrorClass = "permanent" +) + +func ClassifyError(code string) ErrorClass { + switch strings.ToLower(strings.TrimSpace(code)) { + case "permanent", "invalid_request", "bad_request", "unauthorized", "forbidden", "not_found", "unsupported_route", "route_disabled": + return ErrorPermanent + default: + return ErrorTransient + } +} + +func ShouldRetry(code string, attempts, maxAttempts int) bool { + if maxAttempts <= 0 { + maxAttempts = 1 + } + return ClassifyError(code) != ErrorPermanent && attempts < maxAttempts +} diff --git a/backend/internal/notification/retry_test.go b/backend/internal/notification/retry_test.go new file mode 100644 index 00000000..c6d37155 --- /dev/null +++ b/backend/internal/notification/retry_test.go @@ -0,0 +1,54 @@ +package notification + +import ( + "testing" + "time" +) + +func TestRetryBackoffExponentialCapped(t *testing.T) { + p := RetryPolicy{ + BaseDelay: time.Second, + MaxDelay: 5 * time.Second, + Jitter: retryJitterFraction, + RandFloat64: func() float64 { return 0.5 }, + } + if got := p.BackoffDelay(1); got != time.Second { + t.Fatalf("attempt 1 delay = %s, want 1s", got) + } + if got := p.BackoffDelay(2); got != 2*time.Second { + t.Fatalf("attempt 2 delay = %s, want 2s", got) + } + if got := p.BackoffDelay(4); got != 5*time.Second { + t.Fatalf("attempt 4 delay = %s, want capped 5s", got) + } +} + +func TestRetryJitterBounds(t *testing.T) { + base := RetryPolicy{BaseDelay: 10 * time.Second, MaxDelay: time.Minute, Jitter: retryJitterFraction} + low := base + low.RandFloat64 = func() float64 { return 0 } + high := base + high.RandFloat64 = func() float64 { return 1 } + + if got := low.BackoffDelay(1); got != 8*time.Second { + t.Fatalf("low jitter = %s, want 8s", got) + } + if got := high.BackoffDelay(1); got != 12*time.Second { + t.Fatalf("high jitter = %s, want 12s", got) + } +} + +func TestErrorClassificationRetry(t *testing.T) { + if ClassifyError("invalid_request") != ErrorPermanent { + t.Fatal("invalid_request should be permanent") + } + if ShouldRetry("invalid_request", 1, 5) { + t.Fatal("permanent errors should not retry") + } + if !ShouldRetry("timeout", 1, 5) { + t.Fatal("transient errors under max attempts should retry") + } + if ShouldRetry("timeout", 5, 5) { + t.Fatal("max attempts should stop retry") + } +} diff --git a/backend/internal/notification/routing.go b/backend/internal/notification/routing.go new file mode 100644 index 00000000..36a13d9b --- /dev/null +++ b/backend/internal/notification/routing.go @@ -0,0 +1,72 @@ +package notification + +import ( + "slices" + + "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +type RouteDecision struct { + RouteName string + Sink string + DestinationKey string + Status DeliveryStatus + Reason string + CreateDelivery bool +} + +// ResolveRoutes resolves the configured built-in routes for one notification. +// Dashboard is a read model over the notifications table, so it is represented +// in the decision list but never creates a delivery row. Unknown explicitly +// configured routes become skipped delivery rows for operator visibility. +func ResolveRoutes(settings config.NotificationConfig, priority ports.Priority) []RouteDecision { + settings = NormalizeSettings(settings) + if !settings.Enabled { + return nil + } + + routes, ok := settings.Routing.Priorities[priority] + if !ok { + return nil + } + out := make([]RouteDecision, 0, len(routes)) + for _, name := range routes { + switch name { + case RouteDashboard: + if settings.Dashboard.Enabled { + out = append(out, RouteDecision{RouteName: RouteDashboard}) + } + case RouteDesktop: + if settings.Desktop.Enabled && priorityAllowed(priority, settings.Desktop.Priorities) { + out = append(out, RouteDecision{ + RouteName: RouteDesktop, + Sink: SinkAOApp, + Status: DeliveryQueued, + CreateDelivery: true, + }) + } + case "": + // Ignore empty route names so a stray trailing separator in future + // config parsing does not create a permanent skipped delivery. + default: + out = append(out, RouteDecision{ + RouteName: name, + Sink: SinkUnknown, + Status: DeliverySkipped, + Reason: "unknown route", + CreateDelivery: true, + }) + } + } + return out +} + +func DesktopEligible(settings config.NotificationConfig, priority ports.Priority) bool { + settings = NormalizeSettings(settings) + return settings.Enabled && settings.Desktop.Enabled && priorityAllowed(priority, settings.Desktop.Priorities) +} + +func priorityAllowed(p ports.Priority, allowed []ports.Priority) bool { + return slices.Contains(allowed, p) +} diff --git a/backend/internal/notification/routing_test.go b/backend/internal/notification/routing_test.go new file mode 100644 index 00000000..913da8c2 --- /dev/null +++ b/backend/internal/notification/routing_test.go @@ -0,0 +1,87 @@ +package notification + +import ( + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +func TestResolveRoutes_Defaults(t *testing.T) { + cfg := config.DefaultNotificationConfig() + tests := []struct { + priority ports.Priority + want []string + }{ + {ports.PriorityUrgent, []string{RouteDashboard, RouteDesktop}}, + {ports.PriorityAction, []string{RouteDashboard, RouteDesktop}}, + {ports.PriorityWarning, []string{RouteDashboard}}, + {ports.PriorityInfo, []string{RouteDashboard}}, + } + for _, tc := range tests { + t.Run(string(tc.priority), func(t *testing.T) { + got := routeNames(ResolveRoutes(cfg, tc.priority)) + if len(got) != len(tc.want) { + t.Fatalf("routes = %v, want %v", got, tc.want) + } + for i := range tc.want { + if got[i] != tc.want[i] { + t.Fatalf("routes = %v, want %v", got, tc.want) + } + } + }) + } +} + +func TestResolveRoutes_DesktopDisabledOrIneligible(t *testing.T) { + cfg := config.DefaultNotificationConfig() + cfg.Desktop.Enabled = false + got := routeNames(ResolveRoutes(cfg, ports.PriorityUrgent)) + if len(got) != 1 || got[0] != RouteDashboard { + t.Fatalf("desktop disabled routes = %v, want dashboard only", got) + } + + cfg = config.DefaultNotificationConfig() + cfg.Routing.Priorities[ports.PriorityInfo] = []string{RouteDashboard, RouteDesktop} + got = routeNames(ResolveRoutes(cfg, ports.PriorityInfo)) + if len(got) != 1 || got[0] != RouteDashboard { + t.Fatalf("info desktop ineligible routes = %v, want dashboard only", got) + } +} + +func TestResolveRoutes_GlobalDisabled(t *testing.T) { + cfg := config.DefaultNotificationConfig() + cfg.Enabled = false + if got := ResolveRoutes(cfg, ports.PriorityUrgent); len(got) != 0 { + t.Fatalf("globally disabled routes = %+v, want none", got) + } +} + +func TestResolveRoutes_ExplicitEmptySuppressesPriority(t *testing.T) { + cfg := config.DefaultNotificationConfig() + cfg.Routing.Priorities[ports.PriorityUrgent] = []string{} + if got := ResolveRoutes(cfg, ports.PriorityUrgent); len(got) != 0 { + t.Fatalf("explicit empty routes = %+v, want none", got) + } +} + +func TestResolveRoutes_UnknownExplicitRouteSkipped(t *testing.T) { + cfg := config.DefaultNotificationConfig() + cfg.Routing.Priorities[ports.PriorityUrgent] = []string{RouteDashboard, "pager"} + got := ResolveRoutes(cfg, ports.PriorityUrgent) + if len(got) != 2 { + t.Fatalf("routes = %+v, want dashboard + skipped unknown", got) + } + unknown := got[1] + if unknown.RouteName != "pager" || unknown.Status != DeliverySkipped || !unknown.CreateDelivery || unknown.Sink != SinkUnknown { + t.Fatalf("unknown route decision = %+v", unknown) + } +} + +func routeNames(routes []RouteDecision) []string { + out := make([]string, len(routes)) + for i, r := range routes { + out[i] = r.RouteName + } + return out +} diff --git a/backend/internal/notification/settings.go b/backend/internal/notification/settings.go new file mode 100644 index 00000000..457c5dc2 --- /dev/null +++ b/backend/internal/notification/settings.go @@ -0,0 +1,89 @@ +package notification + +import ( + "context" + + "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +type SettingsProvider interface { + Settings(ctx context.Context) config.NotificationConfig +} + +type staticSettings struct { + cfg config.NotificationConfig +} + +func SettingsFromConfig(cfg config.Config) SettingsProvider { + return staticSettings{cfg: NormalizeSettings(cfg.Notifications)} +} + +func StaticSettings(cfg config.NotificationConfig) SettingsProvider { + return staticSettings{cfg: NormalizeSettings(cfg)} +} + +func (s staticSettings) Settings(context.Context) config.NotificationConfig { + return cloneSettings(s.cfg) +} + +// NormalizeSettings fills unset settings with safe defaults while preserving +// explicit route overrides, including an explicit empty route list. +func NormalizeSettings(in config.NotificationConfig) config.NotificationConfig { + def := config.DefaultNotificationConfig() + out := in + + if out.Dashboard.Limit == 0 { + out.Dashboard.Limit = def.Dashboard.Limit + } + if out.Desktop.Priorities == nil { + out.Desktop.Priorities = append([]ports.Priority(nil), def.Desktop.Priorities...) + } + if out.Desktop.SoundPriorities == nil { + out.Desktop.SoundPriorities = append([]ports.Priority(nil), def.Desktop.SoundPriorities...) + } + if out.Routing.Priorities == nil { + out.Routing.Priorities = cloneRoutes(def.Routing.Priorities) + } else { + merged := cloneRoutes(def.Routing.Priorities) + for p, routes := range out.Routing.Priorities { + merged[p] = append([]string(nil), routes...) + } + out.Routing.Priorities = merged + } + if out.Retry.MaxAttempts == 0 { + out.Retry.MaxAttempts = def.Retry.MaxAttempts + } + if out.Retry.BaseDelay == 0 { + out.Retry.BaseDelay = def.Retry.BaseDelay + } + if out.Retry.MaxDelay == 0 { + out.Retry.MaxDelay = def.Retry.MaxDelay + } + if out.Retry.LeaseTTL == 0 { + out.Retry.LeaseTTL = def.Retry.LeaseTTL + } + if out.Retry.BatchSize == 0 { + out.Retry.BatchSize = def.Retry.BatchSize + } + return cloneSettings(out) +} + +func cloneSettings(in config.NotificationConfig) config.NotificationConfig { + out := in + out.Desktop.Priorities = append([]ports.Priority(nil), in.Desktop.Priorities...) + out.Desktop.SoundPriorities = append([]ports.Priority(nil), in.Desktop.SoundPriorities...) + out.Routing.Priorities = cloneRoutes(in.Routing.Priorities) + return out +} + +func cloneRoutes(in map[ports.Priority][]string) map[ports.Priority][]string { + if in == nil { + return nil + } + out := make(map[ports.Priority][]string, len(in)) + for p, routes := range in { + out[p] = append([]string(nil), routes...) + } + return out +} diff --git a/backend/internal/notification/settings_test.go b/backend/internal/notification/settings_test.go new file mode 100644 index 00000000..9fb9590f --- /dev/null +++ b/backend/internal/notification/settings_test.go @@ -0,0 +1,76 @@ +package notification + +import ( + "context" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +func TestSettingsFromConfigDefaultsWhenUnset(t *testing.T) { + got := SettingsFromConfig(config.Config{Notifications: config.DefaultNotificationConfig()}).Settings(context.Background()) + if !got.Enabled || !got.Desktop.Enabled || !got.Dashboard.Enabled { + t.Fatalf("default config should resolve safe enabled defaults: %+v", got) + } + if got.Retry.MaxAttempts != 5 || got.Retry.BatchSize != 50 { + t.Fatalf("retry defaults = %+v", got.Retry) + } +} + +func TestSettingsFromConfigPreservesExplicitGlobalDisable(t *testing.T) { + got := SettingsFromConfig(config.Config{Notifications: config.NotificationConfig{Enabled: false}}).Settings(context.Background()) + if got.Enabled { + t.Fatalf("explicit disabled notifications should stay disabled: %+v", got) + } + if got.Retry.MaxAttempts != 5 || got.Routing.Priorities == nil { + t.Fatalf("disabled config should still receive non-global defaults: %+v", got) + } +} + +func TestNormalizeSettingsPreservesExplicitSurfaceDisables(t *testing.T) { + got := StaticSettings(config.NotificationConfig{ + Enabled: true, + Dashboard: config.DashboardNotificationConfig{Enabled: false}, + Desktop: config.DesktopNotificationConfig{Enabled: false}, + }).Settings(context.Background()) + + if !got.Enabled { + t.Fatalf("global notifications should remain enabled: %+v", got) + } + if got.Dashboard.Enabled { + t.Fatalf("explicit dashboard disable should stay disabled: %+v", got.Dashboard) + } + if got.Desktop.Enabled { + t.Fatalf("explicit desktop disable should stay disabled: %+v", got.Desktop) + } + if got.Dashboard.Limit != 50 || len(got.Desktop.Priorities) == 0 || got.Retry.MaxAttempts != 5 { + t.Fatalf("disabled surfaces should still receive non-bool defaults: %+v", got) + } +} + +func TestNormalizeSettingsPreservesExplicitEmptyRoute(t *testing.T) { + cfg := config.DefaultNotificationConfig() + cfg.Routing.Priorities[ports.PriorityUrgent] = []string{} + + got := StaticSettings(cfg).Settings(context.Background()) + if routes := got.Routing.Priorities[ports.PriorityUrgent]; len(routes) != 0 { + t.Fatalf("explicit empty urgent route should be preserved, got %v", routes) + } +} + +func TestSettingsProviderReturnsClone(t *testing.T) { + cfg := config.DefaultNotificationConfig() + provider := StaticSettings(cfg) + first := provider.Settings(context.Background()) + first.Desktop.Priorities[0] = ports.PriorityInfo + first.Routing.Priorities[ports.PriorityUrgent][0] = "mutated" + + second := provider.Settings(context.Background()) + if second.Desktop.Priorities[0] != ports.PriorityUrgent { + t.Fatalf("desktop priorities were mutated through clone: %v", second.Desktop.Priorities) + } + if second.Routing.Priorities[ports.PriorityUrgent][0] != RouteDashboard { + t.Fatalf("routes were mutated through clone: %v", second.Routing.Priorities[ports.PriorityUrgent]) + } +} diff --git a/backend/internal/notification/store.go b/backend/internal/notification/store.go new file mode 100644 index 00000000..e2cdf511 --- /dev/null +++ b/backend/internal/notification/store.go @@ -0,0 +1,25 @@ +package notification + +import ( + "context" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// Store is the central notifier runtime's durable interface. The lifecycle +// enqueuer writes notification rows; this interface routes them into durable +// delivery rows and lets AO-app/API code claim and complete desktop handoffs. +type Store interface { + ListUnroutedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) + MarkNotificationRouted(ctx context.Context, id domain.NotificationID, at time.Time) error + + GetDelivery(ctx context.Context, id string) (DeliveryRow, bool, error) + EnqueueDelivery(ctx context.Context, row DeliveryRow) (DeliveryRow, bool, error) + ClaimDueDeliveries(ctx context.Context, sink string, owner string, now time.Time, limit int, lease time.Duration) ([]DeliveryRow, error) + ReleaseExpiredDeliveryLeases(ctx context.Context, now time.Time) (int, error) + MarkDeliverySent(ctx context.Context, id string, owner string, externalID string, at time.Time) error + MarkDeliveryRetry(ctx context.Context, id string, owner string, errCode string, errMessage string, next time.Time, at time.Time) error + MarkDeliveryFailed(ctx context.Context, id string, owner string, errCode string, errMessage string, at time.Time) error + MarkDeliverySkipped(ctx context.Context, id string, reason string, at time.Time) error +} diff --git a/backend/internal/storage/sqlite/gen/models.go b/backend/internal/storage/sqlite/gen/models.go index 992c0ca0..2887a87e 100644 --- a/backend/internal/storage/sqlite/gen/models.go +++ b/backend/internal/storage/sqlite/gen/models.go @@ -36,6 +36,43 @@ type Notification struct { ArchivedAt sql.NullTime CreatedAt time.Time UpdatedAt time.Time + RoutedAt sql.NullTime +} + +type NotificationDelivery struct { + ID string + NotificationID string + NotificationSeq int64 + ProjectID string + SessionID string + RouteName string + Sink string + DestinationKey string + RequestJson string + Status string + Attempts int64 + MaxAttempts int64 + NextAttemptAt time.Time + LeaseOwner string + LeaseExpiresAt sql.NullTime + LastErrorCode string + LastError string + ExternalID string + CreatedAt time.Time + UpdatedAt time.Time + DeliveredAt sql.NullTime +} + +type NotificationDeliveryAttempt struct { + ID int64 + DeliveryID string + AttemptNo int64 + Status string + StartedAt time.Time + FinishedAt sql.NullTime + ErrorCode string + Error string + ResponseJson string } type Pr struct { diff --git a/backend/internal/storage/sqlite/gen/notification_deliveries.sql.go b/backend/internal/storage/sqlite/gen/notification_deliveries.sql.go new file mode 100644 index 00000000..ed2821ee --- /dev/null +++ b/backend/internal/storage/sqlite/gen/notification_deliveries.sql.go @@ -0,0 +1,256 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.31.1 +// source: notification_deliveries.sql + +package gen + +import ( + "context" + "database/sql" + "time" +) + +const getNotificationDelivery = `-- name: GetNotificationDelivery :one +SELECT id, notification_id, notification_seq, project_id, session_id, + route_name, sink, destination_key, request_json, + status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, + last_error_code, last_error, external_id, + created_at, updated_at, delivered_at +FROM notification_deliveries +WHERE id = ? +` + +func (q *Queries) GetNotificationDelivery(ctx context.Context, id string) (NotificationDelivery, error) { + row := q.db.QueryRowContext(ctx, getNotificationDelivery, id) + var i NotificationDelivery + err := row.Scan( + &i.ID, + &i.NotificationID, + &i.NotificationSeq, + &i.ProjectID, + &i.SessionID, + &i.RouteName, + &i.Sink, + &i.DestinationKey, + &i.RequestJson, + &i.Status, + &i.Attempts, + &i.MaxAttempts, + &i.NextAttemptAt, + &i.LeaseOwner, + &i.LeaseExpiresAt, + &i.LastErrorCode, + &i.LastError, + &i.ExternalID, + &i.CreatedAt, + &i.UpdatedAt, + &i.DeliveredAt, + ) + return i, err +} + +const getNotificationDeliveryByUnique = `-- name: GetNotificationDeliveryByUnique :one +SELECT id, notification_id, notification_seq, project_id, session_id, + route_name, sink, destination_key, request_json, + status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, + last_error_code, last_error, external_id, + created_at, updated_at, delivered_at +FROM notification_deliveries +WHERE notification_id = ? AND route_name = ? AND destination_key = ? +` + +type GetNotificationDeliveryByUniqueParams struct { + NotificationID string + RouteName string + DestinationKey string +} + +func (q *Queries) GetNotificationDeliveryByUnique(ctx context.Context, arg GetNotificationDeliveryByUniqueParams) (NotificationDelivery, error) { + row := q.db.QueryRowContext(ctx, getNotificationDeliveryByUnique, arg.NotificationID, arg.RouteName, arg.DestinationKey) + var i NotificationDelivery + err := row.Scan( + &i.ID, + &i.NotificationID, + &i.NotificationSeq, + &i.ProjectID, + &i.SessionID, + &i.RouteName, + &i.Sink, + &i.DestinationKey, + &i.RequestJson, + &i.Status, + &i.Attempts, + &i.MaxAttempts, + &i.NextAttemptAt, + &i.LeaseOwner, + &i.LeaseExpiresAt, + &i.LastErrorCode, + &i.LastError, + &i.ExternalID, + &i.CreatedAt, + &i.UpdatedAt, + &i.DeliveredAt, + ) + return i, err +} + +const insertNotificationDelivery = `-- name: InsertNotificationDelivery :one +INSERT INTO notification_deliveries ( + id, notification_id, notification_seq, project_id, session_id, + route_name, sink, destination_key, request_json, + status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, + last_error_code, last_error, external_id, + created_at, updated_at, delivered_at +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(notification_id, route_name, destination_key) DO NOTHING +RETURNING id, notification_id, notification_seq, project_id, session_id, + route_name, sink, destination_key, request_json, + status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, + last_error_code, last_error, external_id, + created_at, updated_at, delivered_at +` + +type InsertNotificationDeliveryParams struct { + ID string + NotificationID string + NotificationSeq int64 + ProjectID string + SessionID string + RouteName string + Sink string + DestinationKey string + RequestJson string + Status string + Attempts int64 + MaxAttempts int64 + NextAttemptAt time.Time + LeaseOwner string + LeaseExpiresAt sql.NullTime + LastErrorCode string + LastError string + ExternalID string + CreatedAt time.Time + UpdatedAt time.Time + DeliveredAt sql.NullTime +} + +func (q *Queries) InsertNotificationDelivery(ctx context.Context, arg InsertNotificationDeliveryParams) (NotificationDelivery, error) { + row := q.db.QueryRowContext(ctx, insertNotificationDelivery, + arg.ID, + arg.NotificationID, + arg.NotificationSeq, + arg.ProjectID, + arg.SessionID, + arg.RouteName, + arg.Sink, + arg.DestinationKey, + arg.RequestJson, + arg.Status, + arg.Attempts, + arg.MaxAttempts, + arg.NextAttemptAt, + arg.LeaseOwner, + arg.LeaseExpiresAt, + arg.LastErrorCode, + arg.LastError, + arg.ExternalID, + arg.CreatedAt, + arg.UpdatedAt, + arg.DeliveredAt, + ) + var i NotificationDelivery + err := row.Scan( + &i.ID, + &i.NotificationID, + &i.NotificationSeq, + &i.ProjectID, + &i.SessionID, + &i.RouteName, + &i.Sink, + &i.DestinationKey, + &i.RequestJson, + &i.Status, + &i.Attempts, + &i.MaxAttempts, + &i.NextAttemptAt, + &i.LeaseOwner, + &i.LeaseExpiresAt, + &i.LastErrorCode, + &i.LastError, + &i.ExternalID, + &i.CreatedAt, + &i.UpdatedAt, + &i.DeliveredAt, + ) + return i, err +} + +const listUnroutedNotifications = `-- name: ListUnroutedNotifications :many +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at +FROM notifications +WHERE routed_at IS NULL +ORDER BY seq ASC +LIMIT ? +` + +func (q *Queries) ListUnroutedNotifications(ctx context.Context, limit int64) ([]Notification, error) { + rows, err := q.db.QueryContext(ctx, listUnroutedNotifications, limit) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Notification{} + for rows.Next() { + var i Notification + if err := rows.Scan( + &i.Seq, + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Source, + &i.EventType, + &i.SemanticType, + &i.Priority, + &i.Message, + &i.PayloadJson, + &i.ActionsJson, + &i.DedupeKey, + &i.CauseKey, + &i.ReadAt, + &i.ArchivedAt, + &i.CreatedAt, + &i.UpdatedAt, + &i.RoutedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const markNotificationRouted = `-- name: MarkNotificationRouted :exec +UPDATE notifications +SET routed_at = COALESCE(routed_at, ?), + updated_at = CASE WHEN routed_at IS NULL THEN ? ELSE updated_at END +WHERE id = ? +` + +type MarkNotificationRoutedParams struct { + RoutedAt sql.NullTime + UpdatedAt time.Time + ID string +} + +func (q *Queries) MarkNotificationRouted(ctx context.Context, arg MarkNotificationRoutedParams) error { + _, err := q.db.ExecContext(ctx, markNotificationRouted, arg.RoutedAt, arg.UpdatedAt, arg.ID) + return err +} diff --git a/backend/internal/storage/sqlite/gen/notifications.sql.go b/backend/internal/storage/sqlite/gen/notifications.sql.go index 7b2b5493..47ca63d9 100644 --- a/backend/internal/storage/sqlite/gen/notifications.sql.go +++ b/backend/internal/storage/sqlite/gen/notifications.sql.go @@ -16,7 +16,7 @@ UPDATE notifications SET archived_at = ?, updated_at = ? WHERE id = ? AND archived_at IS NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at ` type ArchiveNotificationParams struct { @@ -46,13 +46,14 @@ func (q *Queries) ArchiveNotification(ctx context.Context, arg ArchiveNotificati &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, + &i.RoutedAt, ) return i, err } const getNotification = `-- name: GetNotification :one SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications WHERE id = ? ` @@ -77,13 +78,14 @@ func (q *Queries) GetNotification(ctx context.Context, id string) (Notification, &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, + &i.RoutedAt, ) return i, err } const getNotificationByDedupeKey = `-- name: GetNotificationByDedupeKey :one SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications WHERE dedupe_key = ? ` @@ -108,6 +110,7 @@ func (q *Queries) GetNotificationByDedupeKey(ctx context.Context, dedupeKey stri &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, + &i.RoutedAt, ) return i, err } @@ -119,7 +122,7 @@ INSERT INTO notifications ( ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (dedupe_key) DO NOTHING RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at ` type InsertNotificationParams struct { @@ -173,13 +176,14 @@ func (q *Queries) InsertNotification(ctx context.Context, arg InsertNotification &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, + &i.RoutedAt, ) return i, err } const listNotifications = `-- name: ListNotifications :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications ORDER BY seq DESC LIMIT ? @@ -212,6 +216,7 @@ func (q *Queries) ListNotifications(ctx context.Context, limit int64) ([]Notific &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, + &i.RoutedAt, ); err != nil { return nil, err } @@ -228,7 +233,7 @@ func (q *Queries) ListNotifications(ctx context.Context, limit int64) ([]Notific const listNotificationsByProject = `-- name: ListNotificationsByProject :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications WHERE project_id = ? ORDER BY seq DESC @@ -267,6 +272,7 @@ func (q *Queries) ListNotificationsByProject(ctx context.Context, arg ListNotifi &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, + &i.RoutedAt, ); err != nil { return nil, err } @@ -283,7 +289,7 @@ func (q *Queries) ListNotificationsByProject(ctx context.Context, arg ListNotifi const listNotificationsBySession = `-- name: ListNotificationsBySession :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications WHERE session_id = ? ORDER BY seq DESC @@ -322,6 +328,7 @@ func (q *Queries) ListNotificationsBySession(ctx context.Context, arg ListNotifi &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, + &i.RoutedAt, ); err != nil { return nil, err } @@ -338,7 +345,7 @@ func (q *Queries) ListNotificationsBySession(ctx context.Context, arg ListNotifi const listUnreadNotifications = `-- name: ListUnreadNotifications :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications WHERE read_at IS NULL AND archived_at IS NULL ORDER BY seq DESC @@ -372,6 +379,7 @@ func (q *Queries) ListUnreadNotifications(ctx context.Context, limit int64) ([]N &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, + &i.RoutedAt, ); err != nil { return nil, err } @@ -391,7 +399,7 @@ UPDATE notifications SET read_at = ?, updated_at = ? WHERE id = ? AND read_at IS NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at ` type MarkNotificationReadParams struct { @@ -421,6 +429,7 @@ func (q *Queries) MarkNotificationRead(ctx context.Context, arg MarkNotification &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, + &i.RoutedAt, ) return i, err } @@ -430,7 +439,7 @@ UPDATE notifications SET read_at = NULL, updated_at = ? WHERE id = ? AND read_at IS NOT NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at ` type MarkNotificationUnreadParams struct { @@ -459,6 +468,7 @@ func (q *Queries) MarkNotificationUnread(ctx context.Context, arg MarkNotificati &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, + &i.RoutedAt, ) return i, err } diff --git a/backend/internal/storage/sqlite/gen/querier.go b/backend/internal/storage/sqlite/gen/querier.go index 4f91a9d5..87550b12 100644 --- a/backend/internal/storage/sqlite/gen/querier.go +++ b/backend/internal/storage/sqlite/gen/querier.go @@ -16,10 +16,13 @@ type Querier interface { DeleteSession(ctx context.Context, id string) error GetNotification(ctx context.Context, id string) (Notification, error) GetNotificationByDedupeKey(ctx context.Context, dedupeKey string) (Notification, error) + GetNotificationDelivery(ctx context.Context, id string) (NotificationDelivery, error) + GetNotificationDeliveryByUnique(ctx context.Context, arg GetNotificationDeliveryByUniqueParams) (NotificationDelivery, error) GetPR(ctx context.Context, url string) (Pr, error) GetProject(ctx context.Context, id string) (Project, error) GetSession(ctx context.Context, id string) (Session, error) InsertNotification(ctx context.Context, arg InsertNotificationParams) (Notification, error) + InsertNotificationDelivery(ctx context.Context, arg InsertNotificationDeliveryParams) (NotificationDelivery, error) InsertSession(ctx context.Context, arg InsertSessionParams) error ListAllSessions(ctx context.Context) ([]Session, error) ListChecksByPR(ctx context.Context, prUrl string) ([]PrCheck, error) @@ -32,7 +35,9 @@ type Querier interface { ListRecentChecks(ctx context.Context, arg ListRecentChecksParams) ([]ListRecentChecksRow, error) ListSessionsByProject(ctx context.Context, projectID string) ([]Session, error) ListUnreadNotifications(ctx context.Context, limit int64) ([]Notification, error) + ListUnroutedNotifications(ctx context.Context, limit int64) ([]Notification, error) MarkNotificationRead(ctx context.Context, arg MarkNotificationReadParams) (Notification, error) + MarkNotificationRouted(ctx context.Context, arg MarkNotificationRoutedParams) error MarkNotificationUnread(ctx context.Context, arg MarkNotificationUnreadParams) (Notification, error) MaxChangeLogSeq(ctx context.Context) (interface{}, error) NextSessionNum(ctx context.Context, projectID string) (int64, error) diff --git a/backend/internal/storage/sqlite/migrations/0003_notification_deliveries.sql b/backend/internal/storage/sqlite/migrations/0003_notification_deliveries.sql new file mode 100644 index 00000000..4c7cca1d --- /dev/null +++ b/backend/internal/storage/sqlite/migrations/0003_notification_deliveries.sql @@ -0,0 +1,128 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE notifications ADD COLUMN routed_at TIMESTAMP; +-- Notifications that already exist when this migration runs predate the +-- delivery runtime. Treat them as already routed so an upgrade does not +-- synthesize new AO-app desktop toasts for historical notification rows; the +-- dashboard read model still sees those rows directly from notifications. +UPDATE notifications SET routed_at = updated_at WHERE routed_at IS NULL; +CREATE INDEX idx_notifications_unrouted + ON notifications(seq) + WHERE routed_at IS NULL; + +CREATE TABLE notification_deliveries ( + id TEXT PRIMARY KEY, + notification_id TEXT NOT NULL REFERENCES notifications(id) ON DELETE CASCADE, + notification_seq INTEGER NOT NULL, + project_id TEXT NOT NULL REFERENCES projects(id), + session_id TEXT NOT NULL REFERENCES sessions(id), + + route_name TEXT NOT NULL, + sink TEXT NOT NULL, + destination_key TEXT NOT NULL DEFAULT '', + request_json TEXT NOT NULL DEFAULT '{}' CHECK (json_valid(request_json)), + + status TEXT NOT NULL CHECK (status IN ('queued','leased','sent','retry_wait','failed','skipped','cancelled')), + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 5, + next_attempt_at TIMESTAMP NOT NULL, + lease_owner TEXT NOT NULL DEFAULT '', + lease_expires_at TIMESTAMP, + + last_error_code TEXT NOT NULL DEFAULT '', + last_error TEXT NOT NULL DEFAULT '', + external_id TEXT NOT NULL DEFAULT '', + + created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')), + updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now')), + delivered_at TIMESTAMP, + + UNIQUE(notification_id, route_name, destination_key) +); + +CREATE INDEX idx_notification_deliveries_due + ON notification_deliveries(status, next_attempt_at, lease_expires_at, created_at); + +CREATE INDEX idx_notification_deliveries_notification + ON notification_deliveries(notification_id, status); + +CREATE INDEX idx_notification_deliveries_project + ON notification_deliveries(project_id, created_at DESC); + +CREATE TABLE notification_delivery_attempts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + delivery_id TEXT NOT NULL REFERENCES notification_deliveries(id) ON DELETE CASCADE, + attempt_no INTEGER NOT NULL, + status TEXT NOT NULL CHECK (status IN ('started','sent','retryable_failed','failed')), + started_at TIMESTAMP NOT NULL, + finished_at TIMESTAMP, + error_code TEXT NOT NULL DEFAULT '', + error TEXT NOT NULL DEFAULT '', + response_json TEXT NOT NULL DEFAULT '{}' CHECK (json_valid(response_json)), + UNIQUE(delivery_id, attempt_no) +); +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER notification_deliveries_cdc_insert +AFTER INSERT ON notification_deliveries +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + NEW.project_id, + NEW.session_id, + 'notification_delivery_created', + json_object( + 'id', NEW.id, + 'notificationId', NEW.notification_id, + 'routeName', NEW.route_name, + 'sink', NEW.sink, + 'status', NEW.status, + 'attempts', NEW.attempts, + 'lastErrorCode', NEW.last_error_code, + 'lastError', NEW.last_error + ), + NEW.created_at + ); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER notification_deliveries_cdc_update +AFTER UPDATE ON notification_deliveries +WHEN OLD.status <> NEW.status + OR OLD.attempts <> NEW.attempts + OR OLD.last_error_code <> NEW.last_error_code + OR OLD.last_error <> NEW.last_error + OR OLD.external_id <> NEW.external_id + OR OLD.delivered_at IS NOT NEW.delivered_at +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + NEW.project_id, + NEW.session_id, + 'notification_delivery_updated', + json_object( + 'id', NEW.id, + 'notificationId', NEW.notification_id, + 'routeName', NEW.route_name, + 'sink', NEW.sink, + 'status', NEW.status, + 'attempts', NEW.attempts, + 'lastErrorCode', NEW.last_error_code, + 'lastError', NEW.last_error + ), + NEW.updated_at + ); +END; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TRIGGER IF EXISTS notification_deliveries_cdc_update; +DROP TRIGGER IF EXISTS notification_deliveries_cdc_insert; +DROP TABLE IF EXISTS notification_delivery_attempts; +DROP TABLE IF EXISTS notification_deliveries; +DROP INDEX IF EXISTS idx_notifications_unrouted; +ALTER TABLE notifications DROP COLUMN routed_at; +-- +goose StatementEnd diff --git a/backend/internal/storage/sqlite/notification_delivery_store.go b/backend/internal/storage/sqlite/notification_delivery_store.go new file mode 100644 index 00000000..9cdddc74 --- /dev/null +++ b/backend/internal/storage/sqlite/notification_delivery_store.go @@ -0,0 +1,462 @@ +package sqlite + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/notification" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/gen" +) + +const deliveryColumns = `id, notification_id, notification_seq, project_id, session_id, + route_name, sink, destination_key, request_json, + status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, + last_error_code, last_error, external_id, + created_at, updated_at, delivered_at` + +const ( + defaultDeliveryLimit = 100 + defaultDeliveryMaxAttempts = 5 // mirrors notification_deliveries.max_attempts schema default +) + +type DeliveryFilter struct { + NotificationID string + ProjectID string + Status notification.DeliveryStatus + Limit int +} + +func (s *Store) ListUnroutedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) { + if limit <= 0 { + limit = defaultNotificationLimit + } + rows, err := s.qr.ListUnroutedNotifications(ctx, int64(limit)) + if err != nil { + return nil, fmt.Errorf("list unrouted notifications: %w", err) + } + return notificationsFromGen(rows) +} + +func (s *Store) MarkNotificationRouted(ctx context.Context, id domain.NotificationID, at time.Time) error { + if at.IsZero() { + at = time.Now().UTC() + } + s.writeMu.Lock() + defer s.writeMu.Unlock() + if err := s.qw.MarkNotificationRouted(ctx, gen.MarkNotificationRoutedParams{ + RoutedAt: nullTime(at), + UpdatedAt: at, + ID: string(id), + }); err != nil { + return fmt.Errorf("mark notification routed %s: %w", id, err) + } + return nil +} + +func (s *Store) EnqueueDelivery(ctx context.Context, row notification.DeliveryRow) (notification.DeliveryRow, bool, error) { + now := row.CreatedAt + if now.IsZero() { + now = time.Now().UTC() + } + row, err := notification.NormalizeDelivery(row, now, defaultDeliveryMaxAttempts) + if err != nil { + return notification.DeliveryRow{}, false, err + } + + s.writeMu.Lock() + defer s.writeMu.Unlock() + + got, err := s.qw.InsertNotificationDelivery(ctx, gen.InsertNotificationDeliveryParams{ + ID: row.ID, + NotificationID: string(row.NotificationID), + NotificationSeq: row.NotificationSeq, + ProjectID: string(row.ProjectID), + SessionID: string(row.SessionID), + RouteName: row.RouteName, + Sink: row.Sink, + DestinationKey: row.DestinationKey, + RequestJson: string(row.RequestJSON), + Status: string(row.Status), + Attempts: int64(row.Attempts), + MaxAttempts: int64(row.MaxAttempts), + NextAttemptAt: row.NextAttemptAt, + LeaseOwner: row.LeaseOwner, + LeaseExpiresAt: nullTime(row.LeaseExpiresAt), + LastErrorCode: row.LastErrorCode, + LastError: row.LastError, + ExternalID: row.ExternalID, + CreatedAt: row.CreatedAt, + UpdatedAt: row.UpdatedAt, + DeliveredAt: nullTime(row.DeliveredAt), + }) + if errors.Is(err, sql.ErrNoRows) { + existing, readErr := s.getDeliveryByUniqueLocked(ctx, row.NotificationID, row.RouteName, row.DestinationKey) + if readErr != nil { + return notification.DeliveryRow{}, false, readErr + } + return existing, false, nil + } + if err != nil { + return notification.DeliveryRow{}, false, fmt.Errorf("insert notification delivery: %w", err) + } + return deliveryFromGen(got), true, nil +} + +func (s *Store) ClaimDueDeliveries(ctx context.Context, sink string, owner string, now time.Time, limit int, lease time.Duration) ([]notification.DeliveryRow, error) { + if now.IsZero() { + now = time.Now().UTC() + } + if limit <= 0 { + limit = defaultDeliveryLimit + } + if lease <= 0 { + lease = 30 * time.Second + } + expires := now.Add(lease) + + s.writeMu.Lock() + defer s.writeMu.Unlock() + + tx, err := s.writeDB.BeginTx(ctx, nil) + if err != nil { + return nil, fmt.Errorf("begin claim deliveries: %w", err) + } + defer tx.Rollback() + + rows, err := tx.QueryContext(ctx, `SELECT id +FROM notification_deliveries +WHERE sink = ? + AND status IN ('queued','retry_wait') + AND next_attempt_at <= ? + AND attempts < max_attempts +ORDER BY next_attempt_at ASC, created_at ASC, id ASC +LIMIT ?`, sink, now, limit) + if err != nil { + return nil, fmt.Errorf("select due deliveries: %w", err) + } + var ids []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + rows.Close() + return nil, err + } + ids = append(ids, id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + + out := make([]notification.DeliveryRow, 0, len(ids)) + for _, id := range ids { + res, err := tx.ExecContext(ctx, `UPDATE notification_deliveries +SET status = 'leased', + lease_owner = ?, + lease_expires_at = ?, + updated_at = ? +WHERE id = ? + AND status IN ('queued','retry_wait') + AND next_attempt_at <= ? + AND attempts < max_attempts`, owner, expires, now, id, now) + if err != nil { + return nil, fmt.Errorf("lease delivery %s: %w", id, err) + } + changed, err := res.RowsAffected() + if err != nil { + return nil, err + } + if changed == 0 { + continue + } + row, err := scanDelivery(tx.QueryRowContext(ctx, `SELECT `+deliveryColumns+` FROM notification_deliveries WHERE id = ?`, id)) + if err != nil { + return nil, fmt.Errorf("read leased delivery %s: %w", id, err) + } + out = append(out, row) + } + if err := tx.Commit(); err != nil { + return nil, fmt.Errorf("commit claim deliveries: %w", err) + } + return out, nil +} + +func (s *Store) ReleaseExpiredDeliveryLeases(ctx context.Context, now time.Time) (int, error) { + if now.IsZero() { + now = time.Now().UTC() + } + s.writeMu.Lock() + defer s.writeMu.Unlock() + res, err := s.writeDB.ExecContext(ctx, `UPDATE notification_deliveries +SET attempts = attempts + 1, + status = CASE WHEN attempts + 1 >= max_attempts THEN 'failed' ELSE 'queued' END, + next_attempt_at = ?, + lease_owner = '', + lease_expires_at = NULL, + last_error_code = 'lease_expired', + last_error = 'delivery lease expired', + updated_at = ? +WHERE status = 'leased' + AND lease_expires_at IS NOT NULL + AND lease_expires_at <= ?`, now, now, now) + if err != nil { + return 0, fmt.Errorf("release expired delivery leases: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return 0, err + } + return int(n), nil +} + +func (s *Store) MarkDeliverySent(ctx context.Context, id string, owner string, externalID string, at time.Time) error { + if at.IsZero() { + at = time.Now().UTC() + } + return s.updateDelivery(ctx, "mark delivery sent", `UPDATE notification_deliveries +SET status = 'sent', + attempts = attempts + 1, + lease_owner = '', + lease_expires_at = NULL, + external_id = ?, + delivered_at = ?, + updated_at = ? +WHERE id = ? + AND status = 'leased' + AND lease_owner = ? + AND lease_expires_at > ?`, externalID, at, at, id, owner, at) +} + +func (s *Store) MarkDeliveryRetry(ctx context.Context, id string, owner string, errCode string, errMessage string, next time.Time, at time.Time) error { + if at.IsZero() { + at = time.Now().UTC() + } + if next.IsZero() { + next = at + } + return s.updateDelivery(ctx, "mark delivery retry", `UPDATE notification_deliveries +SET attempts = attempts + 1, + status = CASE WHEN attempts + 1 >= max_attempts THEN 'failed' ELSE 'retry_wait' END, + next_attempt_at = ?, + lease_owner = '', + lease_expires_at = NULL, + last_error_code = ?, + last_error = ?, + updated_at = ? +WHERE id = ? + AND status = 'leased' + AND lease_owner = ? + AND lease_expires_at > ?`, next, errCode, errMessage, at, id, owner, at) +} + +func (s *Store) MarkDeliveryFailed(ctx context.Context, id string, owner string, errCode string, errMessage string, at time.Time) error { + if at.IsZero() { + at = time.Now().UTC() + } + return s.updateDelivery(ctx, "mark delivery failed", `UPDATE notification_deliveries +SET status = 'failed', + attempts = CASE WHEN status = 'leased' THEN attempts + 1 ELSE attempts END, + lease_owner = '', + lease_expires_at = NULL, + last_error_code = ?, + last_error = ?, + updated_at = ? +WHERE id = ? + AND status = 'leased' + AND lease_owner = ? + AND lease_expires_at > ?`, errCode, errMessage, at, id, owner, at) +} + +func (s *Store) MarkDeliverySkipped(ctx context.Context, id string, reason string, at time.Time) error { + if at.IsZero() { + at = time.Now().UTC() + } + return s.updateDelivery(ctx, "mark delivery skipped", `UPDATE notification_deliveries +SET status = 'skipped', + lease_owner = '', + lease_expires_at = NULL, + last_error_code = 'skipped', + last_error = ?, + updated_at = ? +WHERE id = ? AND status NOT IN ('sent','failed','skipped','cancelled')`, reason, at, id) +} + +func (s *Store) GetDelivery(ctx context.Context, id string) (notification.DeliveryRow, bool, error) { + row, err := s.qr.GetNotificationDelivery(ctx, id) + if errors.Is(err, sql.ErrNoRows) { + return notification.DeliveryRow{}, false, nil + } + if err != nil { + return notification.DeliveryRow{}, false, fmt.Errorf("get notification delivery %s: %w", id, err) + } + return deliveryFromGen(row), true, nil +} + +func (s *Store) ListDeliveries(ctx context.Context, filter DeliveryFilter) ([]notification.DeliveryRow, error) { + limit := filter.Limit + if limit <= 0 { + limit = defaultDeliveryLimit + } + base := `SELECT ` + deliveryColumns + ` FROM notification_deliveries` + order := ` ORDER BY created_at ASC, id ASC LIMIT ?` + var ( + rows *sql.Rows + err error + ) + switch { + case filter.NotificationID != "": + if filter.Status != "" { + rows, err = s.readDB.QueryContext(ctx, base+` WHERE notification_id = ? AND status = ?`+order, filter.NotificationID, string(filter.Status), limit) + } else { + rows, err = s.readDB.QueryContext(ctx, base+` WHERE notification_id = ?`+order, filter.NotificationID, limit) + } + case filter.ProjectID != "": + if filter.Status != "" { + rows, err = s.readDB.QueryContext(ctx, base+` WHERE project_id = ? AND status = ?`+order, filter.ProjectID, string(filter.Status), limit) + } else { + rows, err = s.readDB.QueryContext(ctx, base+` WHERE project_id = ?`+order, filter.ProjectID, limit) + } + default: + if filter.Status != "" { + rows, err = s.readDB.QueryContext(ctx, base+` WHERE status = ?`+order, string(filter.Status), limit) + } else { + rows, err = s.readDB.QueryContext(ctx, base+order, limit) + } + } + if err != nil { + return nil, fmt.Errorf("list notification deliveries: %w", err) + } + defer rows.Close() + out := []notification.DeliveryRow{} + for rows.Next() { + row, err := scanDelivery(rows) + if err != nil { + return nil, err + } + out = append(out, row) + } + if err := rows.Err(); err != nil { + return nil, err + } + return out, nil +} + +func (s *Store) updateDelivery(ctx context.Context, what string, query string, args ...any) error { + s.writeMu.Lock() + defer s.writeMu.Unlock() + res, err := s.writeDB.ExecContext(ctx, query, args...) + if err != nil { + return fmt.Errorf("%s: %w", what, err) + } + affected, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("%s rows affected: %w", what, err) + } + if affected == 0 { + return fmt.Errorf("%s: %w", what, notification.ErrDeliveryUpdateConflict) + } + return nil +} + +func (s *Store) getDeliveryByUniqueLocked(ctx context.Context, id domain.NotificationID, routeName, destinationKey string) (notification.DeliveryRow, error) { + row, err := s.qw.GetNotificationDeliveryByUnique(ctx, gen.GetNotificationDeliveryByUniqueParams{ + NotificationID: string(id), + RouteName: routeName, + DestinationKey: destinationKey, + }) + if err != nil { + return notification.DeliveryRow{}, fmt.Errorf("get notification delivery by unique key: %w", err) + } + return deliveryFromGen(row), nil +} + +type rowScanner interface { + Scan(dest ...any) error +} + +func scanDelivery(scanner rowScanner) (notification.DeliveryRow, error) { + var ( + row notification.DeliveryRow + notificationID string + projectID string + sessionID string + status string + requestJSON string + leaseExpires sql.NullTime + deliveredAt sql.NullTime + ) + if err := scanner.Scan( + &row.ID, + ¬ificationID, + &row.NotificationSeq, + &projectID, + &sessionID, + &row.RouteName, + &row.Sink, + &row.DestinationKey, + &requestJSON, + &status, + &row.Attempts, + &row.MaxAttempts, + &row.NextAttemptAt, + &row.LeaseOwner, + &leaseExpires, + &row.LastErrorCode, + &row.LastError, + &row.ExternalID, + &row.CreatedAt, + &row.UpdatedAt, + &deliveredAt, + ); err != nil { + return notification.DeliveryRow{}, err + } + row.NotificationID = domain.NotificationID(notificationID) + row.ProjectID = domain.ProjectID(projectID) + row.SessionID = domain.SessionID(sessionID) + row.RequestJSON = []byte(requestJSON) + row.Status = notification.DeliveryStatus(status) + if leaseExpires.Valid { + row.LeaseExpiresAt = leaseExpires.Time + } + if deliveredAt.Valid { + row.DeliveredAt = deliveredAt.Time + } + return row, nil +} + +func deliveryFromGen(r gen.NotificationDelivery) notification.DeliveryRow { + row := notification.DeliveryRow{ + ID: r.ID, + NotificationID: domain.NotificationID(r.NotificationID), + NotificationSeq: r.NotificationSeq, + ProjectID: domain.ProjectID(r.ProjectID), + SessionID: domain.SessionID(r.SessionID), + RouteName: r.RouteName, + Sink: r.Sink, + DestinationKey: r.DestinationKey, + RequestJSON: []byte(r.RequestJson), + Status: notification.DeliveryStatus(r.Status), + Attempts: int(r.Attempts), + MaxAttempts: int(r.MaxAttempts), + NextAttemptAt: r.NextAttemptAt, + LeaseOwner: r.LeaseOwner, + LastErrorCode: r.LastErrorCode, + LastError: r.LastError, + ExternalID: r.ExternalID, + CreatedAt: r.CreatedAt, + UpdatedAt: r.UpdatedAt, + } + if r.LeaseExpiresAt.Valid { + row.LeaseExpiresAt = r.LeaseExpiresAt.Time + } + if r.DeliveredAt.Valid { + row.DeliveredAt = r.DeliveredAt.Time + } + return row +} diff --git a/backend/internal/storage/sqlite/notification_delivery_store_test.go b/backend/internal/storage/sqlite/notification_delivery_store_test.go new file mode 100644 index 00000000..2c675466 --- /dev/null +++ b/backend/internal/storage/sqlite/notification_delivery_store_test.go @@ -0,0 +1,302 @@ +package sqlite + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/cdc" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/notification" +) + +func TestNotificationDeliveryEnqueueIdempotentAndCDC(t *testing.T) { + s, ntf := newDeliveryTestNotification(t, "delivery-dedupe") + ctx := context.Background() + startSeq, _ := s.MaxChangeLogSeq(ctx) + + row, created, err := s.EnqueueDelivery(ctx, sampleDelivery(ntf, "desktop")) + if err != nil { + t.Fatal(err) + } + if !created || row.ID == "" || row.Status != notification.DeliveryQueued { + t.Fatalf("created=%v row=%+v", created, row) + } + dup, created, err := s.EnqueueDelivery(ctx, sampleDelivery(ntf, "desktop")) + if err != nil { + t.Fatal(err) + } + if created || dup.ID != row.ID { + t.Fatalf("duplicate should return existing row created=false: created=%v dup=%+v row=%+v", created, dup, row) + } + evs, err := s.ReadChangeLogAfter(ctx, startSeq, 10) + if err != nil { + t.Fatal(err) + } + var createdEvents int + for _, ev := range evs { + if ev.EventType == string(cdc.EventNotificationDeliveryCreated) { + createdEvents++ + } + } + if createdEvents != 1 { + t.Fatalf("delivery created CDC count = %d, want 1 events=%+v", createdEvents, evs) + } +} + +func TestNotificationDeliveryEnqueueDefaultMaxAttempts(t *testing.T) { + s, ntf := newDeliveryTestNotification(t, "delivery-default-max") + ctx := context.Background() + row := sampleDelivery(ntf, "desktop") + row.MaxAttempts = 0 + got, _, err := s.EnqueueDelivery(ctx, row) + if err != nil { + t.Fatal(err) + } + if got.MaxAttempts != 5 { + t.Fatalf("default max attempts = %d, want 5", got.MaxAttempts) + } +} + +func TestNotificationDeliveryClaimDueStableOrder(t *testing.T) { + s, ntf := newDeliveryTestNotification(t, "delivery-claim") + ctx := context.Background() + base := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC) + for i, d := range []time.Duration{2 * time.Second, time.Second, 3 * time.Second} { + row := sampleDelivery(ntf, fmt.Sprintf("desktop-%d", i)) + row.DestinationKey = fmt.Sprintf("dest-%d", i) + row.NextAttemptAt = base.Add(d) + row.CreatedAt = base.Add(time.Duration(i) * time.Millisecond) + row.UpdatedAt = row.CreatedAt + if _, _, err := s.EnqueueDelivery(ctx, row); err != nil { + t.Fatal(err) + } + } + + claimed, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "electron", base.Add(10*time.Second), 2, time.Minute) + if err != nil { + t.Fatal(err) + } + if len(claimed) != 2 { + t.Fatalf("claimed = %d, want 2", len(claimed)) + } + if claimed[0].DestinationKey != "dest-1" || claimed[1].DestinationKey != "dest-0" { + t.Fatalf("claim order = %s, %s; want dest-1, dest-0", claimed[0].DestinationKey, claimed[1].DestinationKey) + } + if claimed[0].Status != notification.DeliveryLeased || claimed[0].LeaseOwner != "electron" || claimed[0].LeaseExpiresAt.IsZero() { + t.Fatalf("claimed row not leased: %+v", claimed[0]) + } +} + +func TestNotificationDeliveryLeaseExpiryAndMaxAttempts(t *testing.T) { + s, ntf := newDeliveryTestNotification(t, "delivery-expiry") + ctx := context.Background() + now := time.Now().UTC().Truncate(time.Second) + queued, _, err := s.EnqueueDelivery(ctx, sampleDueDelivery(ntf, "desktop", now)) + if err != nil { + t.Fatal(err) + } + claimed, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 1, time.Second) + if err != nil || len(claimed) != 1 { + t.Fatalf("claim len=%d err=%v", len(claimed), err) + } + released, err := s.ReleaseExpiredDeliveryLeases(ctx, now.Add(2*time.Second)) + if err != nil || released != 1 { + t.Fatalf("release = %d err=%v", released, err) + } + got, ok, _ := s.GetDelivery(ctx, queued.ID) + if !ok || got.Status != notification.DeliveryQueued || got.Attempts != 1 || got.LeaseOwner != "" { + t.Fatalf("expired lease should return queued with attempts=1: ok=%v row=%+v", ok, got) + } + + maxOne := sampleDueDelivery(ntf, "desktop-max", now) + maxOne.DestinationKey = "max" + maxOne.MaxAttempts = 1 + maxOne, _, err = s.EnqueueDelivery(ctx, maxOne) + if err != nil { + t.Fatal(err) + } + if _, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 1, time.Second); err != nil { + t.Fatal(err) + } + released, err = s.ReleaseExpiredDeliveryLeases(ctx, now.Add(2*time.Second)) + if err != nil || released != 1 { + t.Fatalf("release max = %d err=%v", released, err) + } + got, ok, _ = s.GetDelivery(ctx, maxOne.ID) + if !ok || got.Status != notification.DeliveryFailed || got.Attempts != 1 { + t.Fatalf("max attempts expired lease should fail: ok=%v row=%+v", ok, got) + } +} + +func TestNotificationDeliveryMarkSentRetryFailedAndSkipped(t *testing.T) { + s, ntf := newDeliveryTestNotification(t, "delivery-mark") + ctx := context.Background() + now := time.Now().UTC().Truncate(time.Second) + + sent, _, _ := s.EnqueueDelivery(ctx, sampleDueDelivery(ntf, "desktop-sent", now)) + claimed, _ := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 1, time.Minute) + if len(claimed) != 1 { + t.Fatalf("claim sent row len=%d", len(claimed)) + } + if err := s.MarkDeliverySent(ctx, sent.ID, "owner", "native-1", now.Add(time.Second)); err != nil { + t.Fatal(err) + } + got, _, _ := s.GetDelivery(ctx, sent.ID) + if got.Status != notification.DeliverySent || got.ExternalID != "native-1" || got.Attempts != 1 || got.DeliveredAt.IsZero() { + t.Fatalf("sent row = %+v", got) + } + + retry := sampleDueDelivery(ntf, "desktop-retry", now) + retry.DestinationKey = "retry" + retry, _, _ = s.EnqueueDelivery(ctx, retry) + claimed, _ = s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 1, time.Minute) + if len(claimed) != 1 { + t.Fatalf("claim retry row len=%d", len(claimed)) + } + next := now.Add(30 * time.Second) + if err := s.MarkDeliveryRetry(ctx, retry.ID, "owner", "timeout", "timed out", next, now.Add(time.Second)); err != nil { + t.Fatal(err) + } + got, _, _ = s.GetDelivery(ctx, retry.ID) + if got.Status != notification.DeliveryRetryWait || got.Attempts != 1 || !got.NextAttemptAt.Equal(next) { + t.Fatalf("retry row = %+v", got) + } + + fail := sampleDueDelivery(ntf, "desktop-fail", now) + fail.DestinationKey = "fail" + fail.MaxAttempts = 1 + fail, _, _ = s.EnqueueDelivery(ctx, fail) + claimed, _ = s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 1, time.Minute) + if len(claimed) != 1 { + t.Fatalf("claim fail row len=%d", len(claimed)) + } + if err := s.MarkDeliveryRetry(ctx, fail.ID, "owner", "timeout", "timed out", next, now.Add(time.Second)); err != nil { + t.Fatal(err) + } + got, _, _ = s.GetDelivery(ctx, fail.ID) + if got.Status != notification.DeliveryFailed || got.Attempts != 1 { + t.Fatalf("retry at max should fail: %+v", got) + } + + skipped := sampleDueDelivery(ntf, "desktop-skip", now) + skipped.DestinationKey = "skip" + skipped.Status = notification.DeliverySkipped + skipped, _, _ = s.EnqueueDelivery(ctx, skipped) + claimed, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 10, time.Minute) + if err != nil { + t.Fatal(err) + } + for _, row := range claimed { + if row.ID == skipped.ID { + t.Fatalf("skipped row should not be claimable: %+v", claimed) + } + } + if err := s.MarkDeliveryRetry(ctx, skipped.ID, "owner", "timeout", "timed out", next, now.Add(time.Second)); !errors.Is(err, notification.ErrDeliveryUpdateConflict) { + t.Fatalf("retry skipped row err = %v, want update conflict", err) + } + got, _, _ = s.GetDelivery(ctx, skipped.ID) + if got.Status != notification.DeliverySkipped || got.Attempts != 0 { + t.Fatalf("skipped row should be terminal: %+v", got) + } +} + +func TestNotificationDeliveryCompletionFencedByLeaseOwner(t *testing.T) { + s, ntf := newDeliveryTestNotification(t, "delivery-owner-fence") + ctx := context.Background() + now := time.Now().UTC().Truncate(time.Second) + row, _, err := s.EnqueueDelivery(ctx, sampleDueDelivery(ntf, "desktop", now)) + if err != nil { + t.Fatal(err) + } + if _, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner-1", now, 1, time.Second); err != nil { + t.Fatal(err) + } + if released, err := s.ReleaseExpiredDeliveryLeases(ctx, now.Add(2*time.Second)); err != nil || released != 1 { + t.Fatalf("release = %d err=%v", released, err) + } + if _, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner-2", now.Add(2*time.Second), 1, time.Second); err != nil { + t.Fatal(err) + } + + if err := s.MarkDeliverySent(ctx, row.ID, "owner-1", "stale", now.Add(2500*time.Millisecond)); !errors.Is(err, notification.ErrDeliveryUpdateConflict) { + t.Fatalf("stale owner MarkDeliverySent err = %v, want update conflict", err) + } + got, _, _ := s.GetDelivery(ctx, row.ID) + if got.Status != notification.DeliveryLeased || got.LeaseOwner != "owner-2" || got.ExternalID != "" { + t.Fatalf("stale owner should not change active lease: %+v", got) + } + if err := s.MarkDeliverySent(ctx, row.ID, "owner-2", "native-2", now.Add(2500*time.Millisecond)); err != nil { + t.Fatalf("current owner sent: %v", err) + } + got, _, _ = s.GetDelivery(ctx, row.ID) + if got.Status != notification.DeliverySent || got.ExternalID != "native-2" { + t.Fatalf("current owner should complete delivery: %+v", got) + } +} + +func TestNotificationDeliveryUpdateCDC(t *testing.T) { + s, ntf := newDeliveryTestNotification(t, "delivery-cdc-update") + ctx := context.Background() + row, _, err := s.EnqueueDelivery(ctx, sampleDelivery(ntf, "desktop")) + if err != nil { + t.Fatal(err) + } + startSeq, _ := s.MaxChangeLogSeq(ctx) + if _, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", time.Now().UTC(), 1, time.Minute); err != nil { + t.Fatal(err) + } + if err := s.MarkDeliveryFailed(ctx, row.ID, "owner", "permanent", "bad route", time.Now().UTC()); err != nil { + t.Fatal(err) + } + evs, err := s.ReadChangeLogAfter(ctx, startSeq, 10) + if err != nil { + t.Fatal(err) + } + var updates int + for _, ev := range evs { + if ev.EventType == string(cdc.EventNotificationDeliveryUpdated) { + updates++ + } + } + if updates < 2 { + t.Fatalf("expected claim + failed update CDC events, got %d in %+v", updates, evs) + } +} + +func newDeliveryTestNotification(t *testing.T, dedupe string) (*Store, domain.Notification) { + t.Helper() + s, rec := newNotificationTestSession(t) + row, _, err := s.EnqueueNotification(context.Background(), sampleNotification(rec, dedupe)) + if err != nil { + t.Fatalf("enqueue notification: %v", err) + } + return s, row +} + +func sampleDelivery(ntf domain.Notification, route string) notification.DeliveryRow { + now := time.Now().UTC().Truncate(time.Second) + return notification.DeliveryRow{ + NotificationID: ntf.ID, + NotificationSeq: ntf.Seq, + ProjectID: ntf.ProjectID, + SessionID: ntf.SessionID, + RouteName: route, + Sink: notification.SinkAOApp, + Status: notification.DeliveryQueued, + MaxAttempts: 5, + NextAttemptAt: now, + CreatedAt: now, + UpdatedAt: now, + } +} + +func sampleDueDelivery(ntf domain.Notification, route string, due time.Time) notification.DeliveryRow { + row := sampleDelivery(ntf, route) + row.NextAttemptAt = due + row.CreatedAt = due + row.UpdatedAt = due + return row +} diff --git a/backend/internal/storage/sqlite/notification_store.go b/backend/internal/storage/sqlite/notification_store.go index 90b84331..8e3e1b0e 100644 --- a/backend/internal/storage/sqlite/notification_store.go +++ b/backend/internal/storage/sqlite/notification_store.go @@ -238,5 +238,8 @@ func notificationFromGen(r gen.Notification) (NotificationRow, error) { if r.ArchivedAt.Valid { row.ArchivedAt = r.ArchivedAt.Time } + if r.RoutedAt.Valid { + row.RoutedAt = r.RoutedAt.Time + } return row, nil } diff --git a/backend/internal/storage/sqlite/queries/notification_deliveries.sql b/backend/internal/storage/sqlite/queries/notification_deliveries.sql new file mode 100644 index 00000000..02f403af --- /dev/null +++ b/backend/internal/storage/sqlite/queries/notification_deliveries.sql @@ -0,0 +1,46 @@ +-- name: ListUnroutedNotifications :many +SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at +FROM notifications +WHERE routed_at IS NULL +ORDER BY seq ASC +LIMIT ?; + +-- name: MarkNotificationRouted :exec +UPDATE notifications +SET routed_at = COALESCE(routed_at, ?), + updated_at = CASE WHEN routed_at IS NULL THEN ? ELSE updated_at END +WHERE id = ?; + +-- name: InsertNotificationDelivery :one +INSERT INTO notification_deliveries ( + id, notification_id, notification_seq, project_id, session_id, + route_name, sink, destination_key, request_json, + status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, + last_error_code, last_error, external_id, + created_at, updated_at, delivered_at +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(notification_id, route_name, destination_key) DO NOTHING +RETURNING id, notification_id, notification_seq, project_id, session_id, + route_name, sink, destination_key, request_json, + status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, + last_error_code, last_error, external_id, + created_at, updated_at, delivered_at; + +-- name: GetNotificationDelivery :one +SELECT id, notification_id, notification_seq, project_id, session_id, + route_name, sink, destination_key, request_json, + status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, + last_error_code, last_error, external_id, + created_at, updated_at, delivered_at +FROM notification_deliveries +WHERE id = ?; + +-- name: GetNotificationDeliveryByUnique :one +SELECT id, notification_id, notification_seq, project_id, session_id, + route_name, sink, destination_key, request_json, + status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, + last_error_code, last_error, external_id, + created_at, updated_at, delivered_at +FROM notification_deliveries +WHERE notification_id = ? AND route_name = ? AND destination_key = ?; diff --git a/backend/internal/storage/sqlite/queries/notifications.sql b/backend/internal/storage/sqlite/queries/notifications.sql index a896b43c..96732bf5 100644 --- a/backend/internal/storage/sqlite/queries/notifications.sql +++ b/backend/internal/storage/sqlite/queries/notifications.sql @@ -5,28 +5,28 @@ INSERT INTO notifications ( ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (dedupe_key) DO NOTHING RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at; + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at; -- name: GetNotification :one SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications WHERE id = ?; -- name: GetNotificationByDedupeKey :one SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications WHERE dedupe_key = ?; -- name: ListNotifications :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications ORDER BY seq DESC LIMIT ?; -- name: ListNotificationsByProject :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications WHERE project_id = ? ORDER BY seq DESC @@ -34,7 +34,7 @@ LIMIT ?; -- name: ListNotificationsBySession :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications WHERE session_id = ? ORDER BY seq DESC @@ -42,7 +42,7 @@ LIMIT ?; -- name: ListUnreadNotifications :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at FROM notifications WHERE read_at IS NULL AND archived_at IS NULL ORDER BY seq DESC @@ -53,18 +53,18 @@ UPDATE notifications SET read_at = ?, updated_at = ? WHERE id = ? AND read_at IS NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at; + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at; -- name: MarkNotificationUnread :one UPDATE notifications SET read_at = NULL, updated_at = ? WHERE id = ? AND read_at IS NOT NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at; + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at; -- name: ArchiveNotification :one UPDATE notifications SET archived_at = ?, updated_at = ? WHERE id = ? AND archived_at IS NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at; + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at;