diff --git a/backend/internal/cdc/event.go b/backend/internal/cdc/event.go index 35576cf6..5d37f47e 100644 --- a/backend/internal/cdc/event.go +++ b/backend/internal/cdc/event.go @@ -18,15 +18,13 @@ import ( type EventType string const ( - EventSessionCreated EventType = "session_created" - EventSessionUpdated EventType = "session_updated" - EventPRCreated EventType = "pr_created" - EventPRUpdated EventType = "pr_updated" - EventPRCheckRecorded EventType = "pr_check_recorded" - EventNotificationCreated EventType = "notification_created" - EventNotificationUpdated EventType = "notification_updated" - EventNotificationDeliveryCreated EventType = "notification_delivery_created" - EventNotificationDeliveryUpdated EventType = "notification_delivery_updated" + EventSessionCreated EventType = "session_created" + EventSessionUpdated EventType = "session_updated" + EventPRCreated EventType = "pr_created" + EventPRUpdated EventType = "pr_updated" + EventPRCheckRecorded EventType = "pr_check_recorded" + EventNotificationCreated EventType = "notification_created" + EventNotificationUpdated EventType = "notification_updated" ) // Event is one CDC change read from change_log. Seq is the monotonic ordering + diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 62eb41a2..719e7524 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -11,8 +11,6 @@ import ( "path/filepath" "strconv" "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/ports" ) const ( @@ -52,46 +50,6 @@ type Config struct { // DataDir is the directory holding durable state (the SQLite database and // the CDC JSONL log). It is created on first use by the storage layer. DataDir string - // Notifications controls the central notifier runtime. The dashboard is the - // durable notifications table itself; desktop delivery is handed off to the - // AO Electron app via notification_deliveries rows. - Notifications NotificationConfig -} - -// NotificationConfig contains the global notification settings used by the -// central notifier runtime. It intentionally starts global (not per-project) so -// the routing model can grow without changing lifecycle reactions. -type NotificationConfig struct { - Enabled bool - Dashboard DashboardNotificationConfig - Desktop DesktopNotificationConfig - Routing NotificationRoutingConfig - Retry NotificationRetryConfig -} - -type DashboardNotificationConfig struct { - Enabled bool - Limit int -} - -type DesktopNotificationConfig struct { - Enabled bool - Priorities []ports.Priority - SoundPriorities []ports.Priority -} - -type NotificationRoutingConfig struct { - // Priorities maps notification priority to built-in route names. The - // notifier currently implements dashboard and desktop only. - Priorities map[ports.Priority][]string -} - -type NotificationRetryConfig struct { - MaxAttempts int - BaseDelay time.Duration - MaxDelay time.Duration - LeaseTTL time.Duration - BatchSize int } // Addr returns the host:port the HTTP server binds. It uses net.JoinHostPort so @@ -119,7 +77,6 @@ func Load() (Config, error) { Port: DefaultPort, RequestTimeout: DefaultRequestTimeout, ShutdownTimeout: DefaultShutdownTimeout, - Notifications: DefaultNotificationConfig(), } if raw := os.Getenv("AO_PORT"); raw != "" { @@ -164,35 +121,6 @@ func Load() (Config, error) { return cfg, nil } -// DefaultNotificationConfig returns the safe zero-setup notification settings. -func DefaultNotificationConfig() NotificationConfig { - return NotificationConfig{ - Enabled: true, - Dashboard: DashboardNotificationConfig{ - Enabled: true, - Limit: 50, - }, - Desktop: DesktopNotificationConfig{ - Enabled: true, - Priorities: []ports.Priority{ports.PriorityUrgent, ports.PriorityAction}, - SoundPriorities: []ports.Priority{ports.PriorityUrgent}, - }, - Routing: NotificationRoutingConfig{Priorities: map[ports.Priority][]string{ - ports.PriorityUrgent: []string{"dashboard", "desktop"}, - ports.PriorityAction: []string{"dashboard", "desktop"}, - ports.PriorityWarning: []string{"dashboard"}, - ports.PriorityInfo: []string{"dashboard"}, - }}, - Retry: NotificationRetryConfig{ - MaxAttempts: 5, - BaseDelay: time.Second, - MaxDelay: 5 * time.Minute, - LeaseTTL: 30 * time.Second, - BatchSize: 50, - }, - } -} - // parsePositiveDuration rejects zero and negative durations: a zero // RequestTimeout would expire every request instantly, and a non-positive // ShutdownTimeout would defeat graceful shutdown. diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go index 88f0d927..dfcb5b8a 100644 --- a/backend/internal/config/config_test.go +++ b/backend/internal/config/config_test.go @@ -3,8 +3,6 @@ package config import ( "testing" "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/ports" ) func TestLoadDefaults(t *testing.T) { @@ -33,18 +31,6 @@ func TestLoadDefaults(t *testing.T) { if cfg.RunFilePath == "" { t.Error("RunFilePath is empty, want a resolved default path") } - if !cfg.Notifications.Enabled || !cfg.Notifications.Dashboard.Enabled || !cfg.Notifications.Desktop.Enabled { - t.Fatalf("notification defaults should be enabled: %+v", cfg.Notifications) - } - if cfg.Notifications.Dashboard.Limit != 50 { - t.Fatalf("dashboard limit = %d, want 50", cfg.Notifications.Dashboard.Limit) - } - if got := cfg.Notifications.Routing.Priorities[ports.PriorityUrgent]; len(got) != 2 || got[0] != "dashboard" || got[1] != "desktop" { - t.Fatalf("urgent routes = %v, want dashboard+desktop", got) - } - if cfg.Notifications.Retry.MaxAttempts != 5 || cfg.Notifications.Retry.LeaseTTL != 30*time.Second { - t.Fatalf("retry defaults = %+v", cfg.Notifications.Retry) - } } func TestLoadOverrides(t *testing.T) { diff --git a/backend/internal/daemon/daemon.go b/backend/internal/daemon/daemon.go index fd426e7c..556fe5f0 100644 --- a/backend/internal/daemon/daemon.go +++ b/backend/internal/daemon/daemon.go @@ -61,8 +61,6 @@ func Run() error { return err } - notifier := startNotifier(ctx, cfg, store, log) - // Terminal streaming: the tmux runtime supplies the PTY-attach command and // liveness; the CDC broadcaster feeds the session-state channel. The manager // is handed to httpd, which mounts it at /mux. Raw PTY bytes never flow @@ -73,11 +71,6 @@ func Run() error { srv, err := httpd.New(cfg, log, termMgr) if err != nil { - stop() - notifier.Stop() - if cdcErr := cdcPipe.Stop(); cdcErr != nil { - log.Error("cdc pipeline shutdown", "err", cdcErr) - } return err } @@ -86,11 +79,6 @@ func Run() error { // trigger -> change_log -> poller -> broadcaster. lcStack, err := startLifecycle(ctx, store, log) if err != nil { - stop() - notifier.Stop() - if cdcErr := cdcPipe.Stop(); cdcErr != nil { - log.Error("cdc pipeline shutdown", "err", cdcErr) - } return err } @@ -110,7 +98,6 @@ func Run() error { // the LIFO trap (see comment after srv.Run), hence explicit. stop() lcStack.Stop() - notifier.Stop() if cdcErr := cdcPipe.Stop(); cdcErr != nil { log.Error("cdc pipeline shutdown", "err", cdcErr) } @@ -126,7 +113,6 @@ func Run() error { // runs before the cancel — which would hang any non-signal exit path. stop() lcStack.Stop() - notifier.Stop() if err := cdcPipe.Stop(); err != nil { log.Error("cdc pipeline shutdown", "err", err) } diff --git a/backend/internal/daemon/notifier_wiring.go b/backend/internal/daemon/notifier_wiring.go deleted file mode 100644 index 5864b1e9..00000000 --- a/backend/internal/daemon/notifier_wiring.go +++ /dev/null @@ -1,28 +0,0 @@ -package daemon - -import ( - "context" - "log/slog" - - "github.com/aoagents/agent-orchestrator/backend/internal/config" - "github.com/aoagents/agent-orchestrator/backend/internal/notification" - "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" -) - -type notifierStack struct { - Manager *notification.Manager - done <-chan struct{} -} - -func startNotifier(ctx context.Context, cfg config.Config, store *sqlite.Store, log *slog.Logger) *notifierStack { - mgr := notification.NewManager(store, notification.SettingsFromConfig(cfg), log) - done := mgr.Start(ctx) - return ¬ifierStack{Manager: mgr, done: done} -} - -func (s *notifierStack) Stop() { - if s == nil || s.done == nil { - return - } - <-s.done -} diff --git a/backend/internal/domain/notification.go b/backend/internal/domain/notification.go index 8af49550..8c64c9bc 100644 --- a/backend/internal/domain/notification.go +++ b/backend/internal/domain/notification.go @@ -27,7 +27,6 @@ type Notification struct { CauseKey string ReadAt time.Time ArchivedAt time.Time - RoutedAt time.Time CreatedAt time.Time UpdatedAt time.Time } diff --git a/backend/internal/integration/notification_runtime_test.go b/backend/internal/integration/notification_runtime_test.go deleted file mode 100644 index c837bf29..00000000 --- a/backend/internal/integration/notification_runtime_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package integration - -import ( - "context" - "encoding/json" - "io" - "log/slog" - "testing" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/config" - "github.com/aoagents/agent-orchestrator/backend/internal/domain" - "github.com/aoagents/agent-orchestrator/backend/internal/notification" - "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" -) - -func TestNotificationRuntimeRoutesDesktopEligiblePriorities(t *testing.T) { - t.Parallel() - ctx := context.Background() - store, err := sqlite.Open(t.TempDir()) - if err != nil { - t.Fatal(err) - } - defer store.Close() - seedProject(t, store, "mer") - rec, err := store.CreateSession(ctx, durableRecord("mer", "MER-55", "feat/notifier")) - if err != nil { - t.Fatal(err) - } - - urgent := enqueueRuntimeNotification(t, store, rec, "urgent", "urgent") - action := enqueueRuntimeNotification(t, store, rec, "action", "action") - info := enqueueRuntimeNotification(t, store, rec, "info", "info") - - mgr := notification.NewManager(store, notification.StaticSettings(config.DefaultNotificationConfig()), slog.New(slog.NewTextHandler(io.Discard, nil))) - routed, err := mgr.RoutePending(ctx, 50) - if err != nil { - t.Fatal(err) - } - if routed != 3 { - t.Fatalf("routed = %d, want 3", routed) - } - - for _, ntf := range []domain.Notification{urgent, action} { - rows, err := store.ListDeliveries(ctx, sqlite.DeliveryFilter{NotificationID: string(ntf.ID), Limit: 10}) - if err != nil { - t.Fatal(err) - } - if len(rows) != 1 || rows[0].Sink != notification.SinkAOApp || rows[0].RouteName != notification.RouteDesktop { - t.Fatalf("%s should have one AO-app desktop delivery, got %+v", ntf.Priority, rows) - } - } - rows, err := store.ListDeliveries(ctx, sqlite.DeliveryFilter{NotificationID: string(info.ID), Limit: 10}) - if err != nil { - t.Fatal(err) - } - if len(rows) != 0 { - t.Fatalf("info should remain dashboard/read-model only, got deliveries %+v", rows) - } -} - -func enqueueRuntimeNotification(t *testing.T, store *sqlite.Store, rec domain.SessionRecord, priority, dedupe string) domain.Notification { - t.Helper() - now := time.Now().UTC().Truncate(time.Second) - row, _, err := store.EnqueueNotification(context.Background(), domain.Notification{ - ProjectID: rec.ProjectID, - SessionID: rec.ID, - Source: "lifecycle", - EventType: "reaction.test", - SemanticType: "test." + priority, - Priority: priority, - Message: "test " + priority, - Payload: json.RawMessage(`{}`), - DedupeKey: "runtime-" + dedupe, - CreatedAt: now, - UpdatedAt: now, - }) - if err != nil { - t.Fatalf("enqueue notification: %v", err) - } - return row -} diff --git a/backend/internal/notification/delivery.go b/backend/internal/notification/delivery.go deleted file mode 100644 index c7fa0311..00000000 --- a/backend/internal/notification/delivery.go +++ /dev/null @@ -1,118 +0,0 @@ -package notification - -import ( - "crypto/rand" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/domain" -) - -var ErrDeliveryUpdateConflict = errors.New("notification delivery update conflict") - -const ( - RouteDashboard = "dashboard" - RouteDesktop = "desktop" - - SinkAOApp = "ao-app" - SinkUnknown = "unknown" -) - -type DeliveryStatus string - -const ( - DeliveryQueued DeliveryStatus = "queued" - DeliveryLeased DeliveryStatus = "leased" - DeliverySent DeliveryStatus = "sent" - DeliveryRetryWait DeliveryStatus = "retry_wait" - DeliveryFailed DeliveryStatus = "failed" - DeliverySkipped DeliveryStatus = "skipped" - DeliveryCancelled DeliveryStatus = "cancelled" -) - -// DeliveryRow is the durable handoff state for one notification route. The -// backend creates AO-app rows; Electron claims them later and reports success or -// failure. External sinks can use the same shape in future issues. -type DeliveryRow struct { - ID string - NotificationID domain.NotificationID - NotificationSeq int64 - ProjectID domain.ProjectID - SessionID domain.SessionID - - RouteName string - Sink string - DestinationKey string - RequestJSON json.RawMessage - - Status DeliveryStatus - Attempts int - MaxAttempts int - NextAttemptAt time.Time - LeaseOwner string - // LeaseExpiresAt is zero when the row is not leased. - LeaseExpiresAt time.Time - - LastErrorCode string - LastError string - ExternalID string - - CreatedAt time.Time - UpdatedAt time.Time - DeliveredAt time.Time -} - -func NewDeliveryID() (string, error) { - var b [16]byte - if _, err := rand.Read(b[:]); err != nil { - return "", fmt.Errorf("generate delivery id: %w", err) - } - return "del_" + hex.EncodeToString(b[:]), nil -} - -func NormalizeDelivery(row DeliveryRow, now time.Time, maxAttempts int) (DeliveryRow, error) { - if row.ID == "" { - id, err := NewDeliveryID() - if err != nil { - return DeliveryRow{}, err - } - row.ID = id - } - if len(row.RequestJSON) == 0 { - row.RequestJSON = json.RawMessage(`{}`) - } - if !json.Valid(row.RequestJSON) { - return DeliveryRow{}, fmt.Errorf("invalid delivery request JSON for %s", row.ID) - } - if row.Status == "" { - row.Status = DeliveryQueued - } - if row.MaxAttempts <= 0 { - row.MaxAttempts = maxAttempts - } - if row.MaxAttempts <= 0 { - row.MaxAttempts = 1 - } - if row.NextAttemptAt.IsZero() { - row.NextAttemptAt = now - } - if row.CreatedAt.IsZero() { - row.CreatedAt = now - } - if row.UpdatedAt.IsZero() { - row.UpdatedAt = row.CreatedAt - } - return row, nil -} - -func TerminalStatus(s DeliveryStatus) bool { - switch s { - case DeliverySent, DeliveryFailed, DeliverySkipped, DeliveryCancelled: - return true - default: - return false - } -} diff --git a/backend/internal/notification/dispatcher.go b/backend/internal/notification/dispatcher.go deleted file mode 100644 index 19d0cbf5..00000000 --- a/backend/internal/notification/dispatcher.go +++ /dev/null @@ -1,36 +0,0 @@ -package notification - -import ( - "context" - "time" -) - -func startDispatcher(ctx context.Context, m *Manager) <-chan struct{} { - done := make(chan struct{}) - go func() { - defer close(done) - runDispatcherOnce(ctx, m) - - interval := m.interval - if interval <= 0 { - interval = time.Second - } - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - runDispatcherOnce(ctx, m) - } - } - }() - return done -} - -func runDispatcherOnce(ctx context.Context, m *Manager) { - if err := m.RunOnce(ctx); err != nil { - m.logger.ErrorContext(ctx, "notification dispatcher tick", "err", err) - } -} diff --git a/backend/internal/notification/dispatcher_test.go b/backend/internal/notification/dispatcher_test.go deleted file mode 100644 index 34b7b23e..00000000 --- a/backend/internal/notification/dispatcher_test.go +++ /dev/null @@ -1,170 +0,0 @@ -package notification - -import ( - "context" - "errors" - "io" - "log/slog" - "sync" - "testing" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/config" - "github.com/aoagents/agent-orchestrator/backend/internal/domain" -) - -type fakeRuntimeStore struct { - mu sync.Mutex - unrouted []domain.Notification - deliveries []DeliveryRow - byID map[string]DeliveryRow - routed []domain.NotificationID - releases int - retryNext time.Time - failEnqueue map[domain.NotificationID]error -} - -func (f *fakeRuntimeStore) ListUnroutedNotifications(context.Context, int) ([]domain.Notification, error) { - f.mu.Lock() - defer f.mu.Unlock() - return append([]domain.Notification(nil), f.unrouted...), nil -} - -func (f *fakeRuntimeStore) MarkNotificationRouted(_ context.Context, id domain.NotificationID, _ time.Time) error { - f.mu.Lock() - defer f.mu.Unlock() - f.routed = append(f.routed, id) - return nil -} - -func (f *fakeRuntimeStore) EnqueueDelivery(_ context.Context, row DeliveryRow) (DeliveryRow, bool, error) { - f.mu.Lock() - defer f.mu.Unlock() - if err := f.failEnqueue[row.NotificationID]; err != nil { - return DeliveryRow{}, false, err - } - f.deliveries = append(f.deliveries, row) - return row, true, nil -} - -func (f *fakeRuntimeStore) GetDelivery(_ context.Context, id string) (DeliveryRow, bool, error) { - f.mu.Lock() - defer f.mu.Unlock() - row, ok := f.byID[id] - return row, ok, nil -} - -func (f *fakeRuntimeStore) ClaimDueDeliveries(context.Context, string, string, time.Time, int, time.Duration) ([]DeliveryRow, error) { - return nil, nil -} -func (f *fakeRuntimeStore) ReleaseExpiredDeliveryLeases(context.Context, time.Time) (int, error) { - f.mu.Lock() - defer f.mu.Unlock() - f.releases++ - return 0, nil -} -func (f *fakeRuntimeStore) MarkDeliverySent(context.Context, string, string, string, time.Time) error { - return nil -} -func (f *fakeRuntimeStore) MarkDeliveryRetry(_ context.Context, _ string, _ string, _ string, _ string, next time.Time, _ time.Time) error { - f.mu.Lock() - defer f.mu.Unlock() - f.retryNext = next - return nil -} -func (f *fakeRuntimeStore) MarkDeliveryFailed(context.Context, string, string, string, string, time.Time) error { - return nil -} -func (f *fakeRuntimeStore) MarkDeliverySkipped(context.Context, string, string, time.Time) error { - return nil -} - -func TestDispatcherStartReleasesAndStops(t *testing.T) { - store := &fakeRuntimeStore{} - mgr := NewManager(store, StaticSettings(config.DefaultNotificationConfig()), discardLogger()) - mgr.interval = 10 * time.Millisecond - ctx, cancel := context.WithCancel(context.Background()) - done := mgr.Start(ctx) - - deadline := time.After(time.Second) - for { - store.mu.Lock() - released := store.releases - store.mu.Unlock() - if released > 0 { - break - } - select { - case <-deadline: - t.Fatal("dispatcher did not run initial release") - case <-time.After(time.Millisecond): - } - } - cancel() - select { - case <-done: - case <-time.After(time.Second): - t.Fatal("dispatcher did not stop after context cancel") - } -} - -func TestRoutePendingDeliveryFailureDoesNotBlockOtherNotifications(t *testing.T) { - n1 := sampleDomainNotification("ntf_1", "urgent") - n2 := sampleDomainNotification("ntf_2", "urgent") - store := &fakeRuntimeStore{ - unrouted: []domain.Notification{n1, n2}, - failEnqueue: map[domain.NotificationID]error{n1.ID: errors.New("boom")}, - } - mgr := NewManager(store, StaticSettings(config.DefaultNotificationConfig()), discardLogger()) - - routed, err := mgr.RoutePending(context.Background(), 10) - if err == nil { - t.Fatal("RoutePending should return the first routing error") - } - if routed != 1 { - t.Fatalf("routed = %d, want one successful notification", routed) - } - store.mu.Lock() - defer store.mu.Unlock() - if len(store.routed) != 1 || store.routed[0] != n2.ID { - t.Fatalf("routed IDs = %v, want only %s", store.routed, n2.ID) - } -} - -func TestMarkDeliveryErrorUsesAttemptAwareBackoff(t *testing.T) { - now := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC) - cfg := config.DefaultNotificationConfig() - cfg.Retry.BaseDelay = time.Second - cfg.Retry.MaxDelay = time.Minute - store := &fakeRuntimeStore{byID: map[string]DeliveryRow{ - "del_1": {ID: "del_1", Attempts: 2, MaxAttempts: 5}, - }} - mgr := NewManager(store, StaticSettings(cfg), discardLogger()) - mgr.clock = func() time.Time { return now } - - if err := mgr.MarkDeliveryError(context.Background(), "del_1", "electron", "timeout", "timed out"); err != nil { - t.Fatal(err) - } - store.mu.Lock() - next := store.retryNext - store.mu.Unlock() - delay := next.Sub(now) - if delay < 3200*time.Millisecond || delay > 4800*time.Millisecond { - t.Fatalf("retry delay for third attempt = %s, want jittered 4s backoff", delay) - } -} - -func sampleDomainNotification(id domain.NotificationID, priority string) domain.Notification { - return domain.Notification{ - Seq: 1, - ID: id, - ProjectID: "ao", - SessionID: "ao-1", - Priority: priority, - Message: "hello", - } -} - -func discardLogger() *slog.Logger { - return slog.New(slog.NewTextHandler(io.Discard, nil)) -} diff --git a/backend/internal/notification/enqueuer.go b/backend/internal/notification/enqueuer.go index 59538305..79e902bf 100644 --- a/backend/internal/notification/enqueuer.go +++ b/backend/internal/notification/enqueuer.go @@ -8,23 +8,23 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/ports" ) -// EnqueueStore is the durable write-side used by the enqueuer. *sqlite.Store -// satisfies this interface. -type EnqueueStore interface { +// Store is the durable write-side used by the enqueuer. *sqlite.Store satisfies +// this interface. +type Store interface { EnqueueNotification(ctx context.Context, row domain.Notification) (domain.Notification, bool, error) } // Enqueuer is a store-backed ports.Notifier. It does not deliver to external // sinks; it renders and persists the notification for later dashboard/app sinks. type Enqueuer struct { - store EnqueueStore + store Store renderer *Renderer logger *slog.Logger } var _ ports.Notifier = (*Enqueuer)(nil) -func NewEnqueuer(store EnqueueStore, renderer *Renderer, logger *slog.Logger) *Enqueuer { +func NewEnqueuer(store Store, renderer *Renderer, logger *slog.Logger) *Enqueuer { if logger == nil { logger = slog.Default() } diff --git a/backend/internal/notification/manager.go b/backend/internal/notification/manager.go deleted file mode 100644 index 235f893e..00000000 --- a/backend/internal/notification/manager.go +++ /dev/null @@ -1,157 +0,0 @@ -package notification - -import ( - "context" - "errors" - "fmt" - "log/slog" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/config" - "github.com/aoagents/agent-orchestrator/backend/internal/domain" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" -) - -type Manager struct { - store Store - settings SettingsProvider - clock func() time.Time - logger *slog.Logger - - interval time.Duration -} - -func NewManager(store Store, settings SettingsProvider, logger *slog.Logger) *Manager { - if logger == nil { - logger = slog.Default() - } - if settings == nil { - settings = StaticSettings(config.DefaultNotificationConfig()) - } - return &Manager{ - store: store, - settings: settings, - clock: time.Now, - logger: logger, - interval: time.Second, - } -} - -func (m *Manager) Start(ctx context.Context) <-chan struct{} { - return startDispatcher(ctx, m) -} - -// RunOnce performs one maintenance/routing pass. It is exposed for tests and -// for future API-triggered nudges; Start calls it on every dispatcher tick. -func (m *Manager) RunOnce(ctx context.Context) error { - settings := m.settings.Settings(ctx) - policy := RetryPolicyFromConfig(settings.Retry) - now := m.clock().UTC() - - if released, err := m.store.ReleaseExpiredDeliveryLeases(ctx, now); err != nil { - return err - } else if released > 0 { - m.logger.DebugContext(ctx, "notification delivery leases released", "count", released) - } - - _, err := m.RoutePending(ctx, policy.BatchSize) - return err -} - -func (m *Manager) RoutePending(ctx context.Context, limit int) (int, error) { - if limit <= 0 { - limit = RetryPolicyFromConfig(m.settings.Settings(ctx).Retry).BatchSize - } - rows, err := m.store.ListUnroutedNotifications(ctx, limit) - if err != nil { - return 0, err - } - var firstErr error - var routed int - for _, row := range rows { - if err := m.RouteNotification(ctx, row); err != nil { - m.logger.ErrorContext(ctx, "route notification", "notification", row.ID, "err", err) - if firstErr == nil { - firstErr = err - } else { - firstErr = errors.Join(firstErr, err) - } - continue - } - routed++ - } - return routed, firstErr -} - -func (m *Manager) RouteNotification(ctx context.Context, row domain.Notification) error { - settings := m.settings.Settings(ctx) - now := m.clock().UTC() - if !settings.Enabled || !row.ArchivedAt.IsZero() { - return m.store.MarkNotificationRouted(ctx, row.ID, now) - } - - decisions := ResolveRoutes(settings, ports.Priority(row.Priority)) - maxAttempts := RetryPolicyFromConfig(settings.Retry).MaxAttempts - for _, decision := range decisions { - if !decision.CreateDelivery { - continue - } - delivery := DeliveryRow{ - NotificationID: row.ID, - NotificationSeq: row.Seq, - ProjectID: row.ProjectID, - SessionID: row.SessionID, - RouteName: decision.RouteName, - Sink: decision.Sink, - DestinationKey: decision.DestinationKey, - Status: decision.Status, - MaxAttempts: maxAttempts, - NextAttemptAt: now, - CreatedAt: now, - UpdatedAt: now, - } - if delivery.Status == "" { - delivery.Status = DeliveryQueued - } - if decision.Reason != "" { - delivery.LastErrorCode = "route_skipped" - delivery.LastError = decision.Reason - } - if _, _, err := m.store.EnqueueDelivery(ctx, delivery); err != nil { - return err - } - } - return m.store.MarkNotificationRouted(ctx, row.ID, now) -} - -func (m *Manager) ClaimDesktopDeliveries(ctx context.Context, owner string, limit int) ([]DeliveryRow, error) { - settings := m.settings.Settings(ctx) - policy := RetryPolicyFromConfig(settings.Retry) - return m.store.ClaimDueDeliveries(ctx, SinkAOApp, owner, m.clock().UTC(), limit, policy.LeaseTTL) -} - -func (m *Manager) MarkDeliverySent(ctx context.Context, id, owner, externalID string) error { - return m.store.MarkDeliverySent(ctx, id, owner, externalID, m.clock().UTC()) -} - -func (m *Manager) MarkDeliveryError(ctx context.Context, id, owner, code, message string) error { - settings := m.settings.Settings(ctx) - policy := RetryPolicyFromConfig(settings.Retry) - now := m.clock().UTC() - // The store is the source of truth for attempts/max-attempt terminal - // handling. Permanent classification short-circuits to failed; otherwise we - // fetch the current delivery attempts and provide the attempt-aware next - // retry timestamp for retry_wait rows. - if ClassifyError(code) == ErrorPermanent { - return m.store.MarkDeliveryFailed(ctx, id, owner, code, message, now) - } - row, ok, err := m.store.GetDelivery(ctx, id) - if err != nil { - return err - } - if !ok { - return fmt.Errorf("notification delivery %s not found", id) - } - nextAttemptNo := row.Attempts + 1 - return m.store.MarkDeliveryRetry(ctx, id, owner, code, message, policy.NextAttemptAt(now, nextAttemptNo), now) -} diff --git a/backend/internal/notification/retry.go b/backend/internal/notification/retry.go deleted file mode 100644 index 72c54775..00000000 --- a/backend/internal/notification/retry.go +++ /dev/null @@ -1,116 +0,0 @@ -package notification - -import ( - crand "crypto/rand" - "encoding/binary" - "math" - "strings" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/config" -) - -const retryJitterFraction = 0.20 - -type RetryPolicy struct { - MaxAttempts int - BaseDelay time.Duration - MaxDelay time.Duration - LeaseTTL time.Duration - BatchSize int - Jitter float64 - RandFloat64 func() float64 -} - -func RetryPolicyFromConfig(cfg config.NotificationRetryConfig) RetryPolicy { - settings := NormalizeSettings(config.NotificationConfig{Enabled: true, Retry: cfg}) - return RetryPolicy{ - MaxAttempts: settings.Retry.MaxAttempts, - BaseDelay: settings.Retry.BaseDelay, - MaxDelay: settings.Retry.MaxDelay, - LeaseTTL: settings.Retry.LeaseTTL, - BatchSize: settings.Retry.BatchSize, - Jitter: retryJitterFraction, - RandFloat64: cryptoRandFloat64, - } -} - -func (p RetryPolicy) normalized() RetryPolicy { - cfg := config.NotificationRetryConfig{ - MaxAttempts: p.MaxAttempts, - BaseDelay: p.BaseDelay, - MaxDelay: p.MaxDelay, - LeaseTTL: p.LeaseTTL, - BatchSize: p.BatchSize, - } - out := RetryPolicyFromConfig(cfg) - if p.Jitter != 0 { - out.Jitter = p.Jitter - } - if p.RandFloat64 != nil { - out.RandFloat64 = p.RandFloat64 - } - return out -} - -// BackoffDelay returns exponential backoff for the already-recorded attempt -// count. attempt=1 returns the base delay; delays are capped before jitter. -func (p RetryPolicy) BackoffDelay(attempt int) time.Duration { - p = p.normalized() - if attempt < 1 { - attempt = 1 - } - mult := math.Pow(2, float64(attempt-1)) - delay := time.Duration(float64(p.BaseDelay) * mult) - if delay > p.MaxDelay || delay <= 0 { - delay = p.MaxDelay - } - if p.Jitter <= 0 { - return delay - } - randFloat := p.RandFloat64 - if randFloat == nil { - randFloat = cryptoRandFloat64 - } - // rand in [0,1) -> factor in [1-jitter, 1+jitter) - factor := 1 - p.Jitter + (2 * p.Jitter * randFloat()) - return time.Duration(float64(delay) * factor) -} - -func cryptoRandFloat64() float64 { - var b [8]byte - if _, err := crand.Read(b[:]); err != nil { - // Fall back to a time-derived value only if the OS CSPRNG fails. The - // fallback still avoids math/rand's deterministic process seed. - return float64(time.Now().UnixNano()&((1<<53)-1)) / float64(1<<53) - } - // Match math/rand.Float64's 53 bits of precision in [0,1). - return float64(binary.BigEndian.Uint64(b[:])>>11) / float64(1<<53) -} - -func (p RetryPolicy) NextAttemptAt(now time.Time, attempt int) time.Time { - return now.Add(p.BackoffDelay(attempt)) -} - -type ErrorClass string - -const ( - ErrorTransient ErrorClass = "transient" - ErrorPermanent ErrorClass = "permanent" -) - -func ClassifyError(code string) ErrorClass { - switch strings.ToLower(strings.TrimSpace(code)) { - case "permanent", "invalid_request", "bad_request", "unauthorized", "forbidden", "not_found", "unsupported_route", "route_disabled": - return ErrorPermanent - default: - return ErrorTransient - } -} - -func ShouldRetry(code string, attempts, maxAttempts int) bool { - if maxAttempts <= 0 { - maxAttempts = 1 - } - return ClassifyError(code) != ErrorPermanent && attempts < maxAttempts -} diff --git a/backend/internal/notification/retry_test.go b/backend/internal/notification/retry_test.go deleted file mode 100644 index c6d37155..00000000 --- a/backend/internal/notification/retry_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package notification - -import ( - "testing" - "time" -) - -func TestRetryBackoffExponentialCapped(t *testing.T) { - p := RetryPolicy{ - BaseDelay: time.Second, - MaxDelay: 5 * time.Second, - Jitter: retryJitterFraction, - RandFloat64: func() float64 { return 0.5 }, - } - if got := p.BackoffDelay(1); got != time.Second { - t.Fatalf("attempt 1 delay = %s, want 1s", got) - } - if got := p.BackoffDelay(2); got != 2*time.Second { - t.Fatalf("attempt 2 delay = %s, want 2s", got) - } - if got := p.BackoffDelay(4); got != 5*time.Second { - t.Fatalf("attempt 4 delay = %s, want capped 5s", got) - } -} - -func TestRetryJitterBounds(t *testing.T) { - base := RetryPolicy{BaseDelay: 10 * time.Second, MaxDelay: time.Minute, Jitter: retryJitterFraction} - low := base - low.RandFloat64 = func() float64 { return 0 } - high := base - high.RandFloat64 = func() float64 { return 1 } - - if got := low.BackoffDelay(1); got != 8*time.Second { - t.Fatalf("low jitter = %s, want 8s", got) - } - if got := high.BackoffDelay(1); got != 12*time.Second { - t.Fatalf("high jitter = %s, want 12s", got) - } -} - -func TestErrorClassificationRetry(t *testing.T) { - if ClassifyError("invalid_request") != ErrorPermanent { - t.Fatal("invalid_request should be permanent") - } - if ShouldRetry("invalid_request", 1, 5) { - t.Fatal("permanent errors should not retry") - } - if !ShouldRetry("timeout", 1, 5) { - t.Fatal("transient errors under max attempts should retry") - } - if ShouldRetry("timeout", 5, 5) { - t.Fatal("max attempts should stop retry") - } -} diff --git a/backend/internal/notification/routing.go b/backend/internal/notification/routing.go deleted file mode 100644 index 36a13d9b..00000000 --- a/backend/internal/notification/routing.go +++ /dev/null @@ -1,72 +0,0 @@ -package notification - -import ( - "slices" - - "github.com/aoagents/agent-orchestrator/backend/internal/config" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" -) - -type RouteDecision struct { - RouteName string - Sink string - DestinationKey string - Status DeliveryStatus - Reason string - CreateDelivery bool -} - -// ResolveRoutes resolves the configured built-in routes for one notification. -// Dashboard is a read model over the notifications table, so it is represented -// in the decision list but never creates a delivery row. Unknown explicitly -// configured routes become skipped delivery rows for operator visibility. -func ResolveRoutes(settings config.NotificationConfig, priority ports.Priority) []RouteDecision { - settings = NormalizeSettings(settings) - if !settings.Enabled { - return nil - } - - routes, ok := settings.Routing.Priorities[priority] - if !ok { - return nil - } - out := make([]RouteDecision, 0, len(routes)) - for _, name := range routes { - switch name { - case RouteDashboard: - if settings.Dashboard.Enabled { - out = append(out, RouteDecision{RouteName: RouteDashboard}) - } - case RouteDesktop: - if settings.Desktop.Enabled && priorityAllowed(priority, settings.Desktop.Priorities) { - out = append(out, RouteDecision{ - RouteName: RouteDesktop, - Sink: SinkAOApp, - Status: DeliveryQueued, - CreateDelivery: true, - }) - } - case "": - // Ignore empty route names so a stray trailing separator in future - // config parsing does not create a permanent skipped delivery. - default: - out = append(out, RouteDecision{ - RouteName: name, - Sink: SinkUnknown, - Status: DeliverySkipped, - Reason: "unknown route", - CreateDelivery: true, - }) - } - } - return out -} - -func DesktopEligible(settings config.NotificationConfig, priority ports.Priority) bool { - settings = NormalizeSettings(settings) - return settings.Enabled && settings.Desktop.Enabled && priorityAllowed(priority, settings.Desktop.Priorities) -} - -func priorityAllowed(p ports.Priority, allowed []ports.Priority) bool { - return slices.Contains(allowed, p) -} diff --git a/backend/internal/notification/routing_test.go b/backend/internal/notification/routing_test.go deleted file mode 100644 index 913da8c2..00000000 --- a/backend/internal/notification/routing_test.go +++ /dev/null @@ -1,87 +0,0 @@ -package notification - -import ( - "testing" - - "github.com/aoagents/agent-orchestrator/backend/internal/config" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" -) - -func TestResolveRoutes_Defaults(t *testing.T) { - cfg := config.DefaultNotificationConfig() - tests := []struct { - priority ports.Priority - want []string - }{ - {ports.PriorityUrgent, []string{RouteDashboard, RouteDesktop}}, - {ports.PriorityAction, []string{RouteDashboard, RouteDesktop}}, - {ports.PriorityWarning, []string{RouteDashboard}}, - {ports.PriorityInfo, []string{RouteDashboard}}, - } - for _, tc := range tests { - t.Run(string(tc.priority), func(t *testing.T) { - got := routeNames(ResolveRoutes(cfg, tc.priority)) - if len(got) != len(tc.want) { - t.Fatalf("routes = %v, want %v", got, tc.want) - } - for i := range tc.want { - if got[i] != tc.want[i] { - t.Fatalf("routes = %v, want %v", got, tc.want) - } - } - }) - } -} - -func TestResolveRoutes_DesktopDisabledOrIneligible(t *testing.T) { - cfg := config.DefaultNotificationConfig() - cfg.Desktop.Enabled = false - got := routeNames(ResolveRoutes(cfg, ports.PriorityUrgent)) - if len(got) != 1 || got[0] != RouteDashboard { - t.Fatalf("desktop disabled routes = %v, want dashboard only", got) - } - - cfg = config.DefaultNotificationConfig() - cfg.Routing.Priorities[ports.PriorityInfo] = []string{RouteDashboard, RouteDesktop} - got = routeNames(ResolveRoutes(cfg, ports.PriorityInfo)) - if len(got) != 1 || got[0] != RouteDashboard { - t.Fatalf("info desktop ineligible routes = %v, want dashboard only", got) - } -} - -func TestResolveRoutes_GlobalDisabled(t *testing.T) { - cfg := config.DefaultNotificationConfig() - cfg.Enabled = false - if got := ResolveRoutes(cfg, ports.PriorityUrgent); len(got) != 0 { - t.Fatalf("globally disabled routes = %+v, want none", got) - } -} - -func TestResolveRoutes_ExplicitEmptySuppressesPriority(t *testing.T) { - cfg := config.DefaultNotificationConfig() - cfg.Routing.Priorities[ports.PriorityUrgent] = []string{} - if got := ResolveRoutes(cfg, ports.PriorityUrgent); len(got) != 0 { - t.Fatalf("explicit empty routes = %+v, want none", got) - } -} - -func TestResolveRoutes_UnknownExplicitRouteSkipped(t *testing.T) { - cfg := config.DefaultNotificationConfig() - cfg.Routing.Priorities[ports.PriorityUrgent] = []string{RouteDashboard, "pager"} - got := ResolveRoutes(cfg, ports.PriorityUrgent) - if len(got) != 2 { - t.Fatalf("routes = %+v, want dashboard + skipped unknown", got) - } - unknown := got[1] - if unknown.RouteName != "pager" || unknown.Status != DeliverySkipped || !unknown.CreateDelivery || unknown.Sink != SinkUnknown { - t.Fatalf("unknown route decision = %+v", unknown) - } -} - -func routeNames(routes []RouteDecision) []string { - out := make([]string, len(routes)) - for i, r := range routes { - out[i] = r.RouteName - } - return out -} diff --git a/backend/internal/notification/settings.go b/backend/internal/notification/settings.go deleted file mode 100644 index 457c5dc2..00000000 --- a/backend/internal/notification/settings.go +++ /dev/null @@ -1,89 +0,0 @@ -package notification - -import ( - "context" - - "github.com/aoagents/agent-orchestrator/backend/internal/config" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" -) - -type SettingsProvider interface { - Settings(ctx context.Context) config.NotificationConfig -} - -type staticSettings struct { - cfg config.NotificationConfig -} - -func SettingsFromConfig(cfg config.Config) SettingsProvider { - return staticSettings{cfg: NormalizeSettings(cfg.Notifications)} -} - -func StaticSettings(cfg config.NotificationConfig) SettingsProvider { - return staticSettings{cfg: NormalizeSettings(cfg)} -} - -func (s staticSettings) Settings(context.Context) config.NotificationConfig { - return cloneSettings(s.cfg) -} - -// NormalizeSettings fills unset settings with safe defaults while preserving -// explicit route overrides, including an explicit empty route list. -func NormalizeSettings(in config.NotificationConfig) config.NotificationConfig { - def := config.DefaultNotificationConfig() - out := in - - if out.Dashboard.Limit == 0 { - out.Dashboard.Limit = def.Dashboard.Limit - } - if out.Desktop.Priorities == nil { - out.Desktop.Priorities = append([]ports.Priority(nil), def.Desktop.Priorities...) - } - if out.Desktop.SoundPriorities == nil { - out.Desktop.SoundPriorities = append([]ports.Priority(nil), def.Desktop.SoundPriorities...) - } - if out.Routing.Priorities == nil { - out.Routing.Priorities = cloneRoutes(def.Routing.Priorities) - } else { - merged := cloneRoutes(def.Routing.Priorities) - for p, routes := range out.Routing.Priorities { - merged[p] = append([]string(nil), routes...) - } - out.Routing.Priorities = merged - } - if out.Retry.MaxAttempts == 0 { - out.Retry.MaxAttempts = def.Retry.MaxAttempts - } - if out.Retry.BaseDelay == 0 { - out.Retry.BaseDelay = def.Retry.BaseDelay - } - if out.Retry.MaxDelay == 0 { - out.Retry.MaxDelay = def.Retry.MaxDelay - } - if out.Retry.LeaseTTL == 0 { - out.Retry.LeaseTTL = def.Retry.LeaseTTL - } - if out.Retry.BatchSize == 0 { - out.Retry.BatchSize = def.Retry.BatchSize - } - return cloneSettings(out) -} - -func cloneSettings(in config.NotificationConfig) config.NotificationConfig { - out := in - out.Desktop.Priorities = append([]ports.Priority(nil), in.Desktop.Priorities...) - out.Desktop.SoundPriorities = append([]ports.Priority(nil), in.Desktop.SoundPriorities...) - out.Routing.Priorities = cloneRoutes(in.Routing.Priorities) - return out -} - -func cloneRoutes(in map[ports.Priority][]string) map[ports.Priority][]string { - if in == nil { - return nil - } - out := make(map[ports.Priority][]string, len(in)) - for p, routes := range in { - out[p] = append([]string(nil), routes...) - } - return out -} diff --git a/backend/internal/notification/settings_test.go b/backend/internal/notification/settings_test.go deleted file mode 100644 index 9fb9590f..00000000 --- a/backend/internal/notification/settings_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package notification - -import ( - "context" - "testing" - - "github.com/aoagents/agent-orchestrator/backend/internal/config" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" -) - -func TestSettingsFromConfigDefaultsWhenUnset(t *testing.T) { - got := SettingsFromConfig(config.Config{Notifications: config.DefaultNotificationConfig()}).Settings(context.Background()) - if !got.Enabled || !got.Desktop.Enabled || !got.Dashboard.Enabled { - t.Fatalf("default config should resolve safe enabled defaults: %+v", got) - } - if got.Retry.MaxAttempts != 5 || got.Retry.BatchSize != 50 { - t.Fatalf("retry defaults = %+v", got.Retry) - } -} - -func TestSettingsFromConfigPreservesExplicitGlobalDisable(t *testing.T) { - got := SettingsFromConfig(config.Config{Notifications: config.NotificationConfig{Enabled: false}}).Settings(context.Background()) - if got.Enabled { - t.Fatalf("explicit disabled notifications should stay disabled: %+v", got) - } - if got.Retry.MaxAttempts != 5 || got.Routing.Priorities == nil { - t.Fatalf("disabled config should still receive non-global defaults: %+v", got) - } -} - -func TestNormalizeSettingsPreservesExplicitSurfaceDisables(t *testing.T) { - got := StaticSettings(config.NotificationConfig{ - Enabled: true, - Dashboard: config.DashboardNotificationConfig{Enabled: false}, - Desktop: config.DesktopNotificationConfig{Enabled: false}, - }).Settings(context.Background()) - - if !got.Enabled { - t.Fatalf("global notifications should remain enabled: %+v", got) - } - if got.Dashboard.Enabled { - t.Fatalf("explicit dashboard disable should stay disabled: %+v", got.Dashboard) - } - if got.Desktop.Enabled { - t.Fatalf("explicit desktop disable should stay disabled: %+v", got.Desktop) - } - if got.Dashboard.Limit != 50 || len(got.Desktop.Priorities) == 0 || got.Retry.MaxAttempts != 5 { - t.Fatalf("disabled surfaces should still receive non-bool defaults: %+v", got) - } -} - -func TestNormalizeSettingsPreservesExplicitEmptyRoute(t *testing.T) { - cfg := config.DefaultNotificationConfig() - cfg.Routing.Priorities[ports.PriorityUrgent] = []string{} - - got := StaticSettings(cfg).Settings(context.Background()) - if routes := got.Routing.Priorities[ports.PriorityUrgent]; len(routes) != 0 { - t.Fatalf("explicit empty urgent route should be preserved, got %v", routes) - } -} - -func TestSettingsProviderReturnsClone(t *testing.T) { - cfg := config.DefaultNotificationConfig() - provider := StaticSettings(cfg) - first := provider.Settings(context.Background()) - first.Desktop.Priorities[0] = ports.PriorityInfo - first.Routing.Priorities[ports.PriorityUrgent][0] = "mutated" - - second := provider.Settings(context.Background()) - if second.Desktop.Priorities[0] != ports.PriorityUrgent { - t.Fatalf("desktop priorities were mutated through clone: %v", second.Desktop.Priorities) - } - if second.Routing.Priorities[ports.PriorityUrgent][0] != RouteDashboard { - t.Fatalf("routes were mutated through clone: %v", second.Routing.Priorities[ports.PriorityUrgent]) - } -} diff --git a/backend/internal/notification/store.go b/backend/internal/notification/store.go deleted file mode 100644 index e2cdf511..00000000 --- a/backend/internal/notification/store.go +++ /dev/null @@ -1,25 +0,0 @@ -package notification - -import ( - "context" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/domain" -) - -// Store is the central notifier runtime's durable interface. The lifecycle -// enqueuer writes notification rows; this interface routes them into durable -// delivery rows and lets AO-app/API code claim and complete desktop handoffs. -type Store interface { - ListUnroutedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) - MarkNotificationRouted(ctx context.Context, id domain.NotificationID, at time.Time) error - - GetDelivery(ctx context.Context, id string) (DeliveryRow, bool, error) - EnqueueDelivery(ctx context.Context, row DeliveryRow) (DeliveryRow, bool, error) - ClaimDueDeliveries(ctx context.Context, sink string, owner string, now time.Time, limit int, lease time.Duration) ([]DeliveryRow, error) - ReleaseExpiredDeliveryLeases(ctx context.Context, now time.Time) (int, error) - MarkDeliverySent(ctx context.Context, id string, owner string, externalID string, at time.Time) error - MarkDeliveryRetry(ctx context.Context, id string, owner string, errCode string, errMessage string, next time.Time, at time.Time) error - MarkDeliveryFailed(ctx context.Context, id string, owner string, errCode string, errMessage string, at time.Time) error - MarkDeliverySkipped(ctx context.Context, id string, reason string, at time.Time) error -} diff --git a/backend/internal/storage/sqlite/gen/models.go b/backend/internal/storage/sqlite/gen/models.go index 2887a87e..992c0ca0 100644 --- a/backend/internal/storage/sqlite/gen/models.go +++ b/backend/internal/storage/sqlite/gen/models.go @@ -36,43 +36,6 @@ type Notification struct { ArchivedAt sql.NullTime CreatedAt time.Time UpdatedAt time.Time - RoutedAt sql.NullTime -} - -type NotificationDelivery struct { - ID string - NotificationID string - NotificationSeq int64 - ProjectID string - SessionID string - RouteName string - Sink string - DestinationKey string - RequestJson string - Status string - Attempts int64 - MaxAttempts int64 - NextAttemptAt time.Time - LeaseOwner string - LeaseExpiresAt sql.NullTime - LastErrorCode string - LastError string - ExternalID string - CreatedAt time.Time - UpdatedAt time.Time - DeliveredAt sql.NullTime -} - -type NotificationDeliveryAttempt struct { - ID int64 - DeliveryID string - AttemptNo int64 - Status string - StartedAt time.Time - FinishedAt sql.NullTime - ErrorCode string - Error string - ResponseJson string } type Pr struct { diff --git a/backend/internal/storage/sqlite/gen/notification_deliveries.sql.go b/backend/internal/storage/sqlite/gen/notification_deliveries.sql.go deleted file mode 100644 index ed2821ee..00000000 --- a/backend/internal/storage/sqlite/gen/notification_deliveries.sql.go +++ /dev/null @@ -1,256 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.31.1 -// source: notification_deliveries.sql - -package gen - -import ( - "context" - "database/sql" - "time" -) - -const getNotificationDelivery = `-- name: GetNotificationDelivery :one -SELECT id, notification_id, notification_seq, project_id, session_id, - route_name, sink, destination_key, request_json, - status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, - last_error_code, last_error, external_id, - created_at, updated_at, delivered_at -FROM notification_deliveries -WHERE id = ? -` - -func (q *Queries) GetNotificationDelivery(ctx context.Context, id string) (NotificationDelivery, error) { - row := q.db.QueryRowContext(ctx, getNotificationDelivery, id) - var i NotificationDelivery - err := row.Scan( - &i.ID, - &i.NotificationID, - &i.NotificationSeq, - &i.ProjectID, - &i.SessionID, - &i.RouteName, - &i.Sink, - &i.DestinationKey, - &i.RequestJson, - &i.Status, - &i.Attempts, - &i.MaxAttempts, - &i.NextAttemptAt, - &i.LeaseOwner, - &i.LeaseExpiresAt, - &i.LastErrorCode, - &i.LastError, - &i.ExternalID, - &i.CreatedAt, - &i.UpdatedAt, - &i.DeliveredAt, - ) - return i, err -} - -const getNotificationDeliveryByUnique = `-- name: GetNotificationDeliveryByUnique :one -SELECT id, notification_id, notification_seq, project_id, session_id, - route_name, sink, destination_key, request_json, - status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, - last_error_code, last_error, external_id, - created_at, updated_at, delivered_at -FROM notification_deliveries -WHERE notification_id = ? AND route_name = ? AND destination_key = ? -` - -type GetNotificationDeliveryByUniqueParams struct { - NotificationID string - RouteName string - DestinationKey string -} - -func (q *Queries) GetNotificationDeliveryByUnique(ctx context.Context, arg GetNotificationDeliveryByUniqueParams) (NotificationDelivery, error) { - row := q.db.QueryRowContext(ctx, getNotificationDeliveryByUnique, arg.NotificationID, arg.RouteName, arg.DestinationKey) - var i NotificationDelivery - err := row.Scan( - &i.ID, - &i.NotificationID, - &i.NotificationSeq, - &i.ProjectID, - &i.SessionID, - &i.RouteName, - &i.Sink, - &i.DestinationKey, - &i.RequestJson, - &i.Status, - &i.Attempts, - &i.MaxAttempts, - &i.NextAttemptAt, - &i.LeaseOwner, - &i.LeaseExpiresAt, - &i.LastErrorCode, - &i.LastError, - &i.ExternalID, - &i.CreatedAt, - &i.UpdatedAt, - &i.DeliveredAt, - ) - return i, err -} - -const insertNotificationDelivery = `-- name: InsertNotificationDelivery :one -INSERT INTO notification_deliveries ( - id, notification_id, notification_seq, project_id, session_id, - route_name, sink, destination_key, request_json, - status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, - last_error_code, last_error, external_id, - created_at, updated_at, delivered_at -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) -ON CONFLICT(notification_id, route_name, destination_key) DO NOTHING -RETURNING id, notification_id, notification_seq, project_id, session_id, - route_name, sink, destination_key, request_json, - status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, - last_error_code, last_error, external_id, - created_at, updated_at, delivered_at -` - -type InsertNotificationDeliveryParams struct { - ID string - NotificationID string - NotificationSeq int64 - ProjectID string - SessionID string - RouteName string - Sink string - DestinationKey string - RequestJson string - Status string - Attempts int64 - MaxAttempts int64 - NextAttemptAt time.Time - LeaseOwner string - LeaseExpiresAt sql.NullTime - LastErrorCode string - LastError string - ExternalID string - CreatedAt time.Time - UpdatedAt time.Time - DeliveredAt sql.NullTime -} - -func (q *Queries) InsertNotificationDelivery(ctx context.Context, arg InsertNotificationDeliveryParams) (NotificationDelivery, error) { - row := q.db.QueryRowContext(ctx, insertNotificationDelivery, - arg.ID, - arg.NotificationID, - arg.NotificationSeq, - arg.ProjectID, - arg.SessionID, - arg.RouteName, - arg.Sink, - arg.DestinationKey, - arg.RequestJson, - arg.Status, - arg.Attempts, - arg.MaxAttempts, - arg.NextAttemptAt, - arg.LeaseOwner, - arg.LeaseExpiresAt, - arg.LastErrorCode, - arg.LastError, - arg.ExternalID, - arg.CreatedAt, - arg.UpdatedAt, - arg.DeliveredAt, - ) - var i NotificationDelivery - err := row.Scan( - &i.ID, - &i.NotificationID, - &i.NotificationSeq, - &i.ProjectID, - &i.SessionID, - &i.RouteName, - &i.Sink, - &i.DestinationKey, - &i.RequestJson, - &i.Status, - &i.Attempts, - &i.MaxAttempts, - &i.NextAttemptAt, - &i.LeaseOwner, - &i.LeaseExpiresAt, - &i.LastErrorCode, - &i.LastError, - &i.ExternalID, - &i.CreatedAt, - &i.UpdatedAt, - &i.DeliveredAt, - ) - return i, err -} - -const listUnroutedNotifications = `-- name: ListUnroutedNotifications :many -SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at -FROM notifications -WHERE routed_at IS NULL -ORDER BY seq ASC -LIMIT ? -` - -func (q *Queries) ListUnroutedNotifications(ctx context.Context, limit int64) ([]Notification, error) { - rows, err := q.db.QueryContext(ctx, listUnroutedNotifications, limit) - if err != nil { - return nil, err - } - defer rows.Close() - items := []Notification{} - for rows.Next() { - var i Notification - if err := rows.Scan( - &i.Seq, - &i.ID, - &i.ProjectID, - &i.SessionID, - &i.Source, - &i.EventType, - &i.SemanticType, - &i.Priority, - &i.Message, - &i.PayloadJson, - &i.ActionsJson, - &i.DedupeKey, - &i.CauseKey, - &i.ReadAt, - &i.ArchivedAt, - &i.CreatedAt, - &i.UpdatedAt, - &i.RoutedAt, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const markNotificationRouted = `-- name: MarkNotificationRouted :exec -UPDATE notifications -SET routed_at = COALESCE(routed_at, ?), - updated_at = CASE WHEN routed_at IS NULL THEN ? ELSE updated_at END -WHERE id = ? -` - -type MarkNotificationRoutedParams struct { - RoutedAt sql.NullTime - UpdatedAt time.Time - ID string -} - -func (q *Queries) MarkNotificationRouted(ctx context.Context, arg MarkNotificationRoutedParams) error { - _, err := q.db.ExecContext(ctx, markNotificationRouted, arg.RoutedAt, arg.UpdatedAt, arg.ID) - return err -} diff --git a/backend/internal/storage/sqlite/gen/notifications.sql.go b/backend/internal/storage/sqlite/gen/notifications.sql.go index 47ca63d9..7b2b5493 100644 --- a/backend/internal/storage/sqlite/gen/notifications.sql.go +++ b/backend/internal/storage/sqlite/gen/notifications.sql.go @@ -16,7 +16,7 @@ UPDATE notifications SET archived_at = ?, updated_at = ? WHERE id = ? AND archived_at IS NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at ` type ArchiveNotificationParams struct { @@ -46,14 +46,13 @@ func (q *Queries) ArchiveNotification(ctx context.Context, arg ArchiveNotificati &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, - &i.RoutedAt, ) return i, err } const getNotification = `-- name: GetNotification :one SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications WHERE id = ? ` @@ -78,14 +77,13 @@ func (q *Queries) GetNotification(ctx context.Context, id string) (Notification, &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, - &i.RoutedAt, ) return i, err } const getNotificationByDedupeKey = `-- name: GetNotificationByDedupeKey :one SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications WHERE dedupe_key = ? ` @@ -110,7 +108,6 @@ func (q *Queries) GetNotificationByDedupeKey(ctx context.Context, dedupeKey stri &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, - &i.RoutedAt, ) return i, err } @@ -122,7 +119,7 @@ INSERT INTO notifications ( ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (dedupe_key) DO NOTHING RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at ` type InsertNotificationParams struct { @@ -176,14 +173,13 @@ func (q *Queries) InsertNotification(ctx context.Context, arg InsertNotification &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, - &i.RoutedAt, ) return i, err } const listNotifications = `-- name: ListNotifications :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications ORDER BY seq DESC LIMIT ? @@ -216,7 +212,6 @@ func (q *Queries) ListNotifications(ctx context.Context, limit int64) ([]Notific &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, - &i.RoutedAt, ); err != nil { return nil, err } @@ -233,7 +228,7 @@ func (q *Queries) ListNotifications(ctx context.Context, limit int64) ([]Notific const listNotificationsByProject = `-- name: ListNotificationsByProject :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications WHERE project_id = ? ORDER BY seq DESC @@ -272,7 +267,6 @@ func (q *Queries) ListNotificationsByProject(ctx context.Context, arg ListNotifi &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, - &i.RoutedAt, ); err != nil { return nil, err } @@ -289,7 +283,7 @@ func (q *Queries) ListNotificationsByProject(ctx context.Context, arg ListNotifi const listNotificationsBySession = `-- name: ListNotificationsBySession :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications WHERE session_id = ? ORDER BY seq DESC @@ -328,7 +322,6 @@ func (q *Queries) ListNotificationsBySession(ctx context.Context, arg ListNotifi &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, - &i.RoutedAt, ); err != nil { return nil, err } @@ -345,7 +338,7 @@ func (q *Queries) ListNotificationsBySession(ctx context.Context, arg ListNotifi const listUnreadNotifications = `-- name: ListUnreadNotifications :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications WHERE read_at IS NULL AND archived_at IS NULL ORDER BY seq DESC @@ -379,7 +372,6 @@ func (q *Queries) ListUnreadNotifications(ctx context.Context, limit int64) ([]N &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, - &i.RoutedAt, ); err != nil { return nil, err } @@ -399,7 +391,7 @@ UPDATE notifications SET read_at = ?, updated_at = ? WHERE id = ? AND read_at IS NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at ` type MarkNotificationReadParams struct { @@ -429,7 +421,6 @@ func (q *Queries) MarkNotificationRead(ctx context.Context, arg MarkNotification &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, - &i.RoutedAt, ) return i, err } @@ -439,7 +430,7 @@ UPDATE notifications SET read_at = NULL, updated_at = ? WHERE id = ? AND read_at IS NOT NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at ` type MarkNotificationUnreadParams struct { @@ -468,7 +459,6 @@ func (q *Queries) MarkNotificationUnread(ctx context.Context, arg MarkNotificati &i.ArchivedAt, &i.CreatedAt, &i.UpdatedAt, - &i.RoutedAt, ) return i, err } diff --git a/backend/internal/storage/sqlite/gen/querier.go b/backend/internal/storage/sqlite/gen/querier.go index 87550b12..4f91a9d5 100644 --- a/backend/internal/storage/sqlite/gen/querier.go +++ b/backend/internal/storage/sqlite/gen/querier.go @@ -16,13 +16,10 @@ type Querier interface { DeleteSession(ctx context.Context, id string) error GetNotification(ctx context.Context, id string) (Notification, error) GetNotificationByDedupeKey(ctx context.Context, dedupeKey string) (Notification, error) - GetNotificationDelivery(ctx context.Context, id string) (NotificationDelivery, error) - GetNotificationDeliveryByUnique(ctx context.Context, arg GetNotificationDeliveryByUniqueParams) (NotificationDelivery, error) GetPR(ctx context.Context, url string) (Pr, error) GetProject(ctx context.Context, id string) (Project, error) GetSession(ctx context.Context, id string) (Session, error) InsertNotification(ctx context.Context, arg InsertNotificationParams) (Notification, error) - InsertNotificationDelivery(ctx context.Context, arg InsertNotificationDeliveryParams) (NotificationDelivery, error) InsertSession(ctx context.Context, arg InsertSessionParams) error ListAllSessions(ctx context.Context) ([]Session, error) ListChecksByPR(ctx context.Context, prUrl string) ([]PrCheck, error) @@ -35,9 +32,7 @@ type Querier interface { ListRecentChecks(ctx context.Context, arg ListRecentChecksParams) ([]ListRecentChecksRow, error) ListSessionsByProject(ctx context.Context, projectID string) ([]Session, error) ListUnreadNotifications(ctx context.Context, limit int64) ([]Notification, error) - ListUnroutedNotifications(ctx context.Context, limit int64) ([]Notification, error) MarkNotificationRead(ctx context.Context, arg MarkNotificationReadParams) (Notification, error) - MarkNotificationRouted(ctx context.Context, arg MarkNotificationRoutedParams) error MarkNotificationUnread(ctx context.Context, arg MarkNotificationUnreadParams) (Notification, error) MaxChangeLogSeq(ctx context.Context) (interface{}, error) NextSessionNum(ctx context.Context, projectID string) (int64, error) diff --git a/backend/internal/storage/sqlite/migrations/0003_notification_deliveries.sql b/backend/internal/storage/sqlite/migrations/0003_notification_deliveries.sql deleted file mode 100644 index 4c7cca1d..00000000 --- a/backend/internal/storage/sqlite/migrations/0003_notification_deliveries.sql +++ /dev/null @@ -1,128 +0,0 @@ --- +goose Up --- +goose StatementBegin -ALTER TABLE notifications ADD COLUMN routed_at TIMESTAMP; --- Notifications that already exist when this migration runs predate the --- delivery runtime. Treat them as already routed so an upgrade does not --- synthesize new AO-app desktop toasts for historical notification rows; the --- dashboard read model still sees those rows directly from notifications. -UPDATE notifications SET routed_at = updated_at WHERE routed_at IS NULL; -CREATE INDEX idx_notifications_unrouted - ON notifications(seq) - WHERE routed_at IS NULL; - -CREATE TABLE notification_deliveries ( - id TEXT PRIMARY KEY, - notification_id TEXT NOT NULL REFERENCES notifications(id) ON DELETE CASCADE, - notification_seq INTEGER NOT NULL, - project_id TEXT NOT NULL REFERENCES projects(id), - session_id TEXT NOT NULL REFERENCES sessions(id), - - route_name TEXT NOT NULL, - sink TEXT NOT NULL, - destination_key TEXT NOT NULL DEFAULT '', - request_json TEXT NOT NULL DEFAULT '{}' CHECK (json_valid(request_json)), - - status TEXT NOT NULL CHECK (status IN ('queued','leased','sent','retry_wait','failed','skipped','cancelled')), - attempts INTEGER NOT NULL DEFAULT 0, - max_attempts INTEGER NOT NULL DEFAULT 5, - next_attempt_at TIMESTAMP NOT NULL, - lease_owner TEXT NOT NULL DEFAULT '', - lease_expires_at TIMESTAMP, - - last_error_code TEXT NOT NULL DEFAULT '', - last_error TEXT NOT NULL DEFAULT '', - external_id TEXT NOT NULL DEFAULT '', - - created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')), - updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now')), - delivered_at TIMESTAMP, - - UNIQUE(notification_id, route_name, destination_key) -); - -CREATE INDEX idx_notification_deliveries_due - ON notification_deliveries(status, next_attempt_at, lease_expires_at, created_at); - -CREATE INDEX idx_notification_deliveries_notification - ON notification_deliveries(notification_id, status); - -CREATE INDEX idx_notification_deliveries_project - ON notification_deliveries(project_id, created_at DESC); - -CREATE TABLE notification_delivery_attempts ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - delivery_id TEXT NOT NULL REFERENCES notification_deliveries(id) ON DELETE CASCADE, - attempt_no INTEGER NOT NULL, - status TEXT NOT NULL CHECK (status IN ('started','sent','retryable_failed','failed')), - started_at TIMESTAMP NOT NULL, - finished_at TIMESTAMP, - error_code TEXT NOT NULL DEFAULT '', - error TEXT NOT NULL DEFAULT '', - response_json TEXT NOT NULL DEFAULT '{}' CHECK (json_valid(response_json)), - UNIQUE(delivery_id, attempt_no) -); --- +goose StatementEnd - --- +goose StatementBegin -CREATE TRIGGER notification_deliveries_cdc_insert -AFTER INSERT ON notification_deliveries -BEGIN - INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) - VALUES ( - NEW.project_id, - NEW.session_id, - 'notification_delivery_created', - json_object( - 'id', NEW.id, - 'notificationId', NEW.notification_id, - 'routeName', NEW.route_name, - 'sink', NEW.sink, - 'status', NEW.status, - 'attempts', NEW.attempts, - 'lastErrorCode', NEW.last_error_code, - 'lastError', NEW.last_error - ), - NEW.created_at - ); -END; --- +goose StatementEnd - --- +goose StatementBegin -CREATE TRIGGER notification_deliveries_cdc_update -AFTER UPDATE ON notification_deliveries -WHEN OLD.status <> NEW.status - OR OLD.attempts <> NEW.attempts - OR OLD.last_error_code <> NEW.last_error_code - OR OLD.last_error <> NEW.last_error - OR OLD.external_id <> NEW.external_id - OR OLD.delivered_at IS NOT NEW.delivered_at -BEGIN - INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) - VALUES ( - NEW.project_id, - NEW.session_id, - 'notification_delivery_updated', - json_object( - 'id', NEW.id, - 'notificationId', NEW.notification_id, - 'routeName', NEW.route_name, - 'sink', NEW.sink, - 'status', NEW.status, - 'attempts', NEW.attempts, - 'lastErrorCode', NEW.last_error_code, - 'lastError', NEW.last_error - ), - NEW.updated_at - ); -END; --- +goose StatementEnd - --- +goose Down --- +goose StatementBegin -DROP TRIGGER IF EXISTS notification_deliveries_cdc_update; -DROP TRIGGER IF EXISTS notification_deliveries_cdc_insert; -DROP TABLE IF EXISTS notification_delivery_attempts; -DROP TABLE IF EXISTS notification_deliveries; -DROP INDEX IF EXISTS idx_notifications_unrouted; -ALTER TABLE notifications DROP COLUMN routed_at; --- +goose StatementEnd diff --git a/backend/internal/storage/sqlite/notification_delivery_store.go b/backend/internal/storage/sqlite/notification_delivery_store.go deleted file mode 100644 index 9cdddc74..00000000 --- a/backend/internal/storage/sqlite/notification_delivery_store.go +++ /dev/null @@ -1,462 +0,0 @@ -package sqlite - -import ( - "context" - "database/sql" - "errors" - "fmt" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/domain" - "github.com/aoagents/agent-orchestrator/backend/internal/notification" - "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/gen" -) - -const deliveryColumns = `id, notification_id, notification_seq, project_id, session_id, - route_name, sink, destination_key, request_json, - status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, - last_error_code, last_error, external_id, - created_at, updated_at, delivered_at` - -const ( - defaultDeliveryLimit = 100 - defaultDeliveryMaxAttempts = 5 // mirrors notification_deliveries.max_attempts schema default -) - -type DeliveryFilter struct { - NotificationID string - ProjectID string - Status notification.DeliveryStatus - Limit int -} - -func (s *Store) ListUnroutedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) { - if limit <= 0 { - limit = defaultNotificationLimit - } - rows, err := s.qr.ListUnroutedNotifications(ctx, int64(limit)) - if err != nil { - return nil, fmt.Errorf("list unrouted notifications: %w", err) - } - return notificationsFromGen(rows) -} - -func (s *Store) MarkNotificationRouted(ctx context.Context, id domain.NotificationID, at time.Time) error { - if at.IsZero() { - at = time.Now().UTC() - } - s.writeMu.Lock() - defer s.writeMu.Unlock() - if err := s.qw.MarkNotificationRouted(ctx, gen.MarkNotificationRoutedParams{ - RoutedAt: nullTime(at), - UpdatedAt: at, - ID: string(id), - }); err != nil { - return fmt.Errorf("mark notification routed %s: %w", id, err) - } - return nil -} - -func (s *Store) EnqueueDelivery(ctx context.Context, row notification.DeliveryRow) (notification.DeliveryRow, bool, error) { - now := row.CreatedAt - if now.IsZero() { - now = time.Now().UTC() - } - row, err := notification.NormalizeDelivery(row, now, defaultDeliveryMaxAttempts) - if err != nil { - return notification.DeliveryRow{}, false, err - } - - s.writeMu.Lock() - defer s.writeMu.Unlock() - - got, err := s.qw.InsertNotificationDelivery(ctx, gen.InsertNotificationDeliveryParams{ - ID: row.ID, - NotificationID: string(row.NotificationID), - NotificationSeq: row.NotificationSeq, - ProjectID: string(row.ProjectID), - SessionID: string(row.SessionID), - RouteName: row.RouteName, - Sink: row.Sink, - DestinationKey: row.DestinationKey, - RequestJson: string(row.RequestJSON), - Status: string(row.Status), - Attempts: int64(row.Attempts), - MaxAttempts: int64(row.MaxAttempts), - NextAttemptAt: row.NextAttemptAt, - LeaseOwner: row.LeaseOwner, - LeaseExpiresAt: nullTime(row.LeaseExpiresAt), - LastErrorCode: row.LastErrorCode, - LastError: row.LastError, - ExternalID: row.ExternalID, - CreatedAt: row.CreatedAt, - UpdatedAt: row.UpdatedAt, - DeliveredAt: nullTime(row.DeliveredAt), - }) - if errors.Is(err, sql.ErrNoRows) { - existing, readErr := s.getDeliveryByUniqueLocked(ctx, row.NotificationID, row.RouteName, row.DestinationKey) - if readErr != nil { - return notification.DeliveryRow{}, false, readErr - } - return existing, false, nil - } - if err != nil { - return notification.DeliveryRow{}, false, fmt.Errorf("insert notification delivery: %w", err) - } - return deliveryFromGen(got), true, nil -} - -func (s *Store) ClaimDueDeliveries(ctx context.Context, sink string, owner string, now time.Time, limit int, lease time.Duration) ([]notification.DeliveryRow, error) { - if now.IsZero() { - now = time.Now().UTC() - } - if limit <= 0 { - limit = defaultDeliveryLimit - } - if lease <= 0 { - lease = 30 * time.Second - } - expires := now.Add(lease) - - s.writeMu.Lock() - defer s.writeMu.Unlock() - - tx, err := s.writeDB.BeginTx(ctx, nil) - if err != nil { - return nil, fmt.Errorf("begin claim deliveries: %w", err) - } - defer tx.Rollback() - - rows, err := tx.QueryContext(ctx, `SELECT id -FROM notification_deliveries -WHERE sink = ? - AND status IN ('queued','retry_wait') - AND next_attempt_at <= ? - AND attempts < max_attempts -ORDER BY next_attempt_at ASC, created_at ASC, id ASC -LIMIT ?`, sink, now, limit) - if err != nil { - return nil, fmt.Errorf("select due deliveries: %w", err) - } - var ids []string - for rows.Next() { - var id string - if err := rows.Scan(&id); err != nil { - rows.Close() - return nil, err - } - ids = append(ids, id) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - - out := make([]notification.DeliveryRow, 0, len(ids)) - for _, id := range ids { - res, err := tx.ExecContext(ctx, `UPDATE notification_deliveries -SET status = 'leased', - lease_owner = ?, - lease_expires_at = ?, - updated_at = ? -WHERE id = ? - AND status IN ('queued','retry_wait') - AND next_attempt_at <= ? - AND attempts < max_attempts`, owner, expires, now, id, now) - if err != nil { - return nil, fmt.Errorf("lease delivery %s: %w", id, err) - } - changed, err := res.RowsAffected() - if err != nil { - return nil, err - } - if changed == 0 { - continue - } - row, err := scanDelivery(tx.QueryRowContext(ctx, `SELECT `+deliveryColumns+` FROM notification_deliveries WHERE id = ?`, id)) - if err != nil { - return nil, fmt.Errorf("read leased delivery %s: %w", id, err) - } - out = append(out, row) - } - if err := tx.Commit(); err != nil { - return nil, fmt.Errorf("commit claim deliveries: %w", err) - } - return out, nil -} - -func (s *Store) ReleaseExpiredDeliveryLeases(ctx context.Context, now time.Time) (int, error) { - if now.IsZero() { - now = time.Now().UTC() - } - s.writeMu.Lock() - defer s.writeMu.Unlock() - res, err := s.writeDB.ExecContext(ctx, `UPDATE notification_deliveries -SET attempts = attempts + 1, - status = CASE WHEN attempts + 1 >= max_attempts THEN 'failed' ELSE 'queued' END, - next_attempt_at = ?, - lease_owner = '', - lease_expires_at = NULL, - last_error_code = 'lease_expired', - last_error = 'delivery lease expired', - updated_at = ? -WHERE status = 'leased' - AND lease_expires_at IS NOT NULL - AND lease_expires_at <= ?`, now, now, now) - if err != nil { - return 0, fmt.Errorf("release expired delivery leases: %w", err) - } - n, err := res.RowsAffected() - if err != nil { - return 0, err - } - return int(n), nil -} - -func (s *Store) MarkDeliverySent(ctx context.Context, id string, owner string, externalID string, at time.Time) error { - if at.IsZero() { - at = time.Now().UTC() - } - return s.updateDelivery(ctx, "mark delivery sent", `UPDATE notification_deliveries -SET status = 'sent', - attempts = attempts + 1, - lease_owner = '', - lease_expires_at = NULL, - external_id = ?, - delivered_at = ?, - updated_at = ? -WHERE id = ? - AND status = 'leased' - AND lease_owner = ? - AND lease_expires_at > ?`, externalID, at, at, id, owner, at) -} - -func (s *Store) MarkDeliveryRetry(ctx context.Context, id string, owner string, errCode string, errMessage string, next time.Time, at time.Time) error { - if at.IsZero() { - at = time.Now().UTC() - } - if next.IsZero() { - next = at - } - return s.updateDelivery(ctx, "mark delivery retry", `UPDATE notification_deliveries -SET attempts = attempts + 1, - status = CASE WHEN attempts + 1 >= max_attempts THEN 'failed' ELSE 'retry_wait' END, - next_attempt_at = ?, - lease_owner = '', - lease_expires_at = NULL, - last_error_code = ?, - last_error = ?, - updated_at = ? -WHERE id = ? - AND status = 'leased' - AND lease_owner = ? - AND lease_expires_at > ?`, next, errCode, errMessage, at, id, owner, at) -} - -func (s *Store) MarkDeliveryFailed(ctx context.Context, id string, owner string, errCode string, errMessage string, at time.Time) error { - if at.IsZero() { - at = time.Now().UTC() - } - return s.updateDelivery(ctx, "mark delivery failed", `UPDATE notification_deliveries -SET status = 'failed', - attempts = CASE WHEN status = 'leased' THEN attempts + 1 ELSE attempts END, - lease_owner = '', - lease_expires_at = NULL, - last_error_code = ?, - last_error = ?, - updated_at = ? -WHERE id = ? - AND status = 'leased' - AND lease_owner = ? - AND lease_expires_at > ?`, errCode, errMessage, at, id, owner, at) -} - -func (s *Store) MarkDeliverySkipped(ctx context.Context, id string, reason string, at time.Time) error { - if at.IsZero() { - at = time.Now().UTC() - } - return s.updateDelivery(ctx, "mark delivery skipped", `UPDATE notification_deliveries -SET status = 'skipped', - lease_owner = '', - lease_expires_at = NULL, - last_error_code = 'skipped', - last_error = ?, - updated_at = ? -WHERE id = ? AND status NOT IN ('sent','failed','skipped','cancelled')`, reason, at, id) -} - -func (s *Store) GetDelivery(ctx context.Context, id string) (notification.DeliveryRow, bool, error) { - row, err := s.qr.GetNotificationDelivery(ctx, id) - if errors.Is(err, sql.ErrNoRows) { - return notification.DeliveryRow{}, false, nil - } - if err != nil { - return notification.DeliveryRow{}, false, fmt.Errorf("get notification delivery %s: %w", id, err) - } - return deliveryFromGen(row), true, nil -} - -func (s *Store) ListDeliveries(ctx context.Context, filter DeliveryFilter) ([]notification.DeliveryRow, error) { - limit := filter.Limit - if limit <= 0 { - limit = defaultDeliveryLimit - } - base := `SELECT ` + deliveryColumns + ` FROM notification_deliveries` - order := ` ORDER BY created_at ASC, id ASC LIMIT ?` - var ( - rows *sql.Rows - err error - ) - switch { - case filter.NotificationID != "": - if filter.Status != "" { - rows, err = s.readDB.QueryContext(ctx, base+` WHERE notification_id = ? AND status = ?`+order, filter.NotificationID, string(filter.Status), limit) - } else { - rows, err = s.readDB.QueryContext(ctx, base+` WHERE notification_id = ?`+order, filter.NotificationID, limit) - } - case filter.ProjectID != "": - if filter.Status != "" { - rows, err = s.readDB.QueryContext(ctx, base+` WHERE project_id = ? AND status = ?`+order, filter.ProjectID, string(filter.Status), limit) - } else { - rows, err = s.readDB.QueryContext(ctx, base+` WHERE project_id = ?`+order, filter.ProjectID, limit) - } - default: - if filter.Status != "" { - rows, err = s.readDB.QueryContext(ctx, base+` WHERE status = ?`+order, string(filter.Status), limit) - } else { - rows, err = s.readDB.QueryContext(ctx, base+order, limit) - } - } - if err != nil { - return nil, fmt.Errorf("list notification deliveries: %w", err) - } - defer rows.Close() - out := []notification.DeliveryRow{} - for rows.Next() { - row, err := scanDelivery(rows) - if err != nil { - return nil, err - } - out = append(out, row) - } - if err := rows.Err(); err != nil { - return nil, err - } - return out, nil -} - -func (s *Store) updateDelivery(ctx context.Context, what string, query string, args ...any) error { - s.writeMu.Lock() - defer s.writeMu.Unlock() - res, err := s.writeDB.ExecContext(ctx, query, args...) - if err != nil { - return fmt.Errorf("%s: %w", what, err) - } - affected, err := res.RowsAffected() - if err != nil { - return fmt.Errorf("%s rows affected: %w", what, err) - } - if affected == 0 { - return fmt.Errorf("%s: %w", what, notification.ErrDeliveryUpdateConflict) - } - return nil -} - -func (s *Store) getDeliveryByUniqueLocked(ctx context.Context, id domain.NotificationID, routeName, destinationKey string) (notification.DeliveryRow, error) { - row, err := s.qw.GetNotificationDeliveryByUnique(ctx, gen.GetNotificationDeliveryByUniqueParams{ - NotificationID: string(id), - RouteName: routeName, - DestinationKey: destinationKey, - }) - if err != nil { - return notification.DeliveryRow{}, fmt.Errorf("get notification delivery by unique key: %w", err) - } - return deliveryFromGen(row), nil -} - -type rowScanner interface { - Scan(dest ...any) error -} - -func scanDelivery(scanner rowScanner) (notification.DeliveryRow, error) { - var ( - row notification.DeliveryRow - notificationID string - projectID string - sessionID string - status string - requestJSON string - leaseExpires sql.NullTime - deliveredAt sql.NullTime - ) - if err := scanner.Scan( - &row.ID, - ¬ificationID, - &row.NotificationSeq, - &projectID, - &sessionID, - &row.RouteName, - &row.Sink, - &row.DestinationKey, - &requestJSON, - &status, - &row.Attempts, - &row.MaxAttempts, - &row.NextAttemptAt, - &row.LeaseOwner, - &leaseExpires, - &row.LastErrorCode, - &row.LastError, - &row.ExternalID, - &row.CreatedAt, - &row.UpdatedAt, - &deliveredAt, - ); err != nil { - return notification.DeliveryRow{}, err - } - row.NotificationID = domain.NotificationID(notificationID) - row.ProjectID = domain.ProjectID(projectID) - row.SessionID = domain.SessionID(sessionID) - row.RequestJSON = []byte(requestJSON) - row.Status = notification.DeliveryStatus(status) - if leaseExpires.Valid { - row.LeaseExpiresAt = leaseExpires.Time - } - if deliveredAt.Valid { - row.DeliveredAt = deliveredAt.Time - } - return row, nil -} - -func deliveryFromGen(r gen.NotificationDelivery) notification.DeliveryRow { - row := notification.DeliveryRow{ - ID: r.ID, - NotificationID: domain.NotificationID(r.NotificationID), - NotificationSeq: r.NotificationSeq, - ProjectID: domain.ProjectID(r.ProjectID), - SessionID: domain.SessionID(r.SessionID), - RouteName: r.RouteName, - Sink: r.Sink, - DestinationKey: r.DestinationKey, - RequestJSON: []byte(r.RequestJson), - Status: notification.DeliveryStatus(r.Status), - Attempts: int(r.Attempts), - MaxAttempts: int(r.MaxAttempts), - NextAttemptAt: r.NextAttemptAt, - LeaseOwner: r.LeaseOwner, - LastErrorCode: r.LastErrorCode, - LastError: r.LastError, - ExternalID: r.ExternalID, - CreatedAt: r.CreatedAt, - UpdatedAt: r.UpdatedAt, - } - if r.LeaseExpiresAt.Valid { - row.LeaseExpiresAt = r.LeaseExpiresAt.Time - } - if r.DeliveredAt.Valid { - row.DeliveredAt = r.DeliveredAt.Time - } - return row -} diff --git a/backend/internal/storage/sqlite/notification_delivery_store_test.go b/backend/internal/storage/sqlite/notification_delivery_store_test.go deleted file mode 100644 index 2c675466..00000000 --- a/backend/internal/storage/sqlite/notification_delivery_store_test.go +++ /dev/null @@ -1,302 +0,0 @@ -package sqlite - -import ( - "context" - "errors" - "fmt" - "testing" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/cdc" - "github.com/aoagents/agent-orchestrator/backend/internal/domain" - "github.com/aoagents/agent-orchestrator/backend/internal/notification" -) - -func TestNotificationDeliveryEnqueueIdempotentAndCDC(t *testing.T) { - s, ntf := newDeliveryTestNotification(t, "delivery-dedupe") - ctx := context.Background() - startSeq, _ := s.MaxChangeLogSeq(ctx) - - row, created, err := s.EnqueueDelivery(ctx, sampleDelivery(ntf, "desktop")) - if err != nil { - t.Fatal(err) - } - if !created || row.ID == "" || row.Status != notification.DeliveryQueued { - t.Fatalf("created=%v row=%+v", created, row) - } - dup, created, err := s.EnqueueDelivery(ctx, sampleDelivery(ntf, "desktop")) - if err != nil { - t.Fatal(err) - } - if created || dup.ID != row.ID { - t.Fatalf("duplicate should return existing row created=false: created=%v dup=%+v row=%+v", created, dup, row) - } - evs, err := s.ReadChangeLogAfter(ctx, startSeq, 10) - if err != nil { - t.Fatal(err) - } - var createdEvents int - for _, ev := range evs { - if ev.EventType == string(cdc.EventNotificationDeliveryCreated) { - createdEvents++ - } - } - if createdEvents != 1 { - t.Fatalf("delivery created CDC count = %d, want 1 events=%+v", createdEvents, evs) - } -} - -func TestNotificationDeliveryEnqueueDefaultMaxAttempts(t *testing.T) { - s, ntf := newDeliveryTestNotification(t, "delivery-default-max") - ctx := context.Background() - row := sampleDelivery(ntf, "desktop") - row.MaxAttempts = 0 - got, _, err := s.EnqueueDelivery(ctx, row) - if err != nil { - t.Fatal(err) - } - if got.MaxAttempts != 5 { - t.Fatalf("default max attempts = %d, want 5", got.MaxAttempts) - } -} - -func TestNotificationDeliveryClaimDueStableOrder(t *testing.T) { - s, ntf := newDeliveryTestNotification(t, "delivery-claim") - ctx := context.Background() - base := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC) - for i, d := range []time.Duration{2 * time.Second, time.Second, 3 * time.Second} { - row := sampleDelivery(ntf, fmt.Sprintf("desktop-%d", i)) - row.DestinationKey = fmt.Sprintf("dest-%d", i) - row.NextAttemptAt = base.Add(d) - row.CreatedAt = base.Add(time.Duration(i) * time.Millisecond) - row.UpdatedAt = row.CreatedAt - if _, _, err := s.EnqueueDelivery(ctx, row); err != nil { - t.Fatal(err) - } - } - - claimed, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "electron", base.Add(10*time.Second), 2, time.Minute) - if err != nil { - t.Fatal(err) - } - if len(claimed) != 2 { - t.Fatalf("claimed = %d, want 2", len(claimed)) - } - if claimed[0].DestinationKey != "dest-1" || claimed[1].DestinationKey != "dest-0" { - t.Fatalf("claim order = %s, %s; want dest-1, dest-0", claimed[0].DestinationKey, claimed[1].DestinationKey) - } - if claimed[0].Status != notification.DeliveryLeased || claimed[0].LeaseOwner != "electron" || claimed[0].LeaseExpiresAt.IsZero() { - t.Fatalf("claimed row not leased: %+v", claimed[0]) - } -} - -func TestNotificationDeliveryLeaseExpiryAndMaxAttempts(t *testing.T) { - s, ntf := newDeliveryTestNotification(t, "delivery-expiry") - ctx := context.Background() - now := time.Now().UTC().Truncate(time.Second) - queued, _, err := s.EnqueueDelivery(ctx, sampleDueDelivery(ntf, "desktop", now)) - if err != nil { - t.Fatal(err) - } - claimed, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 1, time.Second) - if err != nil || len(claimed) != 1 { - t.Fatalf("claim len=%d err=%v", len(claimed), err) - } - released, err := s.ReleaseExpiredDeliveryLeases(ctx, now.Add(2*time.Second)) - if err != nil || released != 1 { - t.Fatalf("release = %d err=%v", released, err) - } - got, ok, _ := s.GetDelivery(ctx, queued.ID) - if !ok || got.Status != notification.DeliveryQueued || got.Attempts != 1 || got.LeaseOwner != "" { - t.Fatalf("expired lease should return queued with attempts=1: ok=%v row=%+v", ok, got) - } - - maxOne := sampleDueDelivery(ntf, "desktop-max", now) - maxOne.DestinationKey = "max" - maxOne.MaxAttempts = 1 - maxOne, _, err = s.EnqueueDelivery(ctx, maxOne) - if err != nil { - t.Fatal(err) - } - if _, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 1, time.Second); err != nil { - t.Fatal(err) - } - released, err = s.ReleaseExpiredDeliveryLeases(ctx, now.Add(2*time.Second)) - if err != nil || released != 1 { - t.Fatalf("release max = %d err=%v", released, err) - } - got, ok, _ = s.GetDelivery(ctx, maxOne.ID) - if !ok || got.Status != notification.DeliveryFailed || got.Attempts != 1 { - t.Fatalf("max attempts expired lease should fail: ok=%v row=%+v", ok, got) - } -} - -func TestNotificationDeliveryMarkSentRetryFailedAndSkipped(t *testing.T) { - s, ntf := newDeliveryTestNotification(t, "delivery-mark") - ctx := context.Background() - now := time.Now().UTC().Truncate(time.Second) - - sent, _, _ := s.EnqueueDelivery(ctx, sampleDueDelivery(ntf, "desktop-sent", now)) - claimed, _ := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 1, time.Minute) - if len(claimed) != 1 { - t.Fatalf("claim sent row len=%d", len(claimed)) - } - if err := s.MarkDeliverySent(ctx, sent.ID, "owner", "native-1", now.Add(time.Second)); err != nil { - t.Fatal(err) - } - got, _, _ := s.GetDelivery(ctx, sent.ID) - if got.Status != notification.DeliverySent || got.ExternalID != "native-1" || got.Attempts != 1 || got.DeliveredAt.IsZero() { - t.Fatalf("sent row = %+v", got) - } - - retry := sampleDueDelivery(ntf, "desktop-retry", now) - retry.DestinationKey = "retry" - retry, _, _ = s.EnqueueDelivery(ctx, retry) - claimed, _ = s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 1, time.Minute) - if len(claimed) != 1 { - t.Fatalf("claim retry row len=%d", len(claimed)) - } - next := now.Add(30 * time.Second) - if err := s.MarkDeliveryRetry(ctx, retry.ID, "owner", "timeout", "timed out", next, now.Add(time.Second)); err != nil { - t.Fatal(err) - } - got, _, _ = s.GetDelivery(ctx, retry.ID) - if got.Status != notification.DeliveryRetryWait || got.Attempts != 1 || !got.NextAttemptAt.Equal(next) { - t.Fatalf("retry row = %+v", got) - } - - fail := sampleDueDelivery(ntf, "desktop-fail", now) - fail.DestinationKey = "fail" - fail.MaxAttempts = 1 - fail, _, _ = s.EnqueueDelivery(ctx, fail) - claimed, _ = s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 1, time.Minute) - if len(claimed) != 1 { - t.Fatalf("claim fail row len=%d", len(claimed)) - } - if err := s.MarkDeliveryRetry(ctx, fail.ID, "owner", "timeout", "timed out", next, now.Add(time.Second)); err != nil { - t.Fatal(err) - } - got, _, _ = s.GetDelivery(ctx, fail.ID) - if got.Status != notification.DeliveryFailed || got.Attempts != 1 { - t.Fatalf("retry at max should fail: %+v", got) - } - - skipped := sampleDueDelivery(ntf, "desktop-skip", now) - skipped.DestinationKey = "skip" - skipped.Status = notification.DeliverySkipped - skipped, _, _ = s.EnqueueDelivery(ctx, skipped) - claimed, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", now, 10, time.Minute) - if err != nil { - t.Fatal(err) - } - for _, row := range claimed { - if row.ID == skipped.ID { - t.Fatalf("skipped row should not be claimable: %+v", claimed) - } - } - if err := s.MarkDeliveryRetry(ctx, skipped.ID, "owner", "timeout", "timed out", next, now.Add(time.Second)); !errors.Is(err, notification.ErrDeliveryUpdateConflict) { - t.Fatalf("retry skipped row err = %v, want update conflict", err) - } - got, _, _ = s.GetDelivery(ctx, skipped.ID) - if got.Status != notification.DeliverySkipped || got.Attempts != 0 { - t.Fatalf("skipped row should be terminal: %+v", got) - } -} - -func TestNotificationDeliveryCompletionFencedByLeaseOwner(t *testing.T) { - s, ntf := newDeliveryTestNotification(t, "delivery-owner-fence") - ctx := context.Background() - now := time.Now().UTC().Truncate(time.Second) - row, _, err := s.EnqueueDelivery(ctx, sampleDueDelivery(ntf, "desktop", now)) - if err != nil { - t.Fatal(err) - } - if _, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner-1", now, 1, time.Second); err != nil { - t.Fatal(err) - } - if released, err := s.ReleaseExpiredDeliveryLeases(ctx, now.Add(2*time.Second)); err != nil || released != 1 { - t.Fatalf("release = %d err=%v", released, err) - } - if _, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner-2", now.Add(2*time.Second), 1, time.Second); err != nil { - t.Fatal(err) - } - - if err := s.MarkDeliverySent(ctx, row.ID, "owner-1", "stale", now.Add(2500*time.Millisecond)); !errors.Is(err, notification.ErrDeliveryUpdateConflict) { - t.Fatalf("stale owner MarkDeliverySent err = %v, want update conflict", err) - } - got, _, _ := s.GetDelivery(ctx, row.ID) - if got.Status != notification.DeliveryLeased || got.LeaseOwner != "owner-2" || got.ExternalID != "" { - t.Fatalf("stale owner should not change active lease: %+v", got) - } - if err := s.MarkDeliverySent(ctx, row.ID, "owner-2", "native-2", now.Add(2500*time.Millisecond)); err != nil { - t.Fatalf("current owner sent: %v", err) - } - got, _, _ = s.GetDelivery(ctx, row.ID) - if got.Status != notification.DeliverySent || got.ExternalID != "native-2" { - t.Fatalf("current owner should complete delivery: %+v", got) - } -} - -func TestNotificationDeliveryUpdateCDC(t *testing.T) { - s, ntf := newDeliveryTestNotification(t, "delivery-cdc-update") - ctx := context.Background() - row, _, err := s.EnqueueDelivery(ctx, sampleDelivery(ntf, "desktop")) - if err != nil { - t.Fatal(err) - } - startSeq, _ := s.MaxChangeLogSeq(ctx) - if _, err := s.ClaimDueDeliveries(ctx, notification.SinkAOApp, "owner", time.Now().UTC(), 1, time.Minute); err != nil { - t.Fatal(err) - } - if err := s.MarkDeliveryFailed(ctx, row.ID, "owner", "permanent", "bad route", time.Now().UTC()); err != nil { - t.Fatal(err) - } - evs, err := s.ReadChangeLogAfter(ctx, startSeq, 10) - if err != nil { - t.Fatal(err) - } - var updates int - for _, ev := range evs { - if ev.EventType == string(cdc.EventNotificationDeliveryUpdated) { - updates++ - } - } - if updates < 2 { - t.Fatalf("expected claim + failed update CDC events, got %d in %+v", updates, evs) - } -} - -func newDeliveryTestNotification(t *testing.T, dedupe string) (*Store, domain.Notification) { - t.Helper() - s, rec := newNotificationTestSession(t) - row, _, err := s.EnqueueNotification(context.Background(), sampleNotification(rec, dedupe)) - if err != nil { - t.Fatalf("enqueue notification: %v", err) - } - return s, row -} - -func sampleDelivery(ntf domain.Notification, route string) notification.DeliveryRow { - now := time.Now().UTC().Truncate(time.Second) - return notification.DeliveryRow{ - NotificationID: ntf.ID, - NotificationSeq: ntf.Seq, - ProjectID: ntf.ProjectID, - SessionID: ntf.SessionID, - RouteName: route, - Sink: notification.SinkAOApp, - Status: notification.DeliveryQueued, - MaxAttempts: 5, - NextAttemptAt: now, - CreatedAt: now, - UpdatedAt: now, - } -} - -func sampleDueDelivery(ntf domain.Notification, route string, due time.Time) notification.DeliveryRow { - row := sampleDelivery(ntf, route) - row.NextAttemptAt = due - row.CreatedAt = due - row.UpdatedAt = due - return row -} diff --git a/backend/internal/storage/sqlite/notification_store.go b/backend/internal/storage/sqlite/notification_store.go index 8e3e1b0e..90b84331 100644 --- a/backend/internal/storage/sqlite/notification_store.go +++ b/backend/internal/storage/sqlite/notification_store.go @@ -238,8 +238,5 @@ func notificationFromGen(r gen.Notification) (NotificationRow, error) { if r.ArchivedAt.Valid { row.ArchivedAt = r.ArchivedAt.Time } - if r.RoutedAt.Valid { - row.RoutedAt = r.RoutedAt.Time - } return row, nil } diff --git a/backend/internal/storage/sqlite/queries/notification_deliveries.sql b/backend/internal/storage/sqlite/queries/notification_deliveries.sql deleted file mode 100644 index 02f403af..00000000 --- a/backend/internal/storage/sqlite/queries/notification_deliveries.sql +++ /dev/null @@ -1,46 +0,0 @@ --- name: ListUnroutedNotifications :many -SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at -FROM notifications -WHERE routed_at IS NULL -ORDER BY seq ASC -LIMIT ?; - --- name: MarkNotificationRouted :exec -UPDATE notifications -SET routed_at = COALESCE(routed_at, ?), - updated_at = CASE WHEN routed_at IS NULL THEN ? ELSE updated_at END -WHERE id = ?; - --- name: InsertNotificationDelivery :one -INSERT INTO notification_deliveries ( - id, notification_id, notification_seq, project_id, session_id, - route_name, sink, destination_key, request_json, - status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, - last_error_code, last_error, external_id, - created_at, updated_at, delivered_at -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) -ON CONFLICT(notification_id, route_name, destination_key) DO NOTHING -RETURNING id, notification_id, notification_seq, project_id, session_id, - route_name, sink, destination_key, request_json, - status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, - last_error_code, last_error, external_id, - created_at, updated_at, delivered_at; - --- name: GetNotificationDelivery :one -SELECT id, notification_id, notification_seq, project_id, session_id, - route_name, sink, destination_key, request_json, - status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, - last_error_code, last_error, external_id, - created_at, updated_at, delivered_at -FROM notification_deliveries -WHERE id = ?; - --- name: GetNotificationDeliveryByUnique :one -SELECT id, notification_id, notification_seq, project_id, session_id, - route_name, sink, destination_key, request_json, - status, attempts, max_attempts, next_attempt_at, lease_owner, lease_expires_at, - last_error_code, last_error, external_id, - created_at, updated_at, delivered_at -FROM notification_deliveries -WHERE notification_id = ? AND route_name = ? AND destination_key = ?; diff --git a/backend/internal/storage/sqlite/queries/notifications.sql b/backend/internal/storage/sqlite/queries/notifications.sql index 96732bf5..a896b43c 100644 --- a/backend/internal/storage/sqlite/queries/notifications.sql +++ b/backend/internal/storage/sqlite/queries/notifications.sql @@ -5,28 +5,28 @@ INSERT INTO notifications ( ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (dedupe_key) DO NOTHING RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at; + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at; -- name: GetNotification :one SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications WHERE id = ?; -- name: GetNotificationByDedupeKey :one SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications WHERE dedupe_key = ?; -- name: ListNotifications :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications ORDER BY seq DESC LIMIT ?; -- name: ListNotificationsByProject :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications WHERE project_id = ? ORDER BY seq DESC @@ -34,7 +34,7 @@ LIMIT ?; -- name: ListNotificationsBySession :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications WHERE session_id = ? ORDER BY seq DESC @@ -42,7 +42,7 @@ LIMIT ?; -- name: ListUnreadNotifications :many SELECT seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at FROM notifications WHERE read_at IS NULL AND archived_at IS NULL ORDER BY seq DESC @@ -53,18 +53,18 @@ UPDATE notifications SET read_at = ?, updated_at = ? WHERE id = ? AND read_at IS NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at; + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at; -- name: MarkNotificationUnread :one UPDATE notifications SET read_at = NULL, updated_at = ? WHERE id = ? AND read_at IS NOT NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at; + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at; -- name: ArchiveNotification :one UPDATE notifications SET archived_at = ?, updated_at = ? WHERE id = ? AND archived_at IS NULL RETURNING seq, id, project_id, session_id, source, event_type, semantic_type, priority, - message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at, routed_at; + message, payload_json, actions_json, dedupe_key, cause_key, read_at, archived_at, created_at, updated_at;