Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions task-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "task-scheduler"
version = "0.1.0"
edition = "2024"

[dependencies]
crossbeam-channel = "0.5.15"
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
59 changes: 59 additions & 0 deletions task-scheduler/examples/phase_1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/// Phase 1 — Synchronous Foundation (std::sync::mpsc)
///
/// A single producer thread sends 100 Jobs down a std::sync::mpsc channel.
/// The main thread receives and prints each one.
///
/// Key things to observe:
/// 1. tx can be cloned; rx cannot — try it and watch the compile error.
/// 2. When the producer thread finishes, tx is dropped.
/// The main thread's for-loop detects the closed channel and exits cleanly.
/// 3. send() blocks if the channel is full (unbounded here, so it won't).
/// try_send() is the non-blocking alternative — swap it in to experiment.
use std::thread;
use std::{sync::mpsc, time};

use tracing::info;

use task_scheduler::job::{Job, JobType};

fn main() {
// Initialise tracing so info!() calls print to stdout.
// Set RUST_LOG=info (or debug/warn) to control verbosity.
tracing_subscriber::fmt::init();

let (tx, rx) = mpsc::channel::<Job>();

// start the producer thread
thread::spawn(move || {
for i in 1..=10 {
let job_type = match i % 3 {
0 => JobType::Metric,
1 => JobType::HTTPRequest,
_ => JobType::Reconcile,
};

let job = Job::new(i, job_type);
info!(id = job.id, payload = %job.payload, "producer: sending job");

// send() returns Err only if the receiver has been dropped.
// Here we just unwrap — if rx is gone there's nothing useful to do.
tx.send(job).unwrap();

// Small sleep so the output is readable when running live.
thread::sleep(time::Duration::from_millis(20));
}

// tx is dropped here when the closure ends.
// The receiver will see the channel close after draining remaining items.
info!("producer: done, dropping tx");
});

// main thread (consumer)
// Iterating over rx blocks until the next item arrives.
// The loop exits automatically when tx is dropped and the channel is empty.
for job in rx {
info!(id = job.id, payload = %job.payload, "consumer: received job");
}

info!("consumer: channel closed, exiting");
}
68 changes: 68 additions & 0 deletions task-scheduler/examples/phase_2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/// Phase 2 — Multi-Consumer with crossbeam-channel
///
/// Swap std::sync::mpsc for crossbeam-channel.
/// Now multiple dispatcher threads can compete for jobs from the same channel, giving you a true MPMC setup
use std::thread;
use std::time;

use crossbeam_channel as crossbeam;
use tracing::info;

use task_scheduler::job::{Job, JobType};

fn main() {
tracing_subscriber::fmt::init();

let (tx, rx) = crossbeam::unbounded::<Job>();
let mut producer_handles = vec![];

// start multiple producers
for i in 1..=3 {
let tx = tx.clone();
let handle = thread::spawn(move || {
for i in 1..=10 {
let job_type = match i % 3 {
0 => JobType::Metric,
1 => JobType::HTTPRequest,
_ => JobType::Reconcile,
};

let job = Job::new(i, job_type);
info!(id = job.id, payload = %job.payload, "producer: sending job");

tx.send(job).unwrap();
thread::sleep(time::Duration::from_millis(20));
}

info!("producer-{i}: done, dropping tx");
});

producer_handles.push(handle);
}
drop(tx);

let mut consumer_handles = vec![];
// start multiple consumers
for i in 1..5 {
let rx = rx.clone();
let handle = thread::spawn(move || {
for job in rx {
info!(id = job.id, payload = %job.payload, "consumer-{i}: received job");
}
});

consumer_handles.push(handle);
}

// join on the producer threads
for handle in producer_handles {
handle.join().unwrap();
}

// join on the consumer threads
for handle in consumer_handles {
handle.join().unwrap();
}

info!("consumer channels closed, exiting");
}
25 changes: 25 additions & 0 deletions task-scheduler/src/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/// The type of work this job represents.
#[derive(Debug)]
pub enum JobType {
Reconcile,
HTTPRequest,
Metric,
}

/// A unit of work flowing through the pipeline. #[derive(Debug]
#[derive(Debug)]
pub struct Job {
pub id: u8,
pub job_type: JobType,
pub payload: String,
}

impl Job {
pub fn new(id: u8, job_type: JobType) -> Self {
Job {
payload: format!("{:?} job #{id}", job_type),
id,
job_type,
}
}
}
1 change: 1 addition & 0 deletions task-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod job;