Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions backend/internal/cdc/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ import (
type EventType string

const (
EventSessionCreated EventType = "session_created"
EventSessionUpdated EventType = "session_updated"
EventPRCreated EventType = "pr_created"
EventPRUpdated EventType = "pr_updated"
EventPRCheckRecorded EventType = "pr_check_recorded"
EventNotificationCreated EventType = "notification_created"
EventNotificationUpdated EventType = "notification_updated"
EventNotificationDeliveryCreated EventType = "notification_delivery_created"
EventNotificationDeliveryUpdated EventType = "notification_delivery_updated"
EventSessionCreated EventType = "session_created"
EventSessionUpdated EventType = "session_updated"
EventPRCreated EventType = "pr_created"
EventPRUpdated EventType = "pr_updated"
EventPRCheckRecorded EventType = "pr_check_recorded"
EventNotificationCreated EventType = "notification_created"
EventNotificationUpdated EventType = "notification_updated"
)

// Event is one CDC change read from change_log. Seq is the monotonic ordering +
Expand Down
72 changes: 0 additions & 72 deletions backend/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"path/filepath"
"strconv"
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

const (
Expand Down Expand Up @@ -52,46 +50,6 @@ type Config struct {
// DataDir is the directory holding durable state (the SQLite database and
// the CDC JSONL log). It is created on first use by the storage layer.
DataDir string
// Notifications controls the central notifier runtime. The dashboard is the
// durable notifications table itself; desktop delivery is handed off to the
// AO Electron app via notification_deliveries rows.
Notifications NotificationConfig
}

// NotificationConfig contains the global notification settings used by the
// central notifier runtime. It intentionally starts global (not per-project) so
// the routing model can grow without changing lifecycle reactions.
type NotificationConfig struct {
Enabled bool
Dashboard DashboardNotificationConfig
Desktop DesktopNotificationConfig
Routing NotificationRoutingConfig
Retry NotificationRetryConfig
}

type DashboardNotificationConfig struct {
Enabled bool
Limit int
}

type DesktopNotificationConfig struct {
Enabled bool
Priorities []ports.Priority
SoundPriorities []ports.Priority
}

type NotificationRoutingConfig struct {
// Priorities maps notification priority to built-in route names. The
// notifier currently implements dashboard and desktop only.
Priorities map[ports.Priority][]string
}

type NotificationRetryConfig struct {
MaxAttempts int
BaseDelay time.Duration
MaxDelay time.Duration
LeaseTTL time.Duration
BatchSize int
}

// Addr returns the host:port the HTTP server binds. It uses net.JoinHostPort so
Expand Down Expand Up @@ -119,7 +77,6 @@ func Load() (Config, error) {
Port: DefaultPort,
RequestTimeout: DefaultRequestTimeout,
ShutdownTimeout: DefaultShutdownTimeout,
Notifications: DefaultNotificationConfig(),
}

if raw := os.Getenv("AO_PORT"); raw != "" {
Expand Down Expand Up @@ -164,35 +121,6 @@ func Load() (Config, error) {
return cfg, nil
}

// DefaultNotificationConfig returns the safe zero-setup notification settings.
func DefaultNotificationConfig() NotificationConfig {
return NotificationConfig{
Enabled: true,
Dashboard: DashboardNotificationConfig{
Enabled: true,
Limit: 50,
},
Desktop: DesktopNotificationConfig{
Enabled: true,
Priorities: []ports.Priority{ports.PriorityUrgent, ports.PriorityAction},
SoundPriorities: []ports.Priority{ports.PriorityUrgent},
},
Routing: NotificationRoutingConfig{Priorities: map[ports.Priority][]string{
ports.PriorityUrgent: []string{"dashboard", "desktop"},
ports.PriorityAction: []string{"dashboard", "desktop"},
ports.PriorityWarning: []string{"dashboard"},
ports.PriorityInfo: []string{"dashboard"},
}},
Retry: NotificationRetryConfig{
MaxAttempts: 5,
BaseDelay: time.Second,
MaxDelay: 5 * time.Minute,
LeaseTTL: 30 * time.Second,
BatchSize: 50,
},
}
}

// parsePositiveDuration rejects zero and negative durations: a zero
// RequestTimeout would expire every request instantly, and a non-positive
// ShutdownTimeout would defeat graceful shutdown.
Expand Down
14 changes: 0 additions & 14 deletions backend/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package config
import (
"testing"
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

func TestLoadDefaults(t *testing.T) {
Expand Down Expand Up @@ -33,18 +31,6 @@ func TestLoadDefaults(t *testing.T) {
if cfg.RunFilePath == "" {
t.Error("RunFilePath is empty, want a resolved default path")
}
if !cfg.Notifications.Enabled || !cfg.Notifications.Dashboard.Enabled || !cfg.Notifications.Desktop.Enabled {
t.Fatalf("notification defaults should be enabled: %+v", cfg.Notifications)
}
if cfg.Notifications.Dashboard.Limit != 50 {
t.Fatalf("dashboard limit = %d, want 50", cfg.Notifications.Dashboard.Limit)
}
if got := cfg.Notifications.Routing.Priorities[ports.PriorityUrgent]; len(got) != 2 || got[0] != "dashboard" || got[1] != "desktop" {
t.Fatalf("urgent routes = %v, want dashboard+desktop", got)
}
if cfg.Notifications.Retry.MaxAttempts != 5 || cfg.Notifications.Retry.LeaseTTL != 30*time.Second {
t.Fatalf("retry defaults = %+v", cfg.Notifications.Retry)
}
}

func TestLoadOverrides(t *testing.T) {
Expand Down
14 changes: 0 additions & 14 deletions backend/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ func Run() error {
return err
}

notifier := startNotifier(ctx, cfg, store, log)

// Terminal streaming: the tmux runtime supplies the PTY-attach command and
// liveness; the CDC broadcaster feeds the session-state channel. The manager
// is handed to httpd, which mounts it at /mux. Raw PTY bytes never flow
Expand All @@ -73,11 +71,6 @@ func Run() error {

srv, err := httpd.New(cfg, log, termMgr)
if err != nil {
stop()
notifier.Stop()
if cdcErr := cdcPipe.Stop(); cdcErr != nil {
log.Error("cdc pipeline shutdown", "err", cdcErr)
}
return err
}

Expand All @@ -86,11 +79,6 @@ func Run() error {
// trigger -> change_log -> poller -> broadcaster.
lcStack, err := startLifecycle(ctx, store, log)
if err != nil {
stop()
notifier.Stop()
if cdcErr := cdcPipe.Stop(); cdcErr != nil {
log.Error("cdc pipeline shutdown", "err", cdcErr)
}
return err
}
Comment on lines 72 to 83

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 cdcPipe not drained before store.Close on early-return error paths

PR #58 had added stop() + cdcPipe.Stop() to both the httpd.New() and startLifecycle() error paths. The revert removes those calls, restoring the pre-#58 state where, if either call fails, return err fires and the deferred sequence runs as: termMgr.Close()stop()store.Close(). Cancelling the context causes the CDC poller goroutines to start winding down, but store.Close() fires immediately after stop() with no drain wait. Any CDC goroutine that is mid-query at that instant will attempt to use a closed DB. The fix that was added in #58 — calling stop() and cdcPipe.Stop() before returning — was correct independently of the notifier, and should be retained (without notifier.Stop()) to preserve the safe shutdown ordering.


Expand All @@ -110,7 +98,6 @@ func Run() error {
// the LIFO trap (see comment after srv.Run), hence explicit.
stop()
lcStack.Stop()
notifier.Stop()
if cdcErr := cdcPipe.Stop(); cdcErr != nil {
log.Error("cdc pipeline shutdown", "err", cdcErr)
}
Expand All @@ -126,7 +113,6 @@ func Run() error {
// runs before the cancel — which would hang any non-signal exit path.
stop()
lcStack.Stop()
notifier.Stop()
if err := cdcPipe.Stop(); err != nil {
log.Error("cdc pipeline shutdown", "err", err)
}
Expand Down
28 changes: 0 additions & 28 deletions backend/internal/daemon/notifier_wiring.go

This file was deleted.

1 change: 0 additions & 1 deletion backend/internal/domain/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type Notification struct {
CauseKey string
ReadAt time.Time
ArchivedAt time.Time
RoutedAt time.Time
CreatedAt time.Time
UpdatedAt time.Time
}
Expand Down
82 changes: 0 additions & 82 deletions backend/internal/integration/notification_runtime_test.go

This file was deleted.

Loading
Loading