
Demo Guide — How to Use ArrowFlow

A practical, step-by-step guide. It explains what the platform does, when to use it, and how to work with it in four ways: no-code UI, YAML editor, Python SDK, and scheduled Airflow DAGs.


Why ArrowFlow? — The Problem It Solves

You have raw data (CSV from an HR database, e-commerce orders, weather data from an API) and need to transform it into a clean, validated file ready for analysis or an ML model.

The typical path:

  1. Write an ad-hoc Python script
  2. The script does everything: reads, cleans, validates, saves — it's a fragile monolith
  3. Requirements change → rewrite the script
  4. Want to scale to larger datasets → rewrite again

ArrowFlow separates each operation into an independent service. You can compose steps like LEGO blocks, swap one without touching others, and scale only the slow part.

Plus: describe what you want in natural language, and the system generates and runs the pipeline for you.


Prerequisites

Get the stack running (if it isn't already):

make quickstart   # first time: copies .env, build, start, load demo data
# or, if already built:
make up
make demo-data    # loads hr_demo/data.csv and ecommerce_demo/data.csv into containers

Verify services are healthy:

curl http://localhost:5001/health  # extract-csv-service
curl http://localhost:5002/health  # clean-nan-service
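
If you'd rather script the check, a short polling loop against the same two endpoints works. A minimal sketch in Python (only the two services documented above are listed; add others as needed):

# Poll the documented health endpoints until each service answers.
import time
import requests

SERVICES = {
    "extract-csv-service": "http://localhost:5001/health",
    "clean-nan-service": "http://localhost:5002/health",
}

for name, url in SERVICES.items():
    for _ in range(10):
        try:
            if requests.get(url, timeout=2).ok:
                print(f"{name}: healthy")
                break
        except requests.ConnectionError:
            time.sleep(1)  # container may still be starting
    else:
        print(f"{name}: not responding after 10 attempts")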

Scenario A — Streamlit UI (zero code)

When to use: you want to explore the platform or build a pipeline without writing YAML.

1. Open the UI

http://localhost:8501

You'll see four tabs: Pipeline Editor, Execution, Datasets, Services.

The Execution tab now includes:

  • Platform Readiness checks (Airflow, Streamlit, Prometheus, Grafana)
  • Quick Airflow Triggers (trigger core DAGs directly from Streamlit)
  • Execution Insights (slowest step, processed data, orchestration overhead)

2. Chat panel — describe the pipeline in natural language

In the chat panel, type something like:

"Load the HR dataset, check data quality, remove outliers on monthly salary and save as CSV"

The panel shows:

  • The YAML generated by the AI agent
  • An Execute button
  • Real-time execution log of each step
  • At the bottom: preview of resulting data and download link

Note: AI generation requires OPENAI_API_KEY in .env. Without a key you can still use the YAML Editor with pre-built pipelines.

3. Execute and observe

Click Execute. You'll see steps complete sequentially (or in parallel if independent). When done, a table preview and saved file path in /app/data/ appear.

You can also use Quick Airflow Triggers in the same tab to launch DAGs directly via the Airflow API.
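
Under the hood these triggers go through Airflow's REST API, so the same call works from a script. A minimal sketch, assuming the stable REST API is enabled and accepts the admin / admin basic-auth login from Scenario D:

# Trigger a DAG run over Airflow's stable REST API (sketch).
import requests

resp = requests.post(
    "http://localhost:8080/api/v1/dags/hr_analytics_pipeline/dagRuns",
    auth=("admin", "admin"),
    json={"conf": {"dataset_name": "hr_demo"}},  # same shape as "Trigger DAG w/ config"
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])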


Scenario B — YAML Editor (pre-built pipeline)

When to use: you already know the steps you want, don't need AI.

1. Copy an example pipeline

cat examples/pipelines/hr_analytics.yaml

2. Paste it in the editor

Open http://localhost:8501 → Pipeline Editor tab → paste the content into the YAML editor.

The validator shows any errors (nonexistent service, missing parameter, cycle in graph) before execution even starts.
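
The cycle check is ordinary graph validation. For intuition, here is the usual approach (Kahn's topological sort) as an illustrative sketch, not ArrowFlow's actual validator code:

# Kahn's algorithm: if a topological sort can't visit every step,
# the leftover steps form a cycle. `deps` maps each step to the
# steps it depends on; every referenced step must be a key.
from collections import deque

def has_cycle(deps: dict[str, list[str]]) -> bool:
    indegree = {step: len(parents) for step, parents in deps.items()}
    children = {step: [] for step in deps}
    for step, parents in deps.items():
        for parent in parents:
            children[parent].append(step)
    ready = deque(step for step, deg in indegree.items() if deg == 0)
    visited = 0
    while ready:
        step = ready.popleft()
        visited += 1
        for child in children[step]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    return visited < len(deps)

assert not has_cycle({"extract": [], "clean": ["extract"], "save": ["clean"]})
assert has_cycle({"a": ["b"], "b": ["a"]})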

3. Execute

Click Run Pipeline. Each step shows status, duration, and rows in/out.

The hr_analytics.yaml pipeline does 6 steps in ~2-3 seconds on 501 rows:

extract     → 501 rows read from /app/data/hr_demo/data.csv
quality     → 0 violations found
drop_cols   → 3 columns removed (EmployeeCount, Over18, StandardHours)
outliers    → N outliers removed (MonthlyIncome, z_threshold=3.0)
clean       → rows with null dropped
save        → CSV file saved to /app/data/hr_analytics/...

4. Available pipelines in examples/pipelines/

File                      What it does
hr_analytics.yaml         HR attrition: quality → drop cols → outliers → clean → CSV
ecommerce_analytics.yaml  E-commerce orders: quality → outliers → fill median → Parquet
weather_data.yaml         Live weather (Open-Meteo, no API key) → quality → ffill → Parquet

Scenario C — Python SDK (direct scripting)

When to use: you integrate ArrowFlow into a notebook, batch script, or custom DAG.

Setup

import json
from preparator.preparator_v4 import Preparator

with open("preparator/services_config.json") as f:
    config = json.load(f)

HR pipeline in ~10 lines

with Preparator(config) as prep:
    # 1. Extract
    ipc = prep.extract_csv(
        dataset_name="hr_demo",
        file_path="/app/data/hr_demo/data.csv"
    )

    # 2. Quality
    ipc = prep.check_quality(ipc, dataset_name="hr_demo", rules={
        "min_rows": 10,
        "check_null_ratio": True,
        "threshold_null_ratio": 0.5,
        "check_duplicates": True,
    })

    # 3. Remove unnecessary columns
    ipc = prep.delete_columns(ipc, dataset_name="hr_demo",
                              columns=["EmployeeCount", "Over18", "StandardHours"])

    # 4. Detect and remove salary outliers
    ipc = prep.detect_outliers(ipc, dataset_name="hr_demo",
                               column="MonthlyIncome", z_threshold=3.0)

    # 5. Clean nulls
    ipc = prep.clean_nan(ipc, dataset_name="hr_demo", strategy="drop")

    # 6. Save
    result = prep.load_data(ipc, dataset_name="hr_demo", format="csv")
    print(result)  # {"status": "ok", "path": "/app/data/hr_demo/...", "rows": ...}

Each call is HTTP to the corresponding microservice. The Preparator handles automatic retries (3 attempts, 0.5s backoff) and propagates the same correlation_id across all hops.
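
For reference, that retry behaviour is the standard fixed-backoff pattern. A minimal sketch of the idea (the function and header name are illustrative, not the Preparator's actual internals):

# Retry an HTTP call with fixed backoff, forwarding one correlation_id
# on every hop. Sketch only; the header name is an assumption.
import time
import requests

def call_service(url: str, payload: bytes, correlation_id: str,
                 attempts: int = 3, backoff: float = 0.5) -> bytes:
    headers = {"X-Correlation-ID": correlation_id}
    for attempt in range(1, attempts + 1):
        try:
            resp = requests.post(url, data=payload, headers=headers, timeout=30)
            resp.raise_for_status()
            return resp.content  # Arrow IPC bytes from the service
        except requests.RequestException:
            if attempt == attempts:
                raise
            time.sleep(backoff)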

Convert result to Pandas

import pyarrow as pa

# ipc is Arrow IPC bytes — convertible at any time
reader = pa.ipc.open_stream(ipc)
df = reader.read_all().to_pandas()
print(df.shape)
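
Going the other way (a pandas DataFrame back into Arrow IPC bytes) is just as short, should you need to hand locally prepared data to a later step:

# Continues the snippet above (pa and df already defined).
# Serialize a DataFrame back to Arrow IPC stream bytes.
table = pa.Table.from_pandas(df)
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
ipc_bytes = sink.getvalue().to_pybytes()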

Scenario D — Airflow DAG (scheduled / production)

When to use: you want scheduled runs, failure notifications, automatic retries, and execution history.

1. Open Airflow

Open http://localhost:8080 and log in with admin / admin.

2. Enable and trigger a DAG

Go to DAGs → search for hr_analytics_pipeline → toggle On → click Trigger DAG ▶.

Click the run to see the task graph and log for each step.

3. Trigger with custom config

Click Trigger DAG w/ config and pass JSON:

{
  "dataset_name": "hr_demo",
  "output_format": "parquet",
  "z_threshold": 2.5,
  "use_file_xcom": true
}

use_file_xcom: true causes Arrow data between tasks to be written to the shared volume (instead of passing through PostgreSQL) — essential for datasets >50k rows.
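
The pattern itself is simple: write the bytes to the shared volume and pass only the small path string through XCom. An illustrative sketch, not the DAGs' actual task code:

# File-XCom pattern: persist Arrow IPC bytes on the shared volume,
# ship only the path between tasks.
import uuid
from pathlib import Path

def push_ipc(ipc: bytes, dataset_name: str) -> str:
    path = Path(f"/app/data/{dataset_name}/xcom_{uuid.uuid4().hex}.arrow")
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(ipc)
    return str(path)  # this string is all that goes through XCom

def pull_ipc(path: str) -> bytes:
    return Path(path).read_bytes()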

Available DAGs

DAG                                       Schedule  Description
hr_analytics_pipeline                     manual    HR attrition: 6 steps, parameterized
ecommerce_pipeline                        manual    Orders: 5 steps, parameterized
weather_api_pipeline                      @hourly   Live API → Parquet (no API key)
parametrized_preparator_v4_quality        manual    Generic quality pipeline
parametrized_preparator_v4_quality_join   manual    Pipeline with two-dataset join

Browsing Results — Dataset Explorer

After any pipeline completes (UI, Airflow, or SDK), output files and metadata are written to the shared volume at /app/data/<dataset_name>/.

Open http://localhost:8501 → Datasets tab to:

  • Output Files — preview CSV, Parquet, JSON, or Excel files and download them directly
  • Pipeline Runs — view run history grouped by correlation_id, with:
    • run timeline (active processing vs queue/orchestration gap)
    • compact run comparison (current vs previous successful run)
    • per-step duration, row counts, and service details
  • Raw Metadata — inspect the JSON metadata files written by each service

The dataset overview also includes a Business KPI snapshot from the latest output (for example: revenue/AOV for e-commerce, attrition rate for HR).

Select any dataset from the sidebar dropdown to explore its contents.


What to observe during execution

Metrics in Grafana

Open http://localhost:3000 and log in with admin / change-me-strong-password.

The ETL Microservices — Monitoring Overview dashboard shows:

  • Services UP/DOWN in real-time
  • Request rate per service (req/s)
  • Error % in last 5 minutes
  • CPU and memory of containers (cAdvisor)

After launching a pipeline, the requests_total and success_total counters increase for each service, in the same order the services are called.
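
You can also read those counters straight from Prometheus. A sketch, assuming Prometheus listens on its default port 9090 (the port isn't documented in this guide):

# Request rate per service over the last 5 minutes, via the
# Prometheus HTTP API. Port 9090 is an assumption (the default).
import requests

resp = requests.get(
    "http://localhost:9090/api/v1/query",
    params={"query": "sum by (job) (rate(requests_total[5m]))"},
)
for series in resp.json()["data"]["result"]:
    print(series["metric"].get("job"), series["value"][1])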

Structured logs

# All logs from a specific run (correlation_id in UI body or SDK output)
docker compose logs | grep '"correlation_id": "abc-123"'

# Or with jq (if available); --no-log-prefix drops the service-name
# prefix so each line parses as JSON
docker compose logs --no-log-prefix 2>&1 | jq -R 'fromjson? | select(.correlation_id == "abc-123")'

Metadata audit

After each pipeline, every service writes a JSON file to /app/data/<dataset_name>/metadata/:

docker exec extract-csv-service ls /app/data/hr_demo/metadata/
# metadata_extract_csv_20260301T184000Z.json
# metadata_clean_nan_20260301T184001Z.json
# ...
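
To summarize those files programmatically, a few lines suffice. A sketch, assuming it runs where the shared volume is visible (e.g. inside a service container); field names beyond the layout shown above are assumptions:

# List each metadata file with its correlation_id (field name assumed).
import json
from pathlib import Path

meta_dir = Path("/app/data/hr_demo/metadata")
for meta_file in sorted(meta_dir.glob("metadata_*.json")):
    meta = json.loads(meta_file.read_text())
    print(meta_file.name, "->", meta.get("correlation_id", "?"))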

Quick Troubleshooting

Symptom                                       Likely Cause                   Fix
curl /health → connection refused             service not started            docker compose ps + docker compose up -d <service>
Streamlit shows "AI agent not available"      OPENAI_API_KEY missing         add key to .env + docker compose restart streamlit-app
Pipeline fails with "file not found"          demo data not loaded           make demo-data
Datasets tab shows no datasets                no pipelines have run yet      run a pipeline first, then refresh
Airflow task fails with "No module named..."  Airflow container not rebuilt  docker compose up -d --build airflow
Grafana panels show "No data"                 services not scraped yet       wait 15s or run a pipeline to generate traffic