diff --git a/.env.example b/.env.example index b625355f..c3872c83 100644 --- a/.env.example +++ b/.env.example @@ -8,8 +8,9 @@ CCF_JWT_PRIVATE_KEY=private.pem CCF_JWT_PUBLIC_KEY=public.pem CCF_API_ALLOWED_ORIGINS="http://localhost:3000,http://localhost:8000" +CCF_RISK_CONFIG="risk.yaml" CCF_ENVIRONMENT="" # Defaults to production. ## This configuration disables cookie setting to allow testing on Safari ## It is insecure so use it with caution -#CCF_ENVIRONMENT="local" \ No newline at end of file +#CCF_ENVIRONMENT="local" diff --git a/cmd/root.go b/cmd/root.go index 9d164bc0..891c6017 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -46,6 +46,7 @@ func bindEnvironmentVariables() { viper.MustBindEnv("sso_config") viper.MustBindEnv("email_config") viper.MustBindEnv("workflow_config") + viper.MustBindEnv("risk_config") viper.MustBindEnv("metrics_enabled") viper.MustBindEnv("metrics_port") viper.MustBindEnv("use_dev_logger") diff --git a/docs/docs.go b/docs/docs.go index c85333ec..63ffccea 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -24309,6 +24309,9 @@ const docTemplate = `{ "handler.SubscriptionsResponse": { "type": "object", "properties": { + "riskNotificationsSubscribed": { + "type": "boolean" + }, "subscribed": { "type": "boolean" }, @@ -24323,6 +24326,9 @@ const docTemplate = `{ "handler.UpdateSubscriptionsRequest": { "type": "object", "properties": { + "riskNotificationsSubscribed": { + "type": "boolean" + }, "subscribed": { "type": "boolean" }, @@ -32321,6 +32327,10 @@ const docTemplate = `{ "lastName": { "type": "string" }, + "riskNotificationsSubscribed": { + "description": "RiskNotificationsSubscribed indicates if the user wants to receive risk lifecycle notifications.\nThe DB default is intentionally true so existing users are opted in when the column is introduced.", + "type": "boolean" + }, "taskAvailableEmailSubscribed": { "description": "TaskAvailableEmailSubscribed indicates if the user wants an email when tasks become available", "type": "boolean" diff --git a/docs/swagger.json b/docs/swagger.json index 4165d632..8405552f 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -24303,6 +24303,9 @@ "handler.SubscriptionsResponse": { "type": "object", "properties": { + "riskNotificationsSubscribed": { + "type": "boolean" + }, "subscribed": { "type": "boolean" }, @@ -24317,6 +24320,9 @@ "handler.UpdateSubscriptionsRequest": { "type": "object", "properties": { + "riskNotificationsSubscribed": { + "type": "boolean" + }, "subscribed": { "type": "boolean" }, @@ -32315,6 +32321,10 @@ "lastName": { "type": "string" }, + "riskNotificationsSubscribed": { + "description": "RiskNotificationsSubscribed indicates if the user wants to receive risk lifecycle notifications.\nThe DB default is intentionally true so existing users are opted in when the column is introduced.", + "type": "boolean" + }, "taskAvailableEmailSubscribed": { "description": "TaskAvailableEmailSubscribed indicates if the user wants an email when tasks become available", "type": "boolean" diff --git a/docs/swagger.yaml b/docs/swagger.yaml index c195c04b..d82db2f4 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -1344,6 +1344,8 @@ definitions: type: object handler.SubscriptionsResponse: properties: + riskNotificationsSubscribed: + type: boolean subscribed: type: boolean taskAvailableEmailSubscribed: @@ -1353,6 +1355,8 @@ definitions: type: object handler.UpdateSubscriptionsRequest: properties: + riskNotificationsSubscribed: + type: boolean subscribed: type: boolean taskAvailableEmailSubscribed: @@ -6642,6 +6646,11 @@ definitions: type: string lastName: type: string + riskNotificationsSubscribed: + description: |- + RiskNotificationsSubscribed indicates if the user wants to receive risk lifecycle notifications. + The DB default is intentionally true so existing users are opted in when the column is introduced. + type: boolean taskAvailableEmailSubscribed: description: TaskAvailableEmailSubscribed indicates if the user wants an email when tasks become available diff --git a/internal/api/handler/users.go b/internal/api/handler/users.go index f95098c8..dd23e790 100644 --- a/internal/api/handler/users.go +++ b/internal/api/handler/users.go @@ -27,12 +27,14 @@ type SubscriptionsResponse struct { Subscribed bool `json:"subscribed"` TaskAvailableEmailSubscribed bool `json:"taskAvailableEmailSubscribed"` TaskDailyDigestSubscribed bool `json:"taskDailyDigestSubscribed"` + RiskNotificationsSubscribed bool `json:"riskNotificationsSubscribed"` } type UpdateSubscriptionsRequest struct { Subscribed *bool `json:"subscribed"` TaskAvailableEmailSubscribed *bool `json:"taskAvailableEmailSubscribed"` TaskDailyDigestSubscribed *bool `json:"taskDailyDigestSubscribed"` + RiskNotificationsSubscribed *bool `json:"riskNotificationsSubscribed"` } func NewUserHandler(sugar *zap.SugaredLogger, db *gorm.DB) *UserHandler { @@ -437,6 +439,7 @@ func (h *UserHandler) GetSubscriptions(ctx echo.Context) error { Subscribed: user.DigestSubscribed, TaskAvailableEmailSubscribed: user.TaskAvailableEmailSubscribed, TaskDailyDigestSubscribed: user.TaskDailyDigestSubscribed, + RiskNotificationsSubscribed: user.RiskNotificationsSubscribed, }, }) } @@ -484,6 +487,9 @@ func (h *UserHandler) UpdateSubscriptions(ctx echo.Context) error { if req.TaskDailyDigestSubscribed != nil { user.TaskDailyDigestSubscribed = *req.TaskDailyDigestSubscribed } + if req.RiskNotificationsSubscribed != nil { + user.RiskNotificationsSubscribed = *req.RiskNotificationsSubscribed + } if err := h.db.Save(&user).Error; err != nil { h.sugar.Errorw("Failed to update user subscriptions", "error", err) @@ -496,6 +502,7 @@ func (h *UserHandler) UpdateSubscriptions(ctx echo.Context) error { "subscribed", user.DigestSubscribed, "taskAvailableEmailSubscribed", user.TaskAvailableEmailSubscribed, "taskDailyDigestSubscribed", user.TaskDailyDigestSubscribed, + "riskNotificationsSubscribed", user.RiskNotificationsSubscribed, ) return ctx.JSON(200, GenericDataResponse[SubscriptionsResponse]{ @@ -503,6 +510,7 @@ func (h *UserHandler) UpdateSubscriptions(ctx echo.Context) error { Subscribed: user.DigestSubscribed, TaskAvailableEmailSubscribed: user.TaskAvailableEmailSubscribed, TaskDailyDigestSubscribed: user.TaskDailyDigestSubscribed, + RiskNotificationsSubscribed: user.RiskNotificationsSubscribed, }, }) } diff --git a/internal/api/handler/users_integration_test.go b/internal/api/handler/users_integration_test.go index 5d4a85d8..33d65ca9 100644 --- a/internal/api/handler/users_integration_test.go +++ b/internal/api/handler/users_integration_test.go @@ -403,6 +403,7 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() { Subscribed bool `json:"subscribed"` TaskAvailableEmailSubscribed bool `json:"taskAvailableEmailSubscribed"` TaskDailyDigestSubscribed bool `json:"taskDailyDigestSubscribed"` + RiskNotificationsSubscribed bool `json:"riskNotificationsSubscribed"` } `json:"data"` } err = json.Unmarshal(rec.Body.Bytes(), &response) @@ -412,6 +413,7 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() { suite.False(response.Data.Subscribed, "Expected default digest subscription to be false") suite.False(response.Data.TaskAvailableEmailSubscribed, "Expected task available email subscription to default to false") suite.False(response.Data.TaskDailyDigestSubscribed, "Expected task daily digest subscription to default to false") + suite.True(response.Data.RiskNotificationsSubscribed, "Expected risk notifications subscription to default to true") }) suite.Run("UpdateSubscriptions", func() { @@ -420,6 +422,7 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() { "subscribed": true, "taskAvailableEmailSubscribed": true, "taskDailyDigestSubscribed": true, + "riskNotificationsSubscribed": false, } payloadJSON, err := json.Marshal(payload) suite.Require().NoError(err, "Failed to marshal update subscriptions request") @@ -437,6 +440,7 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() { Subscribed bool `json:"subscribed"` TaskAvailableEmailSubscribed bool `json:"taskAvailableEmailSubscribed"` TaskDailyDigestSubscribed bool `json:"taskDailyDigestSubscribed"` + RiskNotificationsSubscribed bool `json:"riskNotificationsSubscribed"` } `json:"data"` } err = json.Unmarshal(rec.Body.Bytes(), &response) @@ -445,11 +449,13 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() { suite.True(response.Data.Subscribed, "Expected digest subscription to be updated to true") suite.True(response.Data.TaskAvailableEmailSubscribed, "Expected task available email subscription to be updated to true") suite.True(response.Data.TaskDailyDigestSubscribed, "Expected task daily digest subscription to be updated to true") + suite.False(response.Data.RiskNotificationsSubscribed, "Expected risk notifications subscription to be updated to false") // Test unsubscribing from digest payload = map[string]interface{}{ - "subscribed": false, - "taskDailyDigestSubscribed": false, + "subscribed": false, + "taskDailyDigestSubscribed": false, + "riskNotificationsSubscribed": true, } payloadJSON, err = json.Marshal(payload) suite.Require().NoError(err, "Failed to marshal unsubscribe request") @@ -468,6 +474,7 @@ func (suite *UserApiIntegrationSuite) TestSubscriptions() { suite.False(response.Data.Subscribed, "Expected digest subscription to be updated to false") suite.True(response.Data.TaskAvailableEmailSubscribed, "Expected task available email subscription to remain unchanged when omitted") suite.False(response.Data.TaskDailyDigestSubscribed, "Expected task daily digest subscription to be updated to false") + suite.True(response.Data.RiskNotificationsSubscribed, "Expected risk notifications subscription to be updated to true") }) suite.Run("UpdateSubscriptionsInvalidPayload", func() { diff --git a/internal/config/config.go b/internal/config/config.go index c0ed9fb4..a71f55c5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -38,6 +38,7 @@ type Config struct { DigestEnabled bool // Enable or disable the digest scheduler DigestSchedule string // Cron schedule for digest emails Workflow *WorkflowConfig + Risk *RiskConfig } func NewConfig(logger *zap.SugaredLogger) *Config { @@ -174,6 +175,16 @@ func NewConfig(logger *zap.SugaredLogger) *Config { workflowConfig = &WorkflowConfig{SchedulerEnabled: false} } + riskConfigPath := viper.GetString("risk_config") + if riskConfigPath == "" { + riskConfigPath = "risk.yaml" + } + riskConfig, err := LoadRiskConfig(riskConfigPath) + if err != nil { + logger.Warnw("Failed to load risk config, risk jobs will be disabled", "error", err, "path", riskConfigPath) + riskConfig = DefaultRiskConfig() + } + // Worker configuration workerConfig := DefaultWorkerConfig() if viper.IsSet("worker_enabled") { @@ -206,6 +217,7 @@ func NewConfig(logger *zap.SugaredLogger) *Config { DigestEnabled: digestEnabled, DigestSchedule: digestSchedule, Workflow: workflowConfig, + Risk: riskConfig, } } diff --git a/internal/config/risk.go b/internal/config/risk.go new file mode 100644 index 00000000..9ce2a6af --- /dev/null +++ b/internal/config/risk.go @@ -0,0 +1,118 @@ +package config + +import ( + "errors" + "fmt" + "os" + "strings" + + "github.com/robfig/cron/v3" + "github.com/spf13/viper" +) + +// RiskConfig contains configuration for risk-related periodic workers. +type RiskConfig struct { + ReviewDeadlineReminderEnabled bool `mapstructure:"review_deadline_reminder_enabled" yaml:"review_deadline_reminder_enabled" json:"reviewDeadlineReminderEnabled"` + ReviewDeadlineReminderSchedule string `mapstructure:"review_deadline_reminder_schedule" yaml:"review_deadline_reminder_schedule" json:"reviewDeadlineReminderSchedule"` + + ReviewOverdueEscalationEnabled bool `mapstructure:"review_overdue_escalation_enabled" yaml:"review_overdue_escalation_enabled" json:"reviewOverdueEscalationEnabled"` + ReviewOverdueEscalationSchedule string `mapstructure:"review_overdue_escalation_schedule" yaml:"review_overdue_escalation_schedule" json:"reviewOverdueEscalationSchedule"` + + StaleRiskScannerEnabled bool `mapstructure:"stale_risk_scanner_enabled" yaml:"stale_risk_scanner_enabled" json:"staleRiskScannerEnabled"` + StaleRiskScannerSchedule string `mapstructure:"stale_risk_scanner_schedule" yaml:"stale_risk_scanner_schedule" json:"staleRiskScannerSchedule"` + + EvidenceReconciliationEnabled bool `mapstructure:"evidence_reconciliation_enabled" yaml:"evidence_reconciliation_enabled" json:"evidenceReconciliationEnabled"` + EvidenceReconciliationSchedule string `mapstructure:"evidence_reconciliation_schedule" yaml:"evidence_reconciliation_schedule" json:"evidenceReconciliationSchedule"` + + AutoReopenEnabled bool `mapstructure:"auto_reopen_enabled" yaml:"auto_reopen_enabled" json:"autoReopenEnabled"` + AutoReopenThresholdDays int `mapstructure:"auto_reopen_threshold_days" yaml:"auto_reopen_threshold_days" json:"autoReopenThresholdDays"` +} + +func DefaultRiskConfig() *RiskConfig { + return &RiskConfig{ + ReviewDeadlineReminderEnabled: false, + ReviewDeadlineReminderSchedule: "0 0 8 * * *", + ReviewOverdueEscalationEnabled: false, + ReviewOverdueEscalationSchedule: "0 0 9 * * *", + StaleRiskScannerEnabled: false, + StaleRiskScannerSchedule: "0 0 10 * * 1", + EvidenceReconciliationEnabled: false, + EvidenceReconciliationSchedule: "0 30 10 * * *", + AutoReopenEnabled: false, + AutoReopenThresholdDays: 30, + } +} + +func LoadRiskConfig(path string) (*RiskConfig, error) { + v := viper.NewWithOptions(viper.KeyDelimiter("::")) + + def := DefaultRiskConfig() + v.SetDefault("review_deadline_reminder_enabled", def.ReviewDeadlineReminderEnabled) + v.SetDefault("review_deadline_reminder_schedule", def.ReviewDeadlineReminderSchedule) + v.SetDefault("review_overdue_escalation_enabled", def.ReviewOverdueEscalationEnabled) + v.SetDefault("review_overdue_escalation_schedule", def.ReviewOverdueEscalationSchedule) + v.SetDefault("stale_risk_scanner_enabled", def.StaleRiskScannerEnabled) + v.SetDefault("stale_risk_scanner_schedule", def.StaleRiskScannerSchedule) + v.SetDefault("evidence_reconciliation_enabled", def.EvidenceReconciliationEnabled) + v.SetDefault("evidence_reconciliation_schedule", def.EvidenceReconciliationSchedule) + v.SetDefault("auto_reopen_enabled", def.AutoReopenEnabled) + v.SetDefault("auto_reopen_threshold_days", def.AutoReopenThresholdDays) + + v.SetEnvPrefix("CCF_RISK") + v.SetEnvKeyReplacer(strings.NewReplacer("::", "_", ".", "_", "-", "_")) + v.AutomaticEnv() + + if path != "" { + v.SetConfigFile(path) + v.SetConfigType("yaml") + if err := v.ReadInConfig(); err != nil { + var notFound viper.ConfigFileNotFoundError + if !errors.As(err, ¬Found) && !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("failed to read risk config file: %w", err) + } + } + } + + var cfg RiskConfig + if err := v.Unmarshal(&cfg); err != nil { + return nil, fmt.Errorf("failed to parse risk config: %w", err) + } + if err := cfg.Validate(); err != nil { + return nil, err + } + + return &cfg, nil +} + +func (c *RiskConfig) Validate() error { + parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + + if c.ReviewDeadlineReminderEnabled { + if _, err := parser.Parse(c.ReviewDeadlineReminderSchedule); err != nil { + return fmt.Errorf("invalid review_deadline_reminder_schedule: %w", err) + } + } + if c.ReviewOverdueEscalationEnabled { + if _, err := parser.Parse(c.ReviewOverdueEscalationSchedule); err != nil { + return fmt.Errorf("invalid review_overdue_escalation_schedule: %w", err) + } + } + if c.StaleRiskScannerEnabled { + if _, err := parser.Parse(c.StaleRiskScannerSchedule); err != nil { + return fmt.Errorf("invalid stale_risk_scanner_schedule: %w", err) + } + } + if c.EvidenceReconciliationEnabled { + if _, err := parser.Parse(c.EvidenceReconciliationSchedule); err != nil { + return fmt.Errorf("invalid evidence_reconciliation_schedule: %w", err) + } + } + if c.AutoReopenThresholdDays < 0 { + return fmt.Errorf("risk auto reopen threshold days must be non-negative") + } + if c.AutoReopenEnabled && c.AutoReopenThresholdDays <= 0 { + return fmt.Errorf("risk auto reopen threshold days must be greater than zero when auto reopen is enabled") + } + + return nil +} diff --git a/internal/config/risk_test.go b/internal/config/risk_test.go new file mode 100644 index 00000000..d9d1d722 --- /dev/null +++ b/internal/config/risk_test.go @@ -0,0 +1,164 @@ +package config + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDefaultRiskConfig(t *testing.T) { + cfg := DefaultRiskConfig() + assert.False(t, cfg.ReviewDeadlineReminderEnabled) + assert.Equal(t, "0 0 8 * * *", cfg.ReviewDeadlineReminderSchedule) + assert.False(t, cfg.ReviewOverdueEscalationEnabled) + assert.Equal(t, "0 0 9 * * *", cfg.ReviewOverdueEscalationSchedule) + assert.False(t, cfg.StaleRiskScannerEnabled) + assert.Equal(t, "0 0 10 * * 1", cfg.StaleRiskScannerSchedule) + assert.False(t, cfg.EvidenceReconciliationEnabled) + assert.Equal(t, "0 30 10 * * *", cfg.EvidenceReconciliationSchedule) + assert.False(t, cfg.AutoReopenEnabled) + assert.Equal(t, 30, cfg.AutoReopenThresholdDays) +} + +func TestLoadRiskConfigDefaults(t *testing.T) { + require.NoError(t, os.Unsetenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_ENABLED")) + require.NoError(t, os.Unsetenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_SCHEDULE")) + require.NoError(t, os.Unsetenv("CCF_RISK_REVIEW_OVERDUE_ESCALATION_ENABLED")) + require.NoError(t, os.Unsetenv("CCF_RISK_REVIEW_OVERDUE_ESCALATION_SCHEDULE")) + require.NoError(t, os.Unsetenv("CCF_RISK_STALE_RISK_SCANNER_ENABLED")) + require.NoError(t, os.Unsetenv("CCF_RISK_STALE_RISK_SCANNER_SCHEDULE")) + require.NoError(t, os.Unsetenv("CCF_RISK_EVIDENCE_RECONCILIATION_ENABLED")) + require.NoError(t, os.Unsetenv("CCF_RISK_EVIDENCE_RECONCILIATION_SCHEDULE")) + require.NoError(t, os.Unsetenv("CCF_RISK_AUTO_REOPEN_ENABLED")) + require.NoError(t, os.Unsetenv("CCF_RISK_AUTO_REOPEN_THRESHOLD_DAYS")) + + cfg, err := LoadRiskConfig("") + require.NoError(t, err) + assert.False(t, cfg.ReviewDeadlineReminderEnabled) + assert.False(t, cfg.ReviewOverdueEscalationEnabled) + assert.False(t, cfg.StaleRiskScannerEnabled) + assert.False(t, cfg.EvidenceReconciliationEnabled) + assert.False(t, cfg.AutoReopenEnabled) + assert.Equal(t, 30, cfg.AutoReopenThresholdDays) +} + +func TestLoadRiskConfigFromEnv(t *testing.T) { + require.NoError(t, os.Setenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_ENABLED", "true")) + require.NoError(t, os.Setenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_SCHEDULE", "0 15 8 * * *")) + require.NoError(t, os.Setenv("CCF_RISK_AUTO_REOPEN_ENABLED", "true")) + require.NoError(t, os.Setenv("CCF_RISK_AUTO_REOPEN_THRESHOLD_DAYS", "45")) + defer func() { + _ = os.Unsetenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_ENABLED") + _ = os.Unsetenv("CCF_RISK_REVIEW_DEADLINE_REMINDER_SCHEDULE") + _ = os.Unsetenv("CCF_RISK_AUTO_REOPEN_ENABLED") + _ = os.Unsetenv("CCF_RISK_AUTO_REOPEN_THRESHOLD_DAYS") + }() + + cfg, err := LoadRiskConfig("") + require.NoError(t, err) + assert.True(t, cfg.ReviewDeadlineReminderEnabled) + assert.Equal(t, "0 15 8 * * *", cfg.ReviewDeadlineReminderSchedule) + assert.True(t, cfg.AutoReopenEnabled) + assert.Equal(t, 45, cfg.AutoReopenThresholdDays) +} + +func TestLoadRiskConfigFromFile(t *testing.T) { + content := ` +review_deadline_reminder_enabled: true +review_deadline_reminder_schedule: "0 0 8 * * *" +review_overdue_escalation_enabled: true +review_overdue_escalation_schedule: "0 0 9 * * *" +stale_risk_scanner_enabled: true +stale_risk_scanner_schedule: "0 0 10 * * 1" +evidence_reconciliation_enabled: true +evidence_reconciliation_schedule: "0 30 10 * * *" +auto_reopen_enabled: true +auto_reopen_threshold_days: 60 +` + tmpDir := t.TempDir() + cfgPath := filepath.Join(tmpDir, "risk.yaml") + require.NoError(t, os.WriteFile(cfgPath, []byte(content), 0644)) + + cfg, err := LoadRiskConfig(cfgPath) + require.NoError(t, err) + assert.True(t, cfg.ReviewDeadlineReminderEnabled) + assert.True(t, cfg.ReviewOverdueEscalationEnabled) + assert.True(t, cfg.StaleRiskScannerEnabled) + assert.True(t, cfg.EvidenceReconciliationEnabled) + assert.True(t, cfg.AutoReopenEnabled) + assert.Equal(t, 60, cfg.AutoReopenThresholdDays) +} + +func TestLoadRiskConfigMissingFileUsesDefaults(t *testing.T) { + cfg, err := LoadRiskConfig(filepath.Join(t.TempDir(), "does-not-exist.yaml")) + require.NoError(t, err) + + def := DefaultRiskConfig() + assert.Equal(t, def.ReviewDeadlineReminderEnabled, cfg.ReviewDeadlineReminderEnabled) + assert.Equal(t, def.ReviewDeadlineReminderSchedule, cfg.ReviewDeadlineReminderSchedule) + assert.Equal(t, def.AutoReopenThresholdDays, cfg.AutoReopenThresholdDays) +} + +func TestRiskConfigValidate(t *testing.T) { + tests := []struct { + name string + cfg *RiskConfig + wantErr bool + }{ + { + name: "valid config", + cfg: &RiskConfig{ + ReviewDeadlineReminderEnabled: true, + ReviewDeadlineReminderSchedule: "0 0 8 * * *", + AutoReopenThresholdDays: 30, + }, + wantErr: false, + }, + { + name: "invalid reminder schedule", + cfg: &RiskConfig{ + ReviewDeadlineReminderEnabled: true, + ReviewDeadlineReminderSchedule: "not-cron", + }, + wantErr: true, + }, + { + name: "negative threshold", + cfg: &RiskConfig{ + AutoReopenThresholdDays: -1, + }, + wantErr: true, + }, + { + name: "zero threshold invalid when auto reopen enabled", + cfg: &RiskConfig{ + AutoReopenEnabled: true, + AutoReopenThresholdDays: 0, + }, + wantErr: true, + }, + { + name: "disabled schedule may be invalid", + cfg: &RiskConfig{ + ReviewDeadlineReminderEnabled: false, + ReviewDeadlineReminderSchedule: "not-cron", + AutoReopenThresholdDays: 0, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.cfg.Validate() + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + }) + } +} diff --git a/internal/config/workflow.go b/internal/config/workflow.go index 4c2b4953..333e3a38 100644 --- a/internal/config/workflow.go +++ b/internal/config/workflow.go @@ -3,6 +3,7 @@ package config import ( "errors" "fmt" + "os" "strings" "github.com/robfig/cron/v3" @@ -75,7 +76,7 @@ func LoadWorkflowConfig(path string) (*WorkflowConfig, error) { v.SetConfigType("yaml") if err := v.ReadInConfig(); err != nil { var notFound viper.ConfigFileNotFoundError - if !errors.As(err, ¬Found) { + if !errors.As(err, ¬Found) && !errors.Is(err, os.ErrNotExist) { return nil, fmt.Errorf("failed to read workflow config file: %w", err) } // If file not found, we continue with defaults and env vars diff --git a/internal/config/workflow_test.go b/internal/config/workflow_test.go index 6b06d101..7cf7564c 100644 --- a/internal/config/workflow_test.go +++ b/internal/config/workflow_test.go @@ -108,6 +108,16 @@ overdue_check_enabled: false assert.False(t, config.OverdueCheckEnabled) } +func TestLoadWorkflowConfig_MissingFileUsesDefaults(t *testing.T) { + config, err := LoadWorkflowConfig(filepath.Join(t.TempDir(), "does-not-exist.yaml")) + require.NoError(t, err) + + def := DefaultWorkflowConfig() + assert.Equal(t, def.SchedulerEnabled, config.SchedulerEnabled) + assert.Equal(t, def.Schedule, config.Schedule) + assert.Equal(t, def.GracePeriodDays, config.GracePeriodDays) +} + func TestWorkflowConfig_Validate(t *testing.T) { tests := []struct { name string diff --git a/internal/service/email/templates/service_test.go b/internal/service/email/templates/service_test.go index baf9f814..216d07de 100644 --- a/internal/service/email/templates/service_test.go +++ b/internal/service/email/templates/service_test.go @@ -250,6 +250,35 @@ func TestTemplateService_WorkflowTaskDigest_EmptyTasks(t *testing.T) { require.Contains(t, html, "Bob") } +func TestTemplateService_RiskTemplates(t *testing.T) { + service, err := NewTemplateService() + require.NoError(t, err) + + data := TemplateData{ + "OwnerName": "Alice Smith", + "RiskTitle": "Weak MFA Controls", + "SSPName": "GoodRead SSP", + "RiskStatus": "risk-accepted", + "ReviewDeadline": "01/mar/2026", + "LastSeenAt": "15/feb/2026", + "RiskURL": "https://app.example.com/risks/123", + } + + for _, name := range []string{ + "risk-review-due-reminder", + "risk-review-overdue-escalation", + "risk-stale-open-reminder", + } { + html, text, err := service.Use(name, data) + require.NoError(t, err) + require.NotEmpty(t, html) + require.NotEmpty(t, text) + require.Contains(t, html, "Alice Smith") + require.Contains(t, text, "Weak MFA Controls") + require.Contains(t, text, "https://app.example.com/risks/123") + } +} + func TestTemplateService_ListTemplates(t *testing.T) { service, err := NewTemplateService() require.NoError(t, err, "Failed to create template service") diff --git a/internal/service/email/templates/templates/risk-review-due-reminder.html b/internal/service/email/templates/templates/risk-review-due-reminder.html new file mode 100644 index 00000000..2e0088fd --- /dev/null +++ b/internal/service/email/templates/templates/risk-review-due-reminder.html @@ -0,0 +1,14 @@ + + + +

Hello {{.OwnerName}},

+

A risk review deadline is approaching.

+ +

Open risk

+ + diff --git a/internal/service/email/templates/templates/risk-review-due-reminder.txt b/internal/service/email/templates/templates/risk-review-due-reminder.txt new file mode 100644 index 00000000..50e7c0e7 --- /dev/null +++ b/internal/service/email/templates/templates/risk-review-due-reminder.txt @@ -0,0 +1,10 @@ +Hello {{.OwnerName}}, + +A risk review deadline is approaching. + +Risk: {{.RiskTitle}} +SSP: {{.SSPName}} +Status: {{.RiskStatus}} +Review deadline: {{.ReviewDeadline}} + +Open risk: {{.RiskURL}} diff --git a/internal/service/email/templates/templates/risk-review-overdue-escalation.html b/internal/service/email/templates/templates/risk-review-overdue-escalation.html new file mode 100644 index 00000000..18b2f902 --- /dev/null +++ b/internal/service/email/templates/templates/risk-review-overdue-escalation.html @@ -0,0 +1,14 @@ + + + +

Hello {{.OwnerName}},

+

A risk review is overdue and requires attention.

+ +

Open risk

+ + diff --git a/internal/service/email/templates/templates/risk-review-overdue-escalation.txt b/internal/service/email/templates/templates/risk-review-overdue-escalation.txt new file mode 100644 index 00000000..7768e75f --- /dev/null +++ b/internal/service/email/templates/templates/risk-review-overdue-escalation.txt @@ -0,0 +1,10 @@ +Hello {{.OwnerName}}, + +A risk review is overdue and requires attention. + +Risk: {{.RiskTitle}} +SSP: {{.SSPName}} +Status: {{.RiskStatus}} +Review deadline: {{.ReviewDeadline}} + +Open risk: {{.RiskURL}} diff --git a/internal/service/email/templates/templates/risk-stale-open-reminder.html b/internal/service/email/templates/templates/risk-stale-open-reminder.html new file mode 100644 index 00000000..98fd8353 --- /dev/null +++ b/internal/service/email/templates/templates/risk-stale-open-reminder.html @@ -0,0 +1,14 @@ + + + +

Hello {{.OwnerName}},

+

A risk appears stale and should be reviewed.

+ +

Open risk

+ + diff --git a/internal/service/email/templates/templates/risk-stale-open-reminder.txt b/internal/service/email/templates/templates/risk-stale-open-reminder.txt new file mode 100644 index 00000000..484ba6f5 --- /dev/null +++ b/internal/service/email/templates/templates/risk-stale-open-reminder.txt @@ -0,0 +1,10 @@ +Hello {{.OwnerName}}, + +A risk appears stale and should be reviewed. + +Risk: {{.RiskTitle}} +SSP: {{.SSPName}} +Status: {{.RiskStatus}} +Last seen: {{.LastSeenAt}} + +Open risk: {{.RiskURL}} diff --git a/internal/service/relational/ccf_internal.go b/internal/service/relational/ccf_internal.go index 4ee7dbf5..c3b233f1 100644 --- a/internal/service/relational/ccf_internal.go +++ b/internal/service/relational/ccf_internal.go @@ -35,6 +35,10 @@ type User struct { TaskAvailableEmailSubscribed bool `json:"taskAvailableEmailSubscribed" gorm:"default:false"` // TaskDailyDigestSubscribed indicates if the user wants to receive a daily task digest email TaskDailyDigestSubscribed bool `json:"taskDailyDigestSubscribed" gorm:"default:false"` + + // RiskNotificationsSubscribed indicates if the user wants to receive risk lifecycle notifications. + // The DB default is intentionally true so existing users are opted in when the column is introduced. + RiskNotificationsSubscribed bool `json:"riskNotificationsSubscribed" gorm:"default:true"` } func (User) TableName() string { diff --git a/internal/service/relational/risks/service.go b/internal/service/relational/risks/service.go index 97b12f46..227c1dce 100644 --- a/internal/service/relational/risks/service.go +++ b/internal/service/relational/risks/service.go @@ -63,6 +63,9 @@ type ReviewRiskParams struct { Decision RiskReviewDecision Notes *string NextReviewDeadline *time.Time + // RequireCurrentReviewDeadlineBefore enforces, under lock, that the current review deadline + // is set and no later than this timestamp before applying the decision. + RequireCurrentReviewDeadlineBefore *time.Time } type Associations struct { @@ -339,6 +342,13 @@ func (s *RiskService) ReviewRisk(params ReviewRiskParams) (*Risk, error) { tx.Rollback() return nil, newValidationError("only risks in status risk-accepted can be reviewed") } + if params.RequireCurrentReviewDeadlineBefore != nil { + cutoff := params.RequireCurrentReviewDeadlineBefore.UTC() + if risk.ReviewDeadline == nil || risk.ReviewDeadline.UTC().After(cutoff) { + tx.Rollback() + return nil, newValidationError("risk review deadline no longer eligible for requested decision") + } + } reviewedAt := time.Now().UTC() if params.ReviewedAt != nil { diff --git a/internal/service/worker/jobs.go b/internal/service/worker/jobs.go index a8c9628a..91aeb51c 100644 --- a/internal/service/worker/jobs.go +++ b/internal/service/worker/jobs.go @@ -173,6 +173,7 @@ type NotificationUser struct { LastName string TaskAvailableEmailSubscribed bool TaskDailyDigestSubscribed bool + RiskNotificationsSubscribed bool } func (u NotificationUser) FullName() string { @@ -660,6 +661,12 @@ func Workers(emailService EmailService, digestService DigestService, userRepo Us if db != nil { riskEvidenceWorker := NewRiskEvidenceWorker(db, logger) river.AddWorker(workers, river.WorkFunc(riskEvidenceWorker.Work)) + + riskReconcileDuplicatesWorker := NewRiskReconcileDuplicatesWorker(db, logger) + river.AddWorker(workers, river.WorkFunc(riskReconcileDuplicatesWorker.Work)) + + riskReviewOverdueReopenWorker := NewRiskReviewOverdueReopenWorker(db, logger) + river.AddWorker(workers, river.WorkFunc(riskReviewOverdueReopenWorker.Work)) } // Register workflow notification workers if dependencies are available @@ -676,6 +683,15 @@ func Workers(emailService EmailService, digestService DigestService, userRepo Us workflowExecutionFailedWorker := NewWorkflowExecutionFailedWorker(db, emailService, userRepo, webBaseURL, logger) river.AddWorker(workers, river.WorkFunc(workflowExecutionFailedWorker.Work)) + + riskReviewDueReminderWorker := NewRiskReviewDueReminderWorker(db, emailService, userRepo, webBaseURL, logger) + river.AddWorker(workers, river.WorkFunc(riskReviewDueReminderWorker.Work)) + + riskReviewOverdueEscalationWorker := NewRiskReviewOverdueEscalationWorker(db, emailService, userRepo, webBaseURL, logger) + river.AddWorker(workers, river.WorkFunc(riskReviewOverdueEscalationWorker.Work)) + + riskStaleOpenReminderWorker := NewRiskStaleOpenReminderWorker(db, emailService, userRepo, webBaseURL, logger) + river.AddWorker(workers, river.WorkFunc(riskStaleOpenReminderWorker.Work)) } } diff --git a/internal/service/worker/risk_job_types.go b/internal/service/worker/risk_job_types.go new file mode 100644 index 00000000..23a0a421 --- /dev/null +++ b/internal/service/worker/risk_job_types.go @@ -0,0 +1,105 @@ +package worker + +import ( + "time" + + "github.com/google/uuid" + "github.com/riverqueue/river" +) + +const ( + JobTypeRiskReviewDeadlineReminderScanner = "risk_review_deadline_reminder_scanner" + JobTypeRiskReviewOverdueEscalationScanner = "risk_review_overdue_escalation_scanner" + JobTypeRiskStaleRiskScanner = "risk_stale_risk_scanner" + JobTypeRiskEvidenceReconciliationScanner = "risk_evidence_reconciliation_scanner" + + JobTypeRiskReviewDueReminder = "risk_review_due_reminder" + JobTypeRiskReviewOverdueEscalation = "risk_review_overdue_escalation" + JobTypeRiskStaleOpenReminder = "risk_stale_open_reminder" + JobTypeRiskReconcileDuplicates = "risk_reconcile_duplicates" + JobTypeRiskReviewOverdueReopen = "risk_review_overdue_reopen" +) + +type RiskReviewDeadlineReminderScannerArgs struct{} +type RiskReviewOverdueEscalationScannerArgs struct{} +type RiskStaleRiskScannerArgs struct{} +type RiskEvidenceReconciliationScannerArgs struct{} + +type RiskReviewDueReminderArgs struct { + RiskID uuid.UUID `json:"risk_id"` + OwnerUserID uuid.UUID `json:"owner_user_id"` + ReviewDeadline string `json:"review_deadline"` + ReminderWindow string `json:"reminder_window"` +} + +type RiskReviewOverdueEscalationArgs struct { + RiskID uuid.UUID `json:"risk_id"` + OwnerUserID uuid.UUID `json:"owner_user_id"` + ReviewDeadline string `json:"review_deadline"` + OverdueWindow string `json:"overdue_window"` +} + +type RiskStaleOpenReminderArgs struct { + RiskID uuid.UUID `json:"risk_id"` + OwnerUserID uuid.UUID `json:"owner_user_id"` + LastSeenAt string `json:"last_seen_at"` + StaleBucketDate string `json:"stale_bucket_date"` +} + +type RiskReconcileDuplicatesArgs struct { + DedupeKey string `json:"dedupe_key"` +} + +type RiskReviewOverdueReopenArgs struct { + RiskID uuid.UUID `json:"risk_id"` + ReviewDeadline string `json:"review_deadline"` + ThresholdDays int `json:"threshold_days"` +} + +func (RiskReviewDeadlineReminderScannerArgs) Kind() string { + return JobTypeRiskReviewDeadlineReminderScanner +} +func (RiskReviewOverdueEscalationScannerArgs) Kind() string { + return JobTypeRiskReviewOverdueEscalationScanner +} +func (RiskStaleRiskScannerArgs) Kind() string { return JobTypeRiskStaleRiskScanner } +func (RiskEvidenceReconciliationScannerArgs) Kind() string { + return JobTypeRiskEvidenceReconciliationScanner +} +func (RiskReviewDueReminderArgs) Kind() string { return JobTypeRiskReviewDueReminder } +func (RiskReviewOverdueEscalationArgs) Kind() string { return JobTypeRiskReviewOverdueEscalation } +func (RiskStaleOpenReminderArgs) Kind() string { return JobTypeRiskStaleOpenReminder } +func (RiskReconcileDuplicatesArgs) Kind() string { return JobTypeRiskReconcileDuplicates } +func (RiskReviewOverdueReopenArgs) Kind() string { return JobTypeRiskReviewOverdueReopen } + +func (RiskReviewDeadlineReminderScannerArgs) Timeout() time.Duration { return 5 * time.Minute } +func (RiskReviewOverdueEscalationScannerArgs) Timeout() time.Duration { return 5 * time.Minute } +func (RiskStaleRiskScannerArgs) Timeout() time.Duration { return 5 * time.Minute } +func (RiskEvidenceReconciliationScannerArgs) Timeout() time.Duration { return 5 * time.Minute } +func (RiskReviewDueReminderArgs) Timeout() time.Duration { return 30 * time.Second } +func (RiskReviewOverdueEscalationArgs) Timeout() time.Duration { return 30 * time.Second } +func (RiskStaleOpenReminderArgs) Timeout() time.Duration { return 30 * time.Second } +func (RiskReconcileDuplicatesArgs) Timeout() time.Duration { return 2 * time.Minute } +func (RiskReviewOverdueReopenArgs) Timeout() time.Duration { return 30 * time.Second } + +func JobInsertOptionsForRiskNotification(byPeriod time.Duration) *river.InsertOpts { + return &river.InsertOpts{ + Queue: "email", + MaxAttempts: 3, + UniqueOpts: river.UniqueOpts{ + ByArgs: true, + ByPeriod: byPeriod, + }, + } +} + +func JobInsertOptionsForRiskWorkerUnique(byPeriod time.Duration) *river.InsertOpts { + return &river.InsertOpts{ + Queue: "risk", + MaxAttempts: 3, + UniqueOpts: river.UniqueOpts{ + ByArgs: true, + ByPeriod: byPeriod, + }, + } +} diff --git a/internal/service/worker/risk_workers.go b/internal/service/worker/risk_workers.go new file mode 100644 index 00000000..a2c5e831 --- /dev/null +++ b/internal/service/worker/risk_workers.go @@ -0,0 +1,629 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "sort" + "strings" + "time" + + "github.com/compliance-framework/api/internal/service/email/types" + "github.com/compliance-framework/api/internal/service/relational" + riskrel "github.com/compliance-framework/api/internal/service/relational/risks" + "github.com/compliance-framework/api/internal/workflow" + "github.com/google/uuid" + "github.com/riverqueue/river" + "go.uber.org/zap" + "gorm.io/gorm" +) + +const riskScannerBatchSize = 1000 + +type RiskReviewDeadlineReminderScannerWorker struct { + db *gorm.DB + client workflow.RiverClient + logger *zap.SugaredLogger +} + +func NewRiskReviewDeadlineReminderScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskReviewDeadlineReminderScannerWorker { + return &RiskReviewDeadlineReminderScannerWorker{db: db, client: client, logger: logger} +} + +func (w *RiskReviewDeadlineReminderScannerWorker) Work(ctx context.Context, _ *river.Job[RiskReviewDeadlineReminderScannerArgs]) error { + now := time.Now().UTC() + windowEnd := now.Add(30 * 24 * time.Hour) + + var ( + risks []riskrel.Risk + totalEnqueued int + ) + if err := w.db.WithContext(ctx). + Where("status = ? AND review_deadline IS NOT NULL AND review_deadline > ? AND review_deadline <= ?", + string(riskrel.RiskStatusRiskAccepted), now, windowEnd). + FindInBatches(&risks, riskScannerBatchSize, func(_ *gorm.DB, _ int) error { + ownersByRiskID, err := resolveRiskOwnerUserIDsBatch(ctx, w.db, risks) + if err != nil { + return fmt.Errorf("risk deadline reminder scanner: resolve owners failed: %w", err) + } + + params := make([]river.InsertManyParams, 0, len(risks)) + for i := range risks { + risk := &risks[i] + if risk.ID == nil { + continue + } + ownerIDs := ownersByRiskID[*risk.ID] + for _, ownerID := range ownerIDs { + params = append(params, river.InsertManyParams{ + Args: RiskReviewDueReminderArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + ReviewDeadline: risk.ReviewDeadline.UTC().Format(time.RFC3339), + ReminderWindow: "30d", + }, + InsertOpts: JobInsertOptionsForRiskNotification(24 * time.Hour), + }) + } + } + + if len(params) == 0 { + return nil + } + + if _, err := w.client.InsertMany(ctx, params); err != nil { + return fmt.Errorf("risk deadline reminder scanner: enqueue failed: %w", err) + } + totalEnqueued += len(params) + return nil + }).Error; err != nil { + return fmt.Errorf("risk deadline reminder scanner: query failed: %w", err) + } + + if totalEnqueued == 0 { + w.logger.Infow("RiskReviewDeadlineReminderScannerWorker: no reminders to enqueue") + return nil + } + + w.logger.Infow("RiskReviewDeadlineReminderScannerWorker: enqueued reminders", "count", totalEnqueued) + return nil +} + +type RiskReviewOverdueEscalationScannerWorker struct { + db *gorm.DB + client workflow.RiverClient + logger *zap.SugaredLogger + autoReopenEnabled bool + autoReopenThresholdDays int +} + +func NewRiskReviewOverdueEscalationScannerWorker( + db *gorm.DB, + client workflow.RiverClient, + logger *zap.SugaredLogger, + autoReopenEnabled bool, + autoReopenThresholdDays int, +) *RiskReviewOverdueEscalationScannerWorker { + return &RiskReviewOverdueEscalationScannerWorker{ + db: db, + client: client, + logger: logger, + autoReopenEnabled: autoReopenEnabled, + autoReopenThresholdDays: autoReopenThresholdDays, + } +} + +func (w *RiskReviewOverdueEscalationScannerWorker) Work(ctx context.Context, _ *river.Job[RiskReviewOverdueEscalationScannerArgs]) error { + now := time.Now().UTC() + threshold := time.Duration(w.autoReopenThresholdDays) * 24 * time.Hour + + var ( + risks []riskrel.Risk + totalEnqueued int + totalReopenCount int + ) + if err := w.db.WithContext(ctx). + Where("status = ? AND review_deadline IS NOT NULL AND review_deadline < ?", + string(riskrel.RiskStatusRiskAccepted), now). + FindInBatches(&risks, riskScannerBatchSize, func(_ *gorm.DB, _ int) error { + ownersByRiskID, err := resolveRiskOwnerUserIDsBatch(ctx, w.db, risks) + if err != nil { + return fmt.Errorf("risk overdue escalation scanner: resolve owners failed: %w", err) + } + + params := make([]river.InsertManyParams, 0, len(risks)) + overdueWindow := now.Format("2006-01-02") + for i := range risks { + risk := &risks[i] + if risk.ID == nil { + continue + } + ownerIDs := ownersByRiskID[*risk.ID] + for _, ownerID := range ownerIDs { + params = append(params, river.InsertManyParams{ + Args: RiskReviewOverdueEscalationArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + ReviewDeadline: risk.ReviewDeadline.UTC().Format(time.RFC3339), + OverdueWindow: overdueWindow, + }, + InsertOpts: JobInsertOptionsForRiskNotification(24 * time.Hour), + }) + } + + if w.autoReopenEnabled && w.autoReopenThresholdDays > 0 { + overdueFor := now.Sub(risk.ReviewDeadline.UTC()) + if overdueFor >= threshold { + params = append(params, river.InsertManyParams{ + Args: RiskReviewOverdueReopenArgs{ + RiskID: *risk.ID, + ReviewDeadline: risk.ReviewDeadline.UTC().Format(time.RFC3339), + ThresholdDays: w.autoReopenThresholdDays, + }, + InsertOpts: JobInsertOptionsForRiskWorkerUnique(24 * time.Hour), + }) + totalReopenCount++ + } + } + } + + if len(params) == 0 { + return nil + } + + if _, err := w.client.InsertMany(ctx, params); err != nil { + return fmt.Errorf("risk overdue escalation scanner: enqueue failed: %w", err) + } + totalEnqueued += len(params) + return nil + }).Error; err != nil { + return fmt.Errorf("risk overdue escalation scanner: query failed: %w", err) + } + + if totalEnqueued == 0 { + w.logger.Infow("RiskReviewOverdueEscalationScannerWorker: no escalations to enqueue") + return nil + } + + w.logger.Infow("RiskReviewOverdueEscalationScannerWorker: enqueued jobs", "count", totalEnqueued, "reopen_count", totalReopenCount) + return nil +} + +type RiskStaleRiskScannerWorker struct { + db *gorm.DB + client workflow.RiverClient + logger *zap.SugaredLogger +} + +func NewRiskStaleRiskScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskStaleRiskScannerWorker { + return &RiskStaleRiskScannerWorker{db: db, client: client, logger: logger} +} + +func (w *RiskStaleRiskScannerWorker) Work(ctx context.Context, _ *river.Job[RiskStaleRiskScannerArgs]) error { + now := time.Now().UTC() + cutoff := now.Add(-30 * 24 * time.Hour) + staleBucketDate := startOfISOWeekUTC(now).Format("2006-01-02") + + var ( + risks []riskrel.Risk + totalEnqueued int + ) + if err := w.db.WithContext(ctx). + Where("status IN ? AND last_seen_at <= ?", + []string{ + string(riskrel.RiskStatusOpen), + string(riskrel.RiskStatusInvestigating), + string(riskrel.RiskStatusMitigatingPlanned), + string(riskrel.RiskStatusMitigatingImplemented), + }, + cutoff). + FindInBatches(&risks, riskScannerBatchSize, func(_ *gorm.DB, _ int) error { + ownersByRiskID, err := resolveRiskOwnerUserIDsBatch(ctx, w.db, risks) + if err != nil { + return fmt.Errorf("risk stale scanner: resolve owners failed: %w", err) + } + + params := make([]river.InsertManyParams, 0, len(risks)) + for i := range risks { + risk := &risks[i] + if risk.ID == nil { + continue + } + ownerIDs := ownersByRiskID[*risk.ID] + for _, ownerID := range ownerIDs { + params = append(params, river.InsertManyParams{ + Args: RiskStaleOpenReminderArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + LastSeenAt: risk.LastSeenAt.UTC().Format(time.RFC3339), + StaleBucketDate: staleBucketDate, + }, + InsertOpts: JobInsertOptionsForRiskNotification(7 * 24 * time.Hour), + }) + } + } + + if len(params) == 0 { + return nil + } + + if _, err := w.client.InsertMany(ctx, params); err != nil { + return fmt.Errorf("risk stale scanner: enqueue failed: %w", err) + } + totalEnqueued += len(params) + return nil + }).Error; err != nil { + return fmt.Errorf("risk stale scanner: query failed: %w", err) + } + + if totalEnqueued == 0 { + w.logger.Infow("RiskStaleRiskScannerWorker: no stale reminders to enqueue") + return nil + } + + w.logger.Infow("RiskStaleRiskScannerWorker: enqueued stale reminders", "count", totalEnqueued) + return nil +} + +type RiskEvidenceReconciliationScannerWorker struct { + db *gorm.DB + client workflow.RiverClient + logger *zap.SugaredLogger +} + +func NewRiskEvidenceReconciliationScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskEvidenceReconciliationScannerWorker { + return &RiskEvidenceReconciliationScannerWorker{db: db, client: client, logger: logger} +} + +func (w *RiskEvidenceReconciliationScannerWorker) Work(ctx context.Context, _ *river.Job[RiskEvidenceReconciliationScannerArgs]) error { + var params []river.InsertManyParams + + // Duplicate active risks with same dedupe key. + var duplicateKeys []string + if err := w.db.WithContext(ctx). + Model(&riskrel.Risk{}). + Select("dedupe_key"). + Where("status <> ? AND dedupe_key <> ''", string(riskrel.RiskStatusClosed)). + Group("dedupe_key"). + Having("COUNT(*) > 1"). + Pluck("dedupe_key", &duplicateKeys).Error; err != nil { + return fmt.Errorf("risk reconciliation scanner: query duplicate dedupe keys failed: %w", err) + } + for _, key := range duplicateKeys { + params = append(params, river.InsertManyParams{ + Args: RiskReconcileDuplicatesArgs{DedupeKey: key}, + InsertOpts: JobInsertOptionsForRiskWorkerUnique(24 * time.Hour), + }) + } + + if len(params) == 0 { + w.logger.Infow("RiskEvidenceReconciliationScannerWorker: no reconciliation jobs to enqueue") + return nil + } + + if _, err := w.client.InsertMany(ctx, params); err != nil { + return fmt.Errorf("risk reconciliation scanner: enqueue failed: %w", err) + } + + w.logger.Infow("RiskEvidenceReconciliationScannerWorker: enqueued reconciliation jobs", "count", len(params)) + return nil +} + +type RiskReviewDueReminderWorker struct { + db *gorm.DB + emailService EmailService + userRepo UserRepository + webBaseURL string + logger *zap.SugaredLogger +} + +func NewRiskReviewDueReminderWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *RiskReviewDueReminderWorker { + return &RiskReviewDueReminderWorker{db: db, emailService: emailService, userRepo: userRepo, webBaseURL: webBaseURL, logger: logger} +} + +func (w *RiskReviewDueReminderWorker) Work(ctx context.Context, job *river.Job[RiskReviewDueReminderArgs]) error { + return w.sendRiskNotification(ctx, job.Args.RiskID, job.Args.OwnerUserID, "risk-review-due-reminder", "Risk review due soon") +} + +type RiskReviewOverdueEscalationWorker struct { + db *gorm.DB + emailService EmailService + userRepo UserRepository + webBaseURL string + logger *zap.SugaredLogger +} + +func NewRiskReviewOverdueEscalationWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *RiskReviewOverdueEscalationWorker { + return &RiskReviewOverdueEscalationWorker{db: db, emailService: emailService, userRepo: userRepo, webBaseURL: webBaseURL, logger: logger} +} + +func (w *RiskReviewOverdueEscalationWorker) Work(ctx context.Context, job *river.Job[RiskReviewOverdueEscalationArgs]) error { + return w.sendRiskNotification(ctx, job.Args.RiskID, job.Args.OwnerUserID, "risk-review-overdue-escalation", "Risk review overdue") +} + +type RiskStaleOpenReminderWorker struct { + db *gorm.DB + emailService EmailService + userRepo UserRepository + webBaseURL string + logger *zap.SugaredLogger +} + +func NewRiskStaleOpenReminderWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *RiskStaleOpenReminderWorker { + return &RiskStaleOpenReminderWorker{db: db, emailService: emailService, userRepo: userRepo, webBaseURL: webBaseURL, logger: logger} +} + +func (w *RiskStaleOpenReminderWorker) Work(ctx context.Context, job *river.Job[RiskStaleOpenReminderArgs]) error { + return w.sendRiskNotification(ctx, job.Args.RiskID, job.Args.OwnerUserID, "risk-stale-open-reminder", "Stale risk reminder") +} + +func (w *RiskReviewDueReminderWorker) sendRiskNotification(ctx context.Context, riskID, ownerUserID uuid.UUID, templateName, subjectPrefix string) error { + return sendRiskNotification(ctx, w.db, w.emailService, w.userRepo, w.webBaseURL, w.logger, riskID, ownerUserID, templateName, subjectPrefix) +} + +func (w *RiskReviewOverdueEscalationWorker) sendRiskNotification(ctx context.Context, riskID, ownerUserID uuid.UUID, templateName, subjectPrefix string) error { + return sendRiskNotification(ctx, w.db, w.emailService, w.userRepo, w.webBaseURL, w.logger, riskID, ownerUserID, templateName, subjectPrefix) +} + +func (w *RiskStaleOpenReminderWorker) sendRiskNotification(ctx context.Context, riskID, ownerUserID uuid.UUID, templateName, subjectPrefix string) error { + return sendRiskNotification(ctx, w.db, w.emailService, w.userRepo, w.webBaseURL, w.logger, riskID, ownerUserID, templateName, subjectPrefix) +} + +func sendRiskNotification( + ctx context.Context, + db *gorm.DB, + emailService EmailService, + userRepo UserRepository, + webBaseURL string, + logger *zap.SugaredLogger, + riskID, ownerUserID uuid.UUID, + templateName, subjectPrefix string, +) error { + var risk riskrel.Risk + if err := db.WithContext(ctx).First(&risk, "id = ?", riskID).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + logger.Warnw("Risk notification worker: risk not found, skipping", "risk_id", riskID) + return nil + } + return fmt.Errorf("risk notification worker: load risk failed: %w", err) + } + + user, err := userRepo.FindUserByID(ctx, ownerUserID.String()) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + logger.Warnw("Risk notification worker: owner user not found, skipping", "risk_id", riskID, "owner_user_id", ownerUserID, "error", err) + return nil + } + return fmt.Errorf("risk notification worker: load owner user failed: %w", err) + } + if !user.RiskNotificationsSubscribed { + logger.Debugw("Risk notification worker: owner unsubscribed, skipping", "risk_id", riskID, "owner_user_id", ownerUserID) + return nil + } + + riskTitle := strings.TrimSpace(risk.Title) + if riskTitle == "" { + riskTitle = riskID.String() + } + + sspName := resolveSSPDisplayName(ctx, db, risk.SSPID) + reviewDeadline := "" + if risk.ReviewDeadline != nil { + reviewDeadline = formatDate(*risk.ReviewDeadline) + } + + templateData := map[string]interface{}{ + "OwnerName": user.FullName(), + "RiskTitle": riskTitle, + "SSPName": sspName, + "RiskStatus": risk.Status, + "ReviewDeadline": reviewDeadline, + "LastSeenAt": formatDate(risk.LastSeenAt), + "RiskURL": resolveRiskURL(webBaseURL, riskID), + } + + htmlBody, textBody, err := emailService.UseTemplate(templateName, templateData) + if err != nil { + return fmt.Errorf("risk notification worker: render template %q failed: %w", templateName, err) + } + + message := &types.Message{ + From: emailService.GetDefaultFromAddress(), + To: []string{user.Email}, + Subject: fmt.Sprintf("%s: %s", subjectPrefix, riskTitle), + HTMLBody: htmlBody, + TextBody: textBody, + } + result, err := emailService.Send(ctx, message) + if err != nil { + return fmt.Errorf("risk notification worker: send email failed: %w", err) + } + if !result.Success { + return fmt.Errorf("risk notification worker: email send failed: %s", result.Error) + } + + logger.Infow("Risk notification worker: email sent", + "risk_id", riskID, + "owner_user_id", ownerUserID, + "template", templateName, + "message_id", result.MessageID, + ) + return nil +} + +type RiskReconcileDuplicatesWorker struct { + db *gorm.DB + logger *zap.SugaredLogger +} + +func NewRiskReconcileDuplicatesWorker(db *gorm.DB, logger *zap.SugaredLogger) *RiskReconcileDuplicatesWorker { + return &RiskReconcileDuplicatesWorker{db: db, logger: logger} +} + +func (w *RiskReconcileDuplicatesWorker) Work(ctx context.Context, job *river.Job[RiskReconcileDuplicatesArgs]) error { + if strings.TrimSpace(job.Args.DedupeKey) == "" { + return nil + } + + var duplicates []riskrel.Risk + if err := w.db.WithContext(ctx). + Where("dedupe_key = ? AND status <> ?", job.Args.DedupeKey, string(riskrel.RiskStatusClosed)). + Order("created_at ASC, id ASC"). + Find(&duplicates).Error; err != nil { + return fmt.Errorf("risk reconcile duplicates: load duplicates failed: %w", err) + } + if len(duplicates) <= 1 { + return nil + } + + riskSvc := riskrel.NewRiskService(w.db.WithContext(ctx)) + keeper := duplicates[0] + for i := 1; i < len(duplicates); i++ { + risk := duplicates[i] + oldStatus := risk.Status + risk.Status = string(riskrel.RiskStatusClosed) + if _, err := riskSvc.Update(riskrel.UpdateRiskParams{ + Risk: &risk, + OldStatus: oldStatus, + StatusChanged: oldStatus != risk.Status, + }); err != nil { + return fmt.Errorf("risk reconcile duplicates: close duplicate %s failed: %w", risk.ID.String(), err) + } + } + + w.logger.Infow("RiskReconcileDuplicatesWorker: reconciled duplicates", + "dedupe_key", job.Args.DedupeKey, + "kept_risk_id", keeper.ID.String(), + "closed_count", len(duplicates)-1, + ) + return nil +} + +type RiskReviewOverdueReopenWorker struct { + db *gorm.DB + logger *zap.SugaredLogger +} + +func NewRiskReviewOverdueReopenWorker(db *gorm.DB, logger *zap.SugaredLogger) *RiskReviewOverdueReopenWorker { + return &RiskReviewOverdueReopenWorker{db: db, logger: logger} +} + +func (w *RiskReviewOverdueReopenWorker) Work(ctx context.Context, job *river.Job[RiskReviewOverdueReopenArgs]) error { + if job.Args.ThresholdDays <= 0 { + w.logger.Infow("RiskReviewOverdueReopenWorker: skipping reopen due to non-positive threshold_days", + "risk_id", job.Args.RiskID, + "threshold_days", job.Args.ThresholdDays, + ) + return nil + } + + now := time.Now().UTC() + threshold := time.Duration(job.Args.ThresholdDays) * 24 * time.Hour + cutoff := now.Add(-threshold) + + riskSvc := riskrel.NewRiskService(w.db.WithContext(ctx)) + if _, err := riskSvc.ReviewRisk(riskrel.ReviewRiskParams{ + RiskID: job.Args.RiskID, + Decision: riskrel.RiskReviewDecisionReopen, + ReviewedAt: &now, + RequireCurrentReviewDeadlineBefore: &cutoff, + }); err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) || riskrel.IsValidationError(err) { + return nil + } + return fmt.Errorf("risk overdue reopen: reopen review failed: %w", err) + } + + w.logger.Infow("RiskReviewOverdueReopenWorker: reopened overdue accepted risk", + "risk_id", job.Args.RiskID, + "threshold_days", job.Args.ThresholdDays, + ) + return nil +} + +func resolveRiskOwnerUserIDsBatch(ctx context.Context, db *gorm.DB, risks []riskrel.Risk) (map[uuid.UUID][]uuid.UUID, error) { + ownerSetByRiskID := make(map[uuid.UUID]map[uuid.UUID]struct{}, len(risks)) + riskIDs := make([]uuid.UUID, 0, len(risks)) + for i := range risks { + risk := &risks[i] + if risk.ID == nil { + continue + } + riskID := *risk.ID + riskIDs = append(riskIDs, riskID) + if _, ok := ownerSetByRiskID[riskID]; !ok { + ownerSetByRiskID[riskID] = make(map[uuid.UUID]struct{}, 4) + } + if risk.PrimaryOwnerUserID != nil { + ownerSetByRiskID[riskID][*risk.PrimaryOwnerUserID] = struct{}{} + } + } + if len(riskIDs) == 0 { + return map[uuid.UUID][]uuid.UUID{}, nil + } + + var assignments []riskrel.RiskOwnerAssignment + if err := db.WithContext(ctx). + Where("risk_id IN ? AND owner_kind = ?", riskIDs, "user"). + Find(&assignments).Error; err != nil { + return nil, err + } + + for _, assignment := range assignments { + parsed, err := uuid.Parse(assignment.OwnerRef) + if err != nil { + continue + } + set, ok := ownerSetByRiskID[assignment.RiskID] + if !ok { + set = make(map[uuid.UUID]struct{}, 1) + ownerSetByRiskID[assignment.RiskID] = set + } + set[parsed] = struct{}{} + } + + ownersByRiskID := make(map[uuid.UUID][]uuid.UUID, len(ownerSetByRiskID)) + for riskID, ownerSet := range ownerSetByRiskID { + owners := make([]uuid.UUID, 0, len(ownerSet)) + for ownerID := range ownerSet { + owners = append(owners, ownerID) + } + sort.Slice(owners, func(i, j int) bool { return owners[i].String() < owners[j].String() }) + ownersByRiskID[riskID] = owners + } + + return ownersByRiskID, nil +} + +func resolveSSPDisplayName(ctx context.Context, db *gorm.DB, sspID uuid.UUID) string { + var sc relational.SystemCharacteristics + if err := db.WithContext(ctx). + Select("system_name_short", "system_name"). + First(&sc, "system_security_plan_id = ?", sspID).Error; err != nil { + return sspID.String() + } + + name := strings.TrimSpace(sc.SystemNameShort) + if name != "" { + return name + } + name = strings.TrimSpace(sc.SystemName) + if name != "" { + return name + } + return sspID.String() +} + +func resolveRiskURL(webBaseURL string, riskID uuid.UUID) string { + base := strings.TrimRight(webBaseURL, "/") + return base + "/risks/" + riskID.String() +} + +func startOfISOWeekUTC(t time.Time) time.Time { + d := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC) + wd := int(d.Weekday()) + if wd == 0 { + wd = 7 + } + return d.AddDate(0, 0, -(wd - 1)) +} diff --git a/internal/service/worker/risk_workers_test.go b/internal/service/worker/risk_workers_test.go new file mode 100644 index 00000000..edb3e2f4 --- /dev/null +++ b/internal/service/worker/risk_workers_test.go @@ -0,0 +1,599 @@ +package worker + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/compliance-framework/api/internal/service/email/types" + "github.com/compliance-framework/api/internal/service/relational" + riskrel "github.com/compliance-framework/api/internal/service/relational/risks" + "github.com/google/uuid" + "github.com/riverqueue/river" + "github.com/riverqueue/river/rivertype" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +type stubRiverClient struct { + params []river.InsertManyParams + err error +} + +func (s *stubRiverClient) InsertMany(_ context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) { + s.params = append(s.params, params...) + if s.err != nil { + return nil, s.err + } + return []*rivertype.JobInsertResult{}, nil +} + +type stubUserRepository struct { + user NotificationUser + err error +} + +func (s *stubUserRepository) FindUserByID(_ context.Context, _ string) (NotificationUser, error) { + if s.err != nil { + return NotificationUser{}, s.err + } + return s.user, nil +} + +func newRiskWorkersTestDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, db.AutoMigrate( + &relational.User{}, + &relational.SystemSecurityPlan{}, + &relational.SystemCharacteristics{}, + &relational.Evidence{}, + &relational.Labels{}, + &relational.AssessmentSubject{}, + &relational.SystemComponent{}, + &relational.InventoryItem{}, + &riskrel.Risk{}, + &riskrel.RiskOwnerAssignment{}, + &riskrel.RiskEvidenceLink{}, + &riskrel.RiskSubjectLink{}, + &riskrel.RiskEvent{}, + &riskrel.RiskReview{}, + )) + return db +} + +func createTestRiskWithOwner(t *testing.T, db *gorm.DB, status riskrel.RiskStatus, reviewDeadline *time.Time, lastSeenAt time.Time) (riskrel.Risk, uuid.UUID) { + t.Helper() + sspID := uuid.New() + require.NoError(t, db.Create(&relational.SystemSecurityPlan{ + UUIDModel: relational.UUIDModel{ID: &sspID}, + }).Error) + + ownerID := uuid.New() + riskID := uuid.New() + r := riskrel.Risk{ + UUIDModel: relational.UUIDModel{ID: &riskID}, + Title: "Test Risk", + Description: "Test Risk Description", + Status: string(status), + SSPID: sspID, + SourceType: string(riskrel.RiskSourceTypeManual), + ReviewDeadline: reviewDeadline, + FirstSeenAt: time.Now().UTC().Add(-24 * time.Hour), + LastSeenAt: lastSeenAt, + } + require.NoError(t, db.Create(&r).Error) + require.NoError(t, db.Create(&riskrel.RiskOwnerAssignment{ + RiskID: riskID, + OwnerKind: "user", + OwnerRef: ownerID.String(), + IsPrimary: true, + }).Error) + return r, ownerID +} + +func createTestUser(t *testing.T, db *gorm.DB, id uuid.UUID, subscribed bool) { + t.Helper() + require.NoError(t, db.Model(&relational.User{}).Create(map[string]interface{}{ + "id": id, + "email": id.String() + "@example.com", + "first_name": "Risk", + "last_name": "Owner", + "auth_method": "password", + "risk_notifications_subscribed": subscribed, + }).Error) +} + +func TestRiskReviewDeadlineReminderScannerWorker_EnqueuesPerUserOwner(t *testing.T) { + db := newRiskWorkersTestDB(t) + client := &stubRiverClient{} + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + reviewDeadline := now.Add(29 * 24 * time.Hour) + _, _ = createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now) + + w := NewRiskReviewDeadlineReminderScannerWorker(db, client, logger) + err := w.Work(context.Background(), &river.Job[RiskReviewDeadlineReminderScannerArgs]{}) + require.NoError(t, err) + require.Len(t, client.params, 1) + + args, ok := client.params[0].Args.(RiskReviewDueReminderArgs) + require.True(t, ok) + assert.Equal(t, "30d", args.ReminderWindow) + assert.Equal(t, 24*time.Hour, client.params[0].InsertOpts.UniqueOpts.ByPeriod) +} + +func TestRiskReviewOverdueEscalationScannerWorker_EnqueuesEscalationAndReopen(t *testing.T) { + db := newRiskWorkersTestDB(t) + client := &stubRiverClient{} + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + reviewDeadline := now.Add(-31 * 24 * time.Hour) + _, _ = createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now.Add(-40*24*time.Hour)) + + w := NewRiskReviewOverdueEscalationScannerWorker(db, client, logger, true, 30) + err := w.Work(context.Background(), &river.Job[RiskReviewOverdueEscalationScannerArgs]{}) + require.NoError(t, err) + + var escalationCount, reopenCount int + var reopenInsertOpts *river.InsertOpts + for _, p := range client.params { + switch p.Args.(type) { + case RiskReviewOverdueEscalationArgs: + escalationCount++ + case RiskReviewOverdueReopenArgs: + reopenCount++ + reopenInsertOpts = p.InsertOpts + } + } + assert.Equal(t, 1, escalationCount) + assert.Equal(t, 1, reopenCount) + require.NotNil(t, reopenInsertOpts) + assert.Equal(t, 24*time.Hour, reopenInsertOpts.UniqueOpts.ByPeriod) + assert.Equal(t, 3, reopenInsertOpts.MaxAttempts) +} + +func TestRiskStaleRiskScannerWorker_EnqueuesWeeklyReminder(t *testing.T) { + db := newRiskWorkersTestDB(t) + client := &stubRiverClient{} + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + _, _ = createTestRiskWithOwner(t, db, riskrel.RiskStatusOpen, nil, now.Add(-31*24*time.Hour)) + + w := NewRiskStaleRiskScannerWorker(db, client, logger) + err := w.Work(context.Background(), &river.Job[RiskStaleRiskScannerArgs]{}) + require.NoError(t, err) + require.Len(t, client.params, 1) + + _, ok := client.params[0].Args.(RiskStaleOpenReminderArgs) + require.True(t, ok) + assert.Equal(t, 7*24*time.Hour, client.params[0].InsertOpts.UniqueOpts.ByPeriod) +} + +func TestRiskStaleRiskScannerWorker_EnqueuesMitigatingImplemented(t *testing.T) { + db := newRiskWorkersTestDB(t) + client := &stubRiverClient{} + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + _, _ = createTestRiskWithOwner(t, db, riskrel.RiskStatusMitigatingImplemented, nil, now.Add(-31*24*time.Hour)) + + w := NewRiskStaleRiskScannerWorker(db, client, logger) + err := w.Work(context.Background(), &river.Job[RiskStaleRiskScannerArgs]{}) + require.NoError(t, err) + require.Len(t, client.params, 1) + + _, ok := client.params[0].Args.(RiskStaleOpenReminderArgs) + require.True(t, ok) +} + +func TestRiskEvidenceReconciliationScannerWorker_EnqueuesDuplicateRepairJobs(t *testing.T) { + db := newRiskWorkersTestDB(t) + client := &stubRiverClient{} + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + + // Duplicate active risks by dedupe key + dupSSP := uuid.New() + require.NoError(t, db.Create(&relational.SystemSecurityPlan{UUIDModel: relational.UUIDModel{ID: &dupSSP}}).Error) + r1ID := uuid.New() + r2ID := uuid.New() + require.NoError(t, db.Create(&riskrel.Risk{ + UUIDModel: relational.UUIDModel{ID: &r1ID}, + Title: "dup1", + Description: "dup1", + Status: string(riskrel.RiskStatusOpen), + SSPID: dupSSP, + SourceType: string(riskrel.RiskSourceTypeManual), + DedupeKey: "dup-key", + FirstSeenAt: now, + LastSeenAt: now, + }).Error) + require.NoError(t, db.Create(&riskrel.Risk{ + UUIDModel: relational.UUIDModel{ID: &r2ID}, + Title: "dup2", + Description: "dup2", + Status: string(riskrel.RiskStatusInvestigating), + SSPID: dupSSP, + SourceType: string(riskrel.RiskSourceTypeManual), + DedupeKey: "dup-key", + FirstSeenAt: now, + LastSeenAt: now, + }).Error) + + w := NewRiskEvidenceReconciliationScannerWorker(db, client, logger) + err := w.Work(context.Background(), &river.Job[RiskEvidenceReconciliationScannerArgs]{}) + require.NoError(t, err) + + var sawDuplicateRepair bool + for _, p := range client.params { + switch args := p.Args.(type) { + case RiskReconcileDuplicatesArgs: + if args.DedupeKey == "dup-key" { + sawDuplicateRepair = true + } + } + } + assert.True(t, sawDuplicateRepair) +} + +func TestRiskReconcileDuplicatesWorker_ClosesNewerRisk(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + sspID := uuid.New() + require.NoError(t, db.Create(&relational.SystemSecurityPlan{UUIDModel: relational.UUIDModel{ID: &sspID}}).Error) + + oldID := uuid.New() + newID := uuid.New() + require.NoError(t, db.Create(&riskrel.Risk{ + UUIDModel: relational.UUIDModel{ID: &oldID}, + Title: "old", + Description: "old", + Status: string(riskrel.RiskStatusOpen), + SSPID: sspID, + SourceType: string(riskrel.RiskSourceTypeManual), + DedupeKey: "dup-close", + FirstSeenAt: time.Now().UTC(), + LastSeenAt: time.Now().UTC(), + CreatedAt: time.Now().UTC().Add(-time.Hour), + }).Error) + require.NoError(t, db.Create(&riskrel.Risk{ + UUIDModel: relational.UUIDModel{ID: &newID}, + Title: "new", + Description: "new", + Status: string(riskrel.RiskStatusInvestigating), + SSPID: sspID, + SourceType: string(riskrel.RiskSourceTypeManual), + DedupeKey: "dup-close", + FirstSeenAt: time.Now().UTC(), + LastSeenAt: time.Now().UTC(), + CreatedAt: time.Now().UTC(), + }).Error) + + w := NewRiskReconcileDuplicatesWorker(db, logger) + err := w.Work(context.Background(), &river.Job[RiskReconcileDuplicatesArgs]{Args: RiskReconcileDuplicatesArgs{DedupeKey: "dup-close"}}) + require.NoError(t, err) + + var oldRisk, newRisk riskrel.Risk + require.NoError(t, db.First(&oldRisk, "id = ?", oldID).Error) + require.NoError(t, db.First(&newRisk, "id = ?", newID).Error) + assert.Equal(t, string(riskrel.RiskStatusOpen), oldRisk.Status) + assert.Equal(t, string(riskrel.RiskStatusClosed), newRisk.Status) +} + +func TestRiskReviewOverdueReopenWorker_ReopensAcceptedRisk(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + reviewDeadline := time.Now().UTC().Add(-40 * 24 * time.Hour) + risk, _ := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, time.Now().UTC()) + justification := "temporarily accepted due to compensating controls" + require.NoError(t, db.Model(&riskrel.Risk{}). + Where("id = ?", risk.ID). + Update("acceptance_justification", justification).Error) + + w := NewRiskReviewOverdueReopenWorker(db, logger) + err := w.Work(context.Background(), &river.Job[RiskReviewOverdueReopenArgs]{ + Args: RiskReviewOverdueReopenArgs{ + RiskID: *risk.ID, + ThresholdDays: 30, + }, + }) + require.NoError(t, err) + + var updated riskrel.Risk + require.NoError(t, db.First(&updated, "id = ?", risk.ID).Error) + assert.Equal(t, string(riskrel.RiskStatusInvestigating), updated.Status) + assert.Nil(t, updated.ReviewDeadline) + assert.Nil(t, updated.AcceptanceJustification) + + var statusChangeEvents int64 + require.NoError(t, db.Model(&riskrel.RiskEvent{}). + Where("risk_id = ? AND event_type = ?", risk.ID, string(riskrel.RiskEventTypeStatusChange)). + Count(&statusChangeEvents).Error) + assert.Equal(t, int64(1), statusChangeEvents) +} + +func TestRiskReviewOverdueReopenWorker_SkipsNonPositiveThreshold(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + reviewDeadline := time.Now().UTC().Add(-40 * 24 * time.Hour) + risk, _ := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, time.Now().UTC()) + + w := NewRiskReviewOverdueReopenWorker(db, logger) + err := w.Work(context.Background(), &river.Job[RiskReviewOverdueReopenArgs]{ + Args: RiskReviewOverdueReopenArgs{ + RiskID: *risk.ID, + ThresholdDays: 0, + }, + }) + require.NoError(t, err) + + var unchanged riskrel.Risk + require.NoError(t, db.First(&unchanged, "id = ?", risk.ID).Error) + assert.Equal(t, string(riskrel.RiskStatusRiskAccepted), unchanged.Status) +} + +func TestRiskReviewOverdueReopenWorker_SkipsWhenNotYetOverdueForThreshold(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + reviewDeadline := time.Now().UTC().Add(-5 * 24 * time.Hour) + risk, _ := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, time.Now().UTC()) + + w := NewRiskReviewOverdueReopenWorker(db, logger) + err := w.Work(context.Background(), &river.Job[RiskReviewOverdueReopenArgs]{ + Args: RiskReviewOverdueReopenArgs{ + RiskID: *risk.ID, + ThresholdDays: 30, + }, + }) + require.NoError(t, err) + + var unchanged riskrel.Risk + require.NoError(t, db.First(&unchanged, "id = ?", risk.ID).Error) + assert.Equal(t, string(riskrel.RiskStatusRiskAccepted), unchanged.Status) + assert.NotNil(t, unchanged.ReviewDeadline) +} + +func TestRiskReviewDueReminderWorker_RespectsRiskSubscription(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + reviewDeadline := now.Add(7 * 24 * time.Hour) + risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now) + createTestUser(t, db, ownerID, false) + + mockEmail := &MockEmailService{} + userRepo := NewGORMUserRepository(db) + worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger) + + err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{ + Args: RiskReviewDueReminderArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + }, + }) + require.NoError(t, err) + mockEmail.AssertNotCalled(t, "Send") +} + +func TestRiskReviewDueReminderWorker_SendsWhenSubscribed(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + reviewDeadline := now.Add(7 * 24 * time.Hour) + risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now) + createTestUser(t, db, ownerID, true) + + mockEmail := &MockEmailService{} + mockEmail.On("UseTemplate", "risk-review-due-reminder", mock.Anything).Return("ok", "ok", nil) + mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com") + mockEmail.On("Send", mock.Anything, mock.MatchedBy(func(msg *types.Message) bool { + return len(msg.To) == 1 && msg.To[0] == ownerID.String()+"@example.com" + })).Return(&types.SendResult{Success: true, MessageID: "risk-msg"}, nil) + + userRepo := NewGORMUserRepository(db) + worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger) + err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{ + Args: RiskReviewDueReminderArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + }, + }) + require.NoError(t, err) + mockEmail.AssertExpectations(t) +} + +func TestRiskReviewDueReminderWorker_TemplateError_ReturnsError(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + reviewDeadline := now.Add(7 * 24 * time.Hour) + risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now) + createTestUser(t, db, ownerID, true) + + mockEmail := &MockEmailService{} + mockEmail.On("UseTemplate", "risk-review-due-reminder", mock.Anything).Return("", "", errors.New("template boom")) + + userRepo := NewGORMUserRepository(db) + worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger) + err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{ + Args: RiskReviewDueReminderArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + }, + }) + require.Error(t, err) + require.ErrorContains(t, err, "render template") + require.ErrorContains(t, err, "template boom") + mockEmail.AssertNotCalled(t, "Send", mock.Anything, mock.Anything) +} + +func TestRiskReviewDueReminderWorker_SendUnsuccessful_ReturnsError(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + reviewDeadline := now.Add(7 * 24 * time.Hour) + risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now) + createTestUser(t, db, ownerID, true) + + mockEmail := &MockEmailService{} + mockEmail.On("UseTemplate", "risk-review-due-reminder", mock.Anything).Return("ok", "ok", nil) + mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com") + mockEmail.On("Send", mock.Anything, mock.Anything). + Return(&types.SendResult{Success: false, Error: "provider refused"}, nil) + + userRepo := NewGORMUserRepository(db) + worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger) + err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{ + Args: RiskReviewDueReminderArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + }, + }) + require.Error(t, err) + require.ErrorContains(t, err, "email send failed: provider refused") +} + +func TestRiskReviewDueReminderWorker_UserNotFound_Skips(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + reviewDeadline := now.Add(7 * 24 * time.Hour) + risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now) + + mockEmail := &MockEmailService{} + userRepo := &stubUserRepository{err: gorm.ErrRecordNotFound} + worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger) + err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{ + Args: RiskReviewDueReminderArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + }, + }) + require.NoError(t, err) + mockEmail.AssertNotCalled(t, "Send", mock.Anything, mock.Anything) +} + +func TestRiskReviewDueReminderWorker_UserLookupError_ReturnsError(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + reviewDeadline := now.Add(7 * 24 * time.Hour) + risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now) + + mockEmail := &MockEmailService{} + userRepo := &stubUserRepository{err: errors.New("database unavailable")} + worker := NewRiskReviewDueReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger) + err := worker.Work(context.Background(), &river.Job[RiskReviewDueReminderArgs]{ + Args: RiskReviewDueReminderArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + }, + }) + require.Error(t, err) + require.ErrorContains(t, err, "load owner user failed") + mockEmail.AssertNotCalled(t, "Send", mock.Anything, mock.Anything) +} + +func TestRiskReviewOverdueEscalationWorker_SendsWhenSubscribed(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + reviewDeadline := now.Add(-2 * 24 * time.Hour) + risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now) + createTestUser(t, db, ownerID, true) + + mockEmail := &MockEmailService{} + mockEmail.On("UseTemplate", "risk-review-overdue-escalation", mock.Anything).Return("ok", "ok", nil) + mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com") + mockEmail.On("Send", mock.Anything, mock.MatchedBy(func(msg *types.Message) bool { + return len(msg.To) == 1 && + msg.To[0] == ownerID.String()+"@example.com" && + msg.Subject == "Risk review overdue: Test Risk" + })).Return(&types.SendResult{Success: true, MessageID: "risk-overdue-msg"}, nil) + + userRepo := NewGORMUserRepository(db) + worker := NewRiskReviewOverdueEscalationWorker(db, mockEmail, userRepo, "https://app.example.com", logger) + err := worker.Work(context.Background(), &river.Job[RiskReviewOverdueEscalationArgs]{ + Args: RiskReviewOverdueEscalationArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + }, + }) + require.NoError(t, err) + mockEmail.AssertExpectations(t) +} + +func TestRiskReviewOverdueEscalationWorker_RespectsRiskSubscription(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + reviewDeadline := now.Add(-2 * 24 * time.Hour) + risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusRiskAccepted, &reviewDeadline, now) + createTestUser(t, db, ownerID, false) + + mockEmail := &MockEmailService{} + userRepo := NewGORMUserRepository(db) + worker := NewRiskReviewOverdueEscalationWorker(db, mockEmail, userRepo, "https://app.example.com", logger) + err := worker.Work(context.Background(), &river.Job[RiskReviewOverdueEscalationArgs]{ + Args: RiskReviewOverdueEscalationArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + }, + }) + require.NoError(t, err) + mockEmail.AssertNotCalled(t, "Send", mock.Anything, mock.Anything) +} + +func TestRiskStaleOpenReminderWorker_SendsWhenSubscribed(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusOpen, nil, now.Add(-35*24*time.Hour)) + createTestUser(t, db, ownerID, true) + + mockEmail := &MockEmailService{} + mockEmail.On("UseTemplate", "risk-stale-open-reminder", mock.Anything).Return("ok", "ok", nil) + mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com") + mockEmail.On("Send", mock.Anything, mock.MatchedBy(func(msg *types.Message) bool { + return len(msg.To) == 1 && + msg.To[0] == ownerID.String()+"@example.com" && + msg.Subject == "Stale risk reminder: Test Risk" + })).Return(&types.SendResult{Success: true, MessageID: "risk-stale-msg"}, nil) + + userRepo := NewGORMUserRepository(db) + worker := NewRiskStaleOpenReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger) + err := worker.Work(context.Background(), &river.Job[RiskStaleOpenReminderArgs]{ + Args: RiskStaleOpenReminderArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + }, + }) + require.NoError(t, err) + mockEmail.AssertExpectations(t) +} + +func TestRiskStaleOpenReminderWorker_RespectsRiskSubscription(t *testing.T) { + db := newRiskWorkersTestDB(t) + logger := zap.NewNop().Sugar() + now := time.Now().UTC() + risk, ownerID := createTestRiskWithOwner(t, db, riskrel.RiskStatusOpen, nil, now.Add(-35*24*time.Hour)) + createTestUser(t, db, ownerID, false) + + mockEmail := &MockEmailService{} + userRepo := NewGORMUserRepository(db) + worker := NewRiskStaleOpenReminderWorker(db, mockEmail, userRepo, "https://app.example.com", logger) + err := worker.Work(context.Background(), &river.Job[RiskStaleOpenReminderArgs]{ + Args: RiskStaleOpenReminderArgs{ + RiskID: *risk.ID, + OwnerUserID: ownerID, + }, + }) + require.NoError(t, err) + mockEmail.AssertNotCalled(t, "Send", mock.Anything, mock.Anything) +} diff --git a/internal/service/worker/service.go b/internal/service/worker/service.go index 93e3faa6..7f9d1dbc 100644 --- a/internal/service/worker/service.go +++ b/internal/service/worker/service.go @@ -224,6 +224,30 @@ func NewServiceWithDigest( digestCheckerWorker := NewWorkflowTaskDigestCheckerWorker(db, clientProxy, logger) river.AddWorker(workers, river.WorkFunc(digestCheckerWorker.Work)) + // Add risk scanner workers + riskCfg := config.DefaultRiskConfig() + if digestCfg != nil && digestCfg.Risk != nil { + riskCfg = digestCfg.Risk + } + + riskReminderScannerWorker := NewRiskReviewDeadlineReminderScannerWorker(db, clientProxy, logger) + river.AddWorker(workers, river.WorkFunc(riskReminderScannerWorker.Work)) + + riskOverdueScannerWorker := NewRiskReviewOverdueEscalationScannerWorker( + db, + clientProxy, + logger, + riskCfg.AutoReopenEnabled, + riskCfg.AutoReopenThresholdDays, + ) + river.AddWorker(workers, river.WorkFunc(riskOverdueScannerWorker.Work)) + + riskStaleScannerWorker := NewRiskStaleRiskScannerWorker(db, clientProxy, logger) + river.AddWorker(workers, river.WorkFunc(riskStaleScannerWorker.Work)) + + riskReconciliationScannerWorker := NewRiskEvidenceReconciliationScannerWorker(db, clientProxy, logger) + river.AddWorker(workers, river.WorkFunc(riskReconciliationScannerWorker.Work)) + // Configure periodic jobs periodicJobs := periodicJobsFromConfig(digestCfg, logger) @@ -443,6 +467,90 @@ func NewWorkflowTaskDigestPeriodicJob(schedule string, logger *zap.SugaredLogger ) } +func NewRiskReviewDeadlineReminderPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob { + sched := parseCronScheduleWithFallback(schedule, "0 0 8 * * *", "risk review deadline reminder scanner", logger) + + return river.NewPeriodicJob( + sched, + func() (river.JobArgs, *river.InsertOpts) { + return &RiskReviewDeadlineReminderScannerArgs{}, &river.InsertOpts{ + Queue: "risk", + MaxAttempts: 3, + UniqueOpts: river.UniqueOpts{ + ByArgs: true, + ByPeriod: 24 * time.Hour, + }, + } + }, + &river.PeriodicJobOpts{ + RunOnStart: false, + }, + ) +} + +func NewRiskReviewOverdueEscalationPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob { + sched := parseCronScheduleWithFallback(schedule, "0 0 9 * * *", "risk review overdue escalation scanner", logger) + + return river.NewPeriodicJob( + sched, + func() (river.JobArgs, *river.InsertOpts) { + return &RiskReviewOverdueEscalationScannerArgs{}, &river.InsertOpts{ + Queue: "risk", + MaxAttempts: 3, + UniqueOpts: river.UniqueOpts{ + ByArgs: true, + ByPeriod: 24 * time.Hour, + }, + } + }, + &river.PeriodicJobOpts{ + RunOnStart: false, + }, + ) +} + +func NewRiskStaleScannerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob { + sched := parseCronScheduleWithFallback(schedule, "0 0 10 * * 1", "risk stale scanner", logger) + + return river.NewPeriodicJob( + sched, + func() (river.JobArgs, *river.InsertOpts) { + return &RiskStaleRiskScannerArgs{}, &river.InsertOpts{ + Queue: "risk", + MaxAttempts: 3, + UniqueOpts: river.UniqueOpts{ + ByArgs: true, + ByPeriod: 7 * 24 * time.Hour, + }, + } + }, + &river.PeriodicJobOpts{ + RunOnStart: false, + }, + ) +} + +func NewRiskEvidenceReconciliationPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob { + sched := parseCronScheduleWithFallback(schedule, "0 30 10 * * *", "risk evidence reconciliation scanner", logger) + + return river.NewPeriodicJob( + sched, + func() (river.JobArgs, *river.InsertOpts) { + return &RiskEvidenceReconciliationScannerArgs{}, &river.InsertOpts{ + Queue: "risk", + MaxAttempts: 3, + UniqueOpts: river.UniqueOpts{ + ByArgs: true, + ByPeriod: 24 * time.Hour, + }, + } + }, + &river.PeriodicJobOpts{ + RunOnStart: false, + }, + ) +} + func periodicJobsFromConfig(cfg *config.Config, logger *zap.SugaredLogger) []*river.PeriodicJob { var periodicJobs []*river.PeriodicJob if cfg == nil { @@ -460,6 +568,18 @@ func periodicJobsFromConfig(cfg *config.Config, logger *zap.SugaredLogger) []*ri if cfg.Workflow != nil && cfg.Workflow.TaskDigestEnabled { periodicJobs = append(periodicJobs, NewWorkflowTaskDigestPeriodicJob(cfg.Workflow.TaskDigestSchedule, logger)) } + if cfg.Risk != nil && cfg.Risk.ReviewDeadlineReminderEnabled { + periodicJobs = append(periodicJobs, NewRiskReviewDeadlineReminderPeriodicJob(cfg.Risk.ReviewDeadlineReminderSchedule, logger)) + } + if cfg.Risk != nil && cfg.Risk.ReviewOverdueEscalationEnabled { + periodicJobs = append(periodicJobs, NewRiskReviewOverdueEscalationPeriodicJob(cfg.Risk.ReviewOverdueEscalationSchedule, logger)) + } + if cfg.Risk != nil && cfg.Risk.StaleRiskScannerEnabled { + periodicJobs = append(periodicJobs, NewRiskStaleScannerPeriodicJob(cfg.Risk.StaleRiskScannerSchedule, logger)) + } + if cfg.Risk != nil && cfg.Risk.EvidenceReconciliationEnabled { + periodicJobs = append(periodicJobs, NewRiskEvidenceReconciliationPeriodicJob(cfg.Risk.EvidenceReconciliationSchedule, logger)) + } return periodicJobs } diff --git a/internal/service/worker/service_test.go b/internal/service/worker/service_test.go index 1a6c9d47..b19f278b 100644 --- a/internal/service/worker/service_test.go +++ b/internal/service/worker/service_test.go @@ -369,6 +369,25 @@ func TestPeriodicJobsFromConfig_WorkflowSchedulerEnabledGuard(t *testing.T) { assert.Len(t, jobs, 3) } +func TestPeriodicJobsFromConfig_RiskJobsFromDedicatedConfig(t *testing.T) { + logger := zap.NewNop().Sugar() + + jobs := periodicJobsFromConfig(&config.Config{ + Risk: &config.RiskConfig{ + ReviewDeadlineReminderEnabled: true, + ReviewDeadlineReminderSchedule: "0 0 8 * * *", + ReviewOverdueEscalationEnabled: true, + ReviewOverdueEscalationSchedule: "0 0 9 * * *", + StaleRiskScannerEnabled: true, + StaleRiskScannerSchedule: "0 0 10 * * 1", + EvidenceReconciliationEnabled: true, + EvidenceReconciliationSchedule: "0 30 10 * * *", + }, + }, logger) + + assert.Len(t, jobs, 4) +} + func TestWorkflowSchedulerPeriodicJobConstructor_InsertOpts(t *testing.T) { args, opts := workflowSchedulerPeriodicJobConstructor() assert.NotNil(t, args) diff --git a/internal/service/worker/user_repository.go b/internal/service/worker/user_repository.go index 538bca88..f00855c5 100644 --- a/internal/service/worker/user_repository.go +++ b/internal/service/worker/user_repository.go @@ -30,7 +30,7 @@ func (r *GORMUserRepository) FindUserByID(ctx context.Context, userID string) (N var user relational.User if err := r.db.WithContext(ctx).First(&user, parsed).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { - return NotificationUser{}, fmt.Errorf("user %s not found", userID) + return NotificationUser{}, fmt.Errorf("user %s not found: %w", userID, gorm.ErrRecordNotFound) } return NotificationUser{}, fmt.Errorf("failed to fetch user %s: %w", userID, err) } @@ -42,5 +42,6 @@ func (r *GORMUserRepository) FindUserByID(ctx context.Context, userID string) (N LastName: user.LastName, TaskAvailableEmailSubscribed: user.TaskAvailableEmailSubscribed, TaskDailyDigestSubscribed: user.TaskDailyDigestSubscribed, + RiskNotificationsSubscribed: user.RiskNotificationsSubscribed, }, nil } diff --git a/risk.yaml b/risk.yaml new file mode 100644 index 00000000..559d08d1 --- /dev/null +++ b/risk.yaml @@ -0,0 +1,14 @@ +review_deadline_reminder_enabled: false +review_deadline_reminder_schedule: "0 0 8 * * *" + +review_overdue_escalation_enabled: false +review_overdue_escalation_schedule: "0 0 9 * * *" + +stale_risk_scanner_enabled: false +stale_risk_scanner_schedule: "0 0 10 * * 1" + +evidence_reconciliation_enabled: false +evidence_reconciliation_schedule: "0 30 10 * * *" + +auto_reopen_enabled: false +auto_reopen_threshold_days: 30