diff --git a/task-scheduler/Cargo.toml b/task-scheduler/Cargo.toml new file mode 100644 index 0000000..c87717a --- /dev/null +++ b/task-scheduler/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "task-scheduler" +version = "0.1.0" +edition = "2024" + +[dependencies] +crossbeam-channel = "0.5.15" +tracing = "0.1.44" +tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } diff --git a/task-scheduler/examples/phase_1.rs b/task-scheduler/examples/phase_1.rs new file mode 100644 index 0000000..69de78f --- /dev/null +++ b/task-scheduler/examples/phase_1.rs @@ -0,0 +1,59 @@ +/// Phase 1 — Synchronous Foundation (std::sync::mpsc) +/// +/// A single producer thread sends 100 Jobs down a std::sync::mpsc channel. +/// The main thread receives and prints each one. +/// +/// Key things to observe: +/// 1. tx can be cloned; rx cannot — try it and watch the compile error. +/// 2. When the producer thread finishes, tx is dropped. +/// The main thread's for-loop detects the closed channel and exits cleanly. +/// 3. send() blocks if the channel is full (unbounded here, so it won't). +/// try_send() is the non-blocking alternative — swap it in to experiment. +use std::thread; +use std::{sync::mpsc, time}; + +use tracing::info; + +use task_scheduler::job::{Job, JobType}; + +fn main() { + // Initialise tracing so info!() calls print to stdout. + // Set RUST_LOG=info (or debug/warn) to control verbosity. + tracing_subscriber::fmt::init(); + + let (tx, rx) = mpsc::channel::(); + + // start the producer thread + thread::spawn(move || { + for i in 1..=10 { + let job_type = match i % 3 { + 0 => JobType::Metric, + 1 => JobType::HTTPRequest, + _ => JobType::Reconcile, + }; + + let job = Job::new(i, job_type); + info!(id = job.id, payload = %job.payload, "producer: sending job"); + + // send() returns Err only if the receiver has been dropped. + // Here we just unwrap — if rx is gone there's nothing useful to do. + tx.send(job).unwrap(); + + // Small sleep so the output is readable when running live. + thread::sleep(time::Duration::from_millis(20)); + } + + // tx is dropped here when the closure ends. + // The receiver will see the channel close after draining remaining items. + info!("producer: done, dropping tx"); + }); + + // main thread (consumer) + // Iterating over rx blocks until the next item arrives. + // The loop exits automatically when tx is dropped and the channel is empty. + for job in rx { + info!(id = job.id, payload = %job.payload, "consumer: received job"); + } + + info!("consumer: channel closed, exiting"); +} diff --git a/task-scheduler/examples/phase_2.rs b/task-scheduler/examples/phase_2.rs new file mode 100644 index 0000000..e93476e --- /dev/null +++ b/task-scheduler/examples/phase_2.rs @@ -0,0 +1,68 @@ +/// Phase 2 — Multi-Consumer with crossbeam-channel +/// +/// Swap std::sync::mpsc for crossbeam-channel. +/// Now multiple dispatcher threads can compete for jobs from the same channel, giving you a true MPMC setup +use std::thread; +use std::time; + +use crossbeam_channel as crossbeam; +use tracing::info; + +use task_scheduler::job::{Job, JobType}; + +fn main() { + tracing_subscriber::fmt::init(); + + let (tx, rx) = crossbeam::unbounded::(); + let mut producer_handles = vec![]; + + // start multiple producers + for i in 1..=3 { + let tx = tx.clone(); + let handle = thread::spawn(move || { + for i in 1..=10 { + let job_type = match i % 3 { + 0 => JobType::Metric, + 1 => JobType::HTTPRequest, + _ => JobType::Reconcile, + }; + + let job = Job::new(i, job_type); + info!(id = job.id, payload = %job.payload, "producer: sending job"); + + tx.send(job).unwrap(); + thread::sleep(time::Duration::from_millis(20)); + } + + info!("producer-{i}: done, dropping tx"); + }); + + producer_handles.push(handle); + } + drop(tx); + + let mut consumer_handles = vec![]; + // start multiple consumers + for i in 1..5 { + let rx = rx.clone(); + let handle = thread::spawn(move || { + for job in rx { + info!(id = job.id, payload = %job.payload, "consumer-{i}: received job"); + } + }); + + consumer_handles.push(handle); + } + + // join on the producer threads + for handle in producer_handles { + handle.join().unwrap(); + } + + // join on the consumer threads + for handle in consumer_handles { + handle.join().unwrap(); + } + + info!("consumer channels closed, exiting"); +} diff --git a/task-scheduler/src/job.rs b/task-scheduler/src/job.rs new file mode 100644 index 0000000..c3946ae --- /dev/null +++ b/task-scheduler/src/job.rs @@ -0,0 +1,25 @@ +/// The type of work this job represents. +#[derive(Debug)] +pub enum JobType { + Reconcile, + HTTPRequest, + Metric, +} + +/// A unit of work flowing through the pipeline. #[derive(Debug] +#[derive(Debug)] +pub struct Job { + pub id: u8, + pub job_type: JobType, + pub payload: String, +} + +impl Job { + pub fn new(id: u8, job_type: JobType) -> Self { + Job { + payload: format!("{:?} job #{id}", job_type), + id, + job_type, + } + } +} diff --git a/task-scheduler/src/lib.rs b/task-scheduler/src/lib.rs new file mode 100644 index 0000000..80daa3e --- /dev/null +++ b/task-scheduler/src/lib.rs @@ -0,0 +1 @@ +pub mod job;