
Document Processing Gateway

Overview

This project implements a document processing backend that orchestrates a configurable pipeline, executes it asynchronously, and publishes lifecycle events using Redis Streams.

The system is designed to demonstrate:

  • asynchronous processing
  • pipeline orchestration
  • event-driven architecture
  • resiliency strategies
  • clean separation of concerns

Tech Stack

  • Python 3.12
  • Django + Django REST Framework
  • MySQL
  • Redis
  • django-rq (background jobs)
  • Redis Streams (event streaming)
  • Docker & Docker Compose

Architecture

flowchart TD
    Client[Client] --> API[Django REST API]
    API --> JobService[JobService]
    JobService --> DB[(MySQL)]
    JobService --> Queue[Redis Queue RQ]

    Queue --> Worker[RQ Worker]
    Worker --> Task[process_job]
    Task --> Pipeline[ProcessingPipeline]

    Pipeline --> Extractor[Extractor Provider]
    Extractor --> Fast[FastExtractor]
    Extractor --> Slow[SlowExtractor]

    Pipeline --> Analyzer[Analyzer Provider]
    Pipeline --> Enricher[Enricher Provider]

    Pipeline --> DB
    Pipeline --> Stream[Redis Streams job_events]

    Stream --> Consumer[consume_event command]
    Consumer --> Logs[Logs / downstream simulation]

This diagram maps the challenge architecture to the concrete implementation: the Document Processing Gateway consists of the Django REST API, the JobService, the RQ worker, and the ProcessingPipeline. Providers are mocked but hidden behind abstractions, so they can be replaced without changing the pipeline logic.


Main Flow

  1. Client creates a job through POST /api/jobs/.
  2. The API validates the payload and stores the job as pending.
  3. The job id is enqueued in Redis Queue (RQ) to trigger asynchronous pipeline execution.
  4. An RQ worker consumes the job id and executes the configured processing pipeline stages.
  5. Each stage updates partial results in the database and publishes events to Redis Streams.
  6. A Redis Streams consumer reads and logs events using consumer groups and explicit ACK.
  7. The client can query the job endpoint at any time to retrieve status and partial results.

Pipeline Configuration

Valid stage combinations:

  • ["extraction"]
  • ["extraction", "analysis"]
  • ["extraction", "analysis", "enrichment"]

The system enforces stage ordering because each stage depends on the previous one:

raw content → extracted text → analyzed entities → enriched metadata

A document may require only extraction and analysis without enrichment, and this is fully supported.
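
For reference, a minimal sketch of how this validation could be enforced (the helper name and error wording are illustrative, not the exact implementation):

VALID_STAGE_COMBINATIONS = [
    ["extraction"],
    ["extraction", "analysis"],
    ["extraction", "analysis", "enrichment"],
]

def validate_stages(stages):
    # Reject anything that is not one of the allowed ordered combinations.
    if stages not in VALID_STAGE_COMBINATIONS:
        raise ValueError(
            f"Invalid stage combination. Valid combinations are: {VALID_STAGE_COMBINATIONS}"
        )
    return stages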


Providers

Providers are implemented behind abstractions, allowing the pipeline to remain independent of concrete implementations.

Implemented providers:

  • FastExtractor (low latency simulation)
  • SlowExtractor (high latency simulation)
  • AnalyzerProvider
  • EnricherProvider

Extending the System: Adding a New Provider

The system is designed to allow new provider implementations without modifying the pipeline orchestration logic.

1. Implement the Provider Interface

Create a new provider class that extends BaseProvider and implements the required process() method.

from apps.jobs.providers.base_provider import BaseProvider

class CustomExtractor(BaseProvider):
    def process(self, document_content: str) -> str:
        # Custom extraction logic (placeholder implementation)
        extracted_data = document_content.strip()
        return extracted_data

All providers must expose a process() method with the expected input/output contract.


2. Wire the Provider in the Composition Layer

Providers are not instantiated inside the pipeline. Instead, they are injected from the task layer (process_job).

from apps.jobs.providers.custom_extractor import CustomExtractor
# Module paths below are assumed to mirror the provider layout; adjust to the actual locations.
from apps.jobs.providers.analyzer_provider import AnalyzerProvider
from apps.jobs.providers.enricher_provider import EnricherProvider
from apps.jobs.pipeline import ProcessingPipeline

def process_job(job_id):
    pipeline = ProcessingPipeline(
        extractor=CustomExtractor(),
        analyzer=AnalyzerProvider(),
        enricher=EnricherProvider(),
    )
    pipeline.process(job_id)

This keeps the pipeline independent of concrete implementations.


3. (Optional) Make It Configurable

You can select providers dynamically using environment variables:

from django.conf import settings

# Select the extractor implementation from configuration (e.g. an environment-backed setting).
if settings.EXTRACTOR_PROVIDER == "custom":
    extractor = CustomExtractor()
else:
    extractor = FastExtractor()

Design Rationale

This approach follows the Dependency Injection pattern:

  • The pipeline depends on abstractions, not implementations
  • Providers can be replaced without changing orchestration logic
  • New providers can be introduced with minimal impact

This makes the system more flexible and easier to extend.


API Endpoints

Create Job

POST /api/jobs/

Creates a job and triggers asynchronous processing.

Get Job

GET /api/jobs/{id}/

Returns status and partial results.

List Jobs

GET /api/jobs/

Cancel Job

POST /api/jobs/{id}/cancel/


Example Request

{
  "document_name": "My Document",
  "document_type": "important",
  "document_content": "Hi, my name is Gabriela, I want to cancel my subscription. SubscriptionID 2342",
  "pipeline_config": {
    "stages": ["extraction", "analysis", "enrichment"]
  }
}
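
For reference, this payload could be submitted from Python as follows (the host and port are assumed from the local Docker setup):

import requests

payload = {
    "document_name": "My Document",
    "document_type": "important",
    "document_content": "Hi, my name is Gabriela, I want to cancel my subscription. SubscriptionID 2342",
    "pipeline_config": {"stages": ["extraction", "analysis", "enrichment"]},
}

# Create the job; the API responds immediately while processing runs asynchronously.
response = requests.post("http://localhost:8000/api/jobs/", json=payload)
print(response.status_code, response.json())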

Example Job Result Structure

{
  "status": "processing",
  "results": {
    "stages": {
      "extraction": {
        "status": "completed",
        "results": "Extracted text",
        "timestamp": "..."
      },
      "analysis": {
        "status": "started",
        "timestamp": "..."
      }
    }
  }
}

Example Error Structure

{
  "status": "failed",
  "results": {
    "stages": {
      "extraction": {
        "status": "completed"
      },
      "analysis": {
        "status": "failed",
        "error": "Provider timeout"
      }
    }
  },
  "errors": {
    "stage": "analysis",
    "message": "Provider timeout"
  }
}

API Error Handling

The API returns validation errors with 400 Bad Request when the request payload is invalid.

Examples:

  • missing required fields
  • invalid pipeline_config
  • invalid stage combination
  • duplicated or unknown stages

Example invalid request:

{
  "pipeline_config": {
    "stages": ["analysis"]
  }
}

Example response:

{
  "pipeline_config": [
    "Invalid stage combination. Valid combinations are: ['extraction'], ['extraction', 'analysis'], ['extraction', 'analysis', 'enrichment']."
  ]
}

Invalid state transitions also return 400 Bad Request.

Example: trying to cancel a completed job.

{
  "error": "Cannot cancel job from status completed"
}
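
A minimal sketch of the guard that could produce this error (names and statuses are illustrative):

CANCELLABLE_STATUSES = {"pending", "processing"}

def cancel(job):
    # Only jobs that have not finished can be cancelled.
    if job.status not in CANCELLABLE_STATUSES:
        raise ValueError(f"Cannot cancel job from status {job.status}")
    job.status = "cancelled"
    job.save()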

If a job does not exist, the API returns 404 Not Found.

{
  "detail": "Not found."
}

Events

Published events:

  • job.created
  • job.stage_started
  • job.stage_completed
  • job.completed
  • job.failed
  • job.cancelled

Each event includes:

{
  "event_type": "job.stage_completed",
  "job_id": "1",
  "timestamp": "2026-04-27T13:32:50Z",
  "payload": {
    "stage": "analysis",
    "result": {}
  }
}
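
A minimal sketch of how such an event could be published to the job_events stream with redis-py (connection details and field layout are assumptions based on the example above):

import json
import redis

# Host name assumed from the Docker Compose service name.
r = redis.Redis(host="redis", port=6379)

def publish_event(event_type, job_id, payload):
    # XADD appends the event to the job_events stream for downstream consumers.
    r.xadd("job_events", {
        "event_type": event_type,
        "job_id": str(job_id),
        "payload": json.dumps(payload),
    })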

Why Redis Streams?

Redis Streams was chosen because it provides:

  • message persistence
  • consumer groups
  • acknowledgment

Compared to Kafka, it provides a simpler operational setup while still demonstrating event streaming capabilities.


Asynchronous Processing

The API does not execute processing synchronously.

Instead:

  • requests enqueue a task
  • workers process jobs independently

API -> creates job -> enqueues job id -> returns response
Worker -> consumes job -> runs pipeline

This ensures responsiveness and scalability.
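
A minimal sketch of the enqueue step with django-rq (the queue name and task module path are assumptions):

import django_rq

# Adjust the import to the actual location of the task.
from apps.jobs.tasks import process_job

def enqueue_job(job_id):
    # Hand the job id to the default RQ queue; a worker picks it up asynchronously.
    django_rq.get_queue("default").enqueue(process_job, job_id)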


Resilience Strategy

The project implements retry with exponential backoff for provider calls.

This was chosen because provider failures and timeouts are often transient. Retrying immediately could increase pressure on the external provider, so backoff adds delay between attempts.
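
A minimal sketch of this strategy, assuming a small helper wraps each provider call (attempt count and base delay are illustrative):

import time

def call_with_retry(provider, document_content, max_attempts=3, base_delay=1.0):
    # Retry the provider call, doubling the wait between attempts.
    for attempt in range(1, max_attempts + 1):
        try:
            return provider.process(document_content)
        except Exception:
            if attempt == max_attempts:
                raise  # retries exhausted; the caller marks the stage and job as failed
            time.sleep(base_delay * 2 ** (attempt - 1))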

If retries are exhausted:

  • the current stage is marked as failed
  • the job is marked as failed
  • previous stage results are preserved
  • an error is stored in the job response

For a production system, I would consider adding:

  • circuit breaker for repeated provider outages
  • outbox pattern for event publication failures
  • dead letter queue for later inspection/reprocessing

Event Consumer

A custom Django management command:

python manage.py consume_event

This simulates a downstream service by:

  • reading events
  • logging them
  • acknowledging processing
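
A minimal sketch of this consumer loop, assuming a consumer group on the job_events stream (group and consumer names are illustrative):

import redis

r = redis.Redis(host="redis", port=6379)

# Create the consumer group once; ignore the error if it already exists.
try:
    r.xgroup_create("job_events", "event_consumers", id="0", mkstream=True)
except redis.ResponseError:
    pass

while True:
    # Block until new events are available for this consumer.
    entries = r.xreadgroup(
        "event_consumers", "consumer-1", {"job_events": ">"}, count=10, block=5000
    )
    for stream, messages in entries or []:
        for message_id, fields in messages:
            print(stream, message_id, fields)  # log the event
            r.xack("job_events", "event_consumers", message_id)  # explicit ACK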

Trade-offs

Authentication was not implemented because it was not required by the challenge. In a production environment, I would secure the API using JWT or OAuth2.

The providers are mocked as internal classes instead of external services. This keeps the challenge focused on orchestration, abstraction, async processing, and event streaming. The design allows replacing them with real HTTP clients without changing the pipeline logic.


Running the Project

cp .env.example .env
docker compose up --build

Run migrations:

docker compose exec web python manage.py migrate

Run RQ worker:

The RQ worker is started automatically by Docker Compose.

Run event consumer:

docker compose exec web python manage.py consume_event

Running Tests

docker compose exec web pytest
