🚇 Data Engineering Project - Buenos Aires Transport Pipeline

Python · Apache Airflow · Docker · Redshift

📋 Table of Contents

  • 🎯 Overview
  • 🏗️ Architecture
  • ✨ Features
  • 🔧 Prerequisites
  • 🚀 Installation
  • ⚙️ Configuration
  • 📊 Usage
  • 📁 Project Structure
  • 🔌 API Documentation
  • 🗄️ Database Schema
  • 📧 Monitoring & Alerts
  • 📄 License

🎯 Overview

This project implements a comprehensive Data Engineering pipeline that continuously extracts transportation data from the Buenos Aires Transport API, processes it, and loads it into a Redshift Data Warehouse. The pipeline focuses on collecting real-time information about buses, subways, and EcoBici (bike-sharing) stations within the Autonomous City of Buenos Aires (CABA).

Key Objectives

  • Real-time Data Collection: Extract dynamic transportation data every hour
  • Data Integration: Combine API data with database information
  • Data Warehouse: Store processed data in Amazon Redshift for analytics
  • Monitoring: Implement email alerts for system status and anomalies
  • Scalability: Containerized deployment with Apache Airflow orchestration

🏗️ Architecture

┌────────────────┐     ┌──────────────┐    ┌──────────────┐
│   Transport    │     │   Apache     │    │   Amazon     │
│   API (CABA)   │────▶│   Airflow    │───▶│   Redshift   │
│                │     │   (Docker)   │    │   (DWH)      │
└────────────────┘     └──────────────┘    └──────────────┘
                              │
                              ▼
                       ┌─────────────────┐
                       │   Email Alerts  │
                       │   (SMTP/Gmail)  │
                       └─────────────────┘

Data Flow

  1. Extraction: Python scripts fetch data from Transport API endpoints
  2. Transformation: Data is cleaned, filtered, and structured
  3. Loading: Processed data is loaded into Redshift tables
  4. Monitoring: Email alerts notify about pipeline status
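
In code, these four stages boil down to a handful of requests, pandas, and SQLAlchemy calls. The sketch below is purely illustrative and is not the project's actual implementation in scripts/main.py; the column handling and the way the engine is passed in are assumptions.

import pandas as pd
import requests

def extract(endpoint: str, params: dict) -> pd.DataFrame:
    # 1. Extraction: fetch one Transport API endpoint into a DataFrame
    response = requests.get(endpoint, params=params, timeout=30)
    response.raise_for_status()
    return pd.DataFrame(response.json())

def transform(df: pd.DataFrame) -> pd.DataFrame:
    # 2. Transformation: drop incomplete rows and normalize column names
    df = df.dropna().copy()
    df.columns = [str(c).lower() for c in df.columns]
    return df

def load(df: pd.DataFrame, table: str, engine) -> None:
    # 3. Loading: append the cleaned rows to a Redshift table via SQLAlchemy
    df.to_sql(table, engine, if_exists="append", index=False)
    # 4. Monitoring: exceptions raised in any step surface in Airflow,
    #    which in turn triggers the email-alert DAG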

✨ Features

🚌 Bus Data Collection

  • Real-time vehicle positions from multiple bus companies
  • Route information and trip details
  • Speed and location tracking with timestamps
  • Agency management for different transport companies

🚇 Subway Information

  • Service alerts and status updates
  • Line information and schedules
  • Real-time monitoring of subway operations

🚲 EcoBici Station Data

  • Station information (locations, capacities, neighborhoods)
  • Bike availability (mechanical and electric bikes)
  • Station status and operational data
  • Neighborhood filtering for specific areas

📧 Alert System

  • Email notifications for pipeline failures
  • SMTP integration with Gmail
  • Customizable alert messages
  • Real-time monitoring of data quality

🔧 Prerequisites

Before running this project, ensure you have the following installed:

Required Software

  • Python 3.8+
  • Docker & Docker Compose
  • Git

Required Accounts & Services

  • Amazon Redshift cluster
  • Buenos Aires Transport API credentials
  • Gmail account (for email alerts)

Python Dependencies

pandas>=1.5.0
requests>=2.28.0
sqlalchemy==1.4.51
psycopg2-binary>=2.9.0
apache-airflow>=2.7.3
configparser

🚀 Installation

1. Clone the Repository

git clone https://github.com/yourusername/DataEngineeringCoder-.git
cd DataEngineeringCoder-

2. Set Up Environment

# Create virtual environment (optional but recommended)
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

3. Configure Docker Environment

# Set Airflow user ID (Linux/Mac)
echo -e "AIRFLOW_UID=$(id -u)" > .env

# On Windows, set AIRFLOW_UID=50000
echo "AIRFLOW_UID=50000" > .env

4. Start Airflow Services

# Initialize Airflow database
docker-compose up airflow-init

# Start all services
docker-compose up -d

⚙️ Configuration

1. API Credentials

Create a config/pipeline.conf file with your API credentials:

[api_transporte]
client_id = your_client_id_here
client_secret = your_client_secret_here

[RedShift]
host = your_redshift_cluster.amazonaws.com
port = 5439
dbname = your_database_name
user = your_username
pwd = your_password
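
Since configparser is already among the dependencies, these credentials can be read roughly as follows. This is a minimal sketch using the section and key names from the sample file above; the project's actual connection code (e.g. in scripts/utils.py) may differ.

import configparser

from sqlalchemy import create_engine

config = configparser.ConfigParser()
config.read("config/pipeline.conf")

# API credentials
client_id = config["api_transporte"]["client_id"]
client_secret = config["api_transporte"]["client_secret"]

# Redshift connection (Redshift is reachable through the PostgreSQL driver)
rs = config["RedShift"]
engine = create_engine(
    f"postgresql+psycopg2://{rs['user']}:{rs['pwd']}@{rs['host']}:{rs['port']}/{rs['dbname']}"
)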

2. Airflow Variables

Set up Airflow variables for email alerts:

# Access Airflow web UI at http://localhost:8080
# Go to Admin > Variables
# Add variable: GMAIL_SECRET = your_gmail_app_password
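
Inside a DAG or task, the variable can then be retrieved with Airflow's Variable API (a minimal sketch; the variable name matches the one configured above):

from airflow.models import Variable

# Read the Gmail app password stored under Admin > Variables
gmail_app_password = Variable.get("GMAIL_SECRET")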

3. Environment Variables

Create a .env file for additional configuration:

AIRFLOW_UID=50000
AIRFLOW_IMAGE_NAME=apache/airflow:2.7.3
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow

📊 Usage

Starting the Pipeline

  1. Start Airflow Services:

    docker-compose up -d
  2. Access Airflow Web UI:

    • Open browser: http://localhost:8080
    • Login: airflow / airflow
  3. Enable DAGs:

    • Navigate to DAGs section
    • Enable my_daily_dag for data ingestion (see the sketch below)
    • Enable dag_smtp_email_automatico for email alerts
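
For reference, a stripped-down version of my_daily_dag could look like the following. This is a sketch, not the project's actual DAG in dags/data_ingestion.py; the hourly schedule follows the Overview, and the callable is imported the same way as in the manual execution example below.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from scripts.main import data_ingestion

with DAG(
    dag_id="my_daily_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@hourly",   # the Overview states data is collected every hour
    catchup=False,
) as dag:
    ingest = PythonOperator(
        task_id="data_ingestion",
        python_callable=data_ingestion,
    )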

Manual Execution

# Run data ingestion manually
python scripts/main.py

# Or execute specific functions
python -c "from scripts.main import data_ingestion; data_ingestion()"

Monitoring Pipeline

  • Airflow Web UI: Monitor DAG runs and task status
  • Email Alerts: Check your configured email for notifications
  • Logs: View detailed logs in Airflow UI or ./logs directory

📁 Project Structure

DataEngineeringCoder-/
├── dags/                          # Apache Airflow DAGs
│   ├── data_ingestion.py         # Main ETL pipeline DAG
│   └── dag_email.py              # Email alerts DAG
├── scripts/                       # Core Python scripts
│   ├── main.py                   # Main ETL logic
│   ├── utils.py                  # Utility functions
│   ├── email_utils.py            # Email functionality
│   └── alert_utils.py            # Alert processing
├── config/                        # Configuration files
│   └── pipeline.conf             # API and DB credentials
├── logs/                         # Airflow logs (auto-generated)
├── docker-compose.yaml           # Docker services configuration
├── main.ipynb                    # Development notebook
└── README.md                     # This file

🔌 API Documentation

Transport API Endpoints

The pipeline integrates with the Buenos Aires Transport API:

Bus Data

  • Endpoint: /colectivos/vehiclePositionsSimple
  • Parameters: agency_id (9, 145, 155)
  • Data: Real-time bus positions, routes, speeds

EcoBici Data

  • Endpoint: /ecobici/gbfs/stationInformation
  • Data: Station locations, capacities, neighborhoods
  • Endpoint: /ecobici/gbfs/stationStatus
  • Data: Bike availability, station status

Subway Data

  • Endpoint: /subtes/alerts
  • Data: Service alerts, line status

Data Sources

  • Primera Junta (agency_id: 145)
  • La Nueva Metropol (agency_id: 9)
  • TALP (agency_id: 155)
  • EcoBici Stations (filtered by neighborhoods)
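
Putting the endpoints and agency IDs above together, a single authenticated request looks roughly like this. The base URL and the use of client_id/client_secret as query parameters are assumptions to be checked against the official API documentation.

import requests

BASE_URL = "https://apitransporte.buenosaires.gob.ar"  # assumed base URL
CREDENTIALS = {"client_id": "your_client_id_here", "client_secret": "your_client_secret_here"}

# Real-time positions for TALP (agency_id 155)
bus_positions = requests.get(
    f"{BASE_URL}/colectivos/vehiclePositionsSimple",
    params={"agency_id": 155, **CREDENTIALS},
    timeout=30,
).json()

# EcoBici station information, later filtered by neighborhood
stations = requests.get(
    f"{BASE_URL}/ecobici/gbfs/stationInformation",
    params=CREDENTIALS,
    timeout=30,
).json()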

🗄️ Database Schema

Redshift Tables

agencies

CREATE TABLE agencies (
    agency_id INTEGER,
    agency_name VARCHAR(100)
);

bus_positions

CREATE TABLE bus_positions (
    id INTEGER,
    agency_id INTEGER,
    route_id INTEGER,
    latitude NUMERIC,
    longitude NUMERIC,
    speed NUMERIC,
    "timestamp" TIMESTAMP,  -- quoted because TIMESTAMP is a reserved word in Redshift
    route_short_name VARCHAR,
    trip_headsign VARCHAR
);

ecobici_stations

CREATE TABLE ecobici_stations (
    station_id INTEGER,
    name VARCHAR,
    address VARCHAR,
    capacity INTEGER,
    lat NUMERIC,
    lon NUMERIC,
    neighborhood VARCHAR
);

ecobici_stations_status

CREATE TABLE ecobici_stations_status (
    station_id INTEGER,
    num_bikes_available_mechanical INTEGER,
    num_bikes_available_ebike INTEGER,
    num_bikes_available INTEGER,
    num_bikes_disabled INTEGER,
    status VARCHAR,
    last_reported TIMESTAMP
);
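
With these tables in place, the warehouse can answer simple analytical questions. The example below is a sketch: the connection string is a placeholder (in practice, reuse the engine built from config/pipeline.conf), and the query relies only on the column names in the DDL above.

import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection string; reuse the engine from the Configuration section in practice
engine = create_engine("postgresql+psycopg2://user:pwd@host:5439/dbname")

# Average reported speed per bus company over the last 24 hours
query = """
    SELECT a.agency_name,
           AVG(b.speed) AS avg_speed
    FROM bus_positions b
    JOIN agencies a ON a.agency_id = b.agency_id
    WHERE b."timestamp" > DATEADD(hour, -24, GETDATE())
    GROUP BY a.agency_name
    ORDER BY avg_speed DESC
"""
print(pd.read_sql(query, engine))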

📧 Monitoring & Alerts

Email Alert System

  • SMTP Server: Gmail SMTP
  • Frequency: Configurable (currently every minute)
  • Content: Pipeline status, data quality alerts, error notifications

Alert Types

  • Pipeline Failures: DAG execution errors
  • Data Quality Issues: Missing or invalid data
  • API Connection Problems: Network or authentication issues
  • Database Errors: Connection or query failures

Configuration

# Email settings in scripts/email_utils.py
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 587
SENDER_EMAIL = 'your_email@gmail.com'
RECIPIENT_EMAIL = 'your_email@gmail.com'
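
A minimal alert sender built on these settings could look like the sketch below; the actual logic lives in scripts/email_utils.py and may differ. The app password is the Gmail app password stored in the GMAIL_SECRET Airflow variable.

import smtplib
from email.mime.text import MIMEText

SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 587
SENDER_EMAIL = 'your_email@gmail.com'
RECIPIENT_EMAIL = 'your_email@gmail.com'

def send_alert(subject: str, body: str, app_password: str) -> None:
    # Build a plain-text message
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = SENDER_EMAIL
    msg['To'] = RECIPIENT_EMAIL

    # Gmail requires STARTTLS on port 587 and an app password for login
    with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
        server.starttls()
        server.login(SENDER_EMAIL, app_password)
        server.send_message(msg)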

📄 License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.


Note: This project is designed for educational and development purposes. For production use, additional security measures, error handling, and monitoring should be implemented.