This project implements a document processing backend that orchestrates a configurable pipeline, executes it asynchronously, and publishes lifecycle events using Redis Streams.
The system is designed to demonstrate:
- asynchronous processing
- pipeline orchestration
- event-driven architecture
- resiliency strategies
- clean separation of concerns
Tech stack:
- Python 3.12
- Django + Django REST Framework
- MySQL
- Redis
- django-rq (background jobs)
- Redis Streams (event streaming)
- Docker & Docker Compose
```mermaid
flowchart TD
    Client[Client] --> API[Django REST API]
    API --> JobService[JobService]
    JobService --> DB[(MySQL)]
    JobService --> Queue[Redis Queue RQ]
    Queue --> Worker[RQ Worker]
    Worker --> Task[process_job]
    Task --> Pipeline[ProcessingPipeline]
    Pipeline --> Extractor[Extractor Provider]
    Extractor --> Fast[FastExtractor]
    Extractor --> Slow[SlowExtractor]
    Pipeline --> Analyzer[Analyzer Provider]
    Pipeline --> Enricher[Enricher Provider]
    Pipeline --> DB
    Pipeline --> Stream[Redis Streams job_events]
    Stream --> Consumer[consume_event command]
    Consumer --> Logs[Logs / downstream simulation]
```
This diagram maps the challenge architecture to the concrete implementation. The Document Processing Gateway is implemented with Django REST API, JobService, RQ worker, and ProcessingPipeline. Providers are mocked but hidden behind abstractions so they can be replaced without changing the pipeline logic.
- The client creates a job through POST /api/jobs/.
- The API validates the payload and stores the job as pending.
- The job id is enqueued in Redis Queue (RQ) to trigger asynchronous pipeline execution.
- An RQ worker consumes the job id and executes the configured processing pipeline stages.
- Each stage updates partial results in the database and publishes events to Redis Streams.
- A Redis Streams consumer reads and logs events using consumer groups and explicit ACK.
- The client can query the job endpoint at any time to retrieve status and partial results.
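For illustration, the create-and-enqueue step (the first three items above) might look roughly like the sketch below. The JobService internals, model fields, and queue name are assumptions, not the project's exact code; only the pattern (persist as pending, enqueue the job id with django-rq) is taken from this README.

```python
import django_rq

from apps.jobs.models import Job          # model location assumed
from apps.jobs.tasks import process_job   # the task the RQ worker executes


def create_job(validated_data: dict) -> Job:
    """Store the job as pending, then enqueue its id for asynchronous processing."""
    job = Job.objects.create(status="pending", **validated_data)
    django_rq.get_queue("default").enqueue(process_job, job.id)
    return job
```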
Valid stage combinations for pipeline_config.stages:
- ["extraction"]
- ["extraction", "analysis"]
- ["extraction", "analysis", "enrichment"]
The system enforces stage ordering because each stage depends on the previous one:
raw content → extracted text → analyzed entities → enriched metadata
A document may require only extraction and analysis without enrichment, and this is fully supported.
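A minimal sketch of how this combination check can be expressed for a DRF serializer field (the function name and exact wiring are illustrative, not necessarily the project's implementation):

```python
from rest_framework import serializers

VALID_STAGE_COMBINATIONS = [
    ["extraction"],
    ["extraction", "analysis"],
    ["extraction", "analysis", "enrichment"],
]


def validate_stages(stages: list[str]) -> list[str]:
    """Reject any stage list that is not one of the allowed ordered combinations."""
    if stages not in VALID_STAGE_COMBINATIONS:
        raise serializers.ValidationError(
            f"Invalid stage combination. Valid combinations are: {VALID_STAGE_COMBINATIONS}."
        )
    return stages
```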
Providers are implemented behind abstractions, allowing the pipeline to remain independent of concrete implementations.
Implemented providers:
- FastExtractor (low latency simulation)
- SlowExtractor (high latency simulation)
- AnalyzerProvider
- EnricherProvider
The system is designed to allow new provider implementations without modifying the pipeline orchestration logic.
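BaseProvider itself is not reproduced in this README; a plausible minimal shape (an assumption, shown only to make the contract concrete) is an abstract base class that fixes the process() method:

```python
from abc import ABC, abstractmethod
from typing import Any


class BaseProvider(ABC):
    """Common contract shared by extractors, analyzers, and enrichers."""

    @abstractmethod
    def process(self, document_content: str) -> Any:
        """Transform the stage input and return the stage result."""
        raise NotImplementedError
```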
Create a new provider class that extends BaseProvider and implements the required process() method:

```python
from apps.jobs.providers.base_provider import BaseProvider


class CustomExtractor(BaseProvider):
    def process(self, document_content: str):
        # Custom extraction logic (placeholder result for illustration)
        extracted_data = {"text": document_content}
        return extracted_data
```

All providers must expose a process() method with the expected input/output contract.

Providers are not instantiated inside the pipeline. Instead, they are injected from the task layer (process_job):
```python
from apps.jobs.pipeline import ProcessingPipeline            # import path assumed
from apps.jobs.providers.analyzer import AnalyzerProvider    # import path assumed
from apps.jobs.providers.custom_extractor import CustomExtractor
from apps.jobs.providers.enricher import EnricherProvider    # import path assumed


def process_job(job_id):
    pipeline = ProcessingPipeline(
        extractor=CustomExtractor(),
        analyzer=AnalyzerProvider(),
        enricher=EnricherProvider(),
    )
    pipeline.process(job_id)
```

This keeps the pipeline independent of concrete implementations.
You can select providers dynamically using environment variables:
```python
from django.conf import settings
from apps.jobs.providers.custom_extractor import CustomExtractor
from apps.jobs.providers.fast_extractor import FastExtractor  # import path assumed

if settings.EXTRACTOR_PROVIDER == "custom":
    extractor = CustomExtractor()
else:
    extractor = FastExtractor()
```

This approach follows the Dependency Injection pattern:
- The pipeline depends on abstractions, not implementations
- Providers can be replaced without changing orchestration logic
- New providers can be introduced with minimal impact
This makes the system more flexible and easier to extend.
API endpoints:

- POST /api/jobs/ creates a job and triggers asynchronous processing.
- GET /api/jobs/{id}/ returns the job status and partial results.
- GET /api/jobs/ lists jobs.
- POST /api/jobs/{id}/cancel/ cancels a job; cancelling a completed job is rejected.
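As a client-side usage sketch (the host, port, and the shape of the create response are assumptions; the requests library is not part of the project):

```python
import requests

BASE_URL = "http://localhost:8000/api/jobs/"  # host and port assumed

payload = {
    "document_name": "My Document",
    "document_type": "important",
    "document_content": "Hi, my name is Gabriela, I want to cancel my subscription. SubscriptionID 2342",
    "pipeline_config": {"stages": ["extraction", "analysis"]},
}

created = requests.post(BASE_URL, json=payload, timeout=10).json()
job_id = created["id"]  # assumes the create response echoes the job id

status = requests.get(f"{BASE_URL}{job_id}/", timeout=10).json()
print(status["status"], status.get("results"))
```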
Example request body for POST /api/jobs/:

```json
{
  "document_name": "My Document",
  "document_type": "important",
  "document_content": "Hi, my name is Gabriela, I want to cancel my subscription. SubscriptionID 2342",
  "pipeline_config": {
    "stages": ["extraction", "analysis", "enrichment"]
  }
}
```

Example response from GET /api/jobs/{id}/ while the job is still processing:

```json
{
  "status": "processing",
  "results": {
    "stages": {
      "extraction": {
        "status": "completed",
        "results": "Extracted text",
        "timestamp": "..."
      },
      "analysis": {
        "status": "started",
        "timestamp": "..."
      }
    }
  }
}
```

Example response for a job that failed during a stage:

```json
{
  "status": "failed",
  "results": {
    "stages": {
      "extraction": {
        "status": "completed"
      },
      "analysis": {
        "status": "failed",
        "error": "Provider timeout"
      }
    }
  },
  "errors": {
    "stage": "analysis",
    "message": "Provider timeout"
  }
}
```

The API returns validation errors with 400 Bad Request when the request payload is invalid.
Examples:
- missing required fields
- invalid pipeline_config
- invalid stage combination
- duplicated or unknown stages
Example invalid request:
```json
{
  "pipeline_config": {
    "stages": ["analysis"]
  }
}
```

Response:

```json
{
  "pipeline_config": [
    "Invalid stage combination. Valid combinations are: ['extraction'], ['extraction', 'analysis'], ['extraction', 'analysis', 'enrichment']."
  ]
}
```

Invalid state transitions also return 400 Bad Request.
Example: trying to cancel a completed job.
```json
{
  "error": "Cannot cancel job from status completed"
}
```

If a job does not exist, the API returns 404 Not Found:

```json
{
  "detail": "Not found."
}
```

Published events:
- job.created
- job.stage_started
- job.stage_completed
- job.completed
- job.failed
- job.cancelled
Each event includes:
```json
{
  "event_type": "job.stage_completed",
  "job_id": "1",
  "timestamp": "2026-04-27T13:32:50Z",
  "payload": {
    "stage": "analysis",
    "result": {}
  }
}
```

Redis Streams was chosen because it satisfies the key requirements:
- message persistence
- consumer groups
- acknowledgment
Compared to Kafka, it provides a simpler operational setup while still demonstrating event streaming capabilities.
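For illustration, publishing an event to the job_events stream with redis-py could look roughly like this (the connection details and helper name are assumptions; the envelope mirrors the example above):

```python
import json
from datetime import datetime, timezone

import redis

client = redis.Redis(host="redis", port=6379)  # connection details assumed


def publish_event(event_type: str, job_id: str, payload: dict) -> None:
    """Append a job lifecycle event to the job_events Redis stream."""
    client.xadd(
        "job_events",
        {
            "event_type": event_type,
            "job_id": job_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "payload": json.dumps(payload),
        },
    )
```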
The API does not execute processing synchronously.
Instead:
- requests enqueue a task
- workers process jobs independently
```text
API    -> creates job  -> enqueues job id -> returns response
Worker -> consumes job -> runs pipeline
```
This ensures responsiveness and scalability.
The project implements retry with exponential backoff for provider calls.
This was chosen because provider failures and timeouts are often transient. Retrying immediately could increase pressure on the external provider, so backoff adds delay between attempts.
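A minimal sketch of the retry-with-backoff idea (the helper name, attempt count, and delays are illustrative, not the exact implementation):

```python
import time


def call_with_retries(provider, payload, max_attempts: int = 3, base_delay: float = 1.0):
    """Call provider.process, retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return provider.process(payload)
        except Exception:
            if attempt == max_attempts:
                raise  # retries exhausted; the caller marks the stage and job as failed
            time.sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
```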
If retries are exhausted:
- the current stage is marked as failed
- the job is marked as failed
- previous stage results are preserved
- an error is stored in the job response
For a production system, I would consider adding:
- circuit breaker for repeated provider outages
- outbox pattern for event publication failures
- dead letter queue for later inspection/reprocessing
A custom Django management command:
```bash
python manage.py consume_event
```
This simulates a downstream service by:
- reading events
- logging them
- acknowledging processing
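A minimal sketch of what the command's core loop could look like with redis-py (the consumer group and consumer names are illustrative assumptions):

```python
import redis
from django.core.management.base import BaseCommand


class Command(BaseCommand):
    help = "Consume and log job events from the job_events Redis stream"

    def handle(self, *args, **options):
        client = redis.Redis(host="redis", port=6379)  # connection details assumed
        try:
            client.xgroup_create("job_events", "job_consumers", id="0", mkstream=True)
        except redis.ResponseError:
            pass  # consumer group already exists

        while True:
            # Block for up to 5 seconds waiting for new, unacknowledged events.
            entries = client.xreadgroup(
                "job_consumers", "consumer-1", {"job_events": ">"}, count=10, block=5000
            )
            for _stream, messages in entries or []:
                for message_id, fields in messages:
                    self.stdout.write(f"{message_id}: {fields}")  # log the event
                    client.xack("job_events", "job_consumers", message_id)
```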
Authentication was not implemented because it was not required by the challenge. In a production environment, I would secure the API using JWT or OAuth2.
The providers are mocked as internal classes instead of external services. This keeps the challenge focused on orchestration, abstraction, async processing, and event streaming. The design allows replacing them with real HTTP clients without changing the pipeline logic.
To run the project locally:

```bash
cp .env.example .env
docker compose up --build
docker compose exec web python manage.py migrate
```

The RQ worker is started automatically by Docker Compose.

Run the event consumer:

```bash
docker compose exec web python manage.py consume_event
```

Run the test suite:

```bash
docker compose exec web pytest
```