This document analyzes the BigQuery/Dataproc notebook pipeline for NYC TLC taxi data and proposes optimizations plus a static web dashboard architecture that serves pre-computed data from GCS buckets—no server-side code required for viewing.
| Item | Status |
|---|---|
| pipeline_utils package | Done |
| Stage 05: Export to GCS | Done |
| Static HTML/JS dashboard | Done |
| Unified Bronze→Silver (02) | Done |
| Incremental processing | Done |
| Cloud Composer DAG | Done (config/gcp/) |
| GCS static hosting | Manual setup |
```
┌─────────────────────────────────────────────────────────────────────────────────────┐
│                        CURRENT PIPELINE (BigQuery / Dataproc)                       │
└─────────────────────────────────────────────────────────────────────────────────────┘
 [TLC URLs]    [GCS Raw]       [BigQuery]      [BigQuery]       [BigQuery]      [Gradio]
     │             │               │               │                │               │
     ▼             ▼               ▼               ▼                ▼               ▼
 00_AutoIngest 01_GCS→Bronze  02a-d Bronze→   03 CleanSilver→  04a Fleet Rec   FinalGradio
 TLCFiles      (Spark)        CleanSilver     PreMlGold        04b Anomalies   Dashboard
     │             │               │               │                │
     │             │               │               │                │
 nyc_raw_data  RawBronze      CleanSilver     PreMlGold        PostMlGold      Live BQ
 _bucket       dataset        dataset         dataset          dataset         queries
```
| Stage | File | Purpose | Compute |
|---|---|---|---|
| 00 | 00_AutoIngestTLCFiles | Download parquet from TLC URLs → GCS | Cloud Run / VM |
| 01 | 01_GCStoBronzeIngestion | GCS parquet → BigQuery RawBronze | Dataproc Spark |
| 02a–d | 02*RawBronzeToCleanSilver | Clean/transform per taxi type | Dataproc Spark |
| 03 | 03CleanSilverToPreML | Aggregate to daily/hourly/hotspot | Dataproc Spark |
| 04a | 04aXGBostFleetRecommender | XGBoost fleet mix predictions | Vertex AI / VM |
| 04b | 04bAnomalies | Autoencoder anomaly detection | Vertex AI / VM |
| — | LoadInvestigate files | Exploration, zone lookup, GeoPandas | Ad-hoc |
| — | FinalGradioDashboard | Interactive dashboard | Requires Python server |
| Issue | Impact |
|---|---|
| Gradio requires live server | Dashboard needs Python + Gradio + BigQuery client running |
| No orchestration | Manual notebook execution, no scheduling |
| Duplicated Spark retry logic | Same get_spark_with_retry() in 5+ notebooks |
| No materialized exports | All dashboard data fetched from BQ on every interaction |
| No static assets | No pre-built JSON/GeoJSON for static web consumption |
| Heavy dependencies | Prophet, XGBoost, Keras, Gradio all loaded for dashboard |
Create a shared Python package or module used by all notebooks:
```
pipeline_utils/
├── __init__.py
├── spark_utils.py   # get_spark_with_retry, quota handling
├── bq_utils.py      # list_partitions, schema helpers
├── gcs_utils.py     # list_blobs, download helpers
└── config.py        # PROJECT_ID, BUCKET_NAME, TAXI_TYPES, etc.
```
Benefits: Single source of truth, easier maintenance, consistent retry behavior.
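As a concrete starting point, here is a minimal sketch of what `spark_utils.get_spark_with_retry` could look like. The backoff parameters and the quota-error matching are assumptions for illustration, not the notebooks' exact logic:

```python
import time

def get_spark_with_retry(builder_fn, max_attempts=5, base_delay=30):
    """Create a Spark session, retrying on transient quota errors.

    builder_fn: zero-argument callable returning a SparkSession, e.g.
    lambda: SparkSession.builder.appName("silver").getOrCreate().
    Retries with exponential backoff when the error message looks like a
    GCP quota/rate issue; re-raises anything else immediately.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return builder_fn()
        except Exception as exc:  # quota errors often surface as generic exceptions
            transient = any(s in str(exc).lower() for s in ("quota", "rate limit", "429"))
            if not transient or attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))  # 30s, 60s, 120s, ...
```

Because the builder is passed in as a callable, the retry policy can be unit-tested without a Spark cluster.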
Merge 02a, 02b, 02c, 02d into a single parameterized notebook or script:
- Input: `taxi_type` (yellow, green, fhv, fhvhv)
- Reuse schema map and transformation logic
- Run via loop or parallel Dataproc jobs
Benefits: Less duplication, easier to add new taxi types.
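The schema-map idea can be sketched as a single parameterized job. The canonical/source column names below follow common TLC conventions but are illustrative; verify against the actual Bronze schemas:

```python
# Illustrative schema map: canonical column -> per-dataset source column.
SCHEMA_MAP = {
    "yellow": {"pickup_ts": "tpep_pickup_datetime", "dropoff_ts": "tpep_dropoff_datetime"},
    "green":  {"pickup_ts": "lpep_pickup_datetime", "dropoff_ts": "lpep_dropoff_datetime"},
    "fhv":    {"pickup_ts": "pickup_datetime",      "dropoff_ts": "dropOff_datetime"},
    "fhvhv":  {"pickup_ts": "pickup_datetime",      "dropoff_ts": "dropoff_datetime"},
}

def rename_exprs(taxi_type):
    """Build 'src AS canonical' select expressions for one taxi type."""
    mapping = SCHEMA_MAP[taxi_type]
    return [f"{src} AS {canonical}" for canonical, src in mapping.items()]

def run_silver(spark, taxi_type):
    """Single parameterized Bronze->Silver job, replacing notebooks 02a-02d."""
    df = spark.read.table(f"RawBronze.{taxi_type}_trips")
    return df.selectExpr(*rename_exprs(taxi_type))
```

Adding a new taxi type then becomes one new entry in `SCHEMA_MAP` rather than a new notebook.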
- Track last processed `year_month` per taxi type in a metadata table or GCS object
- Skip already-processed partitions
- Support idempotent re-runs
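The watermark logic reduces to a set difference. A sketch, assuming the state lives in a small JSON object in GCS keyed by taxi type (the field layout is a suggestion, not an existing contract):

```python
import json

def load_processed(state_blob_text):
    """Parse the watermark object, e.g.
    {"yellow": ["2024-01", "2024-02"], "green": ["2024-01"]}.
    An empty/missing blob means nothing has been processed yet."""
    return json.loads(state_blob_text) if state_blob_text else {}

def partitions_to_process(available, processed):
    """Return year_month partitions present in GCS but not yet in Bronze,
    sorted so re-runs are deterministic (and idempotent: already-processed
    months are simply absent from the result)."""
    return sorted(set(available) - set(processed))
```

After a successful run, the job would append the new months to the state object and write it back.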
For lightweight aggregations (e.g., daily rollups from hourly), consider:
- BigQuery scheduled queries
- Materialized views
- Reduces Dataproc usage for simple SQL transforms
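For example, the hourly→daily rollup could be a materialized view created once via the BigQuery client. The table and column names below are illustrative, not the project's actual schema:

```python
def daily_rollup_mv_ddl(taxi_type, project="my-project"):
    """Build DDL for a BigQuery materialized view rolling hourly
    aggregates up to daily totals (names are illustrative)."""
    return (
        f"CREATE MATERIALIZED VIEW IF NOT EXISTS "
        f"`{project}.PreMlGold.{taxi_type}_daily_mv` AS\n"
        f"SELECT DATE(pickup_hour) AS pickup_date,\n"
        f"       SUM(trip_count) AS trip_count,\n"
        f"       SUM(total_fare) AS total_fare\n"
        f"FROM `{project}.PreMlGold.{taxi_type}_hourly`\n"
        f"GROUP BY pickup_date"
    )

# The DDL string would then be run once per taxi type, e.g.:
#   bq_client.query(daily_rollup_mv_ddl("yellow")).result()
```

BigQuery maintains the view incrementally as the hourly table grows, so no Dataproc cluster is needed for this transform.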
```
┌────────────────────────────────────────────────────────────────────────────┐
│                   ORCHESTRATED PIPELINE (Cloud Composer)                   │
└────────────────────────────────────────────────────────────────────────────┘
DAG: nyc_taxi_pipeline (daily/weekly)

[Trigger] → [00_Ingest] → [01_Bronze] → [02_Silver] → [03_PreML] → [04_ML] → [05_Export]
                                                                                 │
                                                                                 ▼
                                                                      GCS dashboard bucket
```
DAG Tasks (example):
| Task | Type | Description |
|---|---|---|
| check_new_data | BigQueryOperator | Detect new months in GCS not yet in Bronze |
| ingest_to_gcs | PythonOperator / Cloud Run | Run 00 notebook logic |
| gcs_to_bronze | DataprocSubmitJobOperator | Submit Spark job for 01 |
| bronze_to_silver | DataprocSubmitJobOperator | Submit Spark jobs for 02a–d |
| silver_to_preml | DataprocSubmitJobOperator | Submit Spark job for 03 |
| run_fleet_recommender | VertexAITrainingJob / PythonOperator | Run 04a |
| run_anomaly_detection | VertexAITrainingJob / PythonOperator | Run 04b |
| export_dashboard_data | PythonOperator | Export JSON/GeoJSON to GCS (see §3) |
Alternative: Cloud Workflows — Simpler, good for linear pipelines; less flexible than Airflow for complex dependencies.
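For orientation, a pseudocode sketch of how the tasks in the table above would be wired together (operator arguments elided; not a runnable DAG):

```
# pseudocode — Airflow DAG wiring only
with DAG("nyc_taxi_pipeline", schedule="@weekly") as dag:
    ingest  = PythonOperator(task_id="ingest_to_gcs", ...)
    bronze  = DataprocSubmitJobOperator(task_id="gcs_to_bronze", ...)
    silver  = DataprocSubmitJobOperator(task_id="bronze_to_silver", ...)
    preml   = DataprocSubmitJobOperator(task_id="silver_to_preml", ...)
    fleet   = PythonOperator(task_id="run_fleet_recommender", ...)
    anomaly = PythonOperator(task_id="run_anomaly_detection", ...)
    export  = PythonOperator(task_id="export_dashboard_data", ...)

    ingest >> bronze >> silver >> preml >> [fleet, anomaly] >> export
```

The two ML tasks fan out in parallel after PreML and both must succeed before the export runs.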
Serve an optimized dashboard from a static website (HTML/JS) that reads pre-computed data from GCS—no server-side Python, no BigQuery at runtime.
Add a Stage 05: Export Dashboard Data that runs after ML pipelines complete.
```
gs://nyc_dashboard_bucket/
├── index.html                     # Static dashboard SPA
├── assets/
│   ├── app.js
│   └── styles.css
├── data/
│   ├── time_series/
│   │   ├── yellow_daily.json
│   │   ├── yellow_hourly.json
│   │   ├── green_daily.json
│   │   └── ...
│   ├── forecasts/
│   │   ├── yellow_forecast_YYYYMMDD.json
│   │   └── ...
│   ├── anomalies/
│   │   ├── yellow_anomalies.json
│   │   └── ...
│   ├── fleet/
│   │   ├── yellow_fleet_YYYYMMDD.json
│   │   └── ...
│   ├── hotspots/
│   │   ├── yellow_2024_01_hotspots.json
│   │   └── ...
│   └── metadata/
│       ├── available_partitions.json
│       └── last_updated.json
└── geo/
    └── taxi_zones.geojson         # Zone boundaries (from LoadInvestigate)
```
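The two `metadata/` documents are what the dashboard fetches first on load. A sketch of building them (the field names are a suggested shape, not an existing contract):

```python
import json
from datetime import datetime, timezone

def build_metadata(partitions_by_type):
    """Produce available_partitions.json and last_updated.json bodies.

    partitions_by_type: e.g. {"yellow": ["2024-01", "2024-02"], "green": [...]}
    Returns the two JSON strings to upload to data/metadata/.
    """
    available = json.dumps({"partitions": partitions_by_type})
    last_updated = json.dumps(
        {"exported_at": datetime.now(timezone.utc).isoformat()}
    )
    return available, last_updated
```

The frontend can then populate its taxi-type and month selectors from `available_partitions.json` and show a "data as of" banner from `last_updated.json`.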
```python
# Pseudocode for export_dashboard_data()
def export_dashboard_data():
    # 1. Query BigQuery for the latest partitions
    partitions = get_available_partitions_from_bq()

    # 2. Export time series (daily/hourly) per taxi type
    #    (pandas writes directly to gs:// paths when gcsfs is installed)
    for taxi_type in TAXI_TYPES:
        df = bq_client.query(
            f"SELECT * FROM PreMlGold.{taxi_type}_*_daily"
        ).to_dataframe()
        df.to_json(f"gs://nyc_dashboard_bucket/data/time_series/{taxi_type}_daily.json")

    # 3. Export anomalies from PostMlGold
    for taxi_type in TAXI_TYPES:
        df = bq_client.query(
            f"SELECT * FROM PostMlGold.{taxi_type}_anomalies"
        ).to_dataframe()
        df.to_json(f"gs://nyc_dashboard_bucket/data/anomalies/{taxi_type}_anomalies.json")

    # 4. Export fleet recommendations
    # 5. Export taxi_zones GeoJSON (one-time, or when zones change)
    # 6. Write metadata (last_updated, available_partitions)
```

| Component | Technology |
|---|---|
| Frontend | Vanilla JS + Plotly.js (or Chart.js) for charts |
| Maps | Leaflet.js + GeoJSON (taxi zones) |
| Data | Fetch from GCS via public URLs or signed URLs |
| Hosting | Cloud Storage static website or Firebase Hosting |
- Create bucket: `nyc_dashboard_bucket`
- Enable static website hosting
- Set main page: `index.html`
- Set CORS if needed for cross-origin fetch
- Make bucket publicly readable (or use signed URLs for private data)

```shell
gsutil cors set cors.json gs://nyc_dashboard_bucket
gsutil -m cp -r dist/* gs://nyc_dashboard_bucket/
```

Alternative: Firebase Hosting
- Host static files on Firebase
- Fetch data from GCS (with appropriate CORS and access rules)
- Optional: Cloud CDN for faster global delivery
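A minimal `cors.json` for browser fetches might look like the following; the origin shown is a placeholder, and in production it should be restricted to the dashboard's actual domain rather than `*`:

```json
[
  {
    "origin": ["https://your-dashboard-domain.example"],
    "method": ["GET"],
    "responseHeader": ["Content-Type"],
    "maxAgeSeconds": 3600
  }
]
```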
| Feature | Current (Gradio) | Static (Optimized) |
|---|---|---|
| Time series explorer | Live BQ query | Pre-exported JSON |
| ML forecasting | On-demand XGBoost | Pre-computed forecasts (scheduled) |
| Anomaly detection | Live from BQ | Pre-exported anomaly JSON |
| Fleet recommendations | Live from BQ | Pre-exported fleet JSON |
| Zone maps | Live GeoJSON from BQ/GCS | Static GeoJSON in bucket |
Trade-off: Static dashboard shows last exported state (e.g., daily refresh). Real-time is sacrificed for zero server cost and instant load.
| Component | GCP Service | Notes |
|---|---|---|
| Raw file storage | Cloud Storage (nyc_raw_data_bucket) | Already in use |
| Data warehouse | BigQuery | RawBronze, CleanSilver, PreMlGold, PostMlGold |
| Batch processing | Dataproc (Spark) | 01, 02a–d, 03 |
| ML training | Vertex AI / Compute Engine | 04a, 04b (or keep on Dataproc) |
| Orchestration | Cloud Composer (Airflow) | Recommended |
| Export job | Cloud Functions / Cloud Run | Lightweight Python export |
| Static dashboard | Cloud Storage static site | Or Firebase Hosting |
| Optional API | Cloud Run | If you need a thin REST API for dynamic filters |
- Extract `pipeline_utils` — Spark retry, config, BQ helpers
- Add export script — BigQuery → JSON to GCS for time series + anomalies
- Create minimal static HTML — Single page with Plotly.js, fetch from GCS
- Enable GCS static hosting — Serve `index.html` from bucket
- Convert notebooks to scripts — `.py` entry points for each stage
- Create Cloud Composer DAG — Chain 00 → 01 → 02 → 03 → 04 → export
- Schedule DAG — Daily or weekly based on TLC update frequency
- Build full dashboard UI — Tabs for Explorer, Forecasts, Anomalies, Fleet
- Export all required datasets — Forecasts, fleet, hotspots, metadata
- Add Leaflet map — Zone choropleth from GeoJSON
- Optimize exports — Compress JSON, consider Parquet for large datasets
- Right-size Dataproc — Ephemeral clusters, preemptible workers
- BigQuery partitioning — Ensure tables partitioned by date
- CDN for static assets — Cloud CDN in front of GCS bucket
| Service | Current | Optimized |
|---|---|---|
| BigQuery | Pay per query | Same; reduce ad-hoc queries via exports |
| Dataproc | Per cluster hour | Ephemeral clusters, auto-scaling |
| Gradio server | VM/Cloud Run always-on | Eliminated — static only |
| GCS | Storage + egress | Minimal for JSON exports |
| Cloud Composer | N/A | Base fee; consider Workflows for simpler pipelines |
Static dashboard benefit: No compute cost for serving; only storage and egress for JSON files.
Cost estimation: Run a test-phase batch (e.g. 2 specific months of data) end-to-end and monitor billing before scaling to full production.
- Public bucket: Only if data is non-sensitive (aggregated taxi stats are typically public)
- Private bucket: Use signed URLs or IAM; frontend would need a small proxy (Cloud Function) to generate URLs
- CORS: Configure `cors.json` on the bucket for web fetch
- Create `pipeline_utils` package
- Add Stage 05: Export dashboard data to GCS
- Build static HTML/JS dashboard with Plotly
- Enable GCS static website hosting (manual)
- Create Cloud Composer DAG (config/gcp/composer_dag.py)
- Schedule pipeline (deploy DAG to Composer)
- Document runbook — see docs/gcp/IMPLEMENTATION_GUIDE.md
Optimized pipeline implemented in pipeline/ and pipeline_utils/. Legacy notebooks in legacy/.