Implement task queue worker separated from main app and API #314
Merged
Conversation
New files:
- app/tasks.py — Task SQLAlchemy model, @register_task decorator/registry, enqueue_task() with atomic dedup via BEGIN IMMEDIATE, query helpers, and two built-in test tasks (echo, sleep)
- app/worker.py — TaskWorker class (claim/execute/poll loop) and a start_worker_process() entry point that creates its own Flask app
- app/migrations/versions/a1b2c3d4e5f6_add_tasks_table.py — Alembic migration for the tasks table

Modified files:
- app/db.py — imports tasks in init_db() so the Task model is registered before db.create_all()
- app/app.py — imports tasks, adds three API endpoints (POST/GET /api/tasks, GET /api/tasks/<id>), and starts the worker as a multiprocessing.Process with graceful shutdown

Tests: both pass; the schema fixture was updated to include the tasks table.

The task queue infrastructure is now in place. Starting the app with python app/app.py launches both the Flask server and a worker process that polls for tasks every 2 seconds. You can enqueue tasks via the API and watch them get picked up.
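The atomic dedup in enqueue_task() can be sketched as follows. This is a minimal stdlib-only illustration, not the actual app/tasks.py: the real model is SQLAlchemy, and the table schema and helper name connect_queue here are assumptions.

```python
import sqlite3

def connect_queue(path: str = ":memory:") -> sqlite3.Connection:
    # isolation_level=None puts sqlite3 in autocommit mode, so we can
    # issue BEGIN IMMEDIATE ourselves instead of the module's implicit BEGIN
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("""CREATE TABLE IF NOT EXISTS tasks (
        id INTEGER PRIMARY KEY, name TEXT, payload TEXT, status TEXT)""")
    return conn

def enqueue_task(conn: sqlite3.Connection, name: str, payload: str = ""):
    """Insert a task unless an identical pending/running one exists.

    BEGIN IMMEDIATE takes SQLite's write lock before the dedup check,
    so the check and the insert are atomic even across processes."""
    conn.execute("BEGIN IMMEDIATE")
    try:
        dup = conn.execute(
            "SELECT id FROM tasks WHERE name = ? AND status IN ('pending', 'running')",
            (name,)).fetchone()
        if dup:
            conn.execute("COMMIT")
            return None          # deduplicated: an identical task is already queued
        cur = conn.execute(
            "INSERT INTO tasks (name, payload, status) VALUES (?, ?, 'pending')",
            (name, payload))
        conn.execute("COMMIT")
        return cur.lastrowid
    except Exception:
        conn.execute("ROLLBACK")
        raise
```

A second enqueue of the same task name returns None until the first one finishes, which is what lets the pipeline re-enqueue steps freely.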
13 registered tasks

Pipeline chain (each enqueues its successor; dedup prevents duplicates):
update_titledb → scan_library → identify_library → add_missing_apps → remove_missing_files → update_titles → organize_library → generate_library

Per-file child tasks (parent processes inline, tracks progress):
- add_file — child of scan_library
- identify_file — child of identify_library

File event tasks (enqueued by watcher):
- handle_file_added, handle_file_moved, handle_file_deleted

Parent-child task model:
- parent_id FK on Task (self-referential); the worker only claims rows where parent_id IS NULL
- Parent creates children and processes them inline; completion_pct auto-updates
- Alembic migration: b2c3d4e5f6a7_add_parent_id_to_tasks.py

Removed from app.py: post_library_change, update_and_scan_job, scan_library, the scan/titledb locks, and the titles_lib/titledb imports

Titledb reference counting: tasks that need titledb use a _with_titledb(func) wrapper, which loads titledb, runs the task, decrements the counter, and calls a debounced unload.
…, dead code cleanup

Port parent-child task execution from inline processing to worker-claimable children, enabling multi-worker parallelism. Add a last-child-triggers pattern where the final child to complete atomically marks the parent done and runs the continuation (the next pipeline step). Introduce a waiting_for_children status, a TASK_CONTINUATIONS registry, and an atomic _try_complete_parent using BEGIN IMMEDIATE to prevent race conditions.

- Refactor scan_library, identify_library, and organize_library into parent tasks that create children and return immediately
- Register continuations for each parent task to chain the next pipeline step
- Wrap the identify_file and organize_file child tasks with _with_titledb for independent execution
- Add organize_file as a per-file child task; extract remove_outdated_updates as a separate task
- Replace the in-memory ignored_events_tuples with a DB-based IgnoredEvent table for cross-process watcher/organizer sync
- Filter ignored events in on_library_change via pop_ignored_event before enqueueing tasks
- Remove dead code: process_library_organization, the threading lock/ignored-events set in file_watcher, and the stale global watcher in reload_conf
Stale running/waiting_for_children tasks from a previous crash blocked dedup, preventing the pipeline from re-enqueueing those steps. Now tasks are cleaned up at startup and deleted immediately after completion.
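The startup cleanup amounts to something like the following sketch (table and function names assumed; the real cleanup may also handle scheduled tasks, per the later commits):

```python
import sqlite3

def cleanup_stale_tasks(conn: sqlite3.Connection) -> int:
    """Drop tasks stuck mid-flight by a crash. Left in place, a
    'running' or 'waiting_for_children' row would satisfy the dedup
    check forever and block the pipeline from re-enqueueing that step."""
    cur = conn.execute(
        "DELETE FROM tasks WHERE status IN ('running', 'waiting_for_children')")
    conn.commit()
    return cur.rowcount
```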
Replace Flask's dev server with Gunicorn (gthread, 1 worker, 4 threads) for production use. A new run.py entrypoint orchestrates startup: DB init, the task worker subprocess, and Gunicorn with hooks for the watcher/scheduler lifecycle. The worker subprocess ignores SIGINT; graceful shutdown is driven by a stop_event instead. python app.py remains available for local development.
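The shutdown arrangement can be sketched as below. worker_loop and worker_main are illustrative names; in the real worker the poll body claims and executes tasks, and the stop event is a multiprocessing.Event shared with run.py.

```python
import signal
import threading

def worker_loop(stop_event, poll=lambda: None, interval=2.0):
    """Poll loop sketch: wake every `interval` seconds and run one poll;
    Event.wait doubles as the sleep, so a set() wakes the loop
    immediately for a prompt, graceful exit."""
    while not stop_event.wait(timeout=interval):
        poll()

def worker_main(stop_event):
    # In the worker subprocess, SIGINT is ignored so Ctrl-C only reaches
    # the parent process; the parent sets stop_event to shut workers down.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    worker_loop(stop_event)
```

Using Event.wait as the poll timer, rather than time.sleep, is what makes shutdown immediate instead of waiting out the remainder of the interval.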
Add a settings UI and API for configuring the number of task worker processes (1 to cpu_count). Workers scale dynamically at runtime by watching settings.yaml for changes — no restart required.

- WorkerPool class manages worker subprocesses with per-worker stop events
- A settings watcher thread polls the config file's mtime and calls pool.scale()
- Each worker gets a numeric ID (worker-1, worker-2, ...) shown in logs
- Fix a parent task progress race condition: move the progress update into _try_complete_parent under BEGIN IMMEDIATE, preventing duplicate or skipped counts when multiple workers complete children simultaneously
- Both run.py (Gunicorn) and app.py (dev) use the same pool + watcher
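The scale-up/scale-down logic can be sketched like this. To keep the sketch self-contained it uses threads as stand-ins for the real multiprocessing.Process workers; the class shape and method names are assumptions, not the PR's actual API.

```python
import threading

class WorkerPool:
    """Dynamic pool sketch: each worker gets a numeric ID and its own
    stop event; scale() starts or stops workers to match the target."""

    def __init__(self, worker_fn):
        self._worker_fn = worker_fn   # called as worker_fn(name, stop_event)
        self._workers = []            # list of (thread, stop_event)
        self._next_id = 1

    def scale(self, target: int):
        while len(self._workers) < target:       # scale up
            stop = threading.Event()
            name = f"worker-{self._next_id}"     # numeric ID shown in logs
            self._next_id += 1
            t = threading.Thread(target=self._worker_fn, args=(name, stop), name=name)
            t.start()
            self._workers.append((t, stop))
        while len(self._workers) > target:       # scale down, newest first
            t, stop = self._workers.pop()
            stop.set()                           # per-worker graceful stop
            t.join()

    def size(self) -> int:
        return len(self._workers)
```

A settings watcher would simply call pool.scale(new_count) whenever the config file's mtime changes.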
Delete scheduler.py (JobScheduler, ThreadPoolExecutor, croniter dependency) and replace with run_after column on the tasks table. update_titledb now re-enqueues itself with run_after=now+interval after each execution, creating a self-perpetuating schedule driven by the existing worker poll loop.
- Add run_after column to Task model and merged migration
- Worker claim_task filters by run_after <= datetime('now')
- enqueue_task supports run_after param with adjusted dedup logic
- update_scheduled_task creates/updates/deletes pending scheduled tasks
- Settings API uses update_scheduled_task instead of JobScheduler
- cleanup_tasks removes stale scheduled tasks on restart
- Move interval parsing utils from scheduler.py to utils.py
- Fix SQLite datetime comparison (isoformat T-separator vs space)
- Add debug logging for task enqueue/schedule operations
- Update gunicorn to 25.3.0, remove croniter dependency
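The run_after mechanics, including the datetime-separator fix from the list above, can be sketched as follows (schema and helper names assumed; the real claim path also handles parent_id and dedup):

```python
import sqlite3
from datetime import datetime, timedelta, timezone

def utcnow() -> datetime:
    # SQLite's datetime('now') is UTC, so schedule times must be UTC too
    return datetime.now(timezone.utc).replace(tzinfo=None)

def enqueue(conn, name, run_after=None):
    # isoformat(sep=" ") yields 'YYYY-MM-DD HH:MM:SS', matching SQLite's
    # text format; the default 'T' separator would compare incorrectly
    # against datetime('now') — the bug fixed above
    ts = run_after.isoformat(sep=" ", timespec="seconds") if run_after else None
    conn.execute("INSERT INTO tasks (name, status, run_after) VALUES (?, 'pending', ?)",
                 (name, ts))
    conn.commit()

def claim_task(conn):
    """Only pending tasks whose run_after is unset or already past are eligible."""
    row = conn.execute(
        """SELECT id, name FROM tasks
           WHERE status = 'pending'
             AND (run_after IS NULL OR run_after <= datetime('now'))
           ORDER BY id LIMIT 1""").fetchone()
    if row:
        conn.execute("UPDATE tasks SET status = 'running' WHERE id = ?", (row[0],))
        conn.commit()
    return row

def update_titledb(conn, interval=timedelta(hours=24)):
    # ... refresh titledb here ...
    # self-perpetuating schedule: re-enqueue with a future run_after,
    # driven by the existing worker poll loop instead of a scheduler
    enqueue(conn, "update_titledb", run_after=utcnow() + interval)
```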
- Add get_settings() in settings.py with mtime-cached lazy reload
- Reuse the file_watcher Observer for settings.yaml via add_file_callback
- Remove the app_settings global and all reload_conf() defensive calls
- Move WorkerPool creation into Gunicorn's post_worker_init so the watcher callback can scale it directly
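A minimal sketch of the mtime-cached lazy reload. The real file is settings.yaml; JSON is used here only to keep the sketch stdlib-only, and the cache shape is an assumption.

```python
import json
import os

_cache = {"mtime": None, "settings": None}

def get_settings(path: str = "settings.json"):
    """Re-read the settings file only when its modification time changes,
    so hot paths can call this freely instead of relying on a global
    refreshed by defensive reload_conf() calls."""
    mtime = os.stat(path).st_mtime
    if mtime != _cache["mtime"]:
        with open(path) as f:
            _cache["settings"] = json.load(f)
        _cache["mtime"] = mtime
    return _cache["settings"]
```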
Problem
Background work (library scanning, file identification, titledb updates, file organization) ran inline within the Flask process using thread locks. This coupled the web server to long-running background jobs, preventing the move to Gunicorn — where multiple threads serve concurrent requests but background work must not block or interfere with request handling. The single-threaded, GIL-bound execution also meant no multi-core utilization, no task visibility, and no deduplication.
Architecture
The API server enqueues tasks into a SQLite table. A pool of worker subprocesses polls the table and executes claimed tasks in their own processes. Workers scale dynamically from the settings UI without restart.
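The claim step has to be atomic so two polling workers never execute the same row. The PR doesn't spell out the claim query, so this sketch assumes it uses the same BEGIN IMMEDIATE pattern as the dedup and parent-completion paths:

```python
import sqlite3

def claim_next_task(conn: sqlite3.Connection):
    """Atomically pick the oldest pending task and mark it running.
    BEGIN IMMEDIATE takes the write lock before the SELECT, so a second
    worker polling at the same moment blocks until this claim commits
    and then sees the row as 'running'."""
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id, name FROM tasks WHERE status = 'pending' ORDER BY id LIMIT 1"
        ).fetchone()
        if row:
            conn.execute("UPDATE tasks SET status = 'running' WHERE id = ?", (row[0],))
        conn.execute("COMMIT")
        return row
    except Exception:
        conn.execute("ROLLBACK")
        raise
```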
Expensive operations (scan, identify, organize) create per-file child tasks that are processed in parallel across workers. Parent tasks track progress atomically and trigger continuations when all children complete, forming the processing pipeline.
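Parent progress can be derived directly from the children's rows, as in this sketch (the PR stores a completion_pct column updated under BEGIN IMMEDIATE; computing it on the fly here just illustrates the arithmetic):

```python
import sqlite3

def completion_pct(conn: sqlite3.Connection, parent_id: int) -> float:
    """Percentage of a parent's children that are done. SQLite evaluates
    the boolean expression status = 'done' as 0/1, so SUM counts the
    finished children in one pass."""
    done, total = conn.execute(
        "SELECT COALESCE(SUM(status = 'done'), 0), COUNT(*) "
        "FROM tasks WHERE parent_id = ?",
        (parent_id,)).fetchone()
    return 100.0 * done / total if total else 100.0
```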
Scheduling is handled by the task queue itself: update_titledb re-enqueues itself with a run_after delay after each execution, replacing the scheduler module and the croniter dependency entirely.

Implementation