Requires Rust basics (ownership, structs, enums, functions, basic error handling).
By the end, students will have built a production-ready stock price aggregator that:
- Fetches prices from multiple APIs every minute
- Processes multiple platforms concurrently
- Stores data in PostgreSQL
- Handles graceful shutdown with signal handling
- Uses structured logging with
tracing
Building Block: Understanding async foundations
-
Concepts:
- What is async/await in Rust? (Future-based execution, cooperative multitasking)
- Why we need a runtime like Tokio
- Difference between sync vs async IO
- How this applies to fetching stock prices
-
Demo Code: Minimal async example with
tokio::main
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
println!("Starting stock price simulator...");
let fetch_price = async {
sleep(Duration::from_millis(500)).await;
println!("AAPL: $150.25");
};
fetch_price.await;
}- Exercise:
- Create an async function
fetch_mock_price(symbol: &str) -> f64that sleeps for 500ms and returns a random price - Call it for 3 different stock symbols sequentially
- Observe the total time taken
- Create an async function
When installing tokio, enable all features:
tokio = { version = "1.47.1", features = ["full"] }
Building Block: Fetching real stock prices from multiple sources
-
Concepts:
- Using
reqwestfor async HTTP requests - Parsing JSON with
serde - Error handling with
Resultand? - Running multiple API calls in parallel with
tokio::join!
- Using
-
Demo Code: Fetch stock price from Alpha Vantage API
use reqwest;
use serde::Deserialize;
use std::env;
#[derive(Deserialize, Debug)]
struct GlobalQuote {
#[serde(rename = "Global Quote")]
quote: Quote,
}
#[derive(Deserialize, Debug)]
struct Quote {
#[serde(rename = "01. symbol")]
symbol: String,
#[serde(rename = "05. price")]
price: String,
}
async fn fetch_alpha_vantage(symbol: &str) -> Result<f64, Box<dyn std::error::Error>> {
let api_key = env::var("ALPHA_VANTAGE_KEY")?;
let url = format!(
"https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol={}&apikey={}",
symbol, api_key
);
let resp = reqwest::get(&url)
.await?
.json::<GlobalQuote>()
.await?;
Ok(resp.quote.price.parse()?)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let price = fetch_alpha_vantage("AAPL").await?;
println!("AAPL price: ${:.2}", price);
Ok(())
}- Exercise:
- Create a second function
fetch_finnhub(symbol: &str)for Finnhub API - Create a struct
StockPrice { symbol: String, price: f64, source: String, timestamp: i64 } - Fetch the same stock from both APIs in parallel using
tokio::join! - Compare the results
- Create a second function
When installing reqwest, enable json feature:
reqwest = { version = "0.12.23", features = ["json"] }
Building Block: Persisting stock prices
-
Concepts:
- Using
sqlxfor async database operations - Database connection pooling
- SQL migrations
- Inserting data efficiently
- Using
-
Setup: Create database and table
CREATE TABLE stock_prices (
id SERIAL PRIMARY KEY,
symbol VARCHAR(10) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
source VARCHAR(50) NOT NULL,
timestamp BIGINT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_symbol_timestamp ON stock_prices(symbol, timestamp DESC);- Demo Code: Connect to PostgreSQL and insert prices
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
async fn save_price(pool: &PgPool, price: &StockPrice) -> Result<(), sqlx::Error> {
// TODO: use sqlx::query to insert the new price
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect("postgresql://user:password@localhost/stockdb")
.await?;
let price = StockPrice {
symbol: "AAPL".to_string(),
price: 150.25,
source: "alpha_vantage".to_string(),
timestamp: chrono::Utc::now().timestamp(),
};
save_price(&pool, &price).await?;
println!("Saved price to database");
Ok(())
}- Exercise:
- Modify the code to save the price in database
- Fetch prices from 2 APIs for 3 stocks and save all 6 results to the database
- Query the database to verify the data was saved
Building Block: Putting it all together
-
Concepts:
- Using
tokio::time::intervalfor periodic tasks tokio::select!for handling multiple concurrent operations- Graceful shutdown with
tokio::signal::ctrl_c() - Structured logging with
tracing - Proper cleanup of resources
- Using
-
Demo Code: Complete stock price aggregator
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use tokio::time::{interval, Duration};
use tokio::signal;
use tracing::{info, error, instrument};
use chrono::Utc;
#[derive(Debug, Clone)]
struct StockPrice {
symbol: String,
price: f64,
source: String,
timestamp: i64,
}
struct Config {
symbols: Vec<String>,
sources: Vec<String>,
database_url: String,
}
#[instrument(skip(pool))]
async fn save_price(pool: &PgPool, price: &StockPrice) -> Result<(), sqlx::Error> {
// TODO: save the price in the database
}
#[instrument]
async fn fetch_alpha_vantage(symbol: &str) -> Result<StockPrice, Box<dyn std::error::Error>> {
// TODO: Get API key from environment variable
// TODO: Build the Alpha Vantage URL with GLOBAL_QUOTE function
// TODO: Make HTTP GET request using reqwest
// TODO: Parse JSON response and extract price from "05. price" field
// TODO: Return StockPrice struct with current timestamp
todo!("Implement Alpha Vantage API call")
}
#[instrument]
async fn fetch_finnhub(symbol: &str) -> Result<StockPrice, Box<dyn std::error::Error>> {
// TODO: Get API key from environment variable
// TODO: Build the Finnhub URL (/quote endpoint)
// TODO: Make HTTP GET request with X-Finnhub-Token header
// TODO: Parse JSON response and extract "c" (current price) field
// TODO: Return StockPrice struct with current timestamp
todo!("Implement Finnhub API call")
}
#[instrument(skip(pool))]
async fn fetch_and_save_all(pool: &PgPool, symbols: &[String]) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting fetch cycle for {} symbols", symbols.len());
for symbol in symbols {
// Fetch from multiple sources in parallel
let (alpha_result, finnhub_result) = tokio::join!(
fetch_alpha_vantage(symbol),
fetch_finnhub(symbol)
);
// Save results
if let Ok(price) = alpha_result {
if let Err(e) = save_price(pool, &price).await {
error!("Failed to save alpha_vantage price: {}", e);
}
}
if let Ok(price) = finnhub_result {
if let Err(e) = save_price(pool, &price).await {
error!("Failed to save finnhub price: {}", e);
}
}
}
info!("Completed fetch cycle");
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Setup tracing
tracing_subscriber::fmt()
.with_env_filter("info")
.init();
info!("Starting stock price aggregator");
// Configuration
let config = Config {
symbols: vec!["AAPL".to_string(), "GOOGL".to_string(), "MSFT".to_string()],
sources: vec!["alpha_vantage".to_string(), "finnhub".to_string()],
database_url: "postgresql://user:password@localhost/stockdb".to_string(),
};
// Setup database connection pool
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(&config.database_url)
.await?;
info!("Connected to database");
// Create interval for periodic fetching (every minute)
let mut fetch_interval = interval(Duration::from_secs(60));
// Main loop
loop {
tokio::select! {
_ = fetch_interval.tick() => {
if let Err(e) = fetch_and_save_all(&pool, &config.symbols).await {
error!("Error during fetch cycle: {}", e);
}
}
_ = signal::ctrl_c() => {
info!("Shutdown signal received");
break;
}
}
}
// Graceful shutdown
info!("Closing database connections...");
pool.close().await;
info!("Shutdown complete");
Ok(())
}- Exercise:
- Add a third data source (e.g., Yahoo Finance API or mock)
- Implement error recovery: if one source fails, still save data from others
- Add a query endpoint that shows the latest price for each symbol
- Implement a
--fetch-onceCLI flag that fetches once and exits (useful for testing)
Students will have a complete, production-ready application with:
- Multi-source data fetching: Pulls from Alpha Vantage, Finnhub, and custom sources
- Parallel processing: Fetches multiple stocks and sources concurrently
- Periodic execution: Runs every minute using
tokio::time::interval - Database persistence: Stores all prices with timestamps in PostgreSQL
- Graceful shutdown: Properly handles Ctrl+C and cleans up resources
- Structured logging: Uses
tracingfor observability - Error resilience: Continues operating even if individual fetches fail
# Setup database
createdb stockdb
psql stockdb < schema.sql
# Set environment variables
export DATABASE_URL="postgresql://user:password@localhost/stockdb"
export ALPHA_VANTAGE_KEY="your_key"
export FINNHUB_KEY="your_key"
# Run the application
cargo run
# In another terminal, observe logs and database
watch -n 5 'psql stockdb -c "SELECT * FROM stock_prices ORDER BY created_at DESC LIMIT 10;"'- Add a REST API to query historical prices
- Implement a web dashboard with real-time updates
- Add alerts when prices cross thresholds
- Support for cryptocurrency prices
- Rate limiting for API calls