A production-inspired data engineering and backend platform for retail analytics, designed to demonstrate cloud infrastructure, workflow orchestration, data warehousing, SQL analytics, API development, and DevOps automation.
This platform simulates a real retail company analytics system.
It generates realistic retail data, lands raw source files in object storage, loads those files into a local warehouse, transforms raw records into clean relational tables, builds business-ready Gold analytics models, and prepares the foundation for reporting APIs and multi-cloud deployment.
The platform is designed to map closely to responsibilities expected from a Senior Software Engineer – Data Platform role, including:
- cloud infrastructure design
- data pipeline development
- object storage ingestion
- warehouse modeling
- SQL transformations
- workflow orchestration
- backend reporting APIs
- CI/CD automation
- AWS and GCP architecture readiness
Retail companies collect data from many different systems:
- online orders
- physical stores
- customer profiles
- product catalogs
- inventory systems
- marketing campaigns
- returns and refunds
Without a centralized analytics platform, business teams struggle to answer questions such as:
- What is daily revenue?
- Which products are top sellers?
- Which customers are most valuable?
- Which stores are underperforming?
- Which inventory items are at risk?
- Which campaigns produce the best ROI?
This project solves that problem by creating a structured analytics platform from raw data generation to business-ready reporting models.
Retail Source Data Generator
|
v
Generated JSON Source Files
|
v
Bronze Object Storage
MinIO locally / AWS S3 later
|
v
Bronze Warehouse Loader
|
v
PostgreSQL Local Warehouse
Bronze -> Silver -> Gold
|
v
Gold Analytics Models
|
v
FastAPI Reporting API
|
v
Dashboards / BI / API Consumers
- Python 3.11+
- Docker and Docker Compose
- MinIO for S3-compatible object storage
- PostgreSQL as local warehouse simulation
- Apache Airflow for orchestration foundation
- Pydantic for schema validation
- psycopg2 for PostgreSQL access
- pytest for testing
- Ruff for linting
- DBeaver for visual database inspection
Later phases are designed to map to:
- AWS S3
- AWS Redshift
- AWS EC2 Spot Instances
- AWS ECS
- AWS IAM and VPC concepts
- GCP Cloud Storage
- GCP BigQuery
- Jenkins CI/CD
- FastAPI reporting services
Current completed phase:
Step 12: Airflow End-to-End DAG Orchestration
Completed so far:
[✓] Step 1: Repository structure and documentation foundation
[✓] Step 2: Local Docker infrastructure with PostgreSQL, MinIO, and Airflow
[✓] Step 3: Retail source data generator
[✓] Step 4: Object storage abstraction and MinIO client
[✓] Step 5: Bronze ingestion into object storage
[✓] Step 6: Local warehouse foundation with Bronze/Silver/Gold/Ops schemas
[✓] Step 7: Bronze warehouse loader from MinIO to PostgreSQL
[✓] Step 8: Silver transformations from raw JSONB to typed relational tables
[✓] Step 9: Gold analytics models for business reporting
[✓] Step 10: FastAPI reporting API
[✓] Step 11: API Dockerization
[✓] Step 12: Airflow end-to-end DAG orchestration
Next planned phase:
Step 13: Data quality checks
multi-cloud-retail-analytics-platform/
|
├── README.md
├── .env.example
├── .gitignore
├── pyproject.toml
├── docker-compose.yml
|
├── config/
│ └── datasets.yml
|
├── docs/
│ ├── architecture.md
│ ├── data_model.md
│ ├── local_development.md
│ └── pipeline_design.md
|
├── ingestion/
│ ├── generate_retail_data.py
│ ├── ingest_to_object_storage.py
│ └── schemas.py
|
├── storage/
│ ├── exceptions.py
│ ├── object_storage_client.py
│ └── minio_client.py
|
├── warehouse/
│ └── local_postgres/
│ ├── ddl/
│ │ ├── 001_create_warehouse_schemas.sql
│ │ ├── 002_create_silver_tables.sql
│ │ └── 003_create_gold_tables.sql
│ └── load/
│ └── bronze_loader.py
|
├── transformations/
│ ├── silver/
│ │ └── 001_transform_bronze_to_silver.sql
│ └── gold/
│ └── 001_transform_silver_to_gold.sql
|
├── scripts/
│ ├── init_local_warehouse.py
│ ├── verify_local_warehouse.py
│ ├── verify_minio_storage.py
│ ├── run_local_bronze_ingestion.py
│ ├── load_bronze_warehouse.py
│ ├── run_silver_transformations.py
│ ├── verify_silver_tables.py
│ ├── run_gold_transformations.py
│ └── verify_gold_tables.py
|
├── airflow/
│ └── dags/
│ ├── health_check_dag.py
│ └── retail_analytics_e2e_dag.py
|
├── tests/
│ ├── unit/
│ ├── integration/
│ └── data_quality/
|
└── infrastructure/
├── aws/
├── gcp/
└── docker/
The Bronze layer stores raw source records as JSONB.
Purpose:
- preserve raw data
- support replay and reprocessing
- maintain auditability
- keep source structure intact
Example tables:
bronze.raw_sales
bronze.raw_products
bronze.raw_customers
bronze.raw_stores
bronze.raw_inventory
bronze.raw_campaigns
bronze.raw_returns
The Silver layer converts raw JSONB records into clean typed relational tables.
Purpose:
- enforce SQL data types
- simplify joins
- standardize fields
- deduplicate business records
- prepare data for analytics
Example tables:
silver.sales
silver.products
silver.customers
silver.stores
silver.inventory
silver.campaigns
silver.returns
The Gold layer stores business-ready analytics models.
Purpose:
- power reports
- support dashboard queries
- prepare data for FastAPI endpoints
- avoid expensive calculations inside the API layer
Gold tables:
gold.daily_revenue
gold.product_sales_performance
gold.customer_lifetime_value
gold.store_performance
gold.inventory_risk
gold.campaign_roi
gold.executive_kpis
The source data generator creates these datasets:
| Dataset | File | Purpose |
|---|---|---|
| Products | data/generated/products.json | Product catalog |
| Customers | data/generated/customers.json | Customer profiles |
| Stores | data/generated/stores.json | Retail locations |
| Campaigns | data/generated/campaigns.json | Marketing campaigns |
| Sales | data/generated/sales.json | Sales transactions |
| Inventory | data/generated/inventory.json | Stock records |
| Returns | data/generated/returns.json | Refund and return records |
Generated data preserves relationships such as:
sales.product_idreferencesproducts.product_idsales.customer_idreferencescustomers.customer_idsales.store_idreferencesstores.store_idsales.campaign_idoptionally referencescampaigns.campaign_idreturns.order_idreferencessales.order_idinventory.product_idreferencesproducts.product_idinventory.store_idreferencesstores.store_id
python -m venv .venvWindows PowerShell:
.venv\Scripts\Activate.ps1macOS/Linux:
source .venv/bin/activateUpgrade pip:
python -m pip install --upgrade pipInstall dependencies:
pip install -e ".[dev,storage,warehouse]"cp .env.example .envWindows PowerShell:
Copy-Item .env.example .envdocker compose up -dCheck service status:
docker compose psLocal service URLs:
| Service | URL |
|---|---|
| MinIO Console | http://localhost:9001 |
| Airflow UI | http://localhost:8080 |
| PostgreSQL | localhost:5432 |
Default MinIO credentials:
Username: minioadmin
Password: minioadmin
Default Airflow credentials:
Username: admin
Password: admin
Run the full pipeline from source generation to Gold analytics:
docker compose up -d
python scripts/init_local_warehouse.py
python ingestion/generate_retail_data.py
python ingestion/ingest_to_object_storage.py
python scripts/load_bronze_warehouse.py
python scripts/run_silver_transformations.py
python scripts/verify_silver_tables.py
python scripts/run_gold_transformations.py
python scripts/verify_gold_tables.pyExpected final result:
Gold verification passed.
Gold table row counts:
- gold.daily_revenue: ...
- gold.product_sales_performance: 100
- gold.customer_lifetime_value: 500
- gold.store_performance: 20
- gold.inventory_risk: 500
- gold.campaign_roi: 12
- gold.executive_kpis: 1+
Run:
python ingestion/generate_retail_data.pyThis creates JSON source files in:
data/generated/
Custom smaller test run:
python ingestion/generate_retail_data.py --products 10 --customers 20 --stores 5 --campaigns 3 --sales 50 --inventory 30 --return-rate 0.10 --seed 123Windows PowerShell one-line version:
python ingestion/generate_retail_data.py --products 10 --customers 20 --stores 5 --campaigns 3 --sales 50 --inventory 30 --return-rate 0.10 --seed 123Make sure Docker services are running:
docker compose up -dRun:
python scripts/verify_minio_storage.pyThis validates that Python can:
- connect to MinIO
- create or verify the Bronze bucket
- upload a test object
- list objects
- download the object
- verify downloaded content
Generate source data first:
python ingestion/generate_retail_data.pyRun Bronze ingestion:
python ingestion/ingest_to_object_storage.pyThis uploads generated JSON files from:
data/generated/
to MinIO using this layout:
bronze/{dataset}/dt={YYYY-MM-DD}/run_id={run_id}/{dataset}.json
A manifest is also written locally and uploaded to object storage:
manifests/bronze/dt={YYYY-MM-DD}/run_id={run_id}/
Run with custom date and run ID:
python ingestion/ingest_to_object_storage.py --ingestion-date 2026-05-03 --run-id manual_test_001Run:
python scripts/init_local_warehouse.pyVerify required schemas and tables:
python scripts/verify_local_warehouse.pyThis creates:
bronze
silver
gold
ops
The ops schema tracks pipeline runs, loaded files, and data quality results.
Run:
python scripts/load_bronze_warehouse.pyVerify Bronze row counts:
docker exec -it retail_postgres psql -U retail_user -d retail_analytics -c "SELECT COUNT(*) FROM bronze.raw_sales;"Verify loaded file audit records:
docker exec -it retail_postgres psql -U retail_user -d retail_analytics -c "SELECT dataset_name, load_status, record_count FROM ops.loaded_files;"Run the loader again to test idempotency:
python scripts/load_bronze_warehouse.pyAlready-loaded files should be skipped instead of duplicated.
After Bronze files are loaded into PostgreSQL, run:
python scripts/run_silver_transformations.pyVerify Silver row counts:
python scripts/verify_silver_tables.pyExample Silver query:
SELECT
s.order_id,
s.total_amount,
s.order_status,
c.email,
p.product_name,
st.store_name
FROM silver.sales s
JOIN silver.customers c
ON s.customer_id = c.customer_id
JOIN silver.products p
ON s.product_id = p.product_id
JOIN silver.stores st
ON s.store_id = st.store_id
LIMIT 50;After Silver tables are populated, run:
python scripts/run_gold_transformations.pyVerify Gold row counts:
python scripts/verify_gold_tables.pyExample Gold query:
SELECT
product_name,
category,
units_sold,
gross_revenue,
estimated_gross_margin
FROM gold.product_sales_performance
ORDER BY gross_revenue DESC
LIMIT 20;After Gold tables are populated, start the reporting API:
uvicorn api.app.main:app --reloadAPI runs at:
http://localhost:8000
Interactive docs at:
http://localhost:8000/docs
| Method | Endpoint | Description |
|---|---|---|
| GET | / |
Root — app name, environment, links |
| GET | /health |
Health check |
| GET | /api/v1/kpis/overview |
Latest executive KPI snapshot |
| GET | /api/v1/revenue/daily |
Daily revenue (filterable by date range) |
| GET | /api/v1/products/performance |
Product sales performance |
| GET | /api/v1/customers/lifetime-value |
Customer lifetime value |
| GET | /api/v1/inventory/risk |
Inventory risk by stock status |
| GET | /api/v1/campaigns/roi |
Campaign ROI metrics |
Query parameters supported on list endpoints:
start_date YYYY-MM-DD filter
end_date YYYY-MM-DD filter
limit max rows returned (default 30, max 365)
All endpoints read directly from Gold analytics tables. No calculations happen inside the API layer.
The API uses environment variables for database connection. Defaults match local Docker setup:
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=retail_analytics
POSTGRES_USER=retail_user
POSTGRES_PASSWORD=retail_password
The reporting API is containerized for reproducible local runs and future ECS/cloud deployment.
Build the API image:
docker compose build apiStart all services including the API:
docker compose up -dAPI available at:
http://localhost:8000
- Base image:
python:3.11-slim - Installs system deps:
gcc,libpq-dev,curl - Copies project source and installs with
pip install -e ".[api,warehouse,storage]" - Exposes port
8000 - Healthcheck hits
/healthevery 30 seconds - Starts with
uvicorn api.app.main:app --host 0.0.0.0 --port 8000
Docker layer caching is optimized — pyproject.toml is copied before source so dependency installation is only re-run when dependencies change.
Check API container health:
docker compose psView API logs:
docker compose logs apiThe full pipeline is orchestrated by a single end-to-end Airflow DAG defined in airflow/dags/retail_analytics_e2e_dag.py.
DAG ID: retail_analytics_e2e_pipeline
initialize_local_warehouse
↓
generate_source_data
↓
ingest_to_bronze_storage
↓
load_bronze_warehouse
↓
run_silver_transformations
↓
verify_silver_tables
↓
run_gold_transformations
↓
verify_gold_tables
↓
verify_api_queries
| Task ID | Script | Purpose |
|---|---|---|
initialize_local_warehouse |
scripts/init_local_warehouse.py |
Creates Bronze/Silver/Gold/Ops schemas and tables |
generate_source_data |
ingestion/generate_retail_data.py |
Generates realistic JSON retail datasets |
ingest_to_bronze_storage |
ingestion/ingest_to_object_storage.py |
Uploads JSON files to MinIO Bronze bucket |
load_bronze_warehouse |
scripts/load_bronze_warehouse.py |
Loads Bronze files from MinIO into PostgreSQL raw tables |
run_silver_transformations |
scripts/run_silver_transformations.py |
Transforms JSONB Bronze records into typed Silver tables |
verify_silver_tables |
scripts/verify_silver_tables.py |
Asserts Silver row counts and data presence |
run_gold_transformations |
scripts/run_gold_transformations.py |
Builds Gold analytics models from Silver |
verify_gold_tables |
scripts/verify_gold_tables.py |
Asserts Gold row counts and data presence |
verify_api_queries |
scripts/verify_api_queries.py |
Validates that Gold tables serve API queries correctly |
| Setting | Value |
|---|---|
| Schedule | Manual trigger only (schedule=None) |
| Catchup | Disabled |
| Max active runs | 1 |
| Retries per task | 2 |
| Retry delay | 2 minutes |
The DAG generates a standard production-representative dataset:
Products: 100
Customers: 500
Stores: 20
Campaigns: 12
Sales: 2000
Inventory: 500
Return rate: 8%
Via Airflow UI at http://localhost:8080:
- Navigate to DAGs
- Find
retail_analytics_e2e_pipeline - Click the play button to trigger manually
Via Airflow CLI:
docker exec -it retail_airflow airflow dags trigger retail_analytics_e2e_pipelineMonitor run status:
docker exec -it retail_airflow airflow dags list-runs --dag-id retail_analytics_e2e_pipelineView task logs in Airflow UI under Browse → Task Instances.
The ingest_to_bronze_storage task passes the Airflow execution timestamp as the run ID using Jinja templating:
--run-id {{ ts_nodash }}
This ensures each DAG run writes Bronze files to a unique partition:
bronze/{dataset}/dt={YYYY-MM-DD}/run_id={ts_nodash}/{dataset}.json
SELECT *
FROM gold.executive_kpis
ORDER BY snapshot_at DESC
LIMIT 5;SELECT *
FROM gold.daily_revenue
ORDER BY revenue_date DESC
LIMIT 30;SELECT
product_name,
category,
brand,
units_sold,
gross_revenue,
estimated_gross_margin,
return_rate
FROM gold.product_sales_performance
ORDER BY gross_revenue DESC
LIMIT 20;SELECT
customer_name,
email,
country,
loyalty_tier,
completed_orders,
total_spent,
average_order_value
FROM gold.customer_lifetime_value
ORDER BY total_spent DESC
LIMIT 20;SELECT
product_name,
category,
store_name,
region,
stock_quantity,
reorder_level,
stock_status
FROM gold.inventory_risk
WHERE stock_status IN ('low_stock', 'out_of_stock')
ORDER BY stock_status, stock_quantity ASC
LIMIT 50;SELECT
campaign_name,
channel,
target_region,
budget,
attributed_orders,
attributed_revenue,
estimated_roi
FROM gold.campaign_roi
ORDER BY estimated_roi DESC;Use DBeaver with this PostgreSQL connection:
Host: localhost
Port: 5432
Database: retail_analytics
Username: retail_user
Password: retail_password
If DBeaver shows a timezone error like:
FATAL: invalid value for parameter "TimeZone": "Asia/Katmandu"
Set the JDBC URL to:
jdbc:postgresql://localhost:5432/retail_analytics?options=-c%20TimeZone=UTC
Then run:
SHOW timezone;Expected:
UTC
Run tests:
python -m pytestRun Ruff:
ruff check .Auto-fix where possible:
ruff check . --fixThis project follows:
- clean architecture
- separation of concerns
- environment-based configuration
- defensive error handling
- idempotent pipeline behavior
- layered warehouse modeling
- operational audit tracking
- data quality readiness
- object storage abstraction
- repeatable local development
- documentation-first development
Completed:
- Repository structure and documentation
- Docker Compose local infrastructure
- Retail source data generator
- Object storage abstraction and MinIO client
- Bronze ingestion to object storage
- Local warehouse schemas and audit tables
- Bronze warehouse loader
- Silver transformations
- Gold analytics models
- FastAPI reporting API
- API Dockerization
- Airflow dynamic DAG orchestration
Next:
- Data quality checks (pipeline assertions, row count checks, null rate validation)
- Jenkins CI/CD
- AWS S3 and Redshift support
- GCP GCS and BigQuery replication
- EC2 spot worker orchestration
- Final portfolio documentation and interview notes