NYC Taxi Pipeline — GCP Optimization & Static Dashboard Plan

Executive Summary

This document analyzes the BigQuery/Dataproc notebook pipeline for NYC TLC taxi data and proposes optimizations plus a static web dashboard architecture that serves pre-computed data from GCS buckets—no server-side code required for viewing.

Implementation Status

| Item | Status |
|---|---|
| `pipeline_utils` package | Done |
| Stage 05: Export to GCS | Done |
| Static HTML/JS dashboard | Done |
| Unified Bronze→Silver (02) | Done |
| Incremental processing | Done |
| Cloud Composer DAG | Done (`config/gcp/`) |
| GCS static hosting | Manual setup |

1. Current Pipeline Architecture (As-Is)

1.1 Data Flow Overview

┌─────────────────────────────────────────────────────────────────────────────────────┐
│                        CURRENT PIPELINE (BigQuery / Dataproc)                        │
└─────────────────────────────────────────────────────────────────────────────────────┘

  [TLC URLs]     [GCS Raw]      [BigQuery]       [BigQuery]       [BigQuery]       [Gradio]
      │              │              │                 │                 │              │
      ▼              ▼              ▼                 ▼                 ▼              ▼
  00_AutoIngest  01_GCS→Bronze  02a-d Bronze→  03 CleanSilver→  04a Fleet Rec   FinalGradio
  TLCFiles       (Spark)        CleanSilver     PreMlGold       04b Anomalies   Dashboard
                     │              │                 │                 │              │
                     │              │                 │                 │              │
  nyc_raw_data   RawBronze     CleanSilver      PreMlGold       PostMlGold     Live BQ
  _bucket        dataset       dataset         dataset         dataset        queries

1.2 Notebook Inventory

| Stage | File | Purpose | Compute |
|---|---|---|---|
| 00 | `00_AutoIngestTLCFiles` | Download parquet from TLC URLs → GCS | Cloud Run / VM |
| 01 | `01_GCStoBronzeIngestion` | GCS parquet → BigQuery RawBronze | Dataproc Spark |
| 02a–d | `02*RawBronzeToCleanSilver` | Clean/transform per taxi type | Dataproc Spark |
| 03 | `03CleanSilverToPreML` | Aggregate to daily/hourly/hotspot | Dataproc Spark |
| 04a | `04aXGBostFleetRecommender` | XGBoost fleet mix predictions | Vertex AI / VM |
| 04b | `04bAnomalies` | Autoencoder anomaly detection | Vertex AI / VM |
| — | `LoadInvestigate` files | Exploration, zone lookup, GeoPandas | Ad-hoc |
| — | `FinalGradioDashboard` | Interactive dashboard | Requires a running Python server |

1.3 Current Pain Points

| Issue | Impact |
|---|---|
| Gradio requires a live server | Dashboard needs Python + Gradio + a BigQuery client running |
| No orchestration | Manual notebook execution, no scheduling |
| Duplicated Spark retry logic | The same `get_spark_with_retry()` appears in 5+ notebooks |
| No materialized exports | All dashboard data is fetched from BQ on every interaction |
| No static assets | No pre-built JSON/GeoJSON for static web consumption |
| Heavy dependencies | Prophet, XGBoost, Keras, and Gradio are all loaded just to serve the dashboard |

2. Optimization Plan

2.1 Pipeline Structure Optimizations

A. Extract Shared Utilities

Create a shared Python package or module used by all notebooks:

pipeline_utils/
├── __init__.py
├── spark_utils.py      # get_spark_with_retry, quota handling
├── bq_utils.py         # list_partitions, schema helpers
├── gcs_utils.py        # list_blobs, download helpers
└── config.py           # PROJECT_ID, BUCKET_NAME, TAXI_TYPES, etc.

Benefits: Single source of truth, easier maintenance, consistent retry behavior.
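As a sketch of what `spark_utils.py` might contain: the retry/backoff parameters and the quota-error matching below are illustrative assumptions, not the exact logic from the existing notebooks.

```python
# pipeline_utils/spark_utils.py -- illustrative sketch. The backoff schedule
# and the substring-based quota check are assumptions, not the notebook code.
import time


def with_retry(factory, max_attempts=5, base_delay=10.0,
               retryable=("quota", "rate limit")):
    """Call factory() until it succeeds, backing off exponentially on
    errors whose message looks like a transient quota/rate issue."""
    for attempt in range(1, max_attempts + 1):
        try:
            return factory()
        except Exception as exc:
            transient = any(s in str(exc).lower() for s in retryable)
            if attempt == max_attempts or not transient:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))


def get_spark_with_retry(app_name="nyc-taxi-pipeline"):
    # Deferred import so non-Spark callers can still use with_retry().
    from pyspark.sql import SparkSession
    return with_retry(lambda: SparkSession.builder.appName(app_name).getOrCreate())
```

Keeping `with_retry` generic means the same wrapper can also guard BigQuery and GCS calls in `bq_utils.py` and `gcs_utils.py`.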

B. Consolidate Bronze→Silver Notebooks

Merge 02a, 02b, 02c, 02d into a single parameterized notebook or script:

  • Input: taxi_type (yellow, green, fhv, fhvhv)
  • Reuse schema map and transformation logic
  • Run via loop or parallel Dataproc jobs

Benefits: Less duplication, easier to add new taxi types.
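A minimal sketch of the parameterized driver. The rename maps are an illustrative subset (the real schema and transform logic would come from 02a–d), and the table names assume the `RawBronze`/`CleanSilver` naming used above.

```python
# Parameterized Bronze->Silver driver -- a sketch. Column maps below are
# placeholder subsets; the real logic comes from notebooks 02a-d.
TAXI_TYPES = ("yellow", "green", "fhv", "fhvhv")

# Per-type rename map: bronze column -> silver column (illustrative only).
SCHEMA_MAP = {
    "yellow": {"tpep_pickup_datetime": "pickup_ts", "tpep_dropoff_datetime": "dropoff_ts"},
    "green":  {"lpep_pickup_datetime": "pickup_ts", "lpep_dropoff_datetime": "dropoff_ts"},
    "fhv":    {"pickup_datetime": "pickup_ts", "dropOff_datetime": "dropoff_ts"},
    "fhvhv":  {"pickup_datetime": "pickup_ts", "dropoff_datetime": "dropoff_ts"},
}


def silver_config(taxi_type):
    """Resolve source/target tables and the rename map for one taxi type."""
    if taxi_type not in TAXI_TYPES:
        raise ValueError(f"unknown taxi_type: {taxi_type!r}")
    return {
        "source": f"RawBronze.{taxi_type}_tripdata",
        "target": f"CleanSilver.{taxi_type}_tripdata",
        "renames": SCHEMA_MAP[taxi_type],
    }


def run_bronze_to_silver(spark, taxi_type):
    cfg = silver_config(taxi_type)
    df = spark.read.format("bigquery").option("table", cfg["source"]).load()
    for old, new in cfg["renames"].items():
        df = df.withColumnRenamed(old, new)
    (df.write.format("bigquery").option("table", cfg["target"])
       .mode("overwrite").save())
```

The driver can then be invoked in a loop over `TAXI_TYPES`, or each type submitted as a separate Dataproc job for parallelism.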

C. Add Incremental Processing Flags

  • Track last processed year_month per taxi type in a metadata table or GCS object
  • Skip already-processed partitions
  • Support idempotent re-runs
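One way to sketch this: keep a small JSON state object in GCS mapping taxi type to processed `year_month` values. The object path and JSON shape here are assumptions.

```python
# Incremental-processing sketch: track processed year_month partitions per
# taxi type in a JSON object on GCS. Path and shape are assumptions.
import json

STATE_PATH = "_state/processed_partitions.json"


def pending_partitions(available, processed):
    """Partitions present in the Bronze input but not yet processed,
    sorted so re-runs are deterministic (and therefore idempotent)."""
    return sorted(set(available) - set(processed))


def load_state(storage_client, bucket="nyc_raw_data_bucket", path=STATE_PATH):
    blob = storage_client.bucket(bucket).blob(path)
    if not blob.exists():
        return {}  # first run: nothing processed yet
    return json.loads(blob.download_as_text())


def save_state(storage_client, state, bucket="nyc_raw_data_bucket", path=STATE_PATH):
    blob = storage_client.bucket(bucket).blob(path)
    blob.upload_from_string(json.dumps(state, indent=2),
                            content_type="application/json")
```

The state object would look like `{"yellow": ["2024-01", "2024-02"], ...}`; each Silver run computes `pending_partitions()`, processes only those, and appends them to the state on success.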

D. Use BigQuery Scheduled Queries Where Possible

For lightweight aggregations (e.g., daily rollups from hourly), consider:

  • BigQuery scheduled queries
  • Materialized views

Either option reduces Dataproc usage for simple SQL transforms.

2.2 GCP Orchestration

Recommended: Cloud Composer (Airflow) or Cloud Workflows

┌────────────────────────────────────────────────────────────────────────────┐
│                    ORCHESTRATED PIPELINE (Cloud Composer)                    │
└────────────────────────────────────────────────────────────────────────────┘

  DAG: nyc_taxi_pipeline (daily/weekly)

  [Trigger] → [00_Ingest] → [01_Bronze] → [02_Silver] → [03_PreML] → [04_ML] → [05_Export]
                                                                                      │
                                                                                      ▼
                                                                            GCS dashboard bucket

DAG Tasks (example):

| Task | Type | Description |
|---|---|---|
| `check_new_data` | BigQueryOperator | Detect new months in GCS not yet in Bronze |
| `ingest_to_gcs` | PythonOperator / Cloud Run | Run the 00 notebook logic |
| `gcs_to_bronze` | DataprocSubmitJobOperator | Submit the Spark job for 01 |
| `bronze_to_silver` | DataprocSubmitJobOperator | Submit Spark jobs for 02a–d |
| `silver_to_preml` | DataprocSubmitJobOperator | Submit the Spark job for 03 |
| `run_fleet_recommender` | VertexAITrainingJob / PythonOperator | Run 04a |
| `run_anomaly_detection` | VertexAITrainingJob / PythonOperator | Run 04b |
| `export_dashboard_data` | PythonOperator | Export JSON/GeoJSON to GCS (see §3) |

Alternative: Cloud Workflows — Simpler, good for linear pipelines; less flexible than Airflow for complex dependencies.
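For the DataprocSubmitJobOperator tasks above, the `job` argument is a Dataproc Jobs API payload. A sketch of a helper that builds it; project, cluster, bucket, and script names are placeholders.

```python
# Sketch of the `job` payload passed to DataprocSubmitJobOperator for each
# Spark stage. Project, cluster, and script names are placeholders.
PROJECT_ID = "your-project"
CLUSTER = "nyc-taxi-ephemeral"
CODE_BUCKET = "nyc_pipeline_code"


def pyspark_job(stage_script, args=()):
    """Build a Dataproc Jobs API payload for one pipeline stage."""
    return {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER},
        "pyspark_job": {
            "main_python_file_uri": f"gs://{CODE_BUCKET}/{stage_script}",
            "args": list(args),
        },
    }


# One task per stage; bronze_to_silver fans out over taxi types.
bronze_job = pyspark_job("01_gcs_to_bronze.py")
silver_jobs = [pyspark_job("02_bronze_to_silver.py", ["--taxi-type", t])
               for t in ("yellow", "green", "fhv", "fhvhv")]
```

Each dict is passed as `DataprocSubmitJobOperator(job=..., region=..., project_id=...)` inside the DAG; keeping the builder pure makes it easy to unit-test without Airflow installed.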


3. Static Dashboard Architecture

3.1 Goal

Serve an optimized dashboard from a static website (HTML/JS) that reads pre-computed data from GCS—no server-side Python, no BigQuery at runtime.

3.2 Data Export Pipeline (New Stage)

Add a Stage 05: Export Dashboard Data that runs after ML pipelines complete.

Export Targets (GCS Bucket Structure)

gs://nyc_dashboard_bucket/
├── index.html                 # Static dashboard SPA
├── assets/
│   ├── app.js
│   └── styles.css
├── data/
│   ├── time_series/
│   │   ├── yellow_daily.json
│   │   ├── yellow_hourly.json
│   │   ├── green_daily.json
│   │   └── ...
│   ├── forecasts/
│   │   ├── yellow_forecast_YYYYMMDD.json
│   │   └── ...
│   ├── anomalies/
│   │   ├── yellow_anomalies.json
│   │   └── ...
│   ├── fleet/
│   │   ├── yellow_fleet_YYYYMMDD.json
│   │   └── ...
│   ├── hotspots/
│   │   ├── yellow_2024_01_hotspots.json
│   │   └── ...
│   └── metadata/
│       ├── available_partitions.json
│       └── last_updated.json
└── geo/
    └── taxi_zones.geojson      # Zone boundaries (from LoadInvestigate)

Export Script Logic (Python, run as Cloud Function or Composer task)

```python
# Sketch of export_dashboard_data(). Table names mirror the dataset layout
# above but are assumptions; writing to gs:// via DataFrame.to_json assumes
# gcsfs is installed alongside pandas.
from google.cloud import bigquery

BUCKET = "nyc_dashboard_bucket"
TAXI_TYPES = ["yellow", "green", "fhv", "fhvhv"]


def export_dashboard_data():
    bq_client = bigquery.Client()

    # 1. Query BigQuery for the latest available partitions
    partitions = get_available_partitions_from_bq(bq_client)

    # 2. Export time series (daily/hourly) per taxi type
    for taxi_type in TAXI_TYPES:
        df = bq_client.query(
            f"SELECT * FROM `PreMlGold.{taxi_type}_daily`"
        ).to_dataframe()
        df.to_json(f"gs://{BUCKET}/data/time_series/{taxi_type}_daily.json",
                   orient="records")

    # 3. Export anomalies from PostMlGold
    for taxi_type in TAXI_TYPES:
        df = bq_client.query(
            f"SELECT * FROM `PostMlGold.{taxi_type}_anomalies`"
        ).to_dataframe()
        df.to_json(f"gs://{BUCKET}/data/anomalies/{taxi_type}_anomalies.json",
                   orient="records")

    # 4. Export fleet recommendations
    # 5. Export taxi_zones GeoJSON (one-time, or on zone updates)
    # 6. Write metadata (last_updated, available_partitions)
```

3.3 Static Web Dashboard Stack

| Component | Technology |
|---|---|
| Frontend | Vanilla JS + Plotly.js (or Chart.js) for charts |
| Maps | Leaflet.js + GeoJSON (taxi zones) |
| Data | Fetched from GCS via public or signed URLs |
| Hosting | Cloud Storage static website or Firebase Hosting |

GCS Static Website Setup

  1. Create bucket: `nyc_dashboard_bucket`
  2. Enable static website hosting
  3. Set main page: `index.html`
  4. Set CORS if needed for cross-origin fetch
  5. Make bucket publicly readable (or use signed URLs for private data)

```shell
gsutil cors set cors.json gs://nyc_dashboard_bucket
gsutil -m cp -r dist/* gs://nyc_dashboard_bucket/
```
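The same CORS policy can also be applied programmatically with the `google-cloud-storage` client instead of `gsutil`. The wildcard origin below is an assumption and should be narrowed to the dashboard's own domain in production.

```python
# Programmatic equivalent of `gsutil cors set cors.json ...` using the
# google-cloud-storage client. The wildcard origin is an assumption.
CORS_POLICY = [{
    "origin": ["*"],                 # tighten to the dashboard domain in prod
    "method": ["GET"],
    "responseHeader": ["Content-Type"],
    "maxAgeSeconds": 3600,
}]


def apply_cors(storage_client, bucket_name="nyc_dashboard_bucket"):
    bucket = storage_client.get_bucket(bucket_name)
    bucket.cors = CORS_POLICY        # set the policy on the bucket resource
    bucket.patch()                   # persist the change to GCS
    return bucket.cors
```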

Alternative: Firebase Hosting

  • Host static files on Firebase
  • Fetch data from GCS (with appropriate CORS and access rules)
  • Optional: Cloud CDN for faster global delivery

3.4 Dashboard Features (Static Version)

| Feature | Current (Gradio) | Static (Optimized) |
|---|---|---|
| Time series explorer | Live BQ query | Pre-exported JSON |
| ML forecasting | On-demand XGBoost | Pre-computed forecasts (scheduled) |
| Anomaly detection | Live from BQ | Pre-exported anomaly JSON |
| Fleet recommendations | Live from BQ | Pre-exported fleet JSON |
| Zone maps | Live GeoJSON from BQ/GCS | Static GeoJSON in bucket |

Trade-off: Static dashboard shows last exported state (e.g., daily refresh). Real-time is sacrificed for zero server cost and instant load.


4. GCP Service Mapping

| Component | GCP Service | Notes |
|---|---|---|
| Raw file storage | Cloud Storage (`nyc_raw_data_bucket`) | Already in use |
| Data warehouse | BigQuery | RawBronze, CleanSilver, PreMlGold, PostMlGold |
| Batch processing | Dataproc (Spark) | 01, 02a–d, 03 |
| ML training | Vertex AI / Compute Engine | 04a, 04b (or keep on Dataproc) |
| Orchestration | Cloud Composer (Airflow) | Recommended |
| Export job | Cloud Functions / Cloud Run | Lightweight Python export |
| Static dashboard | Cloud Storage static site | Or Firebase Hosting |
| Optional API | Cloud Run | Only if a thin REST API for dynamic filters is needed |

5. Implementation Phases

Phase 1: Quick Wins (1–2 weeks)

  1. Extract pipeline_utils — Spark retry, config, BQ helpers
  2. Add export script — BigQuery → JSON to GCS for time series + anomalies
  3. Create minimal static HTML — Single page with Plotly.js, fetch from GCS
  4. Enable GCS static hosting — Serve index.html from bucket

Phase 2: Orchestration (2–3 weeks)

  1. Convert notebooks to `.py` scripts, with one entry point per stage
  2. Create Cloud Composer DAG — Chain 00 → 01 → 02 → 03 → 04 → export
  3. Schedule DAG — Daily or weekly based on TLC update frequency

Phase 3: Full Static Dashboard (2–3 weeks)

  1. Build full dashboard UI — Tabs for Explorer, Forecasts, Anomalies, Fleet
  2. Export all required datasets — Forecasts, fleet, hotspots, metadata
  3. Add Leaflet map — Zone choropleth from GeoJSON
  4. Optimize exports — Compress JSON, consider Parquet for large datasets
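For item 4, one option is to upload the JSON gzip-compressed with `Content-Encoding: gzip` set on the object, so browsers decompress it transparently; the bucket and object path below are placeholders.

```python
# Sketch for "optimize exports": gzip the JSON and set Content-Encoding so
# browsers decompress transparently. Bucket/path names are placeholders.
import gzip
import json


def compress_json(records):
    """Serialize records to compact JSON, then gzip. Repetitive time-series
    rows typically compress well; the exact ratio depends on the data."""
    raw = json.dumps(records, separators=(",", ":")).encode("utf-8")
    return gzip.compress(raw)


def upload_compressed(storage_client, records, bucket="nyc_dashboard_bucket",
                      path="data/time_series/yellow_daily.json"):
    blob = storage_client.bucket(bucket).blob(path)
    blob.content_encoding = "gzip"   # browsers decompress via this header
    blob.upload_from_string(compress_json(records),
                            content_type="application/json")
```

For very large exports, Parquet (as the plan suggests) is a better fit than JSON even with compression, at the cost of needing a Parquet reader in the frontend.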

Phase 4: Cost & Performance Tuning

  1. Right-size Dataproc — Ephemeral clusters, preemptible workers
  2. BigQuery partitioning — Ensure tables partitioned by date
  3. CDN for static assets — Cloud CDN in front of GCS bucket

6. Cost Considerations

| Service | Current | Optimized |
|---|---|---|
| BigQuery | Pay per query | Same; ad-hoc queries reduced via exports |
| Dataproc | Per cluster-hour | Ephemeral clusters, auto-scaling |
| Gradio server | Always-on VM / Cloud Run | Eliminated (static only) |
| GCS | Storage + egress | Minimal for JSON exports |
| Cloud Composer | N/A | Base fee; consider Workflows for simpler pipelines |

Static dashboard benefit: No compute cost for serving; only storage and egress for JSON files.

Cost estimation: Run a test-phase batch (e.g. 2 specific months of data) end-to-end and monitor billing before scaling to full production.


7. Security Notes

  • Public bucket: Only if data is non-sensitive (aggregated taxi stats are typically public)
  • Private bucket: Use signed URLs or IAM; frontend would need a small proxy (Cloud Function) to generate URLs
  • CORS: Configure cors.json on bucket for web fetch
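The "small proxy" for the private-bucket option could look like the sketch below: validate the requested object path against an allowlist, then return a short-lived V4 signed URL. The allowed prefixes are assumptions matching the bucket layout in §3.2.

```python
# Sketch of a signed-URL proxy (e.g. a Cloud Function) for a private bucket.
# Allowed prefixes are assumptions matching the dashboard bucket layout.
from datetime import timedelta

ALLOWED_PREFIXES = ("data/", "geo/")


def is_allowed(object_path):
    """Only sign URLs for dashboard data objects, never arbitrary paths."""
    return (object_path.startswith(ALLOWED_PREFIXES)
            and ".." not in object_path)


def sign_url(storage_client, object_path, bucket="nyc_dashboard_bucket"):
    if not is_allowed(object_path):
        raise PermissionError(f"refusing to sign: {object_path}")
    blob = storage_client.bucket(bucket).blob(object_path)
    return blob.generate_signed_url(version="v4",
                                    expiration=timedelta(minutes=15),
                                    method="GET")
```

Note that V4 signing from a Cloud Function requires the runtime service account to hold (or impersonate) a key-signing capability, e.g. via the IAM signBlob permission.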

8. Summary Checklist

  • Create pipeline_utils package
  • Add Stage 05: Export dashboard data to GCS
  • Build static HTML/JS dashboard with Plotly
  • Enable GCS static website hosting (manual)
  • Create Cloud Composer DAG (config/gcp/composer_dag.py)
  • Schedule pipeline (deploy DAG to Composer)
  • Document runbook — see docs/gcp/IMPLEMENTATION_GUIDE.md

The optimized pipeline is implemented in `pipeline/` and `pipeline_utils/`; the legacy notebooks live in `legacy/`.