Practical step-by-step guide. Explains what the platform does, when to use it, and how to work with it in three ways: no-code UI, YAML editor, Python SDK.
You have raw data (CSV from an HR database, e-commerce orders, weather data from an API) and need to transform it into a clean, validated file ready for analysis or an ML model.
The typical path:
- Write an ad-hoc Python script
- The script does everything: reads, cleans, validates, saves — it's a fragile monolith
- Requirements change → rewrite the script
- Want to scale to larger datasets → rewrite again
ArrowFlow separates each operation into an independent service. You can compose steps like LEGO blocks, swap one without touching others, and scale only the slow part.
Plus: describe what you want in natural language, and the system generates and runs the pipeline for you.
Start the stack (if it isn't already running):
make quickstart # first time: copies .env, build, start, load demo data
# or, if already built:
make up
make demo-data # loads hr_demo/data.csv and ecommerce_demo/data.csv into containers

Verify services are healthy:
curl http://localhost:5001/health # extract-csv-service
curl http://localhost:5002/health # clean-nan-service

When to use: you want to explore the platform or build a pipeline without writing YAML.
You'll see four tabs: Pipeline Editor, Execution, Datasets, Services.
The Execution tab now includes:
- Platform Readiness checks (Airflow, Streamlit, Prometheus, Grafana)
- Quick Airflow Triggers (trigger core DAGs directly from Streamlit)
- Execution Insights (slowest step, processed data, orchestration overhead)
In the chat panel, type something like:
"Load the HR dataset, check data quality, remove outliers on monthly salary and save as CSV"
The panel shows:
- The YAML generated by the AI agent
- An Execute button
- Real-time execution log of each step
- At the bottom: preview of resulting data and download link
Note: AI generation requires `OPENAI_API_KEY` in `.env`. Without a key you can still use the YAML Editor with pre-built pipelines.
Click Execute. You'll see steps complete sequentially (or in parallel if independent).
When done, a table preview and saved file path in /app/data/ appear.
You can also use Quick Airflow Triggers in the same tab to launch DAGs directly via Airflow API.
When to use: you already know the steps you want and don't need AI.
cat examples/pipelines/hr_analytics.yaml

Open http://localhost:8501 → the Pipeline Editor tab → paste the content into the YAML editor.
The validator shows any errors (nonexistent service, missing parameter, cycle in graph) before execution even starts.
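As a concrete illustration of the graph check, here is a minimal, conceptual sketch of cycle detection over step dependencies. The `deps` mapping and step names are illustrative only, not ArrowFlow's actual schema or validator code:

```python
# Conceptual sketch: a pipeline is executable only if the dependencies between
# steps form a DAG, i.e. contain no cycle.
def has_cycle(deps: dict[str, list[str]]) -> bool:
    """deps maps each step name to the steps it depends on."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {name: WHITE for name in deps}

    def visit(node: str) -> bool:
        color[node] = GRAY
        for dep in deps.get(node, []):
            if color.get(dep, WHITE) == GRAY:   # back edge: cycle found
                return True
            if color.get(dep, WHITE) == WHITE and visit(dep):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in deps)

print(has_cycle({"extract": [], "quality": ["extract"], "clean": ["quality"]}))        # False
print(has_cycle({"extract": ["clean"], "quality": ["extract"], "clean": ["quality"]})) # True
```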
Click Run Pipeline. Each step shows status, duration, and rows in/out.
The hr_analytics.yaml pipeline runs 6 steps in ~2-3 seconds on 501 rows:
extract → 501 rows read from /app/data/hr_demo/data.csv
quality → 0 violations found
drop_cols → 3 columns removed (EmployeeCount, Over18, StandardHours)
outliers → N outliers removed (MonthlyIncome, z_threshold=3.0)
clean → rows with null dropped
save → CSV file saved to /app/data/hr_analytics/...
| File | What it does |
|---|---|
| `hr_analytics.yaml` | HR attrition: quality → drop cols → outliers → clean → CSV |
| `ecommerce_analytics.yaml` | E-commerce orders: quality → outliers → fill median → Parquet |
| `weather_data.yaml` | Live weather (Open-Meteo, no API key) → quality → ffill → Parquet |
When to use: you integrate ArrowFlow into a notebook, batch script, or custom DAG.
import json
from preparator.preparator_v4 import Preparator
with open("preparator/services_config.json") as f:
config = json.load(f)

with Preparator(config) as prep:
    # 1. Extract
    ipc = prep.extract_csv(
        dataset_name="hr_demo",
        file_path="/app/data/hr_demo/data.csv"
    )

    # 2. Quality
    ipc = prep.check_quality(ipc, dataset_name="hr_demo", rules={
        "min_rows": 10,
        "check_null_ratio": True,
        "threshold_null_ratio": 0.5,
        "check_duplicates": True,
    })

    # 3. Remove unnecessary columns
    ipc = prep.delete_columns(ipc, dataset_name="hr_demo",
                              columns=["EmployeeCount", "Over18", "StandardHours"])

    # 4. Detect and remove salary outliers
    ipc = prep.detect_outliers(ipc, dataset_name="hr_demo",
                               column="MonthlyIncome", z_threshold=3.0)

    # 5. Clean nulls
    ipc = prep.clean_nan(ipc, dataset_name="hr_demo", strategy="drop")

    # 6. Save
    result = prep.load_data(ipc, dataset_name="hr_demo", format="csv")
    print(result)  # {"status": "ok", "path": "/app/data/hr_demo/...", "rows": ...}

Each call is an HTTP request to the corresponding microservice. The `Preparator` handles automatic retries (3 attempts, 0.5 s backoff) and propagates the same `correlation_id` across all hops.
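For intuition, here is a minimal sketch of that client-side behavior, not the actual `Preparator` code; the `X-Correlation-ID` header name is an assumption:

```python
import time
import requests

# Sketch only: retry a service call up to 3 times with a 0.5 s pause between
# attempts, forwarding the same correlation id on every hop.
# The header name X-Correlation-ID is an assumption, not ArrowFlow's documented contract.
def call_service(url: str, ipc: bytes, correlation_id: str, retries: int = 3) -> bytes:
    for attempt in range(1, retries + 1):
        try:
            resp = requests.post(
                url,
                data=ipc,
                headers={"X-Correlation-ID": correlation_id},
                timeout=30,
            )
            resp.raise_for_status()
            return resp.content  # Arrow IPC bytes returned by the service
        except requests.RequestException:
            if attempt == retries:
                raise
            time.sleep(0.5)
```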
import pyarrow as pa
# ipc is Arrow IPC bytes — convertible at any time
reader = pa.ipc.open_stream(ipc)
df = reader.read_all().to_pandas()
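# Sketch: the reverse conversion uses standard pyarrow calls, e.g. to turn a
# modified DataFrame back into IPC bytes for a later prep.* step.
table = pa.Table.from_pandas(df)
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
ipc_roundtrip = sink.getvalue().to_pybytes()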
print(df.shape)

When to use: you want scheduling, notifications on failure, automatic retries, and execution history.
http://localhost:8080 — admin / admin
Go to DAGs → search for hr_analytics_pipeline → toggle On → click Trigger DAG ▶.
Click the run to see the task graph and log for each step.
Click Trigger DAG w/ config and pass JSON:
{
"dataset_name": "hr_demo",
"output_format": "parquet",
"z_threshold": 2.5,
"use_file_xcom": true
}

`use_file_xcom: true` causes Arrow data between tasks to be written to the shared volume instead of passing through PostgreSQL, which is essential for datasets >50k rows.
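To trigger the same parameterized run programmatically, Airflow's stable REST API accepts the conf payload shown above. This is a sketch: it assumes the API's basic-auth backend is enabled and reuses the default admin / admin credentials mentioned earlier; adjust for your deployment.

```python
import requests

# Trigger hr_analytics_pipeline with the same conf as the UI example above.
resp = requests.post(
    "http://localhost:8080/api/v1/dags/hr_analytics_pipeline/dagRuns",
    auth=("admin", "admin"),
    json={"conf": {
        "dataset_name": "hr_demo",
        "output_format": "parquet",
        "z_threshold": 2.5,
        "use_file_xcom": True,
    }},
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])
```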
| DAG | Schedule | Description |
|---|---|---|
| `hr_analytics_pipeline` | manual | HR attrition: 6 steps, parameterized |
| `ecommerce_pipeline` | manual | Orders: 5 steps, parameterized |
| `weather_api_pipeline` | `@hourly` | Live API → Parquet (no API key) |
| `parametrized_preparator_v4_quality` | manual | Generic quality pipeline |
| `parametrized_preparator_v4_quality_join` | manual | Pipeline with two-dataset join |
After any pipeline completes (UI, Airflow, or SDK), output files and metadata are written
to the shared volume at /app/data/<dataset_name>/.
Open http://localhost:8501 → Datasets tab to:
- Output Files — preview CSV, Parquet, JSON, or Excel files and download them directly
- Pipeline Runs — view run history grouped by `correlation_id`, with:
  - run timeline (active processing vs queue/orchestration gap)
  - compact run comparison (current vs previous successful run)
  - per-step duration, row counts, and service details
- Raw Metadata — inspect the JSON metadata files written by each service
The dataset overview also includes a Business KPI snapshot from the latest output (for example: revenue/AOV for e-commerce, attrition rate for HR).
Select any dataset from the sidebar dropdown to explore its contents.
http://localhost:3000 — admin / change-me-strong-password
The ETL Microservices — Monitoring Overview dashboard shows:
- Services UP/DOWN in real-time
- Request rate per service (req/s)
- Error % in last 5 minutes
- CPU and memory of containers (cAdvisor)
After launching a pipeline, the `requests_total` and `success_total` counters increase in the same order in which the services are called.
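You can also read the same counters outside Grafana by querying Prometheus directly. A sketch, assuming Prometheus is exposed on its default port 9090 (your compose file may differ) and that the counters carry a `job` label:

```python
import requests

# Per-job request rate over the last 5 minutes, via Prometheus' HTTP query API.
resp = requests.get(
    "http://localhost:9090/api/v1/query",
    params={"query": "sum by (job) (rate(requests_total[5m]))"},
)
for series in resp.json()["data"]["result"]:
    print(series["metric"].get("job"), series["value"][1])
```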
# All logs from a specific run (correlation_id in UI body or SDK output)
docker compose logs | grep '"correlation_id": "abc-123"'
# Or with jq (if available)
docker compose logs 2>&1 | jq 'select(.correlation_id == "abc-123")'

After each pipeline, every service writes a JSON file to /app/data/<dataset_name>/metadata/:
docker exec extract-csv-service ls /app/data/hr_demo/metadata/
# metadata_extract_csv_20260301T184000Z.json
# metadata_clean_nan_20260301T184001Z.json
# ...

| Symptom | Likely Cause | Fix |
|---|---|---|
| `curl /health` → connection refused | service not started | `docker compose ps` + `docker compose up -d <service>` |
| Streamlit shows "AI agent not available" | `OPENAI_API_KEY` missing | add key to `.env` + `docker compose restart streamlit-app` |
| Pipeline fails with "file not found" | demo data not loaded | `make demo-data` |
| Datasets tab shows no datasets | no pipelines have run yet | run a pipeline first, then refresh |
| Airflow task fails with "No module named..." | Airflow container not updated | `docker compose up -d --build airflow` |
| Grafana "No data" in panels | services not scraped yet | wait 15 s or run a pipeline to generate traffic |