₿ Stream-Bit: Near Real-Time Bitcoin Data Pipeline

Python 3.8+ Flask AWS

🎯 Overview

Stream-Bit is a near real-time Bitcoin price pipeline that combines data engineering, cloud computing, and web development. It streams price data into an S3 data lake, queries it with Athena, and serves a responsive web dashboard with live updates.

🚀 Demo: Dashboard with real-time synchronization between Current Price, Chart, and Statistics

🏆 Key Achievements

  • ✅ Real-time Dashboard with Server-Sent Events + Chart.js
  • ✅ Complete AWS Pipeline (Firehose → S3 → Athena)
  • ✅ Intelligent Caching with TTL optimized by query type
  • ✅ Scalable MVC Architecture with clean separation of concerns
  • ✅ Smart Synchronization: updates only on real price changes

🏗️ Technical Architecture

High-Level Architecture

graph LR
    A[CoinGecko API] --> B[Bitcoin Extractor]
    B --> C[Firehose Stream]
    C --> D[S3 Data Lake]
    D --> E[Athena Analytics]
    E --> F[Flask API + Cache]
    F --> G[Real-time Dashboard]
    
    H[User] --> G
    G --> H

MVC-ETL Structure

src/
├── controllers/                 # 🎮 Orchestration Layer
│   ├── streaming_controller.py     # Stream pipeline coordination
│   └── web/api_controller.py       # REST API endpoints + SSE
├── models/                      # 📊 Data & Config Layer
│   ├── config.py                   # Centralized configuration
│   └── data_schemas.py             # Pydantic data schemas
├── views/                       # 🖥️ Presentation Layer
│   └── web/                        # Web interface
│       ├── templates/              # Jinja2 HTML templates
│       ├── static/js/              # JavaScript (Chart.js + SSE)
│       └── static/css/             # Custom CSS styling
└── services/                    # ⚙️ Business Logic Layer
    ├── extractors/                 # Data extraction services
    │   └── bitcoin_extractor.py    # CoinGecko API integration
    ├── loaders/                    # Data loading services
    │   └── firehose_loader.py      # AWS Firehose streaming
    └── web/                        # Web-specific services
        ├── cache_service.py        # TTL cache management
        └── athena_service.py       # AWS Athena queries

⚡ Quick Start

1. Prerequisites

# Python 3.8+ (recommended 3.11+)
python --version

# AWS CLI configured (optional - for advanced features)
aws configure list

2. Installation

# Clone repository
git clone <repo-url>
cd stream-bit

# Option 1: uv (recommended - faster)
uv sync

# Option 2: traditional pip
pip install -r requirements.txt

3. Configuration

# Copy environment file
cp .env.example .env

# Edit configurations (optional)
# AWS_REGION=us-east-1
# FLASK_DEBUG=True

4. Run

🌐 Web Dashboard (Main)

python app.py --mode web --port 8080

Access: http://localhost:8080

📡 Streaming Pipeline

# Continuous streaming to AWS
python app.py --mode stream

# Single test (demo)
python app.py --mode test

🌟 Features and Highlights

🎯 Real-time Dashboard

  • 📊 Live Chart: Chart.js with smart updates (only on price changes)
  • ⚡ Server-Sent Events: Data streaming with automatic fallback to polling
  • 📈 Dynamic Statistics: Automatic sync between Current Price, Chart, and Statistics
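
The "only on price changes" behavior can be illustrated with a minimal sketch of Server-Sent Events framing. It is not the project's actual controller code: the function names and the `price` event name are hypothetical, the `price_brl` field is borrowed from the API examples in this README, and the 0.01 threshold comes from the optimizations listed further down.

```python
import json
from typing import Iterable, Iterator, Optional


def format_sse(data: dict, event: Optional[str] = None) -> str:
    """Serialize one message in text/event-stream framing."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates each SSE message.
    return "\n".join(lines) + "\n\n"


def price_events(ticks: Iterable[dict], min_change: float = 0.01) -> Iterator[str]:
    """Yield an SSE message only when the price moved by more than min_change."""
    last_price = None
    for tick in ticks:
        price = tick["price_brl"]
        if last_price is None or abs(price - last_price) > min_change:
            last_price = price
            yield format_sse(tick, event="price")


# Hypothetical sample ticks; the second one repeats the first price.
ticks = [
    {"price_brl": 617094.0},
    {"price_brl": 617094.0},  # unchanged, so no message is emitted
    {"price_brl": 617120.5},
]
messages = list(price_events(ticks))
```

In a Flask app, a generator like `price_events` would typically be wrapped in a streaming response with the `text/event-stream` MIME type; the browser side then only redraws the chart when a message actually arrives.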

⚙️ Data Pipeline

  • 🔄 Continuous Extraction: CoinGecko API with retry logic and rate limiting
  • ☁️ AWS Streaming: Kinesis Firehose for robust ingestion
  • 🗄️ Data Lake: S3 with automatic Hive-style partitioning
  • 🔍 Analytics: AWS Athena + Glue for optimized SQL queries
  • 📦 Format Conversion: Automatic JSON → Parquet via Firehose
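
The retry logic mentioned for extraction could look roughly like the sketch below, which uses exponential backoff. This is an assumption about the approach, not the code in `bitcoin_extractor.py`: `with_retry`, `flaky_fetch`, and the delay values are hypothetical, and the sleep function is injectable so the example runs instantly.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def with_retry(fetch: Callable[[], T], attempts: int = 4, base_delay: float = 1.0,
               sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fetch(); on failure wait base_delay * 2**attempt, then try again."""
    for attempt in range(attempts):
        try:
            return fetch()
        except Exception:  # real code would catch narrower network errors
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be >= 1")


# Hypothetical stand-in for an API call that fails twice, then succeeds.
calls = {"n": 0}


def flaky_fetch() -> dict:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated API failure")
    return {"price_brl": 617094.0}


delays: List[float] = []
result = with_retry(flaky_fetch, sleep=delays.append)  # record delays instead of sleeping
```

Doubling the delay between attempts also helps stay under an upstream rate limit, since repeated failures slow the caller down instead of hammering the API.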

🧠 Intelligent Architecture

  • ⚡ TTL Cache: TTL tuned per query type
  • 🔀 Synchronization: Coordinated updates between all components
  • 📊 Performance: Sliding window (150 points) for fluid charts
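
As a rough illustration of these two ideas, the sketch below pairs a tiny TTL cache with a 150-point sliding window. It is a standard-library stand-in, not the project's `cache_service.py`: the class name, the per-type TTL values (5 s and 300 s), and the injectable clock are all assumptions made for the example.

```python
import time
from collections import deque


class SimpleTTLCache:
    """Minimal in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._items = {}  # key -> (expires_at, value)

    def get(self, key):
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._items[key]  # drop the stale entry
            return None
        return value

    def set(self, key, value):
        self._items[key] = (self._clock() + self._ttl, value)


# Hypothetical TTLs: fast-moving data expires sooner than aggregates.
caches = {
    "latest": SimpleTTLCache(ttl_seconds=5),
    "statistics": SimpleTTLCache(ttl_seconds=300),
}

# Sliding window: a bounded deque silently discards the oldest point.
window = deque(maxlen=150)
for point in range(200):
    window.append(point)
```

After the loop, `window` holds only the most recent 150 points, which is what keeps the chart's memory and redraw cost constant over time.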

🔒 Quality and Reliability

  • ⚠️ Error Handling: Comprehensive error pages and API responses
  • 📝 Structured Logging: structlog output with appropriate log levels
  • 📊 Monitoring: Health checks and status page with system metrics

💻 Detailed Tech Stack

🐍 Backend Core

| Technology | Version | Purpose |
|---|---|---|
| Python | 3.8+ | Main language |
| Flask | 2.3+ | Web framework + REST API |
| cachetools (TTLCache) | 5.3+ | In-memory cache with TTL |
| Pydantic | 2.0+ | Data validation and schemas |
| asyncio | Built-in | Async operations |

☁️ Cloud & Data

| Service | Functionality |
|---|---|
| AWS Kinesis Firehose | Robust streaming pipeline |
| AWS S3 | Data lake with Hive partitioning |
| AWS Athena | Serverless query engine |
| AWS Glue | Data catalog + format conversion |
| CoinGecko API | Bitcoin data source |
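
To make "Hive partitioning" concrete, here is a small sketch of the `key=value` object prefix such a layout uses. The partition columns (`year`, `month`, `day`, `hour`) and the `bitcoin` base prefix are assumptions for illustration; the README does not state the project's actual layout, and in practice Firehose can generate such prefixes itself.

```python
from datetime import datetime, timezone


def hive_prefix(ts: datetime, base: str = "bitcoin") -> str:
    """Build a Hive-style S3 key prefix (key=value path segments) in UTC."""
    ts = ts.astimezone(timezone.utc)
    return f"{base}/year={ts:%Y}/month={ts:%m}/day={ts:%d}/hour={ts:%H}/"


prefix = hive_prefix(datetime(2025, 9, 16, 10, 30, tzinfo=timezone.utc))
# prefix == "bitcoin/year=2025/month=09/day=16/hour=10/"
```

Because the partition values are encoded in the path, a query engine such as Athena can skip whole prefixes when a query filters on those columns.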

🎨 Frontend & UI

| Technology | Version | Purpose |
|---|---|---|
| Bootstrap | 5.3 | Responsive CSS framework |
| Chart.js | 4.4 | Interactive charts |
| Server-Sent Events | HTML5 | Real-time updates |
| Jinja2 | 3.1+ | Template engine |
| Vanilla JavaScript | ES6+ | DOM manipulation |

Development & Quality

| Tool | Functionality |
|---|---|
| structlog | Structured logging |
| mypy | Static type checking |

🎯 Use Cases and Examples

👨‍💻 For Developers

# Development mode with hot-reload
python app.py --mode web --port 8080 --debug

# Application health check  
curl http://localhost:8080/api/health
# Response: {"status": "healthy", "timestamp": "2025-09-16T10:30:00Z"}

# Config page (debug info)
curl http://localhost:8080/config

📊 For Data Analysis

# Latest Bitcoin price
curl http://localhost:8080/api/bitcoin/latest
# Response: {"price_brl": 617094.0, "timestamp": "2025-09-16T10:30:00Z"}

# Historical hourly data (last 24h)
curl "http://localhost:8080/api/bitcoin/hourly?hours=24"

# Statistics by period
curl "http://localhost:8080/api/bitcoin/statistics?hours=6"
# Response: {"avg": 615000, "min": 610000, "max": 620000, "count": 360}
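
The shape of the statistics response can be reproduced with a short aggregation sketch. This only illustrates the four fields shown above; in the project the aggregation may well happen inside an Athena SQL query instead, and the `summarize` helper and sample prices here are hypothetical.

```python
from statistics import fmean
from typing import Dict, List


def summarize(prices: List[float]) -> Dict[str, object]:
    """Return avg/min/max/count for a list of prices, mirroring the API fields."""
    if not prices:
        return {"avg": None, "min": None, "max": None, "count": 0}
    return {
        "avg": round(fmean(prices), 2),
        "min": min(prices),
        "max": max(prices),
        "count": len(prices),
    }


# Hypothetical sample: three price points instead of a full 6-hour window.
stats = summarize([610000.0, 615000.0, 620000.0])
```

Handling the empty case explicitly avoids a `StatisticsError` when a period has no data points yet.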

🔍 For Monitoring

# Complete status page
curl http://localhost:8080/status

# Cache metrics
curl http://localhost:8080/api/cache/stats
# Response: {"hits": 245, "misses": 12, "hit_rate": 95.3}
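
The `hit_rate` in this sample response is consistent with hits divided by total lookups, expressed as a percentage. A minimal sketch of that arithmetic, assuming one-decimal rounding (the helper name is hypothetical):

```python
def hit_rate(hits: int, misses: int) -> float:
    """Percentage of lookups served from cache, rounded to one decimal."""
    total = hits + misses
    return round(100 * hits / total, 1) if total else 0.0


rate = hit_rate(245, 12)  # 245 / 257 lookups were cache hits
```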

📡 For DevOps

# Stream mode (production)
python app.py --mode stream --log-level INFO

# Test mode (validation)  
python app.py --mode test

🚀 Performance and Metrics

⚡ Performance Benchmarks

| Metric | Value | Observation |
|---|---|---|
| API Latency | <200ms | With optimized TTL cache |
| Stream Throughput | 100+ req/min | Subject to CoinGecko rate limits |
| Chart Update | <50ms | Only on real changes (>R$ 0.01) |
| Memory Usage | ~50MB | TTL cache + sliding window |

🎯 Implemented Optimizations

  • ✅ Smart Caching: TTL differentiated by query type
  • ✅ Sliding Window: Maximum 150 points in chart for fluidity
  • ✅ Price Change Detection: Only updates on changes >R$ 0.01
  • ✅ Async Operations: concurrent.futures for I/O operations
  • ✅ Query Optimization: Partition projection in Athena
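
For the I/O item above, a minimal sketch of running blocking calls in parallel with `concurrent.futures` might look like this. The `run_query` function and query names are hypothetical stand-ins; the short sleep simulates a blocking Athena or HTTP call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_query(name: str) -> str:
    """Hypothetical stand-in for a blocking Athena or HTTP call."""
    time.sleep(0.05)
    return f"{name}: done"


queries = ["latest", "hourly", "statistics"]

# Threads suit I/O-bound work: each call spends most of its time waiting.
with ThreadPoolExecutor(max_workers=3) as pool:
    # map() returns results in input order, regardless of completion order.
    results = list(pool.map(run_query, queries))
```

With three workers the three simulated calls overlap, so the total wait is close to one call's latency rather than the sum of all three.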

📚 Complete Documentation

| Document | Content |
|---|---|
| README.md | Setup, features, and usage |
| API_DOCUMENTATION.md | Endpoints and examples |

💡 Tip: Use python app.py --help to see all available options

Developed with ❤️ to demonstrate competencies in Data Engineering, Cloud Computing, and APIs
