diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..edf474a --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,504 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + +[[package]] +name = "anyhow" +version = "1.0.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0" + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "chrono" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +dependencies = [ + "libc", + "num-integer", + "num-traits", + "time", + "winapi", +] + +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + +[[package]] +name = "derive_more" +version = "0.99.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn", +] + +[[package]] +name = "env_logger" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "itoa" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0005d08a8f7b65fb8073cb697aa0b12b631ed251ce73d862ce50eeb52ce3b50" + +[[package]] +name = "lock_api" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" + +[[package]] +name = "mio" +version = "0.7.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + +[[package]] +name = "molecular-rust" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "derive_more", + "env_logger", + "lazy_static", + "log", + "regex", + "serde_json", + "thiserror", + "tokio", +] + +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi", +] + +[[package]] +name = "num-integer" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +dependencies = [ + "autocfg", +] + +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" + +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" + +[[package]] +name = "proc-macro2" +version = "1.0.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quote" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + +[[package]] +name = "ryu" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "semver" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0486718e92ec9a68fbed73bb5ef687d71103b142595b406835649bebd33f72c7" + +[[package]] +name = "serde" +version = "1.0.136" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" + +[[package]] +name = "serde_json" +version = "1.0.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d23c1ba4cf0efd44be32017709280b32d1cea5c3f1275c3b6d9e8bc54f758085" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" + +[[package]] +name = "syn" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "thiserror" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "time" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" +dependencies = [ + "libc", + "wasi", + "winapi", +] + +[[package]] +name = "tokio" +version = "1.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a" +dependencies = [ + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "once_cell", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-xid" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" + +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..b81d0a9 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "molecular-rust" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +chrono = "0.4.19" +env_logger = "0.9.0" +log= "0.4.14" +serde_json = "1.0.0" +anyhow = "1.0.53" +tokio = { version = "1.16.1", features = ["full"] } +regex = "1.5.4" +lazy_static = "1.3.0" +derive_more = "0.99.17" +thiserror = "1.0" \ No newline at end of file diff --git a/makefile b/makefile new file mode 100644 index 0000000..d7ea6f4 --- /dev/null +++ b/makefile @@ -0,0 +1,4 @@ +#Run library test while suppressing the warnings +test: + RUSTFLAGS=-Awarnings cargo test --lib -- --nocapture +.PHONY: test \ No newline at end of file diff --git a/src/bin/entrypoint.rs b/src/bin/entrypoint.rs new file mode 100644 index 0000000..f328e4d --- /dev/null +++ b/src/bin/entrypoint.rs @@ -0,0 +1 @@ +fn main() {} diff --git a/src/broker.rs b/src/broker.rs new file mode 100644 index 0000000..9d8d1e2 --- /dev/null +++ b/src/broker.rs @@ -0,0 +1,119 @@ +use std::{ + collections::HashMap, + sync::{ + mpsc::{Receiver, }, + Arc, + }, +}; + +use anyhow::{bail, Result}; + +use chrono:: Utc; + +use crate::{ + registry:: Logger, + service::ServiceSpec, + Registry, Service, +}; + +pub struct ServiceBroker { + reciever: Receiver, + started: bool, + namespace: Option, + metdata: HashMap, + pub node_id: String, + instance: String, + services: Vec, + pub transit: Option, + pub logger: Arc, + /* + local bus + options + logger + metricss + middlewere + cacher + serializer + error generator + validator + tracer + transporter + */ + registry: Registry, +} + +pub struct Transit {} + +impl ServiceBroker { + fn start(&mut self) { + let time = Utc::now(); + self.started = true; + } + + fn add_local_service(&mut self, service: Service) { + self.services.push(service); + } + fn register_local_service(&mut self, service: ServiceSpec) { + self.registry.register_local_service(service); + } + + fn destroy_service(&mut self, name: &str, version: &str) -> Result<()> { + let service_index = self.get_local_service_index(name, version); + if let None = service_index { + bail!( + "no service with the name {} and version {} found", + name, + version + ); + } + let service_index = service_index.unwrap(); + let mut full_name = "".to_string(); + + { + let service = self.services.get_mut(service_index).unwrap(); + full_name = service.full_name.clone(); + service.stop(); + } + { + self.services.remove(service_index); + } + + self.registry + .unregister_service(&full_name, Some(&self.node_id)); + self.services_changed(true); + Ok(()) + } + + fn services_changed(&self, local_service: bool) { + if (self.started && local_service) { + todo!("notifify remote nodes") + } + } + fn get_local_service_index(&self, name: &str, version: &str) -> Option { + self.services.iter().position(|s| { + if s.name == name && s.version == version { + return true; + } + return false; + }) + } +} +#[derive(PartialEq ,Debug)] +pub enum ServiceBrokerMessage { + AddLocalService(Service), + RegisterLocalService(ServiceSpec), + WaitForServices { + dependencies: Vec, + timeout: i64, + interval: i64, + }, +} + +fn remove_from_list(list: &mut Vec, value: &T) { + list.retain(|t| { + if t == value { + return false; + } + return true; + }); +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..a33a840 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,10 @@ +pub mod registry; +pub mod strategies; +pub mod service; +pub mod broker; +pub mod logger; +pub mod packet; +pub use service::Service; +pub use registry::Registry; +pub use broker::ServiceBrokerMessage; +pub use broker::ServiceBroker; \ No newline at end of file diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 0000000..993ea22 --- /dev/null +++ b/src/logger.rs @@ -0,0 +1,6 @@ + +trait LoggerTrait { + fn init(); + fn stop(); + fn get_log_handler(); +} diff --git a/src/packet.rs b/src/packet.rs new file mode 100644 index 0000000..74bfdaf --- /dev/null +++ b/src/packet.rs @@ -0,0 +1,19 @@ + +enum PacketType { + Unknown, + Event, + Request, + Response, + Discover, + Info, + Disconnect, + Heartbeat, + Ping, + Pongs, +} + +impl From for String { + fn from(p: PacketType) -> Self { + "as".to_string() + } +} diff --git a/src/registry/action_catalog.rs b/src/registry/action_catalog.rs new file mode 100644 index 0000000..3675d77 --- /dev/null +++ b/src/registry/action_catalog.rs @@ -0,0 +1,84 @@ +use std::collections::HashMap; + +use super::*; + +#[derive(PartialEq, Eq)] +pub struct ActionCatalog { + actions: ActionsMap, +} + +impl ActionCatalog { + pub fn new() -> Self { + Self { + actions: HashMap::new(), + } + } + pub fn add(&mut self, node: Arc, service: Arc, action: Action) { + let list = self.actions.get_mut(&action.name); + match list { + Some(list) => list.add(node, service, action), + None => { + let name = action.name.clone(); + let mut list = EndpointList::new(name, None); + let name = action.name.clone(); + list.add(node, service, action); + self.actions.insert(name, list); + } + } + } + pub fn get(&self, action_name: &str) -> Option<&EndpointList> { + self.actions.get(action_name) + } + fn is_available(&self, action_name: &str) -> bool { + match self.actions.get(action_name) { + Some(el) => el.has_available(), + None => false, + } + } + fn remove_by_service(&mut self, service: &ServiceItem) { + self.actions.iter_mut().for_each(|item| { + let (key, el) = item; + el.remove_by_service(service); + }); + } + pub fn remove(&mut self, action_name: &str, node_id: &str) { + let list = self.actions.get_mut(action_name); + if let Some(el) = list { + el.remove_by_node_id(node_id); + } + } + pub fn list(&self, opts: ListOptions) -> Vec<&EndpointList> { + let res: HashMap<&String, &EndpointList> = self + .actions + .iter() + .filter(|item| { + let (name, ep_list) = item; + if opts.skip_internal && get_internal_service_regex_match(&name) { + return false; + } + if opts.only_local && !ep_list.has_local() { + return false; + } + if opts.only_available && !ep_list.has_available() { + return false; + } + if ep_list.count() > 0 { + let ep = ep_list.endpoints.get(0); + if let Some(ep) = ep { + if ep.action.visibility == Visibility::Protected { + return false; + } + } + } + return true; + }) + .collect(); + let res = res + .values() + .map(|ep| { + return ep.to_owned(); + }) + .collect(); + res + } +} diff --git a/src/registry/action_endpoint.rs b/src/registry/action_endpoint.rs new file mode 100644 index 0000000..948ba8c --- /dev/null +++ b/src/registry/action_endpoint.rs @@ -0,0 +1,44 @@ +use super::*; + +#[derive(PartialEq, Eq, Clone)] +pub struct ActionEndpoint { + endpoint: Endpoint, + pub action: Action, + pub name: String, +} + +impl EndpointTrait for ActionEndpoint { + type Data = Action; + fn update(&mut self, data: Self::Data) { + self.action = data; + } + fn new(node: Arc, service: Arc, data: Self::Data) -> Self { + let endpoint = Endpoint::new(node, service); + let name = format!("{}:{}", endpoint.id, data.name); + Self { + endpoint, + name, + action: data, + } + } + + fn node(&self) -> &Node { + &self.endpoint.node + } + + fn service(&self) -> &ServiceItem { + &self.endpoint.service + } + fn is_local(&self) -> bool { + self.endpoint.local + } + fn is_available(&self) -> bool { + self.endpoint.state + } + fn id(&self) -> &str { + &self.endpoint.id + } + fn service_name(&self) -> &str { + &self.endpoint.service.name + } +} diff --git a/src/registry/endpoint_list.rs b/src/registry/endpoint_list.rs new file mode 100644 index 0000000..f0d2715 --- /dev/null +++ b/src/registry/endpoint_list.rs @@ -0,0 +1,117 @@ +use std::sync::Arc; + +use super::*; +#[derive(PartialEq, Eq, Clone)] +pub struct EndpointList { + pub name: String, + group: Option, + internal: bool, + pub endpoints: Vec, + local_endpoints: Vec, +} + +impl EndpointList { + pub fn new(name: String, group: Option) -> Self { + let internal = name.starts_with("$"); + let endpoints = Vec::new(); + let local_endpoints = Vec::new(); + + Self { + name, + group, + endpoints, + local_endpoints, + internal, + } + } + + pub fn add(&mut self, node: Arc, service: Arc, data: T::Data) { + let entry = self + .endpoints + .iter_mut() + .find(|x| x.node() == &*node && x.service().name == service.name); + + match entry { + Some(found) => { + found.update(data); + return; + } + None => {} + } + + let ep = T::new(Arc::clone(&node), Arc::clone(&service), data); + + self.endpoints.push(ep.clone()); + if ep.is_local() { + self.local_endpoints.push(ep) + } + } + fn get_first(&self) -> Option<&T> { + self.endpoints.get(0) + } + + fn select(&self) -> &T { + todo!() + } + + fn next(&self) -> &T { + todo!() + } + fn next_local(&self) -> &T { + todo!() + } + + pub fn has_available(&self) -> bool { + for ep in self.endpoints.iter() { + if ep.is_available() { + return true; + } + } + return false; + } + pub fn has_local(&self) -> bool { + self.local_endpoints.len() > 0 + } + + fn update_local_endpoints(&mut self) { + let mut local: Vec = Vec::new(); + for ep in &self.endpoints { + if ep.is_local() { + let e = ep.clone(); + local.push(e); + } + } + std::mem::swap(&mut local, &mut self.local_endpoints); + drop(local); + } + + pub fn count(&self) -> usize { + self.endpoints.len() + } + pub fn get_endpoint_by_node_id(&self, node_id: &str) -> Option<&T> { + self.endpoints + .iter() + .find(|e| e.id() == node_id && e.is_available()) + } + fn has_node_id(&self, node_id: &str) -> bool { + match self.endpoints.iter().find(|e| e.id() == node_id) { + Some(_) => true, + None => false, + } + } + pub fn remove_by_service(&mut self, service: &ServiceItem) { + self.endpoints.retain(|ep| { + let delete = ep.service() == service; + !delete + }); + self.update_local_endpoints(); + } + + pub fn remove_by_node_id(&mut self, node_id: &str) { + self.endpoints.retain(|ep| { + let delete = ep.id() == node_id; + !delete + }); + self.update_local_endpoints(); + } +} diff --git a/src/registry/event_endpoint.rs b/src/registry/event_endpoint.rs new file mode 100644 index 0000000..d8cd7d6 --- /dev/null +++ b/src/registry/event_endpoint.rs @@ -0,0 +1,42 @@ +use super::*; + +#[derive(Clone)] +pub struct EventEndpoint { + endpoint: Endpoint, + event: Event, +} + +impl EndpointTrait for EventEndpoint { + type Data = Event; + fn update(&mut self, data: Self::Data) { + self.event = data; + } + fn new(node: Arc, service: Arc, data: Self::Data) -> Self { + let endpoint = Endpoint::new(node, service); + Self { + endpoint, + + event: data, + } + } + + fn node(&self) -> &Node { + &self.endpoint.node + } + + fn service(&self) -> &ServiceItem { + &self.endpoint.service + } + fn is_local(&self) -> bool { + self.endpoint.local + } + fn is_available(&self) -> bool { + self.endpoint.state + } + fn id(&self) -> &str { + &self.endpoint.id + } + fn service_name(&self) -> &str { + &self.endpoint.service.name + } +} diff --git a/src/registry/mod.rs b/src/registry/mod.rs new file mode 100644 index 0000000..6e7dd8c --- /dev/null +++ b/src/registry/mod.rs @@ -0,0 +1,133 @@ +pub mod action_catalog; +pub mod action_endpoint; +pub mod endpoint_list; +pub mod event_endpoint; +pub mod node; +pub mod node_catalog; +pub mod registry; +pub mod service_catalog; +pub mod service_item; + +use std::collections::HashMap; +use std::sync::Arc; + +use super::service::Service; +use crate::strategies::Strategy; +use action_catalog::ActionCatalog; +pub use action_endpoint::ActionEndpoint; +pub use endpoint_list::EndpointList; +pub use event_endpoint::EventEndpoint; +pub use node::{Client, Node}; +use node_catalog::NodeCatalog; +use regex::Regex; +pub use registry::Registry; +use service_catalog::ServiceCatalog; +use service_item::ServiceItem; +use lazy_static::lazy_static; +// pub use event_endpoint::EventEndpoint; + + +type ActionsMap = HashMap>; + +fn get_internal_service_regex_match(text: &str) -> bool { + lazy_static! { + static ref RE: Regex = Regex::new(r"^\$").unwrap(); + + } + RE.is_match(text) +} + + +#[derive(PartialEq, Eq)] +pub struct Logger {} + +trait FnType {} + +#[derive(PartialEq, Eq, Clone , Debug)] +pub struct Action { + pub name: String, + visibility: Visibility, + handler: fn(), + // service: Option, +} + +impl Action { + pub fn new(name: String, handler: fn()) -> Self { + Self { + name, + visibility: Visibility::Protected, + handler, + // service: None, + } + } + // pub fn set_service(mut self, service: Service) -> Action { + // self.service = Some(service); + // self + // } +} + +#[derive(PartialEq, Eq, Clone , Debug)] +enum Visibility { + Published, + Public, + Protected, + Private, +} +#[derive(PartialEq, Eq, Clone ,Debug)] +pub struct Event {} +impl FnType for Event {} + +///Endpoint trait for endpoint list +pub trait EndpointTrait { + ///Data is eiter an Action struct or Event structs + type Data; + fn new(node: Arc, service: Arc, data: Self::Data) -> Self; + fn node(&self) -> &Node; + fn service(&self) -> &ServiceItem; + fn update(&mut self, data: Self::Data); + fn is_local(&self) -> bool; + fn is_available(&self) -> bool; + fn id(&self) -> &str; + fn service_name(&self) -> &str; +} + +#[derive(PartialEq, Eq, Clone)] +struct Endpoint { + node: Arc, + service: Arc, + state: bool, + id: String, + local: bool, +} + +impl Endpoint { + fn new(node: Arc, service: Arc) -> Self { + let id = node.id.to_string(); + let local = service.local; + Self { + node, + service, + state: true, + id: id, + local, + } + } +} + +enum EndpointType { + Action, + Event, +} + +#[derive(PartialEq, Eq)] +pub struct Opts { + strategy: T, +} +pub struct ListOptions { + only_local: bool, + only_available: bool, + skip_internal: bool, + with_actions: bool, + with_events: bool, + grouping: bool, +} diff --git a/src/registry/node.rs b/src/registry/node.rs new file mode 100644 index 0000000..8fdc6ad --- /dev/null +++ b/src/registry/node.rs @@ -0,0 +1,114 @@ +use std::{net::IpAddr, sync::Arc}; + +use chrono::Duration; +use serde_json::Value; + +use super::ServiceItem; + +#[derive(PartialEq, Eq, Clone)] +pub struct Node { + pub id: String, + instance_id: Option, + pub available: bool, + pub local: bool, + last_heartbeat_time: Duration, + /* feields that need to be added later. + config + + metadata + */ + client: Option, + ip_list: Vec, + port: Option, + hostname: Option, + udp_address: Option, + pub raw_info: Option, + /* + cpu + cpuseq + */ + pub services: Vec>, + pub seq: usize, + offline_since: Option, +} + +impl Node { + pub fn new(id: String) -> Self { + Self { + id: id, + instance_id: None, + available: true, + local: false, + client: None, + raw_info: None, + /* + change this later with actual process uptime. + */ + last_heartbeat_time: Duration::seconds(1), + ip_list: Vec::new(), + port: None, + hostname: None, + udp_address: None, + services: Vec::new(), + seq: 0, + offline_since: None, + } + } + pub fn update(&mut self) { + todo!() + } + pub fn update_local_info(&mut self) { + todo!() + } + pub fn hearbeat(&mut self) { + if !self.available { + self.available = true; + self.offline_since = None; + } + todo!() + } + pub fn disconnect(&mut self) { + if self.available { + self.seq = self.seq.saturating_add(1); + /* update this with process uptime + self.offline_since = + */ + } + self.available = false; + } + + pub fn services_len(&self) -> usize { + self.services.len() + } + pub fn set_local(mut self, value: bool) -> Self { + self.local = value; + self + } + pub fn set_ip_list(mut self, ip_list: Vec) -> Self { + self.ip_list = ip_list; + self + } + pub fn set_instance_id(mut self, instance_id: String) -> Self { + self.instance_id = Some(instance_id); + self + } + pub fn set_hostname(mut self, hostname: String) -> Self { + self.hostname = Some(hostname); + self + } + pub fn set_client(mut self, client: Client) -> Self { + self.client = Some(client); + self + } + pub fn set_seq(mut self, seq: usize) -> Self { + self.seq = seq; + self + } +} +#[derive(PartialEq, Eq, Clone)] + +pub struct Client { + pub(crate) client_type: String, + pub(crate) version: String, + pub(crate) lang_version: String, +} diff --git a/src/registry/node_catalog.rs b/src/registry/node_catalog.rs new file mode 100644 index 0000000..be80799 --- /dev/null +++ b/src/registry/node_catalog.rs @@ -0,0 +1,123 @@ +use std::{collections::HashMap, net::IpAddr, sync::Arc}; + +use std::sync::RwLock; + +use super::{node, Client, Logger, Node, Registry}; + + +pub struct NodeCatalog { + nodes: HashMap, + pub local_node: Option>, +} +impl NodeCatalog { + pub fn new() -> Self { + Self { + nodes: HashMap::new(), + local_node: None, + } + } + ///Create a local node + fn create_local_node(&mut self, version: String, node_id: String, instance_id: String) -> Arc { + let client = Client { + client_type: "rust".to_string(), + lang_version: "1.56.1".to_string(), + version: version, + }; + let node = Node::new(node_id) + .set_local(true) + .set_ip_list(get_ip_list()) + .set_instance_id(instance_id) + .set_hostname(get_hostname()) + .set_seq(1) + .set_client(client); + + self.nodes.insert(node.id.to_string(), node.clone()); + let node = Arc::new(node); + let node_c = Arc::clone(&node); + self.local_node = Some(node); + return node_c; + todo!() + /* + node.metadata = self.broker.metadata.clone() + */ + } + pub fn add(&mut self, id: &str, node: Node) { + self.nodes.insert(id.to_string(), node); + } + pub fn had_node(&self, id: &str) -> bool { + match self.nodes.get(id) { + Some(_) => true, + None => false, + } + } + pub fn get_node(&self, id: &str) -> Option<&Node> { + self.nodes.get(id) + } + pub fn get_node_mut(&mut self, id: &str) -> Option<&mut Node> { + self.nodes.get_mut(id) + } + pub fn delete(&mut self, id: &str) -> Option { + self.nodes.remove(id) + } + pub fn count(&self) -> usize { + self.nodes.len() + } + pub fn online_count(&self) -> usize { + let mut count: usize = 0; + self.nodes.iter().for_each(|node_item| { + let (_, node) = node_item; + if node.available { + count = count.saturating_add(1); + } + }); + count + } + pub fn process_node_info(&self) { + todo!() + } + pub fn disconnect(&mut self) { + todo!() + } + + pub fn list(&self, only_available: bool, with_services: bool) -> Vec<&Node> { + self.nodes + .values() + .filter(|node| { + if only_available && !node.available { + return false; + } + if with_services && node.services_len() <= 0 { + return false; + } + return true; + }) + .collect() + } + pub fn list_mut(&mut self, only_available: bool, with_services: bool) -> Vec<&mut Node> { + self.nodes + .values_mut() + .filter(|node| { + if only_available && !node.available { + return false; + } + if with_services && node.services_len() <= 0 { + return false; + } + return true; + }) + .collect() + } + pub fn nodes_vec(&self) -> Vec<&Node> { + self.nodes.values().collect() + } + pub fn nodes_vec_mut(&mut self) -> Vec<&mut Node> { + self.nodes.values_mut().collect() + } +} +fn get_ip_list() -> Vec { + todo!() +} +fn get_hostname() -> String { + todo!() +} + diff --git a/src/registry/registry.rs b/src/registry/registry.rs new file mode 100644 index 0000000..9c52a3d --- /dev/null +++ b/src/registry/registry.rs @@ -0,0 +1,219 @@ +use crate::{service::ServiceSpec, ServiceBroker, ServiceBrokerMessage}; +use derive_more::Display; + +use super::*; +use anyhow::{bail, Error}; +use serde_json::Value; +use tokio::sync::mpsc::Sender; + +pub struct Registry { + pub logger: Arc, + broker_sender: Sender, + broker: Arc, + nodes: NodeCatalog, + services: ServiceCatalog, + actions: ActionCatalog, + /* + metrics + strategy factor + discoverer + opts + events + */ +} +impl Registry { + pub fn new(broker: Arc, broker_sender: Sender) -> Self { + let logger = &broker.logger; + let logger = Arc::clone(&logger); + let nodes = NodeCatalog::new(); + let services = ServiceCatalog::new(); + let actions = ActionCatalog::new(); + Registry { + logger, + broker_sender, + broker, + nodes, + services, + actions, + } + } + + fn init() { + todo!("initialze discoverer") + } + fn stop() { + todo!("stop discoverre") + } + + fn register_moleculer_metrics(&self) { + todo!("register molecular metrics") + } + fn update_metrics(&self) { + todo!("update metrics") + } + pub fn register_local_service(&mut self, svc: ServiceSpec) { + if !self + .services + .has(&svc.full_name, Some(&self.broker.node_id)) + { + let service = self.services.add( + Arc::clone(&self.nodes.local_node.as_ref().unwrap()), + &svc, + true, + ); + if let Some(actions) = svc.actions { + let local_node = Arc::clone(&self.nodes.local_node.as_ref().unwrap()); + self.register_actions(local_node, Arc::clone(&service), actions); + } + if let Some(events) = svc.events { + self.register_events(); + } + //TODO:Add service to the local node. + //self.nodes.local_node.unwrap().services.push(Arc::clone(&service)); + } + } + pub fn register_services() { + todo!("add remote serice support") + } + fn check_action_visibility(action: &Action, node: &Arc) -> bool { + match action.visibility { + Visibility::Published => true, + Visibility::Public => true, + Visibility::Protected => node.local, + _ => false, + } + } + fn register_actions( + &mut self, + node: Arc, + service: Arc, + actions: Vec, + ) { + actions.iter().for_each(|action| { + if !Registry::check_action_visibility(action, &node) { + return; + } + if node.local { + //TODO:stuff with middleware and handlers. + } else if let Some(_) = self.broker.transit { + //TODO: for remote services + return; + } + let node = Arc::clone(&node); + let service = Arc::clone(&service); + self.actions.add(node, service, action.to_owned()); + //TODO: + //add the action to the service. + }); + } + fn create_private_action_endpoint(&self, action: Action) -> anyhow::Result { + let local_node = match &self.nodes.local_node { + Some(node) => node, + None => bail!("No local node available"), + }; + let node = Arc::clone(local_node); + todo!("add service to action") + // let action_ep = ActionEndpoint::new(node, service, action); + // Ok(action_ep) + } + pub fn has_services(&self, full_name: &str, node_id: Option<&str>) -> bool { + self.services.has(full_name, node_id) + } + pub fn get_action_endpoints(&self, action_name: &str) -> Option<&EndpointList> { + self.actions.get(action_name) + } + pub fn get_action_endpoint_by_node_id( + &self, + action_name: &str, + node_id: &str, + ) -> Option<&EndpointList> { + let list = self.actions.get(action_name); + if let Some(list) = list { + list.get_endpoint_by_node_id(node_id); + } + None + } + pub fn unregister_service(&mut self, full_name: &str, node_id: Option<&str>) { + let id = match node_id { + Some(node_id) => node_id.to_string(), + None => self.broker.node_id.clone(), + }; + self.services.remove(full_name, &id); + match node_id { + Some(id) => { + if id == self.broker.node_id { + self.regenerate_local_raw_info(Some(true)); + } + } + None => { + self.regenerate_local_raw_info(Some(true)); + } + } + } + fn unregister_service_by_node_id(&mut self, node_id: &str) { + self.services.remove_all_by_node_id(node_id); + } + fn unregiste_action(&mut self, node_id: &str, action_name: &str) { + self.actions.remove(action_name, node_id); + } + + fn register_events(&mut self) { + todo!() + } + fn unregister_event(&mut self, node_id: &str, event_name: &str) { + todo!() + } + + fn regenerate_local_raw_info(&self, incSeq: Option) -> Value { + todo!() + } + + fn get_local_node_info(&self, force: bool) -> Result { + if let None = self.nodes.local_node { + return Ok(self.regenerate_local_raw_info(None)); + } + if force { + return Ok(self.regenerate_local_raw_info(None)); + } + if let None = self.nodes.local_node { + return Err(RegistryError::NoLocalNodeFound); + } + let value = self.nodes.local_node.as_ref().unwrap().raw_info.to_owned(); + match value { + Some(value) => Ok(value), + None => Err(RegistryError::NoLocalNodeFound), + } + } + fn get_node_info(&self, node_id: &str) -> Option { + todo!() + } + fn process_node_info(&self) { + todo!() + } + pub fn get_node_list(&self, only_available: bool, with_services: bool) -> Vec<&Node> { + self.nodes.list(only_available, with_services) + } + pub fn get_services_list(&self, opts: ListOptions) -> Vec<&Arc> { + self.services.list(opts) + } + fn get_actions_list(&self, opts: ListOptions) -> Vec<&ActionEndpoint> { + //self.actions.list(opts) + todo!() + } + fn get_event_list(&self) -> Vec<&EventEndpoint> { + todo!() + } + fn get_node_raw_list(&self) { + todo!() + } +} +use thiserror::Error; +#[derive(Error, Debug)] +enum RegistryError { + #[error("No local node found")] + NoLocalNodeFound, +} + +#[cfg(test)] + +mod tests {} diff --git a/src/registry/service_catalog.rs b/src/registry/service_catalog.rs new file mode 100644 index 0000000..2a97001 --- /dev/null +++ b/src/registry/service_catalog.rs @@ -0,0 +1,100 @@ +use crate::service::ServiceSpec; + +use super::*; +use regex::Regex; +#[derive(PartialEq, Eq)] +pub struct ServiceCatalog { + services: Vec>, +} + +impl ServiceCatalog { + pub fn new() -> Self { + Self { + services: Vec::new(), + } + } + ///Add a new service + pub fn add(&mut self, node: Arc, service: &ServiceSpec, local: bool) -> Arc { + let service_item = ServiceItem::new(node, service, local); + let service_item = Arc::new(service_item); + + let item = Arc::clone(&service_item); + self.services.push(service_item); + item + } + ///Check the service exsists + pub fn has(&self, full_name: &str, node_id: Option<&str>) -> bool { + let svc = self + .services + .iter() + .find(|svc| svc.equals(full_name, node_id)); + match svc { + Some(_) => true, + None => false, + } + } + pub fn get(&self, full_name: &str, node_id: Option<&str>) -> Option<&Arc> { + self.services + .iter() + .find(|svc| svc.equals(full_name, node_id)) + } + pub fn get_mut( + &mut self, + full_name: &str, + node_id: Option<&str>, + ) -> Option<&mut Arc> { + self.services + .iter_mut() + .find(|svc| svc.equals(full_name, node_id)) + } + pub fn list( + &self, + opts : ListOptions + ) -> Vec<&Arc> { + + self.services.iter().filter(|svc| { + if opts.skip_internal && get_internal_service_regex_match(&svc.name) { + return false; + } + if opts.only_local && !svc.local { + return false; + } + if opts.only_available && !svc.node.available { + return false; + } + + return true; + }).collect() + // TODO:("implement grouping and all that stuff") + + } + pub fn get_local_node_service(&self) { + todo!() + } + //remove all endpoints by node_id. + pub fn remove_all_by_node_id(&mut self, node_id: &str) { + let services: Vec<&Arc> = self + .services + .iter() + .filter(|svc| { + if svc.node.id == node_id { + todo!("remove actions and events in registry"); + return false; + } + true + }) + .collect(); + todo!("updat the service") + } + + pub fn remove(&mut self, full_name: &str, node_id: &str) { + self.services.retain(|svc| { + if svc.equals(full_name, Some(node_id)) { + todo!("remove actions and events in registry"); + + return false; + } + return true; + }) + } +} diff --git a/src/registry/service_item.rs b/src/registry/service_item.rs new file mode 100644 index 0000000..9e2be8f --- /dev/null +++ b/src/registry/service_item.rs @@ -0,0 +1,56 @@ +use crate::service::ServiceSpec; + +use super::*; + +#[derive(PartialEq, Eq, Clone)] +pub struct ServiceItem { + pub name: String, + pub node: Arc, + pub local: bool, + pub full_name: String, + version: String, + actions: ActionsMap, + /* + eventsmap + metadata + settings + */ +} +impl ServiceItem { + pub fn new(node: Arc, service: &ServiceSpec, local: bool) -> Self { + Self { + node, + local, + actions: HashMap::new(), + full_name: service.full_name.to_string(), + version: service.version.to_string(), + name: service.name.to_string(), + } + } + pub fn equals(&self, full_name: &str, node_id: Option<&str>) -> bool { + match node_id { + Some(id) => self.node.id == id && self.full_name == full_name, + None => self.full_name == full_name, + } + } + + ///Update service properties + pub fn update(&mut self, service: &Service) { + self.full_name = service.full_name.to_string(); + self.version = service.version.to_string(); + /* + settings + metadata + */ + todo!() + } + ///Add action to service + pub fn add_action(&mut self, action: EndpointList) { + let name = action.name.clone(); + self.actions.insert(name, action); + todo!("Decide if we want an arc of action or make a copy of that actions") + } + pub fn add_event(&mut self, event: EndpointList) { + todo!("Implement the events map") + } +} diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..c7dfda6 --- /dev/null +++ b/src/service.rs @@ -0,0 +1,474 @@ +use std::collections::HashMap; + +use crate::{ + registry::{Action, Event}, + ServiceBrokerMessage, +}; +use log::info; +use tokio::sync::mpsc::UnboundedSender; + +#[derive(Clone, Debug)] +pub struct Service { + pub name: String, + pub full_name: String, + pub version: String, + settings: HashMap, + schema: Schema, + original_schema: Option, + metadata: HashMap, + pub actions: Option>, + pub events: Option>, + broker_sender: UnboundedSender, +} + +impl PartialEq for Service { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.full_name == other.full_name + && self.version == other.version + && self.settings == other.settings + && self.schema == other.schema + && self.original_schema == other.original_schema + && self.metadata == other.metadata + && self.actions == other.actions + && self.events == other.events + } +} + +#[derive(PartialEq, Eq, Clone, Debug)] +struct Schema { + mixins: Option>, + actions: Option>, + events: Option>, + merged: SchemaMerged, + name: String, + version: Option, + settings: HashMap, + metadata: Option>, + created: Option, + started: Option, + stopped: Option, + dependencies: Option>, +} + +#[derive(PartialEq, Eq, Clone, Debug)] +struct SchemaMixins {} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct SchemaActions { + name: String, + handler: fn(), +} + +#[derive(PartialEq, Eq, Clone, Debug)] +struct SchemaEvents {} +#[derive(PartialEq, Eq, Clone, Debug)] +enum SchemaMerged { + MergedFn(fn()), + MergedFnVec(Vec), +} +#[derive(PartialEq, Debug)] +pub struct ServiceSpec { + pub(crate) name: String, + pub(crate) version: String, + pub(crate) full_name: String, + settings: HashMap, + /* + pub(crate)metadata + pub(crate)*/ + pub(crate) actions: Option>, + pub(crate) events: Option>, +} + +impl Service { + fn get_service_spec(&self) -> ServiceSpec { + //TODO: do something about actions and events + let service_spec = ServiceSpec { + name: self.name.clone(), + full_name: self.full_name.clone(), + settings: self.get_public_settings(), + version: self.version.clone(), + actions: None, + events: None, + }; + service_spec + } + + fn parse_service_schema(&mut self, schema: Schema) { + self.original_schema = Some(schema.clone()); + + match schema.merged { + SchemaMerged::MergedFn(merged) => merged(), + SchemaMerged::MergedFnVec(func_vec) => { + for func in func_vec { + func() + } + } + } + + self.name = schema.name; + self.version = match schema.version { + Some(v) => v, + None => "0.0.1".to_string(), + }; + self.settings = schema.settings; + self.metadata = match schema.metadata { + Some(metadata) => metadata, + None => HashMap::new(), + }; + //TODO: + //self.schema = schema; + let version = self.settings.get("$noVersionPrefix"); + + self.full_name = Service::get_versioned_full_name(&self.name, version); + //TODO: get the logger from the broker. + //self.logger = + + //TODO:register methods. + + todo!("add service specification") + } + + fn get_public_settings(&self) -> HashMap { + self.settings.clone() + } + + pub async fn init(&self) { + info!("Service {} is createing....", self.full_name); + if let Some(created) = self.schema.created { + created(); + } + let _result = self + .broker_sender + .send(ServiceBrokerMessage::AddLocalService(self.clone())); + info!("Service {} created.", self.full_name); + + // todo!("call broker middlware") + } + + pub async fn start(&self) { + info!("Service {} is starting...", self.full_name); + + if let Some(dependencies) = &self.schema.dependencies { + let timeout: i64 = match self.settings.get("$dependencyTimeout") { + //TODO:raise an error rateher than default. + Some(val) => val.parse().unwrap_or(0), + None => 0, + }; + //TODO: get interal from broker options + let interval: i64 = match self.settings.get("$dependencyInterval") { + Some(val) => val.parse().unwrap_or(0), + None => 0, + }; + + self.wait_for_services(dependencies, timeout, interval) + .await; + } + + if let Some(started) = &self.schema.started { + started(); + } + + let _result = self + .broker_sender + .send(ServiceBrokerMessage::RegisterLocalService( + Service::get_service_spec(&self), + )); + + info!("Service {} started.", self.full_name); + + // todo!("call service starting middleware"); + //todo!("call service started middleware") + } + + pub async fn stop(&self) { + info!("Service {} is stopping...", self.full_name); + + if let Some(stopped) = self.schema.stopped { + stopped(); + } + + info!("Service {} stopped.", self.full_name); + + todo!("call service stopping middlewares"); + todo!("call service stopped middleware"); + } + + fn create_action(&self, action_def: fn(), name: &str) -> Action { + let mut action = Action::new(name.to_string(), action_def); + let name_prefix = self.settings.get("$noServiceNamePrefix"); + if let Some(name_prefix) = name_prefix { + let name_prefix: bool = name_prefix.parse().unwrap(); + if !name_prefix { + action.name = format!("{}.{}", self.full_name.to_string(), action.name); + } + } + //TODO add caching settings from settins + //TODO see if it is even necessary to give action access to the service. + // action = action.set_service(self.clone()); + action + } + + /// create an interal service method. + fn create_method() { + todo!() + } + + ///create an event subscription for broker + fn create_event() { + todo!() + } + + async fn wait_for_services(&self, service_names: &Vec, timeout: i64, interval: i64) { + let _result = self + .broker_sender + .send(ServiceBrokerMessage::WaitForServices { + dependencies: service_names.clone(), + interval, + timeout, + }); + } + + fn get_versioned_full_name(name: &str, version: Option<&String>) -> String { + let mut name = name.to_string(); + if let Some(v) = version { + name = format!("{}.{}", v, name); + } + name + } +} +#[cfg(test)] +mod tests { + use tokio::sync::mpsc::{self, UnboundedSender}; + + use super::*; + fn test_merge_func() { + println!("test merge function"); + } + fn test_started_func() { + println!("test start func"); + } + fn test_created_func() { + println!("test created func"); + } + fn test_stop_func() { + println!("test stop func"); + } + fn action_func() { + println!("action_func"); + } + + fn get_test_schema(dependencies: Option>) -> Schema { + let merged = SchemaMerged::MergedFn(test_merge_func); + let schema = Schema { + mixins: None, + actions: None, + events: None, + merged, + name: "test_service".to_string(), + version: None, + settings: HashMap::new(), + metadata: None, + created: Some(test_created_func), + started: Some(test_started_func), + stopped: Some(test_stop_func), + dependencies: dependencies, + }; + + schema + } + + fn get_test_service( + schema: Option, + settings: Option>, + actions: Option>, + broker_sender: Option>, + ) -> Service { + let name = "test_service".to_string(); + let version = "1.0".to_string(); + let full_name = Service::get_versioned_full_name(&name, Some(&version)); + let settings = match settings { + Some(settings) => settings, + None => HashMap::new(), + }; + let schema = match schema { + Some(schema) => schema, + None => get_test_schema(None), + }; + + let original_schema = get_test_schema(None); + let broker_sender = match broker_sender { + Some(sender) => sender, + None => { + let (sender, recv) = mpsc::unbounded_channel::(); + sender + } + }; + let service = Service { + name, + full_name, + version, + settings, + schema, + original_schema: Some(original_schema), + metadata: HashMap::new(), + actions: actions, + events: None, + broker_sender, + }; + service + } + #[test] + fn service_get_service_spec() { + let service = get_test_service(None, None, None, None); + let service_spec = ServiceSpec { + name: service.name.clone(), + version: service.version.clone(), + full_name: service.full_name.clone(), + settings: service.settings.clone(), + actions: None, + events: None, + }; + + let service_spec_gen = service.get_service_spec(); + assert_eq!(service_spec_gen, service_spec) + } + #[test] + fn service_get_public_settings() { + let mut settings = HashMap::new(); + settings.insert("test".to_string(), "settings".to_string()); + let service = get_test_service(None, Some(settings.clone()), None, None); + assert_eq!(service.get_public_settings(), settings); + } + #[test] + fn service_init() { + let (sender, mut recv) = mpsc::unbounded_channel::(); + let service = get_test_service(None, None, None, Some(sender)); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { service.init().await }); + rt.block_on(async { + let result = recv.recv().await; + + let expected_result = ServiceBrokerMessage::AddLocalService(service); + assert_eq!(result, Some(expected_result)); + }); + } + #[test] + fn service_start() { + let (sender, mut recv) = mpsc::unbounded_channel::(); + let service = get_test_service(None, None, None, Some(sender)); + let service_spec = service.get_service_spec(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { service.start().await }); + rt.block_on(async { + let result = recv.recv().await; + let expected_result = ServiceBrokerMessage::RegisterLocalService(service_spec); + assert_eq!(result, Some(expected_result)); + }); + } + #[test] + fn service_start_with_dependencies() { + let (sender, mut recv) = mpsc::unbounded_channel::(); + let service_names: Vec = + vec!["Service one".to_string(), " Service two".to_string()]; + + let schema = get_test_schema(Some(service_names.clone())); + let service = get_test_service(Some(schema), None, None, Some(sender)); + let service_spec = service.get_service_spec(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { service.start().await }); + rt.block_on(async { + let result = recv.recv().await; + let expected_result = ServiceBrokerMessage::WaitForServices { + dependencies: service_names, + timeout: 0, + interval: 0, + }; + assert_eq!(result, Some(expected_result)); + + let result = recv.recv().await; + let expected_result = ServiceBrokerMessage::RegisterLocalService(service_spec); + assert_eq!(result, Some(expected_result)); + }); + } + #[test] + fn service_create_action_name() { + let no_service_name_prefix = "$noServiceNamePrefix".to_string(); + let name = "action_func"; + let mut settings = HashMap::new(); + settings.insert(no_service_name_prefix.clone(), "true".to_string()); + let service = get_test_service(None, Some(settings.clone()), None, None); + let action = service.create_action(action_func, name); + let expected_action = Action::new(name.to_string(), action_func); + assert_eq!(action, expected_action); + settings.insert(no_service_name_prefix.clone(), "false".to_string()); + let service = get_test_service(None, Some(settings), None, None); + let action_2 = service.create_action(action_func, name); + let name = format!("{}.{}", service.full_name, name); + let expected_action_2 = Action::new(name, action_func); + assert_eq!(action_2, expected_action_2); + let service = get_test_service(None, None, None, None); + let name = "action_func"; + let expected_action_3 = Action::new(name.to_string(), action_func); + let action_3 = service.create_action(action_func, name); + assert_eq!(action_3, expected_action_3); + } + #[test] + #[should_panic] + fn service_create_action_name_panic() { + let no_service_name_prefix = "$noServiceNamePrefix".to_string(); + let mut settings = HashMap::new(); + let name = "action_func"; + settings.insert(no_service_name_prefix, "non_bool_value".to_string()); + let service = get_test_service(None, Some(settings), None, None); + let action = service.create_action(action_func, name); + } + #[test] + fn service_wait_for_service() { + let (sender, mut recv) = mpsc::unbounded_channel::(); + let service = get_test_service(None, None, None, Some(sender)); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let service_names: Vec = + vec!["Service one".to_string(), " Service two".to_string()]; + let timeout = 10; + let interval = 20; + rt.block_on(async { + let _result = service + .wait_for_services(&service_names, timeout, interval) + .await; + }); + rt.block_on(async { + let result = recv.recv().await; + let expected_result = ServiceBrokerMessage::WaitForServices { + dependencies: service_names, + timeout, + interval, + }; + assert_eq!(result, Some(expected_result)); + }); + } + #[test] + fn service_get_versioned_full_name() { + let version = "1.0"; + let name = "test_service"; + let expected_full_name = format!("{}.{}", version, name); + let version = "1.0".to_string(); + let version = Some(&version); + let full_name = Service::get_versioned_full_name(name, version); + assert_eq!(full_name , expected_full_name); + let full_name_2 = Service::get_versioned_full_name(name, None); + assert_eq!(full_name_2 , name); + } +} diff --git a/src/strategies/mod.rs b/src/strategies/mod.rs new file mode 100644 index 0000000..deb40c0 --- /dev/null +++ b/src/strategies/mod.rs @@ -0,0 +1,19 @@ +use std::sync::Arc; + +use crate::registry::{ActionEndpoint, Registry , }; +use crate::ServiceBroker; +mod round_robin; + + +pub trait Strategy { + fn new(registry: Arc, broker: Arc, opts: StrategyOpts) -> Self; + fn select<'a>( + &mut self, + list: Vec<&'a ActionEndpoint>, + ctx: Option, + ) -> Option<&'a ActionEndpoint>; +} + +pub struct Context {} + +pub struct StrategyOpts{} \ No newline at end of file diff --git a/src/strategies/round_robin.rs b/src/strategies/round_robin.rs new file mode 100644 index 0000000..b61ae23 --- /dev/null +++ b/src/strategies/round_robin.rs @@ -0,0 +1,37 @@ +use super::*; +use std::sync::Arc; + +struct RoundRobinStrategy { + registry: Arc, + broker: Arc, + opts: StrategyOpts, + counter: usize, +} + +impl RoundRobinStrategy { + // fn new() -> Self {} +} +impl Strategy for RoundRobinStrategy { + fn new(registry: Arc, broker: Arc, opts: StrategyOpts) -> Self { + Self { + broker, + registry, + opts, + counter: 0, + } + } + fn select<'a>( + &mut self, + list: Vec<&'a ActionEndpoint>, + ctx: Option, + ) -> Option<&'a ActionEndpoint> { + if self.counter >= list.len() { + self.counter = 0; + } + self.counter = self.counter.saturating_add(1); + if let Some(ep) = list.get(self.counter) { + return Some(*ep); + } + None + } +}