Skip to content
This repository was archived by the owner on Apr 19, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ These functions are available globally but must be called within the context of
- **`broadcasst`**: Sends a message to all other processes. (GLOBAL_POOL)
- **`broadcasst_within_pool`**: Sends a message to all other processes within a specific pool.
- **`send_to`**: Sends a message to a specific process.
- **`send_random_from_pool`**: Sends a message to random process whithin pool.
- **`send_random`**: Sends a message to random process. (from GLOBAL_POOL)
- **`send_random_from_pool`**: Sends a message to random process within specific pool.
- **`schedule_timer_after`**: Schedules a timer interrupt for the current process.
- **`rank`**: Returns the ID of the currently executing process.
- **`now`**: Returns current simulation time.
Expand Down
11 changes: 0 additions & 11 deletions dscale/src/communication/mod.rs

This file was deleted.

20 changes: 9 additions & 11 deletions dscale/src/global/access.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{cell::RefCell, rc::Rc};

use crate::destination::Destination;
use crate::now;

use crate::{
Destination, Message, ProcessId,
Message, ProcessId,
actor::EventSubmitter,
debug_process,
network::NetworkActor,
Expand Down Expand Up @@ -68,14 +69,6 @@ impl SimulationAccess {
));
}

fn broadcast(&mut self, message: impl Message + 'static) {
self.scheduled_messages.push((
self.process_on_execution,
Destination::BroadcastWithinPool(GLOBAL_POOL),
Rc::new(message),
));
}

fn send_to(&mut self, to: ProcessId, message: impl Message + 'static) {
self.scheduled_messages.push((
self.process_on_execution,
Expand Down Expand Up @@ -153,7 +146,7 @@ pub fn schedule_timer_after(after: Jiffies) -> TimerId {

pub fn broadcast(message: impl Message + 'static) {
debug_process!("Access: broadcasting globally");
with_access(|access| access.broadcast(message));
with_access(|access| access.broadcast_within_pool(GLOBAL_POOL, message));
}

pub fn broadcast_within_pool(pool: &'static str, message: impl Message + 'static) {
Expand All @@ -166,8 +159,13 @@ pub fn send_to(to: ProcessId, message: impl Message + 'static) {
with_access(|access| access.send_to(to, message));
}

pub fn send_random(message: impl Message + 'static) {
debug_process!("Access: sending random in GLOBAL_POOL");
with_access(|access| access.send_random_from_pool(GLOBAL_POOL, message));
}

pub fn send_random_from_pool(pool: &'static str, message: impl Message + 'static) {
debug_process!("Access: sending random from pool: {}", pool);
debug_process!("Access: sending random from pool: {pool}");
with_access(|access| access.send_random_from_pool(pool, message));
}

Expand Down
1 change: 1 addition & 0 deletions dscale/src/global/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub use access::choose_from_pool;
pub use access::list_pool;
pub use access::rank;
pub use access::schedule_timer_after;
pub use access::send_random;
pub use access::send_random_from_pool;
pub use access::send_to;

Expand Down
14 changes: 8 additions & 6 deletions dscale/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
mod actor;
mod alloc;
mod communication;
mod destination;
mod dscale_message;
pub mod global;
pub mod helpers;
pub mod message;
mod network;
mod nursery;
mod process;
mod process_handle;
mod progress;
mod random;
mod simulation;
mod simulation_builder;
pub mod time;
mod topology;

pub use communication::MessagePtr;
pub use communication::{Destination, Message};
pub use message::Message;
pub use message::MessagePtr;

pub use process::ProcessHandle;
pub use process::ProcessId;
pub use process_handle::ProcessHandle;
pub use process_handle::ProcessId;

pub use simulation::Simulation;
pub use simulation_builder::SimulationBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

use std::{any::Any, cmp::Reverse, collections::BinaryHeap, rc::Rc};

use crate::{process::ProcessId, time::Jiffies};
use crate::{process_handle::ProcessId, time::Jiffies};

/// Core trait for all message types in DScale simulations.
///
Expand Down
33 changes: 20 additions & 13 deletions dscale/src/network/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::BinaryHeap;
use log::debug;

use crate::{
communication::{RoutedMessage, TimePriorityMessageQueue},
message::{RoutedMessage, TimePriorityMessageQueue},
network::LatencyQueue,
now,
time::Jiffies,
Expand Down Expand Up @@ -318,18 +318,14 @@ impl BandwidthQueue {
.pop()
.expect("Global queue should not be empty");

if self.bandwidth == usize::MAX {
self.merged_fifo_buffers.push(std::cmp::Reverse(message));
} else {
let new_total =
self.total_pased[message.step.dest] + message.step.message.virtual_size();

if new_total > now().0 * self.bandwidth {
message.arrival_time = Jiffies(new_total / self.bandwidth); // > now()
}
// Only for bounded bandwidth - unbounded case is handled directly in deliver_from_latency_queue
let new_total = self.total_pased[message.step.dest] + message.step.message.virtual_size();

self.merged_fifo_buffers.push(std::cmp::Reverse(message));
if new_total > now().0 * self.bandwidth {
message.arrival_time = Jiffies(new_total / self.bandwidth); // > now()
}

self.merged_fifo_buffers.push(std::cmp::Reverse(message));
}

fn deliver_from_buffer(&mut self) -> Option<RoutedMessage> {
Expand All @@ -343,7 +339,18 @@ impl BandwidthQueue {
}

fn deliver_from_latency_queue(&mut self) -> Option<RoutedMessage> {
self.move_message_from_latency_queue_to_buffers();
None
if self.bandwidth == usize::MAX {
// For unbounded bandwidth, deliver directly from latency queue
// (Fast-Path)
let message = self
.global_queue
.pop()
.expect("Global queue should not be empty");
Some(message)
} else {
// For bounded bandwidth, move to buffers first
self.move_message_from_latency_queue_to_buffers();
None
}
}
}
2 changes: 1 addition & 1 deletion dscale/src/network/latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::rc::Rc;

use log::debug;

use crate::communication::{RoutedMessage, TimePriorityMessageQueue};
use crate::message::{RoutedMessage, TimePriorityMessageQueue};
use crate::random::Randomizer;
use crate::topology::Topology;

Expand Down
8 changes: 4 additions & 4 deletions dscale/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ pub(crate) use bandwidth::BandwidthQueue;
pub(crate) use latency::LatencyQueue;
use log::debug;

use crate::Destination;
use crate::Message;
use crate::MessagePtr;
use crate::ProcessId;
use crate::actor::EventSubmitter;
use crate::actor::SimulationActor;
use crate::communication::DScaleMessage;
use crate::communication::ProcessStep;
use crate::communication::RoutedMessage;
use crate::destination::Destination;
use crate::dscale_message::DScaleMessage;
use crate::global::configuration;
use crate::message::ProcessStep;
use crate::message::RoutedMessage;
use crate::now;
use crate::nursery::Nursery;
use crate::random::Randomizer;
Expand Down
3 changes: 2 additions & 1 deletion dscale/src/nursery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::{
use log::debug;

use crate::{
ProcessId, communication::DScaleMessage, global::set_process, process::MutableProcessHandle,
ProcessId, dscale_message::DScaleMessage, global::set_process,
process_handle::MutableProcessHandle,
};

pub(crate) type HandlerMap = BTreeMap<ProcessId, MutableProcessHandle>; // btree for deterministic iterators
Expand Down
5 changes: 0 additions & 5 deletions dscale/src/process/mod.rs

This file was deleted.

2 changes: 1 addition & 1 deletion dscale/src/simulation_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
use crate::{
ProcessHandle, ProcessId, Simulation,
network::BandwidthDescription,
process::MutableProcessHandle,
process_handle::MutableProcessHandle,
random::Seed,
time::Jiffies,
topology::{GLOBAL_POOL, LatencyDescription, LatencyTopology},
Expand Down
2 changes: 1 addition & 1 deletion dscale/src/time/timer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use log::debug;
use crate::{
ProcessId,
actor::{EventSubmitter, SimulationActor},
communication::DScaleMessage,
dscale_message::DScaleMessage,
global, now,
nursery::Nursery,
time::Jiffies,
Expand Down
4 changes: 2 additions & 2 deletions systems/examples/src/bin/multidc_pingpong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ fn main() {
elapsed, pings, pongs,
);

assert_eq!(pings, 9380);
assert_eq!(pings, 9380);
assert_eq!(pings, 9381);
assert_eq!(pongs, 9380);
}
6 changes: 6 additions & 0 deletions systems/examples/src/bin/pingpong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,10 @@ fn main() {
anykv::get::<usize>("pings"),
anykv::get::<usize>("pongs"),
);

println!(
"Steps/sec {:.2}",
(anykv::get::<usize>("pings") + anykv::get::<usize>("pongs")) as f64
/ elapsed.as_secs_f64()
);
}