From ac0ee47b926e0220eba3c1ce8a0a14db63df2b43 Mon Sep 17 00:00:00 2001
From: Caleb Jones
Date: Fri, 1 May 2026 12:04:04 -0500
Subject: [PATCH 1/6] remove generic actor cache.

this was done for two reasons:

1) the vm and agent actor caches have slightly different requirements,
   specifically around keys (actorid vs vmid)
2) the scheduler needs direct control of the cache so the cache can be
   updated in response to messages it forwards. this data being up to a
   second stale could cause problems. in theory the scheduler could update
   the generic cache, but I don't think using a generic cache really makes
   sense if we need to modify it directly anyway.

Signed-off-by: Caleb Jones
---
 odorobo/src/actors/scheduler_actor.rs | 345 +++++++++++++++++++-------
 odorobo/src/utils/actor_cache.rs      | 174 -------------
 odorobo/src/utils/mod.rs              |   1 -
 3 files changed, 250 insertions(+), 270 deletions(-)
 delete mode 100644 odorobo/src/utils/actor_cache.rs

diff --git a/odorobo/src/actors/scheduler_actor.rs b/odorobo/src/actors/scheduler_actor.rs
index 11d6a62..df83778 100644
--- a/odorobo/src/actors/scheduler_actor.rs
+++ b/odorobo/src/actors/scheduler_actor.rs
@@ -1,12 +1,13 @@
 use std::ops::ControlFlow;
+use std::sync::Arc;
+use std::time::Duration;
 
-use async_trait::async_trait;
 use kameo::prelude::*;
 use libp2p::futures::TryStreamExt;
+use tracing::trace;
+use ulid::Ulid;
 use crate::actors::agent_actor::AgentActor;
 use crate::ch_driver::actor::VMActor;
-use crate::utils::actor_cache::ActorCache;
-use crate::utils::actor_cache::ActorCacheUpdater;
 use crate::utils::actor_names::VM;
 use crate::messages::vm::*;
 use crate::messages::agent::*;
@@ -17,12 +18,42 @@
 use stable_eyre::eyre::OptionExt;
 use stable_eyre::{Report, Result, eyre::eyre};
 use tracing::info_span;
 use tracing::{info, warn};
+use dashmap::DashMap;
+use tokio::task::JoinHandle;
+
+
+#[derive(Debug, Clone)]
+pub struct CachedAgentActor {
+    pub actor_ref: RemoteActorRef<AgentActor>,
+    pub metadata: AgentStatus,
+}
+
+#[derive(Debug, Clone)]
+pub struct CachedVMActor {
+    pub actor_ref: RemoteActorRef<VMActor>,
+    pub metadata: GetVMInfoReply,
+}
+
 #[derive(RemoteActor)]
 pub struct SchedulerActor {
-    pub agent_actor_cache: ActorCache,
-    pub vm_actor_cache: ActorCache
+    pub agent_data_cache: Arc<DashMap<ActorId, CachedAgentActor>>,
+    pub agent_keepalive_tasks: Arc<DashMap<ActorId, JoinHandle<()>>>,
+
+    // todo: we might need a better way to store this.
+    // we are likely going to want to store vms even if we don't know their actorid (ex: actor hasn't been started or is shutdown)
+    // but we also might want to be able to store them without a ulid, possibly
+    // so we might need a vec of vms and then to just store maps/indexes of actorid and ulid to vector index
+    // and then like a freelist or something.
+    // i dont really love that option either though cause it feels overkill.
+    // maybe we sure just be using a proper database entirely?
+    // idk. will figure it out later.
+    pub vm_actorid_ulid_map: Arc<DashMap<ActorId, Ulid>>,
+    pub vm_data_cache: Arc<DashMap<Ulid, CachedVMActor>>,
+    pub vm_keepalive_tasks: Arc<DashMap<ActorId, JoinHandle<()>>>,
+
+    pub cache_actor_finder: Option<JoinHandle<()>>,
 }
 
 // todo: this might need to be a runtime thing but this makes it easy to write for now and could easily be switched out later.
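If the overprovisionment ratio does become a runtime thing, one option is a small ratio type carried in the scheduler's config instead of the two statics. A rough sketch under that assumption (OverprovisionRatio and any config wiring are hypothetical, not part of this series):

    /// Hypothetical runtime replacement for the two overprovisionment statics.
    #[derive(Debug, Clone, Copy)]
    pub struct OverprovisionRatio {
        pub numerator: u32,
        pub denominator: u32,
    }

    impl OverprovisionRatio {
        /// Scale a physical vcpu count to schedulable capacity, keeping integer math:
        /// { numerator: 4, denominator: 1 } turns 16 physical vcpus into 64.
        pub fn schedulable_vcpus(&self, vcpus: u32) -> u32 {
            vcpus * self.numerator / self.denominator
        }
    }
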
@@ -31,20 +62,191 @@ static VCPU_OVERPROVISIONMENT_DENOMINATOR: u32 = 1;
 
 impl SchedulerActor {
-    async fn lookup_by_actor_id(
+    async fn lookup_agent_by_actor_id(
         &mut self,
         actor_id: &ActorId,
     ) -> Option<RemoteActorRef<AgentActor>> {
-        self.agent_actor_cache.data_cache.get(actor_id).map(|data| data.actor_ref.clone())
+        self.agent_data_cache.get(actor_id).map(|data| data.actor_ref.clone())
     }
 
-    async fn lookup_by_hostname(
+    async fn lookup_agent_by_hostname(
         &mut self,
         hostname: &str,
     ) -> Option<RemoteActorRef<AgentActor>> {
-        self.agent_actor_cache.data_cache.iter().find(|data| data.metadata.hostname == hostname).map(|data| data.actor_ref.clone())
+        self.agent_data_cache.iter().find(|data| data.metadata.hostname == hostname).map(|data| data.actor_ref.clone())
+    }
+
+
+    // someone should likely give caleb a firm talking-to about the code duplication in this section, but things are just different enough that making them one function would require a lot of generics, which feels even worse. so i don't know what to do. cappy please fix. i hate this.
+    async fn vm_actor_finder(
+        parent_actor_ref: RemoteActorRef<SchedulerActor>,
+        vm_actorid_ulid_map: Arc<DashMap<ActorId, Ulid>>,
+        data_cache: Arc<DashMap<Ulid, CachedVMActor>>,
+        keepalive_tasks: Arc<DashMap<ActorId, JoinHandle<()>>>
+    ) -> Result<(), Report> {
+
+        while let Some(vm_actor) = RemoteActorRef::<VMActor>::lookup_all(VM).try_next().await? {
+            if !keepalive_tasks.contains_key(&vm_actor.id()) {
+                trace!(?vm_actor, "starting vm_updater_task");
+
+                parent_actor_ref.link_remote(&vm_actor).await?;
+
+                let vm_actor_id = vm_actor.id();
+
+                let vm_actorid_ulid_map_clone = Arc::clone(&vm_actorid_ulid_map);
+                let data_cache_clone = Arc::clone(&data_cache);
+                let updater_task = tokio::spawn(async move {
+                    Self::vm_updater_task(
+                        vm_actor,
+                        vm_actorid_ulid_map_clone,
+                        data_cache_clone
+                    ).await;
+                });
+
+                keepalive_tasks.insert(
+                    vm_actor_id,
+                    updater_task
+                );
+            }
+        }
+
+        Ok(())
+
+    }
+
+    async fn vm_updater_task(
+        actor_ref: RemoteActorRef<VMActor>,
+        vm_actorid_ulid_map: Arc<DashMap<ActorId, Ulid>>,
+        data_cache: Arc<DashMap<Ulid, CachedVMActor>>,
+    ) {
+        let mut interval = tokio::time::interval(Duration::from_secs(1));
+        let mut fails = 0;
+        loop {
+            if let Ok(metadata) = actor_ref.ask(&GetVMInfo {vmid: None}).await {
+                let vmid = metadata.vmid;
+
+                vm_actorid_ulid_map.insert(actor_ref.id(), vmid); // should we be doing this on every loop? idk. but we at least need to do it on the first iteration given we don't know the mapping before that
+
+                data_cache.insert(
+                    vmid,
+                    CachedVMActor {
+                        actor_ref: actor_ref.clone(),
+                        metadata: metadata
+                    }
+                );
+
+                fails = 0;
+            } else {
+                fails += 1;
+            }
+
+            if fails > 5 {
+                // todo: possibly better error handling
+                warn!(?actor_ref, "can no longer reach vm actor.")
+            }
+
+            interval.tick().await;
+        }
+    }
+
+    async fn agent_actor_finder(
+        parent_actor_ref: RemoteActorRef<SchedulerActor>,
+        data_cache: Arc<DashMap<ActorId, CachedAgentActor>>,
+        keepalive_tasks: Arc<DashMap<ActorId, JoinHandle<()>>>,
+    ) -> Result<(), Report> {
+        info!("running agent_actor_finder");
+        while let Some(agent_actor) = RemoteActorRef::<AgentActor>::lookup_all(AGENT).try_next().await?
{ + if !keepalive_tasks.contains_key(&agent_actor.id()) { + trace!(?agent_actor, "starting agent_updater_task"); + + parent_actor_ref.link_remote(&agent_actor).await?; + + let agent_actor_id = agent_actor.id(); + + let data_cache_clone = Arc::clone(&data_cache); + let updater_task = tokio::spawn(async move { + Self::agent_updater_task( + agent_actor, + data_cache_clone + ).await; + }); + + keepalive_tasks.insert( + agent_actor_id, + updater_task + ); + } + } + + Ok(()) + } + + async fn agent_updater_task( + actor_ref: RemoteActorRef, + data_cache: Arc>, + ) { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut fails = 0; + loop { + if let Ok(metadata) = actor_ref.ask(&GetAgentStatus).await { + data_cache.insert( + actor_ref.id(), + CachedAgentActor { + actor_ref: actor_ref.clone(), + metadata: metadata + } + ); + + fails = 0; + } else { + fails += 1; + } + + if fails > 5 { + // todo: possibly better error handling + warn!(?actor_ref, "can no longer reach agent actor.") + } + + interval.tick().await; + } + } + + fn start_actor_finder(&mut self, actor_ref: RemoteActorRef) { + let agent_data_cache_arc_clone = Arc::clone(&self.agent_data_cache); + let agent_keepalive_tasks_arc_clone = Arc::clone(&self.agent_keepalive_tasks); + + let vm_actorid_ulid_map_arc_clone = Arc::clone(&self.vm_actorid_ulid_map); + let vm_data_cache_arc_clone = Arc::clone(&self.vm_data_cache); + let vm_keepalive_tasks_arc_clone = Arc::clone(&self.vm_keepalive_tasks); + + self.cache_actor_finder = Some( + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + loop { + let vm_join_handle = Self::vm_actor_finder( + actor_ref.clone(), + Arc::clone(&vm_actorid_ulid_map_arc_clone), + Arc::clone(&vm_data_cache_arc_clone), + Arc::clone(&vm_keepalive_tasks_arc_clone), + ); + + let agent_join_handle = Self::agent_actor_finder( + actor_ref.clone(), + Arc::clone(&agent_data_cache_arc_clone), + Arc::clone(&agent_keepalive_tasks_arc_clone), + ); + + // intentionally ignoring results because we want to keep finding actors even if an attempt fails + let _ = tokio::join!(vm_join_handle, agent_join_handle); + + interval.tick().await; + } + }) + ); + } + /// current scheduling algo info: /// this is vaguely based on https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ /// when a vm is attempted to be scheduled, we loop through every agent and score it based on some rules @@ -62,13 +264,12 @@ impl SchedulerActor { // todo: this arguably could be done as map-reduce. is that better? let span = info_span!("schedule_agent"); span.in_scope(|| { - for agent in self.agent_actor_cache.data_cache.iter() { + for agent in self.agent_data_cache.iter() { let mut agent_score = 0u32; let agent_max_vcpus = agent.metadata.vcpus * VCPU_OVERPROVISIONMENT_NUMERATOR / VCPU_OVERPROVISIONMENT_DENOMINATOR; - if agent.metadata.used_vcpus >= agent_max_vcpus { continue; } @@ -114,80 +315,6 @@ impl SchedulerActor { -#[derive(Copy, Clone)] -struct AgentActorCacheUpdater; - -#[derive(Debug, Clone)] -pub struct CachedAgentActor { - pub actor_ref: RemoteActorRef, - pub metadata: AgentStatus, -} - -#[async_trait] -impl ActorCacheUpdater for AgentActorCacheUpdater { - async fn get_actor_refs(&self) -> Result>> { - let mut agent_actors_lookup = RemoteActorRef::::lookup_all(AGENT); - let mut actor_ref_vec = Vec::new(); - - while let Some(agent_actor) = agent_actors_lookup.try_next().await? 
{ - actor_ref_vec.push(agent_actor); - } - - Ok(actor_ref_vec) - } - - async fn on_update(&self, actor_ref: &RemoteActorRef, previous_value: Option) -> Result { - let output_actor_ref = match previous_value { - Some(value) => value.actor_ref, - _ => actor_ref.clone(), - }; - - Ok(CachedAgentActor { - actor_ref: output_actor_ref, - metadata: actor_ref.ask(&GetAgentStatus).await? - }) - } -} - - -// todo: this code is really bad, and we should not have effectively two copies of ths same thing. -#[derive(Copy, Clone)] -struct VMActorCacheUpdater; - -#[derive(Debug, Clone)] -pub struct CachedVMActor { - pub actor_ref: RemoteActorRef, - pub metadata: GetVMInfoReply, -} - -#[async_trait] -impl ActorCacheUpdater for VMActorCacheUpdater { - async fn get_actor_refs(&self) -> Result>> { - let mut agent_actors_lookup = RemoteActorRef::::lookup_all(VM); - let mut actor_ref_vec = Vec::new(); - - while let Some(agent_actor) = agent_actors_lookup.try_next().await? { - actor_ref_vec.push(agent_actor); - } - - Ok(actor_ref_vec) - } - - async fn on_update(&self, actor_ref: &RemoteActorRef, previous_value: Option) -> Result { - let output_actor_ref = match previous_value { - Some(value) => value.actor_ref, - _ => actor_ref.clone(), - }; - - Ok(CachedVMActor { - actor_ref: output_actor_ref, - metadata: actor_ref.ask(&GetVMInfo {vmid: None}).await? - }) - } -} - - - impl Actor for SchedulerActor { type Args = (); type Error = Report; @@ -197,29 +324,57 @@ impl Actor for SchedulerActor { info!(?peer_id, "Scheduler Actor started!"); - Ok(Self { - agent_actor_cache: ActorCache::new(actor_ref.clone(), AgentActorCacheUpdater)?, - vm_actor_cache: ActorCache::new(actor_ref, VMActorCacheUpdater)? - }) + let mut scheduler_actor = SchedulerActor { + agent_data_cache: Arc::new(DashMap::new()), + agent_keepalive_tasks: Arc::new(DashMap::new()), + vm_actorid_ulid_map: Arc::new(DashMap::new()), + vm_data_cache: Arc::new(DashMap::new()), + vm_keepalive_tasks: Arc::new(DashMap::new()), + cache_actor_finder: None, + }; + + scheduler_actor.start_actor_finder(actor_ref.into_remote_ref().await); + + Ok(scheduler_actor) } async fn on_link_died( &mut self, actor_ref: WeakActorRef, - id: ActorId, + actor_id: ActorId, reason: ActorStopReason, ) -> Result, Self::Error> { - warn!("Linked actor {id:?} died with reason {reason:?}"); + warn!(?actor_id, ?reason, "Linked actor died"); + // check that scheduler actor is still alive. let Some(_) = actor_ref.upgrade() else { return Ok(ControlFlow::Break(ActorStopReason::Killed)); }; - self.agent_actor_cache.on_link_died(id).await; - self.vm_actor_cache.on_link_died(id).await; - info!(vm_actor_cache=?self.agent_actor_cache.data_cache, agent_actor_cache=?self.vm_actor_cache.data_cache, "data caches post actor removal"); + if let Some((_, keepalive_task)) = self.agent_keepalive_tasks.remove(&actor_id) { + trace!(?actor_id, "Aborting agent keepalive task"); + keepalive_task.abort(); + }; + + + self.agent_data_cache.remove(&actor_id); + + // todo: attempt vm migration or restart or whatever on agent death. + + if let Some((_, keepalive_task)) = self.vm_keepalive_tasks.remove(&actor_id) { + trace!(?actor_id, "Aborting vm keepalive task"); + keepalive_task.abort(); + }; + + if let Some((_, vmid)) = self.vm_actorid_ulid_map.remove(&actor_id) { + // todo: we likely should keep a copy of the VirtualMachine manifest in the cache. + // instead of removing the vm entirely, we should just modify the status to shutdown or crashed or something. 
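+            // illustrative sketch of that status-based alternative (hypothetical type, not in this patch):
+            // CachedVMActor would grow a field along the lines of
+            //     status: CachedVMStatus, // enum CachedVMStatus { Running, ShutDown, Crashed, Unknown }
+            // and this branch would set it to ShutDown/Crashed instead of doing the remove() below.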
+ trace!(?actor_id, ?vmid, "Removing vm from vm_data_cache"); + self.vm_data_cache.remove(&vmid); + } + Ok(ControlFlow::Continue(())) } @@ -300,7 +455,7 @@ impl Message for SchedulerActor { ) -> Self::Reply { let mut vms = Vec::new(); - for agent in self.agent_actor_cache.data_cache.iter() { + for agent in self.agent_data_cache.iter() { vms.extend_from_slice(agent.metadata.vms.as_slice()); } diff --git a/odorobo/src/utils/actor_cache.rs b/odorobo/src/utils/actor_cache.rs deleted file mode 100644 index eb4b45b..0000000 --- a/odorobo/src/utils/actor_cache.rs +++ /dev/null @@ -1,174 +0,0 @@ -use std::{marker::PhantomData, sync::Arc, time::Duration}; - -use async_trait::async_trait; -use dashmap::DashMap; -use kameo::{prelude::*}; -use tokio::task::JoinHandle; -use stable_eyre::{Report, Result}; -use tracing::{info, instrument, trace}; - -use std::fmt; - -// TODO: refactor to use derive macro, but I (caleb) don't know how to write a derive macro. -// -// The best way to write this would be using a derive similar to kameo -// so you would create a struct with #[derive(ActorCache)]. -// Then you would set the types and then write get_actor_refs and on_update. -// This would combine everything into one struct and make it a lot easier to work with. -// -// similar to: https://github.com/tqwewe/kameo/blob/1d498c0566b613b9afe6d54965c4b191c84432e0/src/actor.rs#L122 -// -// -// other things we might want: -// - a default get_actor_refs that just finds all actor_refs with a specific actor string. -// - change get_actor_refs to use an iterator - - -#[async_trait] -pub trait ActorCacheUpdater: Sync + Send + Copy + 'static { - // todo: this could probably be better if it was an iterator, but I am lazy and don't want to right now. - async fn get_actor_refs(&self) -> Result>>; - async fn on_update(&self, actor_ref: &RemoteActorRef, previous_value: Option) -> Result; -} - - -#[derive(Debug)] -pub struct ActorCache { - parent_actor_ref: ActorRef, - pub data_cache: Arc>, - keepalive_tasks: Arc>>, - actor_finder: Option>, - - child_actor_type: PhantomData -} - -// todo: impl Drop to automatically kill all the keepalive_tasks and the actor_finder task. 
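The Drop impl this deleted todo asks for was never written, and the same cleanup gap now lives on in SchedulerActor: its finder and keepalive tasks are never aborted when the scheduler goes away. A minimal sketch of what that could look like there, assuming kameo drops the actor state on stop (illustrative only, not part of this patch):

    impl Drop for SchedulerActor {
        fn drop(&mut self) {
            // stop the background finder loop, if it was started.
            if let Some(finder) = self.cache_actor_finder.take() {
                finder.abort();
            }
            // abort every per-actor updater so nothing outlives the scheduler.
            for task in self.agent_keepalive_tasks.iter() {
                task.value().abort();
            }
            for task in self.vm_keepalive_tasks.iter() {
                task.value().abort();
            }
        }
    }
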
- -impl ActorCache { - pub fn new( - parent_actor_ref: ActorRef, - updater: impl ActorCacheUpdater - ) -> Result { - - let data_cache = Arc::new(DashMap::new()); - let keepalive_tasks = Arc::new(DashMap::new()); - - let actor_cache = ActorCache { - parent_actor_ref: parent_actor_ref.clone(), - data_cache: data_cache, - keepalive_tasks: keepalive_tasks, - actor_finder: None, - - child_actor_type: PhantomData - }; - - actor_cache.start_actor_finder(parent_actor_ref, updater); - - Ok(actor_cache) - } - - /// run this function inside of the on_link_died of the ParentActor - pub async fn on_link_died( - &self, - id: ActorId - ) { - info!("removing agent actor from cache {id:?}"); - - if let Some(actor_keepalive_task) = self.keepalive_tasks.remove(&id) { - trace!("Aborting keepalive task for agent {id:?}"); - actor_keepalive_task.1.abort(); - }; - - self.data_cache.remove(&id); - } - - fn start_actor_finder( - &self, - parent_actor_ref: ActorRef, - updater: impl ActorCacheUpdater - ) { - let keepalive_tasks_clone = Arc::clone(&self.keepalive_tasks); - let data_cache_clone = Arc::clone(&self.data_cache); - - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(1)); - loop { - let _ = Self::actor_finder( - parent_actor_ref.clone(), - Arc::clone(&keepalive_tasks_clone), - Arc::clone(&data_cache_clone), - updater - ).await; - - interval.tick().await; - } - }); - } - - async fn actor_finder( - parent_actor_ref: ActorRef, - keepalive_tasks: Arc>>, - data_cache: Arc>, - updater: impl ActorCacheUpdater - ) -> Result<(), Report> { - let actor_refs = updater.get_actor_refs().await?; - - info!(?actor_refs, "running actor_finder"); - - for actor_ref in actor_refs { - if !keepalive_tasks.contains_key(&actor_ref.id()) { - trace!(?actor_ref, "starting updater_task"); - - parent_actor_ref.link_remote(&actor_ref).await?; - - let actor_ref_clone = actor_ref.clone(); - let data_cache_clone = Arc::clone(&data_cache); - let updater_task = tokio::spawn(async move { - Self::updater_task( - actor_ref_clone, - data_cache_clone, - updater - ).await; - }); - - keepalive_tasks.insert( - actor_ref.id(), - updater_task - ); - } - } - - Ok(()) - } - - #[instrument(skip_all)] - async fn updater_task( - actor_ref: RemoteActorRef, - data_cache: Arc>, - updater: impl ActorCacheUpdater - ) { - let mut interval = tokio::time::interval(Duration::from_secs(1)); - - loop { - let actor_id = actor_ref.id(); - - let mut previous_value_option = None; - - - - if let Some(data_ref) = data_cache.get(&actor_id) { - previous_value_option = Some(data_ref.clone()); - } - - - if let Ok(update) = updater.on_update(&actor_ref, previous_value_option).await { - data_cache.insert( - actor_id, - update.clone() - ); - } - - interval.tick().await; - } - } -} diff --git a/odorobo/src/utils/mod.rs b/odorobo/src/utils/mod.rs index 01c7125..819d3ef 100644 --- a/odorobo/src/utils/mod.rs +++ b/odorobo/src/utils/mod.rs @@ -1,5 +1,4 @@ pub mod actor_names; -pub mod actor_cache; use aide::OperationIo; use stable_eyre::{Result, Report}; From 09c41f063c2e0a7f56852175c3ecbc7e8af5a565 Mon Sep 17 00:00:00 2001 From: Caleb Jones Date: Fri, 1 May 2026 12:31:31 -0500 Subject: [PATCH 2/6] improve scheduling algorithm to use estimated future values. 
Signed-off-by: Caleb Jones --- odorobo/src/actors/scheduler_actor.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/odorobo/src/actors/scheduler_actor.rs b/odorobo/src/actors/scheduler_actor.rs index df83778..1ae3119 100644 --- a/odorobo/src/actors/scheduler_actor.rs +++ b/odorobo/src/actors/scheduler_actor.rs @@ -252,39 +252,39 @@ impl SchedulerActor { /// when a vm is attempted to be scheduled, we loop through every agent and score it based on some rules /// there are hard rules that will simply throw out an agent entirely. /// otherwise, we take whatever the best agent we can find is. - /// - /// additionally, because caleb is way too performance brained, he used integer math for the entire scoring algorithm just so we didnt have to convert to floats. async fn schedule_agent( &mut self, msg: &CreateVM ) -> Result, Report> { let mut best_agent = None; - let mut best_agent_score = 0u32; + let mut best_agent_score = 0.0f32; // todo: this arguably could be done as map-reduce. is that better? let span = info_span!("schedule_agent"); - span.in_scope(|| { + span.in_scope(|| { for agent in self.agent_data_cache.iter() { - let mut agent_score = 0u32; + let mut agent_score = 0.0f32; let agent_max_vcpus = agent.metadata.vcpus * VCPU_OVERPROVISIONMENT_NUMERATOR / VCPU_OVERPROVISIONMENT_DENOMINATOR; + // todo: do we care about VMData.max_vcpus? + let agent_used_vcpus = agent.metadata.used_vcpus + msg.config.data.vcpus; - - if agent.metadata.used_vcpus >= agent_max_vcpus { + if agent_used_vcpus >= agent_max_vcpus { continue; } - agent_score += (agent_max_vcpus - agent.metadata.used_vcpus) * 1024 / agent_max_vcpus; + agent_score += (agent_max_vcpus - agent_used_vcpus) as f32 / agent_max_vcpus as f32; // todo: add ram overprovisionment. not adding this to scheduler until it works on the hypervisor side. 
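+            // worked example of these normalized terms (illustrative numbers): with a 4/1
+            // overprovision ratio, a 16-vcpu agent exposes 64 schedulable vcpus; if 16 would
+            // be in use after this VM, the vcpu term above is (64 - 16) / 64 = 0.75. the ram
+            // term below has the same shape, so each term lands strictly between 0.0 and 1.0.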
let agent_max_ram = agent.metadata.ram; + let agent_used_ram = agent.metadata.used_ram + msg.config.data.memory; - if agent.metadata.used_ram >= agent_max_ram { + if agent_used_ram >= agent_max_ram { continue; } - agent_score += ((agent_max_ram.as_u64() - agent.metadata.used_ram.as_u64()) * 1024 / agent_max_ram.as_u64()) as u32; + agent_score += (agent_max_ram.as_u64() - agent.metadata.used_ram.as_u64()) as f32 / agent_max_ram.as_u64() as f32; // todo: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ From d0dd375d5fa19350a8e47fdba8eaac37462d7128 Mon Sep 17 00:00:00 2001 From: Caleb Jones Date: Fri, 1 May 2026 13:36:11 -0500 Subject: [PATCH 3/6] committing before lunch so i dont have code just sitting on my machine Signed-off-by: Caleb Jones --- odorobo/src/actors/scheduler_actor.rs | 36 +++++++++++++++++---------- odorobo/src/ch_driver/actor.rs | 12 +++++---- odorobo/src/messages/vm.rs | 3 +-- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/odorobo/src/actors/scheduler_actor.rs b/odorobo/src/actors/scheduler_actor.rs index 1ae3119..78569b2 100644 --- a/odorobo/src/actors/scheduler_actor.rs +++ b/odorobo/src/actors/scheduler_actor.rs @@ -387,20 +387,26 @@ impl Message for SchedulerActor { type Reply = Result; async fn handle(&mut self, msg: CreateVM, _ctx: &mut Context) -> Self::Reply { - loop { - let target_agent = self.schedule_agent(&msg).await?; - - match target_agent.ask(&msg).await { - Ok(reply) => { - return Ok(reply) - }, - Err(err) => { - warn!( - "CreateVM forwarding failed, trying again: {err}" - ); - } - } + let target_agent = self.schedule_agent(&msg).await?; + + // we add to cache first, because we want to make sure future requests assume this vm exists. if the message fails, we clean it up afterward. + + // massive problem for me to fix later: the metadata can be auto updated. so this change won't be persisted and we need to continue to know this vm is likely scheduled on this device. + // we likely need to create a new vec of likey? vms or something and then match against that too during scheduling for example. + + if let Some(mut cached_data) = self.agent_data_cache.get_mut(&target_agent.id()) { + cached_data.metadata.vms.push(msg.vmid); + } else { + return Err(eyre!("target agent is not in data cache")) + } + + let reply = target_agent.ask(&msg).await; + + if reply.is_err() { + // remove from caches } + + Ok(reply?) 
} } @@ -416,6 +422,7 @@ impl Message for SchedulerActor { tracing::trace!(?vm, "DeleteVM"); if let Some(vm) = vm { vm.tell(&msg).send()?; + // todo: update cache Ok(DeleteVMReply) } else { Err(eyre!("VM not found")) @@ -435,6 +442,9 @@ impl Message for SchedulerActor { tracing::trace!(?vm, "ShutdownVM"); if let Some(vm) = vm { vm.tell(&msg).send()?; + + // todo: update cache + Ok(ShutdownVMReply) } else { Err(eyre!("VM not found")) diff --git a/odorobo/src/ch_driver/actor.rs b/odorobo/src/ch_driver/actor.rs index 414b00b..8889b6d 100644 --- a/odorobo/src/ch_driver/actor.rs +++ b/odorobo/src/ch_driver/actor.rs @@ -28,16 +28,17 @@ pub struct VMActor { /// path to the Cloud Hypervisor socket, in /run/odorobo/vms//ch.sock pub vm_instance: VMInstance, pub migration_state: Option, + pub manifest: VirtualMachine } impl Actor for VMActor { - // tuple of VM ID and optional config - type Args = (ulid::Ulid, Option); + // tuple of VM ID and manifest + type Args = (ulid::Ulid, VirtualMachine); type Error = Report; #[tracing::instrument(skip_all)] - async fn on_start((vmid, vm_config): Self::Args, actor_ref: ActorRef) -> Result { - let mut vminstance = VMInstance::spawn(&vmid.to_string(), vm_config.map(VmConfig::from), None).await?; + async fn on_start((vmid, vm_manifest): Self::Args, actor_ref: ActorRef) -> Result { + let mut vminstance = VMInstance::spawn(&vmid.to_string(), Some(VmConfig::from(vm_manifest.clone())), None).await?; // Take the child process out so we can watch for unexpected death. // destroy() handles a missing child_process gracefully. @@ -69,6 +70,7 @@ impl Actor for VMActor { vmid, vm_instance: vminstance, migration_state: None, + manifest: vm_manifest }) } @@ -157,7 +159,7 @@ impl Message for VMActor { ) -> Self::Reply { GetVMInfoReply { vmid: self.vmid, - config: self.vm_instance.vm_config.clone(), + config: self.manifest.clone(), // we likely dont want to send the entire manifest on every update, but some of this data is required and this is easier for now. } } } diff --git a/odorobo/src/messages/vm.rs b/odorobo/src/messages/vm.rs index 512a793..4e979c8 100644 --- a/odorobo/src/messages/vm.rs +++ b/odorobo/src/messages/vm.rs @@ -1,5 +1,4 @@ //! VM-related messages -use cloud_hypervisor_client::models::VmConfig; use kameo::prelude::*; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -99,5 +98,5 @@ pub struct GetVMInfo { #[derive(Serialize, Deserialize, Reply, Debug, Clone)] pub struct GetVMInfoReply { pub vmid: Ulid, - pub config: Option, + pub config: VirtualMachine, } From 444950203e219b418f5178955d16eaf0f1ec13e8 Mon Sep 17 00:00:00 2001 From: Caleb Jones Date: Fri, 1 May 2026 17:58:06 -0500 Subject: [PATCH 4/6] continue to work on scheduler actor caches and placement groups Signed-off-by: Caleb Jones --- odorobo/src/actors/scheduler_actor.rs | 66 ++++++++++++++++++++------- odorobo/src/ch_driver/actor.rs | 6 +-- odorobo/src/messages/vm.rs | 3 +- 3 files changed, 55 insertions(+), 20 deletions(-) diff --git a/odorobo/src/actors/scheduler_actor.rs b/odorobo/src/actors/scheduler_actor.rs index 78569b2..6ff9c3c 100644 --- a/odorobo/src/actors/scheduler_actor.rs +++ b/odorobo/src/actors/scheduler_actor.rs @@ -26,11 +26,14 @@ use tokio::task::JoinHandle; pub struct CachedAgentActor { pub actor_ref: RemoteActorRef, pub metadata: AgentStatus, + /// this is a list of all VMs that may be on an agent. 
it is used for rules such as affinity to make sure we don't schedule things in ways that arent allowed + /// We don't know for a fact these VMs are scheduled due to latency and boot up delay, but they may be scheduled. + pub extended_vm_list: Vec, } #[derive(Debug, Clone)] pub struct CachedVMActor { - pub actor_ref: RemoteActorRef, + pub actor_ref: Option>, pub metadata: GetVMInfoReply, } @@ -49,6 +52,9 @@ pub struct SchedulerActor { // i dont really love that option either though cause it feels overkill. // maybe we sure just be using a proper database entirely? // idk. will figure it out later. + // + // new related problem: i just realized vmids/ulids and actorids dont have to be unique. + // if a vm is migrating from one actor to another, there might be two actors with the same vmid. pub vm_actorid_ulid_map: Arc>, pub vm_data_cache: Arc>, pub vm_keepalive_tasks: Arc>>, @@ -130,7 +136,7 @@ impl SchedulerActor { data_cache.insert( vmid, CachedVMActor { - actor_ref: actor_ref.clone(), + actor_ref: Some(actor_ref.clone()), metadata: metadata } ); @@ -189,13 +195,25 @@ impl SchedulerActor { let mut fails = 0; loop { if let Ok(metadata) = actor_ref.ask(&GetAgentStatus).await { - data_cache.insert( - actor_ref.id(), - CachedAgentActor { - actor_ref: actor_ref.clone(), - metadata: metadata - } - ); + if data_cache.contains_key(&actor_ref.id()) { + data_cache.alter( + &actor_ref.id(), + |_, mut v| { + v.metadata = metadata; + + v + } + ); + } else { + data_cache.insert( + actor_ref.id(), + CachedAgentActor { + actor_ref: actor_ref.clone(), + metadata, + extended_vm_list: vec![] + } + ); + } fails = 0; } else { @@ -292,7 +310,7 @@ impl SchedulerActor { // todo (future): possibly keep a percent of agents completely empty, to be able to be converted to dedis automatically. - // they would have their agent score set to 1, so they can be scheduled to if there is no other avaliable agents. + // they would have their agent score set to like f32::MIN, so they can be scheduled to if there is no other avaliable agents. // rough pseudo code to implement this: // if agent.metadata.vms.len() == 0 && hash(agent.config.hostname) % total_chance < threshold { // agent_score = 1; @@ -390,21 +408,37 @@ impl Message for SchedulerActor { let target_agent = self.schedule_agent(&msg).await?; // we add to cache first, because we want to make sure future requests assume this vm exists. if the message fails, we clean it up afterward. - - // massive problem for me to fix later: the metadata can be auto updated. so this change won't be persisted and we need to continue to know this vm is likely scheduled on this device. - // we likely need to create a new vec of likey? vms or something and then match against that too during scheduling for example. - if let Some(mut cached_data) = self.agent_data_cache.get_mut(&target_agent.id()) { - cached_data.metadata.vms.push(msg.vmid); + cached_data.extended_vm_list.push(msg.vmid); } else { return Err(eyre!("target agent is not in data cache")) } + + self.vm_data_cache.insert( + msg.vmid, + CachedVMActor { + actor_ref: None, + metadata: GetVMInfoReply { + vmid: msg.vmid, + config: Some(msg.config.clone()) + } + }); + + let reply = target_agent.ask(&msg).await; + + // remove from caches if we fail to schedule if reply.is_err() { - // remove from caches + self.agent_data_cache.alter(&target_agent.id(), |_,mut v| { + v.extended_vm_list.retain(|&vmid| vmid != msg.vmid); + + v + }); + self.vm_data_cache.remove(&msg.vmid); } + Ok(reply?) 
} diff --git a/odorobo/src/ch_driver/actor.rs b/odorobo/src/ch_driver/actor.rs index 8889b6d..b3a9f92 100644 --- a/odorobo/src/ch_driver/actor.rs +++ b/odorobo/src/ch_driver/actor.rs @@ -28,17 +28,17 @@ pub struct VMActor { /// path to the Cloud Hypervisor socket, in /run/odorobo/vms//ch.sock pub vm_instance: VMInstance, pub migration_state: Option, - pub manifest: VirtualMachine + pub manifest: Option } impl Actor for VMActor { // tuple of VM ID and manifest - type Args = (ulid::Ulid, VirtualMachine); + type Args = (ulid::Ulid, Option); type Error = Report; #[tracing::instrument(skip_all)] async fn on_start((vmid, vm_manifest): Self::Args, actor_ref: ActorRef) -> Result { - let mut vminstance = VMInstance::spawn(&vmid.to_string(), Some(VmConfig::from(vm_manifest.clone())), None).await?; + let mut vminstance = VMInstance::spawn(&vmid.to_string(), vm_manifest.clone().map(VmConfig::from), None).await?; // Take the child process out so we can watch for unexpected death. // destroy() handles a missing child_process gracefully. diff --git a/odorobo/src/messages/vm.rs b/odorobo/src/messages/vm.rs index 4e979c8..8318a5a 100644 --- a/odorobo/src/messages/vm.rs +++ b/odorobo/src/messages/vm.rs @@ -1,4 +1,5 @@ //! VM-related messages +use cloud_hypervisor_client::models::VmConfig; use kameo::prelude::*; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -98,5 +99,5 @@ pub struct GetVMInfo { #[derive(Serialize, Deserialize, Reply, Debug, Clone)] pub struct GetVMInfoReply { pub vmid: Ulid, - pub config: VirtualMachine, + pub config: Option, } From 7a635181a56282c35707f2f70ef62d2e6d8fee66 Mon Sep 17 00:00:00 2001 From: Caleb Jones Date: Tue, 5 May 2026 14:23:07 -0500 Subject: [PATCH 5/6] continue to work on scheduler actor (still not finished) Signed-off-by: Caleb Jones --- odorobo/src/actors/scheduler_actor.rs | 236 ++++++++++++++++++-------- odorobo/src/types.rs | 12 +- 2 files changed, 168 insertions(+), 80 deletions(-) diff --git a/odorobo/src/actors/scheduler_actor.rs b/odorobo/src/actors/scheduler_actor.rs index 6ff9c3c..9ed9536 100644 --- a/odorobo/src/actors/scheduler_actor.rs +++ b/odorobo/src/actors/scheduler_actor.rs @@ -1,13 +1,17 @@ use std::ops::ControlFlow; use std::sync::Arc; use std::time::Duration; +use std::cmp::Ordering; +use ahash::AHashSet; +use dashmap::mapref::multiple::RefMulti; use kameo::prelude::*; use libp2p::futures::TryStreamExt; use tracing::trace; use ulid::Ulid; use crate::actors::agent_actor::AgentActor; use crate::ch_driver::actor::VMActor; +use crate::types::AffinityStrictness; use crate::utils::actor_names::VM; use crate::messages::vm::*; use crate::messages::agent::*; @@ -26,9 +30,9 @@ use tokio::task::JoinHandle; pub struct CachedAgentActor { pub actor_ref: RemoteActorRef, pub metadata: AgentStatus, - /// this is a list of all VMs that may be on an agent. it is used for rules such as affinity to make sure we don't schedule things in ways that arent allowed + /// this is a set of all VMs that may be on an agent. it is used for rules such as affinity to make sure we don't schedule things in ways that arent allowed /// We don't know for a fact these VMs are scheduled due to latency and boot up delay, but they may be scheduled. - pub extended_vm_list: Vec, + pub extended_vm_set: AHashSet, } #[derive(Debug, Clone)] @@ -38,23 +42,35 @@ pub struct CachedVMActor { } - +// todo: i dont like the way this cache is setup. I think we may need to change it later, but it is hard to figure out what the optimal solution is without doing it at least once. 
+// especially when we haven't fully made decisions about some other things.
 #[derive(RemoteActor)]
 pub struct SchedulerActor {
     pub agent_data_cache: Arc<DashMap<ActorId, CachedAgentActor>>,
     pub agent_keepalive_tasks: Arc<DashMap<ActorId, JoinHandle<()>>>,
 
     // todo: we might need a better way to store this.
-    // we are likely going to want to store vms even if we don't know their actorid (ex: actor hasn't been started or is shutdown)
-    // but we also might want to be able to store them without a ulid, possibly
+    // we 100% need a way to store vms even if we don't know their actorid (ex: actor hasn't been started or is shutdown)
+    // we also might want to be able to store them without a ulid, possibly
     // so we might need a vec of vms and then to just store maps/indexes of actorid and ulid to vector index
     // and then like a freelist or something.
     // i dont really love that option either though cause it feels overkill.
     // maybe we sure just be using a proper database entirely?
     // idk. will figure it out later.
     //
-    // new related problem: i just realized vmids/ulids and actorids dont have to be unique.
+    // new related problem: i just realized vmid, actorid pairs don't have to be unique.
     // if a vm is migrating from one actor to another, there might be two actors with the same vmid.
+    //
+    // additional context (05/05/2026): we may well need a way to store them without a ulid, due to how CH migration works.
+    // the question becomes whether we want to abstract CH migration away entirely from the scheduler.
+    // we could also possibly ignore it for the non-HA scheduler.
+    // I (caleb) want to ask cappy (and possibly Lea) about these problems.
+    //
+    // the best solution for at least some of this is almost certainly having an external reliable DB (such as etcd) to store some of these things permanently.
+    // we will need that specifically for knowing which VMs are supposed to be running, because if a large percentage of the cluster goes down, including the manager, we need a way to recover.
+    // and i don't think leaving that on the dashboard, which could have high latency, is a good idea.
+    // alternatively we could have the other manager nodes try to keep track of that data, but i think we are going to run into issues with keeping the state consistent between all nodes.
+    // we may need to make some architecture decisions about db consistency vs uptime vs speed in that situation, and i'm not doing that on my own.
     pub vm_actorid_ulid_map: Arc<DashMap<ActorId, Ulid>>,
     pub vm_data_cache: Arc<DashMap<Ulid, CachedVMActor>>,
     pub vm_keepalive_tasks: Arc<DashMap<ActorId, JoinHandle<()>>>,
@@ -128,7 +144,7 @@ impl SchedulerActor {
         let mut interval = tokio::time::interval(Duration::from_secs(1));
         let mut fails = 0;
         loop {
-            if let Ok(metadata) = actor_ref.ask(&GetVMInfo {vmid: None}).await {
+            if let Ok(metadata) = actor_ref.ask(&GetVMInfo {vmid: None}).await { // todo: replace with kameo stream
                 let vmid = metadata.vmid;
 
                 vm_actorid_ulid_map.insert(actor_ref.id(), vmid); // should we be doing this on every loop? idk.
but we at least need to do it on the first iteration given we don't know the mapping before that @@ -194,13 +210,15 @@ impl SchedulerActor { let mut interval = tokio::time::interval(Duration::from_secs(1)); let mut fails = 0; loop { - if let Ok(metadata) = actor_ref.ask(&GetAgentStatus).await { + if let Ok(metadata) = actor_ref.ask(&GetAgentStatus).await { // todo: replace with kameo stream if data_cache.contains_key(&actor_ref.id()) { data_cache.alter( &actor_ref.id(), |_, mut v| { v.metadata = metadata; + v.extended_vm_set.extend(v.metadata.vms.iter()); + v } ); @@ -210,7 +228,7 @@ impl SchedulerActor { CachedAgentActor { actor_ref: actor_ref.clone(), metadata, - extended_vm_list: vec![] + extended_vm_set: AHashSet::new() } ); } @@ -261,74 +279,143 @@ impl SchedulerActor { } }) ); - } - - /// current scheduling algo info: - /// this is vaguely based on https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ - /// when a vm is attempted to be scheduled, we loop through every agent and score it based on some rules - /// there are hard rules that will simply throw out an agent entirely. - /// otherwise, we take whatever the best agent we can find is. - async fn schedule_agent( + /// Determine the best agent to schedule a specific VM creation request to. + /// + /// Rough explanation of the algorithm: + /// Loop through every known agent. + /// Go through a set of rules to determine if the VM can be scheduled on this agent at all, and an affinity score and a general score. + /// + /// Based on these scores, pick the best agent. + /// First the affinity score is used, because these are things the customer specifically wanted. + /// If the affinity score is tied, we use the general score as a tie breaker. + /// The general score uses things like resource utilization to not over load any specific agent. + /// + /// + /// Affinity rules are roughly based on https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ + /// + /// todo: + /// - the cache likely needs to be updated automatically when a new vm is scheduled for info like used resources, because otherwise we have to deal with latency on that data we are using + /// and then if someone tries to schedule lets say 10 VMs in a batch, we could end up scheduling them all to the same agent because the metadata hasn't updated. + /// - there are a few solutions for this but they all kinda suck, mostly due to also making sure we deal with latency properly. I am ignoring the issue for now. + fn schedule_agent( &mut self, msg: &CreateVM ) -> Result, Report> { - let mut best_agent = None; - let mut best_agent_score = 0.0f32; + // todo: this could likely be better idiomatic rust. + // I suspect there is a map-reduce operation that does the exact scoring thing I am trying to do. + // I also assume there is a better function for the and_then + let best_agent = self.agent_data_cache.iter() + .map(|agent| (agent.actor_ref.clone(), score_agent(msg, &agent))) + .reduce(|best, new| if new.1 > best.1 { new } else { best }) + .and_then(|best| if best.1 == AgentScore::REJECTED { None } else { Some(best.0) }); - // todo: this arguably could be done as map-reduce. is that better? 
- let span = info_span!("schedule_agent"); - span.in_scope(|| { - for agent in self.agent_data_cache.iter() { - let mut agent_score = 0.0f32; + best_agent.ok_or_eyre("No valid agents found.") + } +} - let agent_max_vcpus = agent.metadata.vcpus * VCPU_OVERPROVISIONMENT_NUMERATOR / VCPU_OVERPROVISIONMENT_DENOMINATOR; - // todo: do we care about VMData.max_vcpus? - let agent_used_vcpus = agent.metadata.used_vcpus + msg.config.data.vcpus; +#[derive(Debug, Clone, Copy, PartialEq)] +struct AgentScore { + general: f32, + affinity: i64 +} - if agent_used_vcpus >= agent_max_vcpus { - continue; - } +impl AgentScore { + pub const REJECTED: Self = Self { + general: f32::NEG_INFINITY, + affinity: i64::MIN + }; +} - agent_score += (agent_max_vcpus - agent_used_vcpus) as f32 / agent_max_vcpus as f32; +impl Default for AgentScore { + fn default() -> Self { + Self { + general: 0.0, + affinity: 0 + } + } +} +impl PartialOrd for AgentScore { + fn partial_cmp(&self, other: &Self) -> Option { + let affinity_cmp = self.affinity.cmp(&other.affinity); - // todo: add ram overprovisionment. not adding this to scheduler until it works on the hypervisor side. - let agent_max_ram = agent.metadata.ram; - let agent_used_ram = agent.metadata.used_ram + msg.config.data.memory; + if affinity_cmp != Ordering::Equal { + return Some(affinity_cmp); + } - if agent_used_ram >= agent_max_ram { - continue; - } + self.general.partial_cmp(&other.general) + } +} + + +fn score_agent( + msg: &CreateVM, + agent: &RefMulti<'_, ActorId, CachedAgentActor> +) -> AgentScore { + let mut score = AgentScore::default(); - agent_score += (agent_max_ram.as_u64() - agent.metadata.used_ram.as_u64()) as f32 / agent_max_ram.as_u64() as f32; + let agent_max_vcpus = agent.metadata.vcpus * VCPU_OVERPROVISIONMENT_NUMERATOR / VCPU_OVERPROVISIONMENT_DENOMINATOR; + // todo: do we care about VMData.max_vcpus? + let agent_used_vcpus = agent.metadata.used_vcpus + msg.config.data.vcpus; + if agent_used_vcpus >= agent_max_vcpus { + return AgentScore::REJECTED; + } + + score.general += (agent_max_vcpus - agent_used_vcpus) as f32 / agent_max_vcpus as f32; - // todo: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ + // todo: add ram overprovisionment. not adding this to scheduler until it works on the hypervisor side. + let agent_max_ram = agent.metadata.ram; + let agent_used_ram = agent.metadata.used_ram + msg.config.data.memory; + if agent_used_ram >= agent_max_ram { + return AgentScore::REJECTED; + } - // todo (future): possibly keep a percent of agents completely empty, to be able to be converted to dedis automatically. - // they would have their agent score set to like f32::MIN, so they can be scheduled to if there is no other avaliable agents. 
- // rough pseudo code to implement this: - // if agent.metadata.vms.len() == 0 && hash(agent.config.hostname) % total_chance < threshold { - // agent_score = 1; - // } + score.general += (agent_max_ram.as_u64() - agent.metadata.used_ram.as_u64()) as f32 / agent_max_ram.as_u64() as f32; + // todo: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ - info!(agent=?agent.value(), score=agent_score); - if agent_score > best_agent_score { - best_agent = Some(agent.actor_ref.clone()); - best_agent_score = agent_score; + if let Some(affinity_rules) = &msg.config.affinity { + for rule in affinity_rules { + let mut meets_requirements = false; + + for requirement in &rule.requirements { + let requirement_outcome = true; // todo: set this based on requirement + + if requirement_outcome { + meets_requirements = true; + break; } } - }); - best_agent.ok_or_eyre("No valid agents found.") + let follows_rule = meets_requirements ^ rule.inverse; + + match (rule.strictness, follows_rule) { + (AffinityStrictness::Required, false) => return AgentScore::REJECTED, + (AffinityStrictness::Required, true) => {}, // specifically do nothing + (AffinityStrictness::Preferred { weight }, follows_rule) => { + score.affinity += follows_rule as i64 * weight; + }, + } + } } + + + + // todo (future): possibly keep a percent of agents completely empty, to be able to be converted to dedis automatically. + // they would have their agent score set to like f32::MIN, so they can be scheduled to if there is no other available agents. + // rough pseudo code to implement this: + // if agent.metadata.vms.len() == 0 && hash(agent.config.hostname) % total_chance < threshold { + // agent_score = 1; + // } + + score } @@ -376,10 +463,8 @@ impl Actor for SchedulerActor { keepalive_task.abort(); }; - self.agent_data_cache.remove(&actor_id); - // todo: attempt vm migration or restart or whatever on agent death. if let Some((_, keepalive_task)) = self.vm_keepalive_tasks.remove(&actor_id) { trace!(?actor_id, "Aborting vm keepalive task"); @@ -387,13 +472,24 @@ impl Actor for SchedulerActor { }; if let Some((_, vmid)) = self.vm_actorid_ulid_map.remove(&actor_id) { + // todo: this solution definitely isn't optimal. we likely should be storing which agent actor, this specific actor id is scheduled on and specifically removing it from that one. + // especially because things get iffy with migration, but im ignoring that for the moment. + for mut agent in self.agent_data_cache.iter_mut() { + agent.extended_vm_set.remove(&vmid); + } + // todo: we likely should keep a copy of the VirtualMachine manifest in the cache. // instead of removing the vm entirely, we should just modify the status to shutdown or crashed or something. + // + // another potential issue is that the link dying doesn't guarantee that the vm is dead. if there is a networking partition, things get iffy. trace!(?actor_id, ?vmid, "Removing vm from vm_data_cache"); self.vm_data_cache.remove(&vmid); } + // todo: attempt vm restarts if necessary. + + Ok(ControlFlow::Continue(())) } } @@ -405,40 +501,40 @@ impl Message for SchedulerActor { type Reply = Result; async fn handle(&mut self, msg: CreateVM, _ctx: &mut Context) -> Self::Reply { - let target_agent = self.schedule_agent(&msg).await?; + let target_agent = self.schedule_agent(&msg)?; // we add to cache first, because we want to make sure future requests assume this vm exists. if the message fails, we clean it up afterward. 
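+        // concrete example of why this ordering matters (illustrative): two CreateVM requests
+        // for VMs in the same anti-affinity group can arrive back to back. the second request
+        // is scored against extended_vm_set, so the first VM has to be in there already, even
+        // though the agent hasn't reported it in its status yet.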
if let Some(mut cached_data) = self.agent_data_cache.get_mut(&target_agent.id()) { - cached_data.extended_vm_list.push(msg.vmid); + cached_data.extended_vm_set.insert(msg.vmid); } else { return Err(eyre!("target agent is not in data cache")) } - + self.vm_data_cache.insert( msg.vmid, - CachedVMActor { - actor_ref: None, - metadata: GetVMInfoReply { - vmid: msg.vmid, + CachedVMActor { + actor_ref: None, + metadata: GetVMInfoReply { + vmid: msg.vmid, config: Some(msg.config.clone()) } }); - - + + let reply = target_agent.ask(&msg).await; - + // remove from caches if we fail to schedule if reply.is_err() { self.agent_data_cache.alter(&target_agent.id(), |_,mut v| { - v.extended_vm_list.retain(|&vmid| vmid != msg.vmid); - + v.extended_vm_set.remove(&msg.vmid); + v }); self.vm_data_cache.remove(&msg.vmid); } - + Ok(reply?) } @@ -454,9 +550,8 @@ impl Message for SchedulerActor { ) -> Self::Reply { let vm = RemoteActorRef::::lookup(vm_actor_id(msg.vmid)).await?; tracing::trace!(?vm, "DeleteVM"); - if let Some(vm) = vm { + if let Some(vm) = vm { // don't update cache, because we rely on link dying and updater task to remove from cache once the VM is fully down. vm.tell(&msg).send()?; - // todo: update cache Ok(DeleteVMReply) } else { Err(eyre!("VM not found")) @@ -474,11 +569,8 @@ impl Message for SchedulerActor { ) -> Self::Reply { let vm = RemoteActorRef::::lookup(vm_actor_id(msg.vmid)).await?; tracing::trace!(?vm, "ShutdownVM"); - if let Some(vm) = vm { + if let Some(vm) = vm { // don't update cache, because we rely on link dying and updater task to remove from cache once the VM is fully down. vm.tell(&msg).send()?; - - // todo: update cache - Ok(ShutdownVMReply) } else { Err(eyre!("VM not found")) diff --git a/odorobo/src/types.rs b/odorobo/src/types.rs index b526cf8..902708a 100644 --- a/odorobo/src/types.rs +++ b/odorobo/src/types.rs @@ -175,12 +175,14 @@ pub struct VirtualMachine { pub struct AffinityRule { pub strictness: AffinityStrictness, pub affinity_type: AffinityType, - pub direction: AffinityDirection, + /// if true, the outcome of the requirements is inverted + #[serde(default)] + pub inverse: bool, /// ORed together pub requirements: Vec } -#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone, Copy)] pub enum AffinityStrictness { Required, Preferred { weight: i64 } @@ -192,12 +194,6 @@ pub enum AffinityType { Agent } -#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] -pub enum AffinityDirection { - Normal, - Anti -} - #[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub struct AffinityRequirement { pub key: String, From 11186cd562d7ea659623b6f550e77f7fb9329d67 Mon Sep 17 00:00:00 2001 From: Caleb Jones Date: Tue, 5 May 2026 15:43:11 -0500 Subject: [PATCH 6/6] more work on scheduler affinity stuff. still not finished. 
Signed-off-by: Caleb Jones
---
 odorobo/src/actors/scheduler_actor.rs | 154 ++++++++++++++------------
 odorobo/src/config.rs                 |   7 +-
 odorobo/src/types.rs                  |   5 +-
 3 files changed, 89 insertions(+), 77 deletions(-)

diff --git a/odorobo/src/actors/scheduler_actor.rs b/odorobo/src/actors/scheduler_actor.rs
index 9ed9536..91a373f 100644
--- a/odorobo/src/actors/scheduler_actor.rs
+++ b/odorobo/src/actors/scheduler_actor.rs
@@ -12,6 +12,7 @@ use ulid::Ulid;
 use crate::actors::agent_actor::AgentActor;
 use crate::ch_driver::actor::VMActor;
 use crate::types::AffinityStrictness;
+use crate::types::AffinityType;
 use crate::utils::actor_names::VM;
 use crate::messages::vm::*;
 use crate::messages::agent::*;
@@ -144,7 +145,7 @@ impl SchedulerActor {
         let mut interval = tokio::time::interval(Duration::from_secs(1));
         let mut fails = 0;
         loop {
-            if let Ok(metadata) = actor_ref.ask(&GetVMInfo {vmid: None}).await { // todo: replace with kameo stream
+            if let Ok(metadata) = actor_ref.ask(&GetVMInfo {vmid: None}).await { // todo: replace with a kameo stream (only send the full data once, then send changes from there on)
                 let vmid = metadata.vmid;
 
                 vm_actorid_ulid_map.insert(actor_ref.id(), vmid); // should we be doing this on every loop? idk. but we at least need to do it on the first iteration given we don't know the mapping before that
@@ -307,12 +308,93 @@ impl SchedulerActor {
         // I suspect there is a map-reduce operation that does the exact scoring thing I am trying to do.
         // I also assume there is a better function for the and_then
         let best_agent = self.agent_data_cache.iter()
-            .map(|agent| (agent.actor_ref.clone(), score_agent(msg, &agent)))
+            .map(|agent| (agent.actor_ref.clone(), self.score_agent(msg, &agent)))
             .reduce(|best, new| if new.1 > best.1 { new } else { best })
             .and_then(|best| if best.1 == AgentScore::REJECTED { None } else { Some(best.0) });
 
         best_agent.ok_or_eyre("No valid agents found.")
     }
+
+    // this function intentionally only checks against the cache. this has some positives and negatives:
+    // positive: it will never trigger any network requests, so it's very fast; having to do network requests for scoring whenever we want to schedule a vm is likely a bad idea.
+    // negative: it has a slightly delayed view of the cluster, so very recent changes may not be reflected in the cache yet. we need to be careful that this doesn't accidentally break affinity rules: if we do anything that could affect the outcome of an affinity rule (ex: a network request to an agent), we need to update the cache before we do the action.
+    fn score_agent(
+        &self,
+        msg: &CreateVM,
+        agent: &RefMulti<'_, ActorId, CachedAgentActor>
+    ) -> AgentScore {
+        let mut score = AgentScore::default();
+
+        let agent_max_vcpus = agent.metadata.vcpus * VCPU_OVERPROVISIONMENT_NUMERATOR / VCPU_OVERPROVISIONMENT_DENOMINATOR;
+        // todo: do we care about VMData.max_vcpus?
+        let agent_used_vcpus = agent.metadata.used_vcpus + msg.config.data.vcpus;
+
+        if agent_used_vcpus >= agent_max_vcpus {
+            return AgentScore::REJECTED;
+        }
+
+        score.general += (agent_max_vcpus - agent_used_vcpus) as f32 / agent_max_vcpus as f32;
+
+
+        // todo: add ram overprovisionment. not adding this to scheduler until it works on the hypervisor side.
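+        // reminder on ranking (illustrative values): the PartialOrd impl on AgentScore compares
+        // affinity first and only falls back to general as a tie breaker, so
+        // AgentScore { general: 0.1, affinity: 5 } beats AgentScore { general: 1.9, affinity: 0 }.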
+        let agent_max_ram = agent.metadata.ram;
+        let agent_used_ram = agent.metadata.used_ram + msg.config.data.memory;
+
+        if agent_used_ram >= agent_max_ram {
+            return AgentScore::REJECTED;
+        }
+
+        score.general += (agent_max_ram.as_u64() - agent_used_ram.as_u64()) as f32 / agent_max_ram.as_u64() as f32;
+
+
+        // todo: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/
+
+
+        if let Some(affinity_rules) = &msg.config.affinity {
+            for rule in affinity_rules {
+                let mut meets_requirements = false;
+
+                let mut lhs_values: Vec<String> = Vec::with_capacity(1);
+
+                match &rule.affinity_type {
+                    AffinityType::VirtualMachine(zone) => todo!(),
+                    AffinityType::Agent => lhs_values.push(agent.metadata.), // todo: this should be object metadata, but I just realized that field isn't included in the data we have in the cache
+                }
+
+                // in theory the next bit of this code would loop through all the lhs values and all the requirements and then actually do the computation, but I got busy with other work.
+
+                for requirement in &rule.requirements {
+                    let requirement_outcome = true; // todo: set this based on requirement
+
+                    if requirement_outcome {
+                        meets_requirements = true;
+                        break;
+                    }
+                }
+
+                let follows_rule = meets_requirements ^ rule.inverse;
+
+                match (rule.strictness, follows_rule) {
+                    (AffinityStrictness::Required, false) => return AgentScore::REJECTED,
+                    (AffinityStrictness::Required, true) => {}, // specifically do nothing
+                    (AffinityStrictness::Preferred { weight }, follows_rule) => {
+                        score.affinity += follows_rule as i64 * weight;
+                    },
+                }
+            }
+        }
+
+
+
+        // todo (future): possibly keep a percent of agents completely empty, to be able to be converted to dedis automatically.
+        // they would have their agent score set to like f32::MIN, so they can be scheduled to if there are no other available agents.
+        // rough pseudo code to implement this:
+        // if agent.metadata.vms.len() == 0 && hash(agent.config.hostname) % total_chance < threshold {
+        //     agent_score = 1;
+        // }
+
+        score
+    }
 }
 
 #[derive(Debug, Clone, Copy, PartialEq)]
 struct AgentScore {
     general: f32,
     affinity: i64
 }
@@ -350,74 +432,6 @@ impl PartialOrd for AgentScore {
 }
 
 
-fn score_agent(
-    msg: &CreateVM,
-    agent: &RefMulti<'_, ActorId, CachedAgentActor>
-) -> AgentScore {
-    let mut score = AgentScore::default();
-
-    let agent_max_vcpus = agent.metadata.vcpus * VCPU_OVERPROVISIONMENT_NUMERATOR / VCPU_OVERPROVISIONMENT_DENOMINATOR;
-    // todo: do we care about VMData.max_vcpus?
-    let agent_used_vcpus = agent.metadata.used_vcpus + msg.config.data.vcpus;
-
-    if agent_used_vcpus >= agent_max_vcpus {
-        return AgentScore::REJECTED;
-    }
-
-    score.general += (agent_max_vcpus - agent_used_vcpus) as f32 / agent_max_vcpus as f32;
-
-
-    // todo: add ram overprovisionment. not adding this to scheduler until it works on the hypervisor side.
-    let agent_max_ram = agent.metadata.ram;
-    let agent_used_ram = agent.metadata.used_ram + msg.config.data.memory;
-
-    if agent_used_ram >= agent_max_ram {
-        return AgentScore::REJECTED;
-    }
-
-    score.general += (agent_max_ram.as_u64() - agent.metadata.used_ram.as_u64()) as f32 / agent_max_ram.as_u64() as f32;
-
-
-    // todo: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/
-
-
-    if let Some(affinity_rules) = &msg.config.affinity {
-        for rule in affinity_rules {
-            let mut meets_requirements = false;
-
-            for requirement in &rule.requirements {
-                let requirement_outcome = true; // todo: set this based on requirement
-
-                if requirement_outcome {
-                    meets_requirements = true;
-                    break;
-                }
-            }
-
-            let follows_rule = meets_requirements ^ rule.inverse;
-
-            match (rule.strictness, follows_rule) {
-                (AffinityStrictness::Required, false) => return AgentScore::REJECTED,
-                (AffinityStrictness::Required, true) => {}, // specifically do nothing
-                (AffinityStrictness::Preferred { weight }, follows_rule) => {
-                    score.affinity += follows_rule as i64 * weight;
-                },
-            }
-        }
-    }
-
-
-
-    // todo (future): possibly keep a percent of agents completely empty, to be able to be converted to dedis automatically.
-    // they would have their agent score set to like f32::MIN, so they can be scheduled to if there is no other available agents.
-    // rough pseudo code to implement this:
-    // if agent.metadata.vms.len() == 0 && hash(agent.config.hostname) % total_chance < threshold {
-    //     agent_score = 1;
-    // }
-
-    score
-}
-
 
 
 impl Actor for SchedulerActor {
diff --git a/odorobo/src/config.rs b/odorobo/src/config.rs
index 8b261ea..acb5287 100644
--- a/odorobo/src/config.rs
+++ b/odorobo/src/config.rs
@@ -144,13 +144,10 @@ pub struct Config {
     /// The number of VCPUs reserved for the agent. Defaults to 2.
     #[serde(default = "default_reserved_vcpus")]
     pub reserved_vcpus: u32,
-    /// this is just arbitrary data that will be shown but does no config
-    /// Arbitrary labels that can be used
+
+    /// Arbitrary data that the infra team can set as notes for themselves. odorobo does not directly use these, but does include them in its output so they can be used elsewhere.
     #[serde(default)]
     pub labels: AHashMap<String, String>,
-    /// Arbitrary annotations that can be used
-    #[serde(default)]
-    pub annotations: AHashMap<String, String>,
 
     #[serde(default)]
     pub network: NetworkConfig,
diff --git a/odorobo/src/types.rs b/odorobo/src/types.rs
index 902708a..95e53bb 100644
--- a/odorobo/src/types.rs
+++ b/odorobo/src/types.rs
@@ -190,10 +190,12 @@ pub enum AffinityStrictness {
 
 #[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)]
 pub enum AffinityType {
-    VirtualMachine,
+    VirtualMachine(Zone),
     Agent
 }
 
+pub type Zone = String;
+
 #[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)]
 pub struct AffinityRequirement {
     pub key: String,
@@ -208,7 +210,6 @@ pub enum MetadataTable {
     Annotation
 }
 
-// todo: possibly replace with std::ops
 #[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)]
 pub enum Operator {
     In,