
Implement task queue worker separated from main app and API#314

Merged
a1ex4 merged 29 commits into develop from task-queue-worker-foundation
Apr 8, 2026

Conversation


@a1ex4 a1ex4 commented Apr 3, 2026

Problem

Background work (library scanning, file identification, titledb updates, file organization) ran inline within the Flask process using thread locks. This coupled the web server to long-running background jobs and blocked the move to Gunicorn, where multiple threads serve concurrent requests and background work must not interfere with request handling. The single-threaded, GIL-bound execution also meant no multi-core utilization, no task visibility, and no deduplication.

Architecture

┌─────────────┐      ┌──────────────┐     ┌─────────────────┐
│  Flask API  │────> │  tasks table │<────│  WorkerPool     │
│  (Gunicorn) │      │  (SQLite)    │     │  (N processes)  │
└─────────────┘      └──────────────┘     └─────────────────┘

The API server enqueues tasks into a SQLite table. A pool of worker subprocesses polls the table and executes claimed tasks in their own processes. Workers scale dynamically from the settings UI without restart.
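The enqueue side of this design can be sketched with stdlib sqlite3. The schema and helper below are simplified illustrations, not the real app/tasks.py code, but the key move is the same one the PR describes: BEGIN IMMEDIATE makes the duplicate check and the insert a single atomic step across processes.

```python
import sqlite3

def make_db() -> sqlite3.Connection:
    # Simplified stand-in for the real tasks table (illustrative schema).
    # isolation_level=None puts sqlite3 in autocommit so we can issue
    # explicit BEGIN IMMEDIATE ourselves.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE tasks ("
        "  id INTEGER PRIMARY KEY,"
        "  name TEXT NOT NULL,"
        "  status TEXT NOT NULL DEFAULT 'pending')"
    )
    return conn

def enqueue_task(conn: sqlite3.Connection, name: str) -> bool:
    # BEGIN IMMEDIATE takes SQLite's write lock up front, so no other
    # process can insert between our duplicate check and our insert.
    conn.execute("BEGIN IMMEDIATE")
    try:
        dup = conn.execute(
            "SELECT 1 FROM tasks WHERE name = ? AND status IN ('pending', 'running')",
            (name,),
        ).fetchone()
        if dup:
            conn.execute("ROLLBACK")
            return False  # deduplicated: an equivalent task is already queued
        conn.execute("INSERT INTO tasks (name) VALUES (?)", (name,))
        conn.execute("COMMIT")
        return True
    except Exception:
        conn.execute("ROLLBACK")
        raise
```

Enqueueing "scan_library" twice returns True then False while the first copy is still pending, which is what lets every pipeline step blindly enqueue its successor.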

Expensive operations (scan, identify, organize) create per-file child tasks that are processed in parallel across workers. Parent tasks track progress atomically and trigger continuations when all children complete, forming the processing pipeline.

Scheduling is handled by the task queue itself — update_titledb re-enqueues itself with a run_after delay after each execution, replacing the scheduler module entirely.
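A minimal sketch of that self-rescheduling pattern; register_task and run_after come from the PR, but the enqueue-callback signature here is an assumption made for illustration.

```python
from datetime import datetime, timedelta, timezone

# Illustrative stand-in for the real @register_task registry in app/tasks.py.
TASK_REGISTRY = {}

def register_task(func):
    TASK_REGISTRY[func.__name__] = func
    return func

@register_task
def update_titledb(enqueue, interval=timedelta(hours=24)):
    # ... the actual titledb refresh would happen here (omitted) ...
    # Re-enqueue itself with a run_after delay: the worker's normal
    # "run_after <= now" claim filter turns this into a recurring
    # schedule, with no separate scheduler module needed.
    enqueue("update_titledb", run_after=datetime.now(timezone.utc) + interval)
```

The dedup logic keeps this safe: at most one pending update_titledb row exists at a time, so the schedule cannot fork into duplicates.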

Implementation

  • 13 registered tasks forming the processing pipeline, with atomic deduplication and progress tracking
  • Gunicorn replaces Flask's dev server for production
  • Configurable worker count with dynamic scaling via settings UI
  • Race-safe concurrent inserts for multi-worker identification
  • Removed scheduler module and croniter dependency

a1ex4 added 16 commits April 2, 2026 14:08
  New files:
  - app/tasks.py — Task SQLAlchemy model, @register_task decorator/registry, enqueue_task() with atomic dedup via BEGIN IMMEDIATE, query helpers, two built-in test tasks (echo, sleep)
  - app/worker.py — TaskWorker class (claim/execute/poll loop), start_worker_process() entry point that creates its own Flask app
  - app/migrations/versions/a1b2c3d4e5f6_add_tasks_table.py — Alembic migration for the tasks table

  Modified files:
  - app/db.py — imports tasks in init_db() so the Task model is registered before db.create_all()
  - app/app.py — imports tasks, adds 3 API endpoints (POST/GET /api/tasks, GET /api/tasks/<id>), starts worker as multiprocessing.Process with graceful shutdown

  Tests: Both pass, schema fixture updated to include the tasks table.

  Now the task queue infrastructure is in place. When you start the app with python app/app.py, it will launch both the Flask server and a worker process that polls for tasks every 2 seconds. You can enqueue tasks via the API and watch them get picked up.
 13 registered tasks

 Pipeline chain (each enqueues successor, dedup prevents duplicates):
 update_titledb → scan_library → identify_library → add_missing_apps → remove_missing_files → update_titles → organize_library → generate_library

 Per-file child tasks (parent processes inline, tracks progress):
 - add_file — child of scan_library
 - identify_file — child of identify_library

 File event tasks (enqueued by watcher):
 - handle_file_added, handle_file_moved, handle_file_deleted

 Parent-child task model

 - parent_id FK on Task (self-referential), worker only claims parent_id IS NULL
 - Parent creates children, processes inline, completion_pct auto-updates
 - Alembic migration: b2c3d4e5f6a7_add_parent_id_to_tasks.py
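The parent-only claim described above can be sketched as follows, again with an illustrative schema rather than the real model:

```python
import sqlite3

def make_db() -> sqlite3.Connection:
    # Illustrative schema: just enough columns for the claim query.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE tasks ("
        "  id INTEGER PRIMARY KEY,"
        "  name TEXT,"
        "  status TEXT NOT NULL DEFAULT 'pending',"
        "  parent_id INTEGER REFERENCES tasks(id))"
    )
    return conn

def claim_task(conn: sqlite3.Connection):
    # At this stage of the PR the parent processes its children inline,
    # so a worker only ever claims top-level rows (parent_id IS NULL).
    # BEGIN IMMEDIATE makes select-then-update atomic between workers.
    conn.execute("BEGIN IMMEDIATE")
    row = conn.execute(
        "SELECT id FROM tasks"
        " WHERE status = 'pending' AND parent_id IS NULL"
        " ORDER BY id LIMIT 1"
    ).fetchone()
    if row is None:
        conn.execute("COMMIT")
        return None
    conn.execute("UPDATE tasks SET status = 'running' WHERE id = ?", (row[0],))
    conn.execute("COMMIT")
    return row[0]
```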

 Removed from app.py

 post_library_change, update_and_scan_job, scan_library, scan/titledb locks, titles_lib/titledb imports

 Titledb reference counting

 Tasks that need titledb use _with_titledb(func) wrapper which loads, runs, decrements counter, calls debounced unload.
…, dead code cleanup

  Port parent-child task execution from inline processing to worker-claimable children, enabling multi-worker parallelism. Add last-child-triggers pattern where the final child to complete atomically marks the parent done and runs the continuation (next pipeline step). Introduce waiting_for_children status, TASK_CONTINUATIONS registry, and atomic _try_complete_parent using BEGIN IMMEDIATE to prevent race conditions.

  - Refactor scan_library, identify_library, organize_library into parent tasks that create children and return immediately
  - Register continuations for each parent task to chain the next pipeline step
  - Wrap identify_file and organize_file child tasks with _with_titledb for independent execution
  - Add organize_file as per-file child task, extract remove_outdated_updates as separate task
  - Replace in-memory ignored_events_tuples with DB-based IgnoredEvent table for cross-process watcher/organizer sync
  - Filter ignored events in on_library_change via pop_ignored_event before enqueueing tasks
  - Remove dead code: process_library_organization, threading lock/ignored events set in file_watcher, stale global watcher in reload_conf
  Stale running/waiting_for_children tasks from a previous crash blocked dedup, preventing the pipeline from re-enqueueing those steps. Now tasks are cleaned up at startup and deleted immediately after completion.
Replace Flask's dev server with Gunicorn (gthread, 1 worker, 4 threads) for production use. New run.py entrypoint orchestrates startup: DB init, task worker subprocess, and Gunicorn with hooks for watcher/scheduler lifecycle. Worker subprocess ignores SIGINT for graceful shutdown via stop_event. python app.py remains available for local development.
Add settings UI and API for configuring the number of task worker processes (1 to cpu_count). Workers scale dynamically at runtime by watching settings.yaml for changes — no restart required.
- WorkerPool class manages worker subprocesses with per-worker stop events
- Settings watcher thread polls config file mtime and calls pool.scale()
- Each worker gets a numeric ID (worker-1, worker-2, ...) shown in logs
- Fix parent task progress race condition: move progress update into _try_complete_parent under BEGIN IMMEDIATE to prevent duplicate/skipped counts when multiple workers complete children simultaneously
- Both run.py (Gunicorn) and app.py (dev) use the same pool + watcher
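A sketch of the scale-up/scale-down mechanics; the real WorkerPool is in the PR, but this constructor (with an injectable process factory) and the worker entry-point signature are illustrative liberties.

```python
import multiprocessing

class WorkerPool:
    """Manage N worker subprocesses with per-worker stop events (sketch)."""

    def __init__(self, target, process_factory=multiprocessing.Process):
        self._target = target    # worker entry point, e.g. a poll loop
        self._factory = process_factory
        self._workers = []       # list of (process, stop_event)
        self._next_id = 0

    def scale(self, count):
        # Grow: spawn new workers, each with a fresh stop event and a
        # numeric ID (worker-1, worker-2, ...) used in log output.
        while len(self._workers) < count:
            self._next_id += 1
            stop = multiprocessing.Event()
            proc = self._factory(
                target=self._target,
                args=(f"worker-{self._next_id}", stop),
                daemon=True,
            )
            proc.start()
            self._workers.append((proc, stop))
        # Shrink: signal the newest workers to stop, then join them.
        while len(self._workers) > count:
            proc, stop = self._workers.pop()
            stop.set()
            proc.join()

    def size(self):
        return len(self._workers)
```

The settings watcher thread then only needs to call pool.scale(new_count) whenever the config file's mtime changes, which is what makes restart-free scaling possible.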
Delete scheduler.py (JobScheduler, ThreadPoolExecutor, croniter dependency) and replace with run_after column on the tasks table. update_titledb now re-enqueues itself with run_after=now+interval after each execution, creating a self-perpetuating schedule driven by the existing worker poll loop.

- Add run_after column to Task model and merged migration
- Worker claim_task filters by run_after <= datetime('now')
- enqueue_task supports run_after param with adjusted dedup logic
- update_scheduled_task creates/updates/deletes pending scheduled tasks
- Settings API uses update_scheduled_task instead of JobScheduler
- cleanup_tasks removes stale scheduled tasks on restart
- Move interval parsing utils from scheduler.py to utils.py
- Fix SQLite datetime comparison (isoformat T-separator vs space)
- Add debug logging for task enqueue/schedule operations
- Update gunicorn to 25.3.0, remove croniter dependency
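The datetime-comparison fix deserves a concrete example, because the bug is subtle: SQLite's datetime('now') emits 'YYYY-MM-DD HH:MM:SS' with a space separator, while Python's isoformat() uses 'T', and since these columns compare as text, 'T' sorts after any time digit. The helper names below are illustrative.

```python
import sqlite3
from datetime import datetime

def sqlite_ts(dt: datetime) -> str:
    # Store timestamps with a space separator so they compare correctly
    # against datetime('now'). With isoformat()'s 'T', a morning
    # timestamp would sort *after* the same day's space-separated
    # midnight-to-23:59 range, silently deferring scheduled tasks.
    return dt.strftime("%Y-%m-%d %H:%M:%S")

def claim_due_task(conn: sqlite3.Connection):
    # run_after filter from the worker's claim query: a scheduled task
    # becomes claimable only once its run_after timestamp has passed.
    row = conn.execute(
        "SELECT id FROM tasks WHERE status = 'pending'"
        " AND (run_after IS NULL OR run_after <= datetime('now'))"
        " ORDER BY id LIMIT 1"
    ).fetchone()
    return row[0] if row else None
```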
@a1ex4 a1ex4 added the type/refactor PR for a code change that contains refactor label Apr 3, 2026
@a1ex4 a1ex4 merged commit 3e125e9 into develop Apr 8, 2026
2 checks passed
@a1ex4 a1ex4 deleted the task-queue-worker-foundation branch April 8, 2026 09:43