From 8e65d7045c38cee6a06743f32d75d6845b8c1a03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=CF=81=CE=B1=D1=82=D1=8F=CE=B9c=D0=BA=C6=92=CE=B1=CE=B7?= =?UTF-8?q?=D1=94=C6=96=C6=96=CE=B1?= Date: Thu, 16 Apr 2026 23:21:59 -0500 Subject: [PATCH 1/2] feat(observability): wire LLM + report worker metrics, auto-disable jobs Add Prometheus instruments and wiring for the full LLM resilience chain and report worker pipeline (PR 6 of llm-resilience-refactor-plan). Metrics added: - tradingagent_llm_retry_total (counter, by provider) - tradingagent_llm_budget_exhausted_total (counter) - tradingagent_report_worker_success_total (counter, by strategy_id) - tradingagent_report_worker_error_total (counter, by strategy_id) - tradingagent_report_staleness_seconds (histogram, by strategy_id) LLM package: - RetryMetrics interface + WithRetryMetrics() on RetryProvider - BudgetMetrics interface + WithBudgetMetrics() on BudgetGuardProvider - WithChainRetryMetrics/WithChainBudgetMetrics chain options - Emit retry metric on each backoff attempt - Emit budget exhaustion metric before rejecting call Runtime: - retryMetricsAdapter bridges provider label to RetryMetrics interface - chainOpts wires retry + budget metrics; report metrics wired on orch Automation: - ReportWorkerMetrics interface on JobOrchestrator - WithReportMetrics() setter (mirrors existing WithJobMetrics pattern) - Emit success/error per strategy in paperValidationReport loop - Auto-disable: jobs with >=5 consecutive failures set Enabled=false and log at ERROR level in both wrapAndRun and runDirect paths API: - ReportMetrics interface defined in report_handlers.go - ObserveReportStaleness emitted on GET /reports/latest - Deps.ReportMetrics wired through to Server Docs: - docs/reference/llm-resilience.md: chain diagram, env var table, emergency overrides, report worker ops, Prometheus metrics reference, troubleshooting playbooks (all providers down / budget exhausted / report stale / fallback storm), 7-day success criteria --- cmd/tradingagent/runtime.go | 17 +++ docs/reference/llm-resilience.md | 200 ++++++++++++++++++++++++++++ internal/api/report_handlers.go | 9 ++ internal/api/server.go | 7 + internal/automation/jobs_reports.go | 6 + internal/automation/orchestrator.go | 38 +++++- internal/llm/budget.go | 19 ++- internal/llm/provider_chain.go | 26 +++- internal/llm/retry.go | 16 +++ internal/metrics/metrics.go | 76 +++++++++++ 10 files changed, 406 insertions(+), 8 deletions(-) create mode 100644 docs/reference/llm-resilience.md diff --git a/cmd/tradingagent/runtime.go b/cmd/tradingagent/runtime.go index c9d3b839..bd4a651d 100644 --- a/cmd/tradingagent/runtime.go +++ b/cmd/tradingagent/runtime.go @@ -187,6 +187,7 @@ func newAPIServer(ctx context.Context, cfg config.Config, logger *slog.Logger) ( NewsFeedRepo: newsFeedRepo, DiscoveryRunRepo: pgrepo.NewDiscoveryRunRepo(db.Pool), ReportArtifacts: reportArtifactRepo, + ReportMetrics: appMetrics, } notificationManager := newNotificationManager(cfg) @@ -344,6 +345,7 @@ func newAPIServer(ctx context.Context, cfg config.Config, logger *slog.Logger) ( Logger: logger, }) orch.WithJobMetrics(appMetrics) + orch.WithReportMetrics(appMetrics) orch.RegisterAll() if err := orch.Start(); err != nil { logger.Warn("automation: failed to start job orchestrator", slog.Any("error", err)) @@ -540,6 +542,9 @@ func chainOpts(cfg config.LLMConfig, appMetrics *metrics.Metrics, logger *slog.L // Retry with exponential backoff. if cfg.RetryMaxAttempts > 1 { opts = append(opts, llm.WithRetry(cfg.RetryMaxAttempts)) + if appMetrics != nil { + opts = append(opts, llm.WithChainRetryMetrics(&retryMetricsAdapter{m: appMetrics, provider: strings.TrimSpace(cfg.DefaultProvider)})) + } } // Fallback provider. @@ -570,6 +575,9 @@ func chainOpts(cfg config.LLMConfig, appMetrics *metrics.Metrics, logger *slog.L // Budget guard. if budget != nil { opts = append(opts, llm.WithBudget(budget)) + if appMetrics != nil { + opts = append(opts, llm.WithChainBudgetMetrics(appMetrics)) + } } // Per-call timeout. @@ -580,6 +588,15 @@ func chainOpts(cfg config.LLMConfig, appMetrics *metrics.Metrics, logger *slog.L return opts } +// retryMetricsAdapter adapts *metrics.Metrics to the llm.RetryMetrics interface +// by binding a provider label at construction time. +type retryMetricsAdapter struct { + m *metrics.Metrics + provider string +} + +func (a *retryMetricsAdapter) RecordLLMRetry() { a.m.RecordLLMRetry(a.provider) } + func buildLLMBudget(cfg config.LLMConfig) *llm.Budget { // Validate() enforces non-negative values, but unit tests call runtime helpers // directly with hand-built LLMConfig values; clamp defensively for that path. diff --git a/docs/reference/llm-resilience.md b/docs/reference/llm-resilience.md new file mode 100644 index 00000000..0df88d72 --- /dev/null +++ b/docs/reference/llm-resilience.md @@ -0,0 +1,200 @@ +--- +title: 'LLM Resilience' +description: 'Provider chain architecture, env var reference, and operational troubleshooting.' +status: 'canonical' +created: '2026-04-16' +tags: [llm, resilience, observability, reference] +--- + +## Provider Chain Architecture + +Every LLM call passes through a layered provider chain. Each layer is optional — omitting its config disables it. + +```text +┌─────────────────────────────────────────────────────────────┐ +│ LLM Provider Chain │ +│ │ +│ Request flow (outermost → innermost): │ +│ │ +│ ┌──────────────┐ │ +│ │ Budget Guard │ ErrBudgetExhausted → not retried │ +│ └──────┬───────┘ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ Timeout │ Per-call deadline (LLM_CALL_TIMEOUT) │ +│ └──────┬───────┘ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ Throttle │ Concurrency limiter (LLM_THROTTLE_…) │ +│ └──────┬───────┘ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ Retry │ Exponential backoff on 429/5xx │ +│ └──────┬───────┘ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ Fallback │ Primary fails → try secondary │ +│ └──────┬───────┘ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ Cache │ In-memory response cache │ +│ └──────┬───────┘ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ Provider │ Raw OpenAI / OpenRouter / Ollama / etc. │ +│ └──────────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Chain construction + +`NewProviderChain(primary, logger, opts...)` builds layers inside-out: + +1. **Cache** (innermost) — deduplicates identical prompts +2. **Fallback** — routes to secondary provider on failure +3. **Retry** — exponential backoff with jitter on transient errors +4. **Throttle** — semaphore-based concurrency limiter +5. **Timeout** — per-call `context.WithTimeout` +6. **Budget Guard** (outermost) — rejects when daily limits hit + +Only layers whose options are provided are added. With zero config the chain degrades to the raw provider. + +## Environment Variables + +| Variable | Type | Default | Description | +| -------------------------- | -------- | -------- | ------------------------------------------------------- | +| `LLM_DEFAULT_PROVIDER` | string | `openai` | Primary LLM provider name | +| `LLM_FALLBACK_PROVIDER` | string | `""` | Secondary LLM provider (empty = no fallback) | +| `LLM_FALLBACK_MODEL` | string | `""` | Model override for fallback provider | +| `LLM_RETRY_MAX_ATTEMPTS` | int | `2` | Max retry attempts on transient errors (429, 5xx) | +| `LLM_CALL_TIMEOUT` | duration | `5m` | Per-call timeout (high during stabilization) | +| `LLM_BUDGET_REQUESTS_DAY` | int | `0` | Max requests/day (0 = unlimited) | +| `LLM_BUDGET_TOKENS_DAY` | int | `0` | Max tokens/day (0 = unlimited) | +| `LLM_THROTTLE_CONCURRENCY` | int | `4` | Max concurrent LLM calls | +| `LLM_CACHE_ENABLED` | bool | `true` | Enable in-memory response cache | +| `LLM_TIMEOUT` | duration | `30s` | Legacy timeout (superseded by `LLM_CALL_TIMEOUT`) | +| `LLM_DEBATE_TIMEOUT` | duration | — | Debate-phase-specific timeout with quick-model fallback | +| `SCHEDULER_JOB_TIMEOUT` | duration | `45m` | Pipeline-level hard cap | + +### Emergency env-var overrides (no redeploy needed for Docker Compose) + +| Override | Effect | +| ---------------------------- | ----------------------------------------- | +| `LLM_FALLBACK_PROVIDER=` | Chain degrades to primary-only with retry | +| `LLM_RETRY_MAX_ATTEMPTS=1` | Disables retry | +| `LLM_BUDGET_REQUESTS_DAY=0` | Disables budget guard (unlimited) | +| `LLM_CALL_TIMEOUT=30m` | Even more generous timeout | +| `LLM_THROTTLE_CONCURRENCY=1` | Serialize all LLM calls | + +## Report Worker + +The `paper_validation_report` automation job generates paper-trading validation reports. + +- **Schedule:** `0 17 * * 1-5` ET (after US market close) +- **Per-strategy jitter:** 0–119s to spread LLM/DB load +- **Persistence:** `report_artifacts` table (upsert on `strategy_id, report_type, time_bucket`) +- **Error handling:** Each strategy is independent — a failure in one does not block others + +### Auto-Disable + +If a job accumulates **5 consecutive failures**, it is automatically disabled: + +- Job's `Enabled` flag is set to `false` +- A log entry at ERROR level is emitted +- The job will not fire again until manually re-enabled + +**Re-enable via API:** + +```http +PUT /api/v1/automation/jobs/{name}/enable +``` + +### API Endpoints + +| Endpoint | Method | Description | +| ---------------------------------------- | ------ | ----------------------------------- | +| `/api/v1/strategies/{id}/reports/latest` | GET | Latest completed report + staleness | +| `/api/v1/strategies/{id}/reports` | GET | Paginated report list | + +The `latest` endpoint includes a `stale_seconds` field = `now - completed_at`. + +## Prometheus Metrics + +### LLM Provider Chain + +| Metric | Type | Labels | Description | +| ----------------------------------------- | --------- | --------------------- | --------------------------------------- | +| `tradingagent_llm_calls_total` | Counter | provider, model, role | Total LLM API calls | +| `tradingagent_llm_retry_total` | Counter | provider | Retry attempts | +| `tradingagent_llm_fallback_total` | Counter | reason | Fallback events | +| `tradingagent_llm_budget_exhausted_total` | Counter | — | Budget exhaustion rejections | +| `tradingagent_llm_tokens_total` | Counter | type | Token consumption (prompt / completion) | +| `tradingagent_llm_latency_seconds` | Histogram | provider, model | Call latency distribution | +| `tradingagent_llm_cache_hits_total` | Counter | — | Response cache hits | +| `tradingagent_llm_cache_misses_total` | Counter | — | Response cache misses | + +### Report Worker + +| Metric | Type | Labels | Description | +| ------------------------------------------ | --------- | ----------- | ----------------------------- | +| `tradingagent_report_worker_success_total` | Counter | strategy_id | Successful report generations | +| `tradingagent_report_worker_error_total` | Counter | strategy_id | Failed report generations | +| `tradingagent_report_staleness_seconds` | Histogram | strategy_id | Report age at query time | +| `tradingagent_automation_job_errors_total` | Counter | job_name | All automation job errors | + +## Troubleshooting + +### All providers down + +**Symptoms:** `tradingagent_llm_fallback_total` spikes, pipeline runs fail. + +**Check:** + +1. `tradingagent_llm_calls_total` by provider — are calls reaching providers? +2. Provider status pages (OpenAI, OpenRouter) +3. API key validity + +**Mitigate:** If secondary is also down, set `LLM_FALLBACK_PROVIDER=` to avoid wasted fallback attempts. + +### Budget exhausted + +**Symptoms:** `tradingagent_llm_budget_exhausted_total` incrementing, jobs returning `ErrBudgetExhausted`. + +**Check:** + +1. `GET /api/v1/llm/budget` (if exposed) or check logs for budget stats +2. Compare `LLM_BUDGET_REQUESTS_DAY` against actual daily volume + +**Mitigate:** Set `LLM_BUDGET_REQUESTS_DAY=0` to remove limit temporarily. Investigate cost after. + +### Report stale + +**Symptoms:** `tradingagent_report_staleness_seconds` > 86400 (24h), dashboard shows stale warning. + +**Check:** + +1. `GET /api/v1/automation/status` — is `paper_validation_report` enabled? Check `consecutive_failures`. +2. If auto-disabled: re-enable via `PUT /api/v1/automation/jobs/paper_validation_report/enable` +3. Check logs for the underlying failure reason + +**Mitigate:** Trigger manual run via `POST /api/v1/automation/jobs/paper_validation_report/run`. + +### Fallback storm + +**Symptoms:** `tradingagent_llm_fallback_total` / `tradingagent_llm_calls_total` > 20% over sustained period. + +**Check:** + +1. Primary provider health +2. `tradingagent_llm_retry_total` — are retries exhausting before fallback? + +**Mitigate:** Budget guard caps daily requests. Auto-disable fires after 5 consecutive job failures. Reduce `LLM_THROTTLE_CONCURRENCY=1` to slow the bleed. + +## Success Criteria (7-Day Validation) + +| Criterion | Target | +| -------------------------- | -------- | +| Report worker success rate | > 98% | +| Fallback usage | < 20% | +| Pipeline timeout failures | ↓ 80% | +| P95 report latency | < 20 min | diff --git a/internal/api/report_handlers.go b/internal/api/report_handlers.go index 188966f0..d70c0b14 100644 --- a/internal/api/report_handlers.go +++ b/internal/api/report_handlers.go @@ -8,6 +8,11 @@ import ( pgrepo "github.com/PatrickFanella/get-rich-quick/internal/repository/postgres" ) +// ReportMetrics captures report staleness observations. +type ReportMetrics interface { + ObserveReportStaleness(strategyID string, seconds float64) +} + // reportLatestResponse wraps the latest report artifact with a stale_seconds // field showing how old the report is. type reportLatestResponse struct { @@ -50,6 +55,10 @@ func (s *Server) handleGetLatestReport(w http.ResponseWriter, r *http.Request) { stale = math.Max(0, math.Round(time.Since(*artifact.CompletedAt).Seconds())) } + if s.reportMetrics != nil { + s.reportMetrics.ObserveReportStaleness(id.String(), stale) + } + respondJSON(w, http.StatusOK, reportLatestResponse{ ReportArtifact: *artifact, StaleSeconds: stale, diff --git a/internal/api/server.go b/internal/api/server.go index 6e67ead3..4b38ed3f 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -92,6 +92,9 @@ type Server struct { // Report artifacts (optional; nil = feature not enabled). reportArtifacts *pgrepo.ReportArtifactRepo + // Report metrics (optional; nil = no metrics). + reportMetrics ReportMetrics + // Services — constructed from deps in NewServer. backtestSvc *service.BacktestService conversationSvc *service.ConversationService @@ -195,6 +198,9 @@ type Deps struct { // Report artifacts (optional; nil = feature not enabled). ReportArtifacts *pgrepo.ReportArtifactRepo + + // Report metrics (optional; nil = no metrics). + ReportMetrics ReportMetrics } // NewServer creates a new API server with all routes and middleware registered. @@ -297,6 +303,7 @@ func NewServer(cfg ServerConfig, deps Deps, logger *slog.Logger) (*Server, error signalStore: deps.SignalStore, watchIndex: deps.WatchIndex, reportArtifacts: deps.ReportArtifacts, + reportMetrics: deps.ReportMetrics, } // Construct services from the assembled deps. diff --git a/internal/automation/jobs_reports.go b/internal/automation/jobs_reports.go index 62835c11..b33b8555 100644 --- a/internal/automation/jobs_reports.go +++ b/internal/automation/jobs_reports.go @@ -89,12 +89,18 @@ func (o *JobOrchestrator) paperValidationReport(ctx context.Context) error { if err := o.generateOneReport(ctx, ps.ID, ps.Name, timeBucket, now); err != nil { failed++ + if o.reportMetrics != nil { + o.reportMetrics.RecordReportWorkerError(ps.ID.String()) + } o.logger.Warn("paper_validation_report: strategy failed", slog.String("strategy", ps.Name), slog.Any("error", err), ) } else { succeeded++ + if o.reportMetrics != nil { + o.reportMetrics.RecordReportWorkerSuccess(ps.ID.String()) + } } } diff --git a/internal/automation/orchestrator.go b/internal/automation/orchestrator.go index 762751ab..ce4ac3f2 100644 --- a/internal/automation/orchestrator.go +++ b/internal/automation/orchestrator.go @@ -33,6 +33,10 @@ func mustLoadEastern() *time.Location { return loc } +// autoDisableThreshold is the number of consecutive failures after which a +// job is automatically disabled to prevent cascading damage. +const autoDisableThreshold = 5 + // StrategyTrigger triggers an immediate pipeline run for a strategy. // The scheduler satisfies this interface. type StrategyTrigger interface { @@ -55,9 +59,9 @@ type OrchestratorDeps struct { StrategyTrigger StrategyTrigger // optional; nil = no event-driven triggers PolymarketAccountRepo repository.PolymarketAccountRepository // optional; nil = skip profiling job PolymarketCLOBURL string // optional; defaults to Polymarket CLOB base URL - ReportArtifactRepo *pgrepo.ReportArtifactRepo // optional; nil = skip report jobs - BacktestConfigRepo repository.BacktestConfigRepository // optional; needed by report jobs - BacktestRunRepo repository.BacktestRunRepository // optional; needed by report jobs + ReportArtifactRepo *pgrepo.ReportArtifactRepo // optional; nil = skip report jobs + BacktestConfigRepo repository.BacktestConfigRepository // optional; needed by report jobs + BacktestRunRepo repository.BacktestRunRepository // optional; needed by report jobs Logger *slog.Logger } @@ -104,6 +108,13 @@ type AutomationJobMetrics interface { RecordAutomationJobError(jobName string) } +// ReportWorkerMetrics captures report worker success/error/staleness. +type ReportWorkerMetrics interface { + RecordReportWorkerSuccess(strategyID string) + RecordReportWorkerError(strategyID string) + ObserveReportStaleness(strategyID string, seconds float64) +} + // JobOrchestrator is the central registry and cron runner for all automated jobs. type JobOrchestrator struct { jobs map[string]*RegisteredJob @@ -112,6 +123,7 @@ type JobOrchestrator struct { logger *slog.Logger rssAggregator *rss.Aggregator metrics AutomationJobMetrics + reportMetrics ReportWorkerMetrics } // NewJobOrchestrator constructs a new orchestrator. @@ -134,6 +146,12 @@ func (o *JobOrchestrator) WithJobMetrics(m AutomationJobMetrics) { o.metrics = m } +// WithReportMetrics attaches report-worker-specific metrics. +// Call before Start(). Safe to call with nil. +func (o *JobOrchestrator) WithReportMetrics(m ReportWorkerMetrics) { + o.reportMetrics = m +} + // SetConsecutiveFailures sets the ConsecutiveFailures counter on a job. // Primarily for testing and operational resets. func (o *JobOrchestrator) SetConsecutiveFailures(name string, n int) { @@ -304,6 +322,13 @@ func (o *JobOrchestrator) runDirect(job *RegisteredJob) { if o.metrics != nil { o.metrics.RecordAutomationJobError(job.Name) } + if job.ConsecutiveFailures >= autoDisableThreshold { + job.Enabled = false + o.logger.Error("automation: auto-disabled job after consecutive failures", + slog.String("job", job.Name), + slog.Int("consecutive_failures", job.ConsecutiveFailures), + ) + } } else { job.LastResult = "success" job.LastError = "" @@ -400,6 +425,13 @@ func (o *JobOrchestrator) wrapAndRun(job *RegisteredJob) { if o.metrics != nil { o.metrics.RecordAutomationJobError(job.Name) } + if job.ConsecutiveFailures >= autoDisableThreshold { + job.Enabled = false + o.logger.Error("automation: auto-disabled job after consecutive failures", + slog.String("job", job.Name), + slog.Int("consecutive_failures", job.ConsecutiveFailures), + ) + } } else { job.LastError = "" job.ConsecutiveFailures = 0 diff --git a/internal/llm/budget.go b/internal/llm/budget.go index 23499c5b..cbfbd61a 100644 --- a/internal/llm/budget.go +++ b/internal/llm/budget.go @@ -119,11 +119,17 @@ func nextUTCMidnight() time.Time { return time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.UTC) } +// BudgetMetrics captures budget exhaustion events for observability. +type BudgetMetrics interface { + RecordLLMBudgetExhausted() +} + // BudgetGuardProvider wraps a Provider, rejecting calls when budget is exhausted. // On success it records token usage back to the budget. type BudgetGuardProvider struct { - inner Provider - budget *Budget + inner Provider + budget *Budget + metrics BudgetMetrics } // NewBudgetGuardProvider wraps inner with budget enforcement. @@ -131,9 +137,18 @@ func NewBudgetGuardProvider(inner Provider, budget *Budget) *BudgetGuardProvider return &BudgetGuardProvider{inner: inner, budget: budget} } +// WithBudgetMetrics attaches optional budget metrics. +func (b *BudgetGuardProvider) WithBudgetMetrics(m BudgetMetrics) *BudgetGuardProvider { + b.metrics = m + return b +} + // Complete checks budget before delegating. Records usage after success. func (b *BudgetGuardProvider) Complete(ctx context.Context, req CompletionRequest) (*CompletionResponse, error) { if !b.budget.reserveRequest() { + if b.metrics != nil { + b.metrics.RecordLLMBudgetExhausted() + } return nil, ErrBudgetExhausted } diff --git a/internal/llm/provider_chain.go b/internal/llm/provider_chain.go index a2084316..0e33d219 100644 --- a/internal/llm/provider_chain.go +++ b/internal/llm/provider_chain.go @@ -23,6 +23,8 @@ type chainConfig struct { type chainMetrics struct { fallback FallbackMetrics cache CacheMetrics + retry RetryMetrics + budget BudgetMetrics } // WithFallback adds a secondary provider tried when primary fails. @@ -85,6 +87,16 @@ func WithChainCacheMetrics(m CacheMetrics) ChainOption { return func(c *chainConfig) { c.metrics.cache = m } } +// WithChainRetryMetrics attaches metrics to the retry layer. +func WithChainRetryMetrics(m RetryMetrics) ChainOption { + return func(c *chainConfig) { c.metrics.retry = m } +} + +// WithChainBudgetMetrics attaches metrics to the budget guard layer. +func WithChainBudgetMetrics(m BudgetMetrics) ChainOption { + return func(c *chainConfig) { c.metrics.budget = m } +} + // NewProviderChain composes a resilient provider from existing primitives. // // Chain order (outermost → innermost): @@ -107,7 +119,7 @@ func NewProviderChain(primary Provider, logger *slog.Logger, opts ...ChainOption } // Build inside-out: start with primary, wrap outward. - var p Provider = primary + p := primary // Layer 1 (innermost): cache if cfg.cache != nil { @@ -141,7 +153,11 @@ func NewProviderChain(primary Provider, logger *slog.Logger, opts ...ChainOption if cfg.baseDelay > 0 { retryOpts = append(retryOpts, WithBaseDelay(cfg.baseDelay)) } - p = NewRetryProvider(p, logger, retryOpts...) + rp := NewRetryProvider(p, logger, retryOpts...) + if cfg.metrics.retry != nil { + rp = rp.WithRetryMetrics(cfg.metrics.retry) + } + p = rp } // Layer 4: throttle @@ -156,7 +172,11 @@ func NewProviderChain(primary Provider, logger *slog.Logger, opts ...ChainOption // Layer 6 (outermost): budget guard if cfg.budget != nil { - p = NewBudgetGuardProvider(p, cfg.budget) + bg := NewBudgetGuardProvider(p, cfg.budget) + if cfg.metrics.budget != nil { + bg = bg.WithBudgetMetrics(cfg.metrics.budget) + } + p = bg } return p diff --git a/internal/llm/retry.go b/internal/llm/retry.go index 65efd512..28b68400 100644 --- a/internal/llm/retry.go +++ b/internal/llm/retry.go @@ -20,6 +20,11 @@ type statusCoder interface { StatusCode() int } +// RetryMetrics captures retry events for observability. +type RetryMetrics interface { + RecordLLMRetry() +} + // RetryProvider wraps a Provider with exponential-backoff retry logic. // It retries on transient errors (rate limits, server errors, timeouts) and // does not retry on client errors (bad request, auth failures). @@ -30,6 +35,7 @@ type RetryProvider struct { jitterPct float64 logger *slog.Logger timerFn func(time.Duration) (<-chan time.Time, func() bool) // overridable for testing + metrics RetryMetrics } // RetryOption configures a RetryProvider. @@ -83,6 +89,12 @@ func NewRetryProvider(provider Provider, logger *slog.Logger, opts ...RetryOptio return r } +// WithRetryMetrics attaches optional retry metrics. +func (r *RetryProvider) WithRetryMetrics(m RetryMetrics) *RetryProvider { + r.metrics = m + return r +} + // SetTimerFn overrides the function used to create backoff timers between retries. // This is intended for testing only. The function should return a channel that // fires after the duration and a stop function to release resources. @@ -111,6 +123,10 @@ func (r *RetryProvider) Complete(ctx context.Context, request CompletionRequest) if attempt > 0 { delay := r.backoffDelay(attempt - 1) + if r.metrics != nil { + r.metrics.RecordLLMRetry() + } + r.logger.Warn("llm: retrying after transient error", slog.Int("attempt", attempt+1), slog.Int("max_attempts", r.maxAttempts), diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 8a3b9f07..56b5d002 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -27,6 +27,11 @@ type Metrics struct { PositionsOpen prometheus.Gauge CircuitBreakerState prometheus.Gauge KillSwitchActive prometheus.Gauge + LLMRetryTotal *prometheus.CounterVec + LLMBudgetExhaustedTotal prometheus.Counter + ReportWorkerSuccessTotal *prometheus.CounterVec + ReportWorkerErrorTotal *prometheus.CounterVec + ReportStaleness *prometheus.HistogramVec } // New creates a new isolated Prometheus registry, registers all trading-agent @@ -126,6 +131,32 @@ func New() *Metrics { Name: "tradingagent_kill_switch_active", Help: "Kill switch state: 1 = active, 0 = inactive.", }), + + LLMRetryTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "tradingagent_llm_retry_total", + Help: "Total LLM retry attempts by provider.", + }, []string{"provider"}), + + LLMBudgetExhaustedTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "tradingagent_llm_budget_exhausted_total", + Help: "Total times an LLM call was rejected due to budget exhaustion.", + }), + + ReportWorkerSuccessTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "tradingagent_report_worker_success_total", + Help: "Total successful report generations by strategy ID.", + }, []string{"strategy_id"}), + + ReportWorkerErrorTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "tradingagent_report_worker_error_total", + Help: "Total failed report generations by strategy ID.", + }, []string{"strategy_id"}), + + ReportStaleness: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "tradingagent_report_staleness_seconds", + Help: "Report staleness in seconds at query time.", + Buckets: []float64{60, 300, 900, 1800, 3600, 7200, 14400, 43200, 86400}, + }, []string{"strategy_id"}), } reg.MustRegister( @@ -146,6 +177,11 @@ func New() *Metrics { m.PositionsOpen, m.CircuitBreakerState, m.KillSwitchActive, + m.LLMRetryTotal, + m.LLMBudgetExhaustedTotal, + m.ReportWorkerSuccessTotal, + m.ReportWorkerErrorTotal, + m.ReportStaleness, ) return m @@ -234,6 +270,46 @@ func (m *Metrics) SetKillSwitchActive(active bool) { } } +// RecordLLMRetry increments the retry counter for a given provider. +func (m *Metrics) RecordLLMRetry(provider string) { + if m == nil { + return + } + m.LLMRetryTotal.WithLabelValues(provider).Inc() +} + +// RecordLLMBudgetExhausted increments the budget exhaustion counter. +func (m *Metrics) RecordLLMBudgetExhausted() { + if m == nil { + return + } + m.LLMBudgetExhaustedTotal.Inc() +} + +// RecordReportWorkerSuccess increments the report success counter for a strategy. +func (m *Metrics) RecordReportWorkerSuccess(strategyID string) { + if m == nil { + return + } + m.ReportWorkerSuccessTotal.WithLabelValues(strategyID).Inc() +} + +// RecordReportWorkerError increments the report error counter for a strategy. +func (m *Metrics) RecordReportWorkerError(strategyID string) { + if m == nil { + return + } + m.ReportWorkerErrorTotal.WithLabelValues(strategyID).Inc() +} + +// ObserveReportStaleness records how stale a report is at query time. +func (m *Metrics) ObserveReportStaleness(strategyID string, seconds float64) { + if m == nil { + return + } + m.ReportStaleness.WithLabelValues(strategyID).Observe(seconds) +} + // Handler returns an http.Handler that serves Prometheus metrics from the // instance's private registry. func (m *Metrics) Handler() http.Handler { From 7fca608d34c26dff518430f69c424f95dea90d31 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 17 Apr 2026 04:52:10 +0000 Subject: [PATCH 2/2] test: address PR review feedback for observability wiring Agent-Logs-Url: https://github.com/PatrickFanella/augr/sessions/3c3eb647-082d-4ab2-8e2d-72f44dbd924d Co-authored-by: PatrickFanella <61631520+PatrickFanella@users.noreply.github.com> --- cmd/tradingagent/runtime.go | 13 +++- cmd/tradingagent/runtime_test.go | 22 ++++++ internal/api/report_handlers.go | 9 +++ internal/api/report_handlers_test.go | 63 ++++++++++++++++ internal/api/server.go | 4 +- internal/automation/orchestrator.go | 3 +- internal/automation/orchestrator_test.go | 53 ++++++++++++++ internal/llm/budget_test.go | 31 ++++++++ internal/llm/provider_chain_test.go | 60 +++++++++++++++ internal/llm/retry_test.go | 36 +++++++++ internal/metrics/metrics_test.go | 93 ++++++++++++++++++++++++ 11 files changed, 382 insertions(+), 5 deletions(-) diff --git a/cmd/tradingagent/runtime.go b/cmd/tradingagent/runtime.go index bd4a651d..57ff5aed 100644 --- a/cmd/tradingagent/runtime.go +++ b/cmd/tradingagent/runtime.go @@ -543,7 +543,10 @@ func chainOpts(cfg config.LLMConfig, appMetrics *metrics.Metrics, logger *slog.L if cfg.RetryMaxAttempts > 1 { opts = append(opts, llm.WithRetry(cfg.RetryMaxAttempts)) if appMetrics != nil { - opts = append(opts, llm.WithChainRetryMetrics(&retryMetricsAdapter{m: appMetrics, provider: strings.TrimSpace(cfg.DefaultProvider)})) + opts = append(opts, llm.WithChainRetryMetrics(&retryMetricsAdapter{ + m: appMetrics, + provider: configuredPrimaryRetryProviderLabel(cfg.DefaultProvider), + })) } } @@ -597,6 +600,14 @@ type retryMetricsAdapter struct { func (a *retryMetricsAdapter) RecordLLMRetry() { a.m.RecordLLMRetry(a.provider) } +func configuredPrimaryRetryProviderLabel(provider string) string { + name := strings.TrimSpace(provider) + if name == "" { + name = "unknown" + } + return fmt.Sprintf("configured_primary:%s", name) +} + func buildLLMBudget(cfg config.LLMConfig) *llm.Budget { // Validate() enforces non-negative values, but unit tests call runtime helpers // directly with hand-built LLMConfig values; clamp defensively for that path. diff --git a/cmd/tradingagent/runtime_test.go b/cmd/tradingagent/runtime_test.go index 39f311ae..a69c4966 100644 --- a/cmd/tradingagent/runtime_test.go +++ b/cmd/tradingagent/runtime_test.go @@ -951,6 +951,28 @@ func TestBuildProviderChain_InvalidFallbackSkipped(t *testing.T) { } } +func TestConfiguredPrimaryRetryProviderLabel(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + input string + expected string + }{ + {name: "trimmed provider", input: " openai ", expected: "configured_primary:openai"}, + {name: "empty provider", input: " ", expected: "configured_primary:unknown"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + if got := configuredPrimaryRetryProviderLabel(tt.input); got != tt.expected { + t.Fatalf("configuredPrimaryRetryProviderLabel(%q) = %q, want %q", tt.input, got, tt.expected) + } + }) + } +} + func slogDiscardLogger() *slog.Logger { return slog.New(slog.NewTextHandler(io.Discard, nil)) } diff --git a/internal/api/report_handlers.go b/internal/api/report_handlers.go index d70c0b14..09f9cccc 100644 --- a/internal/api/report_handlers.go +++ b/internal/api/report_handlers.go @@ -1,10 +1,13 @@ package api import ( + "context" "math" "net/http" "time" + "github.com/google/uuid" + pgrepo "github.com/PatrickFanella/get-rich-quick/internal/repository/postgres" ) @@ -13,6 +16,12 @@ type ReportMetrics interface { ObserveReportStaleness(strategyID string, seconds float64) } +// ReportArtifactStore captures report artifact reads used by report handlers. +type ReportArtifactStore interface { + GetLatest(ctx context.Context, strategyID uuid.UUID, reportType string) (*pgrepo.ReportArtifact, error) + List(ctx context.Context, filter pgrepo.ReportArtifactFilter, limit, offset int) ([]pgrepo.ReportArtifact, error) +} + // reportLatestResponse wraps the latest report artifact with a stale_seconds // field showing how old the report is. type reportLatestResponse struct { diff --git a/internal/api/report_handlers_test.go b/internal/api/report_handlers_test.go index 1af57b50..65553491 100644 --- a/internal/api/report_handlers_test.go +++ b/internal/api/report_handlers_test.go @@ -1,6 +1,7 @@ package api import ( + "context" "encoding/json" "net/http" "testing" @@ -14,6 +15,31 @@ import ( // These tests exercise the "not configured" handler path by using the // default test server setup, where Server.reportArtifacts is left nil. +type stubReportArtifactStore struct { + latest *pgrepo.ReportArtifact + err error +} + +func (s *stubReportArtifactStore) GetLatest(context.Context, uuid.UUID, string) (*pgrepo.ReportArtifact, error) { + return s.latest, s.err +} + +func (s *stubReportArtifactStore) List(context.Context, pgrepo.ReportArtifactFilter, int, int) ([]pgrepo.ReportArtifact, error) { + return nil, nil +} + +type stubReportMetrics struct { + calls int + strategyID string + seconds float64 +} + +func (s *stubReportMetrics) ObserveReportStaleness(strategyID string, seconds float64) { + s.calls++ + s.strategyID = strategyID + s.seconds = seconds +} + func TestHandleGetLatestReport_NotConfigured(t *testing.T) { t.Parallel() @@ -80,3 +106,40 @@ func TestReportLatestResponse_StaleSeconds(t *testing.T) { t.Fatalf("stale_seconds = %f, want 300", stale) } } + +func TestHandleGetLatestReport_RecordsStalenessMetricWithResponseValue(t *testing.T) { + t.Parallel() + + completed := time.Now().Add(-5 * time.Minute) + metricsSink := &stubReportMetrics{} + deps := testDeps() + deps.ReportArtifacts = &stubReportArtifactStore{ + latest: &pgrepo.ReportArtifact{ + ID: uuid.New(), + StrategyID: stratA.ID, + ReportType: "paper_validation", + TimeBucket: time.Now().Truncate(24 * time.Hour), + Status: "completed", + ReportJSON: json.RawMessage(`{"decision":"GO"}`), + CompletedAt: &completed, + }, + } + deps.ReportMetrics = metricsSink + srv := newTestServerWithDeps(t, deps) + + rr := doRequest(t, srv, http.MethodGet, "/api/v1/strategies/"+stratA.ID.String()+"/reports/latest", nil) + if rr.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", rr.Code, http.StatusOK) + } + resp := decodeJSON[reportLatestResponse](t, rr) + + if metricsSink.calls != 1 { + t.Fatalf("metrics calls = %d, want 1", metricsSink.calls) + } + if metricsSink.strategyID != stratA.ID.String() { + t.Fatalf("metrics strategyID = %q, want %q", metricsSink.strategyID, stratA.ID.String()) + } + if metricsSink.seconds != resp.StaleSeconds { + t.Fatalf("metrics stale seconds = %f, want response stale_seconds %f", metricsSink.seconds, resp.StaleSeconds) + } +} diff --git a/internal/api/server.go b/internal/api/server.go index 4b38ed3f..4773b0ca 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -90,7 +90,7 @@ type Server struct { watchIndex *signal.WatchIndex // Report artifacts (optional; nil = feature not enabled). - reportArtifacts *pgrepo.ReportArtifactRepo + reportArtifacts ReportArtifactStore // Report metrics (optional; nil = no metrics). reportMetrics ReportMetrics @@ -197,7 +197,7 @@ type Deps struct { WatchIndex *signal.WatchIndex // Report artifacts (optional; nil = feature not enabled). - ReportArtifacts *pgrepo.ReportArtifactRepo + ReportArtifacts ReportArtifactStore // Report metrics (optional; nil = no metrics). ReportMetrics ReportMetrics diff --git a/internal/automation/orchestrator.go b/internal/automation/orchestrator.go index ce4ac3f2..a4b94e46 100644 --- a/internal/automation/orchestrator.go +++ b/internal/automation/orchestrator.go @@ -108,11 +108,10 @@ type AutomationJobMetrics interface { RecordAutomationJobError(jobName string) } -// ReportWorkerMetrics captures report worker success/error/staleness. +// ReportWorkerMetrics captures report worker success/error emission. type ReportWorkerMetrics interface { RecordReportWorkerSuccess(strategyID string) RecordReportWorkerError(strategyID string) - ObserveReportStaleness(strategyID string, seconds float64) } // JobOrchestrator is the central registry and cron runner for all automated jobs. diff --git a/internal/automation/orchestrator_test.go b/internal/automation/orchestrator_test.go index a2634fd0..7664522b 100644 --- a/internal/automation/orchestrator_test.go +++ b/internal/automation/orchestrator_test.go @@ -91,6 +91,59 @@ func TestJobOrchestratorStatus_IncludesStuckForWhenRunning(t *testing.T) { waitForJobRuns(t, orch, "job", 1) } +func TestJobOrchestratorRunJob_AutoDisablesAfterThreshold(t *testing.T) { + t.Parallel() + + orch := NewJobOrchestrator(OrchestratorDeps{}) + orch.Register("job", "always fails", schedulerSpecEveryMinute(), func(context.Context) error { + return errors.New("boom") + }) + orch.SetConsecutiveFailures("job", autoDisableThreshold-1) + + if err := orch.RunJob(context.Background(), "job"); err != nil { + t.Fatalf("RunJob() error = %v", err) + } + waitForJobRuns(t, orch, "job", 1) + + status := singleJobStatus(t, orch, "job") + if status.ConsecutiveFailures != autoDisableThreshold { + t.Fatalf("ConsecutiveFailures = %d, want %d", status.ConsecutiveFailures, autoDisableThreshold) + } + if status.Enabled { + t.Fatal("Enabled = true, want false after reaching auto-disable threshold") + } +} + +func TestJobOrchestratorWrapAndRun_AutoDisabledJobsAreSkipped(t *testing.T) { + t.Parallel() + + orch := NewJobOrchestrator(OrchestratorDeps{}) + orch.Register("job", "always fails", schedulerSpecEveryMinute(), func(context.Context) error { + return errors.New("boom") + }) + orch.SetConsecutiveFailures("job", autoDisableThreshold-1) + + job := orch.jobs["job"] + orch.wrapAndRun(job) + + status := singleJobStatus(t, orch, "job") + if status.ConsecutiveFailures != autoDisableThreshold { + t.Fatalf("ConsecutiveFailures = %d, want %d", status.ConsecutiveFailures, autoDisableThreshold) + } + if status.Enabled { + t.Fatal("Enabled = true, want false after reaching auto-disable threshold") + } + if status.RunCount != 1 { + t.Fatalf("RunCount after first run = %d, want 1", status.RunCount) + } + + orch.wrapAndRun(job) + status = singleJobStatus(t, orch, "job") + if status.RunCount != 1 { + t.Fatalf("RunCount after disabled scheduled invocation = %d, want 1", status.RunCount) + } +} + func waitForJobRuns(t *testing.T, orch *JobOrchestrator, jobName string, want int) { t.Helper() deadline := time.Now().Add(2 * time.Second) diff --git a/internal/llm/budget_test.go b/internal/llm/budget_test.go index d749aa60..563fd447 100644 --- a/internal/llm/budget_test.go +++ b/internal/llm/budget_test.go @@ -230,3 +230,34 @@ func TestErrBudgetExhausted_IsNotRetryable(t *testing.T) { t.Errorf("calls = %d, want 1 (should not retry budget errors)", calls) } } + +type stubBudgetMetrics struct{ calls atomic.Int32 } + +func (s *stubBudgetMetrics) RecordLLMBudgetExhausted() { s.calls.Add(1) } + +func TestBudgetGuardProvider_RecordsBudgetExhaustedMetric(t *testing.T) { + t.Parallel() + + inner := &trackingProvider{ + response: &llm.CompletionResponse{Content: "ok"}, + } + budget := llm.NewBudget(1, 0) + metrics := &stubBudgetMetrics{} + guard := llm.NewBudgetGuardProvider(inner, budget).WithBudgetMetrics(metrics) + + _, err := guard.Complete(context.Background(), llm.CompletionRequest{}) + if err != nil { + t.Fatalf("call 1: %v", err) + } + if metrics.calls.Load() != 0 { + t.Fatalf("metrics calls after successful request = %d, want 0", metrics.calls.Load()) + } + + _, err = guard.Complete(context.Background(), llm.CompletionRequest{}) + if !errors.Is(err, llm.ErrBudgetExhausted) { + t.Fatalf("call 2 error = %v, want ErrBudgetExhausted", err) + } + if metrics.calls.Load() != 1 { + t.Fatalf("metrics calls after budget rejection = %d, want 1", metrics.calls.Load()) + } +} diff --git a/internal/llm/provider_chain_test.go b/internal/llm/provider_chain_test.go index f427f8d3..a41de7a4 100644 --- a/internal/llm/provider_chain_test.go +++ b/internal/llm/provider_chain_test.go @@ -37,6 +37,10 @@ type stubCacheMetrics struct { func (s *stubCacheMetrics) RecordLLMCacheHit() { s.hits.Add(1) } func (s *stubCacheMetrics) RecordLLMCacheMiss() { s.misses.Add(1) } +type stubChainBudgetMetrics struct{ calls atomic.Int32 } + +func (s *stubChainBudgetMetrics) RecordLLMBudgetExhausted() { s.calls.Add(1) } + // --- NewProviderChain tests --- func TestProviderChain_PrimaryOnly(t *testing.T) { @@ -220,6 +224,33 @@ func TestProviderChain_WithRetry(t *testing.T) { } } +func TestProviderChain_WithRetryMetrics(t *testing.T) { + t.Parallel() + + mock := newMockProvider( + []*llm.CompletionResponse{nil, &llm.CompletionResponse{Content: "ok"}}, + []error{&httpError{code: 500, msg: "transient"}, nil}, + ) + retryStub := &stubRetryMetrics{} + + chain := llm.NewProviderChain(mock, discardLogger(), + llm.WithRetry(2), + llm.WithRetryBaseDelay(1*time.Millisecond), + llm.WithChainRetryMetrics(retryStub), + ) + + got, err := chain.Complete(context.Background(), llm.CompletionRequest{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got.Content != "ok" { + t.Fatalf("content = %q, want %q", got.Content, "ok") + } + if retryStub.calls.Load() != 1 { + t.Fatalf("retry metrics calls = %d, want 1", retryStub.calls.Load()) + } +} + func TestProviderChain_WithBudgetExhausted(t *testing.T) { t.Parallel() @@ -271,6 +302,35 @@ func TestProviderChain_BudgetExhaustedNotRetried(t *testing.T) { } } +func TestProviderChain_WithBudgetMetrics(t *testing.T) { + t.Parallel() + + p := &trackingProvider{response: &llm.CompletionResponse{Content: "ok"}} + budget := llm.NewBudget(1, 0) + budgetMetrics := &stubChainBudgetMetrics{} + + chain := llm.NewProviderChain(p, discardLogger(), + llm.WithBudget(budget), + llm.WithChainBudgetMetrics(budgetMetrics), + ) + + _, err := chain.Complete(context.Background(), llm.CompletionRequest{}) + if err != nil { + t.Fatalf("call 1: %v", err) + } + if budgetMetrics.calls.Load() != 0 { + t.Fatalf("budget metrics calls after success = %d, want 0", budgetMetrics.calls.Load()) + } + + _, err = chain.Complete(context.Background(), llm.CompletionRequest{}) + if !errors.Is(err, llm.ErrBudgetExhausted) { + t.Fatalf("call 2 error = %v, want ErrBudgetExhausted", err) + } + if budgetMetrics.calls.Load() != 1 { + t.Fatalf("budget metrics calls after rejection = %d, want 1", budgetMetrics.calls.Load()) + } +} + func TestProviderChain_WithCallTimeout(t *testing.T) { t.Parallel() diff --git a/internal/llm/retry_test.go b/internal/llm/retry_test.go index 59ef4bc1..8717b893 100644 --- a/internal/llm/retry_test.go +++ b/internal/llm/retry_test.go @@ -63,6 +63,10 @@ type discard struct{} func (discard) Write(p []byte) (int, error) { return len(p), nil } +type stubRetryMetrics struct{ calls atomic.Int32 } + +func (s *stubRetryMetrics) RecordLLMRetry() { s.calls.Add(1) } + // --- RetryProvider Tests --- func TestRetryProviderSucceedsOnFirstAttempt(t *testing.T) { @@ -113,6 +117,38 @@ func TestRetryProviderRetriesOnTransientError(t *testing.T) { } } +func TestRetryProvider_RecordsRetryMetricsPerBackoffAttempt(t *testing.T) { + t.Parallel() + + metrics := &stubRetryMetrics{} + want := &llm.CompletionResponse{Content: "ok"} + mock := newMockProvider( + []*llm.CompletionResponse{nil, nil, want}, + []error{ + &httpError{code: 500, msg: "server error 1"}, + &httpError{code: 500, msg: "server error 2"}, + nil, + }, + ) + + rp := llm.NewRetryProvider(mock, discardLogger(), llm.WithMaxAttempts(3)).WithRetryMetrics(metrics) + rp.SetTimerFn(immediateTimerFn()) + + got, err := rp.Complete(context.Background(), llm.CompletionRequest{}) + if err != nil { + t.Fatalf("Complete() error = %v, want nil", err) + } + if got == nil { + t.Fatal("Complete() response = nil, want non-nil") + } + if mock.calls.Load() != 3 { + t.Fatalf("Complete() calls = %d, want 3", mock.calls.Load()) + } + if metrics.calls.Load() != 2 { + t.Fatalf("RecordLLMRetry calls = %d, want 2", metrics.calls.Load()) + } +} + func TestRetryProviderRetriesOnRateLimit(t *testing.T) { t.Parallel() diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index ce5e07ad..c8ef258e 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -64,6 +64,21 @@ func TestNew(t *testing.T) { if m.KillSwitchActive == nil { t.Fatal("KillSwitchActive is nil") } + if m.LLMRetryTotal == nil { + t.Fatal("LLMRetryTotal is nil") + } + if m.LLMBudgetExhaustedTotal == nil { + t.Fatal("LLMBudgetExhaustedTotal is nil") + } + if m.ReportWorkerSuccessTotal == nil { + t.Fatal("ReportWorkerSuccessTotal is nil") + } + if m.ReportWorkerErrorTotal == nil { + t.Fatal("ReportWorkerErrorTotal is nil") + } + if m.ReportStaleness == nil { + t.Fatal("ReportStaleness is nil") + } } func TestConvenienceMethods(t *testing.T) { @@ -86,6 +101,11 @@ func TestConvenienceMethods(t *testing.T) { m.SetPositionsOpen(3) m.SetCircuitBreakerState(true) m.SetKillSwitchActive(false) + m.RecordLLMRetry("configured_primary:openai") + m.RecordLLMBudgetExhausted() + m.RecordReportWorkerSuccess("strategy-a") + m.RecordReportWorkerError("strategy-a") + m.ObserveReportStaleness("strategy-a", 120) } func TestHandler(t *testing.T) { @@ -106,6 +126,11 @@ func TestHandler(t *testing.T) { m.RecordSchedulerTick("strategy") m.RecordAutomationJobError("sync_positions") m.RecordStaleRunReconciled() + m.RecordLLMRetry("configured_primary:openai") + m.RecordLLMBudgetExhausted() + m.RecordReportWorkerSuccess("strategy-a") + m.RecordReportWorkerError("strategy-a") + m.ObserveReportStaleness("strategy-a", 120) h := m.Handler() if h == nil { @@ -138,6 +163,11 @@ func TestHandler(t *testing.T) { "tradingagent_positions_open", "tradingagent_circuit_breaker_state", "tradingagent_kill_switch_active", + "tradingagent_llm_retry_total", + "tradingagent_llm_budget_exhausted_total", + "tradingagent_report_worker_success_total", + "tradingagent_report_worker_error_total", + "tradingagent_report_staleness_seconds", } for _, name := range expected { if !strings.Contains(body, name) { @@ -207,6 +237,59 @@ tradingagent_scheduler_tick_total{type="strategy"} 1 # TYPE tradingagent_automation_job_errors_total counter tradingagent_automation_job_errors_total{job_name="reconcile_orders"} 1 tradingagent_automation_job_errors_total{job_name="sync_positions"} 2 +`, + }, + { + name: "llm retry", + collector: func(m *metrics.Metrics) prometheus.Collector { return m.LLMRetryTotal }, + add: func(m *metrics.Metrics) { + m.RecordLLMRetry("configured_primary:openai") + m.RecordLLMRetry("configured_primary:openai") + m.RecordLLMRetry("configured_primary:anthropic") + }, + want: `# HELP tradingagent_llm_retry_total Total LLM retry attempts by provider. +# TYPE tradingagent_llm_retry_total counter +tradingagent_llm_retry_total{provider="configured_primary:anthropic"} 1 +tradingagent_llm_retry_total{provider="configured_primary:openai"} 2 +`, + }, + { + name: "llm budget exhausted", + collector: func(m *metrics.Metrics) prometheus.Collector { return m.LLMBudgetExhaustedTotal }, + add: func(m *metrics.Metrics) { + m.RecordLLMBudgetExhausted() + m.RecordLLMBudgetExhausted() + }, + want: `# HELP tradingagent_llm_budget_exhausted_total Total times an LLM call was rejected due to budget exhaustion. +# TYPE tradingagent_llm_budget_exhausted_total counter +tradingagent_llm_budget_exhausted_total 2 +`, + }, + { + name: "report worker success", + collector: func(m *metrics.Metrics) prometheus.Collector { return m.ReportWorkerSuccessTotal }, + add: func(m *metrics.Metrics) { + m.RecordReportWorkerSuccess("strategy-a") + m.RecordReportWorkerSuccess("strategy-a") + m.RecordReportWorkerSuccess("strategy-b") + }, + want: `# HELP tradingagent_report_worker_success_total Total successful report generations by strategy ID. +# TYPE tradingagent_report_worker_success_total counter +tradingagent_report_worker_success_total{strategy_id="strategy-a"} 2 +tradingagent_report_worker_success_total{strategy_id="strategy-b"} 1 +`, + }, + { + name: "report worker error", + collector: func(m *metrics.Metrics) prometheus.Collector { return m.ReportWorkerErrorTotal }, + add: func(m *metrics.Metrics) { + m.RecordReportWorkerError("strategy-a") + m.RecordReportWorkerError("strategy-b") + }, + want: `# HELP tradingagent_report_worker_error_total Total failed report generations by strategy ID. +# TYPE tradingagent_report_worker_error_total counter +tradingagent_report_worker_error_total{strategy_id="strategy-a"} 1 +tradingagent_report_worker_error_total{strategy_id="strategy-b"} 1 `, }, } @@ -222,3 +305,13 @@ tradingagent_automation_job_errors_total{job_name="sync_positions"} 2 }) } } + +func TestObserveReportStaleness_EmitsSeries(t *testing.T) { + t.Parallel() + + m := metrics.New() + m.ObserveReportStaleness("strategy-a", 120) + if got := testutil.CollectAndCount(m.ReportStaleness); got != 1 { + t.Fatalf("CollectAndCount(report staleness) = %d, want 1", got) + } +}