From 0712313931a8b7ad7f16fbcf4a4624124f52cba6 Mon Sep 17 00:00:00 2001 From: Billy Kennedy <214814620+bkennedyshit@users.noreply.github.com> Date: Thu, 2 Apr 2026 16:39:12 -0400 Subject: [PATCH] feat(issue-#130): resilient background job retry & monitoring --- app/src/api/jobs.ts | 24 ++++++ app/src/components/JobMonitor.tsx | 110 ++++++++++++++++++++++++ app/src/pages/Analytics.tsx | 3 + packages/backend/app/routes/__init__.py | 2 + packages/backend/app/routes/jobs.py | 108 +++++++++++++++++++++++ 5 files changed, 247 insertions(+) create mode 100644 app/src/api/jobs.ts create mode 100644 app/src/components/JobMonitor.tsx create mode 100644 packages/backend/app/routes/jobs.py diff --git a/app/src/api/jobs.ts b/app/src/api/jobs.ts new file mode 100644 index 00000000..34f0e597 --- /dev/null +++ b/app/src/api/jobs.ts @@ -0,0 +1,24 @@ +import { api } from './client'; + +export type Job = { + id: string; + job_type: string; + status: 'pending' | 'running' | 'failed' | 'complete'; + retry_count: number; + next_retry_at: string | null; + created_at: string; + completed_at: string | null; + failures: { timestamp: string; error: string }[]; +}; + +export async function listJobs(): Promise { + return api('/jobs'); +} + +export async function createJob(job_type: string): Promise { + return api('/jobs', { method: 'POST', body: { job_type } }); +} + +export async function getJob(id: string): Promise { + return api(`/jobs/${id}`); +} diff --git a/app/src/components/JobMonitor.tsx b/app/src/components/JobMonitor.tsx new file mode 100644 index 00000000..93851494 --- /dev/null +++ b/app/src/components/JobMonitor.tsx @@ -0,0 +1,110 @@ +import { useEffect, useState } from 'react'; +import { Badge } from '@/components/ui/badge'; +import { Button } from '@/components/ui/button'; +import { + Collapsible, + CollapsibleContent, + CollapsibleTrigger, +} from '@/components/ui/collapsible'; +import { listJobs, createJob, type Job } from '@/api/jobs'; +import { ChevronDown, ChevronRight, RefreshCw } from 'lucide-react'; + +const JOB_TYPES = ['export_data', 'generate_report', 'sync_transactions'] as const; + +const STATUS_VARIANT: Record = { + pending: 'secondary', + running: 'default', + failed: 'destructive', + complete: 'outline', +}; + +export function JobMonitor() { + const [jobs, setJobs] = useState([]); + const [open, setOpen] = useState(false); + const [loading, setLoading] = useState(false); + + async function refresh() { + try { + setLoading(true); + setJobs(await listJobs()); + } catch { + // silent + } finally { + setLoading(false); + } + } + + useEffect(() => { + if (!open) return; + void refresh(); + const id = setInterval(() => void refresh(), 10_000); + return () => clearInterval(id); + }, [open]); + + async function handleCreate(type: string) { + await createJob(type); + await refresh(); + } + + return ( + + + + {open ? : } + Background Jobs + {jobs.length > 0 && ( + {jobs.length} + )} + + + +
+ {JOB_TYPES.map((t) => ( + + ))} + +
+ {jobs.length === 0 ? ( +

No jobs yet.

+ ) : ( +
+ {jobs.map((job) => ( +
+
+ {job.job_type.replace(/_/g, ' ')} + {job.status} +
+
+ Created: {new Date(job.created_at).toLocaleString()} +
+ {job.retry_count > 0 && ( +
+ Retries: {job.retry_count}/3 + {job.next_retry_at && ( + <> ยท Next retry: {new Date(job.next_retry_at).toLocaleString()} + )} +
+ )} + {job.failures.length > 0 && ( +
+ Last error: {job.failures[job.failures.length - 1].error} +
+ )} + {job.completed_at && ( +
+ Completed: {new Date(job.completed_at).toLocaleString()} +
+ )} +
+ ))} +
+ )} +
+
+ ); +} diff --git a/app/src/pages/Analytics.tsx b/app/src/pages/Analytics.tsx index 3efc8acc..43fd4bf0 100644 --- a/app/src/pages/Analytics.tsx +++ b/app/src/pages/Analytics.tsx @@ -12,6 +12,7 @@ import { import { useToast } from '@/hooks/use-toast'; import { getBudgetSuggestion, type BudgetSuggestion } from '@/api/insights'; import { formatMoney } from '@/lib/currency'; +import { JobMonitor } from '@/components/JobMonitor'; const PERSONAS = [ 'Balanced coach', @@ -193,6 +194,8 @@ export function Analytics() { ) : null} + + ); } diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f89..8d585f25 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,7 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .jobs import bp as jobs_bp def register_routes(app: Flask): @@ -18,3 +19,4 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(jobs_bp, url_prefix="/jobs") diff --git a/packages/backend/app/routes/jobs.py b/packages/backend/app/routes/jobs.py new file mode 100644 index 00000000..6229c78a --- /dev/null +++ b/packages/backend/app/routes/jobs.py @@ -0,0 +1,108 @@ +import time +import uuid +import threading +import logging +from datetime import datetime, timezone +from flask import Blueprint, jsonify, request +from flask_jwt_extended import jwt_required, get_jwt_identity + +bp = Blueprint("jobs", __name__) +logger = logging.getLogger("finmind.jobs") + +VALID_JOB_TYPES = {"export_data", "generate_report", "sync_transactions"} +MAX_RETRIES = 3 + +# In-memory job store: {job_id: {...}} +_jobs: dict[str, dict] = {} +_lock = threading.Lock() + + +def _run_job(job_id: str) -> None: + """Simulate async job execution with retry logic.""" + with _lock: + job = _jobs.get(job_id) + if not job: + return + job["status"] = "running" + + # Simulate work that always fails for demo (replace with real logic) + import random + + success = random.random() > 0.5 # noqa: S311 + + with _lock: + job = _jobs[job_id] + if success: + job["status"] = "complete" + job["completed_at"] = datetime.now(timezone.utc).isoformat() + logger.info("Job %s completed", job_id) + else: + job["retry_count"] += 1 + error_msg = f"Simulated failure at {datetime.now(timezone.utc).isoformat()}" + job["failures"].append( + {"timestamp": datetime.now(timezone.utc).isoformat(), "error": error_msg} + ) + if job["retry_count"] >= MAX_RETRIES: + job["status"] = "failed" + logger.warning("Job %s failed after %d retries", job_id, MAX_RETRIES) + else: + backoff = 2 ** job["retry_count"] + job["status"] = "pending" + job["next_retry_at"] = datetime.fromtimestamp( + time.time() + backoff, tz=timezone.utc + ).isoformat() + logger.info( + "Job %s retry %d in %ds", job_id, job["retry_count"], backoff + ) + threading.Timer(backoff, _run_job, args=[job_id]).start() + + +@bp.post("") +@jwt_required() +def create_job(): + uid = int(get_jwt_identity()) + data = request.get_json() or {} + job_type = data.get("job_type") + if job_type not in VALID_JOB_TYPES: + return jsonify(error=f"Invalid job_type. Must be one of: {', '.join(sorted(VALID_JOB_TYPES))}"), 400 + + job_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc).isoformat() + job = { + "id": job_id, + "user_id": uid, + "job_type": job_type, + "status": "pending", + "retry_count": 0, + "next_retry_at": None, + "created_at": now, + "completed_at": None, + "failures": [], + } + with _lock: + _jobs[job_id] = job + + logger.info("Created job %s type=%s user=%s", job_id, job_type, uid) + threading.Thread(target=_run_job, args=[job_id], daemon=True).start() + return jsonify(job), 201 + + +@bp.get("") +@jwt_required() +def list_jobs(): + uid = int(get_jwt_identity()) + with _lock: + user_jobs = [j for j in _jobs.values() if j["user_id"] == uid] + user_jobs.sort(key=lambda j: j["created_at"], reverse=True) + return jsonify(user_jobs) + + +@bp.get("/") +@jwt_required() +def get_job(job_id: str): + uid = int(get_jwt_identity()) + with _lock: + job = _jobs.get(job_id) + if not job or job["user_id"] != uid: + return jsonify(error="not found"), 404 + return jsonify(job)