A complete ETL (Extract, Transform, Load) pipeline system built with Apache Airflow, featuring REST API access, web dashboard, and full Docker containerization.
- Overview
- Features
- Prerequisites
- Quick Start
- Project Structure
- Accessing Services
- Running DAGs
- Common Commands
- Troubleshooting
- Development
- Documentation
This project implements a comprehensive ETL pipeline for processing e-commerce data (customers, products, stores, sales, exchange rates) with the following capabilities:
- 5 Source DAGs: Individual pipelines for each data source
- 1 Master Orchestrator: Coordinates all pipeline executions
- 1 Reporting DAG: Generates 9 analytical reports
- REST API: 13 endpoints for monitoring and querying
- Web Dashboard: Interactive UI for easy pipeline monitoring
- Full Containerization: 6 Docker services working together
- Input: CSV files from data/raw/dataset/
- Output: PostgreSQL database (etl_output schema)
- Processing: Data cleaning, transformation, validation, and reporting
- Volume: 47,000+ records processed across 5 tables
- ✅ Sprint 2: Data Quality & Cleaning
- ✅ Sprint 3: Advanced Transformations
- ✅ Sprint 4: Database Loading Strategies
- ✅ Sprint 5: DAG Orchestration
- ✅ Sprint 6: Combined E-T-L Pipelines
- ✅ Sprint 7: REST API & Monitoring
- ✅ Sprint 8: Docker Deployment
- 🔄 Event-driven DAG triggering with ExternalTaskSensor
- 📊 9 automated business reports (customer segmentation, sales trends, product performance, etc.)
- 🔍 Data quality validation with rejected records tracking
- 🎯 Incremental and full load strategies
- 🚀 Bulk operations with optimized chunking
- 📈 Real-time monitoring via REST API
- 🌐 User-friendly web dashboard
- 🗄️ Database visualization with pgAdmin
Before you begin, ensure you have the following installed:
- Docker Desktop: Version 20.10 or higher
- Docker Compose: Version 2.0 or higher
- Git: For cloning the repository
- 4GB RAM: Minimum available memory for Docker
- 10GB Disk Space: For images and volumes
# Check Docker
docker --version
docker-compose --version
# Check Docker is running
docker ps
git clone https://github.com/SammyBoy-09/Airflow_ETL.git
cd Airflow_ETL
Ensure your CSV data files are in the correct location:
data/raw/dataset/
├── Customers.csv
├── Products.csv
├── Stores.csv
├── Sales.csv
└── Exchange_Rates.csv
First-Time Users: If you don't have data files, the project includes sample data. Verify the files exist:
ls data\raw\dataset
cd Docker
docker-compose up -d
This single command will:
- Pull required Docker images (first time: ~5-10 minutes, downloads ~2GB)
- Create a PostgreSQL database with Airflow metadata schema
- Initialize Airflow database tables
- Start 15 services (Postgres, Redis, Airflow Webserver, Scheduler, REST API, pgAdmin, Grafana, Prometheus, Loki, etc.)
- Set up internal Docker networking
- Create persistent volumes for data storage
What to Expect on First Run:
[+] Running 16/16
✔ Network docker_etl-network Created
✔ Container docker-postgres-1 Healthy
✔ Container docker-redis-1 Healthy
✔ Container docker-loki-1 Healthy
✔ Container docker-airflow-init-1 Exited (0)
✔ Container docker-webserver-1 Healthy
✔ Container docker-scheduler-1 Healthy
✔ Container docker-api-1 Healthy
✔ Container docker-grafana-1 Started
✔ Container docker-prometheus-1 Started
...
IMPORTANT: First-time setup requires ~3-5 minutes for complete initialization:
- Minute 1-2: PostgreSQL database creation and Airflow schema setup
- Minute 2-3: Airflow webserver starting and DAG parsing
- Minute 3-4: All services becoming healthy
- Minute 4-5: DAGs appearing in Airflow UI
Check service status:
docker-compose ps
All services should show Up with status (healthy):
NAME STATUS
docker-webserver-1 Up (healthy)
docker-scheduler-1 Up
docker-postgres-1 Up (healthy)
docker-redis-1 Up (healthy)
...
If services show as starting or unhealthy, wait an additional 1-2 minutes.
Open your browser and navigate to:
- Airflow UI: http://localhost:8080
- Username: airflow
- Password: airflow
First Login: The Airflow UI will show a list of DAGs. Initially, all DAGs will be paused (gray toggle). This is normal!
cd Docker
docker compose run --rm webserver airflow users create --username airflow --firstname Airflow --lastname Admin --role Admin --email admin@airflow.com --password airflow
Or to create a custom admin user:
docker compose run --rm webserver airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com --password admin
After running this, you should be able to log in with the credentials you specified.
Expected DAGs (12 total):
Core ETL Pipeline (REQUIRED - Run These First):
- etl_master_orchestrator - Master pipeline coordinator (start here!)
- etl_customers - Customer data pipeline
- etl_products - Product data pipeline
- etl_stores - Store data pipeline
- etl_exchange_rates - Exchange rate pipeline
- etl_sales - Sales transaction pipeline
- etl_reports - Report generation pipeline
- etl_data_quality - Data quality checks
Advanced Features (OPTIONAL - Skip on First Run):
- etl_file_watcher - Monitors for new files (runs automatically every 5 minutes)
- etl_sql_ingestion - Extracts from existing DB tables (requires the main pipeline to run first)
- etl_api_ingestion - Extracts from REST APIs (demo feature)
- etl_json_ingestion - Extracts from JSON files (demo feature)
- Only trigger etl_master_orchestrator on first run
- The advanced ingestion DAGs (SQL/API/JSON) will fail if the main pipeline hasn't run yet
- File watcher runs automatically - no manual trigger needed
pgAdmin provides a graphical interface to view and query the PostgreSQL database.
Step 6.1: Open pgAdmin at http://localhost:5050
Step 6.2: Login with default credentials:
- Email: admin@admin.com
- Password: admin
Step 6.3: Register the PostgreSQL Server
1. In pgAdmin, right-click "Servers" in the left sidebar
2. Select "Register" → "Server..."
3. Fill in the General Tab:
   - Name: Airflow ETL Database (or any name you prefer)
4. Fill in the Connection Tab:
   - Host name/address: postgres (this is the Docker service name)
   - Port: 5432 (internal Docker port, NOT 5434)
   - Maintenance database: airflow
   - Username: airflow
   - Password: airflow
5. Click "Save"
Step 6.4: Verify Connection
After saving, you should see:
Servers
└── Airflow ETL Database
└── Databases (2)
├── airflow (Airflow metadata)
└── postgres (system database)
Step 6.5: View ETL Data
1. Expand: Servers → Airflow ETL Database → Databases → airflow → Schemas
2. You'll see two schemas:
   - public: Airflow system tables (dag_run, task_instance, etc.)
   - etl_output: Your ETL data tables (customers, products, sales, etc.)
3. To view data:
   - Right-click on etl_output.customers → View/Edit Data → First 100 Rows
   - Or use the Query Tool: Tools → Query Tool, then run:
     SELECT * FROM etl_output.customers LIMIT 100;
Troubleshooting pgAdmin Connection:
- Error "Unable to connect to server":
- Make sure PostgreSQL container is healthy:
docker-compose ps postgres - Use
postgresas hostname (NOTlocalhost) - Use port
5432(NOT 5434)
- Make sure PostgreSQL container is healthy:
- Access Denied: Verify username/password are both
airflow
The web dashboard provides a modern, user-friendly interface for monitoring all DAGs, viewing execution logs, and downloading processed data.
Step 7.1: Create Python Environment
# If using Conda
conda create -n airflow_env python=3.11
conda activate airflow_env
# If using venv
python -m venv airflow_env
.\airflow_env\Scripts\Activate.ps1
Step 7.2: Install Dependencies
# Navigate to project root
cd D:\sam\Projects\Infosys\Airflow
# Install requirements
pip install -r requirements.txt
This will install:
- Flask 3.1.0 (web framework)
- pandas 2.2.3 (data processing)
- requests 2.32.3 (API calls)
- python-dotenv 1.0.1 (config management)
Step 7.3: Start the Web Dashboard
# Ensure you're in project root
cd D:\sam\Projects\Infosys\Airflow
# Start Flask dashboard
python scripts/api/web_dashboard.py
Expected Output:
* Serving Flask app 'web_dashboard'
* Debug mode: off
🚀 Airflow API Web Dashboard
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
API Endpoint: http://localhost:8000
Dashboard URL: http://localhost:5000
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Dashboard Features:
• DAG Overview & Status Monitoring
• Real-time Task Execution Tracking
• Interactive Log Viewer
• Data Downloads (Bronze/Silver/Gold Layers)
• Performance Metrics & Charts
Press Ctrl+C to stop the server
Step 7.4: Access the Web Dashboard
Open http://localhost:5000 in your browser.
Dashboard Features:
- Overview Tab: Summary of all DAGs with status
- DAG Details: Execution history and task breakdown
- Logs Tab: Real-time log streaming
- Downloads Tab: Download processed datasets and reports
- Metrics Tab: Performance analytics
Keep Terminal Running: The web dashboard requires the terminal to stay open. The REST API at http://localhost:8000 must also be running (it's part of the Docker stack).
Before running pipelines, verify all components are working:
Check 1: Docker Services
docker-compose ps
All services should be Up and healthy.
Check 2: Airflow UI
- Open http://localhost:8080
- Login successful
- DAGs visible in list
Check 3: REST API
# Test API health
Invoke-RestMethod -Uri "http://localhost:8000/health"Should return: {"status":"healthy","timestamp":"..."}
Check 4: Database Connection
# Test database from container
docker exec -it docker-postgres-1 psql -U airflow -d airflow -c "\dt etl_output.*"
Should show: "Did not find any relations" (tables will appear after the first DAG run)
Check 5: Web Dashboard (if running)
- Open http://localhost:5000
- Should show empty state (no runs yet)
All Checks Passed? You're ready to trigger your first DAG! 🎉
By default, the admin user has username/password airflow/airflow. You can create additional users:
# Access Airflow container
docker exec -it docker-webserver-1 bash
# Create a new admin user
airflow users create \
--username your_username \
--firstname Your \
--lastname Name \
--role Admin \
--email your.email@example.com
# You'll be prompted to enter a password
# Exit container
exit
Available Roles:
- Admin: Full access to all features
- User: Can view and trigger DAGs
- Viewer: Read-only access
- Op: Operations (manage connections, variables)
You're now ready to execute the ETL pipeline!
Option A: Using Airflow UI (Recommended for beginners)
- Open http://localhost:8080
- Find etl_master_orchestrator in the DAG list
- Click the toggle switch on the left to unpause the DAG (it should turn blue/green)
- Click the "Play" button (▶) on the right
- Select "Trigger DAG"
- Optional: Click "Trigger DAG w/ config" to add parameters
- Click "Trigger" to start execution
What Happens Next:
- Master orchestrator starts
- Triggers 4 dimension DAGs in parallel: customers, products, stores, exchange_rates
- Waits for products to complete, then triggers sales (fact table)
- After all data loads, triggers reports DAG (generates 9 reports)
- Finally triggers data quality checks
Expected Duration: 5-8 minutes for complete pipeline execution
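For reference, the sketch below shows how this master/child pattern is typically expressed with Airflow's TriggerDagRunOperator. It is illustrative only; the project's actual etl_master_orchestrator.py may be structured differently.

```python
# Illustrative sketch of the master/child orchestration pattern described above.
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="example_master_orchestrator",
    start_date=datetime(2026, 1, 1),
    schedule_interval=None,   # manual trigger only
    catchup=False,
) as dag:
    # Stage 1: dimension pipelines run in parallel
    dimensions = [
        TriggerDagRunOperator(
            task_id=f"trigger_{name}",
            trigger_dag_id=f"etl_{name}",
            wait_for_completion=True,
            poke_interval=5,
        )
        for name in ("customers", "products", "stores", "exchange_rates")
    ]

    # Stage 2: the sales fact pipeline runs after the dimensions finish
    trigger_sales = TriggerDagRunOperator(
        task_id="trigger_sales",
        trigger_dag_id="etl_sales",
        wait_for_completion=True,
        poke_interval=5,
    )

    # Stage 3: reporting runs last
    trigger_reports = TriggerDagRunOperator(
        task_id="trigger_reports",
        trigger_dag_id="etl_reports",
        wait_for_completion=True,
    )

    dimensions >> trigger_sales >> trigger_reports
```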
Option B: Using Web Dashboard
- Open http://localhost:5000
- Select etl_master_orchestrator from the dropdown
- Click the "Trigger DAG" button
- Monitor execution in real-time
Option C: Using REST API
Invoke-RestMethod -Uri "http://localhost:8000/api/v1/dags/etl_master_orchestrator/trigger" `
-Method POST `
-Headers @{"X-API-Key"="dev-key-12345"}
In Airflow UI:
- Click on the etl_master_orchestrator DAG name
- Click on the latest run (shown with its execution date)
- Click "Graph" view to see task dependencies
- Green = Success, Red = Failed, Yellow = Running, Gray = Not started
- Click any task → "Log" to see detailed execution logs
In Web Dashboard:
- Go to http://localhost:5000
- Select DAG from dropdown
- View real-time status updates
- Click "View Logs" for any task
Common Things to Watch:
- Dimension DAGs (customers, products, stores, exchange_rates) run in parallel - should all complete within 1-2 minutes
- Sales DAG waits for products completion - starts after dimensions finish
- Reports DAG generates 9 CSV reports in data/reports/
- Data Quality DAG runs quality checks and creates a scorecard
After successful execution:
Check Database Tables:
# Using pgAdmin (http://localhost:5050)
# Navigate to: airflow → Schemas → etl_output → Tables
# Or using psql
docker exec -it docker-postgres-1 psql -U airflow -d airflow
\dt etl_output.*
SELECT COUNT(*) FROM etl_output.customers; -- Should show ~15,000 rows
SELECT COUNT(*) FROM etl_output.products; -- Should show ~2,500 rows
SELECT COUNT(*) FROM etl_output.sales; -- Should show ~26,000 rows
\q
Check Generated Reports:
ls data\reports\
Expected files:
- customer_summary.csv - Customer statistics
- customer_segmentation.csv - RFM analysis
- order_status.csv - Order tracking
- sales_trends_daily.csv - Daily patterns
- product_performance.csv - Top products
- store_performance.csv - Store rankings
- anomaly_detection.csv - Outliers
- data_quality_scorecard.csv - Quality metrics
- dag_execution_summary.csv - Pipeline performance
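To inspect a report programmatically, a quick pandas sketch (run from the project root; the file name is one of the reports listed above):

```python
# Preview a generated report with pandas.
import pandas as pd

report = pd.read_csv("data/reports/customer_segmentation.csv")
print(report.shape)     # (rows, columns)
print(report.head(10))  # first 10 rows
```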
Download Reports from Dashboard:
- Go to http://localhost:5000
- Click "Downloads" tab
- Select "Generated Reports" category
- Click "Preview" to view first 50 rows
- Click "Download" to save locally
Congratulations! 🎉 You've successfully set up and run your first ETL pipeline!
Docker Architecture:
┌─────────────────────────────────────────────────────────┐
│ Docker Network │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Airflow │ │ Airflow │ │ PostgreSQL │ │
│ │ Webserver │ │ Scheduler │ │ Database │ │
│ │ :8080 │ │ │ │ :5432 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ REST API │ │ pgAdmin │ │ Grafana │ │
│ │ :8000 │ │ :5050 │ │ :3000 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘
│
┌──────────────┐
│ Flask Web │
│ Dashboard │
│ :5000 │
└──────────────┘
Data Flow:
CSV Files (data/raw/dataset/)
↓
Airflow DAGs Extract → Transform → Load
↓
PostgreSQL (etl_output schema)
↓
Reports Generated (data/reports/)
↓
Accessible via: pgAdmin | Web Dashboard | REST API
File Locations:
- Source Data: data/raw/dataset/ (CSV files)
- Staging Data: data/staging/ (intermediate processing)
- Cleaned Data: data/processed/ (validated CSVs)
- Reports: data/reports/ (business analytics)
- Logs: logs/dag_id=*/ (execution logs)
- DAGs: dags/ (pipeline definitions)
- Scripts: scripts/ (ETL logic)
- Config: config/ (YAML configurations)
Now that your system is running:
- Explore the Airflow UI: Try pausing/unpausing DAGs, viewing logs, checking task durations
- Query the Database: Use pgAdmin to run SQL queries on your data
- Review Reports: Analyze the business reports generated in data/reports/
- Try the API: Use the interactive docs at http://localhost:8000/docs
- Monitor with Grafana: Set up dashboards at http://localhost:3000 (default login: admin/admin)
- Experiment: Modify DAG schedules, add new transformations, create custom reports
Airflow_ETL/
├── dags/ # Airflow DAG definitions
│ ├── etl_master_orchestrator.py # Master coordination DAG
│ ├── etl_customers.py # Customer dimension pipeline
│ ├── etl_products.py # Product dimension pipeline
│ ├── etl_stores.py # Store dimension pipeline
│ ├── etl_exchange_rates.py # Exchange rate pipeline
│ ├── etl_sales.py # Sales fact pipeline
│ ├── etl_reports.py # Business reporting DAG
│ └── dag_base.py # Shared configuration
│
├── scripts/ # Utility modules
│ ├── Extract.py # Data extraction logic
│ ├── Transform.py # Data transformation logic
│ ├── Load.py # Database loading logic
│ ├── ReportGenerator.py # Report generation
│ ├── api/ # REST API service
│ │ ├── main.py # FastAPI application
│ │ ├── web_dashboard.py # Flask web UI
│ │ ├── routes/ # API endpoints
│ │ ├── models/ # Pydantic models
│ │ └── utils/ # API utilities
│ └── utils/ # Shared utilities
│
├── config/ # YAML configurations
│ ├── customers_config.yaml
│ ├── products_config.yaml
│ ├── stores_config.yaml
│ ├── sales_config.yaml
│ └── exchange_rates_config.yaml
│
├── data/ # Data directories
│ ├── raw/dataset/ # Source CSV files
│ ├── staging/ # Intermediate processing
│ ├── processed/ # Cleaned data
│ └── reports/ # Generated reports
│
├── Docker/ # Docker configuration
│ ├── docker-compose.yaml # Service orchestration
│ ├── Dockerfile # Airflow image
│ ├── Dockerfile.api # API service image
│ └── .env # Environment variables
│
└── docs/ # Documentation
└── Implementation_Snippets.md # Code reference
After running docker-compose up -d, access these services:
| Service | URL | Credentials | Purpose |
|---|---|---|---|
| Airflow UI | http://localhost:8080 | airflow / airflow | DAG management & monitoring |
| REST API | http://localhost:8000 | API Key: dev-key-12345 | Programmatic access |
| API Docs | http://localhost:8000/docs | - | Interactive API documentation |
| Web Dashboard | http://localhost:5000 | - | User-friendly monitoring UI (run python scripts/api/web_dashboard.py) |
| pgAdmin | http://localhost:5050 | admin@admin.com / admin | Database visualization |
| PostgreSQL | localhost:5434 | airflow / airflow | Direct database access |
In addition to pgAdmin's graphical interface, you can connect to PostgreSQL directly:
Method 1: From Outside Docker (Desktop Tools)
Use any PostgreSQL client (DBeaver, DataGrip, psql) with these settings:
- Host: localhost (external access)
- Port: 5434 (exposed Docker port)
- Database: airflow
- Username: airflow
- Password: airflow
Example with psql CLI:
# If you have PostgreSQL installed locally
psql -h localhost -p 5434 -U airflow -d airflow
# List tables in etl_output schema
\dt etl_output.*
# Query data
SELECT COUNT(*) FROM etl_output.customers;
Method 2: From Inside Docker Container
# Access PostgreSQL container directly
docker exec -it docker-postgres-1 psql -U airflow -d airflow
# You're now in psql prompt
airflow=# \l -- List databases
airflow=# \dn -- List schemas
airflow=# \dt etl_output.* -- List tables in etl_output schema
airflow=# \d etl_output.sales -- Describe sales table structure
airflow=# \q                   -- Exit psql
Connection String for Applications:
postgresql://airflow:airflow@localhost:5434/airflow
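For example, a minimal Python sketch that queries the ETL output using this connection string (assumes sqlalchemy, pandas, and a PostgreSQL driver such as psycopg2-binary are installed in your environment):

```python
# Query etl_output tables from outside Docker via the exposed port 5434.
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://airflow:airflow@localhost:5434/airflow")

counts = pd.read_sql(
    "SELECT 'customers' AS table_name, COUNT(*) AS row_count FROM etl_output.customers "
    "UNION ALL "
    "SELECT 'sales', COUNT(*) FROM etl_output.sales",
    engine,
)
print(counts)
```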
Security Note: These are development credentials. For production:
- Change default passwords in Docker/.env
- Use secrets management (Docker secrets, Vault)
- Restrict network access with firewall rules
- Enable SSL/TLS connections
Runs all pipelines in the correct order:
1. Navigate to Airflow UI (http://localhost:8080)
2. Find "etl_master_orchestrator"
3. Toggle to ON (unpause)
4. Click Play → Trigger DAG
Execution Order:
- Customers, Products, Stores, Exchange Rates (parallel)
- Sales (waits for Products to complete)
- Reports (waits for all pipelines to complete)
Run specific pipelines independently:
- etl_customers - Customer dimension
- etl_products - Product dimension
- etl_stores - Store dimension
- etl_exchange_rates - Exchange rate dimension
- etl_sales - Sales fact table
- etl_reports - Generate 9 business reports
# Trigger via REST API
Invoke-RestMethod -Uri "http://localhost:8000/api/v1/dags/etl_master_orchestrator/trigger" `
-Method POST `
-Headers @{"X-API-Key"="dev-key-12345"}
Via Airflow UI:
- Click on DAG name → Graph view
- See task status (green=success, red=failed, yellow=running)
- Click task → Logs to see execution details
Via Web Dashboard:
- Open http://localhost:5000
- Select DAG from dropdown
- View runs, tasks, and logs
Via REST API:
# Get DAG status
Invoke-RestMethod -Uri "http://localhost:8000/api/v1/dags/etl_master_orchestrator/status" `
-Headers @{"X-API-Key"="dev-key-12345"}
# Get recent runs
Invoke-RestMethod -Uri "http://localhost:8000/api/v1/dags/etl_master_orchestrator/runs?page_size=10" `
-Headers @{"X-API-Key"="dev-key-12345"}
# Start all services
cd Docker
docker-compose up -d
# Stop all services
docker-compose down
# View service status
docker-compose ps
# View logs for specific service
docker-compose logs -f airflow-webserver
docker-compose logs -f airflow-scheduler
docker-compose logs -f api
# View logs for all services
docker-compose logs -f
# Restart a specific service
docker-compose restart airflow-webserver
# Restart all services
docker-compose restart
# Stop and remove all containers + volumes (CAUTION: deletes database!)
docker-compose down -v
# Rebuild images after code changes
docker-compose build
docker-compose up -d
Execute commands inside the webserver container:
# Access Airflow CLI
docker exec -it airflow-webserver bash
# List all DAGs
airflow dags list
# Trigger a specific DAG
airflow dags trigger etl_master_orchestrator
# Test a specific task
airflow tasks test etl_customers extract 2026-01-19
# Clear task state to re-run
airflow tasks clear etl_sales --yes
# Check DAG structure
airflow dags show etl_master_orchestrator
# List DAG runs
airflow dags list-runs -d etl_master_orchestrator
# Create Airflow user
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
# Connect to PostgreSQL via Docker
docker exec -it airflow-postgres psql -U airflow -d airflow
# Common SQL queries
# List all tables
\dt etl_output.*
# Count records
SELECT 'customers' AS table_name, COUNT(*) FROM etl_output.customers
UNION ALL
SELECT 'products', COUNT(*) FROM etl_output.products
UNION ALL
SELECT 'stores', COUNT(*) FROM etl_output.stores
UNION ALL
SELECT 'sales', COUNT(*) FROM etl_output.sales
UNION ALL
SELECT 'exchange_rates', COUNT(*) FROM etl_output.exchange_rates;
# Exit psql
\q
# Activate conda environment
conda activate KB_1978
# Install dependencies
pip install -r requirements.txt
# Run REST API locally (not in Docker)
cd scripts/api
uvicorn main:app --reload --port 8000
# Run Flask dashboard locally
python web_dashboard.py
# Run tests
python test_api.py
Symptom: error during connect: This error may indicate that the docker daemon is not running
Solution:
# Start Docker Desktop from Start Menu
# Or run command (if installed as service)
Start-Service docker
# Verify Docker is running
docker version
Symptom: First docker-compose up -d stuck on "Pulling..."
Context: First-time setup downloads ~2GB of images (PostgreSQL, Airflow, Redis, Grafana, etc.)
Solution:
- Be patient: Initial download can take 5-15 minutes depending on internet speed
- Check progress: Open Docker Desktop → Images to see download progress
- Resume interrupted download: Just run docker-compose up -d again
- Use a faster mirror: Edit the Docker daemon settings to use a closer registry mirror
Symptom: Bind for 0.0.0.0:8080 failed: port is already allocated
Solution:
# Identify what's using the port
netstat -ano | findstr :8080
# Kill the process (replace <PID> with actual process ID)
taskkill /PID <PID> /F
# Or change port in Docker/.env file
AIRFLOW_PORT=8081  # Change to any available port
Common Port Conflicts:
- 8080: Airflow Webserver (alternative: 8081, 8082)
- 5434: PostgreSQL (alternative: 5435, 5436)
- 8000: REST API (alternative: 8001, 8888)
- 5000: Flask Dashboard (alternative: 5001, 3001)
- 5050: pgAdmin (alternative: 5051, 8888)
Symptom: Containers crash with OOMKilled or fail to start
Solution:
# Increase Docker memory allocation
# Docker Desktop → Settings → Resources → Memory
# Set to at least 4GB (recommended: 6-8GB)
Symptom: docker-airflow-init-1 exited with code 1
Solution:
# Check init logs
docker-compose logs airflow-init
# Common causes:
# 1. Database not ready - wait 30 seconds and restart
docker-compose restart airflow-init
# 2. Permissions issue - reset volumes
docker-compose down -v
docker-compose up -d
# 3. Configuration error - check the Docker/.env file
Symptom: Airflow UI is empty, no DAGs listed
Solution:
# Wait 2-3 minutes for DAG parsing
# Check scheduler logs
docker-compose logs airflow-scheduler | Select-String -Pattern "ERROR"
# Verify DAG files are mounted
docker exec -it docker-webserver-1 ls /opt/airflow/dags
# Check for Python syntax errors
docker exec -it docker-webserver-1 python /opt/airflow/dags/etl_master_orchestrator.py
# Force DAG refresh
docker-compose restart airflow-scheduler
Symptom: "Unable to connect to server"
Common Mistakes & Solutions:
❌ Mistake 1: Using localhost as hostname
✅ Solution: Use postgres (Docker service name)
❌ Mistake 2: Using port 5434
✅ Solution: Use port 5432 (internal Docker port)
❌ Mistake 3: PostgreSQL not ready
✅ Solution: Wait 2-3 minutes after docker-compose up, check health:
docker-compose ps postgres
# Should show: Up (healthy)
❌ Mistake 4: Wrong credentials
✅ Solution: Username=airflow, Password=airflow, Database=airflow
Symptom: Dashboard shows "Failed to connect to API" or "Connection refused"
Solution:
# 1. Verify REST API is running
docker-compose ps api
# Should show: Up
# 2. Test API health
Invoke-RestMethod -Uri "http://localhost:8000/health"
# Should return: {"status":"healthy"}
# 3. Check API logs for errors
docker-compose logs api
# 4. Restart API service
docker-compose restart api
# 5. Verify Python dependencies installed
pip list | Select-String -Pattern "flask|requests|pandas"
Symptom: Tasks fail with ModuleNotFoundError: No module named 'scripts'
Solution:
# Verify PYTHONPATH in docker-compose.yaml
# Should include: PYTHONPATH=/opt/airflow
# Restart scheduler to reload environment
docker-compose restart airflow-scheduler
# Check if scripts are mounted correctly
docker exec -it docker-webserver-1 ls /opt/airflow/scripts
Symptom: Master orchestrator or child DAGs running very slowly
Solutions Applied (Already Optimized in v2.0):
- ✅ Sensor poke interval reduced: 30s → 5s
- ✅ Sensor timeout reduced: 3600s → 1200s
- ✅ Sensor mode changed to 'reschedule'
- ✅ Parallelism increased: 32 concurrent tasks
- ✅ Expected duration: 5-8 minutes
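For reference, those settings correspond to sensor arguments like the following (an illustrative fragment; the real definitions live in the project's DAG files):

```python
# Sketch of the tuned ExternalTaskSensor settings described above.
from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="example_sensor_tuning",
    start_date=datetime(2026, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    wait_for_products = ExternalTaskSensor(
        task_id="wait_for_products",
        external_dag_id="etl_products",
        external_task_id=None,   # wait for the whole DAG run, not one task
        poke_interval=5,         # was 30s
        timeout=1200,            # was 3600s
        mode="reschedule",       # frees the worker slot between pokes
    )
```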
If Still Slow:
# Check active task count
docker exec -it docker-webserver-1 airflow tasks states-for-dag-run etl_master_orchestrator <run_id>
# Review task durations in Airflow UI
# Go to: DAG → Run → Task Duration chart
# Check for blocked sensors
docker-compose logs airflow-scheduler | Select-String -Pattern "sensor"Problem: REST API endpoints not working
# Check API container logs
docker-compose logs api
# Verify API is running
Invoke-RestMethod -Uri "http://localhost:8000/health"
# Check database connection from API
docker exec -it airflow-api python -c "from utils.airflow_client import AirflowClient; print('OK')"
Problem: Data lost after restart
- Don't use docker-compose down -v (it deletes volumes)
- Use docker-compose down to preserve data
- Check volume status: docker volume ls and docker volume inspect docker_postgres-db-volume
- Create a new DAG file in the dags/ directory (a minimal skeleton is sketched below)
- Import from dag_base.py for consistency: from dag_base import DEFAULT_ARGS, DEFAULT_DAG_CONFIG, get_db_config
- Test locally: python dags/your_new_dag.py
- Restart the scheduler to pick up changes: docker-compose restart airflow-scheduler
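A minimal skeleton for such a DAG might look like this (illustrative only; what DEFAULT_ARGS, DEFAULT_DAG_CONFIG, and get_db_config actually contain is defined in dags/dag_base.py):

```python
# dags/your_new_dag.py -- minimal sketch of a new pipeline DAG.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from dag_base import DEFAULT_ARGS, get_db_config  # shared project settings


def extract(**context):
    # Placeholder step -- in this project the real logic lives in scripts/Extract.py
    print("extracting with DB config:", get_db_config())


with DAG(
    dag_id="etl_your_new_source",
    default_args=DEFAULT_ARGS,
    start_date=datetime(2026, 1, 1),
    schedule_interval=None,   # trigger manually or from the orchestrator
    catchup=False,
) as dag:
    PythonOperator(task_id="extract", python_callable=extract)
```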
For changes to scripts/ or dags/:
- Files are mounted as volumes - changes reflect immediately
- No need to rebuild Docker images
- Restart scheduler if DAG structure changes
For changes to API code (scripts/api/):
- Restart the API service: docker-compose restart api
For changes to Docker configuration:
# Rebuild and restart
docker-compose build
docker-compose up -d
# Test a specific task without running the full DAG
docker exec -it airflow-webserver \
airflow tasks test etl_customers extract 2026-01-19
# Run Python scripts manually
docker exec -it airflow-webserver python /opt/airflow/scripts/Extract.py
Comprehensive documentation is available in the following files:
| Document | Description |
|---|---|
| PROJECT_TRACKER.md | Master tracking with all sprints, tasks, and credentials |
| TEAM1_TASK_TRACKER.md | Detailed Team 1 implementation tracker |
| TEAM2_TASK_TRACKER.md | Team 2 planning tracker |
| API_ROUTES_GUIDE.md | Complete REST API reference (850+ lines) |
| Docker/API_SERVICE_GUIDE.md | Docker-specific API usage guide |
| scripts/api/README.md | API developer documentation |
| docs/Implementation_Snippets.md | Code examples for all 42 tasks |
- Task Reference: All 42 tasks (T0007-T0042) documented in Implementation_Snippets.md
- API Examples: 13 endpoints with curl/PowerShell examples in API_ROUTES_GUIDE.md
- Configuration: YAML files in the config/ directory for each data source
- Credentials: All access information in PROJECT_TRACKER.md
- DAG: Directed Acyclic Graph - defines workflow
- Task: Single unit of work (extract, transform, load)
- Operator: Template for a task (PythonOperator, BashOperator)
- Sensor: Waits for condition (ExternalTaskSensor)
- XCom: Cross-communication between tasks
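A tiny self-contained example ties these concepts together: two PythonOperator tasks in one DAG, where the first pushes a value to XCom and the second pulls it (illustrative only, not one of this project's DAGs):

```python
# Minimal DAG demonstrating tasks, operators, and XCom.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract(**context):
    # The return value is automatically pushed to XCom.
    return {"rows_extracted": 42}


def load(**context):
    payload = context["ti"].xcom_pull(task_ids="extract")
    print(f"loading {payload['rows_extracted']} rows")


with DAG(
    dag_id="example_concepts",
    start_date=datetime(2026, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    load_task = PythonOperator(task_id="load", python_callable=load)
    extract_task >> load_task
```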
- Master-Child DAGs: Orchestrator triggers individual pipelines
- ExternalTaskSensor: Wait for upstream DAG completion
- Task Groups: Organize related tasks visually
- Bulk Operations: Process data in optimized chunks (see the sketch after this list)
- Rejected Records: Track and store failed validations
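The sketch below illustrates the bulk-loading idea with pandas' chunked to_sql (the file name and chunk size are illustrative, not the project's actual configuration):

```python
# Load a large DataFrame into PostgreSQL in fixed-size batches.
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://airflow:airflow@localhost:5434/airflow")

df = pd.read_csv("data/processed/sales.csv")  # hypothetical cleaned file
df.to_sql(
    "sales",
    engine,
    schema="etl_output",
    if_exists="append",
    index=False,
    chunksize=5000,   # insert in batches of 5,000 rows
    method="multi",   # multi-row INSERT statements per batch
)
```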
After successful execution:
-- etl_output schema contains:
customers (15,266 rows)
products (2,517 rows)
stores (67 rows)
exchange_rates (3,655 rows)
sales (26,326 rows)
Located in data/reports/:
- customer_summary.csv - Customer statistics
- customer_segmentation.csv - RFM analysis
- order_status.csv - Order tracking
- sales_trends_daily.csv - Daily sales patterns
- product_performance.csv - Top products
- store_performance.csv - Store rankings
- anomaly_detection.csv - Outlier identification
- data_quality_scorecard.csv - Quality metrics
- dag_execution_summary.csv - Pipeline performance
- Task logs: logs/dag_id={dag_name}/run_id={run_id}/task_id={task_name}/
- Scheduler logs: logs/scheduler/
This project was developed as part of an Infosys training program. For questions or issues:
- Check existing documentation
- Review logs for error messages
- Consult troubleshooting section
- Contact project maintainers
# START EVERYTHING
cd Docker
docker-compose up -d
# START WEB DASHBOARD (in separate terminal)
conda activate KB_1978
cd D:\sam\Projects\Infosys\Airflow
python scripts/api/web_dashboard.py
# CHECK STATUS
docker-compose ps
# VIEW LOGS
docker-compose logs -f
# STOP EVERYTHING
docker-compose down
# AIRFLOW UI
http://localhost:8080
airflow / airflow
# TRIGGER MASTER DAG
# Go to UI → etl_master_orchestrator → Play → Trigger
# CHECK RESULTS
# pgAdmin: http://localhost:5050
# Web Dashboard: http://localhost:5000
Use this checklist to ensure your environment is properly configured:
- Docker Desktop installed (version 20.10+)
- Run: docker --version
- Docker Compose installed (version 2.0+)
- Run: docker-compose --version
- Docker Desktop is running
- Check: Docker icon in system tray should be green
- Docker has sufficient resources
- Docker Desktop → Settings → Resources
- Memory: Minimum 4GB (Recommended: 6-8GB)
- Disk: At least 10GB free space
- Repository cloned
git clone https://github.com/SammyBoy-09/Airflow_ETL.git
- CSV data files present
- Check: ls data\raw\dataset shows 5 CSV files
- Port availability checked
- Ports free: 8080, 5434, 8000, 5000, 5050, 3000, 9090
- Navigated to Docker directory
cd Docker
- Started services
docker-compose up -d
- Waited for initialization
- Wait 3-5 minutes for first-time setup
- Verified all services running
docker-compose ps
- All services show Up and (healthy)
- Checked logs for errors
docker-compose logs | Select-String -Pattern "ERROR"
- Accessed Airflow UI
- Open: http://localhost:8080
- Logged in successfully
- Username: airflow, Password: airflow
- Verified DAGs visible
- Should see 8+ DAGs in list
- All DAGs parsed without errors
- No red "Import Error" messages
- Accessed pgAdmin
- Open: http://localhost:5050
- Logged into pgAdmin
- Email: admin@admin.com, Password: admin
- Registered PostgreSQL server
- Host: postgres, Port: 5432, Database: airflow
- Username: airflow, Password: airflow
- Connection successful
- Can see: Servers → Airflow ETL Database → Databases → airflow
- Verified schemas present
- public schema (Airflow metadata)
- etl_output schema (for ETL data - empty until the first run)
- Verified REST API running
- Test: Invoke-RestMethod -Uri "http://localhost:8000/health"
- Should return: {"status":"healthy"}
- Python environment created (for dashboard)
conda create -n airflow_env python=3.11 OR python -m venv airflow_env
- Python environment activated
conda activate airflow_env OR .\airflow_env\Scripts\Activate.ps1
- Dependencies installed
pip install -r requirements.txt
- Web dashboard started
python scripts/api/web_dashboard.py
- Dashboard accessible
- Open: http://localhost:5000
- Unpaused master orchestrator
- Airflow UI → etl_master_orchestrator → Toggle to ON
- Triggered master DAG
- Click Play button → Trigger DAG
- Monitored execution
- Watch Graph view or use Web Dashboard
- Verified successful completion
- All tasks green in Graph view
- Expected duration: 5-8 minutes
- Checked database tables
- pgAdmin: etl_output schema should have 5 tables with data
- Verified reports generated
- Check: ls data\reports shows 9 CSV reports
- Downloaded sample report
- Web Dashboard → Downloads → Preview/Download reports
- Accessed Grafana
- Open: http://localhost:3000
- Default login: admin / admin
- Accessed Prometheus
- Open: http://localhost:9090
- Set up custom dashboards (optional)
- Customer data loaded
- Query: SELECT COUNT(*) FROM etl_output.customers;
- Expected: ~15,000 rows
- Product data loaded
- Query: SELECT COUNT(*) FROM etl_output.products;
- Expected: ~2,500 rows
- Sales data loaded
- Query: SELECT COUNT(*) FROM etl_output.sales;
- Expected: ~26,000 rows
- Reports accessible
- All 9 reports in the data/reports/ directory
- API endpoints working
- Test: http://localhost:8000/docs (interactive API docs)
If you encounter issues, refer to:
- Troubleshooting section (above)
- Docker logs: docker-compose logs <service-name>
- Airflow task logs: Airflow UI → DAG → Task → Logs
- Service health: docker-compose ps
What is Airflow?
- Apache Airflow is a workflow orchestration tool
- DAG (Directed Acyclic Graph) = A workflow with tasks and dependencies
- Task = A single unit of work (e.g., extract data, transform data)
- Operator = A template for creating tasks (PythonOperator, BashOperator)
- Scheduler = Runs DAGs based on schedule or trigger
- Webserver = Provides the UI for monitoring
What is the ETL Process?
- Extract: Read data from CSV files
- Transform: Clean, validate, and reshape data
- Load: Insert data into PostgreSQL database
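A stripped-down illustration of the three steps (simplified; the project's real logic lives in scripts/Extract.py, Transform.py, and Load.py, and the connection details below are the development defaults documented in this README):

```python
# Extract -> Transform -> Load in miniature, using pandas and SQLAlchemy.
import pandas as pd
from sqlalchemy import create_engine

# Extract: read a raw CSV
df = pd.read_csv("data/raw/dataset/Customers.csv")

# Transform: basic cleaning (drop duplicates, normalize column names)
df = df.drop_duplicates()
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]

# Load: write into the etl_output schema in PostgreSQL
engine = create_engine("postgresql://airflow:airflow@localhost:5434/airflow")
df.to_sql("customers", engine, schema="etl_output", if_exists="replace", index=False)
```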
How Does This Project Work?
- Master Orchestrator triggers individual DAG workflows
- Dimension DAGs (customers, products, stores, exchange_rates) load reference data in parallel
- Fact DAG (sales) loads transaction data after products complete
- Reports DAG generates analytical reports after all data is loaded
- Quality DAG runs validation checks and creates scorecards
What Technologies Are Used?
- Docker: Containerization for consistent environments
- PostgreSQL: Relational database for storing data
- Airflow: Workflow orchestration and scheduling
- Flask: Web dashboard framework
- FastAPI: REST API framework
- Grafana/Prometheus: Monitoring and metrics
- Pandas: Data processing in Python
┌─────────────────────────────────────────────────────────────┐
│ USER INTERFACES │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Airflow UI │ │ Web Dashboard│ │ pgAdmin │ │
│ │ :8080 │ │ :5000 │ │ :5050 │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼──────────────────┼──────────────────┼─────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ DOCKER CONTAINER LAYER │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Webserver │ │ Scheduler │ │ REST API │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └──────────┬───────┴──────────────────┘ │
│ │ │
│ ┌──────────▼──────────┐ ┌──────────────┐ │
│ │ PostgreSQL │ │ Redis │ │
│ │ (Metadata + ETL) │ │ (Queue) │ │
│ └─────────────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ DATA STORAGE LAYER │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ CSV Files │ │ Processed │ │ Reports │ │
│ │ (data/raw) │ │ (data/proc) │ │ (data/rep) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
┌─────────────┐
│ CSV Files │ Customers.csv, Products.csv, Stores.csv,
└──────┬──────┘ Sales.csv, Exchange_Rates.csv
│
▼
┌─────────────────────────────────────────────────────────┐
│ AIRFLOW ETL PIPELINE │
│ │
│ Stage 1: Ingestion (Parallel) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────────────┐ │
│ │Customer │ │Product │ │ Store │ │Exchange Rate │ │
│ │ ETL │ │ ETL │ │ ETL │ │ ETL │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └──────┬───────┘ │
│ │ │ │ │ │
│ └───────────┴────────────┴─────────────┘ │
│ │ │
│ Stage 2: Facts (Sequential) │
│ ┌───────────▼───────────┐ │
│ │ Sales ETL │ (Waits for Products) │
│ └───────────┬───────────┘ │
│ │ │
│ Stage 3: Analytics │
│ ┌───────────▼───────────┐ │
│ │ Report Generation │ (9 Business Reports) │
│ └───────────┬───────────┘ │
│ │ │
│ Stage 4: Quality │
│ ┌───────────▼───────────┐ │
│ │ Data Quality Checks │ │
│ └───────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ PostgreSQL Database (etl_output schema) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Customers │ │ Products │ │ Stores │ │ Sales │ │
│ │15K rows │ │ 2.5K rows│ │ 67 rows │ │26K rows │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Generated Reports (CSV) │
│ • Customer Summary • Sales Trends Daily │
│ • Customer Segmentation • Product Performance │
│ • Order Status • Store Performance │
│ • Anomaly Detection • Data Quality Scorecard │
│ • DAG Execution Summary │
└─────────────────────────────────────────────────────────┘
Last Updated: January 28, 2026
Version: 2.0 (Performance Optimized)
This project is licensed under the MIT License - see the LICENSE file for details.