From 83c8f52afb8b580f780a836a53230591ac8dde56 Mon Sep 17 00:00:00 2001 From: Gaurav Gahlot Date: Fri, 20 Feb 2026 12:07:50 +0100 Subject: [PATCH 1/3] cargo init Signed-off-by: Gaurav Gahlot --- task-scheduler/Cargo.toml | 6 ++++++ task-scheduler/src/main.rs | 3 +++ 2 files changed, 9 insertions(+) create mode 100644 task-scheduler/Cargo.toml create mode 100644 task-scheduler/src/main.rs diff --git a/task-scheduler/Cargo.toml b/task-scheduler/Cargo.toml new file mode 100644 index 0000000..5af27ce --- /dev/null +++ b/task-scheduler/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "task-scheduler" +version = "0.1.0" +edition = "2024" + +[dependencies] diff --git a/task-scheduler/src/main.rs b/task-scheduler/src/main.rs new file mode 100644 index 0000000..e7a11a9 --- /dev/null +++ b/task-scheduler/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} From c10018c1df02cbe117d1e7ee0136f0b2c3480d56 Mon Sep 17 00:00:00 2001 From: Gaurav Gahlot Date: Fri, 20 Feb 2026 16:03:27 +0100 Subject: [PATCH 2/3] phase_1: Synchronous Foundation Signed-off-by: Gaurav Gahlot --- task-scheduler/Cargo.toml | 2 + task-scheduler/examples/phase_1.rs | 59 ++++++++++++++++++++++++++++++ task-scheduler/src/job.rs | 25 +++++++++++++ task-scheduler/src/lib.rs | 1 + task-scheduler/src/main.rs | 3 -- 5 files changed, 87 insertions(+), 3 deletions(-) create mode 100644 task-scheduler/examples/phase_1.rs create mode 100644 task-scheduler/src/job.rs create mode 100644 task-scheduler/src/lib.rs delete mode 100644 task-scheduler/src/main.rs diff --git a/task-scheduler/Cargo.toml b/task-scheduler/Cargo.toml index 5af27ce..737af3e 100644 --- a/task-scheduler/Cargo.toml +++ b/task-scheduler/Cargo.toml @@ -4,3 +4,5 @@ version = "0.1.0" edition = "2024" [dependencies] +tracing = "0.1.44" +tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } diff --git a/task-scheduler/examples/phase_1.rs b/task-scheduler/examples/phase_1.rs new file mode 100644 index 0000000..69de78f --- /dev/null +++ b/task-scheduler/examples/phase_1.rs @@ -0,0 +1,59 @@ +/// Phase 1 — Synchronous Foundation (std::sync::mpsc) +/// +/// A single producer thread sends 100 Jobs down a std::sync::mpsc channel. +/// The main thread receives and prints each one. +/// +/// Key things to observe: +/// 1. tx can be cloned; rx cannot — try it and watch the compile error. +/// 2. When the producer thread finishes, tx is dropped. +/// The main thread's for-loop detects the closed channel and exits cleanly. +/// 3. send() blocks if the channel is full (unbounded here, so it won't). +/// try_send() is the non-blocking alternative — swap it in to experiment. +use std::thread; +use std::{sync::mpsc, time}; + +use tracing::info; + +use task_scheduler::job::{Job, JobType}; + +fn main() { + // Initialise tracing so info!() calls print to stdout. + // Set RUST_LOG=info (or debug/warn) to control verbosity. + tracing_subscriber::fmt::init(); + + let (tx, rx) = mpsc::channel::(); + + // start the producer thread + thread::spawn(move || { + for i in 1..=10 { + let job_type = match i % 3 { + 0 => JobType::Metric, + 1 => JobType::HTTPRequest, + _ => JobType::Reconcile, + }; + + let job = Job::new(i, job_type); + info!(id = job.id, payload = %job.payload, "producer: sending job"); + + // send() returns Err only if the receiver has been dropped. + // Here we just unwrap — if rx is gone there's nothing useful to do. + tx.send(job).unwrap(); + + // Small sleep so the output is readable when running live. + thread::sleep(time::Duration::from_millis(20)); + } + + // tx is dropped here when the closure ends. + // The receiver will see the channel close after draining remaining items. + info!("producer: done, dropping tx"); + }); + + // main thread (consumer) + // Iterating over rx blocks until the next item arrives. + // The loop exits automatically when tx is dropped and the channel is empty. + for job in rx { + info!(id = job.id, payload = %job.payload, "consumer: received job"); + } + + info!("consumer: channel closed, exiting"); +} diff --git a/task-scheduler/src/job.rs b/task-scheduler/src/job.rs new file mode 100644 index 0000000..c3946ae --- /dev/null +++ b/task-scheduler/src/job.rs @@ -0,0 +1,25 @@ +/// The type of work this job represents. +#[derive(Debug)] +pub enum JobType { + Reconcile, + HTTPRequest, + Metric, +} + +/// A unit of work flowing through the pipeline. #[derive(Debug] +#[derive(Debug)] +pub struct Job { + pub id: u8, + pub job_type: JobType, + pub payload: String, +} + +impl Job { + pub fn new(id: u8, job_type: JobType) -> Self { + Job { + payload: format!("{:?} job #{id}", job_type), + id, + job_type, + } + } +} diff --git a/task-scheduler/src/lib.rs b/task-scheduler/src/lib.rs new file mode 100644 index 0000000..80daa3e --- /dev/null +++ b/task-scheduler/src/lib.rs @@ -0,0 +1 @@ +pub mod job; diff --git a/task-scheduler/src/main.rs b/task-scheduler/src/main.rs deleted file mode 100644 index e7a11a9..0000000 --- a/task-scheduler/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -} From 44e7f48828ee49ae18be193545a4fe8cf94b2bcd Mon Sep 17 00:00:00 2001 From: Gaurav Gahlot Date: Fri, 20 Feb 2026 23:38:22 +0100 Subject: [PATCH 3/3] phase_2: multi-consumer Signed-off-by: Gaurav Gahlot --- task-scheduler/Cargo.toml | 1 + task-scheduler/examples/phase_2.rs | 68 ++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 task-scheduler/examples/phase_2.rs diff --git a/task-scheduler/Cargo.toml b/task-scheduler/Cargo.toml index 737af3e..c87717a 100644 --- a/task-scheduler/Cargo.toml +++ b/task-scheduler/Cargo.toml @@ -4,5 +4,6 @@ version = "0.1.0" edition = "2024" [dependencies] +crossbeam-channel = "0.5.15" tracing = "0.1.44" tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } diff --git a/task-scheduler/examples/phase_2.rs b/task-scheduler/examples/phase_2.rs new file mode 100644 index 0000000..e93476e --- /dev/null +++ b/task-scheduler/examples/phase_2.rs @@ -0,0 +1,68 @@ +/// Phase 2 — Multi-Consumer with crossbeam-channel +/// +/// Swap std::sync::mpsc for crossbeam-channel. +/// Now multiple dispatcher threads can compete for jobs from the same channel, giving you a true MPMC setup +use std::thread; +use std::time; + +use crossbeam_channel as crossbeam; +use tracing::info; + +use task_scheduler::job::{Job, JobType}; + +fn main() { + tracing_subscriber::fmt::init(); + + let (tx, rx) = crossbeam::unbounded::(); + let mut producer_handles = vec![]; + + // start multiple producers + for i in 1..=3 { + let tx = tx.clone(); + let handle = thread::spawn(move || { + for i in 1..=10 { + let job_type = match i % 3 { + 0 => JobType::Metric, + 1 => JobType::HTTPRequest, + _ => JobType::Reconcile, + }; + + let job = Job::new(i, job_type); + info!(id = job.id, payload = %job.payload, "producer: sending job"); + + tx.send(job).unwrap(); + thread::sleep(time::Duration::from_millis(20)); + } + + info!("producer-{i}: done, dropping tx"); + }); + + producer_handles.push(handle); + } + drop(tx); + + let mut consumer_handles = vec![]; + // start multiple consumers + for i in 1..5 { + let rx = rx.clone(); + let handle = thread::spawn(move || { + for job in rx { + info!(id = job.id, payload = %job.payload, "consumer-{i}: received job"); + } + }); + + consumer_handles.push(handle); + } + + // join on the producer threads + for handle in producer_handles { + handle.join().unwrap(); + } + + // join on the consumer threads + for handle in consumer_handles { + handle.join().unwrap(); + } + + info!("consumer channels closed, exiting"); +}