diff --git a/.env.example b/.env.example
index b625355f..c3872c83 100644
--- a/.env.example
+++ b/.env.example
@@ -8,8 +8,9 @@ CCF_JWT_PRIVATE_KEY=private.pem
CCF_JWT_PUBLIC_KEY=public.pem
CCF_API_ALLOWED_ORIGINS="http://localhost:3000,http://localhost:8000"
+CCF_RISK_CONFIG="risk.yaml"
CCF_ENVIRONMENT="" # Defaults to production.
## This configuration disables cookie setting to allow testing on Safari
## It is insecure so use it with caution
-#CCF_ENVIRONMENT="local"
\ No newline at end of file
+#CCF_ENVIRONMENT="local"
diff --git a/cmd/root.go b/cmd/root.go
index 9d164bc0..891c6017 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -46,6 +46,7 @@ func bindEnvironmentVariables() {
viper.MustBindEnv("sso_config")
viper.MustBindEnv("email_config")
viper.MustBindEnv("workflow_config")
+ viper.MustBindEnv("risk_config")
viper.MustBindEnv("metrics_enabled")
viper.MustBindEnv("metrics_port")
viper.MustBindEnv("use_dev_logger")
diff --git a/docs/docs.go b/docs/docs.go
index c85333ec..63ffccea 100644
--- a/docs/docs.go
+++ b/docs/docs.go
@@ -24309,6 +24309,9 @@ const docTemplate = `{
"handler.SubscriptionsResponse": {
"type": "object",
"properties": {
+ "riskNotificationsSubscribed": {
+ "type": "boolean"
+ },
"subscribed": {
"type": "boolean"
},
@@ -24323,6 +24326,9 @@ const docTemplate = `{
"handler.UpdateSubscriptionsRequest": {
"type": "object",
"properties": {
+ "riskNotificationsSubscribed": {
+ "type": "boolean"
+ },
"subscribed": {
"type": "boolean"
},
@@ -32321,6 +32327,10 @@ const docTemplate = `{
"lastName": {
"type": "string"
},
+ "riskNotificationsSubscribed": {
+ "description": "RiskNotificationsSubscribed indicates if the user wants to receive risk lifecycle notifications.\nThe DB default is intentionally true so existing users are opted in when the column is introduced.",
+ "type": "boolean"
+ },
"taskAvailableEmailSubscribed": {
"description": "TaskAvailableEmailSubscribed indicates if the user wants an email when tasks become available",
"type": "boolean"
diff --git a/docs/swagger.json b/docs/swagger.json
index 4165d632..8405552f 100644
--- a/docs/swagger.json
+++ b/docs/swagger.json
@@ -24303,6 +24303,9 @@
"handler.SubscriptionsResponse": {
"type": "object",
"properties": {
+ "riskNotificationsSubscribed": {
+ "type": "boolean"
+ },
"subscribed": {
"type": "boolean"
},
@@ -24317,6 +24320,9 @@
"handler.UpdateSubscriptionsRequest": {
"type": "object",
"properties": {
+ "riskNotificationsSubscribed": {
+ "type": "boolean"
+ },
"subscribed": {
"type": "boolean"
},
@@ -32315,6 +32321,10 @@
"lastName": {
"type": "string"
},
+ "riskNotificationsSubscribed": {
+ "description": "RiskNotificationsSubscribed indicates if the user wants to receive risk lifecycle notifications.\nThe DB default is intentionally true so existing users are opted in when the column is introduced.",
+ "type": "boolean"
+ },
"taskAvailableEmailSubscribed": {
"description": "TaskAvailableEmailSubscribed indicates if the user wants an email when tasks become available",
"type": "boolean"
diff --git a/docs/swagger.yaml b/docs/swagger.yaml
index c195c04b..d82db2f4 100644
--- a/docs/swagger.yaml
+++ b/docs/swagger.yaml
@@ -1344,6 +1344,8 @@ definitions:
type: object
handler.SubscriptionsResponse:
properties:
+ riskNotificationsSubscribed:
+ type: boolean
subscribed:
type: boolean
taskAvailableEmailSubscribed:
@@ -1353,6 +1355,8 @@ definitions:
type: object
handler.UpdateSubscriptionsRequest:
properties:
+ riskNotificationsSubscribed:
+ type: boolean
subscribed:
type: boolean
taskAvailableEmailSubscribed:
@@ -6642,6 +6646,11 @@ definitions:
type: string
lastName:
type: string
+ riskNotificationsSubscribed:
+ description: |-
+ RiskNotificationsSubscribed indicates if the user wants to receive risk lifecycle notifications.
+ The DB default is intentionally true so existing users are opted in when the column is introduced.
+ type: boolean
taskAvailableEmailSubscribed:
description: TaskAvailableEmailSubscribed indicates if the user wants an email
when tasks become available
diff --git a/internal/api/handler/users.go b/internal/api/handler/users.go
index f95098c8..dd23e790 100644
--- a/internal/api/handler/users.go
+++ b/internal/api/handler/users.go
@@ -27,12 +27,14 @@ type SubscriptionsResponse struct {
Subscribed bool `json:"subscribed"`
TaskAvailableEmailSubscribed bool `json:"taskAvailableEmailSubscribed"`
TaskDailyDigestSubscribed bool `json:"taskDailyDigestSubscribed"`
+ RiskNotificationsSubscribed bool `json:"riskNotificationsSubscribed"`
}
type UpdateSubscriptionsRequest struct {
Subscribed *bool `json:"subscribed"`
TaskAvailableEmailSubscribed *bool `json:"taskAvailableEmailSubscribed"`
TaskDailyDigestSubscribed *bool `json:"taskDailyDigestSubscribed"`
+ RiskNotificationsSubscribed *bool `json:"riskNotificationsSubscribed"`
}
func NewUserHandler(sugar *zap.SugaredLogger, db *gorm.DB) *UserHandler {
@@ -437,6 +439,7 @@ func (h *UserHandler) GetSubscriptions(ctx echo.Context) error {
Subscribed: user.DigestSubscribed,
TaskAvailableEmailSubscribed: user.TaskAvailableEmailSubscribed,
TaskDailyDigestSubscribed: user.TaskDailyDigestSubscribed,
+ RiskNotificationsSubscribed: user.RiskNotificationsSubscribed,
},
})
}
@@ -484,6 +487,9 @@ func (h *UserHandler) UpdateSubscriptions(ctx echo.Context) error {
if req.TaskDailyDigestSubscribed != nil {
user.TaskDailyDigestSubscribed = *req.TaskDailyDigestSubscribed
}
+ if req.RiskNotificationsSubscribed != nil {
+ user.RiskNotificationsSubscribed = *req.RiskNotificationsSubscribed
+ }
if err := h.db.Save(&user).Error; err != nil {
h.sugar.Errorw("Failed to update user subscriptions", "error", err)
@@ -496,6 +502,7 @@ func (h *UserHandler) UpdateSubscriptions(ctx echo.Context) error {
"subscribed", user.DigestSubscribed,
"taskAvailableEmailSubscribed", user.TaskAvailableEmailSubscribed,
"taskDailyDigestSubscribed", user.TaskDailyDigestSubscribed,
+ "riskNotificationsSubscribed", user.RiskNotificationsSubscribed,
)
return ctx.JSON(200, GenericDataResponse[SubscriptionsResponse]{
@@ -503,6 +510,7 @@ func (h *UserHandler) UpdateSubscriptions(ctx echo.Context) error {
Subscribed: user.DigestSubscribed,
TaskAvailableEmailSubscribed: user.TaskAvailableEmailSubscribed,
TaskDailyDigestSubscribed: user.TaskDailyDigestSubscribed,
+ RiskNotificationsSubscribed: user.RiskNotificationsSubscribed,
},
})
}
diff --git a/internal/api/handler/users_integration_test.go b/internal/api/handler/users_integration_test.go
index 5d4a85d8..33d65ca9 100644
--- a/internal/api/handler/users_integration_test.go
+++ b/internal/api/handler/users_integration_test.go
@@ -403,6 +403,7 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() {
Subscribed bool `json:"subscribed"`
TaskAvailableEmailSubscribed bool `json:"taskAvailableEmailSubscribed"`
TaskDailyDigestSubscribed bool `json:"taskDailyDigestSubscribed"`
+ RiskNotificationsSubscribed bool `json:"riskNotificationsSubscribed"`
} `json:"data"`
}
err = json.Unmarshal(rec.Body.Bytes(), &response)
@@ -412,6 +413,7 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() {
suite.False(response.Data.Subscribed, "Expected default digest subscription to be false")
suite.False(response.Data.TaskAvailableEmailSubscribed, "Expected task available email subscription to default to false")
suite.False(response.Data.TaskDailyDigestSubscribed, "Expected task daily digest subscription to default to false")
+ suite.True(response.Data.RiskNotificationsSubscribed, "Expected risk notifications subscription to default to true")
})
suite.Run("UpdateSubscriptions", func() {
@@ -420,6 +422,7 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() {
"subscribed": true,
"taskAvailableEmailSubscribed": true,
"taskDailyDigestSubscribed": true,
+ "riskNotificationsSubscribed": false,
}
payloadJSON, err := json.Marshal(payload)
suite.Require().NoError(err, "Failed to marshal update subscriptions request")
@@ -437,6 +440,7 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() {
Subscribed bool `json:"subscribed"`
TaskAvailableEmailSubscribed bool `json:"taskAvailableEmailSubscribed"`
TaskDailyDigestSubscribed bool `json:"taskDailyDigestSubscribed"`
+ RiskNotificationsSubscribed bool `json:"riskNotificationsSubscribed"`
} `json:"data"`
}
err = json.Unmarshal(rec.Body.Bytes(), &response)
@@ -445,11 +449,13 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() {
suite.True(response.Data.Subscribed, "Expected digest subscription to be updated to true")
suite.True(response.Data.TaskAvailableEmailSubscribed, "Expected task available email subscription to be updated to true")
suite.True(response.Data.TaskDailyDigestSubscribed, "Expected task daily digest subscription to be updated to true")
+ suite.False(response.Data.RiskNotificationsSubscribed, "Expected risk notifications subscription to be updated to false")
// Test unsubscribing from digest
payload = map[string]interface{}{
- "subscribed": false,
- "taskDailyDigestSubscribed": false,
+ "subscribed": false,
+ "taskDailyDigestSubscribed": false,
+ "riskNotificationsSubscribed": true,
}
payloadJSON, err = json.Marshal(payload)
suite.Require().NoError(err, "Failed to marshal unsubscribe request")
@@ -468,6 +474,7 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() {
suite.False(response.Data.Subscribed, "Expected digest subscription to be updated to false")
suite.True(response.Data.TaskAvailableEmailSubscribed, "Expected task available email subscription to remain unchanged when omitted")
suite.False(response.Data.TaskDailyDigestSubscribed, "Expected task daily digest subscription to be updated to false")
+ suite.True(response.Data.RiskNotificationsSubscribed, "Expected risk notifications subscription to be updated to true")
})
suite.Run("UpdateSubscriptionsInvalidPayload", func() {
diff --git a/internal/config/config.go b/internal/config/config.go
index c0ed9fb4..a71f55c5 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -38,6 +38,7 @@ type Config struct {
DigestEnabled bool // Enable or disable the digest scheduler
DigestSchedule string // Cron schedule for digest emails
Workflow *WorkflowConfig
+ Risk *RiskConfig
}
func NewConfig(logger *zap.SugaredLogger) *Config {
@@ -174,6 +175,16 @@ func NewConfig(logger *zap.SugaredLogger) *Config {
workflowConfig = &WorkflowConfig{SchedulerEnabled: false}
}
+ riskConfigPath := viper.GetString("risk_config")
+ if riskConfigPath == "" {
+ riskConfigPath = "risk.yaml"
+ }
+ riskConfig, err := LoadRiskConfig(riskConfigPath)
+ if err != nil {
+ logger.Warnw("Failed to load risk config, risk jobs will be disabled", "error", err, "path", riskConfigPath)
+ riskConfig = DefaultRiskConfig()
+ }
+
// Worker configuration
workerConfig := DefaultWorkerConfig()
if viper.IsSet("worker_enabled") {
@@ -206,6 +217,7 @@ func NewConfig(logger *zap.SugaredLogger) *Config {
DigestEnabled: digestEnabled,
DigestSchedule: digestSchedule,
Workflow: workflowConfig,
+ Risk: riskConfig,
}
}
diff --git a/internal/config/risk.go b/internal/config/risk.go
new file mode 100644
index 00000000..9ce2a6af
--- /dev/null
+++ b/internal/config/risk.go
@@ -0,0 +1,118 @@
+package config
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "strings"
+
+ "github.com/robfig/cron/v3"
+ "github.com/spf13/viper"
+)
+
+// RiskConfig contains configuration for risk-related periodic workers.
+type RiskConfig struct {
+ ReviewDeadlineReminderEnabled bool `mapstructure:"review_deadline_reminder_enabled" yaml:"review_deadline_reminder_enabled" json:"reviewDeadlineReminderEnabled"`
+ ReviewDeadlineReminderSchedule string `mapstructure:"review_deadline_reminder_schedule" yaml:"review_deadline_reminder_schedule" json:"reviewDeadlineReminderSchedule"`
+
+ ReviewOverdueEscalationEnabled bool `mapstructure:"review_overdue_escalation_enabled" yaml:"review_overdue_escalation_enabled" json:"reviewOverdueEscalationEnabled"`
+ ReviewOverdueEscalationSchedule string `mapstructure:"review_overdue_escalation_schedule" yaml:"review_overdue_escalation_schedule" json:"reviewOverdueEscalationSchedule"`
+
+ StaleRiskScannerEnabled bool `mapstructure:"stale_risk_scanner_enabled" yaml:"stale_risk_scanner_enabled" json:"staleRiskScannerEnabled"`
+ StaleRiskScannerSchedule string `mapstructure:"stale_risk_scanner_schedule" yaml:"stale_risk_scanner_schedule" json:"staleRiskScannerSchedule"`
+
+ EvidenceReconciliationEnabled bool `mapstructure:"evidence_reconciliation_enabled" yaml:"evidence_reconciliation_enabled" json:"evidenceReconciliationEnabled"`
+ EvidenceReconciliationSchedule string `mapstructure:"evidence_reconciliation_schedule" yaml:"evidence_reconciliation_schedule" json:"evidenceReconciliationSchedule"`
+
+ AutoReopenEnabled bool `mapstructure:"auto_reopen_enabled" yaml:"auto_reopen_enabled" json:"autoReopenEnabled"`
+ AutoReopenThresholdDays int `mapstructure:"auto_reopen_threshold_days" yaml:"auto_reopen_threshold_days" json:"autoReopenThresholdDays"`
+}
+
+func DefaultRiskConfig() *RiskConfig {
+ return &RiskConfig{
+ ReviewDeadlineReminderEnabled: false,
+ ReviewDeadlineReminderSchedule: "0 0 8 * * *",
+ ReviewOverdueEscalationEnabled: false,
+ ReviewOverdueEscalationSchedule: "0 0 9 * * *",
+ StaleRiskScannerEnabled: false,
+ StaleRiskScannerSchedule: "0 0 10 * * 1",
+ EvidenceReconciliationEnabled: false,
+ EvidenceReconciliationSchedule: "0 30 10 * * *",
+ AutoReopenEnabled: false,
+ AutoReopenThresholdDays: 30,
+ }
+}
+
+func LoadRiskConfig(path string) (*RiskConfig, error) {
+ v := viper.NewWithOptions(viper.KeyDelimiter("::"))
+
+ def := DefaultRiskConfig()
+ v.SetDefault("review_deadline_reminder_enabled", def.ReviewDeadlineReminderEnabled)
+ v.SetDefault("review_deadline_reminder_schedule", def.ReviewDeadlineReminderSchedule)
+ v.SetDefault("review_overdue_escalation_enabled", def.ReviewOverdueEscalationEnabled)
+ v.SetDefault("review_overdue_escalation_schedule", def.ReviewOverdueEscalationSchedule)
+ v.SetDefault("stale_risk_scanner_enabled", def.StaleRiskScannerEnabled)
+ v.SetDefault("stale_risk_scanner_schedule", def.StaleRiskScannerSchedule)
+ v.SetDefault("evidence_reconciliation_enabled", def.EvidenceReconciliationEnabled)
+ v.SetDefault("evidence_reconciliation_schedule", def.EvidenceReconciliationSchedule)
+ v.SetDefault("auto_reopen_enabled", def.AutoReopenEnabled)
+ v.SetDefault("auto_reopen_threshold_days", def.AutoReopenThresholdDays)
+
+ v.SetEnvPrefix("CCF_RISK")
+ v.SetEnvKeyReplacer(strings.NewReplacer("::", "_", ".", "_", "-", "_"))
+ v.AutomaticEnv()
+
+ if path != "" {
+ v.SetConfigFile(path)
+ v.SetConfigType("yaml")
+ if err := v.ReadInConfig(); err != nil {
+ var notFound viper.ConfigFileNotFoundError
+ if !errors.As(err, &notFound) && !errors.Is(err, os.ErrNotExist) {
+ return nil, fmt.Errorf("failed to read risk config file: %w", err)
+ }
+ }
+ }
+
+ var cfg RiskConfig
+ if err := v.Unmarshal(&cfg); err != nil {
+ return nil, fmt.Errorf("failed to parse risk config: %w", err)
+ }
+ if err := cfg.Validate(); err != nil {
+ return nil, err
+ }
+
+ return &cfg, nil
+}
+
+func (c *RiskConfig) Validate() error {
+ parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
+
+ if c.ReviewDeadlineReminderEnabled {
+ if _, err := parser.Parse(c.ReviewDeadlineReminderSchedule); err != nil {
+ return fmt.Errorf("invalid review_deadline_reminder_schedule: %w", err)
+ }
+ }
+ if c.ReviewOverdueEscalationEnabled {
+ if _, err := parser.Parse(c.ReviewOverdueEscalationSchedule); err != nil {
+ return fmt.Errorf("invalid review_overdue_escalation_schedule: %w", err)
+ }
+ }
+ if c.StaleRiskScannerEnabled {
+ if _, err := parser.Parse(c.StaleRiskScannerSchedule); err != nil {
+ return fmt.Errorf("invalid stale_risk_scanner_schedule: %w", err)
+ }
+ }
+ if c.EvidenceReconciliationEnabled {
+ if _, err := parser.Parse(c.EvidenceReconciliationSchedule); err != nil {
+ return fmt.Errorf("invalid evidence_reconciliation_schedule: %w", err)
+ }
+ }
+ if c.AutoReopenThresholdDays < 0 {
+ return fmt.Errorf("risk auto reopen threshold days must be non-negative")
+ }
+ if c.AutoReopenEnabled && c.AutoReopenThresholdDays <= 0 {
+ return fmt.Errorf("risk auto reopen threshold days must be greater than zero when auto reopen is enabled")
+ }
+
+ return nil
+}
diff --git a/internal/config/risk_test.go b/internal/config/risk_test.go
new file mode 100644
index 00000000..d9d1d722
--- /dev/null
+++ b/internal/config/risk_test.go
@@ -0,0 +1,164 @@
+package config
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestDefaultRiskConfig(t *testing.T) {
+ cfg := DefaultRiskConfig()
+ assert.False(t, cfg.ReviewDeadlineReminderEnabled)
+ assert.Equal(t, "0 0 8 * * *", cfg.ReviewDeadlineReminderSchedule)
+ assert.False(t, cfg.ReviewOverdueEscalationEnabled)
+ assert.Equal(t, "0 0 9 * * *", cfg.ReviewOverdueEscalationSchedule)
+ assert.False(t, cfg.StaleRiskScannerEnabled)
+ assert.Equal(t, "0 0 10 * * 1", cfg.StaleRiskScannerSchedule)
+ assert.False(t, cfg.EvidenceReconciliationEnabled)
+ assert.Equal(t, "0 30 10 * * *", cfg.EvidenceReconciliationSchedule)
+ assert.False(t, cfg.AutoReopenEnabled)
+ assert.Equal(t, 30, cfg.AutoReopenThresholdDays)
+}
+
+func TestLoadRiskConfigDefaults(t *testing.T) {
+ require.NoError(t, os.Unsetenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_ENABLED"))
+ require.NoError(t, os.Unsetenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_SCHEDULE"))
+ require.NoError(t, os.Unsetenv("CCF_RISK_REVIEW_OVERDUE_ESCALATION_ENABLED"))
+ require.NoError(t, os.Unsetenv("CCF_RISK_REVIEW_OVERDUE_ESCALATION_SCHEDULE"))
+ require.NoError(t, os.Unsetenv("CCF_RISK_STALE_RISK_SCANNER_ENABLED"))
+ require.NoError(t, os.Unsetenv("CCF_RISK_STALE_RISK_SCANNER_SCHEDULE"))
+ require.NoError(t, os.Unsetenv("CCF_RISK_EVIDENCE_RECONCILIATION_ENABLED"))
+ require.NoError(t, os.Unsetenv("CCF_RISK_EVIDENCE_RECONCILIATION_SCHEDULE"))
+ require.NoError(t, os.Unsetenv("CCF_RISK_AUTO_REOPEN_ENABLED"))
+ require.NoError(t, os.Unsetenv("CCF_RISK_AUTO_REOPEN_THRESHOLD_DAYS"))
+
+ cfg, err := LoadRiskConfig("")
+ require.NoError(t, err)
+ assert.False(t, cfg.ReviewDeadlineReminderEnabled)
+ assert.False(t, cfg.ReviewOverdueEscalationEnabled)
+ assert.False(t, cfg.StaleRiskScannerEnabled)
+ assert.False(t, cfg.EvidenceReconciliationEnabled)
+ assert.False(t, cfg.AutoReopenEnabled)
+ assert.Equal(t, 30, cfg.AutoReopenThresholdDays)
+}
+
+func TestLoadRiskConfigFromEnv(t *testing.T) {
+ require.NoError(t, os.Setenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_ENABLED", "true"))
+ require.NoError(t, os.Setenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_SCHEDULE", "0 15 8 * * *"))
+ require.NoError(t, os.Setenv("CCF_RISK_AUTO_REOPEN_ENABLED", "true"))
+ require.NoError(t, os.Setenv("CCF_RISK_AUTO_REOPEN_THRESHOLD_DAYS", "45"))
+ defer func() {
+ _ = os.Unsetenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_ENABLED")
+ _ = os.Unsetenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_SCHEDULE")
+ _ = os.Unsetenv("CCF_RISK_AUTO_REOPEN_ENABLED")
+ _ = os.Unsetenv("CCF_RISK_AUTO_REOPEN_THRESHOLD_DAYS")
+ }()
+
+ cfg, err := LoadRiskConfig("")
+ require.NoError(t, err)
+ assert.True(t, cfg.ReviewDeadlineReminderEnabled)
+ assert.Equal(t, "0 15 8 * * *", cfg.ReviewDeadlineReminderSchedule)
+ assert.True(t, cfg.AutoReopenEnabled)
+ assert.Equal(t, 45, cfg.AutoReopenThresholdDays)
+}
+
+func TestLoadRiskConfigFromFile(t *testing.T) {
+ content := `
+review_deadline_reminder_enabled: true
+review_deadline_reminder_schedule: "0 0 8 * * *"
+review_overdue_escalation_enabled: true
+review_overdue_escalation_schedule: "0 0 9 * * *"
+stale_risk_scanner_enabled: true
+stale_risk_scanner_schedule: "0 0 10 * * 1"
+evidence_reconciliation_enabled: true
+evidence_reconciliation_schedule: "0 30 10 * * *"
+auto_reopen_enabled: true
+auto_reopen_threshold_days: 60
+`
+ tmpDir := t.TempDir()
+ cfgPath := filepath.Join(tmpDir, "risk.yaml")
+ require.NoError(t, os.WriteFile(cfgPath, []byte(content), 0644))
+
+ cfg, err := LoadRiskConfig(cfgPath)
+ require.NoError(t, err)
+ assert.True(t, cfg.ReviewDeadlineReminderEnabled)
+ assert.True(t, cfg.ReviewOverdueEscalationEnabled)
+ assert.True(t, cfg.StaleRiskScannerEnabled)
+ assert.True(t, cfg.EvidenceReconciliationEnabled)
+ assert.True(t, cfg.AutoReopenEnabled)
+ assert.Equal(t, 60, cfg.AutoReopenThresholdDays)
+}
+
+func TestLoadRiskConfigMissingFileUsesDefaults(t *testing.T) {
+ cfg, err := LoadRiskConfig(filepath.Join(t.TempDir(), "does-not-exist.yaml"))
+ require.NoError(t, err)
+
+ def := DefaultRiskConfig()
+ assert.Equal(t, def.ReviewDeadlineReminderEnabled, cfg.ReviewDeadlineReminderEnabled)
+ assert.Equal(t, def.ReviewDeadlineReminderSchedule, cfg.ReviewDeadlineReminderSchedule)
+ assert.Equal(t, def.AutoReopenThresholdDays, cfg.AutoReopenThresholdDays)
+}
+
+func TestRiskConfigValidate(t *testing.T) {
+ tests := []struct {
+ name string
+ cfg *RiskConfig
+ wantErr bool
+ }{
+ {
+ name: "valid config",
+ cfg: &RiskConfig{
+ ReviewDeadlineReminderEnabled: true,
+ ReviewDeadlineReminderSchedule: "0 0 8 * * *",
+ AutoReopenThresholdDays: 30,
+ },
+ wantErr: false,
+ },
+ {
+ name: "invalid reminder schedule",
+ cfg: &RiskConfig{
+ ReviewDeadlineReminderEnabled: true,
+ ReviewDeadlineReminderSchedule: "not-cron",
+ },
+ wantErr: true,
+ },
+ {
+ name: "negative threshold",
+ cfg: &RiskConfig{
+ AutoReopenThresholdDays: -1,
+ },
+ wantErr: true,
+ },
+ {
+ name: "zero threshold invalid when auto reopen enabled",
+ cfg: &RiskConfig{
+ AutoReopenEnabled: true,
+ AutoReopenThresholdDays: 0,
+ },
+ wantErr: true,
+ },
+ {
+ name: "disabled schedule may be invalid",
+ cfg: &RiskConfig{
+ ReviewDeadlineReminderEnabled: false,
+ ReviewDeadlineReminderSchedule: "not-cron",
+ AutoReopenThresholdDays: 0,
+ },
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ err := tt.cfg.Validate()
+ if tt.wantErr {
+ assert.Error(t, err)
+ return
+ }
+ assert.NoError(t, err)
+ })
+ }
+}
diff --git a/internal/config/workflow.go b/internal/config/workflow.go
index 4c2b4953..333e3a38 100644
--- a/internal/config/workflow.go
+++ b/internal/config/workflow.go
@@ -3,6 +3,7 @@ package config
import (
"errors"
"fmt"
+ "os"
"strings"
"github.com/robfig/cron/v3"
@@ -75,7 +76,7 @@ func LoadWorkflowConfig(path string) (*WorkflowConfig, error) {
v.SetConfigType("yaml")
if err := v.ReadInConfig(); err != nil {
var notFound viper.ConfigFileNotFoundError
- if !errors.As(err, &notFound) {
+ if !errors.As(err, &notFound) && !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("failed to read workflow config file: %w", err)
}
// If file not found, we continue with defaults and env vars
diff --git a/internal/config/workflow_test.go b/internal/config/workflow_test.go
index 6b06d101..7cf7564c 100644
--- a/internal/config/workflow_test.go
+++ b/internal/config/workflow_test.go
@@ -108,6 +108,16 @@ overdue_check_enabled: false
assert.False(t, config.OverdueCheckEnabled)
}
+func TestLoadWorkflowConfig_MissingFileUsesDefaults(t *testing.T) {
+ config, err := LoadWorkflowConfig(filepath.Join(t.TempDir(), "does-not-exist.yaml"))
+ require.NoError(t, err)
+
+ def := DefaultWorkflowConfig()
+ assert.Equal(t, def.SchedulerEnabled, config.SchedulerEnabled)
+ assert.Equal(t, def.Schedule, config.Schedule)
+ assert.Equal(t, def.GracePeriodDays, config.GracePeriodDays)
+}
+
func TestWorkflowConfig_Validate(t *testing.T) {
tests := []struct {
name string
diff --git a/internal/service/email/templates/service_test.go b/internal/service/email/templates/service_test.go
index baf9f814..216d07de 100644
--- a/internal/service/email/templates/service_test.go
+++ b/internal/service/email/templates/service_test.go
@@ -250,6 +250,35 @@ func TestTemplateService_WorkflowTaskDigest_EmptyTasks(t *testing.T) {
require.Contains(t, html, "Bob")
}
+func TestTemplateService_RiskTemplates(t *testing.T) {
+ service, err := NewTemplateService()
+ require.NoError(t, err)
+
+ data := TemplateData{
+ "OwnerName": "Alice Smith",
+ "RiskTitle": "Weak MFA Controls",
+ "SSPName": "GoodRead SSP",
+ "RiskStatus": "risk-accepted",
+ "ReviewDeadline": "01/mar/2026",
+ "LastSeenAt": "15/feb/2026",
+ "RiskURL": "https://app.example.com/risks/123",
+ }
+
+ for _, name := range []string{
+ "risk-review-due-reminder",
+ "risk-review-overdue-escalation",
+ "risk-stale-open-reminder",
+ } {
+ html, text, err := service.Use(name, data)
+ require.NoError(t, err)
+ require.NotEmpty(t, html)
+ require.NotEmpty(t, text)
+ require.Contains(t, html, "Alice Smith")
+ require.Contains(t, text, "Weak MFA Controls")
+ require.Contains(t, text, "https://app.example.com/risks/123")
+ }
+}
+
func TestTemplateService_ListTemplates(t *testing.T) {
service, err := NewTemplateService()
require.NoError(t, err, "Failed to create template service")
diff --git a/internal/service/email/templates/templates/risk-review-due-reminder.html b/internal/service/email/templates/templates/risk-review-due-reminder.html
new file mode 100644
index 00000000..2e0088fd
--- /dev/null
+++ b/internal/service/email/templates/templates/risk-review-due-reminder.html
@@ -0,0 +1,14 @@
+
+
+
+ Hello {{.OwnerName}},
+ A risk review deadline is approaching.
+
+ - Risk: {{.RiskTitle}}
+ - SSP: {{.SSPName}}
+ - Status: {{.RiskStatus}}
+ - Review deadline: {{.ReviewDeadline}}
+
+ Open risk
+
+
diff --git a/internal/service/email/templates/templates/risk-review-due-reminder.txt b/internal/service/email/templates/templates/risk-review-due-reminder.txt
new file mode 100644
index 00000000..50e7c0e7
--- /dev/null
+++ b/internal/service/email/templates/templates/risk-review-due-reminder.txt
@@ -0,0 +1,10 @@
+Hello {{.OwnerName}},
+
+A risk review deadline is approaching.
+
+Risk: {{.RiskTitle}}
+SSP: {{.SSPName}}
+Status: {{.RiskStatus}}
+Review deadline: {{.ReviewDeadline}}
+
+Open risk: {{.RiskURL}}
diff --git a/internal/service/email/templates/templates/risk-review-overdue-escalation.html b/internal/service/email/templates/templates/risk-review-overdue-escalation.html
new file mode 100644
index 00000000..18b2f902
--- /dev/null
+++ b/internal/service/email/templates/templates/risk-review-overdue-escalation.html
@@ -0,0 +1,14 @@
+
+
+
+ Hello {{.OwnerName}},
+ A risk review is overdue and requires attention.
+
+ - Risk: {{.RiskTitle}}
+ - SSP: {{.SSPName}}
+ - Status: {{.RiskStatus}}
+ - Review deadline: {{.ReviewDeadline}}
+
+ Open risk
+
+
diff --git a/internal/service/email/templates/templates/risk-review-overdue-escalation.txt b/internal/service/email/templates/templates/risk-review-overdue-escalation.txt
new file mode 100644
index 00000000..7768e75f
--- /dev/null
+++ b/internal/service/email/templates/templates/risk-review-overdue-escalation.txt
@@ -0,0 +1,10 @@
+Hello {{.OwnerName}},
+
+A risk review is overdue and requires attention.
+
+Risk: {{.RiskTitle}}
+SSP: {{.SSPName}}
+Status: {{.RiskStatus}}
+Review deadline: {{.ReviewDeadline}}
+
+Open risk: {{.RiskURL}}
diff --git a/internal/service/email/templates/templates/risk-stale-open-reminder.html b/internal/service/email/templates/templates/risk-stale-open-reminder.html
new file mode 100644
index 00000000..98fd8353
--- /dev/null
+++ b/internal/service/email/templates/templates/risk-stale-open-reminder.html
@@ -0,0 +1,14 @@
+
+
+
+ Hello {{.OwnerName}},
+ A risk appears stale and should be reviewed.
+
+ - Risk: {{.RiskTitle}}
+ - SSP: {{.SSPName}}
+ - Status: {{.RiskStatus}}
+ - Last seen: {{.LastSeenAt}}
+
+ Open risk
+
+
diff --git a/internal/service/email/templates/templates/risk-stale-open-reminder.txt b/internal/service/email/templates/templates/risk-stale-open-reminder.txt
new file mode 100644
index 00000000..484ba6f5
--- /dev/null
+++ b/internal/service/email/templates/templates/risk-stale-open-reminder.txt
@@ -0,0 +1,10 @@
+Hello {{.OwnerName}},
+
+A risk appears stale and should be reviewed.
+
+Risk: {{.RiskTitle}}
+SSP: {{.SSPName}}
+Status: {{.RiskStatus}}
+Last seen: {{.LastSeenAt}}
+
+Open risk: {{.RiskURL}}
diff --git a/internal/service/relational/ccf_internal.go b/internal/service/relational/ccf_internal.go
index 4ee7dbf5..c3b233f1 100644
--- a/internal/service/relational/ccf_internal.go
+++ b/internal/service/relational/ccf_internal.go
@@ -35,6 +35,10 @@ type User struct {
TaskAvailableEmailSubscribed bool `json:"taskAvailableEmailSubscribed" gorm:"default:false"`
// TaskDailyDigestSubscribed indicates if the user wants to receive a daily task digest email
TaskDailyDigestSubscribed bool `json:"taskDailyDigestSubscribed" gorm:"default:false"`
+
+ // RiskNotificationsSubscribed indicates if the user wants to receive risk lifecycle notifications.
+ // The DB default is intentionally true so existing users are opted in when the column is introduced.
+ RiskNotificationsSubscribed bool `json:"riskNotificationsSubscribed" gorm:"default:true"`
}
func (User) TableName() string {
diff --git a/internal/service/relational/risks/service.go b/internal/service/relational/risks/service.go
index 97b12f46..227c1dce 100644
--- a/internal/service/relational/risks/service.go
+++ b/internal/service/relational/risks/service.go
@@ -63,6 +63,9 @@ type ReviewRiskParams struct {
Decision RiskReviewDecision
Notes *string
NextReviewDeadline *time.Time
+ // RequireCurrentReviewDeadlineBefore enforces, under lock, that the current review deadline
+ // is set and no later than this timestamp before applying the decision.
+ RequireCurrentReviewDeadlineBefore *time.Time
}
type Associations struct {
@@ -339,6 +342,13 @@ func (s *RiskService) ReviewRisk(params ReviewRiskParams) (*Risk, error) {
tx.Rollback()
return nil, newValidationError("only risks in status risk-accepted can be reviewed")
}
+ if params.RequireCurrentReviewDeadlineBefore != nil {
+ cutoff := params.RequireCurrentReviewDeadlineBefore.UTC()
+ if risk.ReviewDeadline == nil || risk.ReviewDeadline.UTC().After(cutoff) {
+ tx.Rollback()
+ return nil, newValidationError("risk review deadline no longer eligible for requested decision")
+ }
+ }
reviewedAt := time.Now().UTC()
if params.ReviewedAt != nil {
diff --git a/internal/service/worker/jobs.go b/internal/service/worker/jobs.go
index a8c9628a..91aeb51c 100644
--- a/internal/service/worker/jobs.go
+++ b/internal/service/worker/jobs.go
@@ -173,6 +173,7 @@ type NotificationUser struct {
LastName string
TaskAvailableEmailSubscribed bool
TaskDailyDigestSubscribed bool
+ RiskNotificationsSubscribed bool
}
func (u NotificationUser) FullName() string {
@@ -660,6 +661,12 @@ func Workers(emailService EmailService, digestService DigestService, userRepo Us
if db != nil {
riskEvidenceWorker := NewRiskEvidenceWorker(db, logger)
river.AddWorker(workers, river.WorkFunc(riskEvidenceWorker.Work))
+
+ riskReconcileDuplicatesWorker := NewRiskReconcileDuplicatesWorker(db, logger)
+ river.AddWorker(workers, river.WorkFunc(riskReconcileDuplicatesWorker.Work))
+
+ riskReviewOverdueReopenWorker := NewRiskReviewOverdueReopenWorker(db, logger)
+ river.AddWorker(workers, river.WorkFunc(riskReviewOverdueReopenWorker.Work))
}
// Register workflow notification workers if dependencies are available
@@ -676,6 +683,15 @@ func Workers(emailService EmailService, digestService DigestService, userRepo Us
workflowExecutionFailedWorker := NewWorkflowExecutionFailedWorker(db, emailService, userRepo, webBaseURL, logger)
river.AddWorker(workers, river.WorkFunc(workflowExecutionFailedWorker.Work))
+
+ riskReviewDueReminderWorker := NewRiskReviewDueReminderWorker(db, emailService, userRepo, webBaseURL, logger)
+ river.AddWorker(workers, river.WorkFunc(riskReviewDueReminderWorker.Work))
+
+ riskReviewOverdueEscalationWorker := NewRiskReviewOverdueEscalationWorker(db, emailService, userRepo, webBaseURL, logger)
+ river.AddWorker(workers, river.WorkFunc(riskReviewOverdueEscalationWorker.Work))
+
+ riskStaleOpenReminderWorker := NewRiskStaleOpenReminderWorker(db, emailService, userRepo, webBaseURL, logger)
+ river.AddWorker(workers, river.WorkFunc(riskStaleOpenReminderWorker.Work))
}
}
diff --git a/internal/service/worker/risk_job_types.go b/internal/service/worker/risk_job_types.go
new file mode 100644
index 00000000..23a0a421
--- /dev/null
+++ b/internal/service/worker/risk_job_types.go
@@ -0,0 +1,105 @@
+package worker
+
+import (
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/riverqueue/river"
+)
+
+const (
+ JobTypeRiskReviewDeadlineReminderScanner = "risk_review_deadline_reminder_scanner"
+ JobTypeRiskReviewOverdueEscalationScanner = "risk_review_overdue_escalation_scanner"
+ JobTypeRiskStaleRiskScanner = "risk_stale_risk_scanner"
+ JobTypeRiskEvidenceReconciliationScanner = "risk_evidence_reconciliation_scanner"
+
+ JobTypeRiskReviewDueReminder = "risk_review_due_reminder"
+ JobTypeRiskReviewOverdueEscalation = "risk_review_overdue_escalation"
+ JobTypeRiskStaleOpenReminder = "risk_stale_open_reminder"
+ JobTypeRiskReconcileDuplicates = "risk_reconcile_duplicates"
+ JobTypeRiskReviewOverdueReopen = "risk_review_overdue_reopen"
+)
+
+type RiskReviewDeadlineReminderScannerArgs struct{}
+type RiskReviewOverdueEscalationScannerArgs struct{}
+type RiskStaleRiskScannerArgs struct{}
+type RiskEvidenceReconciliationScannerArgs struct{}
+
+type RiskReviewDueReminderArgs struct {
+ RiskID uuid.UUID `json:"risk_id"`
+ OwnerUserID uuid.UUID `json:"owner_user_id"`
+ ReviewDeadline string `json:"review_deadline"`
+ ReminderWindow string `json:"reminder_window"`
+}
+
+type RiskReviewOverdueEscalationArgs struct {
+ RiskID uuid.UUID `json:"risk_id"`
+ OwnerUserID uuid.UUID `json:"owner_user_id"`
+ ReviewDeadline string `json:"review_deadline"`
+ OverdueWindow string `json:"overdue_window"`
+}
+
+type RiskStaleOpenReminderArgs struct {
+ RiskID uuid.UUID `json:"risk_id"`
+ OwnerUserID uuid.UUID `json:"owner_user_id"`
+ LastSeenAt string `json:"last_seen_at"`
+ StaleBucketDate string `json:"stale_bucket_date"`
+}
+
+type RiskReconcileDuplicatesArgs struct {
+ DedupeKey string `json:"dedupe_key"`
+}
+
+type RiskReviewOverdueReopenArgs struct {
+ RiskID uuid.UUID `json:"risk_id"`
+ ReviewDeadline string `json:"review_deadline"`
+ ThresholdDays int `json:"threshold_days"`
+}
+
+func (RiskReviewDeadlineReminderScannerArgs) Kind() string {
+ return JobTypeRiskReviewDeadlineReminderScanner
+}
+func (RiskReviewOverdueEscalationScannerArgs) Kind() string {
+ return JobTypeRiskReviewOverdueEscalationScanner
+}
+func (RiskStaleRiskScannerArgs) Kind() string { return JobTypeRiskStaleRiskScanner }
+func (RiskEvidenceReconciliationScannerArgs) Kind() string {
+ return JobTypeRiskEvidenceReconciliationScanner
+}
+func (RiskReviewDueReminderArgs) Kind() string { return JobTypeRiskReviewDueReminder }
+func (RiskReviewOverdueEscalationArgs) Kind() string { return JobTypeRiskReviewOverdueEscalation }
+func (RiskStaleOpenReminderArgs) Kind() string { return JobTypeRiskStaleOpenReminder }
+func (RiskReconcileDuplicatesArgs) Kind() string { return JobTypeRiskReconcileDuplicates }
+func (RiskReviewOverdueReopenArgs) Kind() string { return JobTypeRiskReviewOverdueReopen }
+
+func (RiskReviewDeadlineReminderScannerArgs) Timeout() time.Duration { return 5 * time.Minute }
+func (RiskReviewOverdueEscalationScannerArgs) Timeout() time.Duration { return 5 * time.Minute }
+func (RiskStaleRiskScannerArgs) Timeout() time.Duration { return 5 * time.Minute }
+func (RiskEvidenceReconciliationScannerArgs) Timeout() time.Duration { return 5 * time.Minute }
+func (RiskReviewDueReminderArgs) Timeout() time.Duration { return 30 * time.Second }
+func (RiskReviewOverdueEscalationArgs) Timeout() time.Duration { return 30 * time.Second }
+func (RiskStaleOpenReminderArgs) Timeout() time.Duration { return 30 * time.Second }
+func (RiskReconcileDuplicatesArgs) Timeout() time.Duration { return 2 * time.Minute }
+func (RiskReviewOverdueReopenArgs) Timeout() time.Duration { return 30 * time.Second }
+
+func JobInsertOptionsForRiskNotification(byPeriod time.Duration) *river.InsertOpts {
+ return &river.InsertOpts{
+ Queue: "email",
+ MaxAttempts: 3,
+ UniqueOpts: river.UniqueOpts{
+ ByArgs: true,
+ ByPeriod: byPeriod,
+ },
+ }
+}
+
+func JobInsertOptionsForRiskWorkerUnique(byPeriod time.Duration) *river.InsertOpts {
+ return &river.InsertOpts{
+ Queue: "risk",
+ MaxAttempts: 3,
+ UniqueOpts: river.UniqueOpts{
+ ByArgs: true,
+ ByPeriod: byPeriod,
+ },
+ }
+}
diff --git a/internal/service/worker/risk_workers.go b/internal/service/worker/risk_workers.go
new file mode 100644
index 00000000..a2c5e831
--- /dev/null
+++ b/internal/service/worker/risk_workers.go
@@ -0,0 +1,629 @@
+package worker
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sort"
+ "strings"
+ "time"
+
+ "github.com/compliance-framework/api/internal/service/email/types"
+ "github.com/compliance-framework/api/internal/service/relational"
+ riskrel "github.com/compliance-framework/api/internal/service/relational/risks"
+ "github.com/compliance-framework/api/internal/workflow"
+ "github.com/google/uuid"
+ "github.com/riverqueue/river"
+ "go.uber.org/zap"
+ "gorm.io/gorm"
+)
+
+const riskScannerBatchSize = 1000
+
+type RiskReviewDeadlineReminderScannerWorker struct {
+ db *gorm.DB
+ client workflow.RiverClient
+ logger *zap.SugaredLogger
+}
+
+func NewRiskReviewDeadlineReminderScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskReviewDeadlineReminderScannerWorker {
+ return &RiskReviewDeadlineReminderScannerWorker{db: db, client: client, logger: logger}
+}
+
+func (w *RiskReviewDeadlineReminderScannerWorker) Work(ctx context.Context, _ *river.Job[RiskReviewDeadlineReminderScannerArgs]) error {
+ now := time.Now().UTC()
+ windowEnd := now.Add(30 * 24 * time.Hour)
+
+ var (
+ risks []riskrel.Risk
+ totalEnqueued int
+ )
+ if err := w.db.WithContext(ctx).
+ Where("status = ? AND review_deadline IS NOT NULL AND review_deadline > ? AND review_deadline <= ?",
+ string(riskrel.RiskStatusRiskAccepted), now, windowEnd).
+ FindInBatches(&risks, riskScannerBatchSize, func(_ *gorm.DB, _ int) error {
+ ownersByRiskID, err := resolveRiskOwnerUserIDsBatch(ctx, w.db, risks)
+ if err != nil {
+ return fmt.Errorf("risk deadline reminder scanner: resolve owners failed: %w", err)
+ }
+
+ params := make([]river.InsertManyParams, 0, len(risks))
+ for i := range risks {
+ risk := &risks[i]
+ if risk.ID == nil {
+ continue
+ }
+ ownerIDs := ownersByRiskID[*risk.ID]
+ for _, ownerID := range ownerIDs {
+ params = append(params, river.InsertManyParams{
+ Args: RiskReviewDueReminderArgs{
+ RiskID: *risk.ID,
+ OwnerUserID: ownerID,
+ ReviewDeadline: risk.ReviewDeadline.UTC().Format(time.RFC3339),
+ ReminderWindow: "30d",
+ },
+ InsertOpts: JobInsertOptionsForRiskNotification(24 * time.Hour),
+ })
+ }
+ }
+
+ if len(params) == 0 {
+ return nil
+ }
+
+ if _, err := w.client.InsertMany(ctx, params); err != nil {
+ return fmt.Errorf("risk deadline reminder scanner: enqueue failed: %w", err)
+ }
+ totalEnqueued += len(params)
+ return nil
+ }).Error; err != nil {
+ return fmt.Errorf("risk deadline reminder scanner: query failed: %w", err)
+ }
+
+ if totalEnqueued == 0 {
+ w.logger.Infow("RiskReviewDeadlineReminderScannerWorker: no reminders to enqueue")
+ return nil
+ }
+
+ w.logger.Infow("RiskReviewDeadlineReminderScannerWorker: enqueued reminders", "count", totalEnqueued)
+ return nil
+}
+
+type RiskReviewOverdueEscalationScannerWorker struct {
+ db *gorm.DB
+ client workflow.RiverClient
+ logger *zap.SugaredLogger
+ autoReopenEnabled bool
+ autoReopenThresholdDays int
+}
+
+func NewRiskReviewOverdueEscalationScannerWorker(
+ db *gorm.DB,
+ client workflow.RiverClient,
+ logger *zap.SugaredLogger,
+ autoReopenEnabled bool,
+ autoReopenThresholdDays int,
+) *RiskReviewOverdueEscalationScannerWorker {
+ return &RiskReviewOverdueEscalationScannerWorker{
+ db: db,
+ client: client,
+ logger: logger,
+ autoReopenEnabled: autoReopenEnabled,
+ autoReopenThresholdDays: autoReopenThresholdDays,
+ }
+}
+
+func (w *RiskReviewOverdueEscalationScannerWorker) Work(ctx context.Context, _ *river.Job[RiskReviewOverdueEscalationScannerArgs]) error {
+ now := time.Now().UTC()
+ threshold := time.Duration(w.autoReopenThresholdDays) * 24 * time.Hour
+
+ var (
+ risks []riskrel.Risk
+ totalEnqueued int
+ totalReopenCount int
+ )
+ if err := w.db.WithContext(ctx).
+ Where("status = ? AND review_deadline IS NOT NULL AND review_deadline < ?",
+ string(riskrel.RiskStatusRiskAccepted), now).
+ FindInBatches(&risks, riskScannerBatchSize, func(_ *gorm.DB, _ int) error {
+ ownersByRiskID, err := resolveRiskOwnerUserIDsBatch(ctx, w.db, risks)
+ if err != nil {
+ return fmt.Errorf("risk overdue escalation scanner: resolve owners failed: %w", err)
+ }
+
+ params := make([]river.InsertManyParams, 0, len(risks))
+ overdueWindow := now.Format("2006-01-02")
+ for i := range risks {
+ risk := &risks[i]
+ if risk.ID == nil {
+ continue
+ }
+ ownerIDs := ownersByRiskID[*risk.ID]
+ for _, ownerID := range ownerIDs {
+ params = append(params, river.InsertManyParams{
+ Args: RiskReviewOverdueEscalationArgs{
+ RiskID: *risk.ID,
+ OwnerUserID: ownerID,
+ ReviewDeadline: risk.ReviewDeadline.UTC().Format(time.RFC3339),
+ OverdueWindow: overdueWindow,
+ },
+ InsertOpts: JobInsertOptionsForRiskNotification(24 * time.Hour),
+ })
+ }
+
+ if w.autoReopenEnabled && w.autoReopenThresholdDays > 0 {
+ overdueFor := now.Sub(risk.ReviewDeadline.UTC())
+ if overdueFor >= threshold {
+ params = append(params, river.InsertManyParams{
+ Args: RiskReviewOverdueReopenArgs{
+ RiskID: *risk.ID,
+ ReviewDeadline: risk.ReviewDeadline.UTC().Format(time.RFC3339),
+ ThresholdDays: w.autoReopenThresholdDays,
+ },
+ InsertOpts: JobInsertOptionsForRiskWorkerUnique(24 * time.Hour),
+ })
+ totalReopenCount++
+ }
+ }
+ }
+
+ if len(params) == 0 {
+ return nil
+ }
+
+ if _, err := w.client.InsertMany(ctx, params); err != nil {
+ return fmt.Errorf("risk overdue escalation scanner: enqueue failed: %w", err)
+ }
+ totalEnqueued += len(params)
+ return nil
+ }).Error; err != nil {
+ return fmt.Errorf("risk overdue escalation scanner: query failed: %w", err)
+ }
+
+ if totalEnqueued == 0 {
+ w.logger.Infow("RiskReviewOverdueEscalationScannerWorker: no escalations to enqueue")
+ return nil
+ }
+
+ w.logger.Infow("RiskReviewOverdueEscalationScannerWorker: enqueued jobs", "count", totalEnqueued, "reopen_count", totalReopenCount)
+ return nil
+}
+
+type RiskStaleRiskScannerWorker struct {
+ db *gorm.DB
+ client workflow.RiverClient
+ logger *zap.SugaredLogger
+}
+
+func NewRiskStaleRiskScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskStaleRiskScannerWorker {
+ return &RiskStaleRiskScannerWorker{db: db, client: client, logger: logger}
+}
+
+func (w *RiskStaleRiskScannerWorker) Work(ctx context.Context, _ *river.Job[RiskStaleRiskScannerArgs]) error {
+ now := time.Now().UTC()
+ cutoff := now.Add(-30 * 24 * time.Hour)
+ staleBucketDate := startOfISOWeekUTC(now).Format("2006-01-02")
+
+ var (
+ risks []riskrel.Risk
+ totalEnqueued int
+ )
+ if err := w.db.WithContext(ctx).
+ Where("status IN ? AND last_seen_at <= ?",
+ []string{
+ string(riskrel.RiskStatusOpen),
+ string(riskrel.RiskStatusInvestigating),
+ string(riskrel.RiskStatusMitigatingPlanned),
+ string(riskrel.RiskStatusMitigatingImplemented),
+ },
+ cutoff).
+ FindInBatches(&risks, riskScannerBatchSize, func(_ *gorm.DB, _ int) error {
+ ownersByRiskID, err := resolveRiskOwnerUserIDsBatch(ctx, w.db, risks)
+ if err != nil {
+ return fmt.Errorf("risk stale scanner: resolve owners failed: %w", err)
+ }
+
+ params := make([]river.InsertManyParams, 0, len(risks))
+ for i := range risks {
+ risk := &risks[i]
+ if risk.ID == nil {
+ continue
+ }
+ ownerIDs := ownersByRiskID[*risk.ID]
+ for _, ownerID := range ownerIDs {
+ params = append(params, river.InsertManyParams{
+ Args: RiskStaleOpenReminderArgs{
+ RiskID: *risk.ID,
+ OwnerUserID: ownerID,
+ LastSeenAt: risk.LastSeenAt.UTC().Format(time.RFC3339),
+ StaleBucketDate: staleBucketDate,
+ },
+ InsertOpts: JobInsertOptionsForRiskNotification(7 * 24 * time.Hour),
+ })
+ }
+ }
+
+ if len(params) == 0 {
+ return nil
+ }
+
+ if _, err := w.client.InsertMany(ctx, params); err != nil {
+ return fmt.Errorf("risk stale scanner: enqueue failed: %w", err)
+ }
+ totalEnqueued += len(params)
+ return nil
+ }).Error; err != nil {
+ return fmt.Errorf("risk stale scanner: query failed: %w", err)
+ }
+
+ if totalEnqueued == 0 {
+ w.logger.Infow("RiskStaleRiskScannerWorker: no stale reminders to enqueue")
+ return nil
+ }
+
+ w.logger.Infow("RiskStaleRiskScannerWorker: enqueued stale reminders", "count", totalEnqueued)
+ return nil
+}
+
+type RiskEvidenceReconciliationScannerWorker struct {
+ db *gorm.DB
+ client workflow.RiverClient
+ logger *zap.SugaredLogger
+}
+
+func NewRiskEvidenceReconciliationScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskEvidenceReconciliationScannerWorker {
+ return &RiskEvidenceReconciliationScannerWorker{db: db, client: client, logger: logger}
+}
+
+func (w *RiskEvidenceReconciliationScannerWorker) Work(ctx context.Context, _ *river.Job[RiskEvidenceReconciliationScannerArgs]) error {
+ var params []river.InsertManyParams
+
+	// Find active (non-closed) risks sharing the same non-empty dedupe key; currently the only reconciliation check performed by this scanner.
+ var duplicateKeys []string
+ if err := w.db.WithContext(ctx).
+ Model(&riskrel.Risk{}).
+ Select("dedupe_key").
+ Where("status <> ? AND dedupe_key <> ''", string(riskrel.RiskStatusClosed)).
+ Group("dedupe_key").
+ Having("COUNT(*) > 1").
+ Pluck("dedupe_key", &duplicateKeys).Error; err != nil {
+ return fmt.Errorf("risk reconciliation scanner: query duplicate dedupe keys failed: %w", err)
+ }
+ for _, key := range duplicateKeys {
+ params = append(params, river.InsertManyParams{
+ Args: RiskReconcileDuplicatesArgs{DedupeKey: key},
+ InsertOpts: JobInsertOptionsForRiskWorkerUnique(24 * time.Hour),
+ })
+ }
+
+ if len(params) == 0 {
+ w.logger.Infow("RiskEvidenceReconciliationScannerWorker: no reconciliation jobs to enqueue")
+ return nil
+ }
+
+ if _, err := w.client.InsertMany(ctx, params); err != nil {
+ return fmt.Errorf("risk reconciliation scanner: enqueue failed: %w", err)
+ }
+
+ w.logger.Infow("RiskEvidenceReconciliationScannerWorker: enqueued reconciliation jobs", "count", len(params))
+ return nil
+}
+
+type RiskReviewDueReminderWorker struct {
+ db *gorm.DB
+ emailService EmailService
+ userRepo UserRepository
+ webBaseURL string
+ logger *zap.SugaredLogger
+}
+
+func NewRiskReviewDueReminderWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *RiskReviewDueReminderWorker {
+ return &RiskReviewDueReminderWorker{db: db, emailService: emailService, userRepo: userRepo, webBaseURL: webBaseURL, logger: logger}
+}
+
+func (w *RiskReviewDueReminderWorker) Work(ctx context.Context, job *river.Job[RiskReviewDueReminderArgs]) error {
+ return w.sendRiskNotification(ctx, job.Args.RiskID, job.Args.OwnerUserID, "risk-review-due-reminder", "Risk review due soon")
+}
+
+type RiskReviewOverdueEscalationWorker struct {
+ db *gorm.DB
+ emailService EmailService
+ userRepo UserRepository
+ webBaseURL string
+ logger *zap.SugaredLogger
+}
+
+func NewRiskReviewOverdueEscalationWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *RiskReviewOverdueEscalationWorker {
+ return &RiskReviewOverdueEscalationWorker{db: db, emailService: emailService, userRepo: userRepo, webBaseURL: webBaseURL, logger: logger}
+}
+
+func (w *RiskReviewOverdueEscalationWorker) Work(ctx context.Context, job *river.Job[RiskReviewOverdueEscalationArgs]) error {
+ return w.sendRiskNotification(ctx, job.Args.RiskID, job.Args.OwnerUserID, "risk-review-overdue-escalation", "Risk review overdue")
+}
+
+type RiskStaleOpenReminderWorker struct {
+ db *gorm.DB
+ emailService EmailService
+ userRepo UserRepository
+ webBaseURL string
+ logger *zap.SugaredLogger
+}
+
+func NewRiskStaleOpenReminderWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *RiskStaleOpenReminderWorker {
+ return &RiskStaleOpenReminderWorker{db: db, emailService: emailService, userRepo: userRepo, webBaseURL: webBaseURL, logger: logger}
+}
+
+func (w *RiskStaleOpenReminderWorker) Work(ctx context.Context, job *river.Job[RiskStaleOpenReminderArgs]) error {
+ return w.sendRiskNotification(ctx, job.Args.RiskID, job.Args.OwnerUserID, "risk-stale-open-reminder", "Stale risk reminder")
+}
+
+func (w *RiskReviewDueReminderWorker) sendRiskNotification(ctx context.Context, riskID, ownerUserID uuid.UUID, templateName, subjectPrefix string) error {
+ return sendRiskNotification(ctx, w.db, w.emailService, w.userRepo, w.webBaseURL, w.logger, riskID, ownerUserID, templateName, subjectPrefix)
+}
+
+func (w *RiskReviewOverdueEscalationWorker) sendRiskNotification(ctx context.Context, riskID, ownerUserID uuid.UUID, templateName, subjectPrefix string) error {
+ return sendRiskNotification(ctx, w.db, w.emailService, w.userRepo, w.webBaseURL, w.logger, riskID, ownerUserID, templateName, subjectPrefix)
+}
+
+func (w *RiskStaleOpenReminderWorker) sendRiskNotification(ctx context.Context, riskID, ownerUserID uuid.UUID, templateName, subjectPrefix string) error {
+ return sendRiskNotification(ctx, w.db, w.emailService, w.userRepo, w.webBaseURL, w.logger, riskID, ownerUserID, templateName, subjectPrefix)
+}
+
+func sendRiskNotification(
+ ctx context.Context,
+ db *gorm.DB,
+ emailService EmailService,
+ userRepo UserRepository,
+ webBaseURL string,
+ logger *zap.SugaredLogger,
+ riskID, ownerUserID uuid.UUID,
+ templateName, subjectPrefix string,
+) error {
+ var risk riskrel.Risk
+ if err := db.WithContext(ctx).First(&risk, "id = ?", riskID).Error; err != nil {
+ if errors.Is(err, gorm.ErrRecordNotFound) {
+ logger.Warnw("Risk notification worker: risk not found, skipping", "risk_id", riskID)
+ return nil
+ }
+ return fmt.Errorf("risk notification worker: load risk failed: %w", err)
+ }
+
+ user, err := userRepo.FindUserByID(ctx, ownerUserID.String())
+ if err != nil {
+ if errors.Is(err, gorm.ErrRecordNotFound) {
+ logger.Warnw("Risk notification worker: owner user not found, skipping", "risk_id", riskID, "owner_user_id", ownerUserID, "error", err)
+ return nil
+ }
+ return fmt.Errorf("risk notification worker: load owner user failed: %w", err)
+ }
+ if !user.RiskNotificationsSubscribed {
+ logger.Debugw("Risk notification worker: owner unsubscribed, skipping", "risk_id", riskID, "owner_user_id", ownerUserID)
+ return nil
+ }
+
+ riskTitle := strings.TrimSpace(risk.Title)
+ if riskTitle == "" {
+ riskTitle = riskID.String()
+ }
+
+ sspName := resolveSSPDisplayName(ctx, db, risk.SSPID)
+ reviewDeadline := ""
+ if risk.ReviewDeadline != nil {
+ reviewDeadline = formatDate(*risk.ReviewDeadline)
+ }
+
+ templateData := map[string]interface{}{
+ "OwnerName": user.FullName(),
+ "RiskTitle": riskTitle,
+ "SSPName": sspName,
+ "RiskStatus": risk.Status,
+ "ReviewDeadline": reviewDeadline,
+ "LastSeenAt": formatDate(risk.LastSeenAt),
+ "RiskURL": resolveRiskURL(webBaseURL, riskID),
+ }
+
+ htmlBody, textBody, err := emailService.UseTemplate(templateName, templateData)
+ if err != nil {
+ return fmt.Errorf("risk notification worker: render template %q failed: %w", templateName, err)
+ }
+
+ message := &types.Message{
+ From: emailService.GetDefaultFromAddress(),
+ To: []string{user.Email},
+ Subject: fmt.Sprintf("%s: %s", subjectPrefix, riskTitle),
+ HTMLBody: htmlBody,
+ TextBody: textBody,
+ }
+ result, err := emailService.Send(ctx, message)
+ if err != nil {
+ return fmt.Errorf("risk notification worker: send email failed: %w", err)
+ }
+ if !result.Success {
+ return fmt.Errorf("risk notification worker: email send failed: %s", result.Error)
+ }
+
+ logger.Infow("Risk notification worker: email sent",
+ "risk_id", riskID,
+ "owner_user_id", ownerUserID,
+ "template", templateName,
+ "message_id", result.MessageID,
+ )
+ return nil
+}
+
+type RiskReconcileDuplicatesWorker struct {
+ db *gorm.DB
+ logger *zap.SugaredLogger
+}
+
+func NewRiskReconcileDuplicatesWorker(db *gorm.DB, logger *zap.SugaredLogger) *RiskReconcileDuplicatesWorker {
+ return &RiskReconcileDuplicatesWorker{db: db, logger: logger}
+}
+
+func (w *RiskReconcileDuplicatesWorker) Work(ctx context.Context, job *river.Job[RiskReconcileDuplicatesArgs]) error {
+ if strings.TrimSpace(job.Args.DedupeKey) == "" {
+ return nil
+ }
+
+ var duplicates []riskrel.Risk
+ if err := w.db.WithContext(ctx).
+ Where("dedupe_key = ? AND status <> ?", job.Args.DedupeKey, string(riskrel.RiskStatusClosed)).
+ Order("created_at ASC, id ASC").
+ Find(&duplicates).Error; err != nil {
+ return fmt.Errorf("risk reconcile duplicates: load duplicates failed: %w", err)
+ }
+ if len(duplicates) <= 1 {
+ return nil
+ }
+
+ riskSvc := riskrel.NewRiskService(w.db.WithContext(ctx))
+ keeper := duplicates[0]
+ for i := 1; i < len(duplicates); i++ {
+ risk := duplicates[i]
+ oldStatus := risk.Status
+ risk.Status = string(riskrel.RiskStatusClosed)
+ if _, err := riskSvc.Update(riskrel.UpdateRiskParams{
+ Risk: &risk,
+ OldStatus: oldStatus,
+ StatusChanged: oldStatus != risk.Status,
+ }); err != nil {
+ return fmt.Errorf("risk reconcile duplicates: close duplicate %s failed: %w", risk.ID.String(), err)
+ }
+ }
+
+ w.logger.Infow("RiskReconcileDuplicatesWorker: reconciled duplicates",
+ "dedupe_key", job.Args.DedupeKey,
+ "kept_risk_id", keeper.ID.String(),
+ "closed_count", len(duplicates)-1,
+ )
+ return nil
+}
+
+type RiskReviewOverdueReopenWorker struct {
+ db *gorm.DB
+ logger *zap.SugaredLogger
+}
+
+func NewRiskReviewOverdueReopenWorker(db *gorm.DB, logger *zap.SugaredLogger) *RiskReviewOverdueReopenWorker {
+ return &RiskReviewOverdueReopenWorker{db: db, logger: logger}
+}
+
+func (w *RiskReviewOverdueReopenWorker) Work(ctx context.Context, job *river.Job[RiskReviewOverdueReopenArgs]) error {
+ if job.Args.ThresholdDays <= 0 {
+ w.logger.Infow("RiskReviewOverdueReopenWorker: skipping reopen due to non-positive threshold_days",
+ "risk_id", job.Args.RiskID,
+ "threshold_days", job.Args.ThresholdDays,
+ )
+ return nil
+ }
+
+ now := time.Now().UTC()
+ threshold := time.Duration(job.Args.ThresholdDays) * 24 * time.Hour
+ cutoff := now.Add(-threshold)
+
+ riskSvc := riskrel.NewRiskService(w.db.WithContext(ctx))
+ if _, err := riskSvc.ReviewRisk(riskrel.ReviewRiskParams{
+ RiskID: job.Args.RiskID,
+ Decision: riskrel.RiskReviewDecisionReopen,
+ ReviewedAt: &now,
+ RequireCurrentReviewDeadlineBefore: &cutoff,
+ }); err != nil {
+ if errors.Is(err, gorm.ErrRecordNotFound) || riskrel.IsValidationError(err) {
+ return nil
+ }
+ return fmt.Errorf("risk overdue reopen: reopen review failed: %w", err)
+ }
+
+ w.logger.Infow("RiskReviewOverdueReopenWorker: reopened overdue accepted risk",
+ "risk_id", job.Args.RiskID,
+ "threshold_days", job.Args.ThresholdDays,
+ )
+ return nil
+}
+
+func resolveRiskOwnerUserIDsBatch(ctx context.Context, db *gorm.DB, risks []riskrel.Risk) (map[uuid.UUID][]uuid.UUID, error) {
+ ownerSetByRiskID := make(map[uuid.UUID]map[uuid.UUID]struct{}, len(risks))
+ riskIDs := make([]uuid.UUID, 0, len(risks))
+ for i := range risks {
+ risk := &risks[i]
+ if risk.ID == nil {
+ continue
+ }
+ riskID := *risk.ID
+ riskIDs = append(riskIDs, riskID)
+ if _, ok := ownerSetByRiskID[riskID]; !ok {
+ ownerSetByRiskID[riskID] = make(map[uuid.UUID]struct{}, 4)
+ }
+ if risk.PrimaryOwnerUserID != nil {
+ ownerSetByRiskID[riskID][*risk.PrimaryOwnerUserID] = struct{}{}
+ }
+ }
+ if len(riskIDs) == 0 {
+ return map[uuid.UUID][]uuid.UUID{}, nil
+ }
+
+ var assignments []riskrel.RiskOwnerAssignment
+ if err := db.WithContext(ctx).
+ Where("risk_id IN ? AND owner_kind = ?", riskIDs, "user").
+ Find(&assignments).Error; err != nil {
+ return nil, err
+ }
+
+ for _, assignment := range assignments {
+ parsed, err := uuid.Parse(assignment.OwnerRef)
+ if err != nil {
+ continue
+ }
+ set, ok := ownerSetByRiskID[assignment.RiskID]
+ if !ok {
+ set = make(map[uuid.UUID]struct{}, 1)
+ ownerSetByRiskID[assignment.RiskID] = set
+ }
+ set[parsed] = struct{}{}
+ }
+
+ ownersByRiskID := make(map[uuid.UUID][]uuid.UUID, len(ownerSetByRiskID))
+ for riskID, ownerSet := range ownerSetByRiskID {
+ owners := make([]uuid.UUID, 0, len(ownerSet))
+ for ownerID := range ownerSet {
+ owners = append(owners, ownerID)
+ }
+ sort.Slice(owners, func(i, j int) bool { return owners[i].String() < owners[j].String() })
+ ownersByRiskID[riskID] = owners
+ }
+
+ return ownersByRiskID, nil
+}
+
+func resolveSSPDisplayName(ctx context.Context, db *gorm.DB, sspID uuid.UUID) string {
+ var sc relational.SystemCharacteristics
+ if err := db.WithContext(ctx).
+ Select("system_name_short", "system_name").
+ First(&sc, "system_security_plan_id = ?", sspID).Error; err != nil {
+ return sspID.String()
+ }
+
+ name := strings.TrimSpace(sc.SystemNameShort)
+ if name != "" {
+ return name
+ }
+ name = strings.TrimSpace(sc.SystemName)
+ if name != "" {
+ return name
+ }
+ return sspID.String()
+}
+
+func resolveRiskURL(webBaseURL string, riskID uuid.UUID) string {
+ base := strings.TrimRight(webBaseURL, "/")
+ return base + "/risks/" + riskID.String()
+}
+
+func startOfISOWeekUTC(t time.Time) time.Time {
+ d := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
+ wd := int(d.Weekday())
+ if wd == 0 {
+ wd = 7
+ }
+ return d.AddDate(0, 0, -(wd - 1))
+}
diff --git a/internal/service/worker/risk_workers_test.go b/internal/service/worker/risk_workers_test.go
new file mode 100644
index 00000000..edb3e2f4
--- /dev/null
+++ b/internal/service/worker/risk_workers_test.go
@@ -0,0 +1,599 @@
+package worker
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/compliance-framework/api/internal/service/email/types"
+ "github.com/compliance-framework/api/internal/service/relational"
+ riskrel "github.com/compliance-framework/api/internal/service/relational/risks"
+ "github.com/google/uuid"
+ "github.com/riverqueue/river"
+ "github.com/riverqueue/river/rivertype"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+ "go.uber.org/zap"
+ "gorm.io/driver/sqlite"
+ "gorm.io/gorm"
+)
+
+type stubRiverClient struct {
+ params []river.InsertManyParams
+ err error
+}
+
+func (s *stubRiverClient) InsertMany(_ context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) {
+ s.params = append(s.params, params...)
+ if s.err != nil {
+ return nil, s.err
+ }
+ return []*rivertype.JobInsertResult{}, nil
+}
+
+type stubUserRepository struct {
+ user NotificationUser
+ err error
+}
+
+func (s *stubUserRepository) FindUserByID(_ context.Context, _ string) (NotificationUser, error) {
+ if s.err != nil {
+ return NotificationUser{}, s.err
+ }
+ return s.user, nil
+}
+
+func newRiskWorkersTestDB(t *testing.T) *gorm.DB {
+ t.Helper()
+ db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
+ require.NoError(t, err)
+ require.NoError(t, db.AutoMigrate(
+ &relational.User{},
+ &relational.SystemSecurityPlan{},
+ &relational.SystemCharacteristics{},
+ &relational.Evidence{},
+ &relational.Labels{},
+ &relational.AssessmentSubject{},
+ &relational.SystemComponent{},
+ &relational.InventoryItem{},
+ &riskrel.Risk{},
+ &riskrel.RiskOwnerAssignment{},
+ &riskrel.RiskEvidenceLink{},
+ &riskrel.RiskSubjectLink{},
+ &riskrel.RiskEvent{},
+ &riskrel.RiskReview{},
+ ))
+ return db
+}
+
+func createTestRiskWithOwner(t *testing.T, db *gorm.DB, status riskrel.RiskStatus, reviewDeadline *time.Time, lastSeenAt time.Time) (riskrel.Risk, uuid.UUID) {
+ t.Helper()
+ sspID := uuid.New()
+ require.NoError(t, db.Create(&relational.SystemSecurityPlan{
+ UUIDModel: relational.UUIDModel{ID: &sspID},
+ }).Error)
+
+ ownerID := uuid.New()
+ riskID := uuid.New()
+ r := riskrel.Risk{
+ UUIDModel: relational.UUIDModel{ID: &riskID},
+ Title: "Test Risk",
+ Description: "Test Risk Description",
+ Status: string(status),
+ SSPID: sspID,
+ SourceType: string(riskrel.RiskSourceTypeManual),
+ ReviewDeadline: reviewDeadline,
+ FirstSeenAt: time.Now().UTC().Add(-24 * time.Hour),
+ LastSeenAt: lastSeenAt,
+ }
+ require.NoError(t, db.Create(&r).Error)
+ require.NoError(t, db.Create(&riskrel.RiskOwnerAssignment{
+ RiskID: riskID,
+ OwnerKind: "user",
+ OwnerRef: ownerID.String(),
+ IsPrimary: true,
+ }).Error)
+ return r, ownerID
+}
+
+func createTestUser(t *testing.T, db *gorm.DB, id uuid.UUID, subscribed bool) {
+ t.Helper()
+ require.NoError(t, db.Model(&relational.User{}).Create(map[string]interface{}{
+ "id": id,
+ "email": id.String() + "@example.com",
+ "first_name": "Risk",
+ "last_name": "Owner",
+ "auth_method": "password",
+ "risk_notifications_subscribed": subscribed,
+ }).Error)
+}
+
+func TestRiskReviewDeadlineReminderScannerWorker_EnqueuesPerUserOwner(t *testing.T) {
+ db := newRiskWorkersTestDB(t)
+ client := &stubRiverClient{}
+ logger := zap.NewNop().Sugar()
+ now := time.Now().UTC()
+ reviewDeadline := now.Add(29 * 24 * time.Hour)
+ _, _ = createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now)
+
+ w := NewRiskReviewDeadlineReminderScannerWorker(db, client, logger)
+ err := w.Work(context.Background(), &river.Job[RiskReviewDeadlineReminderScannerArgs]{})
+ require.NoError(t, err)
+ require.Len(t, client.params, 1)
+
+ args, ok := client.params[0].Args.(RiskReviewDueReminderArgs)
+ require.True(t, ok)
+ assert.Equal(t, "30d", args.ReminderWindow)
+ assert.Equal(t, 24*time.Hour, client.params[0].InsertOpts.UniqueOpts.ByPeriod)
+}
+
+func TestRiskReviewOverdueEscalationScannerWorker_EnqueuesEscalationAndReopen(t *testing.T) {
+ db := newRiskWorkersTestDB(t)
+ client := &stubRiverClient{}
+ logger := zap.NewNop().Sugar()
+ now := time.Now().UTC()
+ reviewDeadline := now.Add(-31 * 24 * time.Hour)
+ _, _ = createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now.Add(-40*24*time.Hour))
+
+ w := NewRiskReviewOverdueEscalationScannerWorker(db, client, logger, true, 30)
+ err := w.Work(context.Background(), &river.Job[RiskReviewOverdueEscalationScannerArgs]{})
+ require.NoError(t, err)
+
+ var escalationCount, reopenCount int
+ var reopenInsertOpts *river.InsertOpts
+ for _, p := range client.params {
+ switch p.Args.(type) {
+ case RiskReviewOverdueEscalationArgs:
+ escalationCount++
+ case RiskReviewOverdueReopenArgs:
+ reopenCount++
+ reopenInsertOpts = p.InsertOpts
+ }
+ }
+ assert.Equal(t, 1, escalationCount)
+ assert.Equal(t, 1, reopenCount)
+ require.NotNil(t, reopenInsertOpts)
+ assert.Equal(t, 24*time.Hour, reopenInsertOpts.UniqueOpts.ByPeriod)
+ assert.Equal(t, 3, reopenInsertOpts.MaxAttempts)
+}
+
+func TestRiskStaleRiskScannerWorker_EnqueuesWeeklyReminder(t *testing.T) {
+ db := newRiskWorkersTestDB(t)
+ client := &stubRiverClient{}
+ logger := zap.NewNop().Sugar()
+ now := time.Now().UTC()
+ _, _ = createTestRiskWithOwner(t, db, riskrel.RiskStatusOpen, nil, now.Add(-31*24*time.Hour))
+
+ w := NewRiskStaleRiskScannerWorker(db, client, logger)
+ err := w.Work(context.Background(), &river.Job[RiskStaleRiskScannerArgs]{})
+ require.NoError(t, err)
+ require.Len(t, client.params, 1)
+
+ _, ok := client.params[0].Args.(RiskStaleOpenReminderArgs)
+ require.True(t, ok)
+ assert.Equal(t, 7*24*time.Hour, client.params[0].InsertOpts.UniqueOpts.ByPeriod)
+}
+
+func TestRiskStaleRiskScannerWorker_EnqueuesMitigatingImplemented(t *testing.T) {
+ db := newRiskWorkersTestDB(t)
+ client := &stubRiverClient{}
+ logger := zap.NewNop().Sugar()
+ now := time.Now().UTC()
+ _, _ = createTestRiskWithOwner(t, db, riskrel.RiskStatusMitigatingImplemented, nil, now.Add(-31*24*time.Hour))
+
+ w := NewRiskStaleRiskScannerWorker(db, client, logger)
+ err := w.Work(context.Background(), &river.Job[RiskStaleRiskScannerArgs]{})
+ require.NoError(t, err)
+ require.Len(t, client.params, 1)
+
+ _, ok := client.params[0].Args.(RiskStaleOpenReminderArgs)
+ require.True(t, ok)
+}
+
+func TestRiskEvidenceReconciliationScannerWorker_EnqueuesDuplicateRepairJobs(t *testing.T) {
+ db := newRiskWorkersTestDB(t)
+ client := &stubRiverClient{}
+ logger := zap.NewNop().Sugar()
+ now := time.Now().UTC()
+
+	// Seed two active risks sharing the same dedupe key
+ dupSSP := uuid.New()
+ require.NoError(t, db.Create(&relational.SystemSecurityPlan{UUIDModel: relational.UUIDModel{ID: &dupSSP}}).Error)
+ r1ID := uuid.New()
+ r2ID := uuid.New()
+ require.NoError(t, db.Create(&riskrel.Risk{
+ UUIDModel: relational.UUIDModel{ID: &r1ID},
+ Title: "dup1",
+ Description: "dup1",
+ Status: string(riskrel.RiskStatusOpen),
+ SSPID: dupSSP,
+ SourceType: string(riskrel.RiskSourceTypeManual),
+ DedupeKey: "dup-key",
+ FirstSeenAt: now,
+ LastSeenAt: now,
+ }).Error)
+ require.NoError(t, db.Create(&riskrel.Risk{
+ UUIDModel: relational.UUIDModel{ID: &r2ID},
+ Title: "dup2",
+ Description: "dup2",
+ Status: string(riskrel.RiskStatusInvestigating),
+ SSPID: dupSSP,
+ SourceType: string(riskrel.RiskSourceTypeManual),
+ DedupeKey: "dup-key",
+ FirstSeenAt: now,
+ LastSeenAt: now,
+ }).Error)
+
+ w := NewRiskEvidenceReconciliationScannerWorker(db, client, logger)
+ err := w.Work(context.Background(), &river.Job[RiskEvidenceReconciliationScannerArgs]{})
+ require.NoError(t, err)
+
+ var sawDuplicateRepair bool
+ for _, p := range client.params {
+ switch args := p.Args.(type) {
+ case RiskReconcileDuplicatesArgs:
+ if args.DedupeKey == "dup-key" {
+ sawDuplicateRepair = true
+ }
+ }
+ }
+ assert.True(t, sawDuplicateRepair)
+}
+
+// TestRiskReconcileDuplicatesWorker_ClosesNewerRisk verifies that when two
+// risks share a dedupe key, the worker keeps the older risk (earlier
+// CreatedAt) in its current status and closes the newer duplicate.
+func TestRiskReconcileDuplicatesWorker_ClosesNewerRisk(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	sspID := uuid.New()
+	require.NoError(t, db.Create(&relational.SystemSecurityPlan{UUIDModel: relational.UUIDModel{ID: &sspID}}).Error)
+
+	oldID := uuid.New()
+	newID := uuid.New()
+	// Older risk: CreatedAt one hour in the past so it wins the tie-break.
+	require.NoError(t, db.Create(&riskrel.Risk{
+		UUIDModel:   relational.UUIDModel{ID: &oldID},
+		Title:       "old",
+		Description: "old",
+		Status:      string(riskrel.RiskStatusOpen),
+		SSPID:       sspID,
+		SourceType:  string(riskrel.RiskSourceTypeManual),
+		DedupeKey:   "dup-close",
+		FirstSeenAt: time.Now().UTC(),
+		LastSeenAt:  time.Now().UTC(),
+		CreatedAt:   time.Now().UTC().Add(-time.Hour),
+	}).Error)
+	require.NoError(t, db.Create(&riskrel.Risk{
+		UUIDModel:   relational.UUIDModel{ID: &newID},
+		Title:       "new",
+		Description: "new",
+		Status:      string(riskrel.RiskStatusInvestigating),
+		SSPID:       sspID,
+		SourceType:  string(riskrel.RiskSourceTypeManual),
+		DedupeKey:   "dup-close",
+		FirstSeenAt: time.Now().UTC(),
+		LastSeenAt:  time.Now().UTC(),
+		CreatedAt:   time.Now().UTC(),
+	}).Error)
+
+	w := NewRiskReconcileDuplicatesWorker(db, logger)
+	err := w.Work(context.Background(), &river.Job[RiskReconcileDuplicatesArgs]{Args: RiskReconcileDuplicatesArgs{DedupeKey: "dup-close"}})
+	require.NoError(t, err)
+
+	var oldRisk, newRisk riskrel.Risk
+	require.NoError(t, db.First(&oldRisk, "id = ?", oldID).Error)
+	require.NoError(t, db.First(&newRisk, "id = ?", newID).Error)
+	assert.Equal(t, string(riskrel.RiskStatusOpen), oldRisk.Status)
+	assert.Equal(t, string(riskrel.RiskStatusClosed), newRisk.Status)
+}
+
+// TestRiskReviewOverdueReopenWorker_ReopensAcceptedRisk verifies that an
+// accepted risk whose review deadline is past the configured threshold (40
+// days overdue vs a 30-day threshold) is moved back to Investigating, its
+// review deadline and acceptance justification are cleared, and exactly one
+// status-change event is recorded.
+func TestRiskReviewOverdueReopenWorker_ReopensAcceptedRisk(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	reviewDeadline := time.Now().UTC().Add(-40 * 24 * time.Hour)
+	risk, _ := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, time.Now().UTC())
+	justification := "temporarily accepted due to compensating controls"
+	require.NoError(t, db.Model(&riskrel.Risk{}).
+		Where("id = ?", risk.ID).
+		Update("acceptance_justification", justification).Error)
+
+	w := NewRiskReviewOverdueReopenWorker(db, logger)
+	err := w.Work(context.Background(), &river.Job[RiskReviewOverdueReopenArgs]{
+		Args: RiskReviewOverdueReopenArgs{
+			RiskID:        *risk.ID,
+			ThresholdDays: 30,
+		},
+	})
+	require.NoError(t, err)
+
+	var updated riskrel.Risk
+	require.NoError(t, db.First(&updated, "id = ?", risk.ID).Error)
+	assert.Equal(t, string(riskrel.RiskStatusInvestigating), updated.Status)
+	assert.Nil(t, updated.ReviewDeadline)
+	assert.Nil(t, updated.AcceptanceJustification)
+
+	var statusChangeEvents int64
+	require.NoError(t, db.Model(&riskrel.RiskEvent{}).
+		Where("risk_id = ? AND event_type = ?", risk.ID, string(riskrel.RiskEventTypeStatusChange)).
+		Count(&statusChangeEvents).Error)
+	assert.Equal(t, int64(1), statusChangeEvents)
+}
+
+// TestRiskReviewOverdueReopenWorker_SkipsNonPositiveThreshold verifies that a
+// ThresholdDays of zero (or less) disables auto-reopen: the job succeeds but
+// the risk stays in RiskAccepted.
+func TestRiskReviewOverdueReopenWorker_SkipsNonPositiveThreshold(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	reviewDeadline := time.Now().UTC().Add(-40 * 24 * time.Hour)
+	risk, _ := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, time.Now().UTC())
+
+	w := NewRiskReviewOverdueReopenWorker(db, logger)
+	err := w.Work(context.Background(), &river.Job[RiskReviewOverdueReopenArgs]{
+		Args: RiskReviewOverdueReopenArgs{
+			RiskID:        *risk.ID,
+			ThresholdDays: 0,
+		},
+	})
+	require.NoError(t, err)
+
+	var unchanged riskrel.Risk
+	require.NoError(t, db.First(&unchanged, "id = ?", risk.ID).Error)
+	assert.Equal(t, string(riskrel.RiskStatusRiskAccepted), unchanged.Status)
+}
+
+// TestRiskReviewOverdueReopenWorker_SkipsWhenNotYetOverdueForThreshold
+// verifies that a deadline only 5 days past due does not trip a 30-day
+// threshold: the risk keeps its status and its review deadline.
+func TestRiskReviewOverdueReopenWorker_SkipsWhenNotYetOverdueForThreshold(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	reviewDeadline := time.Now().UTC().Add(-5 * 24 * time.Hour)
+	risk, _ := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, time.Now().UTC())
+
+	w := NewRiskReviewOverdueReopenWorker(db, logger)
+	err := w.Work(context.Background(), &river.Job[RiskReviewOverdueReopenArgs]{
+		Args: RiskReviewOverdueReopenArgs{
+			RiskID:        *risk.ID,
+			ThresholdDays: 30,
+		},
+	})
+	require.NoError(t, err)
+
+	var unchanged riskrel.Risk
+	require.NoError(t, db.First(&unchanged, "id = ?", risk.ID).Error)
+	assert.Equal(t, string(riskrel.RiskStatusRiskAccepted), unchanged.Status)
+	assert.NotNil(t, unchanged.ReviewDeadline)
+}
+
+// TestRiskReviewDueReminderWorker_RespectsRiskSubscription verifies that no
+// reminder email is sent when the risk owner has opted out of risk
+// notifications (createTestUser with subscribed=false).
+func TestRiskReviewDueReminderWorker_RespectsRiskSubscription(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	now := time.Now().UTC()
+	reviewDeadline := now.Add(7 * 24 * time.Hour)
+	risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now)
+	createTestUser(t, db, ownerID, false)
+
+	mockEmail := &MockEmailService{}
+	userRepo := NewGORMUserRepository(db)
+	worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger)
+
+	err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{
+		Args: RiskReviewDueReminderArgs{
+			RiskID:      *risk.ID,
+			OwnerUserID: ownerID,
+		},
+	})
+	require.NoError(t, err)
+	// Argument matchers are required: testify's AssertNotCalled only counts a
+	// recorded call as a match when its arguments equal the ones given, so the
+	// bare AssertNotCalled(t, "Send") form could never match Send's
+	// two-argument calls and passed vacuously. This also matches the form used
+	// by every other test in this file.
+	mockEmail.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
+}
+
+// TestRiskReviewDueReminderWorker_SendsWhenSubscribed verifies the happy path:
+// a subscribed owner gets the "risk-review-due-reminder" template rendered and
+// a single email sent to the owner's address.
+func TestRiskReviewDueReminderWorker_SendsWhenSubscribed(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	now := time.Now().UTC()
+	reviewDeadline := now.Add(7 * 24 * time.Hour)
+	risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now)
+	createTestUser(t, db, ownerID, true)
+
+	mockEmail := &MockEmailService{}
+	mockEmail.On("UseTemplate", "risk-review-due-reminder", mock.Anything).Return("ok", "ok", nil)
+	mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com")
+	// createTestUser derives the user's email from the owner UUID, so the
+	// recipient must be exactly "<ownerID>@example.com".
+	mockEmail.On("Send", mock.Anything, mock.MatchedBy(func(msg *types.Message) bool {
+		return len(msg.To) == 1 && msg.To[0] == ownerID.String()+"@example.com"
+	})).Return(&types.SendResult{Success: true, MessageID: "risk-msg"}, nil)
+
+	userRepo := NewGORMUserRepository(db)
+	worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger)
+	err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{
+		Args: RiskReviewDueReminderArgs{
+			RiskID:      *risk.ID,
+			OwnerUserID: ownerID,
+		},
+	})
+	require.NoError(t, err)
+	mockEmail.AssertExpectations(t)
+}
+
+// TestRiskReviewDueReminderWorker_TemplateError_ReturnsError verifies that a
+// template rendering failure is wrapped with "render template", propagated so
+// the job can be retried, and prevents any email from being sent.
+func TestRiskReviewDueReminderWorker_TemplateError_ReturnsError(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	now := time.Now().UTC()
+	reviewDeadline := now.Add(7 * 24 * time.Hour)
+	risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now)
+	createTestUser(t, db, ownerID, true)
+
+	mockEmail := &MockEmailService{}
+	mockEmail.On("UseTemplate", "risk-review-due-reminder", mock.Anything).Return("", "", errors.New("template boom"))
+
+	userRepo := NewGORMUserRepository(db)
+	worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger)
+	err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{
+		Args: RiskReviewDueReminderArgs{
+			RiskID:      *risk.ID,
+			OwnerUserID: ownerID,
+		},
+	})
+	require.Error(t, err)
+	require.ErrorContains(t, err, "render template")
+	require.ErrorContains(t, err, "template boom")
+	mockEmail.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
+}
+
+// TestRiskReviewDueReminderWorker_SendUnsuccessful_ReturnsError verifies that
+// a SendResult with Success=false (provider rejected the message without a
+// transport error) is surfaced as a worker error including the provider's
+// error string.
+func TestRiskReviewDueReminderWorker_SendUnsuccessful_ReturnsError(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	now := time.Now().UTC()
+	reviewDeadline := now.Add(7 * 24 * time.Hour)
+	risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now)
+	createTestUser(t, db, ownerID, true)
+
+	mockEmail := &MockEmailService{}
+	mockEmail.On("UseTemplate", "risk-review-due-reminder", mock.Anything).Return("ok", "ok", nil)
+	mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com")
+	mockEmail.On("Send", mock.Anything, mock.Anything).
+		Return(&types.SendResult{Success: false, Error: "provider refused"}, nil)
+
+	userRepo := NewGORMUserRepository(db)
+	worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger)
+	err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{
+		Args: RiskReviewDueReminderArgs{
+			RiskID:      *risk.ID,
+			OwnerUserID: ownerID,
+		},
+	})
+	require.Error(t, err)
+	require.ErrorContains(t, err, "email send failed: provider refused")
+}
+
+// TestRiskReviewDueReminderWorker_UserNotFound_Skips verifies that a missing
+// owner user (gorm.ErrRecordNotFound) is treated as a skip, not a failure:
+// the job completes without error and no email is attempted.
+func TestRiskReviewDueReminderWorker_UserNotFound_Skips(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	now := time.Now().UTC()
+	reviewDeadline := now.Add(7 * 24 * time.Hour)
+	risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now)
+
+	mockEmail := &MockEmailService{}
+	userRepo := &stubUserRepository{err: gorm.ErrRecordNotFound}
+	worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger)
+	err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{
+		Args: RiskReviewDueReminderArgs{
+			RiskID:      *risk.ID,
+			OwnerUserID: ownerID,
+		},
+	})
+	require.NoError(t, err)
+	mockEmail.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
+}
+
+// TestRiskReviewDueReminderWorker_UserLookupError_ReturnsError verifies that a
+// non-not-found lookup failure (e.g. database unavailable) is wrapped with
+// "load owner user failed" and returned so the job retries, with no email
+// attempted.
+func TestRiskReviewDueReminderWorker_UserLookupError_ReturnsError(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	now := time.Now().UTC()
+	reviewDeadline := now.Add(7 * 24 * time.Hour)
+	risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now)
+
+	mockEmail := &MockEmailService{}
+	userRepo := &stubUserRepository{err: errors.New("database unavailable")}
+	worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger)
+	err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{
+		Args: RiskReviewDueReminderArgs{
+			RiskID:      *risk.ID,
+			OwnerUserID: ownerID,
+		},
+	})
+	require.Error(t, err)
+	require.ErrorContains(t, err, "load owner user failed")
+	mockEmail.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
+}
+
+// TestRiskReviewOverdueEscalationWorker_SendsWhenSubscribed verifies that a
+// subscribed owner of a risk 2 days past its review deadline receives the
+// "risk-review-overdue-escalation" email with a subject naming the risk.
+func TestRiskReviewOverdueEscalationWorker_SendsWhenSubscribed(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	now := time.Now().UTC()
+	reviewDeadline := now.Add(-2 * 24 * time.Hour)
+	risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now)
+	createTestUser(t, db, ownerID, true)
+
+	mockEmail := &MockEmailService{}
+	mockEmail.On("UseTemplate", "risk-review-overdue-escalation", mock.Anything).Return("ok", "ok", nil)
+	mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com")
+	mockEmail.On("Send", mock.Anything, mock.MatchedBy(func(msg *types.Message) bool {
+		return len(msg.To) == 1 &&
+			msg.To[0] == ownerID.String()+"@example.com" &&
+			msg.Subject == "Risk review overdue: Test Risk"
+	})).Return(&types.SendResult{Success: true, MessageID: "risk-overdue-msg"}, nil)
+
+	userRepo := NewGORMUserRepository(db)
+	worker := NewRiskReviewOverdueEscalationWorker(db, mockEmail, userRepo, "https://app.example.com", logger)
+	err := worker.Work(context.Background(), &river.Job[RiskReviewOverdueEscalationArgs]{
+		Args: RiskReviewOverdueEscalationArgs{
+			RiskID:      *risk.ID,
+			OwnerUserID: ownerID,
+		},
+	})
+	require.NoError(t, err)
+	mockEmail.AssertExpectations(t)
+}
+
+// TestRiskReviewOverdueEscalationWorker_RespectsRiskSubscription verifies that
+// the escalation email is suppressed when the owner has opted out of risk
+// notifications, while the job itself still succeeds.
+func TestRiskReviewOverdueEscalationWorker_RespectsRiskSubscription(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	now := time.Now().UTC()
+	reviewDeadline := now.Add(-2 * 24 * time.Hour)
+	risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now)
+	createTestUser(t, db, ownerID, false)
+
+	mockEmail := &MockEmailService{}
+	userRepo := NewGORMUserRepository(db)
+	worker := NewRiskReviewOverdueEscalationWorker(db, mockEmail, userRepo, "https://app.example.com", logger)
+	err := worker.Work(context.Background(), &river.Job[RiskReviewOverdueEscalationArgs]{
+		Args: RiskReviewOverdueEscalationArgs{
+			RiskID:      *risk.ID,
+			OwnerUserID: ownerID,
+		},
+	})
+	require.NoError(t, err)
+	mockEmail.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
+}
+
+// TestRiskStaleOpenReminderWorker_SendsWhenSubscribed verifies that a
+// subscribed owner of a risk last seen 35 days ago receives the
+// "risk-stale-open-reminder" email with a subject naming the risk.
+func TestRiskStaleOpenReminderWorker_SendsWhenSubscribed(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	now := time.Now().UTC()
+	risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusOpen, nil, now.Add(-35*24*time.Hour))
+	createTestUser(t, db, ownerID, true)
+
+	mockEmail := &MockEmailService{}
+	mockEmail.On("UseTemplate", "risk-stale-open-reminder", mock.Anything).Return("ok", "ok", nil)
+	mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com")
+	mockEmail.On("Send", mock.Anything, mock.MatchedBy(func(msg *types.Message) bool {
+		return len(msg.To) == 1 &&
+			msg.To[0] == ownerID.String()+"@example.com" &&
+			msg.Subject == "Stale risk reminder: Test Risk"
+	})).Return(&types.SendResult{Success: true, MessageID: "risk-stale-msg"}, nil)
+
+	userRepo := NewGORMUserRepository(db)
+	worker := NewRiskStaleOpenReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger)
+	err := worker.Work(context.Background(), &river.Job[RiskStaleOpenReminderArgs]{
+		Args: RiskStaleOpenReminderArgs{
+			RiskID:      *risk.ID,
+			OwnerUserID: ownerID,
+		},
+	})
+	require.NoError(t, err)
+	mockEmail.AssertExpectations(t)
+}
+
+// TestRiskStaleOpenReminderWorker_RespectsRiskSubscription verifies that the
+// stale-risk reminder email is suppressed when the owner has opted out of
+// risk notifications, while the job itself still succeeds.
+func TestRiskStaleOpenReminderWorker_RespectsRiskSubscription(t *testing.T) {
+	db := newRiskWorkersTestDB(t)
+	logger := zap.NewNop().Sugar()
+	now := time.Now().UTC()
+	risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusOpen, nil, now.Add(-35*24*time.Hour))
+	createTestUser(t, db, ownerID, false)
+
+	mockEmail := &MockEmailService{}
+	userRepo := NewGORMUserRepository(db)
+	worker := NewRiskStaleOpenReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger)
+	err := worker.Work(context.Background(), &river.Job[RiskStaleOpenReminderArgs]{
+		Args: RiskStaleOpenReminderArgs{
+			RiskID:      *risk.ID,
+			OwnerUserID: ownerID,
+		},
+	})
+	require.NoError(t, err)
+	mockEmail.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
+}
diff --git a/internal/service/worker/service.go b/internal/service/worker/service.go
index 93e3faa6..7f9d1dbc 100644
--- a/internal/service/worker/service.go
+++ b/internal/service/worker/service.go
@@ -224,6 +224,30 @@ func NewServiceWithDigest(
digestCheckerWorker := NewWorkflowTaskDigestCheckerWorker(db, clientProxy, logger)
river.AddWorker(workers, river.WorkFunc(digestCheckerWorker.Work))
+ // Add risk scanner workers
+ riskCfg := config.DefaultRiskConfig()
+ if digestCfg != nil && digestCfg.Risk != nil {
+ riskCfg = digestCfg.Risk
+ }
+
+ riskReminderScannerWorker := NewRiskReviewDeadlineReminderScannerWorker(db, clientProxy, logger)
+ river.AddWorker(workers, river.WorkFunc(riskReminderScannerWorker.Work))
+
+ riskOverdueScannerWorker := NewRiskReviewOverdueEscalationScannerWorker(
+ db,
+ clientProxy,
+ logger,
+ riskCfg.AutoReopenEnabled,
+ riskCfg.AutoReopenThresholdDays,
+ )
+ river.AddWorker(workers, river.WorkFunc(riskOverdueScannerWorker.Work))
+
+ riskStaleScannerWorker := NewRiskStaleRiskScannerWorker(db, clientProxy, logger)
+ river.AddWorker(workers, river.WorkFunc(riskStaleScannerWorker.Work))
+
+ riskReconciliationScannerWorker := NewRiskEvidenceReconciliationScannerWorker(db, clientProxy, logger)
+ river.AddWorker(workers, river.WorkFunc(riskReconciliationScannerWorker.Work))
+
// Configure periodic jobs
periodicJobs := periodicJobsFromConfig(digestCfg, logger)
@@ -443,6 +467,90 @@ func NewWorkflowTaskDigestPeriodicJob(schedule string, logger *zap.SugaredLogger
)
}
+// NewRiskReviewDeadlineReminderPeriodicJob builds the periodic job that
+// triggers the risk review deadline reminder scanner. An unparsable schedule
+// falls back to "0 0 8 * * *" (daily, 6-field cron with a seconds column).
+// Inserts are unique per 24h so overlapping triggers cannot double-scan.
+func NewRiskReviewDeadlineReminderPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob {
+	sched := parseCronScheduleWithFallback(schedule, "0 0 8 * * *", "risk review deadline reminder scanner", logger)
+
+	return river.NewPeriodicJob(
+		sched,
+		func() (river.JobArgs, *river.InsertOpts) {
+			return &RiskReviewDeadlineReminderScannerArgs{}, &river.InsertOpts{
+				Queue:       "risk",
+				MaxAttempts: 3,
+				UniqueOpts: river.UniqueOpts{
+					ByArgs:   true,
+					ByPeriod: 24 * time.Hour,
+				},
+			}
+		},
+		&river.PeriodicJobOpts{
+			RunOnStart: false,
+		},
+	)
+}
+
+// NewRiskReviewOverdueEscalationPeriodicJob builds the periodic job that
+// triggers the risk review overdue escalation scanner. An unparsable schedule
+// falls back to "0 0 9 * * *" (daily, 6-field cron with a seconds column).
+// Inserts are unique per 24h so overlapping triggers cannot double-scan.
+func NewRiskReviewOverdueEscalationPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob {
+	sched := parseCronScheduleWithFallback(schedule, "0 0 9 * * *", "risk review overdue escalation scanner", logger)
+
+	return river.NewPeriodicJob(
+		sched,
+		func() (river.JobArgs, *river.InsertOpts) {
+			return &RiskReviewOverdueEscalationScannerArgs{}, &river.InsertOpts{
+				Queue:       "risk",
+				MaxAttempts: 3,
+				UniqueOpts: river.UniqueOpts{
+					ByArgs:   true,
+					ByPeriod: 24 * time.Hour,
+				},
+			}
+		},
+		&river.PeriodicJobOpts{
+			RunOnStart: false,
+		},
+	)
+}
+
+// NewRiskStaleScannerPeriodicJob builds the periodic job that triggers the
+// stale-risk scanner. An unparsable schedule falls back to "0 0 10 * * 1"
+// (weekly on Mondays, 6-field cron with a seconds column), and the unique
+// period is correspondingly 7 days rather than the daily 24h used by the
+// other risk scanners.
+func NewRiskStaleScannerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob {
+	sched := parseCronScheduleWithFallback(schedule, "0 0 10 * * 1", "risk stale scanner", logger)
+
+	return river.NewPeriodicJob(
+		sched,
+		func() (river.JobArgs, *river.InsertOpts) {
+			return &RiskStaleRiskScannerArgs{}, &river.InsertOpts{
+				Queue:       "risk",
+				MaxAttempts: 3,
+				UniqueOpts: river.UniqueOpts{
+					ByArgs:   true,
+					ByPeriod: 7 * 24 * time.Hour,
+				},
+			}
+		},
+		&river.PeriodicJobOpts{
+			RunOnStart: false,
+		},
+	)
+}
+
+// NewRiskEvidenceReconciliationPeriodicJob builds the periodic job that
+// triggers the risk evidence reconciliation scanner. An unparsable schedule
+// falls back to "0 30 10 * * *" (daily, 6-field cron with a seconds column).
+// Inserts are unique per 24h so overlapping triggers cannot double-scan.
+func NewRiskEvidenceReconciliationPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob {
+	sched := parseCronScheduleWithFallback(schedule, "0 30 10 * * *", "risk evidence reconciliation scanner", logger)
+
+	return river.NewPeriodicJob(
+		sched,
+		func() (river.JobArgs, *river.InsertOpts) {
+			return &RiskEvidenceReconciliationScannerArgs{}, &river.InsertOpts{
+				Queue:       "risk",
+				MaxAttempts: 3,
+				UniqueOpts: river.UniqueOpts{
+					ByArgs:   true,
+					ByPeriod: 24 * time.Hour,
+				},
+			}
+		},
+		&river.PeriodicJobOpts{
+			RunOnStart: false,
+		},
+	)
+}
+
func periodicJobsFromConfig(cfg *config.Config, logger *zap.SugaredLogger) []*river.PeriodicJob {
var periodicJobs []*river.PeriodicJob
if cfg == nil {
@@ -460,6 +568,18 @@ func periodicJobsFromConfig(cfg *config.Config, logger *zap.SugaredLogger) []*ri
if cfg.Workflow != nil && cfg.Workflow.TaskDigestEnabled {
periodicJobs = append(periodicJobs, NewWorkflowTaskDigestPeriodicJob(cfg.Workflow.TaskDigestSchedule, logger))
}
+ if cfg.Risk != nil && cfg.Risk.ReviewDeadlineReminderEnabled {
+ periodicJobs = append(periodicJobs, NewRiskReviewDeadlineReminderPeriodicJob(cfg.Risk.ReviewDeadlineReminderSchedule, logger))
+ }
+ if cfg.Risk != nil && cfg.Risk.ReviewOverdueEscalationEnabled {
+ periodicJobs = append(periodicJobs, NewRiskReviewOverdueEscalationPeriodicJob(cfg.Risk.ReviewOverdueEscalationSchedule, logger))
+ }
+ if cfg.Risk != nil && cfg.Risk.StaleRiskScannerEnabled {
+ periodicJobs = append(periodicJobs, NewRiskStaleScannerPeriodicJob(cfg.Risk.StaleRiskScannerSchedule, logger))
+ }
+ if cfg.Risk != nil && cfg.Risk.EvidenceReconciliationEnabled {
+ periodicJobs = append(periodicJobs, NewRiskEvidenceReconciliationPeriodicJob(cfg.Risk.EvidenceReconciliationSchedule, logger))
+ }
return periodicJobs
}
diff --git a/internal/service/worker/service_test.go b/internal/service/worker/service_test.go
index 1a6c9d47..b19f278b 100644
--- a/internal/service/worker/service_test.go
+++ b/internal/service/worker/service_test.go
@@ -369,6 +369,25 @@ func TestPeriodicJobsFromConfig_WorkflowSchedulerEnabledGuard(t *testing.T) {
assert.Len(t, jobs, 3)
}
+// TestPeriodicJobsFromConfig_RiskJobsFromDedicatedConfig verifies that when
+// all four risk scanners are enabled in the dedicated Risk config, exactly
+// four periodic jobs are produced (one per scanner).
+func TestPeriodicJobsFromConfig_RiskJobsFromDedicatedConfig(t *testing.T) {
+	logger := zap.NewNop().Sugar()
+
+	jobs := periodicJobsFromConfig(&config.Config{
+		Risk: &config.RiskConfig{
+			ReviewDeadlineReminderEnabled:   true,
+			ReviewDeadlineReminderSchedule:  "0 0 8 * * *",
+			ReviewOverdueEscalationEnabled:  true,
+			ReviewOverdueEscalationSchedule: "0 0 9 * * *",
+			StaleRiskScannerEnabled:         true,
+			StaleRiskScannerSchedule:        "0 0 10 * * 1",
+			EvidenceReconciliationEnabled:   true,
+			EvidenceReconciliationSchedule:  "0 30 10 * * *",
+		},
+	}, logger)
+
+	assert.Len(t, jobs, 4)
+}
+
func TestWorkflowSchedulerPeriodicJobConstructor_InsertOpts(t *testing.T) {
args, opts := workflowSchedulerPeriodicJobConstructor()
assert.NotNil(t, args)
diff --git a/internal/service/worker/user_repository.go b/internal/service/worker/user_repository.go
index 538bca88..f00855c5 100644
--- a/internal/service/worker/user_repository.go
+++ b/internal/service/worker/user_repository.go
@@ -30,7 +30,7 @@ func (r *GORMUserRepository) FindUserByID(ctx context.Context, userID string) (N
var user relational.User
if err := r.db.WithContext(ctx).First(&user, parsed).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
- return NotificationUser{}, fmt.Errorf("user %s not found", userID)
+ return NotificationUser{}, fmt.Errorf("user %s not found: %w", userID, gorm.ErrRecordNotFound)
}
return NotificationUser{}, fmt.Errorf("failed to fetch user %s: %w", userID, err)
}
@@ -42,5 +42,6 @@ func (r *GORMUserRepository) FindUserByID(ctx context.Context, userID string) (N
LastName: user.LastName,
TaskAvailableEmailSubscribed: user.TaskAvailableEmailSubscribed,
TaskDailyDigestSubscribed: user.TaskDailyDigestSubscribed,
+ RiskNotificationsSubscribed: user.RiskNotificationsSubscribed,
}, nil
}
diff --git a/risk.yaml b/risk.yaml
new file mode 100644
index 00000000..559d08d1
--- /dev/null
+++ b/risk.yaml
@@ -0,0 +1,14 @@
+review_deadline_reminder_enabled: false
+review_deadline_reminder_schedule: "0 0 8 * * *"
+
+review_overdue_escalation_enabled: false
+review_overdue_escalation_schedule: "0 0 9 * * *"
+
+stale_risk_scanner_enabled: false
+stale_risk_scanner_schedule: "0 0 10 * * 1"
+
+evidence_reconciliation_enabled: false
+evidence_reconciliation_schedule: "0 30 10 * * *"
+
+auto_reopen_enabled: false
+auto_reopen_threshold_days: 30