diff --git a/odorobo/src/actors/scheduler_actor.rs b/odorobo/src/actors/scheduler_actor.rs index 11d6a62..91a373f 100644 --- a/odorobo/src/actors/scheduler_actor.rs +++ b/odorobo/src/actors/scheduler_actor.rs @@ -1,12 +1,18 @@ use std::ops::ControlFlow; +use std::sync::Arc; +use std::time::Duration; +use std::cmp::Ordering; -use async_trait::async_trait; +use ahash::AHashSet; +use dashmap::mapref::multiple::RefMulti; use kameo::prelude::*; use libp2p::futures::TryStreamExt; +use tracing::trace; +use ulid::Ulid; use crate::actors::agent_actor::AgentActor; use crate::ch_driver::actor::VMActor; -use crate::utils::actor_cache::ActorCache; -use crate::utils::actor_cache::ActorCacheUpdater; +use crate::types::AffinityStrictness; +use crate::types::AffinityType; use crate::utils::actor_names::VM; use crate::messages::vm::*; use crate::messages::agent::*; @@ -17,12 +23,60 @@ use stable_eyre::eyre::OptionExt; use stable_eyre::{Report, Result, eyre::eyre}; use tracing::info_span; use tracing::{info, warn}; +use dashmap::DashMap; +use tokio::task::JoinHandle; +#[derive(Debug, Clone)] +pub struct CachedAgentActor { + pub actor_ref: RemoteActorRef, + pub metadata: AgentStatus, + /// this is a set of all VMs that may be on an agent. it is used for rules such as affinity to make sure we don't schedule things in ways that arent allowed + /// We don't know for a fact these VMs are scheduled due to latency and boot up delay, but they may be scheduled. + pub extended_vm_set: AHashSet, +} + +#[derive(Debug, Clone)] +pub struct CachedVMActor { + pub actor_ref: Option>, + pub metadata: GetVMInfoReply, +} + + +// todo: i dont like the way this cache is setup. I think we may need to change it later, but it is hard to figure out what the optimal solution is without doing it at least once. +// especially when we haven't fully made decisions about some other things. 
#[derive(RemoteActor)] pub struct SchedulerActor { - pub agent_actor_cache: ActorCache, - pub vm_actor_cache: ActorCache + pub agent_data_cache: Arc>, + pub agent_keepalive_tasks: Arc>>, + + // todo: we might need a better way to store this. + // we 100% need a way to store vms even if we don't know their actorid (ex: actor hasn't been started or is shutdown) + // we also might want to be able to store them without a ulid, possibly + // so we might need a vec of vms and then to just store maps/indexes of actorid and ulid to vector index + // and then like a freelist or something. + // i dont really love that option either though cause it feels overkill. + // maybe we should just be using a proper database entirely? + // idk. will figure it out later. + // + // new related problem: i just realized vmid, actorid pairs dont have to be unique. + // if a vm is migrating from one actor to another, there might be two actors with the same vmid. + // + // additional context (05/05/2026): we almost certainly need a way to store them without a ulid, due to how CH migration works. + // the question becomes if we want to abstract CH migration away entirely from the scheduler. + // we could also possibly ignore it for the non-HA scheduler. + // I (caleb) want to ask cappy (and possibly Lea) about these problems. + // + // the best solution for at least some of this is almost certainly having an external reliable DB (such as etcd) to store some of these things permanently. + // we will need that specifically for what VMs are supposed to be running, because if a large percentage of the cluster goes down, including the manager, we need a way to recover. + // and i dont think leaving that on dashboard which could have high latency is a good idea. + // alternatively we could have the other manager nodes try to keep track of that data, but i think we are going to run into issues with keeping the state consistent between all nodes. 
+ // we may need to make some architecture designs about db consistency vs uptime vs speed in that situation, and im not doing that on my own. + pub vm_actorid_ulid_map: Arc>, + pub vm_data_cache: Arc>, + pub vm_keepalive_tasks: Arc>>, + + pub cache_actor_finder: Option>, } // todo: this might need to be a runtime thing but this makes it easy to write for now and could easily be switched out later. @@ -31,163 +85,355 @@ static VCPU_OVERPROVISIONMENT_DENOMINATOR: u32 = 1; impl SchedulerActor { - async fn lookup_by_actor_id( + async fn lookup_agent_by_actor_id( &mut self, actor_id: &ActorId, ) -> Option> { - self.agent_actor_cache.data_cache.get(actor_id).map(|data| data.actor_ref.clone()) + self.agent_data_cache.get(actor_id).map(|data| data.actor_ref.clone()) } - async fn lookup_by_hostname( + async fn lookup_agent_by_hostname( &mut self, hostname: &str, ) -> Option> { - self.agent_actor_cache.data_cache.iter().find(|data| data.metadata.hostname == hostname).map(|data| data.actor_ref.clone()) + self.agent_data_cache.iter().find(|data| data.metadata.hostname == hostname).map(|data| data.actor_ref.clone()) } - /// current scheduling algo info: - /// this is vaguely based on https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ - /// when a vm is attempted to be scheduled, we loop through every agent and score it based on some rules - /// there are hard rules that will simply throw out an agent entirely. - /// otherwise, we take whatever the best agent we can find is. - /// - /// additionally, because caleb is way too performance brained, he used integer math for the entire scoring algorithm just so we didnt have to convert to floats. - async fn schedule_agent( - &mut self, - msg: &CreateVM - ) -> Result, Report> { - let mut best_agent = None; - let mut best_agent_score = 0u32; - - // todo: this arguably could be done as map-reduce. is that better? 
- let span = info_span!("schedule_agent"); - span.in_scope(|| { - for agent in self.agent_actor_cache.data_cache.iter() { - let mut agent_score = 0u32; - let agent_max_vcpus = agent.metadata.vcpus * VCPU_OVERPROVISIONMENT_NUMERATOR / VCPU_OVERPROVISIONMENT_DENOMINATOR; + // someone should likely give caleb a firm talking to about code duplication due to this section, but things are just different enough that trying to make them one function requires usage of a lot of generics which feels even worse. so i dont know what to do. cappy please fix. i hate this. + async fn vm_actor_finder( + parent_actor_ref: RemoteActorRef, + vm_actorid_ulid_map: Arc>, + data_cache: Arc>, + keepalive_tasks: Arc>> + ) -> Result<(), Report> { + + while let Some(vm_actor) = RemoteActorRef::::lookup_all(VM).try_next().await? { + if !keepalive_tasks.contains_key(&vm_actor.id()) { + trace!(?vm_actor, "starting vm_updater_task"); + + parent_actor_ref.link_remote(&vm_actor).await?; + + let vm_actor_id = vm_actor.id(); + + let vm_actorid_ulid_map_clone = Arc::clone(&vm_actorid_ulid_map); + let data_cache_clone = Arc::clone(&data_cache); + let updater_task = tokio::spawn(async move { + Self::vm_updater_task( + vm_actor, + vm_actorid_ulid_map_clone, + data_cache_clone + ).await; + }); + + keepalive_tasks.insert( + vm_actor_id, + updater_task + ); + } + } + Ok(()) + } - if agent.metadata.used_vcpus >= agent_max_vcpus { - continue; - } + async fn vm_updater_task( + actor_ref: RemoteActorRef, + vm_actorid_ulid_map: Arc>, + data_cache: Arc>, + ) { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut fails = 0; + loop { + if let Ok(metadata) = actor_ref.ask(&GetVMInfo {vmid: None}).await { // todo: replace with kameo stream, (only send full data once, then send changes from there on) + let vmid = metadata.vmid; + + vm_actorid_ulid_map.insert(actor_ref.id(), vmid); // should we be doing this on every loop? idk. 
but we at least need to do it on the first iteration given we don't know the mapping before that + + data_cache.insert( + vmid, + CachedVMActor { + actor_ref: Some(actor_ref.clone()), + metadata: metadata + } + ); + + fails = 0; + } else { + fails += 1; + } - agent_score += (agent_max_vcpus - agent.metadata.used_vcpus) * 1024 / agent_max_vcpus; + if fails > 5 { + // todo: possibly better error handling + warn!(?actor_ref, "can no longer reach agent actor.") + } + interval.tick().await; + } + } - // todo: add ram overprovisionment. not adding this to scheduler until it works on the hypervisor side. - let agent_max_ram = agent.metadata.ram; + async fn agent_actor_finder( + parent_actor_ref: RemoteActorRef, + data_cache: Arc>, + keepalive_tasks: Arc>>, + ) -> Result<(), Report> { + info!("running agent_actor_finder"); + while let Some(agent_actor) = RemoteActorRef::::lookup_all(AGENT).try_next().await? { + if !keepalive_tasks.contains_key(&agent_actor.id()) { + trace!(?agent_actor, "starting agent_updater_task"); + + parent_actor_ref.link_remote(&agent_actor).await?; + + let agent_actor_id = agent_actor.id(); + + let data_cache_clone = Arc::clone(&data_cache); + let updater_task = tokio::spawn(async move { + Self::agent_updater_task( + agent_actor, + data_cache_clone + ).await; + }); + + keepalive_tasks.insert( + agent_actor_id, + updater_task + ); + } + } - if agent.metadata.used_ram >= agent_max_ram { - continue; - } + Ok(()) + } - agent_score += ((agent_max_ram.as_u64() - agent.metadata.used_ram.as_u64()) * 1024 / agent_max_ram.as_u64()) as u32; + async fn agent_updater_task( + actor_ref: RemoteActorRef, + data_cache: Arc>, + ) { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut fails = 0; + loop { + if let Ok(metadata) = actor_ref.ask(&GetAgentStatus).await { // todo: replace with kameo stream + if data_cache.contains_key(&actor_ref.id()) { + data_cache.alter( + &actor_ref.id(), + |_, mut v| { + v.metadata = metadata; + 
v.extended_vm_set.extend(v.metadata.vms.iter()); - // todo: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ + v + } + ); + } else { + data_cache.insert( + actor_ref.id(), + CachedAgentActor { + actor_ref: actor_ref.clone(), + metadata, + extended_vm_set: AHashSet::new() + } + ); + } + fails = 0; + } else { + fails += 1; + } + if fails > 5 { + // todo: possibly better error handling + warn!(?actor_ref, "can no longer reach agent actor.") + } - // todo (future): possibly keep a percent of agents completely empty, to be able to be converted to dedis automatically. - // they would have their agent score set to 1, so they can be scheduled to if there is no other avaliable agents. - // rough pseudo code to implement this: - // if agent.metadata.vms.len() == 0 && hash(agent.config.hostname) % total_chance < threshold { - // agent_score = 1; - // } + interval.tick().await; + } + } + fn start_actor_finder(&mut self, actor_ref: RemoteActorRef) { + let agent_data_cache_arc_clone = Arc::clone(&self.agent_data_cache); + let agent_keepalive_tasks_arc_clone = Arc::clone(&self.agent_keepalive_tasks); + + let vm_actorid_ulid_map_arc_clone = Arc::clone(&self.vm_actorid_ulid_map); + let vm_data_cache_arc_clone = Arc::clone(&self.vm_data_cache); + let vm_keepalive_tasks_arc_clone = Arc::clone(&self.vm_keepalive_tasks); + + self.cache_actor_finder = Some( + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + loop { + let vm_join_handle = Self::vm_actor_finder( + actor_ref.clone(), + Arc::clone(&vm_actorid_ulid_map_arc_clone), + Arc::clone(&vm_data_cache_arc_clone), + Arc::clone(&vm_keepalive_tasks_arc_clone), + ); + let agent_join_handle = Self::agent_actor_finder( + actor_ref.clone(), + Arc::clone(&agent_data_cache_arc_clone), + Arc::clone(&agent_keepalive_tasks_arc_clone), + ); - info!(agent=?agent.value(), score=agent_score); + // intentionally ignoring results because we want to keep finding actors even if an 
attempt fails + let _ = tokio::join!(vm_join_handle, agent_join_handle); - if agent_score > best_agent_score { - best_agent = Some(agent.actor_ref.clone()); - best_agent_score = agent_score; + interval.tick().await; } - } - }); + }) + ); + } + + /// Determine the best agent to schedule a specific VM creation request to. + /// + /// Rough explanation of the algorithm: + /// Loop through every known agent. + /// Go through a set of rules to determine if the VM can be scheduled on this agent at all, and an affinity score and a general score. + /// + /// Based on these scores, pick the best agent. + /// First the affinity score is used, because these are things the customer specifically wanted. + /// If the affinity score is tied, we use the general score as a tie breaker. + /// The general score uses things like resource utilization to not over load any specific agent. + /// + /// + /// Affinity rules are roughly based on https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ + /// + /// todo: + /// - the cache likely needs to be updated automatically when a new vm is scheduled for info like used resources, because otherwise we have to deal with latency on that data we are using + /// and then if someone tries to schedule lets say 10 VMs in a batch, we could end up scheduling them all to the same agent because the metadata hasn't updated. + /// - there are a few solutions for this but they all kinda suck, mostly due to also making sure we deal with latency properly. I am ignoring the issue for now. + fn schedule_agent( + &mut self, + msg: &CreateVM + ) -> Result, Report> { + // todo: this could likely be better idiomatic rust. + // I suspect there is a map-reduce operation that does the exact scoring thing I am trying to do. 
+ // I also assume there is a better function for the and_then + let best_agent = self.agent_data_cache.iter() + .map(|agent| (agent.actor_ref.clone(), self.score_agent(msg, &agent))) + .reduce(|best, new| if new.1 > best.1 { new } else { best }) + .and_then(|best| if best.1 == AgentScore::REJECTED { None } else { Some(best.0) }); best_agent.ok_or_eyre("No valid agents found.") } -} + // this function intentionally only checks against the cache. this has some positives and negatives: + // positive: it will never trigger any network requests so its very fast, and having to do network requests for scoring whenever we want to schedule a vm is likely a bad idea + // negative: it technically has a delayed view of the cluster, meaning that some things that happened in the future, may not exist yet. so we need to be careful about how this is done so affinity rules are not accidentally broken. mostly this means, if we do anything that could affect the outcome of an affinity rule (ex: network request to an agent), we need to update the cache, before we do the action. + fn score_agent( + &self, + msg: &CreateVM, + agent: &RefMulti<'_, ActorId, CachedAgentActor> + ) -> AgentScore { + let mut score = AgentScore::default(); + + let agent_max_vcpus = agent.metadata.vcpus * VCPU_OVERPROVISIONMENT_NUMERATOR / VCPU_OVERPROVISIONMENT_DENOMINATOR; + // todo: do we care about VMData.max_vcpus? + let agent_used_vcpus = agent.metadata.used_vcpus + msg.config.data.vcpus; + + if agent_used_vcpus >= agent_max_vcpus { + return AgentScore::REJECTED; + } + score.general += (agent_max_vcpus - agent_used_vcpus) as f32 / agent_max_vcpus as f32; -#[derive(Copy, Clone)] -struct AgentActorCacheUpdater; -#[derive(Debug, Clone)] -pub struct CachedAgentActor { - pub actor_ref: RemoteActorRef, - pub metadata: AgentStatus, -} + // todo: add ram overprovisionment. not adding this to scheduler until it works on the hypervisor side. 
+ let agent_max_ram = agent.metadata.ram; + let agent_used_ram = agent.metadata.used_ram + msg.config.data.memory; + + if agent_used_ram >= agent_max_ram { + return AgentScore::REJECTED; + } -#[async_trait] -impl ActorCacheUpdater for AgentActorCacheUpdater { - async fn get_actor_refs(&self) -> Result>> { - let mut agent_actors_lookup = RemoteActorRef::::lookup_all(AGENT); - let mut actor_ref_vec = Vec::new(); + score.general += (agent_max_ram.as_u64() - agent.metadata.used_ram.as_u64()) as f32 / agent_max_ram.as_u64() as f32; - while let Some(agent_actor) = agent_actors_lookup.try_next().await? { - actor_ref_vec.push(agent_actor); + + // todo: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ + + + if let Some(affinity_rules) = &msg.config.affinity { + for rule in affinity_rules { + let mut meets_requirements = false; + + let lhs_values: Vec = Vec::with_capacity(1); + + match &rule.affinity_type { + AffinityType::VirtualMachine(zone) => todo!(), + AffinityType::Agent => lhs_values.push(agent.metadata.), // todo: this should be object metadata, but I just realized that field isnt included in the data we have in the cache + } + + // in theory the next bit of this could would loop through all the lhs values and all the requirements and then actually do the computation, but I got busy with other work. 
+ + for requirement in &rule.requirements { + let requirement_outcome = true; // todo: set this based on requirement + + if requirement_outcome { + meets_requirements = true; + break; + } + } + + let follows_rule = meets_requirements ^ rule.inverse; + + match (rule.strictness, follows_rule) { + (AffinityStrictness::Required, false) => return AgentScore::REJECTED, + (AffinityStrictness::Required, true) => {}, // specifically do nothing + (AffinityStrictness::Preferred { weight }, follows_rule) => { + score.affinity += follows_rule as i64 * weight; + }, + } + } } - Ok(actor_ref_vec) - } - async fn on_update(&self, actor_ref: &RemoteActorRef, previous_value: Option) -> Result { - let output_actor_ref = match previous_value { - Some(value) => value.actor_ref, - _ => actor_ref.clone(), - }; - Ok(CachedAgentActor { - actor_ref: output_actor_ref, - metadata: actor_ref.ask(&GetAgentStatus).await? - }) + // todo (future): possibly keep a percent of agents completely empty, to be able to be converted to dedis automatically. + // they would have their agent score set to like f32::MIN, so they can be scheduled to if there is no other available agents. + // rough pseudo code to implement this: + // if agent.metadata.vms.len() == 0 && hash(agent.config.hostname) % total_chance < threshold { + // agent_score = 1; + // } + + score } } - -// todo: this code is really bad, and we should not have effectively two copies of ths same thing. 
-#[derive(Copy, Clone)] -struct VMActorCacheUpdater; - -#[derive(Debug, Clone)] -pub struct CachedVMActor { - pub actor_ref: RemoteActorRef, - pub metadata: GetVMInfoReply, +#[derive(Debug, Clone, Copy, PartialEq)] +struct AgentScore { + general: f32, + affinity: i64 } -#[async_trait] -impl ActorCacheUpdater for VMActorCacheUpdater { - async fn get_actor_refs(&self) -> Result>> { - let mut agent_actors_lookup = RemoteActorRef::::lookup_all(VM); - let mut actor_ref_vec = Vec::new(); +impl AgentScore { + pub const REJECTED: Self = Self { + general: f32::NEG_INFINITY, + affinity: i64::MIN + }; +} - while let Some(agent_actor) = agent_actors_lookup.try_next().await? { - actor_ref_vec.push(agent_actor); +impl Default for AgentScore { + fn default() -> Self { + Self { + general: 0.0, + affinity: 0 } - - Ok(actor_ref_vec) } +} - async fn on_update(&self, actor_ref: &RemoteActorRef, previous_value: Option) -> Result { - let output_actor_ref = match previous_value { - Some(value) => value.actor_ref, - _ => actor_ref.clone(), - }; +impl PartialOrd for AgentScore { + fn partial_cmp(&self, other: &Self) -> Option { + let affinity_cmp = self.affinity.cmp(&other.affinity); - Ok(CachedVMActor { - actor_ref: output_actor_ref, - metadata: actor_ref.ask(&GetVMInfo {vmid: None}).await? - }) + if affinity_cmp != Ordering::Equal { + return Some(affinity_cmp); + } + + self.general.partial_cmp(&other.general) } } + impl Actor for SchedulerActor { type Args = (); type Error = Report; @@ -197,29 +443,66 @@ impl Actor for SchedulerActor { info!(?peer_id, "Scheduler Actor started!"); - Ok(Self { - agent_actor_cache: ActorCache::new(actor_ref.clone(), AgentActorCacheUpdater)?, - vm_actor_cache: ActorCache::new(actor_ref, VMActorCacheUpdater)? 
- }) + let mut scheduler_actor = SchedulerActor { + agent_data_cache: Arc::new(DashMap::new()), + agent_keepalive_tasks: Arc::new(DashMap::new()), + vm_actorid_ulid_map: Arc::new(DashMap::new()), + vm_data_cache: Arc::new(DashMap::new()), + vm_keepalive_tasks: Arc::new(DashMap::new()), + cache_actor_finder: None, + }; + + scheduler_actor.start_actor_finder(actor_ref.into_remote_ref().await); + + Ok(scheduler_actor) } async fn on_link_died( &mut self, actor_ref: WeakActorRef, - id: ActorId, + actor_id: ActorId, reason: ActorStopReason, ) -> Result, Self::Error> { - warn!("Linked actor {id:?} died with reason {reason:?}"); + warn!(?actor_id, ?reason, "Linked actor died"); + // check that scheduler actor is still alive. let Some(_) = actor_ref.upgrade() else { return Ok(ControlFlow::Break(ActorStopReason::Killed)); }; - self.agent_actor_cache.on_link_died(id).await; - self.vm_actor_cache.on_link_died(id).await; - info!(vm_actor_cache=?self.agent_actor_cache.data_cache, agent_actor_cache=?self.vm_actor_cache.data_cache, "data caches post actor removal"); + if let Some((_, keepalive_task)) = self.agent_keepalive_tasks.remove(&actor_id) { + trace!(?actor_id, "Aborting agent keepalive task"); + keepalive_task.abort(); + }; + + self.agent_data_cache.remove(&actor_id); + + + if let Some((_, keepalive_task)) = self.vm_keepalive_tasks.remove(&actor_id) { + trace!(?actor_id, "Aborting vm keepalive task"); + keepalive_task.abort(); + }; + + if let Some((_, vmid)) = self.vm_actorid_ulid_map.remove(&actor_id) { + // todo: this solution definitely isn't optimal. we likely should be storing which agent actor, this specific actor id is scheduled on and specifically removing it from that one. + // especially because things get iffy with migration, but im ignoring that for the moment. + for mut agent in self.agent_data_cache.iter_mut() { + agent.extended_vm_set.remove(&vmid); + } + + // todo: we likely should keep a copy of the VirtualMachine manifest in the cache. 
+ // instead of removing the vm entirely, we should just modify the status to shutdown or crashed or something. + // + // another potential issue is that the link dying doesn't guarantee that the vm is dead. if there is a networking partition, things get iffy. + trace!(?actor_id, ?vmid, "Removing vm from vm_data_cache"); + self.vm_data_cache.remove(&vmid); + } + + + // todo: attempt vm restarts if necessary. + Ok(ControlFlow::Continue(())) } @@ -232,20 +515,42 @@ impl Message for SchedulerActor { type Reply = Result; async fn handle(&mut self, msg: CreateVM, _ctx: &mut Context) -> Self::Reply { - loop { - let target_agent = self.schedule_agent(&msg).await?; - - match target_agent.ask(&msg).await { - Ok(reply) => { - return Ok(reply) - }, - Err(err) => { - warn!( - "CreateVM forwarding failed, trying again: {err}" - ); + let target_agent = self.schedule_agent(&msg)?; + + // we add to cache first, because we want to make sure future requests assume this vm exists. if the message fails, we clean it up afterward. + if let Some(mut cached_data) = self.agent_data_cache.get_mut(&target_agent.id()) { + cached_data.extended_vm_set.insert(msg.vmid); + } else { + return Err(eyre!("target agent is not in data cache")) + } + + self.vm_data_cache.insert( + msg.vmid, + CachedVMActor { + actor_ref: None, + metadata: GetVMInfoReply { + vmid: msg.vmid, + config: Some(msg.config.clone()) } - } + }); + + + + let reply = target_agent.ask(&msg).await; + + + // remove from caches if we fail to schedule + if reply.is_err() { + self.agent_data_cache.alter(&target_agent.id(), |_,mut v| { + v.extended_vm_set.remove(&msg.vmid); + + v + }); + self.vm_data_cache.remove(&msg.vmid); } + + + Ok(reply?) 
} } @@ -259,7 +564,7 @@ impl Message for SchedulerActor { ) -> Self::Reply { let vm = RemoteActorRef::::lookup(vm_actor_id(msg.vmid)).await?; tracing::trace!(?vm, "DeleteVM"); - if let Some(vm) = vm { + if let Some(vm) = vm { // don't update cache, because we rely on link dying and updater task to remove from cache once the VM is fully down. vm.tell(&msg).send()?; Ok(DeleteVMReply) } else { @@ -278,7 +583,7 @@ impl Message for SchedulerActor { ) -> Self::Reply { let vm = RemoteActorRef::::lookup(vm_actor_id(msg.vmid)).await?; tracing::trace!(?vm, "ShutdownVM"); - if let Some(vm) = vm { + if let Some(vm) = vm { // don't update cache, because we rely on link dying and updater task to remove from cache once the VM is fully down. vm.tell(&msg).send()?; Ok(ShutdownVMReply) } else { @@ -300,7 +605,7 @@ impl Message for SchedulerActor { ) -> Self::Reply { let mut vms = Vec::new(); - for agent in self.agent_actor_cache.data_cache.iter() { + for agent in self.agent_data_cache.iter() { vms.extend_from_slice(agent.metadata.vms.as_slice()); } diff --git a/odorobo/src/ch_driver/actor.rs b/odorobo/src/ch_driver/actor.rs index 414b00b..b3a9f92 100644 --- a/odorobo/src/ch_driver/actor.rs +++ b/odorobo/src/ch_driver/actor.rs @@ -28,16 +28,17 @@ pub struct VMActor { /// path to the Cloud Hypervisor socket, in /run/odorobo/vms//ch.sock pub vm_instance: VMInstance, pub migration_state: Option, + pub manifest: Option } impl Actor for VMActor { - // tuple of VM ID and optional config + // tuple of VM ID and manifest type Args = (ulid::Ulid, Option); type Error = Report; #[tracing::instrument(skip_all)] - async fn on_start((vmid, vm_config): Self::Args, actor_ref: ActorRef) -> Result { - let mut vminstance = VMInstance::spawn(&vmid.to_string(), vm_config.map(VmConfig::from), None).await?; + async fn on_start((vmid, vm_manifest): Self::Args, actor_ref: ActorRef) -> Result { + let mut vminstance = VMInstance::spawn(&vmid.to_string(), vm_manifest.clone().map(VmConfig::from), None).await?; 
// Take the child process out so we can watch for unexpected death. // destroy() handles a missing child_process gracefully. @@ -69,6 +70,7 @@ impl Actor for VMActor { vmid, vm_instance: vminstance, migration_state: None, + manifest: vm_manifest }) } @@ -157,7 +159,7 @@ impl Message for VMActor { ) -> Self::Reply { GetVMInfoReply { vmid: self.vmid, - config: self.vm_instance.vm_config.clone(), + config: self.manifest.clone(), // we likely dont want to send the entire manifest on every update, but some of this data is required and this is easier for now. } } } diff --git a/odorobo/src/config.rs b/odorobo/src/config.rs index 8b261ea..acb5287 100644 --- a/odorobo/src/config.rs +++ b/odorobo/src/config.rs @@ -144,13 +144,10 @@ pub struct Config { /// The number of VCPUs reserved for the agent. Defaults to 2. #[serde(default = "default_reserved_vcpus")] pub reserved_vcpus: u32, - /// this is just arbitrary data that will be shown but does no config - /// Arbitrary labels that can be used + + /// Arbitrary data that the infra team can set for notes for themselves. 
odorobo does not directly use these, but does include them in its output so that they can be used #[serde(default)] pub labels: AHashMap, -/// Arbitrary annotations that can be used -#[serde(default)] -pub annotations: AHashMap, #[serde(default)] pub network: NetworkConfig, diff --git a/odorobo/src/messages/vm.rs b/odorobo/src/messages/vm.rs index 512a793..8318a5a 100644 --- a/odorobo/src/messages/vm.rs +++ b/odorobo/src/messages/vm.rs @@ -99,5 +99,5 @@ pub struct GetVMInfo { #[derive(Serialize, Deserialize, Reply, Debug, Clone)] pub struct GetVMInfoReply { pub vmid: Ulid, - pub config: Option, + pub config: Option, } diff --git a/odorobo/src/types.rs b/odorobo/src/types.rs index b526cf8..95e53bb 100644 --- a/odorobo/src/types.rs +++ b/odorobo/src/types.rs @@ -175,12 +175,14 @@ pub struct VirtualMachine { pub struct AffinityRule { pub strictness: AffinityStrictness, pub affinity_type: AffinityType, - pub direction: AffinityDirection, + /// if true, the outcome of the requirements is inverted + #[serde(default)] + pub inverse: bool, /// ORed together pub requirements: Vec } -#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone, Copy)] pub enum AffinityStrictness { Required, Preferred { weight: i64 } @@ -188,15 +190,11 @@ pub enum AffinityStrictness { #[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub enum AffinityType { - VirtualMachine, + VirtualMachine(Zone), Agent } -#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] -pub enum AffinityDirection { - Normal, - Anti -} +pub type Zone = String; #[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub struct AffinityRequirement { @@ -212,7 +210,6 @@ pub enum MetadataTable { Annotation } -// todo: possibly replace with std::ops #[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub enum Operator { In, diff --git a/odorobo/src/utils/actor_cache.rs b/odorobo/src/utils/actor_cache.rs deleted file mode 100644 index eb4b45b..0000000 --- 
a/odorobo/src/utils/actor_cache.rs +++ /dev/null @@ -1,174 +0,0 @@ -use std::{marker::PhantomData, sync::Arc, time::Duration}; - -use async_trait::async_trait; -use dashmap::DashMap; -use kameo::{prelude::*}; -use tokio::task::JoinHandle; -use stable_eyre::{Report, Result}; -use tracing::{info, instrument, trace}; - -use std::fmt; - -// TODO: refactor to use derive macro, but I (caleb) don't know how to write a derive macro. -// -// The best way to write this would be using a derive similar to kameo -// so you would create a struct with #[derive(ActorCache)]. -// Then you would set the types and then write get_actor_refs and on_update. -// This would combine everything into one struct and make it a lot easier to work with. -// -// similar to: https://github.com/tqwewe/kameo/blob/1d498c0566b613b9afe6d54965c4b191c84432e0/src/actor.rs#L122 -// -// -// other things we might want: -// - a default get_actor_refs that just finds all actor_refs with a specific actor string. -// - change get_actor_refs to use an iterator - - -#[async_trait] -pub trait ActorCacheUpdater: Sync + Send + Copy + 'static { - // todo: this could probably be better if it was an iterator, but I am lazy and don't want to right now. - async fn get_actor_refs(&self) -> Result>>; - async fn on_update(&self, actor_ref: &RemoteActorRef, previous_value: Option) -> Result; -} - - -#[derive(Debug)] -pub struct ActorCache { - parent_actor_ref: ActorRef, - pub data_cache: Arc>, - keepalive_tasks: Arc>>, - actor_finder: Option>, - - child_actor_type: PhantomData -} - -// todo: impl Drop to automatically kill all the keepalive_tasks and the actor_finder task. 
- -impl ActorCache { - pub fn new( - parent_actor_ref: ActorRef, - updater: impl ActorCacheUpdater - ) -> Result { - - let data_cache = Arc::new(DashMap::new()); - let keepalive_tasks = Arc::new(DashMap::new()); - - let actor_cache = ActorCache { - parent_actor_ref: parent_actor_ref.clone(), - data_cache: data_cache, - keepalive_tasks: keepalive_tasks, - actor_finder: None, - - child_actor_type: PhantomData - }; - - actor_cache.start_actor_finder(parent_actor_ref, updater); - - Ok(actor_cache) - } - - /// run this function inside of the on_link_died of the ParentActor - pub async fn on_link_died( - &self, - id: ActorId - ) { - info!("removing agent actor from cache {id:?}"); - - if let Some(actor_keepalive_task) = self.keepalive_tasks.remove(&id) { - trace!("Aborting keepalive task for agent {id:?}"); - actor_keepalive_task.1.abort(); - }; - - self.data_cache.remove(&id); - } - - fn start_actor_finder( - &self, - parent_actor_ref: ActorRef, - updater: impl ActorCacheUpdater - ) { - let keepalive_tasks_clone = Arc::clone(&self.keepalive_tasks); - let data_cache_clone = Arc::clone(&self.data_cache); - - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(1)); - loop { - let _ = Self::actor_finder( - parent_actor_ref.clone(), - Arc::clone(&keepalive_tasks_clone), - Arc::clone(&data_cache_clone), - updater - ).await; - - interval.tick().await; - } - }); - } - - async fn actor_finder( - parent_actor_ref: ActorRef, - keepalive_tasks: Arc>>, - data_cache: Arc>, - updater: impl ActorCacheUpdater - ) -> Result<(), Report> { - let actor_refs = updater.get_actor_refs().await?; - - info!(?actor_refs, "running actor_finder"); - - for actor_ref in actor_refs { - if !keepalive_tasks.contains_key(&actor_ref.id()) { - trace!(?actor_ref, "starting updater_task"); - - parent_actor_ref.link_remote(&actor_ref).await?; - - let actor_ref_clone = actor_ref.clone(); - let data_cache_clone = Arc::clone(&data_cache); - let updater_task = 
tokio::spawn(async move { - Self::updater_task( - actor_ref_clone, - data_cache_clone, - updater - ).await; - }); - - keepalive_tasks.insert( - actor_ref.id(), - updater_task - ); - } - } - - Ok(()) - } - - #[instrument(skip_all)] - async fn updater_task( - actor_ref: RemoteActorRef, - data_cache: Arc>, - updater: impl ActorCacheUpdater - ) { - let mut interval = tokio::time::interval(Duration::from_secs(1)); - - loop { - let actor_id = actor_ref.id(); - - let mut previous_value_option = None; - - - - if let Some(data_ref) = data_cache.get(&actor_id) { - previous_value_option = Some(data_ref.clone()); - } - - - if let Ok(update) = updater.on_update(&actor_ref, previous_value_option).await { - data_cache.insert( - actor_id, - update.clone() - ); - } - - interval.tick().await; - } - } -} diff --git a/odorobo/src/utils/mod.rs b/odorobo/src/utils/mod.rs index 01c7125..819d3ef 100644 --- a/odorobo/src/utils/mod.rs +++ b/odorobo/src/utils/mod.rs @@ -1,5 +1,4 @@ pub mod actor_names; -pub mod actor_cache; use aide::OperationIo; use stable_eyre::{Result, Report};