Skip to content

Suhas-30/QueueCTL

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

4 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

🧩 QueueCTL β€” Lightweight Job Queue & Worker System

QueueCTL is a modular command-line job queue manager built with Python, Typer, SQLite, and Docker.
It provides a lightweight yet powerful framework to enqueue shell commands, manage workers, handle retries, and inspect a Dead Letter Queue (DLQ) β€” all from the CLI.


🎬 Demo of QueueCTL

πŸ“Ή Watch Demo of QueueCTL


πŸ“‹ Table of Contents


✨ Features

  • πŸš€ CLI-based job queue management with Typer
  • πŸ“Š SQLite database for lightweight persistence
  • πŸ”„ Automatic retry logic with exponential backoff
  • πŸ’€ Dead Letter Queue (DLQ) for failed jobs
  • βš™οΈ Configurable settings (max retries, worker count, etc.)
  • 🐳 Docker support with persistent volumes
  • πŸ‘· Multi-worker architecture for concurrent job processing
  • πŸ›‘οΈ Graceful shutdown handling
  • 🎨 Rich CLI output with tables and status indicators

βš™οΈ Setup Instructions

🧠 Requirements

  • Python 3.11+
  • Docker & Docker Compose (for containerized setup)
  • pip (Python package manager)

🧰 Local Setup

# Clone the repository
git clone https://github.com/<your-username>/queuectl.git
cd queuectl

# Create a virtual environment
python -m venv venv
source venv/bin/activate       # (Linux/Mac)
venv\Scripts\activate          # (Windows)

# Install dependencies
pip install -e .

# Initialize database (automatically happens on first command)
queuectl list

🐳 Docker Setup

# Build the Docker image
docker build -t queuectl .

# Run interactively
docker run -it queuectl bash

# Inside container, run commands
queuectl enqueue "echo 'Hello from Docker!'"
queuectl list --state pending

βœ… Persistent Data:
A Docker volume queuectl-data ensures your SQLite database persists between container runs.

# Using Docker Compose (if available)
docker-compose up -d
docker-compose exec queuectl queuectl list

πŸ’‘ Usage Examples

🧱 Enqueue a Job

queuectl enqueue "echo 'Test Job'" --max-retries 3

Output:

Job added with ID: 5f07c123-45ef-42b8-88cd-8e6e33bd4e5d

πŸ“‹ List Jobs

# List all jobs
queuectl list

# Filter by state
queuectl list --state pending
queuectl list --state completed
queuectl list --state failed

Output:

┏━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ ID         ┃ Command            ┃ State     ┃ Retries   ┃ Created At   ┃
┑━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
β”‚ 5f07c123   β”‚ echo 'Test Job'    β”‚ pending   β”‚ 0/3       β”‚ 2025-01-15   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

βš™οΈ Start Workers

# Start 2 worker processes
queuectl worker start --count 2

Output:

[Manager PID 1234] Starting 2 workers...
[Worker PID 5678] Loop started
[Executor] Running job 5f07c123...
[Executor] Job completed successfully.

Stop Workers:

# Graceful shutdown (Ctrl+C or send SIGTERM)
^C
[Worker PID 5678] Received shutdown signal, finishing current job...
[Worker PID 5678] Shutdown complete.

πŸ’€ Dead Letter Queue (DLQ)

# View all failed jobs in DLQ
queuectl dlq list

# Retry a specific job from DLQ
queuectl dlq retry <job_id>

# Clear all jobs from DLQ
queuectl dlq clear

🧩 Configuration Management

# Set configuration values
queuectl config set max-retries 5
queuectl config set worker-count 4

# Get a specific config value
queuectl config get max-retries

# List all configuration
queuectl config list

Output:

Current configuration:
  max-retries: 5
  worker-count: 4

🧠 Architecture Overview

πŸ—οΈ System Architecture Diagram

                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β”‚          CLI Layer           β”‚
                    β”‚ (Typer + Rich interface)     β”‚
                    β”‚------------------------------β”‚
                    β”‚ β€’ queuectl enqueue/list      β”‚
                    β”‚ β€’ queuectl worker start      β”‚
                    β”‚ β€’ queuectl dlq/config        β”‚
                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                   β”‚
          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
          β”‚                        β”‚                        β”‚
          β–Ό                        β–Ό                        β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Repository Layer  β”‚   β”‚ Worker Management     β”‚   β”‚   Config Layer       β”‚
β”‚ (JobRepository,    β”‚   β”‚ (manager.py)          β”‚   β”‚ (config.py)          β”‚
β”‚  ConfigRepo)       β”‚   β”‚-----------------------β”‚   β”‚----------------------β”‚
β”‚--------------------β”‚   β”‚ β€’ Spawns processes    β”‚   β”‚ β€’ Key-value config   β”‚
β”‚ β€’ SQLite queries   β”‚   β”‚ β€’ Graceful shutdown   β”‚   β”‚ β€’ CLI-based updates  β”‚
β”‚ β€’ Job persistence  β”‚   β”‚ β€’ Process isolation   β”‚   β”‚ β€’ Persists to DB     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
           β”‚                          β”‚                         β”‚
           β”‚                          β”‚                         β”‚
           β–Ό                          β–Ό                         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Worker Execution   β”‚     β”‚ Retry & DLQ Logic  β”‚     β”‚ Database Layer     β”‚
β”‚ (workLoop, executorβ”‚     β”‚ (retry.py)         β”‚     β”‚ (SQLite + Volume)  β”‚
β”‚  jobLifeCycle)     β”‚     β”‚--------------------β”‚     β”‚--------------------β”‚
β”‚--------------------β”‚     β”‚ β€’ Exponential back β”‚     β”‚ β€’ jobs table       β”‚
β”‚ β€’ Runs shell cmds  β”‚     β”‚ β€’ DLQ transition   β”‚     β”‚ β€’ config table     β”‚
β”‚ β€’ Updates job stateβ”‚     β”‚ β€’ Attempts trackingβ”‚     β”‚ β€’ Persistent volumeβ”‚
β”‚ β€’ Manages retries  β”‚     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

🧩 Component Descriptions

Component Description
CLI Layer Command interface for managing jobs, workers, config, and DLQ using Typer and Rich
Repository Layer Handles job creation, listing, and database operations through JobRepository
Worker Management Spawns and manages worker processes with graceful shutdown capabilities
Config Layer Key-value configuration system with CLI-based updates persisted to database
Worker Execution Runs shell commands, updates job states, and manages the execution lifecycle
Retry & DLQ Logic Implements exponential backoff and transitions failed jobs to Dead Letter Queue
Database Layer SQLite-based persistence for jobs and config with Docker volume support

πŸ”„ Job Lifecycle

pending  β†’  processing  β†’  completed
   ↓             ↓
   ↓         failed β†’ retry (pending)
   ↓
   └──> dead (DLQ after max retries)

Each job moves through these states based on worker execution results.


πŸ“¦ Data Persistence

  • Jobs are stored in queuectl.db (SQLite)
  • In Docker, this file lives inside /app/data (mounted via queuectl-data volume)
  • Configuration settings are persisted in the same database
  • Ensures your queue survives application and container restarts

βš–οΈ Assumptions & Trade-offs

Design Decision Rationale
βœ… SQLite Simple, embedded database β€” ideal for local or small-scale deployments
βš™οΈ One command per job Designed for shell-style tasks; keeps job model simple
πŸ” Exponential backoff Prevents overwhelming systems during transient failures
🧠 Single-node design No distributed locking (but could be extended with PostgreSQL or Redis)
🧩 Multiprocess workers Scalable on one machine; not yet distributed/clustered
πŸ’Ύ Minimal dependencies Pure Python standard libs + Typer + Rich for portability

πŸ§ͺ Testing Instructions

πŸ”¬ Run Unit Tests

# Install test dependencies
pip install pytest

# Run all tests
pytest -v

# Run specific test file
pytest tests/test_jobs.py -v

# Run with coverage
pytest --cov=queuectl tests/

Expected Output:

tests/test_config.py::test_set_and_get_config PASSED
tests/test_dlq.py::test_dlq_job_state PASSED
tests/test_jobs.py::test_enqueue_and_list_jobs PASSED
tests/test_workers.py::test_run_valid_job PASSED

🧱 Test Coverage

Test File Purpose
test_jobs.py Verifies enqueue & listing of jobs
test_workers.py Ensures workers execute commands correctly
test_dlq.py Confirms failed jobs move to DLQ
test_config.py Validates configuration persistence

πŸ“‚ Project Structure

queuectl/
β”œβ”€β”€ cli/
β”‚   β”œβ”€β”€ main.py           # Main CLI entry point
β”‚   β”œβ”€β”€ job_cli.py        # Job management commands
β”‚   β”œβ”€β”€ worker_cli.py     # Worker control commands
β”‚   β”œβ”€β”€ dlq_cli.py        # Dead Letter Queue commands
β”‚   └── config_cli.py     # Configuration commands
β”œβ”€β”€ worker/
β”‚   β”œβ”€β”€ manager.py        # Worker process manager
β”‚   β”œβ”€β”€ jobExecutor.py    # Job execution logic
β”‚   β”œβ”€β”€ jobLifeCycle.py   # State transitions
β”‚   β”œβ”€β”€ retry.py          # Retry logic & backoff
β”‚   β”œβ”€β”€ workLoop.py       # Main worker loop
β”‚   └── shutdown.py       # Graceful shutdown handler
β”œβ”€β”€ repository.py         # Database operations
β”œβ”€β”€ config.py             # Configuration management
β”œβ”€β”€ dbConnection.py       # SQLite connection handler
β”œβ”€β”€ tests/
β”‚   β”œβ”€β”€ test_config.py
β”‚   β”œβ”€β”€ test_dlq.py
β”‚   β”œβ”€β”€ test_jobs.py
β”‚   └── test_workers.py
β”œβ”€β”€ Dockerfile
β”œβ”€β”€ docker-compose.yml
β”œβ”€β”€ setup.py
β”œβ”€β”€ requirements.txt
└── README.md

🧾 License

MIT License Β© 2025 β€” QueueCTL Project

This project is licensed under the MIT License. See the LICENSE file for details.


βœ‰οΈ Author

Developed by Suhas

πŸ“§ 3suhashs@gamil.com
πŸ™ GitHub: Suhas-30


πŸŽ₯ Video Submission

πŸ“Ή Watch Demo of QueueCTL


🀝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/AmazingFeature)
  3. Commit your changes (git commit -m 'Add some AmazingFeature')
  4. Push to the branch (git push origin feature/AmazingFeature)
  5. Open a Pull Request

πŸ“ Roadmap

  • Add support for job priorities
  • Implement job dependencies (DAG execution)
  • Add web UI for queue monitoring
  • Support for PostgreSQL backend
  • Distributed worker support with Redis
  • Webhook notifications for job completion
  • Scheduled/cron-style job execution

πŸ™ Acknowledgments

  • Built with Typer for CLI
  • Styled with Rich for beautiful terminal output

⭐ If you find this project useful, please consider giving it a star on GitHub!

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors