Technical architecture document for contributors and developers. Explains why the system is designed this way, not just how it works.
- Overview
- Data Flow: Extract → Transform → Load
- Inter-service Communication Protocol
- Internal Structure of Each Microservice
- Preparator SDK — Client-side Orchestration
- Pipeline Compiler — Parallelism and Topological DAG
- AI Agent — Natural Language to Pipeline
- Multi-worker: Gunicorn and Implications
- Observability: Tracing, Metrics, Logging
- Airflow and File-based XCom
- Security Design
- Trade-offs and Known Technical Debt
The platform implements a decomposable ETL: each data operation (extract, clean, validate, load) is an independent Flask service. Orchestration is separated from execution.
┌─────────────────────────────────────────────────────┐
│ Orchestration Layer │
│ │
│ ┌──────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Airflow │ │ AI Agent │ │ Streamlit │ │
│ │ (DAGs) │ │ (NL → YAML) │ │ (UI) │ │
│ └────┬─────┘ └──────┬──────┘ └──────┬──────┘ │
└───────┼────────────────┼─────────────────┼─────────┘
│ │ │
└────────────────┴─────────────────┘
│
Preparator SDK (v4)
HTTP + Arrow IPC + retry
│
┌─────────────────────────────────┼──────────────────────────────────┐
│ ETL Services Layer │
│ │
│ Extract Transform Load │
│ ──────── ───────────────────────────── ──── │
│ csv :5001 clean-nan :5002 :5009 │
│ sql :5005 delete-cols :5004 │
│ api :5006 data-quality :5010 │
│ excel:5007 outliers :5011 │
│ join :5008 │
│ llm-complete :5012 │
└────────────────────────────────────────────────────────────────────┘
│ │
┌─────────┴─────────┐ ┌─────────┴─────────┐
│ Shared Volume │ │ Docker Network │
│ /app/data │ │ etl-network │
│ (Arrow IPC, │ │ (container DNS) │
│ metadata, │ └───────────────────┘
│ XCom files) │
└───────────────────┘
Guiding principle: no service knows about the others. They communicate only via the Preparator.
All data exchanges use Apache Arrow IPC Streaming (not CSV, not JSON).
[CSV File]
│
▼
[extract-csv-service]
│ POST /extract-csv body: {"file_path": "...", "dataset_name": "..."}
│ ← response: binary Apache Arrow IPC stream (~165 KB per 500 HR rows)
│
▼ ipc_bytes
[clean-nan-service]
│ POST /clean-nan body: <Arrow IPC> header X-Params: {"strategy": "fill_median"}
│ ← response: transformed Arrow IPC
│
▼ ipc_bytes
[load-data-service]
│ POST /load-data body: <Arrow IPC> header X-Params: {"format": "parquet"}
│ ← response: {"status": "ok", "path": "..."} (JSON, only exception)
▼
[Parquet file on shared volume]
| Criterion | CSV/JSON | Arrow IPC |
|---|---|---|
| Payload size | 1x (baseline) | 0.3–0.6x (columnar + compression) |
| Parse overhead | High (row-by-row deserialization) | Nearly zero (zero-copy read) |
| Type safety | None (everything string) | Typed schema (int64, float64, date, ...) |
| Interoperability | Universal | Native in Python, Java, R, Rust, C++ |
HTTP+Arrow serialization overhead is ~5–15ms per hop, negligible compared to actual processing.
There are 4 distinct patterns, depending on service type:
POST /extract-csv
Content-Type: application/json
X-Correlation-ID: <uuid>
{"file_path": "/app/data/demo/hr_sample.csv", "dataset_name": "hr"}
→ 200 OK
Content-Type: application/vnd.apache.arrow.stream
<binary Arrow IPC>
POST /clean-nan
Content-Type: application/vnd.apache.arrow.stream
X-Params: {"strategy": "fill_median", "dataset_name": "hr"}
X-Correlation-ID: <uuid>
<binary Arrow IPC in body>
→ 200 OK
Content-Type: application/vnd.apache.arrow.stream
<binary transformed Arrow IPC>
Parameters go in the X-Params header (JSON string) because the body is already occupied by data.
This avoids URL length limits and supports complex nested objects.
The join requires two independent inputs → multipart/form-data with fields file1 and file2
(both Arrow IPC) and parameters (join_key, join_type) in X-Params.
Only exception: load-data-service responds with JSON ({"status": "ok", "path": "...", "rows": N}).
Input body is always Arrow IPC; X-Params carries the output format (csv, json, xlsx, parquet).
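Server-side, the X-Params convention boils down to a small helper along these lines (a sketch; the real services may merge defaults differently):

```python
import json

def read_x_params(headers, defaults=None):
    """Merge the JSON X-Params header over per-service defaults.

    Parameters travel in this header because the request body is
    already occupied by the binary Arrow IPC payload."""
    params = dict(defaults or {})
    raw = headers.get("X-Params")
    if raw:
        params.update(json.loads(raw))
    return params
```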
Every service follows the Flask app factory pattern with Blueprint:
services/<name>/
├── Dockerfile FROM python:3.9-slim, gunicorn 4 workers
├── requirements.txt
├── run.py dev entry point (not used in Docker)
└── app/
├── __init__.py create_app(): factory, config, blueprint registration
├── routes.py HTTP concerns: parse, dispatch, metrics, metadata
└── <logic>.py Pure data logic: no Flask imports
| routes.py | logic.py |
|---|---|
| parse request | receive pa.Table / pd.DataFrame |
| read X-Params header | apply transformation |
| increment Prometheus counters | return pa.Table + stats dict |
| deserialize Arrow IPC | (no Flask, no HTTP, no I/O) |
| call logic() | |
| serialize result to Arrow IPC | |
| write metadata JSON file | |
| build Response | |
| propagate X-Correlation-ID | |
Pure logic in logic.py is testable without HTTP scaffolding — just instantiate a
pa.Table and call the function. Unit tests never use app.test_client() for transformations.
create_app() configures three fundamental things: MAX_CONTENT_LENGTH = 500 MB, structured
JSON logging via configure_service_logging(), and JSON handlers for 404/413 (instead of
Flask's default HTML).
The Preparator (preparator/preparator_v4.py) is the client layer that abstracts all HTTP communication.
Airflow DAGs, AI Agent, Streamlit: all call exclusively via the Preparator.
self.session = requests.Session()
# A single Session reuses TCP connections between consecutive calls
# → eliminates TCP handshake overhead on each hop of the pipeline

retry_strategy = Retry(
total=3,
backoff_factor=0.5, # sleep 0.5s, 1s, 2s between attempts
status_forcelist=[502, 503, 504], # only transient errors
allowed_methods=["POST", "GET"],
)

Only transient errors (gateway errors and timeouts) trigger a retry. 4xx errors (bad requests) are never retried — a malformed request would fail deterministically on every attempt.
DEFAULT_TIMEOUT = (5, 300)
# connect_timeout=5s: service must respond immediately to TCP connect
# read_timeout=300s: but processing can take up to 5 minutes
# (e.g. LLM inference on a large dataset)

self.correlation_id = correlation_id or str(uuid.uuid4())
# Every HTTP call includes:
headers={"X-Correlation-ID": self.correlation_id}

One correlation_id per Preparator instance → all hops of a single pipeline execution
share the same ID, so the logs from every service are linkable.
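Combining the fragments above, the Preparator's session setup could be sketched as follows (illustrative names; assumes requests/urllib3 are installed):

```python
import uuid
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_session(correlation_id=None):
    """One Session per pipeline run: pooled TCP connections, retries on
    transient 502/503/504 errors, and a stable X-Correlation-ID on every hop."""
    session = requests.Session()
    retry = Retry(
        total=3,
        backoff_factor=0.5,               # sleep 0.5s, 1s, 2s between attempts
        status_forcelist=[502, 503, 504], # only transient errors
        allowed_methods=["POST", "GET"],
    )
    session.mount("http://", HTTPAdapter(max_retries=retry))
    session.headers["X-Correlation-ID"] = correlation_id or str(uuid.uuid4())
    return session
```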
with Preparator(config) as prep:
ipc = prep.extract_csv(dataset_name="hr", file_path="...")
ipc = prep.clean_nan(ipc, strategy="fill_median")
prep.load_data(ipc, format="parquet")
# session.close() is called automatically in __exit__

The PipelineCompiler (ai_agent/pipeline_compiler.py) executes pipeline YAML with
real parallelism when steps don't depend on each other.
steps:
- id: extract # no depends_on → layer 0
service: extract_csv
- id: quality # depends on extract → layer 1
depends_on: [extract]
service: data_quality
- id: clean # depends on quality → layer 2
depends_on: [quality]
service: clean_nan
- id: remove_cols # depends on quality (not clean) → layer 2
depends_on: [quality]
service: delete_columns
- id: save # depends on both → layer 3
depends_on: [clean, remove_cols]
  service: load_data

Layer 0: [extract] → executed first
Layer 1: [quality] → parallel (1 step)
Layer 2: [clean, remove_cols] → PARALLEL EXECUTION ✓
Layer 3: [save] → executed last
The algorithm computes in_degree for each node; nodes with degree zero form the current layer.
It then decrements the degrees of those nodes' children and repeats. If the graph has a cycle (e.g. A depends on B, which depends on A), Kahn's algorithm terminates before visiting all nodes → ValueError: cycle detected. Validation happens before execution.
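The layered Kahn's algorithm can be sketched in a few lines of standard-library Python (a standalone sketch; the real compiler also validates services and parameters):

```python
from collections import defaultdict

def build_layers(steps):
    """Group steps into layers that can execute in parallel (Kahn's algorithm).

    Raises ValueError if the dependency graph contains a cycle."""
    in_degree = {step["id"]: len(step.get("depends_on", [])) for step in steps}
    children = defaultdict(list)
    for step in steps:
        for dep in step.get("depends_on", []):
            children[dep].append(step["id"])
    layers = []
    while in_degree:
        layer = [sid for sid, deg in in_degree.items() if deg == 0]
        if not layer:
            raise ValueError("cycle detected")  # remaining nodes all have deps
        layers.append(layer)
        for sid in layer:
            del in_degree[sid]
            for child in children[sid]:
                in_degree[child] -= 1
    return layers
```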
def _execute_layer(self, layer, outputs, prep):
"""Execute all steps in the layer in parallel."""
with ThreadPoolExecutor(max_workers=len(layer)) as executor:
futures = {
executor.submit(self._execute_step, step, outputs, prep): step
for step in layer
}
for future in as_completed(futures):
step = futures[future]
result = future.result() # propagates exceptions
            outputs[step["id"]] = result.data  # save output for downstream steps

Thread safety: each step writes to the outputs dict under a different key (its own id).
No shared data is mutated concurrently → no race conditions.
Note: ThreadPoolExecutor is used (threads, not processes) because the bottleneck is I/O-bound
(HTTP calls), not CPU-bound. Python's GIL is not a problem here — threads release the GIL
during socket.send() and socket.recv().
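A tiny self-contained demonstration of the point — here time.sleep stands in for a blocking HTTP round-trip, which releases the GIL the same way:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_http_call(step_id):
    time.sleep(0.2)  # stand-in for a blocking socket round-trip (GIL released)
    return f"{step_id}: ok"

def run_layer(step_ids):
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=len(step_ids)) as executor:
        results = list(executor.map(fake_http_call, step_ids))
    return results, time.monotonic() - start

results, elapsed = run_layer(["clean", "remove_cols"])
# The two 0.2 s "calls" overlap, so elapsed stays close to 0.2 s rather than 0.4 s.
```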
Instead of:
# ❌ fragile, must update every time a service is added
if step["service"] == "extract_csv":
result = prep.extract_csv(...)
elif step["service"] == "clean_nan":
result = prep.clean_nan(...)
elif ...

The compiler uses a registry instead:
# ✓ extensible: adding a service = adding 1 entry to the dict
registry = {
"extract_csv": lambda p, inp, ds, _: prep.extract_csv(file_path=p["file_path"], ...),
"clean_nan": lambda p, inp, ds, _: prep.clean_nan(inp, strategy=p.get("strategy","drop"), ...),
"load_data": lambda p, inp, ds, _: prep.load_data(inp, format=p.get("format","csv"), ...),
# ...
}
handler = registry[step["service"]]
output = handler(params, input_data, dataset_name, input_data_2)

User: "extract hr_sample.csv, remove outliers, save as parquet"
│
▼
PipelineAgent.generate_pipeline()
│
├─► Builds system prompt from service_registry.json
│ (names, types, required and optional parameters of each service)
│
├─► Calls LLM (OpenAI GPT-4o-mini or local HuggingFace)
│
├─► Receives raw YAML
│
└─► validate_pipeline(yaml)
│
├─ JSON Schema validation (pipeline_schema.json)
├─ Service names exist in registry
├─ Required parameters present
├─ depends_on references valid steps
└─ No cycle in graph
│
▼
PipelineCompiler.execute(pipeline_def)
class LLMProvider(ABC):
@abstractmethod
def complete(self, system_prompt, user_prompt) -> str: ...
class OpenAIProvider(LLMProvider):
# GPT-4o-mini via API (requires OPENAI_API_KEY)
class LocalProvider(LLMProvider):
# Calls text-completion-llm-service (HuggingFace Llama 3.2 1B Instruct)
    # Useful for air-gapped environments or testing without API costs

Selection via env var: LLM_PROVIDER=openai (default) or LLM_PROVIDER=local.
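The env-var selection could be sketched as a small factory (stub implementations here; the real providers wrap the OpenAI API and the local service):

```python
import os
from abc import ABC, abstractmethod

class LLMProvider(ABC):
    @abstractmethod
    def complete(self, system_prompt, user_prompt): ...

class OpenAIProvider(LLMProvider):
    def complete(self, system_prompt, user_prompt):
        raise NotImplementedError("would call the OpenAI API here")

class LocalProvider(LLMProvider):
    def complete(self, system_prompt, user_prompt):
        raise NotImplementedError("would call text-completion-llm-service here")

def get_provider():
    """Pick the provider from LLM_PROVIDER, defaulting to openai."""
    name = os.environ.get("LLM_PROVIDER", "openai")
    providers = {"openai": OpenAIProvider, "local": LocalProvider}
    if name not in providers:
        raise ValueError(f"Unknown LLM_PROVIDER: {name!r}")
    return providers[name]()
```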
Every Flask service runs under Gunicorn with 4 pre-fork workers:
gunicorn --workers=4 --timeout=300 --bind=0.0.0.0:<port> app:create_app()
Exception: text-completion-llm-service uses 1 worker (HuggingFace model ~1.5GB cannot
be replicated across 4 workers in memory).
The master forks 4 independent worker processes (separate memory spaces) that all accept connections on the shared listening socket; the kernel distributes incoming requests among them. The master handles OS signals and automatically restarts crashed workers.
Prometheus counters (Counter(...)) are in-process memory. Each worker maintains its
own counters. When Prometheus scrapes GET /metrics, it reaches only one worker per request.
Practical effect: the extract_csv_requests_total value you see in Prometheus comes from whichever single worker answered that scrape — typically ~1/4 of the actual request count (with 4 workers and roughly even load distribution).
Unimplemented workaround (known technical debt): prometheus_client supports a multiprocess mode that aggregates counters from all workers via a shared directory:
# Would require (prometheus_client multiprocess API):
os.environ["PROMETHEUS_MULTIPROC_DIR"] = "/tmp/prometheus_multiproc"
from prometheus_client import CollectorRegistry, multiprocess
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)

Practical implication for monitoring: rates and trends remain reliable for detecting anomalies; only the absolute values are underestimated.
# text-completion-llm-service/app/completion.py
_model = None
_tokenizer = None
def get_model():
global _model, _tokenizer
if _model is None:
_model = AutoModelForCausalLM.from_pretrained(MODEL_PATH)
_tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
    return _model, _tokenizer

The model is loaded on the first request, not at container startup. This allows the container to report "healthy" in ~3s instead of 30+s; the first request pays the cold-start latency (~15–30s).
gunicorn --timeout=300 # 5 minutes per request
The longest estimated single request is LLM inference on 500k rows (~4 min). The Preparator SDK uses read_timeout=300s by default. If a request exceeds 5 minutes, Gunicorn kills the worker and the client receives a 502 Bad Gateway, which the retry strategy handles with backoff.
Preparator.__init__()
│ generates UUID → self.correlation_id
│
├─► POST /extract-csv X-Correlation-ID: abc-123
├─► POST /clean-nan X-Correlation-ID: abc-123
├─► POST /load-data X-Correlation-ID: abc-123
Every service reads the correlation ID from the header (or generates a new one if absent) via
get_correlation_id() from common/service_utils.py, includes it in structured logs, and
propagates it in the response. Search for correlation ID in logs to reconstruct the entire trace.
Every log line is JSON on a single line with fields: timestamp, level, service, message,
correlation_id, dataset_name. Directly filterable with jq:
docker compose logs | jq 'select(.correlation_id == "abc-123")'

Every service writes a JSON file to /app/data/<dataset_name>/metadata/ after processing:
{
"service": "clean-nan-service",
"dataset_name": "hr",
"timestamp": "2026-03-01T18:40:01Z",
"rows_in": 501,
"rows_out": 498,
"duration_sec": 0.043,
"strategy": "fill_median",
"columns_affected": 3,
"nulls_filled": 12
}

These files are cheap to write (they land on the local shared volume) and make it possible to reconstruct what happened at each step without depending on external systems.
The Streamlit UI now exposes runtime diagnostics directly in the Execution and Datasets tabs to reduce context switching between Airflow, Grafana, and logs.
- Platform Readiness: probes Airflow (/health), Streamlit, Prometheus, and Grafana
- Airflow scheduler heartbeat status: explicit healthy/not-ready signal from the Airflow health payload
- Execution insights: successful steps, total processed KB, slowest step, orchestration overhead (%)
- Run timeline diagnostics: active processing vs queue/orchestration gap per step
- Run comparison: latest run vs previous successful run deltas for duration, final rows, and outliers removed
This UX layer does not replace Prometheus/Grafana but gives immediate operator feedback for common questions like "is the platform ready?", "where is time spent?", and "did this run improve or regress vs the previous one?".
Every 15 seconds: Prometheus → GET /metrics on each service → stores the samples in its TSDB
retention: 15 days
storage: /etc/prometheus/data (volume etl-prometheus-data)
Airflow by default uses PostgreSQL to pass data between tasks via XCom (cross-communication). The practical limit of PostgreSQL for bytea is ~1MB; for Arrow IPC datasets with 50k rows this limit is easily exceeded.
# airflow/dags/xcom_file_utils.py
import uuid
from datetime import datetime, timezone

def save_ipc_to_shared(ipc_data: bytes, dataset_name: str, step_name: str) -> str:
    """Saves Arrow IPC to /app/data/<ds>/xcom/<step>_<ts>_<uuid>.arrow"""
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    uid = uuid.uuid4().hex[:8]
    path = f"/app/data/{dataset_name}/xcom/{step_name}_{ts}_{uid}.arrow"
    with open(path, "wb") as f:
        f.write(ipc_data)
    return path  # ← returns only the path (a string, <100 bytes) via XCom Postgres
def load_ipc_from_shared(file_path) -> bytes:
with open(file_path, "rb") as f:
        return f.read()

The etl-containers-shared-data volume is mounted in all containers (Airflow, all
microservices) at /app/data → files written by one task are readable by the next task.
XCom Postgres: task_A → task_B = "/app/data/hr/xcom/extract_20260301T184000Z_abc.arrow"
↑
small string → ok for Postgres
File system: task_A → task_B = Arrow IPC binary (165 KB, 2 MB, 50 MB...)
↑
actual data → ok for shared filesystem
# services/common/path_utils.py
def resolve_input_path(file_path: str) -> str:
allowed_root = os.environ.get("ETL_DATA_ROOT", "/app/data")
resolved = os.path.realpath(os.path.join(allowed_root, file_path.lstrip("/")))
if not resolved.startswith(os.path.realpath(allowed_root)):
raise ValueError(f"Parameter 'file_path' must stay under {allowed_root}")
    return resolved

No service accepts arbitrary paths. os.path.realpath resolves symlinks and .. before
the check runs. Attempting /etc/passwd or ../../secrets returns HTTP 400.
If ALLOW_PRIVATE_API_URLS=false (default), the service blocks URLs pointing to:
- localhost, 127.*, 10.*, 192.168.*, 172.16–31.* (RFC 1918 private ranges)
- Cloud metadata: 169.254.169.254 (AWS/GCP/Azure IMDS)
This prevents SSRF (Server-Side Request Forgery) in Docker environments where other containers are reachable via internal DNS.
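A standard-library sketch of such a private-address check (illustrative; the real service's block list may be broader):

```python
import ipaddress
import socket
from urllib.parse import urlparse

def is_private_url(url):
    """True if the URL targets localhost, RFC 1918 space, or link-local IPs."""
    host = urlparse(url).hostname or ""
    if host == "localhost":
        return True
    try:
        ip = ipaddress.ip_address(socket.gethostbyname(host))
    except (socket.gaierror, ValueError):
        return True  # fail closed: reject unresolvable hosts
    # is_link_local covers 169.254.169.254 (cloud metadata endpoints)
    return ip.is_private or ip.is_loopback or ip.is_link_local
```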
# extract-sql service does not accept multiple statements
if ";" in query.rstrip(";"):
raise ValueError("Multiple SQL statements are not allowed")
# Whitelist of allowed prefixes
ALLOWED_PREFIXES = ("SELECT ", "WITH ")
if not query.strip().upper().startswith(ALLOWED_PREFIXES):
    raise ValueError("Only SELECT/WITH queries are allowed")

| Decision | Alternative Rejected | Rationale |
|---|---|---|
| Arrow IPC over HTTP | gRPC + Protobuf | simpler setup, no proto schema needed |
| Thread pool for parallelism | asyncio | more readable, I/O-bound doesn't benefit from async |
| File-based XCom | Redis/Celery | no extra dependencies, uses already-present volume |
| X-Params header | URL query params | supports nested objects, no URL length limit |
| python:3.9-slim | newer images | stability; Arrow 18.x supports 3.9 |
- Prometheus multi-process mode not enabled — Gunicorn counters are per-worker. Trends are reliable; absolute values are roughly 1/4 of the true totals (with 4 workers).
- Arrow → Pandas → Arrow — most transforms convert to Pandas for processing. For datasets >1M rows, use pyarrow.compute directly. Exception: load-data-service with Parquet writes directly from Arrow (via pyarrow.parquet).
- No authentication on microservices — they are open on the Docker internal network. For production: an API-key header or mTLS between services.
- Prometheus counters reset on container restart — restart: always in docker-compose is a workaround for crashes, but it zeroes the metrics. Consider Pushgateway for persistent metrics.
- No Kubernetes manifests — Docker Compose only. Horizontal scaling (multiple replicas of the same service behind a load balancer) requires migrating to K8s + a Horizontal Pod Autoscaler.