Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions app/src/api/jobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { api } from './client';

export type Job = {
id: string;
job_type: string;
status: 'pending' | 'running' | 'failed' | 'complete';
retry_count: number;
next_retry_at: string | null;
created_at: string;
completed_at: string | null;
failures: { timestamp: string; error: string }[];
};

export async function listJobs(): Promise<Job[]> {
return api<Job[]>('/jobs');
}

export async function createJob(job_type: string): Promise<Job> {
return api<Job>('/jobs', { method: 'POST', body: { job_type } });
}

export async function getJob(id: string): Promise<Job> {
return api<Job>(`/jobs/${id}`);
}
110 changes: 110 additions & 0 deletions app/src/components/JobMonitor.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { useEffect, useState } from 'react';
import { Badge } from '@/components/ui/badge';
import { Button } from '@/components/ui/button';
import {
Collapsible,
CollapsibleContent,
CollapsibleTrigger,
} from '@/components/ui/collapsible';
import { listJobs, createJob, type Job } from '@/api/jobs';
import { ChevronDown, ChevronRight, RefreshCw } from 'lucide-react';

const JOB_TYPES = ['export_data', 'generate_report', 'sync_transactions'] as const;

const STATUS_VARIANT: Record<string, 'default' | 'secondary' | 'destructive' | 'outline'> = {
pending: 'secondary',
running: 'default',
failed: 'destructive',
complete: 'outline',
};

export function JobMonitor() {
const [jobs, setJobs] = useState<Job[]>([]);
const [open, setOpen] = useState(false);
const [loading, setLoading] = useState(false);

async function refresh() {
try {
setLoading(true);
setJobs(await listJobs());
} catch {
// silent
} finally {
setLoading(false);
}
}

useEffect(() => {
if (!open) return;
void refresh();
const id = setInterval(() => void refresh(), 10_000);
return () => clearInterval(id);
}, [open]);

async function handleCreate(type: string) {
await createJob(type);
await refresh();
}

return (
<Collapsible open={open} onOpenChange={setOpen} className="rounded-lg border p-4">
<CollapsibleTrigger className="flex w-full items-center justify-between text-sm font-semibold">
<span className="flex items-center gap-2">
{open ? <ChevronDown className="h-4 w-4" /> : <ChevronRight className="h-4 w-4" />}
Background Jobs
{jobs.length > 0 && (
<Badge variant="secondary" className="ml-1">{jobs.length}</Badge>
)}
</span>
</CollapsibleTrigger>
<CollapsibleContent className="mt-3 space-y-3">
<div className="flex flex-wrap gap-2">
{JOB_TYPES.map((t) => (
<Button key={t} size="sm" variant="outline" onClick={() => void handleCreate(t)}>
+ {t.replace(/_/g, ' ')}
</Button>
))}
<Button size="sm" variant="ghost" onClick={() => void refresh()} disabled={loading}>
<RefreshCw className={`h-3 w-3 mr-1 ${loading ? 'animate-spin' : ''}`} />
Refresh
</Button>
</div>
{jobs.length === 0 ? (
<p className="text-sm text-muted-foreground">No jobs yet.</p>
) : (
<div className="space-y-2">
{jobs.map((job) => (
<div key={job.id} className="rounded-md border p-3 text-sm space-y-1">
<div className="flex items-center justify-between">
<span className="font-medium">{job.job_type.replace(/_/g, ' ')}</span>
<Badge variant={STATUS_VARIANT[job.status] ?? 'secondary'}>{job.status}</Badge>
</div>
<div className="text-xs text-muted-foreground">
Created: {new Date(job.created_at).toLocaleString()}
</div>
{job.retry_count > 0 && (
<div className="text-xs text-muted-foreground">
Retries: {job.retry_count}/3
{job.next_retry_at && (
<> · Next retry: {new Date(job.next_retry_at).toLocaleString()}</>
)}
</div>
)}
{job.failures.length > 0 && (
<div className="text-xs text-red-600">
Last error: {job.failures[job.failures.length - 1].error}
</div>
)}
{job.completed_at && (
<div className="text-xs text-green-600">
Completed: {new Date(job.completed_at).toLocaleString()}
</div>
)}
</div>
))}
</div>
)}
</CollapsibleContent>
</Collapsible>
);
}
3 changes: 3 additions & 0 deletions app/src/pages/Analytics.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import { useToast } from '@/hooks/use-toast';
import { getBudgetSuggestion, type BudgetSuggestion } from '@/api/insights';
import { formatMoney } from '@/lib/currency';
import { JobMonitor } from '@/components/JobMonitor';

const PERSONAS = [
'Balanced coach',
Expand Down Expand Up @@ -193,6 +194,8 @@ export function Analytics() {
</FinancialCard>
</div>
) : null}

<JobMonitor />
</div>
);
}
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .jobs import bp as jobs_bp


def register_routes(app: Flask):
Expand All @@ -18,3 +19,4 @@ def register_routes(app: Flask):
app.register_blueprint(categories_bp, url_prefix="/categories")
app.register_blueprint(docs_bp, url_prefix="/docs")
app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
app.register_blueprint(jobs_bp, url_prefix="/jobs")
108 changes: 108 additions & 0 deletions packages/backend/app/routes/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import time
import uuid
import threading
import logging
from datetime import datetime, timezone
from flask import Blueprint, jsonify, request
from flask_jwt_extended import jwt_required, get_jwt_identity

bp = Blueprint("jobs", __name__)
logger = logging.getLogger("finmind.jobs")

VALID_JOB_TYPES = {"export_data", "generate_report", "sync_transactions"}
MAX_RETRIES = 3

# In-memory job store: {job_id: {...}}
_jobs: dict[str, dict] = {}
_lock = threading.Lock()


def _run_job(job_id: str) -> None:
"""Simulate async job execution with retry logic."""
with _lock:
job = _jobs.get(job_id)
if not job:
return
job["status"] = "running"

# Simulate work that always fails for demo (replace with real logic)
import random

success = random.random() > 0.5 # noqa: S311

with _lock:
job = _jobs[job_id]
if success:
job["status"] = "complete"
job["completed_at"] = datetime.now(timezone.utc).isoformat()
logger.info("Job %s completed", job_id)
else:
job["retry_count"] += 1
error_msg = f"Simulated failure at {datetime.now(timezone.utc).isoformat()}"
job["failures"].append(
{"timestamp": datetime.now(timezone.utc).isoformat(), "error": error_msg}
)
if job["retry_count"] >= MAX_RETRIES:
job["status"] = "failed"
logger.warning("Job %s failed after %d retries", job_id, MAX_RETRIES)
else:
backoff = 2 ** job["retry_count"]
job["status"] = "pending"
job["next_retry_at"] = datetime.fromtimestamp(
time.time() + backoff, tz=timezone.utc
).isoformat()
logger.info(
"Job %s retry %d in %ds", job_id, job["retry_count"], backoff
)
threading.Timer(backoff, _run_job, args=[job_id]).start()


@bp.post("")
@jwt_required()
def create_job():
uid = int(get_jwt_identity())
data = request.get_json() or {}
job_type = data.get("job_type")
if job_type not in VALID_JOB_TYPES:
return jsonify(error=f"Invalid job_type. Must be one of: {', '.join(sorted(VALID_JOB_TYPES))}"), 400

job_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()
job = {
"id": job_id,
"user_id": uid,
"job_type": job_type,
"status": "pending",
"retry_count": 0,
"next_retry_at": None,
"created_at": now,
"completed_at": None,
"failures": [],
}
with _lock:
_jobs[job_id] = job

logger.info("Created job %s type=%s user=%s", job_id, job_type, uid)
threading.Thread(target=_run_job, args=[job_id], daemon=True).start()
return jsonify(job), 201


@bp.get("")
@jwt_required()
def list_jobs():
uid = int(get_jwt_identity())
with _lock:
user_jobs = [j for j in _jobs.values() if j["user_id"] == uid]
user_jobs.sort(key=lambda j: j["created_at"], reverse=True)
return jsonify(user_jobs)


@bp.get("/<job_id>")
@jwt_required()
def get_job(job_id: str):
uid = int(get_jwt_identity())
with _lock:
job = _jobs.get(job_id)
if not job or job["user_id"] != uid:
return jsonify(error="not found"), 404
return jsonify(job)