This guide walks you through the two most common extension tasks:
- Adding a new microservice (transform, extract, or load)
- Creating a new pipeline (Airflow DAG or YAML for the AI agent)
Every microservice in this platform follows the same structure:
```
services/my-service/
├── Dockerfile          # Container build (python:3.9-slim + gunicorn)
├── requirements.txt    # Python dependencies
├── run.py              # Local dev entry point (not used in Docker)
└── app/
    ├── __init__.py     # Flask app factory
    ├── routes.py       # HTTP endpoint + Prometheus metrics
    └── logic.py        # Pure business logic (no Flask imports)
```
The key design principle: routes.py handles HTTP, logic.py handles data. This separation makes logic unit-testable without HTTP scaffolding.
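To make the separation concrete, here is a minimal, framework-free sketch. It assumes rows are plain dicts rather than a `pyarrow.Table`, and `keep_high_scores` and `handle_request` are hypothetical names standing in for a logic function and a Flask view; neither comes from the platform's code.

```python
import json


# --- logic.py: pure data logic, no HTTP concepts ---
def keep_high_scores(rows: list[dict], params: dict) -> list[dict]:
    """Keep rows whose 'score' meets the threshold (default 0.5)."""
    threshold = params.get("threshold", 0.5)
    return [row for row in rows if row["score"] >= threshold]


# --- routes.py: parses HTTP-shaped input, delegates, serializes the result ---
def handle_request(body: bytes, headers: dict) -> tuple[int, bytes]:
    """Hypothetical stand-in for a Flask view (not the platform's code)."""
    try:
        params = json.loads(headers.get("X-Params", "{}"))
        rows = json.loads(body)
    except json.JSONDecodeError as exc:
        error = {"status": "error", "message": f"Invalid JSON: {exc}"}
        return 400, json.dumps(error).encode()
    result = keep_high_scores(rows, params)
    return 200, json.dumps(result).encode()
```

Because `keep_high_scores` never touches a request object, a unit test can call it directly with a list and a dict, which is the property the layout is designed to preserve.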
```bash
cp -r templates/new_service services/my-transform-service
```

Open every file in `services/my-transform-service/` and replace:
| Placeholder | Example Value |
|---|---|
| `{{SERVICE_NAME}}` | `my-transform-service` |
| `{{SERVICE_SLUG}}` | `my_transform` |
| `{{SERVICE_PORT}}` | `5013` |
| `{{ENDPOINT_NAME}}` | `my-transform` |
| `{{LOGIC_MODULE}}` | `logic` |
| `{{LOGIC_FUNCTION}}` | `process_data` |
Port allocation: Current max is 5012 (port 5003 is unused). Use 5013 for the next service.
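If you would rather not edit each file by hand, the substitution can be scripted. The sketch below is one possible approach, not a tool that ships with the platform: `fill_placeholders` is a hypothetical helper, and the values are the example values from the placeholder table.

```python
from pathlib import Path

# Example values from the placeholder table; adjust for your service.
REPLACEMENTS = {
    "{{SERVICE_NAME}}": "my-transform-service",
    "{{SERVICE_SLUG}}": "my_transform",
    "{{SERVICE_PORT}}": "5013",
    "{{ENDPOINT_NAME}}": "my-transform",
    "{{LOGIC_MODULE}}": "logic",
    "{{LOGIC_FUNCTION}}": "process_data",
}


def fill_placeholders(service_dir: str, replacements: dict[str, str]) -> list[Path]:
    """Rewrite every UTF-8 text file under service_dir; return the files changed."""
    changed = []
    for path in sorted(Path(service_dir).rglob("*")):
        if not path.is_file():
            continue
        try:
            original = path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            continue  # skip binary files
        updated = original
        for placeholder, value in replacements.items():
            updated = updated.replace(placeholder, value)
        if updated != original:
            path.write_text(updated, encoding="utf-8")
            changed.append(path)
    return changed
```

Calling `fill_placeholders("services/my-transform-service", REPLACEMENTS)` from the repository root would rewrite the copied template in place. A second call returns an empty list, which doubles as a check that no placeholder was left behind.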
Edit app/logic.py. Your function receives a pyarrow.Table and a params dict, and returns a pyarrow.Table:
```python
import pyarrow as pa
import pandas as pd


def my_transform(table: pa.Table, params: dict) -> pa.Table:
    df = table.to_pandas()

    # Your transformation here
    threshold = params.get('threshold', 0.5)
    df = df[df['score'] >= threshold]

    return pa.Table.from_pandas(df, preserve_index=False)
```

Update `routes.py` to import and call your function instead of `process_data`.
In `preparator/services_config.json`, add the URL mapping:

```json
{
  "my_transform": "http://my-transform-service:5013/my-transform"
}
```

In `schemas/service_registry.json`, add full metadata (used by the AI agent):
```json
{
  "name": "my_transform",
  "type": "transform",
  "description": "Filters rows by a score threshold",
  "endpoint": "/my-transform",
  "input_format": "arrow_ipc",
  "output_format": "arrow_ipc",
  "params": {
    "threshold": {
      "type": "number",
      "required": false,
      "default": 0.5,
      "description": "Minimum score to keep"
    }
  }
}
```

In `docker-compose.yml`, add a service block:
```yaml
  my-transform-service:
    build:
      context: ./services
      dockerfile: my-transform-service/Dockerfile
    container_name: my-transform-service
    ports:
      - "5013:5013"
    volumes:
      - etl-containers-shared-data:/app/data
    networks:
      - etl-network
    restart: unless-stopped
```

In `prometheus/prometheus.yml`, add a scrape target:
```yaml
  - job_name: 'my-transform-service'
    static_configs:
      - targets: ['my-transform-service:5013']
```

Edit `preparator/preparator_v4.py`:
```python
def my_transform(self, ipc_data, dataset_name="default_dataset", threshold=0.5):
    """
    'my_transform' microservice.
    """
    params = {
        "dataset_name": dataset_name,
        "threshold": threshold,
    }
    return self.run_service_ipc_in_ipc_out("my_transform", ipc_data, params)
```

`tests/unit/test_logic.py`:
```python
import os
import sys

# Clear cached 'app' package to avoid namespace collision
for _mod in list(sys.modules):
    if _mod == "app" or _mod.startswith("app."):
        del sys.modules[_mod]

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "services", "my-transform-service"))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "services"))

from app.logic import my_transform  # noqa: E402


def test_my_transform_filters_correctly(sample_arrow_table):
    result = my_transform(sample_arrow_table, {"threshold": 0.5})
    assert result.num_rows > 0
```

Important: The `sys.modules` cleanup is mandatory. All services share the `app/` namespace, so Python must be told to forget previous `app.*` imports before loading a different service's module. See `tests/conftest.py` for shared fixtures.
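The effect of skipping the cleanup is easy to reproduce in isolation. The following sketch builds two throwaway services in a temporary directory, each with its own `app/logic.py`, and imports `app.logic` with and without purging the cache. The helper names (`purge_app_modules`, `make_service`, `load_logic`) are hypothetical and exist only for this demonstration.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def purge_app_modules() -> None:
    """Forget every cached 'app' / 'app.*' module, as the test file does."""
    for name in list(sys.modules):
        if name == "app" or name.startswith("app."):
            del sys.modules[name]


def make_service(root: Path, service: str) -> Path:
    """Create a throwaway <root>/<service>/app/logic.py that records its owner."""
    app_dir = root / service / "app"
    app_dir.mkdir(parents=True)
    (app_dir / "__init__.py").write_text("")
    (app_dir / "logic.py").write_text(f"SERVICE = {service!r}\n")
    return root / service


def load_logic(service_dir: Path, purge: bool) -> str:
    """Import app.logic with service_dir first on sys.path; return its SERVICE."""
    if purge:
        purge_app_modules()
    sys.path.insert(0, str(service_dir))
    try:
        importlib.invalidate_caches()
        return importlib.import_module("app.logic").SERVICE
    finally:
        sys.path.remove(str(service_dir))


with tempfile.TemporaryDirectory() as tmp:
    first = make_service(Path(tmp), "service-a")
    second = make_service(Path(tmp), "service-b")
    loaded_first = load_logic(first, purge=True)
    stale = load_logic(second, purge=False)  # cached module from service-a wins
    fresh = load_logic(second, purge=True)   # cache cleared, service-b is loaded
    purge_app_modules()                      # leave no trace of the demo packages
```

Without the purge, the second import silently returns the module from the first service even though `sys.path` points at the second one, which is why a test file that skips the cleanup can end up exercising the wrong service's logic.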
```bash
# Build the new service
docker compose build my-transform-service

# Start it
docker compose up -d my-transform-service

# Check health
curl http://localhost:5013/health

# Run unit tests
python -m pytest tests/unit/test_logic.py -v
```

Create a new file in `airflow/dags/`. Follow the existing pattern:
```python
import json
import logging
from datetime import datetime

from airflow.decorators import task
from airflow.models.param import Param
from xcom_file_utils import cleanup_xcom_files, load_ipc_from_shared, save_ipc_to_shared

from airflow import DAG
from preparator.preparator_v4 import Preparator

logger = logging.getLogger("my_pipeline")
CONFIG_PATH = "/opt/airflow/preparator/services_config.json"

with DAG(
    "my_pipeline",
    default_args={"owner": "airflow", "start_date": datetime(2025, 1, 1), "retries": 1},
    schedule_interval=None,
    tags=["custom", "etl"],
    params={
        "dataset_name": Param("my_dataset", type="string"),
        "file_path": Param("/app/data/my_dataset/data.csv", type="string"),
        "output_format": Param("csv", type="string", enum=["csv", "xlsx", "json", "parquet"]),
        "use_file_xcom": Param(True, type="boolean"),
    },
) as dag:

    @task.python
    def extract(params: dict) -> str:
        with open(CONFIG_PATH) as f:
            config = json.load(f)
        with Preparator(config) as prep:
            ipc = prep.extract_csv(
                dataset_name=params["dataset_name"],
                file_path=params["file_path"],
            )
        if params.get("use_file_xcom", True):
            return save_ipc_to_shared(ipc, params["dataset_name"], "extract")
        return ipc.hex()

    @task.python
    def transform(data_ref: str, params: dict) -> str:
        with open(CONFIG_PATH) as f:
            config = json.load(f)
        use_file = params.get("use_file_xcom", True)
        ipc = load_ipc_from_shared(data_ref) if use_file else bytes.fromhex(data_ref)
        with Preparator(config) as prep:
            result = prep.clean_nan(ipc, dataset_name=params["dataset_name"])
        if use_file:
            return save_ipc_to_shared(result, params["dataset_name"], "transform")
        return result.hex()

    @task.python
    def load(data_ref: str, params: dict):
        with open(CONFIG_PATH) as f:
            config = json.load(f)
        use_file = params.get("use_file_xcom", True)
        ipc = load_ipc_from_shared(data_ref) if use_file else bytes.fromhex(data_ref)
        with Preparator(config) as prep:
            result = prep.load_data(ipc, format=params.get("output_format", "csv"),
                                    dataset_name=params["dataset_name"])
        if use_file:
            cleanup_xcom_files(params["dataset_name"])
        return result

    # DAG wiring
    extracted = extract()
    transformed = transform(extracted)
    load(transformed)
```

Key patterns:
- File-based XCom (`save_ipc_to_shared` / `load_ipc_from_shared`) for large datasets
- Preparator as context manager (`with Preparator(config) as prep:`)
- Parameterized via `Param()` for runtime configuration
- Cleanup XCom temp files in the final step
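The file-based XCom pattern passes a short path between tasks instead of the payload itself. The real helpers live in `xcom_file_utils` and are not shown in this guide, so the sketch below uses hypothetical stand-ins (`save_bytes_to_shared`, `load_bytes_from_shared`, `cleanup_shared`) with an assumed directory layout and file naming, purely to illustrate the idea and the size difference against the hex fallback.

```python
import tempfile
import uuid
from pathlib import Path


def save_bytes_to_shared(payload: bytes, dataset_name: str, step: str, shared_dir: Path) -> str:
    """Write payload to the shared volume and return its path as the XCom value."""
    target_dir = shared_dir / dataset_name / "xcom"  # assumed layout
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / f"{step}_{uuid.uuid4().hex}.arrow"
    target.write_bytes(payload)
    return str(target)


def load_bytes_from_shared(path_ref: str) -> bytes:
    """Read back the payload that a previous task stored."""
    return Path(path_ref).read_bytes()


def cleanup_shared(dataset_name: str, shared_dir: Path) -> int:
    """Delete the dataset's temp files; return how many were removed."""
    removed = 0
    for path in (shared_dir / dataset_name / "xcom").glob("*.arrow"):
        path.unlink()
        removed += 1
    return removed


payload = bytes(range(256)) * 4096  # 1 MiB stand-in for an Arrow IPC buffer

with tempfile.TemporaryDirectory() as tmp:
    shared = Path(tmp)
    ref = save_bytes_to_shared(payload, "my_dataset", "extract", shared)
    round_trip = load_bytes_from_shared(ref)
    hex_size = len(payload.hex())  # size of the value the hex fallback would return
    ref_size = len(ref)            # size of the value the file-based variant returns
    removed = cleanup_shared("my_dataset", shared)
```

Hex encoding doubles the payload size before it reaches the XCom backend, while the path reference stays small no matter how large the dataset grows. That trade-off is also why the final task has to clean up: the files outlive the task that wrote them.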
Create a YAML file following the pipeline schema (schemas/pipeline_schema.json):
```yaml
pipeline:
  name: my_pipeline
  description: My custom ETL pipeline
  steps:
    - id: extract
      service: extract_csv
      params:
        file_path: /app/data/my_dataset/data.csv
    - id: clean
      service: clean_nan
      params:
        strategy: fill_median
      depends_on: [extract]
    - id: save
      service: load_data
      params:
        format: csv
      depends_on: [clean]
```

You can paste this into the Streamlit YAML Editor tab, or save it as a file and use the AI agent to execute it.
Available services for the `service` field:
- Extract: `extract_csv`, `extract_sql`, `extract_api`, `extract_excel`
- Transform: `clean_nan`, `delete_columns`, `join_datasets`, `data_quality`, `outlier_detection`, `text_completion_llm`
- Load: `load_data`
See examples/pipelines/ for complete working examples.
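Before handing a pipeline to the agent, it can help to check that every `service` name is known and that the `depends_on` references form a valid order. How the platform itself resolves dependencies is not described in this guide; the sketch below is one way to do such a pre-check with the standard library's `graphlib`, using the example pipeline's steps as they might look after YAML parsing. `execution_order` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Service names taken from the list of available services.
KNOWN_SERVICES = {
    "extract_csv", "extract_sql", "extract_api", "extract_excel",
    "clean_nan", "delete_columns", "join_datasets", "data_quality",
    "outlier_detection", "text_completion_llm",
    "load_data",
}

# The example pipeline's steps as plain dicts (assumed result of YAML parsing).
STEPS = [
    {"id": "extract", "service": "extract_csv",
     "params": {"file_path": "/app/data/my_dataset/data.csv"}},
    {"id": "clean", "service": "clean_nan",
     "params": {"strategy": "fill_median"}, "depends_on": ["extract"]},
    {"id": "save", "service": "load_data",
     "params": {"format": "csv"}, "depends_on": ["clean"]},
]


def execution_order(steps: list[dict]) -> list[str]:
    """Validate steps and return their ids in dependency order."""
    ids = [step["id"] for step in steps]
    if len(ids) != len(set(ids)):
        raise ValueError("duplicate step id")
    graph = {}
    for step in steps:
        if step["service"] not in KNOWN_SERVICES:
            raise ValueError(f"unknown service: {step['service']}")
        deps = step.get("depends_on", [])
        missing = [dep for dep in deps if dep not in ids]
        if missing:
            raise ValueError(f"step {step['id']!r} depends on unknown step(s): {missing}")
        graph[step["id"]] = set(deps)
    # static_order() raises graphlib.CycleError if the dependencies loop.
    return list(TopologicalSorter(graph).static_order())
```

For the example pipeline, `execution_order(STEPS)` yields `extract`, `clean`, `save`. A misspelled service name or a dangling `depends_on` entry fails fast with a `ValueError` instead of surfacing halfway through a run.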
| Service Type | Request Body | Parameters | Response |
|---|---|---|---|
| Extract | JSON config | In JSON body | Arrow IPC binary |
| Transform | Arrow IPC binary | `X-Params` header (JSON string) | Arrow IPC binary |
| Load | Arrow IPC binary | `X-Params` header | JSON status |
| Join | Two Arrow IPC files (multipart form) | `X-Params` header | Arrow IPC binary |
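The contracts in the table translate into two request shapes: parameters travel in the JSON body for Extract services and in the `X-Params` header for Transform and Load services. The sketch below builds such requests with the standard library without sending them. The function names are hypothetical and the `Content-Type` values are assumptions, since this guide does not specify them.

```python
import json
import urllib.request


def build_transform_request(url: str, ipc_bytes: bytes, params: dict) -> urllib.request.Request:
    """Build (but do not send) a POST for a Transform or Load service."""
    return urllib.request.Request(
        url,
        data=ipc_bytes,  # Arrow IPC binary goes in the body
        method="POST",
        headers={
            "Content-Type": "application/octet-stream",  # assumed, not specified here
            "X-Params": json.dumps(params),              # parameters as a JSON string
        },
    )


def build_extract_request(url: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a POST for an Extract service."""
    return urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),  # parameters live in the JSON body
        method="POST",
        headers={"Content-Type": "application/json"},  # assumed, not specified here
    )
```

Passing the result to `urllib.request.urlopen` would perform the call. Note that `urllib` normalizes header names internally, so the header is stored as `X-params`; HTTP header names are case-insensitive, so the service still receives it as `X-Params`.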
All services import these shared modules:
| Module | Key Functions |
|---|---|
| `arrow_utils.py` | `ipc_to_table()`, `table_to_ipc()` |
| `service_utils.py` | `create_service_counters()`, `register_standard_endpoints()`, `parse_x_params()`, `get_correlation_id()`, `save_metadata()` |
| `logging_config.py` | `configure_service_logging()`, `get_correlation_logger()` |
| `health.py` | `create_health_response()` |
| `path_utils.py` | `sanitize_dataset_name()`, `resolve_input_path()` |
| `json_utils.py` | `NpEncoder` (handles numpy types in JSON serialization) |
All services return errors as:

```json
{"status": "error", "message": "Human-readable description"}
```

- `400`: Client error (bad parameters, missing data)
- `500`: Server error (processing failure)
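A small pair of helpers can keep both sides of this error contract consistent. This is a sketch, not the platform's implementation: `error_response` and `raise_for_service_error` are hypothetical names, and the only facts taken from this guide are the payload shape and the two status codes.

```python
import json


def error_response(message: str, client_fault: bool) -> tuple[dict, int]:
    """Service side: build the documented error payload and status code."""
    status_code = 400 if client_fault else 500
    return {"status": "error", "message": message}, status_code


def raise_for_service_error(status_code: int, body: bytes) -> None:
    """Client side: turn a documented error payload into an exception."""
    if status_code < 400:
        return
    try:
        message = json.loads(body).get("message", "unknown error")
    except (ValueError, AttributeError):
        # Body was not the documented JSON object; fall back to the raw text.
        message = body.decode("utf-8", errors="replace")
    kind = "client" if status_code < 500 else "server"
    raise RuntimeError(f"{kind} error {status_code}: {message}")
```

In a Flask route, the tuple from `error_response` could be returned after passing the dict through `jsonify`. On the calling side, `raise_for_service_error(response_status, response_body)` gives pipeline code a single place to distinguish bad parameters from processing failures.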