From 9f3778a0068035f52244d6cd752f460f8bff49ab Mon Sep 17 00:00:00 2001 From: Jack Thomson Date: Wed, 2 Nov 2022 17:12:27 +0000 Subject: [PATCH 1/8] Initial tls support --- Cargo.toml | 2 ++ src/server/actix_server.rs | 17 ++++++++++++++++ src/server/config.rs | 25 +++++++++++++++++++++++- src/server/mod.rs | 3 ++- src/server/tls.rs | 40 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 85 insertions(+), 2 deletions(-) create mode 100644 src/server/tls.rs diff --git a/Cargo.toml b/Cargo.toml index e2ac7bc..b25bf8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,8 @@ halfbrown = "0.1.15" tokio-postgres = { version = "0.7.7", features = ["with-serde_json-1" ] } num_cpus = "1.13.1" extreme = "666.666.666666" +rustls = "0.20.7" +rustls-pemfile = "1.0.1" [target.'cfg(not(target_os = "linux"))'.dependencies] mimalloc-rust = { version = "0.2" } diff --git a/src/server/actix_server.rs b/src/server/actix_server.rs index 5a22f41..fb35bb4 100644 --- a/src/server/actix_server.rs +++ b/src/server/actix_server.rs @@ -147,6 +147,23 @@ async fn create_sever(config: ServerConfig) -> std::io::Result<()> { srv.await } +async fn create_tls_server(config: ServerConfig) -> std::io::Result<()> { + let pool_size = config.pool_per_worker_size; + + let srv = Server::build() + .backlog(config.backlog as u32) + .bind("walker_server_h1", &config.url, move || { + HttpService::build().finish(AppFactory(pool_size)).tcp() + })? + .workers(config.worker_threads) + .run(); + + attach_server_handle(srv.handle()); + + srv.await +} + + fn run_server(config: ServerConfig) -> std::io::Result<()> { // Lets set net reciever priority here try_pin_priority(); diff --git a/src/server/config.rs b/src/server/config.rs index 031cb28..2f45a94 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -12,6 +12,9 @@ pub struct ServerConfig { pub pool_per_worker_size: usize, pub backlog: usize, pub debug: bool, + pub tls: bool, + pub key_location: Option, + pub cert_location: Option, } #[cold] @@ -30,6 +33,9 @@ impl ServerConfig { pool_per_worker_size: 10_000, backlog: 1024, debug: false, + tls: false, + key_location: None, + cert_location: None, } } @@ -59,17 +65,34 @@ impl ServerConfig { None => Ok(fallback), } }; + + let mut tls = false; + let mut key_location = None; + let mut cert_location = None; + if get_bool_with_default("tls", false)? { + tls = true; + key_location = config.get("key_location").cloned(); + cert_location = config.get("cert_location").cloned(); + + if key_location.is_none() || cert_location.is_none() { + return Err(make_js_error("We need both the key and cert location for TLS")); + } + } + Ok(Self { url, worker_threads: get_number_with_deault("worker_threads", guess_optimal_worker_count())?, pool_per_worker_size: get_number_with_deault("pool_per_worker_size", 10_000)?, backlog: get_number_with_deault("backlog", 1024)?, debug: get_bool_with_default("debug", false)?, + tls, + key_location, + cert_location, }) } pub fn get_pool_size(&self) -> usize { self.worker_threads * self.pool_per_worker_size } -} \ No newline at end of file +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 586d48f..a1ad7ba 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,4 +2,5 @@ pub mod node_functions; mod config; mod actix_server; mod helpers; -mod shutdown; \ No newline at end of file +mod shutdown; +mod tls; diff --git a/src/server/tls.rs b/src/server/tls.rs new file mode 100644 index 0000000..bbf74ee --- /dev/null +++ b/src/server/tls.rs @@ -0,0 +1,40 @@ +use std::{fs::File, io::BufReader}; + +use rustls::{Certificate, PrivateKey, ServerConfig}; +use rustls_pemfile::{certs, pkcs8_private_keys}; + +use napi::Result; + +use crate::request::helpers::make_js_error; + +pub fn load_tls_certs(config: &super::config::ServerConfig) -> Result { + // init server config builder with safe defaults + let config = ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth(); + + // load TLS key/cert files + let cert_file = &mut BufReader::new(File::open(config).map_err(|_| make_js_error("Error loading cert file"))?); + let key_file = &mut BufReader::new(File::open("key.pem").map_err(|_| make_js_error("Error loading key file"))?); + + // convert files to key/cert objects + let cert_chain = certs(cert_file) + .map_err(|_| make_js_error("Error loading files"))? + .into_iter() + .map(Certificate) + .collect(); + let mut keys: Vec = pkcs8_private_keys(key_file) + .map_err(|_| make_js_error("Error loading files"))? + .into_iter() + .map(PrivateKey) + .collect(); + + // exit if no keys could be parsed + if keys.is_empty() { + return Err(make_js_error("No keys found")) + } + + let res = config.with_single_cert(cert_chain, keys.remove(0)).map_err(|_| make_js_error("Error loading files")); + + res +} From 8d38f8744e51f0570660c2576f116d2146133152 Mon Sep 17 00:00:00 2001 From: Jack Thomson Date: Wed, 2 Nov 2022 20:28:53 +0000 Subject: [PATCH 2/8] TLS created server --- Cargo.toml | 2 +- examples/certs/cert.pem | 25 +++++++++++++++++++++++++ examples/certs/key.pem | 28 ++++++++++++++++++++++++++++ examples/tls.js | 16 ++++++++++++++++ src/server/actix_server.rs | 9 +++++++-- src/server/tls.rs | 6 +++--- 6 files changed, 80 insertions(+), 6 deletions(-) create mode 100644 examples/certs/cert.pem create mode 100644 examples/certs/key.pem create mode 100644 examples/tls.js diff --git a/Cargo.toml b/Cargo.toml index b25bf8c..06b4c5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ bytes = "1.2.1" serde_json = "1.0.85" lazy_static = "1.4.0" tokio = { version = "1", features = ["full"] } -actix-http = { version = "3.2", features = ["http2"]} +actix-http = { version = "3.2", features = ["http2", "rustls"]} actix-service = "2.0.2" futures = "0.3.24" http = "0.2.8" diff --git a/examples/certs/cert.pem b/examples/certs/cert.pem new file mode 100644 index 0000000..8a972c7 --- /dev/null +++ b/examples/certs/cert.pem @@ -0,0 +1,25 @@ +-----BEGIN CERTIFICATE----- +MIIEMzCCApugAwIBAgIQLDkJeN2TgRSdTqyim8Yq5zANBgkqhkiG9w0BAQsFADB1 +MR4wHAYDVQQKExVta2NlcnQgZGV2ZWxvcG1lbnQgQ0ExJTAjBgNVBAsMHGphY2tA +amFja3MtcGMgKEphY2sgVGhvbXNvbikxLDAqBgNVBAMMI21rY2VydCBqYWNrQGph +Y2tzLXBjIChKYWNrIFRob21zb24pMB4XDTIyMTEwMjE3NDMwM1oXDTI1MDIwMjE3 +NDMwM1owUDEnMCUGA1UEChMebWtjZXJ0IGRldmVsb3BtZW50IGNlcnRpZmljYXRl +MSUwIwYDVQQLDBxqYWNrQGphY2tzLXBjIChKYWNrIFRob21zb24pMIIBIjANBgkq +hkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5JX+pB8B/lCT40augS4isxU+ZX4bFC+d +9S8u2vnMv6cF9NtsVzGp9ZruVd6mF5qkAH0EWz0xVW8paPrJKE+8cMhLIRbwL+pt +Teljs8VNV/qxbQQQaXWFJw/tUSdhErMCKgp0XSy6WfS39+5L+gsl+hpda65w5DQs +bAxyVZ4cAUugNI2Va9eVnNfqGo88UzaFunY4pi0uXkJX8E+QdcNiizvslsBifjsp +iwewPBfRb6Fz7bXefL8Fa/dC42riLEKTibGU1ymEsKYET3Pmprg0VmCwQX+qNLG4 +MsrxE4IYVlH+/BCVfakmkUi4MRZNBWXT0xZNmfORWSvDBgrt131gCQIDAQABo2Qw +YjAOBgNVHQ8BAf8EBAMCBaAwEwYDVR0lBAwwCgYIKwYBBQUHAwEwHwYDVR0jBBgw +FoAU2hmilf5DgQ1mWcDUpd7Nu5S6wxQwGgYDVR0RBBMwEYIJbG9jYWxob3N0hwR/ +AAABMA0GCSqGSIb3DQEBCwUAA4IBgQBtwVdwdq15K9IWOcoGNqFzy7S4iCBfOsEM +9/F7mPWD8t5QvvRFRFrCQkTBL74enfZ/BKEAXFxQCPT8dM6LgXQH435Pe/pHVExs +F3RzoPG/WGrPIxM7eW/BXT0z08wbeeMFbSQdy+dD2c5oMFkKGbYT9qkDv1Ic/p9Z +0QjqiytY3xLOdUh0JCvCw8QrS7+4M9/5U1Le0gxLmwvWgh8EE1B5fXfSgu3vyjfr +P4N6TMNiYdeNEQhMTc6rkL9vh1Il6Ue612Z/S44ttobKoanCnL79qjDYFMetSxlQ +YXp/qeHGGgG5c0kthlaPlnf4qKDs5hn3MblozO2zo7CqLu8YIcJ6zUi7GWdth2td +V3KjvUrDg6xkrAZY1HxebNDBN6lRYNefhvZojOUfZgJSX8Smxzwp3V3W4jZqBI1X +IMmsjSD7qQGMPkTjFqdZTsKtgQR3UA1etkuC5tiDoMRzdr+S/cyZRXhIlTAPxIot +5NB0MEezFhDFOVrzi2iuFgTBYAgou1E= +-----END CERTIFICATE----- diff --git a/examples/certs/key.pem b/examples/certs/key.pem new file mode 100644 index 0000000..7685541 --- /dev/null +++ b/examples/certs/key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDklf6kHwH+UJPj +Rq6BLiKzFT5lfhsUL531Ly7a+cy/pwX022xXMan1mu5V3qYXmqQAfQRbPTFVbylo ++skoT7xwyEshFvAv6m1N6WOzxU1X+rFtBBBpdYUnD+1RJ2ESswIqCnRdLLpZ9Lf3 +7kv6CyX6Gl1rrnDkNCxsDHJVnhwBS6A0jZVr15Wc1+oajzxTNoW6djimLS5eQlfw +T5B1w2KLO+yWwGJ+OymLB7A8F9FvoXPttd58vwVr90LjauIsQpOJsZTXKYSwpgRP +c+amuDRWYLBBf6o0sbgyyvETghhWUf78EJV9qSaRSLgxFk0FZdPTFk2Z85FZK8MG +Cu3XfWAJAgMBAAECggEBAIaLnzGdKsA1T4b0QJy6uiPsuihlHK06BeCeYBb198VL +G19vlAqSqfZttiGHBv5XwHalH15Q22vtHVO0YZi/riw4SLh2VvPtKV81o8DZvlet +sKd5P+vDB5fhcQ9WfXXTNc/nDW0Wea2fNHXTppbL3xOiVyCdsccwyoDipp2sjdFY +RAgcumRMFmkNh1V9xHM3N/mT5KD8kkhNmClMGag9BH6RSy/ur+93o2kTXatapdSI +ufRKIsE6ti1wX1buudNBXXfB7qHOKZOFyqsmqaDtX3Jg31VfzCQuQu4g1fXlqgZl +RkJlz/QzFo7fMqpyk7Fqg+9AaHh+68/HcJodrNG4gy0CgYEA/1pn1mJqAMgKUlDu +G/1magImqjQY2yUzCANV8cRywMtM8kuv14Xk6N/4IvYI2dai/D+ptgyHsJreUa/K +PQ1VkiyGrQLKVZb/ZIJe6Lv+Dhsi5IKwuDo15FHaLJP4snYOu8g1bpbDFf62iD8p +EttkwhBT7X96GEHgPzY5fDu3Ay8CgYEA5So7EoXZ5XgxNa5mRK0mg2ONHwOl4vf4 +eSN6cJZQQxBjZTzX9irL5bRknoRW1O80Pu6FDaoiv8MBMSDYlBC4+/e73neixEuz +SQYL/ozSXqtQr199EyyKIN8yWR2yy+W49+hrW+1IW8ANVES9sIgHErEBK8u3rJXB +yWQYCSzU4kcCgYAUWbkizc8S5t5jtw1y75wE4M0CDYrZlDpf7hwgW36lvM4SFVQb +QhF/ObJF3wPPkJqGrfAxkQgTXBRRwEfG29QxBRlqmmlTrugFs5oqxCi4KRN70KqS +1BjNbHSNzvEoD96Wupr/iEZd72HOs/HrDc8W5EQ8DkfFvSJJB2JHc17fgwKBgGl/ +CXa7e9XxgOXvhVD0TRiLSboPWdykvxQ1SvZtnEFjV/6RBfJjPT5fzzpSPg8El46p +C0OBIUIaKYBOWKXBFQeZwjjl5l5rYvuo9viWJ3pBb4Fseg3LoUMEvQ7dZyQKex5V +Nqc5Euh5/yjeBF0yId9u8NUSkVm4D2AA3jfLFa2dAoGAcwNIN0CykHbC9dKhk5ml +iXT8Mwgyz5fHjsw6q6YlIUC0vR4FpuKEA89P7AlNm2C46QCdrDuskTRR2wiNr9jX +ebK9DqmWwLJseJBHG7r31bBwi01mpx6sQKNzt39NFJLnv4ijxwv/ckRqzjyoeFlT +5S7+RPsxPoJK8qN8yCMB0q8= +-----END PRIVATE KEY----- diff --git a/examples/tls.js b/examples/tls.js new file mode 100644 index 0000000..bc67c3f --- /dev/null +++ b/examples/tls.js @@ -0,0 +1,16 @@ +const Walker = require('..'); + +const config = { + url: "0.0.0.0:8081", + worker_threads: "1", + pool_per_worker_size: "100", + tls: "true", + cert_location: './certs/cert.pem', + key_location: './certs/key.pem' +} + +Walker.get("/", (res) => { + res.sendTextUnchecked("Hello world!"); +}); + +Walker.startWithConfig(config); diff --git a/src/server/actix_server.rs b/src/server/actix_server.rs index fb35bb4..b147ede 100644 --- a/src/server/actix_server.rs +++ b/src/server/actix_server.rs @@ -149,11 +149,12 @@ async fn create_sever(config: ServerConfig) -> std::io::Result<()> { async fn create_tls_server(config: ServerConfig) -> std::io::Result<()> { let pool_size = config.pool_per_worker_size; + let certs = super::tls::load_tls_certs(&config).unwrap(); let srv = Server::build() .backlog(config.backlog as u32) .bind("walker_server_h1", &config.url, move || { - HttpService::build().finish(AppFactory(pool_size)).tcp() + HttpService::build().finish(AppFactory(pool_size)).rustls(certs.clone()) })? .workers(config.worker_threads) .run(); @@ -168,7 +169,11 @@ fn run_server(config: ServerConfig) -> std::io::Result<()> { // Lets set net reciever priority here try_pin_priority(); - actix_rt::System::new().block_on(create_sever(config)) + if config.tls { + actix_rt::System::new().block_on(create_tls_server(config)) + } else { + actix_rt::System::new().block_on(create_sever(config)) + } } #[cold] diff --git a/src/server/tls.rs b/src/server/tls.rs index bbf74ee..0f51af1 100644 --- a/src/server/tls.rs +++ b/src/server/tls.rs @@ -7,15 +7,15 @@ use napi::Result; use crate::request::helpers::make_js_error; -pub fn load_tls_certs(config: &super::config::ServerConfig) -> Result { +pub fn load_tls_certs(user_config: &super::config::ServerConfig) -> Result { // init server config builder with safe defaults let config = ServerConfig::builder() .with_safe_defaults() .with_no_client_auth(); // load TLS key/cert files - let cert_file = &mut BufReader::new(File::open(config).map_err(|_| make_js_error("Error loading cert file"))?); - let key_file = &mut BufReader::new(File::open("key.pem").map_err(|_| make_js_error("Error loading key file"))?); + let cert_file = &mut BufReader::new(File::open(user_config.cert_location.as_ref().unwrap()).map_err(|_| make_js_error("Error loading cert file"))?); + let key_file = &mut BufReader::new(File::open(user_config.key_location.as_ref().unwrap()).map_err(|_| make_js_error("Error loading key file"))?); // convert files to key/cert objects let cert_chain = certs(cert_file) From aaab03ce91fab2dbf864b7204ed8d4c7a35fcf06 Mon Sep 17 00:00:00 2001 From: Jack Thomson Date: Wed, 9 Nov 2022 10:12:26 +0000 Subject: [PATCH 3/8] Tidy up --- src/request/helpers.rs | 9 --------- src/server/actix_server.rs | 3 --- src/templates/mod.rs | 12 ++++++------ 3 files changed, 6 insertions(+), 18 deletions(-) diff --git a/src/request/helpers.rs b/src/request/helpers.rs index ae364f9..ab5bcf7 100644 --- a/src/request/helpers.rs +++ b/src/request/helpers.rs @@ -6,15 +6,6 @@ use serde_json::Value; use crate::napi::halfbrown::HalfBrown; -#[cold] -#[inline(never)] -pub fn make_generic_error() -> Error { - Error::new( - Status::GenericFailure, - "Unable to send response".to_string(), - ) -} - #[cold] #[inline(never)] pub fn make_js_error(reason: &'static str) -> Error { diff --git a/src/server/actix_server.rs b/src/server/actix_server.rs index b147ede..da628d4 100644 --- a/src/server/actix_server.rs +++ b/src/server/actix_server.rs @@ -5,7 +5,6 @@ use actix_server::Server; use actix_service::{Service, ServiceFactory}; use bytes::Bytes; use futures::future::LocalBoxFuture; -use http::HeaderValue; use napi::sys; use tokio::sync::oneshot; @@ -21,7 +20,6 @@ use super::{ }; struct ActixHttpServer { - _hdr_srv: HeaderValue, object_pool: Rc>>, } @@ -124,7 +122,6 @@ impl ServiceFactory for AppFactory { Box::pin(async move { Ok(ActixHttpServer { - _hdr_srv: HeaderValue::from_static("Walker"), object_pool: Rc::new(UnsafeCell::new(get_stored_chunk(chunk_size))), }) }) diff --git a/src/templates/mod.rs b/src/templates/mod.rs index 8f84595..48f0f73 100644 --- a/src/templates/mod.rs +++ b/src/templates/mod.rs @@ -7,7 +7,7 @@ use parking_lot::RwLock; use serde_json::Value; use tera::{Context, Tera}; -use crate::request::{helpers::{make_generic_error, make_js_error}}; +use crate::request::helpers::make_js_error; lazy_static! { pub static ref TEMPLATES: RwLock> = { @@ -20,7 +20,7 @@ lazy_static! { #[inline(never)] #[napi] pub fn load_new_template(group_name: String, directory: String) -> Result<()> { - let tera = Tera::new(&format!("{}/**/*", directory)).map_err(|_| make_generic_error())?; + let tera = Tera::new(&format!("{}/**/*", directory)).map_err(|_| make_js_error("Error loading directory."))?; let mut templates = TEMPLATES.write(); templates.insert(group_name, tera); @@ -46,8 +46,8 @@ pub(crate) fn render_value_to_writer( writer: &mut Writer, ) -> Result<()> { let reader = TEMPLATES.read(); - let found_template = reader.get(group_name).ok_or_else(make_generic_error)?; - let context = &Context::from_value(data).map_err(|_| make_generic_error())?; + let found_template = reader.get(group_name).ok_or_else(|| make_js_error("Error finding the template file."))?; + let context = &Context::from_value(data).map_err(|_| make_js_error("Error reading data value."))?; found_template .render_to(file_name, context, writer) @@ -61,7 +61,7 @@ pub(crate) fn render_string_to_writer( data: &str, writer: &mut Writer, ) -> Result<()> { - let parsed: Value = serde_json::from_str(data).map_err(|_| make_generic_error())?; + let parsed: Value = serde_json::from_str(data).map_err(|_| make_js_error("Error parsing json data."))?; render_value_to_writer(group_name, file_name, parsed, writer) } @@ -75,4 +75,4 @@ pub(crate) fn store_in_bytes_buffer( let mut writer = buffer.writer(); render_string_to_writer(group_name, file_name, data, &mut writer)?; Ok(writer.into_inner()) -} \ No newline at end of file +} From cc7b264c7ab7cb34f2ff2073c97ada7986cfef7e Mon Sep 17 00:00:00 2001 From: Jack Thomson Date: Wed, 9 Nov 2022 13:53:25 +0000 Subject: [PATCH 4/8] Add ability to work on worker threads --- examples/testClose.js | 6 +-- examples/worker_threads.js | 62 ++++++++++++++++++++++++++++ index.d.ts | 18 ++++++++- index.js | 4 +- src/lib.rs | 5 ++- src/object_pool/mod.rs | 52 +++++++++++++++++++++--- src/request/request_blob.rs | 5 +-- src/router/mod.rs | 5 ++- src/router/node_functions.rs | 4 +- src/router/route_node.rs | 19 +++++++++ src/server/actix_server.rs | 30 +++++++------- src/server/config.rs | 71 ++++----------------------------- src/server/node_functions.rs | 9 ++--- src/thread/mod.rs | 4 ++ src/thread/node_functions.rs | 14 +++++++ src/thread/thread_identifier.rs | 13 ++++++ src/types.rs | 4 +- 17 files changed, 221 insertions(+), 104 deletions(-) create mode 100644 examples/worker_threads.js create mode 100644 src/router/route_node.rs create mode 100644 src/thread/mod.rs create mode 100644 src/thread/node_functions.rs create mode 100644 src/thread/thread_identifier.rs diff --git a/examples/testClose.js b/examples/testClose.js index 98cc6f6..37a98f0 100644 --- a/examples/testClose.js +++ b/examples/testClose.js @@ -2,12 +2,12 @@ const Walker = require('..'); const config = { url: "0.0.0.0:8081", - worker_threads: "1", - pool_per_worker_size: "100" + worker_threads: 1, + pool_per_worker_size: 100 } Walker.get("/", (res) => { - res.sendText("Hello world"); + res.sendText('Hello world'); }); Walker.startWithConfig(config); diff --git a/examples/worker_threads.js b/examples/worker_threads.js new file mode 100644 index 0000000..ed9db51 --- /dev/null +++ b/examples/worker_threads.js @@ -0,0 +1,62 @@ +const { + Worker, + isMainThread, + setEnvironmentData, + getEnvironmentData, +} = require('node:worker_threads'); + +const Walker = require('..'); + +if (isMainThread) { + setEnvironmentData('Hello', 'World!'); + const result = Walker.getWorkerId(); + + Walker.initialisePoolForWorker(10000); + console.log(`Result is ${result}`); + const worker = new Worker(__filename); + + Walker.get(`/${result}`, (res) => { + res.sendText(`Hello from main thread our id is ${result}`); + }) + +} else { + let result = Walker.getWorkerId(); + console.log(`Result is ${result}`); + + result = Walker.getWorkerId(); + console.log(`Result is ${result}`); + + result = Walker.getWorkerId(); + console.log(`Result is ${result}`); + + result = Walker.getWorkerId(); + console.log(`Result is ${result}`); + + + Walker.get(`/${result}`, (res) => { + res.sendText(`Hello from worker thread our id is ${result}`); + }) + + Walker.get(`/b`, (res) => { + res.sendText(`Hello from worker thread our id is ${result}`); + }) + + Walker.get(`/c`, (res) => { + res.sendText(`Hello from worker thread our id is ${result}`); + }) + + Walker.get(`/d`, (res) => { + res.sendText(`Hello from worker thread our id is ${result}`); + }) + + const config = { + url: "0.0.0.0:8081", + workerThreads: 1, + poolPerWorkerSize: 100, + backlog: 1000, + debug: true, + tls: false, + } + Walker.initialisePoolForWorker(10000); + Walker.startWithConfig(config); +} diff --git a/index.d.ts b/index.d.ts index 6098705..517e707 100644 --- a/index.d.ts +++ b/index.d.ts @@ -66,14 +66,30 @@ export function startWithWorkerCount(address: string, workers: number): void * * debug: Whether to enable debug mode */ -export function startWithConfig(config: HalfBrown): void +export function startWithConfig(config: ServerConfig): void /** * Attempts to stop the server, returns if it woreked * Experimental at the moment */ export function stop(): boolean +/** + * This allows you to configure the server with more + * granular control, some options are required. + */ +export interface ServerConfig { + url: string + workerThreads: number + poolPerWorkerSize: number + backlog: number + debug: boolean + tls: boolean + keyLocation?: string + certLocation?: string +} export function loadNewTemplate(groupName: string, directory: string): void export function reloadGroup(groupName: string): void +export function getWorkerId(): number +export function initialisePoolForWorker(poolSize: number): void export function getThreadAffinity(): Array export class DbConnection { query(query: FastStr): object diff --git a/index.js b/index.js index faffe52..364247f 100644 --- a/index.js +++ b/index.js @@ -236,7 +236,7 @@ if (!nativeBinding) { throw new Error(`Failed to load native binding`) } -const { DbConnection, connectDb, PreparedStatement, Methods, newRoute, get, post, put, patch, RequestBlob, start, startWithWorkerCount, startWithConfig, stop, loadNewTemplate, reloadGroup, getThreadAffinity } = nativeBinding +const { DbConnection, connectDb, PreparedStatement, Methods, newRoute, get, post, put, patch, RequestBlob, start, startWithWorkerCount, startWithConfig, stop, loadNewTemplate, reloadGroup, getWorkerId, initialisePoolForWorker, getThreadAffinity } = nativeBinding module.exports.DbConnection = DbConnection module.exports.connectDb = connectDb @@ -254,4 +254,6 @@ module.exports.startWithConfig = startWithConfig module.exports.stop = stop module.exports.loadNewTemplate = loadNewTemplate module.exports.reloadGroup = reloadGroup +module.exports.getWorkerId = getWorkerId +module.exports.initialisePoolForWorker = initialisePoolForWorker module.exports.getThreadAffinity = getThreadAffinity diff --git a/src/lib.rs b/src/lib.rs index 5b905ac..10b3434 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,13 +15,16 @@ mod types; mod response; mod server; mod templates; +mod thread; mod object_pool; mod tokio_workers; mod extras; + pub use db::node_functions::*; pub use request::node_functions::*; pub use router::node_functions::*; pub use server::node_functions::*; pub use templates::{load_new_template, reload_group}; -pub use extras::node_functions::*; \ No newline at end of file +pub use extras::node_functions::*; +pub use thread::node_functions::*; diff --git a/src/object_pool/mod.rs b/src/object_pool/mod.rs index f1f051a..cd634c2 100644 --- a/src/object_pool/mod.rs +++ b/src/object_pool/mod.rs @@ -1,7 +1,7 @@ use std::{ffi::c_void, sync::atomic::Ordering}; use napi::{sys, Result}; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use crate::request::{helpers::make_js_error, RequestBlob}; @@ -12,6 +12,8 @@ unsafe impl Sync for StoredPair {} static POOL: Mutex> = Mutex::new(vec![]); +static WORKER_POOL: RwLock>>> = RwLock::new(vec![]); + pub fn get_stored_chunk(count: usize) -> Vec { let mut locked = POOL.lock(); let split_point = locked.len() - count; @@ -19,6 +21,38 @@ pub fn get_stored_chunk(count: usize) -> Vec { locked.split_off(split_point) } +pub unsafe fn build_pool_for_id(env: sys::napi_env, pool_size: usize, thread_id: usize) -> Result<()> { + let mut pool_list = WORKER_POOL.write(); + + if pool_list.len() >= thread_id { + pool_list.resize_with(thread_id + 1, Default::default); + } + + let found = pool_list.get_mut(thread_id).ok_or_else(|| make_js_error("Error building pool"))?; + let mut pool = found.lock(); + build_pool_into_vec(env, pool_size, &mut pool) +} + +#[inline(always)] +pub fn get_pair_for_thread(thread_id: usize) -> Option { + let reader = WORKER_POOL.read(); + let threads_pool = reader.get(thread_id)?; + let mut locked = threads_pool.lock(); + + locked.pop() +} + +#[inline(always)] +pub fn replace_for_thread(thread_id: usize, pair: StoredPair) -> Option<()> { + let reader = WORKER_POOL.read(); + let threads_pool = reader.get(thread_id)?; + let mut locked = threads_pool.lock(); + + locked.push(pair); + + Some(()) +} + unsafe fn get_obj_constructor() -> Result { let ctor_ref = napi::__private::get_class_constructor("RequestBlob\0") .ok_or_else(|| make_js_error("Error caching contructor."))?; @@ -29,16 +63,15 @@ unsafe fn get_obj_constructor() -> Result { Ok(ctor_ref) } -pub unsafe fn build_up_pool(env: sys::napi_env, pool_size: usize) -> Result<()> { +unsafe fn build_pool_into_vec(env: sys::napi_env, pool_size: usize, pool: &mut Vec) -> Result<()> { + pool.reserve(pool_size); + let ctor_ref = get_obj_constructor()?; let mut ctor = std::ptr::null_mut(); if sys::napi_get_reference_value(env, ctor_ref, &mut ctor) != napi::sys::Status::napi_ok { return Err(make_js_error("Error getting constructor.")); } - let mut locked_pool = POOL.lock(); - locked_pool.reserve(pool_size); - for _ in 0..pool_size { let mut result = std::ptr::null_mut(); if sys::napi_new_instance(env, ctor, 0, std::ptr::null_mut(), &mut result) @@ -65,8 +98,15 @@ pub unsafe fn build_up_pool(env: sys::napi_env, pool_size: usize) -> Result<()> ); let recovered = Box::from_raw(raw_obj); - locked_pool.push(StoredPair((recovered, reffering))); + pool.push(StoredPair((recovered, reffering))); } Ok(()) } + +pub unsafe fn build_up_pool(env: sys::napi_env, pool_size: usize) -> Result<()> { + let mut locked_pool = POOL.lock(); + build_pool_into_vec(env, pool_size, &mut locked_pool) +} + + diff --git a/src/request/request_blob.rs b/src/request/request_blob.rs index d8b2fe1..56e2415 100644 --- a/src/request/request_blob.rs +++ b/src/request/request_blob.rs @@ -7,7 +7,6 @@ use napi::Result; use super::helpers::make_js_error; use crate::response::{JsResponse, InnerResp}; - #[napi] pub struct RequestBlob { pub(crate) data: MaybeUninit, @@ -77,7 +76,7 @@ impl RequestBlob { let res = oneshot.send(js_resp); if checked && res.is_err() { - eprintln!("Error sending response, the reciever may have dropped."); + return Err(make_js_error("Error with sending the response.")) } Ok(()) @@ -87,4 +86,4 @@ impl RequestBlob { pub fn send_result(&mut self, response: InnerResp) -> Result<()> { self.send_result_checked(response, true) } -} \ No newline at end of file +} diff --git a/src/router/mod.rs b/src/router/mod.rs index 65266f2..2dd7bc9 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,3 +1,6 @@ pub mod node_functions; pub mod read_only; -pub mod store; \ No newline at end of file +pub mod store; +pub mod route_node; + +pub use route_node::RouteNode; diff --git a/src/router/node_functions.rs b/src/router/node_functions.rs index 478c2d5..58ecea8 100644 --- a/src/router/node_functions.rs +++ b/src/router/node_functions.rs @@ -47,7 +47,9 @@ impl Methods { pub fn new_route(route: String, method: Methods, callback: JsFunction) -> Result<()> { let tsfn = ThreadsafeFunction::create(callback.0.env, callback.0.value, 1024)?; - add_new_route(&route, method, tsfn) + println!("Thread {} is adding a route...", crate::thread::get_id()); + + add_new_route(&route, method, super::RouteNode::new_with_fn(tsfn)) } #[cold] diff --git a/src/router/route_node.rs b/src/router/route_node.rs new file mode 100644 index 0000000..020b94d --- /dev/null +++ b/src/router/route_node.rs @@ -0,0 +1,19 @@ +use crate::napi::tsfn::ThreadsafeFunction; + +#[derive(Clone)] +pub struct RouteNode { + pub threads_id: usize, + pub function: ThreadsafeFunction +} + +impl RouteNode { + + pub fn new_with_fn(function: ThreadsafeFunction) -> Self { + let thread_id = crate::thread::get_id(); + + Self { + threads_id: thread_id as usize, + function + } + } +} diff --git a/src/server/actix_server.rs b/src/server/actix_server.rs index da628d4..69f8f9a 100644 --- a/src/server/actix_server.rs +++ b/src/server/actix_server.rs @@ -10,7 +10,7 @@ use tokio::sync::oneshot; use crate::{ extras::scheduler::{pin_js_thread, try_pin_priority, reset_thread_affinity}, - object_pool::{build_up_pool, get_stored_chunk, StoredPair}, + object_pool::{build_up_pool, get_stored_chunk, StoredPair, get_pair_for_thread, replace_for_thread}, router::{read_only::get_route, store::initialise_reader}, request::helpers::make_js_error, }; @@ -55,7 +55,7 @@ impl Service for ActixHttpServer { let vec_ref = self.object_pool.clone(); Box::pin(async move { - let result = match get_route(req.path(), req.method().clone()) { + let router = match get_route(req.path(), req.method().clone()) { Some(res) => res, None => { return get_failed_message(); @@ -73,16 +73,19 @@ impl Service for ActixHttpServer { }; } - let to_add_back = Self::get_mut_from_unsafe(&vec_ref); - let mut js_obj = match to_add_back.pop() { + let mut js_obj = match get_pair_for_thread(router.threads_id) { Some(res) => res, - None => Self::backoff_get_object(to_add_back).await, + None => { + println!("Thread not been initialized!!"); + return get_failed_message(); + } }; let (send, rec) = oneshot::channel(); + js_obj.0 .0.store_self_data(req, send, body); - result.call( + router.function.call( js_obj.0 .1, crate::napi::tsfn::ThreadsafeFunctionCallMode::NonBlocking, ); @@ -92,12 +95,7 @@ impl Service for ActixHttpServer { Err(_) => get_failed_message(), }; - // Saves a check check for length we can be sure that the vec is not full - if to_add_back.len() == to_add_back.capacity() { - unsafe { std::hint::unreachable_unchecked() } - } - - to_add_back.push(js_obj); + replace_for_thread(router.threads_id, js_obj); result }) @@ -134,9 +132,9 @@ async fn create_sever(config: ServerConfig) -> std::io::Result<()> { let srv = Server::build() .backlog(config.backlog as u32) .bind("walker_server_h1", &config.url, move || { - HttpService::build().finish(AppFactory(pool_size)).tcp() + HttpService::build().finish(AppFactory(pool_size as usize)).tcp() })? - .workers(config.worker_threads) + .workers(config.worker_threads as usize) .run(); attach_server_handle(srv.handle()); @@ -151,9 +149,9 @@ async fn create_tls_server(config: ServerConfig) -> std::io::Result<()> { let srv = Server::build() .backlog(config.backlog as u32) .bind("walker_server_h1", &config.url, move || { - HttpService::build().finish(AppFactory(pool_size)).rustls(certs.clone()) + HttpService::build().finish(AppFactory(pool_size as usize)).rustls(certs.clone()) })? - .workers(config.worker_threads) + .workers(config.worker_threads as usize) .run(); attach_server_handle(srv.handle()); diff --git a/src/server/config.rs b/src/server/config.rs index 2f45a94..f4e05b7 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -1,16 +1,14 @@ use std::cmp; -use napi::Result; -use halfbrown::HashMap; - -use crate::request::helpers::{make_js_error, make_js_error_string}; - +#[napi(object)] #[derive(Debug)] +/// This allows you to configure the server with more +/// granular control, some options are required. pub struct ServerConfig { pub url: String, - pub worker_threads: usize, - pub pool_per_worker_size: usize, - pub backlog: usize, + pub worker_threads: u32, + pub pool_per_worker_size: u32, + pub backlog: u32, pub debug: bool, pub tls: bool, pub key_location: Option, @@ -29,7 +27,7 @@ impl ServerConfig { pub fn default_with_url(url: String) -> Self { Self { url, - worker_threads: guess_optimal_worker_count(), + worker_threads: guess_optimal_worker_count() as u32, pool_per_worker_size: 10_000, backlog: 1024, debug: false, @@ -39,60 +37,7 @@ impl ServerConfig { } } - #[cold] - pub fn from_config_blob(config: HashMap) -> Result { - let url = match config.get("url") { - Some(res) => res.clone(), - None => return Err(make_js_error("No URL provided")), - }; - - let get_number_with_deault = |key: &'static str, fallback: usize| { - match config.get(key) { - Some(res) => match res.parse::() { - Ok(res) => Ok(res), - Err(_) => Err(make_js_error_string(format!("Invalid number provided for {}", key))), - }, - None => Ok(fallback), - } - }; - - let get_bool_with_default = |key: &'static str, fallback: bool| { - match config.get(key) { - Some(res) => match res.parse::() { - Ok(res) => Ok(res), - Err(_) => Err(make_js_error_string(format!("Invalid number provided for {}", key))), - }, - None => Ok(fallback), - } - }; - - let mut tls = false; - let mut key_location = None; - let mut cert_location = None; - - if get_bool_with_default("tls", false)? { - tls = true; - key_location = config.get("key_location").cloned(); - cert_location = config.get("cert_location").cloned(); - - if key_location.is_none() || cert_location.is_none() { - return Err(make_js_error("We need both the key and cert location for TLS")); - } - } - - Ok(Self { - url, - worker_threads: get_number_with_deault("worker_threads", guess_optimal_worker_count())?, - pool_per_worker_size: get_number_with_deault("pool_per_worker_size", 10_000)?, - backlog: get_number_with_deault("backlog", 1024)?, - debug: get_bool_with_default("debug", false)?, - tls, - key_location, - cert_location, - }) - } - pub fn get_pool_size(&self) -> usize { - self.worker_threads * self.pool_per_worker_size + (self.worker_threads * self.pool_per_worker_size) as usize } } diff --git a/src/server/node_functions.rs b/src/server/node_functions.rs index 1f922ab..c32b823 100644 --- a/src/server/node_functions.rs +++ b/src/server/node_functions.rs @@ -1,6 +1,5 @@ use napi::bindgen_prelude::*; -use crate::napi::halfbrown::HalfBrown; use super::{actix_server::start_server, config::ServerConfig, shutdown::stop_server}; #[cold] @@ -18,7 +17,7 @@ pub fn start(env: Env, address: String) -> Result<()> { /// This allows you to configure the number of workers pub fn start_with_worker_count(env: Env, address: String, workers: u32) -> Result<()> { let mut config = ServerConfig::default_with_url(address); - config.worker_threads = workers as usize; + config.worker_threads = workers; start_server(config, env.raw()) } @@ -38,9 +37,7 @@ pub fn start_with_worker_count(env: Env, address: String, workers: u32) -> Resul /// pool_per_worker_size: The size of the pool per worker /// /// debug: Whether to enable debug mode -pub fn start_with_config(env: Env, config: HalfBrown) -> Result<()> { - let config = ServerConfig::from_config_blob(config.0)?; - +pub fn start_with_config(env: Env, config: ServerConfig) -> Result<()> { start_server(config, env.raw()) } @@ -50,4 +47,4 @@ pub fn start_with_config(env: Env, config: HalfBrown) -> Result< /// Experimental at the moment pub fn stop() -> bool { stop_server(true) -} \ No newline at end of file +} diff --git a/src/thread/mod.rs b/src/thread/mod.rs new file mode 100644 index 0000000..2d701de --- /dev/null +++ b/src/thread/mod.rs @@ -0,0 +1,4 @@ +pub mod node_functions; +pub mod thread_identifier; + +pub use thread_identifier::get_id; diff --git a/src/thread/node_functions.rs b/src/thread/node_functions.rs new file mode 100644 index 0000000..0093de3 --- /dev/null +++ b/src/thread/node_functions.rs @@ -0,0 +1,14 @@ +use napi::{sys, Result, Env}; + +use crate::object_pool::build_pool_for_id; +use super::thread_identifier::get_id; + +#[napi] +pub fn get_worker_id() -> u32 { + get_id() +} + +#[napi] +pub fn initialise_pool_for_worker(env: Env, pool_size: u32) -> Result<()> { + unsafe { build_pool_for_id(env.raw(), pool_size as usize, get_worker_id() as usize) } +} diff --git a/src/thread/thread_identifier.rs b/src/thread/thread_identifier.rs new file mode 100644 index 0000000..953d35d --- /dev/null +++ b/src/thread/thread_identifier.rs @@ -0,0 +1,13 @@ +use std::sync::atomic::AtomicU32; + +static COUNTER: AtomicU32 = AtomicU32::new(0); + +pub fn get_id() -> u32 { + THREAD_ID.with(|a| *a) +} + +thread_local! { + static THREAD_ID: u32 = COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst) +} + + diff --git a/src/types.rs b/src/types.rs index 58a6674..ca35b70 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,3 +1,3 @@ -use crate::napi::tsfn::ThreadsafeFunction; +use crate::router::route_node::RouteNode; -pub type CallBackFunction = ThreadsafeFunction; +pub type CallBackFunction = RouteNode; From 6548125f0be5e09bf519893e98bd8bb18b8e735f Mon Sep 17 00:00:00 2001 From: Jack Thomson Date: Wed, 9 Nov 2022 17:14:59 +0000 Subject: [PATCH 5/8] Local pools of objects --- examples/worker_threads.js | 59 +++++++++++------------ src/object_pool/mod.rs | 32 +++++++++++-- src/router/route_node.rs | 1 - src/router/store.rs | 95 ++++++++++++++++++++------------------ src/server/actix_server.rs | 47 ++++++++++++++----- src/types.rs | 3 +- 6 files changed, 141 insertions(+), 96 deletions(-) diff --git a/examples/worker_threads.js b/examples/worker_threads.js index ed9db51..6b94077 100644 --- a/examples/worker_threads.js +++ b/examples/worker_threads.js @@ -6,57 +6,50 @@ const { } = require('node:worker_threads'); const Walker = require('..'); +const response = "Hello World!" if (isMainThread) { setEnvironmentData('Hello', 'World!'); const result = Walker.getWorkerId(); - Walker.initialisePoolForWorker(10000); + Walker.initialisePoolForWorker(200_000); console.log(`Result is ${result}`); - const worker = new Worker(__filename); + + for (let i = 0; i < 10; i++) { + const worker = new Worker(__filename); + } Walker.get(`/${result}`, (res) => { res.sendText(`Hello from main thread our id is ${result}`); }) + Walker.get(`/`, (res) => { + res.sendTextUnchecked(response); + }) + + setTimeout(() => { + console.log('Starting server...') + const config = { + url: "0.0.0.0:8081", + workerThreads: 12, + poolPerWorkerSize: 10000, + backlog: 10000, + debug: false, + tls: false, + } + Walker.startWithConfig(config); + }, 5000); } else { let result = Walker.getWorkerId(); console.log(`Result is ${result}`); - result = Walker.getWorkerId(); - console.log(`Result is ${result}`); - - result = Walker.getWorkerId(); - console.log(`Result is ${result}`); - - result = Walker.getWorkerId(); - console.log(`Result is ${result}`); - - - Walker.get(`/${result}`, (res) => { - res.sendText(`Hello from worker thread our id is ${result}`); - }) - - Walker.get(`/b`, (res) => { + Walker.get(`/key`, (res) => { res.sendText(`Hello from worker thread our id is ${result}`); }) - Walker.get(`/c`, (res) => { - res.sendText(`Hello from worker thread our id is ${result}`); + Walker.get(`/`, (res) => { + res.sendTextUnchecked(response); }) - Walker.get(`/d`, (res) => { - res.sendText(`Hello from worker thread our id is ${result}`); - }) - - const config = { - url: "0.0.0.0:8081", - workerThreads: 1, - poolPerWorkerSize: 100, - backlog: 1000, - debug: true, - tls: false, - } - Walker.initialisePoolForWorker(10000); - Walker.startWithConfig(config); + Walker.initialisePoolForWorker(200_000); } diff --git a/src/object_pool/mod.rs b/src/object_pool/mod.rs index cd634c2..8894f72 100644 --- a/src/object_pool/mod.rs +++ b/src/object_pool/mod.rs @@ -25,14 +25,36 @@ pub unsafe fn build_pool_for_id(env: sys::napi_env, pool_size: usize, thread_id: let mut pool_list = WORKER_POOL.write(); if pool_list.len() >= thread_id { + println!("Resizing the pool"); pool_list.resize_with(thread_id + 1, Default::default); } + println!("Getting id {} from length {}", thread_id, pool_list.len()); let found = pool_list.get_mut(thread_id).ok_or_else(|| make_js_error("Error building pool"))?; let mut pool = found.lock(); build_pool_into_vec(env, pool_size, &mut pool) } +pub fn get_pool_for_threads(count: usize) -> Result>> { + let locked = WORKER_POOL.read(); + let mut result = Vec::with_capacity(locked.len()); + + for thread in locked.iter() { + let mut obj_list = thread.lock(); + if obj_list.len() < count { + return Err(make_js_error("We don't have enough objects provisioned.")) + } + + let split_point = obj_list.len() - count; + let split = obj_list.split_off(split_point); + + println!("Build lookup of length {}", split.len()); + result.push(split); + } + + Ok(result) +} + #[inline(always)] pub fn get_pair_for_thread(thread_id: usize) -> Option { let reader = WORKER_POOL.read(); @@ -65,7 +87,7 @@ unsafe fn get_obj_constructor() -> Result { unsafe fn build_pool_into_vec(env: sys::napi_env, pool_size: usize, pool: &mut Vec) -> Result<()> { pool.reserve(pool_size); - + let ctor_ref = get_obj_constructor()?; let mut ctor = std::ptr::null_mut(); if sys::napi_get_reference_value(env, ctor_ref, &mut ctor) != napi::sys::Status::napi_ok { @@ -76,9 +98,9 @@ unsafe fn build_pool_into_vec(env: sys::napi_env, pool_size: usize, pool: &mut V let mut result = std::ptr::null_mut(); if sys::napi_new_instance(env, ctor, 0, std::ptr::null_mut(), &mut result) != sys::Status::napi_ok - { - return Err(make_js_error("Error creating a new instance.")); - } + { + return Err(make_js_error("Error creating a new instance.")); + } let mut reffering = std::ptr::null_mut(); if sys::napi_create_reference(env, result, 1, &mut reffering) != sys::Status::napi_ok { @@ -95,7 +117,7 @@ unsafe fn build_pool_into_vec(env: sys::napi_env, pool_size: usize, pool: &mut V None, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ); let recovered = Box::from_raw(raw_obj); pool.push(StoredPair((recovered, reffering))); diff --git a/src/router/route_node.rs b/src/router/route_node.rs index 020b94d..67f2240 100644 --- a/src/router/route_node.rs +++ b/src/router/route_node.rs @@ -7,7 +7,6 @@ pub struct RouteNode { } impl RouteNode { - pub fn new_with_fn(function: ThreadsafeFunction) -> Self { let thread_id = crate::thread::get_id(); diff --git a/src/router/store.rs b/src/router/store.rs index bb79573..a28d63c 100644 --- a/src/router/store.rs +++ b/src/router/store.rs @@ -4,7 +4,7 @@ use napi::bindgen_prelude::*; use lazy_static::lazy_static; use parking_lot::RwLock; -use crate::{types::CallBackFunction, Methods}; +use crate::{types::{CallBackFunction, CallbackInner}, Methods}; use super::read_only::{write_reader, ReadRoutes}; @@ -12,71 +12,76 @@ type ReaderLookup = Router; type ThreadSafeLookup = RwLock>; lazy_static! { - static ref GLOBAL_DATA: InternalRoutes = InternalRoutes::new_manager(); + static ref GLOBAL_DATA: InternalRoutes = InternalRoutes::new_manager(); } pub fn thread_to_reader(input: &ThreadSafeLookup) -> ReaderLookup { - let reader = input.read(); - reader.clone() + let reader = input.read(); + reader.clone() } struct InternalRoutes { - get: ThreadSafeLookup, - post: ThreadSafeLookup, - put: ThreadSafeLookup, - patch: ThreadSafeLookup, - delete: ThreadSafeLookup, + get: ThreadSafeLookup, + post: ThreadSafeLookup, + put: ThreadSafeLookup, + patch: ThreadSafeLookup, + delete: ThreadSafeLookup, } impl InternalRoutes { - #[cold] - fn new_manager() -> Self { - Self { - get: RwLock::new(Router::new()), - post: RwLock::new(Router::new()), - put: RwLock::new(Router::new()), - patch: RwLock::new(Router::new()), - delete: RwLock::new(Router::new()), + #[cold] + fn new_manager() -> Self { + Self { + get: RwLock::new(Router::new()), + post: RwLock::new(Router::new()), + put: RwLock::new(Router::new()), + patch: RwLock::new(Router::new()), + delete: RwLock::new(Router::new()), + } } - } - #[cold] - fn get_rw_from_method(&self, method: Methods) -> &ThreadSafeLookup { - match method { - Methods::GET => &self.get, - Methods::POST => &self.post, - Methods::PUT => &self.put, - Methods::PATCH => &self.patch, - Methods::DELETE => &self.delete, + #[cold] + fn get_rw_from_method(&self, method: Methods) -> &ThreadSafeLookup { + match method { + Methods::GET => &self.get, + Methods::POST => &self.post, + Methods::PUT => &self.put, + Methods::PATCH => &self.patch, + Methods::DELETE => &self.delete, + } } - } - #[cold] - fn as_reader_type(&self) -> ReadRoutes { - ReadRoutes { - get: thread_to_reader(&self.get), - post: thread_to_reader(&self.post), - put: thread_to_reader(&self.put), - patch: thread_to_reader(&self.patch), - delete: thread_to_reader(&self.delete), + #[cold] + fn as_reader_type(&self) -> ReadRoutes { + ReadRoutes { + get: thread_to_reader(&self.get), + post: thread_to_reader(&self.post), + put: thread_to_reader(&self.put), + patch: thread_to_reader(&self.patch), + delete: thread_to_reader(&self.delete), + } } - } } #[cold] pub fn initialise_reader() { - let new_reader = GLOBAL_DATA.as_reader_type(); + let new_reader = GLOBAL_DATA.as_reader_type(); - write_reader(new_reader); + write_reader(new_reader); } #[cold] -pub fn add_new_route(route: &str, method: Methods, function: CallBackFunction) -> Result<()> { - let lock = GLOBAL_DATA.get_rw_from_method(method); - let mut writing = lock - .write(); +pub fn add_new_route(route: &str, method: Methods, function: CallbackInner) -> Result<()> { + let lock = GLOBAL_DATA.get_rw_from_method(method); + let mut writing = lock + .write(); - writing - .insert(route, function) - .map_err(|_| Error::new(Status::GenericFailure, "Error inserting route".to_string())) + if let Ok(found) = writing.at_mut(route) { + found.value.push(function); + Ok(()) + } else { + writing + .insert(route, vec![function]) + .map_err(|_| Error::new(Status::GenericFailure, "Error inserting route".to_string())) + } } diff --git a/src/server/actix_server.rs b/src/server/actix_server.rs index 69f8f9a..1680858 100644 --- a/src/server/actix_server.rs +++ b/src/server/actix_server.rs @@ -1,4 +1,4 @@ -use std::{cell::UnsafeCell, convert::Infallible, rc::Rc}; +use std::{cell::UnsafeCell, convert::Infallible, rc::Rc, sync::atomic::{AtomicUsize, Ordering}}; use actix_http::{HttpService, Request, Response}; use actix_server::Server; @@ -10,7 +10,7 @@ use tokio::sync::oneshot; use crate::{ extras::scheduler::{pin_js_thread, try_pin_priority, reset_thread_affinity}, - object_pool::{build_up_pool, get_stored_chunk, StoredPair, get_pair_for_thread, replace_for_thread}, + object_pool::{build_up_pool, StoredPair, get_pair_for_thread, replace_for_thread, get_pool_for_threads}, router::{read_only::get_route, store::initialise_reader}, request::helpers::make_js_error, }; @@ -20,13 +20,14 @@ use super::{ }; struct ActixHttpServer { - object_pool: Rc>>, + object_pool: Rc>>>, + idx: UnsafeCell } impl ActixHttpServer { #[allow(clippy::mut_from_ref)] #[inline(always)] - fn get_mut_from_unsafe(unsafe_cell: &UnsafeCell>) -> &mut Vec { + fn get_mut_from_unsafe(unsafe_cell: &UnsafeCell>>) -> &mut Vec> { unsafe { &mut *unsafe_cell.get() } } @@ -34,6 +35,7 @@ impl ActixHttpServer { #[cold] async fn backoff_get_object(items: &mut Vec) -> StoredPair { loop { + println!("Backing off??"); tokio::task::yield_now().await; if let Some(retrieved) = items.pop() { @@ -41,6 +43,16 @@ impl ActixHttpServer { } } } + + #[inline(always)] + fn get_next_idx(&self) -> usize { + let position = unsafe { &mut *self.idx.get() }; + let val = *position; + + *position += 1; + + val + } } impl Service for ActixHttpServer { @@ -53,6 +65,7 @@ impl Service for ActixHttpServer { #[inline(always)] fn call(&self, mut req: Request) -> Self::Future { let vec_ref = self.object_pool.clone(); + let offset = self.get_next_idx(); Box::pin(async move { let router = match get_route(req.path(), req.method().clone()) { @@ -62,6 +75,8 @@ impl Service for ActixHttpServer { } }; + let router = unsafe { router.get_unchecked(offset % router.len()) }; + let mut body = None; if req.method() == http::Method::POST { @@ -73,11 +88,14 @@ impl Service for ActixHttpServer { }; } - let mut js_obj = match get_pair_for_thread(router.threads_id) { - Some(res) => res, - None => { - println!("Thread not been initialized!!"); - return get_failed_message(); + let object_reference = Self::get_mut_from_unsafe(&vec_ref); + + let mut js_obj = unsafe { + let reference = object_reference.get_unchecked_mut(router.threads_id); + + match reference.pop() { + Some(res) => res, + None => Self::backoff_get_object(reference).await, } }; @@ -95,7 +113,10 @@ impl Service for ActixHttpServer { Err(_) => get_failed_message(), }; - replace_for_thread(router.threads_id, js_obj); + unsafe { + let reference = object_reference.get_unchecked_mut(router.threads_id); + reference.push(js_obj); + } result }) @@ -105,6 +126,8 @@ impl Service for ActixHttpServer { #[derive(Clone)] struct AppFactory(usize); +static IDX_OFFSETTER: AtomicUsize = AtomicUsize::new(0); + impl ServiceFactory for AppFactory { type Config = (); type Response = Response; @@ -117,10 +140,12 @@ impl ServiceFactory for AppFactory { try_pin_priority(); let chunk_size = self.0; + let object_pool = Rc::new(UnsafeCell::new(get_pool_for_threads(chunk_size).unwrap())); Box::pin(async move { Ok(ActixHttpServer { - object_pool: Rc::new(UnsafeCell::new(get_stored_chunk(chunk_size))), + object_pool, + idx: UnsafeCell::new(IDX_OFFSETTER.fetch_add(1, Ordering::SeqCst)), }) }) } diff --git a/src/types.rs b/src/types.rs index ca35b70..dcc2cfd 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,3 +1,4 @@ use crate::router::route_node::RouteNode; -pub type CallBackFunction = RouteNode; +pub type CallbackInner = RouteNode; +pub type CallBackFunction = Vec; From 3bd2bd7990b3f2ed8ca6ec721726742bbd9179ab Mon Sep 17 00:00:00 2001 From: Jack Thomson Date: Fri, 11 Nov 2022 13:03:00 +0000 Subject: [PATCH 6/8] Tidy up worker threads --- __test__/saturation.spec.mjs | 10 ++-- __test__/worker_file.js | 20 ++++++++ __test__/workers_test.spec.mjs | 93 ++++++++++++++++++++++++++++++++++ examples/worker_threads.js | 44 ++++++++-------- index.d.ts | 31 ++++-------- index.js | 4 +- package-lock.json | 4 +- package.json | 1 + src/lib.rs | 1 - src/object_pool/mod.rs | 44 ++++------------ src/router/node_functions.rs | 3 -- src/server/actix_server.rs | 17 +++---- src/server/config.rs | 42 ++++++++++----- src/server/node_functions.rs | 15 +----- src/server/tls.rs | 4 +- src/thread/node_functions.rs | 8 +-- 16 files changed, 210 insertions(+), 131 deletions(-) create mode 100644 __test__/worker_file.js create mode 100644 __test__/workers_test.spec.mjs diff --git a/__test__/saturation.spec.mjs b/__test__/saturation.spec.mjs index 4ad709e..010cd32 100644 --- a/__test__/saturation.spec.mjs +++ b/__test__/saturation.spec.mjs @@ -9,9 +9,11 @@ import * as Walker from '../index.js' const config = { url: "0.0.0.0:8080", - worker_threads: "1", - backlog: "1000000", - pool_per_worker_size: "1" + workerThreads: 1, + backlog: 1000000, + poolPerWorkerSize: 1, + debug: false, + tls: false, } test.serial.before(async (_) => { @@ -46,4 +48,4 @@ test("Get /cpu 5_000 times", async t => { responses.forEach((resp, index) => { t.is(resp.data, `Param: ${index}`); }); -}); \ No newline at end of file +}); diff --git a/__test__/worker_file.js b/__test__/worker_file.js new file mode 100644 index 0000000..fc1006c --- /dev/null +++ b/__test__/worker_file.js @@ -0,0 +1,20 @@ +const Walker = require('..'); +const response = "Hello World!" + +Walker.get(`/`, (res) => { + res.sendTextUnchecked(response); +}) + +Walker.get(`/slowrunner/:count`, (res) => { + const params = res.getUrlParams(); + const count = parseInt(params.count); + let i = 0; + + for (; i < count; i++) { + + } + + res.sendTextUnchecked(`Result ${i}`) +}) + +Walker.registerThreadsPool(50_000); diff --git a/__test__/workers_test.spec.mjs b/__test__/workers_test.spec.mjs new file mode 100644 index 0000000..e304eb0 --- /dev/null +++ b/__test__/workers_test.spec.mjs @@ -0,0 +1,93 @@ +import test from 'ava' +import axios from 'axios'; +import { Worker } from 'node:worker_threads'; +import { fileURLToPath } from 'url'; +import path from 'path'; + +const Server = axios.create({ + baseURL: 'http://0.0.0.0:8080/' + }); + +import * as Walker from '../index.js' + +const config = { + url: "0.0.0.0:8080", + workerThreads: 8, + backlog: 500, + poolPerWorkerSize: 1_000, + debug: false, + tls: false, +} + +test.serial.before(async (_) => { + let dir = path.dirname(fileURLToPath(import.meta.url)); + for (let i = 0; i < 5; i++) { + const worker = new Worker(`${dir}/worker_file.js`); + } + + // We'll let the worker threads spin up + await new Promise((resolve) => setTimeout(resolve, 1000)); + + Walker.startWithConfig(config); + + // Sleeep for 100ms to let server start + await new Promise((resolve) => setTimeout(resolve, 300)); +}); + +// Hit the cpu endpoint 5_000 times to saturate the object pool +test.serial("Get /slowrunner multiple times in waves", async t => { + const req_number = 100; + const loop_number = 100_000; + + for (let c = 0; c < 100; c++) { + const promises = []; + for (let i = 0; i < req_number; i++) { + promises.push(Server.get(`/slowrunner/${loop_number}`)); + } + + const responses = await Promise.all(promises); + + t.is(responses.length, req_number); + // check all the responses are correct + responses.forEach((resp, index) => { + t.is(resp.data, `Result ${loop_number}`); + }); + } +}); + + +// Hit the endpoint with a continious load +test.serial("Get /slowrunner multiple times with a consistent load", async t => { + const promises = []; + const req_number = 1000; + const loop_number = 100_000; + let sum_of_requests = 10_000; + + for (let i = 0; i < req_number; i++) { + const p = Server.get(`/slowrunner/${loop_number}`).then(res => { + promises.splice(promises.indexOf(p), 1); + return res; + }); + promises.push(p); + } + + while (sum_of_requests > 0) { + const complete = await Promise.race(promises); + sum_of_requests -= 1; + + const p = Server.get(`/slowrunner/${loop_number}`).then(res => { + promises.splice(promises.indexOf(p), 1); + return res; + }); + promises.push(p); + + t.is(complete.data, `Result ${loop_number}`); + } + + const responses = await Promise.all(promises); + + // check all the responses are correct + responses.forEach((resp, index) => { + t.is(resp.data, `Result ${loop_number}`); + }); +}); diff --git a/examples/worker_threads.js b/examples/worker_threads.js index 6b94077..cb88236 100644 --- a/examples/worker_threads.js +++ b/examples/worker_threads.js @@ -9,30 +9,28 @@ const Walker = require('..'); const response = "Hello World!" if (isMainThread) { - setEnvironmentData('Hello', 'World!'); - const result = Walker.getWorkerId(); - Walker.initialisePoolForWorker(200_000); - console.log(`Result is ${result}`); - - for (let i = 0; i < 10; i++) { - const worker = new Worker(__filename); + for (let i = 0; i < 6; i++) { + // const worker = new Worker(__filename); } - Walker.get(`/${result}`, (res) => { - res.sendText(`Hello from main thread our id is ${result}`); - }) + Walker.get(`/key`, (res) => { + let i = 0; - Walker.get(`/`, (res) => { - res.sendTextUnchecked(response); + for (i; i < 1_000_000; i++) { + + } + + res.sendTextUnchecked(`Result ${i}`) }) + setTimeout(() => { console.log('Starting server...') const config = { url: "0.0.0.0:8081", - workerThreads: 12, - poolPerWorkerSize: 10000, + workerThreads: 6, + poolPerWorkerSize: 200_000, backlog: 10000, debug: false, tls: false, @@ -40,16 +38,18 @@ if (isMainThread) { Walker.startWithConfig(config); }, 5000); } else { - let result = Walker.getWorkerId(); - console.log(`Result is ${result}`); - - Walker.get(`/key`, (res) => { - res.sendText(`Hello from worker thread our id is ${result}`); - }) - Walker.get(`/`, (res) => { res.sendTextUnchecked(response); }) - Walker.initialisePoolForWorker(200_000); + Walker.get(`/key`, (res) => { + let i = 0; + + for (; i < 1_000_000; i++) { + + } + + res.sendTextUnchecked(`Result ${i}`) + }) + Walker.registerThreadsPool(200_000); } diff --git a/index.d.ts b/index.d.ts index 517e707..6ea88c3 100644 --- a/index.d.ts +++ b/index.d.ts @@ -52,20 +52,7 @@ export function start(address: string): void * This allows you to configure the number of workers */ export function startWithWorkerCount(address: string, workers: number): void -/** - * This is called to start the server the address will need to include the IP and port - * This allows you to configure more of the parameters of the server current options are all options need to be strings: - * - * url: The url to listen on - * - * worker_threads: The number of worker threads to use - * - * backlog: The number of connections to queue up - * - * pool_per_worker_size: The size of the pool per worker - * - * debug: Whether to enable debug mode - */ +/** This is called to start the server the using the `ServerConfig` object */ export function startWithConfig(config: ServerConfig): void /** * Attempts to stop the server, returns if it woreked @@ -78,18 +65,22 @@ export function stop(): boolean */ export interface ServerConfig { url: string - workerThreads: number - poolPerWorkerSize: number - backlog: number - debug: boolean - tls: boolean + workerThreads?: number + poolPerWorkerSize?: number + backlog?: number + debug?: boolean + tls?: boolean keyLocation?: string certLocation?: string } export function loadNewTemplate(groupName: string, directory: string): void export function reloadGroup(groupName: string): void export function getWorkerId(): number -export function initialisePoolForWorker(poolSize: number): void +/** + * This is used so you can register a worker thread under walker, this needs to be called + * before the server starts to register the pool of objects used for requests. + */ +export function registerThreadsPool(poolSize: number): void export function getThreadAffinity(): Array export class DbConnection { query(query: FastStr): object diff --git a/index.js b/index.js index 364247f..6c21731 100644 --- a/index.js +++ b/index.js @@ -236,7 +236,7 @@ if (!nativeBinding) { throw new Error(`Failed to load native binding`) } -const { DbConnection, connectDb, PreparedStatement, Methods, newRoute, get, post, put, patch, RequestBlob, start, startWithWorkerCount, startWithConfig, stop, loadNewTemplate, reloadGroup, getWorkerId, initialisePoolForWorker, getThreadAffinity } = nativeBinding +const { DbConnection, connectDb, PreparedStatement, Methods, newRoute, get, post, put, patch, RequestBlob, start, startWithWorkerCount, startWithConfig, stop, loadNewTemplate, reloadGroup, getWorkerId, registerThreadsPool, getThreadAffinity } = nativeBinding module.exports.DbConnection = DbConnection module.exports.connectDb = connectDb @@ -255,5 +255,5 @@ module.exports.stop = stop module.exports.loadNewTemplate = loadNewTemplate module.exports.reloadGroup = reloadGroup module.exports.getWorkerId = getWorkerId -module.exports.initialisePoolForWorker = initialisePoolForWorker +module.exports.registerThreadsPool = registerThreadsPool module.exports.getThreadAffinity = getThreadAffinity diff --git a/package-lock.json b/package-lock.json index adf12d8..a9d8abb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@walkerserver/server", - "version": "0.0.3", + "version": "0.0.4", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@walkerserver/server", - "version": "0.0.3", + "version": "0.0.4", "license": "MIT", "devDependencies": { "@napi-rs/cli": "^2.11.4", diff --git a/package.json b/package.json index 5a32887..890d194 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "test:main": "ava -T 60s ./__test__/index.spec.mjs", "test:stress": "ava -T 60s ./__test__/stress.spec.mjs", "test:saturate": "ava -T 600s ./__test__/saturation.spec.mjs", + "test:workers": "ava -T 600s ./__test__/workers_test.spec.mjs", "version": "napi version" } } diff --git a/src/lib.rs b/src/lib.rs index 10b3434..ad3821e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,6 @@ #[macro_use] extern crate napi_derive; -#[cfg(not(all(target_os = "linux", target_env = "musl", target_arch = "aarch64")))] #[global_allocator] static ALLOC: mimalloc_rust::GlobalMiMalloc = mimalloc_rust::GlobalMiMalloc; diff --git a/src/object_pool/mod.rs b/src/object_pool/mod.rs index 8894f72..5fdb5e5 100644 --- a/src/object_pool/mod.rs +++ b/src/object_pool/mod.rs @@ -10,28 +10,22 @@ pub struct StoredPair(pub (Box, sys::napi_ref)); unsafe impl Send for StoredPair {} unsafe impl Sync for StoredPair {} -static POOL: Mutex> = Mutex::new(vec![]); - static WORKER_POOL: RwLock>>> = RwLock::new(vec![]); -pub fn get_stored_chunk(count: usize) -> Vec { - let mut locked = POOL.lock(); - let split_point = locked.len() - count; - - locked.split_off(split_point) -} - -pub unsafe fn build_pool_for_id(env: sys::napi_env, pool_size: usize, thread_id: usize) -> Result<()> { +pub unsafe fn build_pool_for_id(env: sys::napi_env, pool_size: usize, thread_id: usize, build_if_present: bool) -> Result<()> { let mut pool_list = WORKER_POOL.write(); if pool_list.len() >= thread_id { - println!("Resizing the pool"); pool_list.resize_with(thread_id + 1, Default::default); } - println!("Getting id {} from length {}", thread_id, pool_list.len()); let found = pool_list.get_mut(thread_id).ok_or_else(|| make_js_error("Error building pool"))?; let mut pool = found.lock(); + + if !build_if_present && !pool.is_empty() { + return Ok(()) + } + build_pool_into_vec(env, pool_size, &mut pool) } @@ -42,39 +36,19 @@ pub fn get_pool_for_threads(count: usize) -> Result>> { for thread in locked.iter() { let mut obj_list = thread.lock(); if obj_list.len() < count { + println!("The object length is {} and count is {}", obj_list.len(), count); return Err(make_js_error("We don't have enough objects provisioned.")) } let split_point = obj_list.len() - count; let split = obj_list.split_off(split_point); - println!("Build lookup of length {}", split.len()); result.push(split); } Ok(result) } -#[inline(always)] -pub fn get_pair_for_thread(thread_id: usize) -> Option { - let reader = WORKER_POOL.read(); - let threads_pool = reader.get(thread_id)?; - let mut locked = threads_pool.lock(); - - locked.pop() -} - -#[inline(always)] -pub fn replace_for_thread(thread_id: usize, pair: StoredPair) -> Option<()> { - let reader = WORKER_POOL.read(); - let threads_pool = reader.get(thread_id)?; - let mut locked = threads_pool.lock(); - - locked.push(pair); - - Some(()) -} - unsafe fn get_obj_constructor() -> Result { let ctor_ref = napi::__private::get_class_constructor("RequestBlob\0") .ok_or_else(|| make_js_error("Error caching contructor."))?; @@ -127,8 +101,8 @@ unsafe fn build_pool_into_vec(env: sys::napi_env, pool_size: usize, pool: &mut V } pub unsafe fn build_up_pool(env: sys::napi_env, pool_size: usize) -> Result<()> { - let mut locked_pool = POOL.lock(); - build_pool_into_vec(env, pool_size, &mut locked_pool) + let thread_id = crate::thread::get_id(); + build_pool_for_id(env, pool_size, thread_id as usize, false) } diff --git a/src/router/node_functions.rs b/src/router/node_functions.rs index 58ecea8..c942a49 100644 --- a/src/router/node_functions.rs +++ b/src/router/node_functions.rs @@ -46,9 +46,6 @@ impl Methods { /// needed to get the information from the request pub fn new_route(route: String, method: Methods, callback: JsFunction) -> Result<()> { let tsfn = ThreadsafeFunction::create(callback.0.env, callback.0.value, 1024)?; - - println!("Thread {} is adding a route...", crate::thread::get_id()); - add_new_route(&route, method, super::RouteNode::new_with_fn(tsfn)) } diff --git a/src/server/actix_server.rs b/src/server/actix_server.rs index 1680858..832ed3d 100644 --- a/src/server/actix_server.rs +++ b/src/server/actix_server.rs @@ -10,7 +10,7 @@ use tokio::sync::oneshot; use crate::{ extras::scheduler::{pin_js_thread, try_pin_priority, reset_thread_affinity}, - object_pool::{build_up_pool, StoredPair, get_pair_for_thread, replace_for_thread, get_pool_for_threads}, + object_pool::{build_up_pool, StoredPair, get_pool_for_threads}, router::{read_only::get_route, store::initialise_reader}, request::helpers::make_js_error, }; @@ -35,7 +35,6 @@ impl ActixHttpServer { #[cold] async fn backoff_get_object(items: &mut Vec) -> StoredPair { loop { - println!("Backing off??"); tokio::task::yield_now().await; if let Some(retrieved) = items.pop() { @@ -152,14 +151,14 @@ impl ServiceFactory for AppFactory { } async fn create_sever(config: ServerConfig) -> std::io::Result<()> { - let pool_size = config.pool_per_worker_size; + let pool_size = config.get_pool_per_worker(); let srv = Server::build() - .backlog(config.backlog as u32) + .backlog(config.get_backlog_size()) .bind("walker_server_h1", &config.url, move || { HttpService::build().finish(AppFactory(pool_size as usize)).tcp() })? - .workers(config.worker_threads as usize) + .workers(config.get_worker_thread() as usize) .run(); attach_server_handle(srv.handle()); @@ -168,15 +167,15 @@ async fn create_sever(config: ServerConfig) -> std::io::Result<()> { } async fn create_tls_server(config: ServerConfig) -> std::io::Result<()> { - let pool_size = config.pool_per_worker_size; + let pool_size = config.get_pool_per_worker(); let certs = super::tls::load_tls_certs(&config).unwrap(); let srv = Server::build() - .backlog(config.backlog as u32) + .backlog(config.get_backlog_size()) .bind("walker_server_h1", &config.url, move || { HttpService::build().finish(AppFactory(pool_size as usize)).rustls(certs.clone()) })? - .workers(config.worker_threads as usize) + .workers(config.get_worker_thread() as usize) .run(); attach_server_handle(srv.handle()); @@ -189,7 +188,7 @@ fn run_server(config: ServerConfig) -> std::io::Result<()> { // Lets set net reciever priority here try_pin_priority(); - if config.tls { + if config.get_tls() { actix_rt::System::new().block_on(create_tls_server(config)) } else { actix_rt::System::new().block_on(create_sever(config)) diff --git a/src/server/config.rs b/src/server/config.rs index f4e05b7..7cd3f9f 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -1,16 +1,16 @@ use std::cmp; #[napi(object)] -#[derive(Debug)] +#[derive(Debug, Default)] /// This allows you to configure the server with more /// granular control, some options are required. pub struct ServerConfig { pub url: String, - pub worker_threads: u32, - pub pool_per_worker_size: u32, - pub backlog: u32, - pub debug: bool, - pub tls: bool, + pub worker_threads: Option, + pub pool_per_worker_size: Option, + pub backlog: Option, + pub debug: Option, + pub tls: Option, pub key_location: Option, pub cert_location: Option, } @@ -27,17 +27,31 @@ impl ServerConfig { pub fn default_with_url(url: String) -> Self { Self { url, - worker_threads: guess_optimal_worker_count() as u32, - pool_per_worker_size: 10_000, - backlog: 1024, - debug: false, - tls: false, - key_location: None, - cert_location: None, + ..Default::default() } } + pub fn get_worker_thread(&self) -> u32 { + self.worker_threads.unwrap_or(guess_optimal_worker_count() as u32) + } + + pub fn get_pool_per_worker(&self) -> u32 { + self.pool_per_worker_size.unwrap_or(10_000) + } + + pub fn get_backlog_size(&self) -> u32 { + self.backlog.unwrap_or(1024) + } + + pub fn get_debug(&self) -> bool { + self.debug.unwrap_or(false) + } + + pub fn get_tls(&self) -> bool { + self.tls.unwrap_or(false) + } + pub fn get_pool_size(&self) -> usize { - (self.worker_threads * self.pool_per_worker_size) as usize + (self.get_worker_thread() * self.get_pool_per_worker()) as usize } } diff --git a/src/server/node_functions.rs b/src/server/node_functions.rs index c32b823..e221d1a 100644 --- a/src/server/node_functions.rs +++ b/src/server/node_functions.rs @@ -17,7 +17,7 @@ pub fn start(env: Env, address: String) -> Result<()> { /// This allows you to configure the number of workers pub fn start_with_worker_count(env: Env, address: String, workers: u32) -> Result<()> { let mut config = ServerConfig::default_with_url(address); - config.worker_threads = workers; + config.worker_threads = Some(workers); start_server(config, env.raw()) } @@ -25,18 +25,7 @@ pub fn start_with_worker_count(env: Env, address: String, workers: u32) -> Resul #[cold] #[napi] -/// This is called to start the server the address will need to include the IP and port -/// This allows you to configure more of the parameters of the server current options are all options need to be strings: -/// -/// url: The url to listen on -/// -/// worker_threads: The number of worker threads to use -/// -/// backlog: The number of connections to queue up -/// -/// pool_per_worker_size: The size of the pool per worker -/// -/// debug: Whether to enable debug mode +/// This is called to start the server the using the `ServerConfig` object pub fn start_with_config(env: Env, config: ServerConfig) -> Result<()> { start_server(config, env.raw()) } diff --git a/src/server/tls.rs b/src/server/tls.rs index 0f51af1..e7efe8c 100644 --- a/src/server/tls.rs +++ b/src/server/tls.rs @@ -34,7 +34,5 @@ pub fn load_tls_certs(user_config: &super::config::ServerConfig) -> Result u32 { } #[napi] -pub fn initialise_pool_for_worker(env: Env, pool_size: u32) -> Result<()> { - unsafe { build_pool_for_id(env.raw(), pool_size as usize, get_worker_id() as usize) } +/// This is used so you can register a worker thread under walker, this needs to be called +/// before the server starts to register the pool of objects used for requests. +pub fn register_threads_pool(env: Env, pool_size: u32) -> Result<()> { + unsafe { build_pool_for_id(env.raw(), pool_size as usize, get_worker_id() as usize, true) } } From ee67f1a3b7a5f386eb13bf476fb6066ab20431a3 Mon Sep 17 00:00:00 2001 From: Jack Thomson Date: Mon, 14 Nov 2022 16:24:57 +0000 Subject: [PATCH 7/8] using ntex --- Cargo.toml | 6 ++-- examples/simple.js | 19 ++++++++++ examples/test.js | 13 +++++-- src/napi/buff_str.rs | 2 +- src/napi/bytes_recv.rs | 4 +-- src/request/helpers.rs | 4 +-- src/request/request_blob.rs | 4 +-- src/request/resp_utilities.rs | 5 ++- src/response/mod.rs | 31 +++++++++-------- src/router/node_functions.rs | 2 +- src/router/read_only.rs | 4 +-- src/server/actix_server.rs | 65 ++++++++++++++++++++--------------- src/server/helpers.rs | 15 ++++---- src/server/mod.rs | 2 +- src/server/node_functions.rs | 5 +-- src/templates/mod.rs | 15 ++++---- 16 files changed, 115 insertions(+), 81 deletions(-) create mode 100644 examples/simple.js diff --git a/Cargo.toml b/Cargo.toml index 06b4c5c..8fad7bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,12 +14,8 @@ bytes = "1.2.1" serde_json = "1.0.85" lazy_static = "1.4.0" tokio = { version = "1", features = ["full"] } -actix-http = { version = "3.2", features = ["http2", "rustls"]} -actix-service = "2.0.2" futures = "0.3.24" http = "0.2.8" -actix-rt = "2.7.0" -actix-server = "2.1.1" tera = "1.17.1" parking_lot = "0.12.1" simdutf8 = "0.1.4" @@ -29,6 +25,8 @@ num_cpus = "1.13.1" extreme = "666.666.666666" rustls = "0.20.7" rustls-pemfile = "1.0.1" +kanal = "0.1.0-pre7" +ntex = { version = "0.5.29", features = ["tokio"] } [target.'cfg(not(target_os = "linux"))'.dependencies] mimalloc-rust = { version = "0.2" } diff --git a/examples/simple.js b/examples/simple.js new file mode 100644 index 0000000..1c9598d --- /dev/null +++ b/examples/simple.js @@ -0,0 +1,19 @@ +const Walker = require('..'); + +const response = "Hello world!"; + +Walker.get("/", (res) => { + res.sendTextUnchecked(response); +}); + + +const config = { + url: "0.0.0.0:8081", + workerThreads: 6, + poolPerWorkerSize: 200_000, + backlog: 10000, + debug: false, + tls: false, +} + +Walker.startWithConfig(config) diff --git a/examples/test.js b/examples/test.js index 21c7354..bd75971 100644 --- a/examples/test.js +++ b/examples/test.js @@ -115,7 +115,7 @@ Walker.get("/template.html", (res) => { my_var: `We have 10 Page visitors ${++counter}` }; - res.sendTemplateResp('root', 'users/profile.html', JSON.stringify(data)); + res.sendTemplateResp('root', 'users/profile.html', JSON.stringify(data)); }); Walker.get("/counter", (res) => { @@ -266,4 +266,13 @@ Walker.post("/post", (res) => { res.sendText(`We got this as the body: ${body.toString('utf8')}`); }); -Walker.startWithWorkerCount("0.0.0.0:8081", 12) +const config = { + url: "0.0.0.0:8081", + workerThreads: 6, + poolPerWorkerSize: 200_000, + backlog: 10000, + debug: false, + tls: false, +} + +Walker.startWithConfig(config) diff --git a/src/napi/buff_str.rs b/src/napi/buff_str.rs index 3b0c84d..1158640 100644 --- a/src/napi/buff_str.rs +++ b/src/napi/buff_str.rs @@ -1,6 +1,6 @@ use std::ptr; -use bytes::{BytesMut, Bytes}; +use ntex::util::{BytesMut, Bytes}; use napi::{ bindgen_prelude::{FromNapiValue, TypeName}, check_status, diff --git a/src/napi/bytes_recv.rs b/src/napi/bytes_recv.rs index e72e0e5..cea5ed7 100644 --- a/src/napi/bytes_recv.rs +++ b/src/napi/bytes_recv.rs @@ -1,6 +1,6 @@ use std::ptr; -use bytes::Bytes; +use ntex::util::Bytes; use napi::{ bindgen_prelude::FromNapiValue, check_status, @@ -8,7 +8,7 @@ use napi::{ Result, TypedArrayType, Error, Status, }; -pub struct JsBytes(pub bytes::Bytes); +pub struct JsBytes(pub Bytes); impl FromNapiValue for JsBytes { #[inline(always)] diff --git a/src/request/helpers.rs b/src/request/helpers.rs index ab5bcf7..d09352b 100644 --- a/src/request/helpers.rs +++ b/src/request/helpers.rs @@ -1,5 +1,5 @@ -use actix_http::{header::HeaderMap}; -use bytes::{BytesMut, Bytes, BufMut}; +use ntex::http::{header::HeaderMap}; +use ntex::util::{BytesMut, Bytes, BufMut}; use halfbrown::HashMap; use napi::{Error, Result, Status}; use serde_json::Value; diff --git a/src/request/request_blob.rs b/src/request/request_blob.rs index 56e2415..16e9593 100644 --- a/src/request/request_blob.rs +++ b/src/request/request_blob.rs @@ -1,6 +1,6 @@ use std::mem::MaybeUninit; -use actix_http::Request; -use bytes::Bytes; +use ntex::http::Request; +use ntex::util::Bytes; use tokio::sync::oneshot::Sender; use napi::Result; diff --git a/src/request/resp_utilities.rs b/src/request/resp_utilities.rs index d9e54b3..fd9a163 100644 --- a/src/request/resp_utilities.rs +++ b/src/request/resp_utilities.rs @@ -1,4 +1,3 @@ -use actix_http::HttpMessage; use napi::bindgen_prelude::Uint8Array; use crate::{ @@ -71,7 +70,7 @@ impl RequestBlob { /// Get the url parameters as an object with each key and value /// this will only be null if an error has occurred pub fn header_length(&self) -> i64 { - let header_val = self.get_data_val().headers().len_keys(); + let header_val = self.get_data_val().headers().len(); header_val as i64 } @@ -100,7 +99,7 @@ impl RequestBlob { /// Retrieve the raw body bytes in a Uint8Array to be used pub fn get_body(&mut self) -> Uint8Array { match &self.body { - Some(res) => res.clone().into(), + Some(res) => (&res.clone() as &[u8]).into(), None => vec![].into(), } } diff --git a/src/response/mod.rs b/src/response/mod.rs index e85bb4a..bc6d858 100644 --- a/src/response/mod.rs +++ b/src/response/mod.rs @@ -1,9 +1,10 @@ -use actix_http::{ +use ntex::http::{ header::{HeaderMap, HeaderName, CONTENT_TYPE, SERVER}, Response, StatusCode, }; -use bytes::Bytes; -use http::HeaderValue; +use ntex::web::HttpResponse; +use ntex::util::Bytes; +use ntex::http::header::HeaderValue; use crate::templates::store_in_bytes_buffer; @@ -36,28 +37,28 @@ use InnerResp::*; #[cold] #[inline(never)] -fn render_internal_error() -> Response { - Response::with_body( +fn render_internal_error() -> HttpResponse { + HttpResponse::with_body( StatusCode::INTERNAL_SERVER_ERROR, - INTERNAL_SERVER_ERROR.clone(), + INTERNAL_SERVER_ERROR.clone().into(), ) } #[cold] #[inline(never)] -fn render_internal_error_with_message(message: &'static [u8]) -> Response { - Response::with_body( +fn render_internal_error_with_message(message: &'static [u8]) -> HttpResponse { + HttpResponse::with_body( StatusCode::INTERNAL_SERVER_ERROR, - Bytes::from_static(message), + Bytes::from_static(message).into(), ) } #[cold] #[inline(never)] -fn render_internal_error_with_bytes(message: Bytes) -> Response { - Response::with_body( +fn render_internal_error_with_bytes(message: Bytes) -> HttpResponse { + HttpResponse::with_body( StatusCode::INTERNAL_SERVER_ERROR, - message, + message.into(), ) } @@ -81,7 +82,7 @@ fn apply_headers( }; unsafe { - let value = HeaderValue::from_maybe_shared_unchecked(val_b); + let value = HeaderValue::from_shared_unchecked(val_b); hdrs.insert(key, value); } } @@ -104,7 +105,7 @@ impl JsResponse { } #[inline(always)] - pub fn apply_to_response(self) -> Response { + pub fn apply_to_response(self) -> HttpResponse { let message = match &self.inner { Text(_) | EmptyString => TEXT_HEADER_VAL.clone(), Json(_) => JSON_HEADER_VAL.clone(), @@ -127,7 +128,7 @@ impl JsResponse { _ => unreachable!(), }; - let mut rsp = Response::with_body(Self::get_status_code(self.status_code), bytes); + let mut rsp = HttpResponse::with_body(Self::get_status_code(self.status_code), bytes.into()); let hdrs = rsp.headers_mut(); apply_headers(hdrs, message, self.headers); diff --git a/src/router/node_functions.rs b/src/router/node_functions.rs index c942a49..e17e9e8 100644 --- a/src/router/node_functions.rs +++ b/src/router/node_functions.rs @@ -1,4 +1,4 @@ -use actix_http::Method; +use ntex::http::Method; use napi::bindgen_prelude::*; use crate::{router::store::add_new_route, napi::tsfn::ThreadsafeFunction}; diff --git a/src/router/read_only.rs b/src/router/read_only.rs index b6b84f5..f3263b2 100644 --- a/src/router/read_only.rs +++ b/src/router/read_only.rs @@ -1,6 +1,6 @@ use std::{cell::UnsafeCell, mem::MaybeUninit}; -use actix_http::Method; +use ntex::http::Method; use halfbrown::HashMap; use matchit::{Router, Params}; @@ -77,4 +77,4 @@ pub fn get_params(route: &str, method: Method) -> Option Some(HalfBrown(params_to_map(&res.params))), Err(_) => None, } -} \ No newline at end of file +} diff --git a/src/server/actix_server.rs b/src/server/actix_server.rs index 832ed3d..61780bc 100644 --- a/src/server/actix_server.rs +++ b/src/server/actix_server.rs @@ -1,9 +1,12 @@ -use std::{cell::UnsafeCell, convert::Infallible, rc::Rc, sync::atomic::{AtomicUsize, Ordering}}; +use std::{cell::UnsafeCell, rc::Rc, sync::atomic::{AtomicUsize, Ordering}}; +use std::{task::Context, task::Poll}; + +use ntex::http::{HttpService, Request, Response}; +use ntex::web::Error; +use ntex::server::Server; +use ntex::service::{Service, ServiceFactory}; +use ntex::util::PoolId; -use actix_http::{HttpService, Request, Response}; -use actix_server::Server; -use actix_service::{Service, ServiceFactory}; -use bytes::Bytes; use futures::future::LocalBoxFuture; use napi::sys; use tokio::sync::oneshot; @@ -16,7 +19,7 @@ use crate::{ use super::{ config::ServerConfig, - helpers::{get_failed_message, get_post_body}, shutdown::{attach_server_handle, try_own_start}, + helpers::{get_failed_message, get_post_body}, }; struct ActixHttpServer { @@ -55,11 +58,14 @@ impl ActixHttpServer { } impl Service for ActixHttpServer { - type Response = Response; - type Error = Infallible; + type Response = Response; + type Error = Error; type Future = LocalBoxFuture<'static, Result>; - actix_service::always_ready!(); + #[inline] + fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } #[inline(always)] fn call(&self, mut req: Request) -> Self::Future { @@ -128,9 +134,8 @@ struct AppFactory(usize); static IDX_OFFSETTER: AtomicUsize = AtomicUsize::new(0); impl ServiceFactory for AppFactory { - type Config = (); - type Response = Response; - type Error = Infallible; + type Response = Response; + type Error = Error; type Service = ActixHttpServer; type InitError = (); type Future = LocalBoxFuture<'static, Result>; @@ -154,14 +159,18 @@ async fn create_sever(config: ServerConfig) -> std::io::Result<()> { let pool_size = config.get_pool_per_worker(); let srv = Server::build() - .backlog(config.get_backlog_size()) - .bind("walker_server_h1", &config.url, move || { - HttpService::build().finish(AppFactory(pool_size as usize)).tcp() + .backlog(config.get_backlog_size() as _) + .bind("walker_server_h1", &config.url, move |cfg| { + cfg.memory_pool(PoolId::P1); + PoolId::P1.set_read_params(65535, 8192); + PoolId::P1.set_write_params(65535, 8192); + + HttpService::build().finish(AppFactory(pool_size as usize)) })? - .workers(config.get_worker_thread() as usize) + .workers(config.get_worker_thread() as usize) .run(); - attach_server_handle(srv.handle()); + // attach_server_handle(srv.handle()); srv.await } @@ -171,14 +180,14 @@ async fn create_tls_server(config: ServerConfig) -> std::io::Result<()> { let certs = super::tls::load_tls_certs(&config).unwrap(); let srv = Server::build() - .backlog(config.get_backlog_size()) - .bind("walker_server_h1", &config.url, move || { - HttpService::build().finish(AppFactory(pool_size as usize)).rustls(certs.clone()) + .backlog(config.get_backlog_size() as _) + .bind("walker_server_h1", &config.url, move |_| { + HttpService::build().finish(AppFactory(pool_size as usize)) })? - .workers(config.get_worker_thread() as usize) + .workers(config.get_worker_thread() as usize) .run(); - attach_server_handle(srv.handle()); + // attach_server_handle(srv.handle()); srv.await } @@ -189,18 +198,18 @@ fn run_server(config: ServerConfig) -> std::io::Result<()> { try_pin_priority(); if config.get_tls() { - actix_rt::System::new().block_on(create_tls_server(config)) + ntex::rt::System::new("Server thread").block_on(create_tls_server(config)) } else { - actix_rt::System::new().block_on(create_sever(config)) + ntex::rt::System::new("Server thread").block_on(create_sever(config)) } } #[cold] pub fn start_server(config: ServerConfig, env: sys::napi_env) -> napi::Result<()> { - if !try_own_start() { - return Err(make_js_error("Server already started")); - } - + // if !try_own_start() { + // return Err(make_js_error("Server already started")); + // } + reset_thread_affinity(); initialise_reader(); unsafe { build_up_pool(env, config.get_pool_size())?; } diff --git a/src/server/helpers.rs b/src/server/helpers.rs index f86664a..85e5c8b 100644 --- a/src/server/helpers.rs +++ b/src/server/helpers.rs @@ -1,17 +1,16 @@ -use std::convert::Infallible; - -use actix_http::{Response, Payload}; -use bytes::{Bytes, BytesMut}; +use ntex::web::{HttpResponse, Error}; +use ntex::http::{Response, Payload}; +use ntex::util::{Bytes, BytesMut}; use futures::StreamExt; const MAX_SIZE: usize = 262_144; // max payload size is 256k #[cold] #[inline(never)] -pub fn get_failed_message() -> Result, Infallible> { - Ok(Response::with_body( +pub fn get_failed_message() -> Result { + Ok(HttpResponse::with_body( http::StatusCode::NOT_FOUND, - Bytes::new(), + Bytes::new().into(), )) } @@ -32,4 +31,4 @@ pub async fn get_post_body(payload: &mut Payload) -> Result } Ok(body.freeze()) -} \ No newline at end of file +} diff --git a/src/server/mod.rs b/src/server/mod.rs index a1ad7ba..031ec70 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,5 +2,5 @@ pub mod node_functions; mod config; mod actix_server; mod helpers; -mod shutdown; +// mod shutdown; mod tls; diff --git a/src/server/node_functions.rs b/src/server/node_functions.rs index e221d1a..8d58a97 100644 --- a/src/server/node_functions.rs +++ b/src/server/node_functions.rs @@ -1,6 +1,6 @@ use napi::bindgen_prelude::*; -use super::{actix_server::start_server, config::ServerConfig, shutdown::stop_server}; +use super::{actix_server::start_server, config::ServerConfig}; #[cold] #[napi] @@ -35,5 +35,6 @@ pub fn start_with_config(env: Env, config: ServerConfig) -> Result<()> { /// Attempts to stop the server, returns if it woreked /// Experimental at the moment pub fn stop() -> bool { - stop_server(true) + // stop_server(true) + false } diff --git a/src/templates/mod.rs b/src/templates/mod.rs index 48f0f73..5406718 100644 --- a/src/templates/mod.rs +++ b/src/templates/mod.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use bytes::{BytesMut, buf::Writer, BufMut}; +use ntex::util::{BytesMut, Buf, BufMut}; use lazy_static::lazy_static; use napi::Result; use parking_lot::RwLock; @@ -43,14 +43,14 @@ pub(crate) fn render_value_to_writer( group_name: &str, file_name: &str, data: Value, - writer: &mut Writer, + writer: &mut BytesMut, ) -> Result<()> { let reader = TEMPLATES.read(); let found_template = reader.get(group_name).ok_or_else(|| make_js_error("Error finding the template file."))?; let context = &Context::from_value(data).map_err(|_| make_js_error("Error reading data value."))?; found_template - .render_to(file_name, context, writer) + .render_to(file_name, context, writer.writer()) .map_err(|_| make_js_error("Error rendering the text")) } @@ -59,7 +59,7 @@ pub(crate) fn render_string_to_writer( group_name: &str, file_name: &str, data: &str, - writer: &mut Writer, + writer: &mut BytesMut, ) -> Result<()> { let parsed: Value = serde_json::from_str(data).map_err(|_| make_js_error("Error parsing json data."))?; render_value_to_writer(group_name, file_name, parsed, writer) @@ -71,8 +71,7 @@ pub(crate) fn store_in_bytes_buffer( file_name: &str, data: &str, ) -> Result { - let buffer = BytesMut::with_capacity(1024); - let mut writer = buffer.writer(); - render_string_to_writer(group_name, file_name, data, &mut writer)?; - Ok(writer.into_inner()) + let mut buffer = BytesMut::with_capacity(1024); + render_string_to_writer(group_name, file_name, data, &mut buffer)?; + Ok(buffer) } From bc3da7b512adb1261a3933e8f7c53db4a4f1598f Mon Sep 17 00:00:00 2001 From: Jack Thomson Date: Sat, 19 Nov 2022 14:17:39 +0000 Subject: [PATCH 8/8] Add tls support --- Cargo.toml | 2 +- examples/simple.js | 6 +++++- examples/tls.js | 16 +++++++++++----- src/request/request_blob.rs | 27 ++++++++++++++++----------- src/response/mod.rs | 3 +-- src/server/actix_server.rs | 29 +++++++++++++---------------- src/templates/mod.rs | 2 +- 7 files changed, 48 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8fad7bf..2245924 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ extreme = "666.666.666666" rustls = "0.20.7" rustls-pemfile = "1.0.1" kanal = "0.1.0-pre7" -ntex = { version = "0.5.29", features = ["tokio"] } +ntex = { version = "0.5.29", features = ["tokio", "rustls"] } [target.'cfg(not(target_os = "linux"))'.dependencies] mimalloc-rust = { version = "0.2" } diff --git a/examples/simple.js b/examples/simple.js index 1c9598d..4de0524 100644 --- a/examples/simple.js +++ b/examples/simple.js @@ -7,9 +7,13 @@ Walker.get("/", (res) => { }); +Walker.get("/blank", (res) => { + res.uncheckedSendEmptyText(); +}); + const config = { url: "0.0.0.0:8081", - workerThreads: 6, + workerThreads: 8, poolPerWorkerSize: 200_000, backlog: 10000, debug: false, diff --git a/examples/tls.js b/examples/tls.js index bc67c3f..f102276 100644 --- a/examples/tls.js +++ b/examples/tls.js @@ -2,15 +2,21 @@ const Walker = require('..'); const config = { url: "0.0.0.0:8081", - worker_threads: "1", - pool_per_worker_size: "100", - tls: "true", - cert_location: './certs/cert.pem', - key_location: './certs/key.pem' + workerThreads: 1, + poolPerWorkerSize: 100, + backlog: 10000, + debug: false, + tls: true, + certLocation: './certs/cert.pem', + keyLocation: './certs/key.pem' } Walker.get("/", (res) => { res.sendTextUnchecked("Hello world!"); }); +Walker.get("/test", (res) => { + res.sendText("Hello world!"); +}); + Walker.startWithConfig(config); diff --git a/src/request/request_blob.rs b/src/request/request_blob.rs index 16e9593..cf59310 100644 --- a/src/request/request_blob.rs +++ b/src/request/request_blob.rs @@ -1,8 +1,8 @@ use std::mem::MaybeUninit; use ntex::http::Request; use ntex::util::Bytes; -use tokio::sync::oneshot::Sender; use napi::Result; +use kanal::{AsyncReceiver, Sender}; use super::helpers::make_js_error; use crate::response::{JsResponse, InnerResp}; @@ -10,7 +10,8 @@ use crate::response::{JsResponse, InnerResp}; #[napi] pub struct RequestBlob { pub(crate) data: MaybeUninit, - pub(crate) oneshot: MaybeUninit>, + pub(crate) reciever: AsyncReceiver, + pub(crate) sender: Sender, pub(crate) sent: bool, pub(crate) body: Option, pub(crate) headers: MaybeUninit>>, @@ -20,9 +21,19 @@ pub struct RequestBlob { impl RequestBlob { pub fn new_empty_with_js() -> Box { + let (send, recv) = kanal::bounded(0); + + let recv = { + let copied = recv.clone_async(); + drop(recv); + + copied + }; + Box::new(Self { data: MaybeUninit::uninit(), - oneshot: MaybeUninit::uninit(), + reciever: recv, + sender: send, sent: false, body: None, headers: MaybeUninit::uninit(), @@ -32,8 +43,7 @@ impl RequestBlob { } #[inline] - pub fn store_self_data(&mut self, data: Request, sender: Sender, body: Option) { - let oneshot = MaybeUninit::new(sender); + pub fn store_self_data(&mut self, data: Request, body: Option) { let headers = MaybeUninit::new(None); let data = MaybeUninit::new(data); @@ -42,7 +52,6 @@ impl RequestBlob { } self.data = data; - self.oneshot = oneshot; self.headers = headers; self.body = body; self.sent = false; @@ -62,10 +71,6 @@ impl RequestBlob { } self.sent = true; - let oneshot = unsafe { - let result = std::mem::replace(&mut self.oneshot, MaybeUninit::uninit()); - result.assume_init() - }; let headers = unsafe { let result = std::mem::replace(&mut self.headers, MaybeUninit::uninit()); @@ -73,7 +78,7 @@ impl RequestBlob { }; let js_resp = JsResponse { inner, headers, status_code: self.status_code }; - let res = oneshot.send(js_resp); + let res = self.sender.send(js_resp); if checked && res.is_err() { return Err(make_js_error("Error with sending the response.")) diff --git a/src/response/mod.rs b/src/response/mod.rs index bc6d858..de27d39 100644 --- a/src/response/mod.rs +++ b/src/response/mod.rs @@ -1,6 +1,5 @@ use ntex::http::{ - header::{HeaderMap, HeaderName, CONTENT_TYPE, SERVER}, - Response, StatusCode, + header::{HeaderMap, HeaderName, CONTENT_TYPE, SERVER}, StatusCode, }; use ntex::web::HttpResponse; use ntex::util::Bytes; diff --git a/src/server/actix_server.rs b/src/server/actix_server.rs index 61780bc..b1f4133 100644 --- a/src/server/actix_server.rs +++ b/src/server/actix_server.rs @@ -9,12 +9,11 @@ use ntex::util::PoolId; use futures::future::LocalBoxFuture; use napi::sys; -use tokio::sync::oneshot; use crate::{ extras::scheduler::{pin_js_thread, try_pin_priority, reset_thread_affinity}, object_pool::{build_up_pool, StoredPair, get_pool_for_threads}, - router::{read_only::get_route, store::initialise_reader}, request::helpers::make_js_error, + router::{read_only::get_route, store::initialise_reader}, }; use super::{ @@ -39,6 +38,7 @@ impl ActixHttpServer { async fn backoff_get_object(items: &mut Vec) -> StoredPair { loop { tokio::task::yield_now().await; + println!("Object pool starved!"); if let Some(retrieved) = items.pop() { return retrieved; @@ -83,7 +83,6 @@ impl Service for ActixHttpServer { let router = unsafe { router.get_unchecked(offset % router.len()) }; let mut body = None; - if req.method() == http::Method::POST { body = match get_post_body(req.payload()).await { Ok(body) => Some(body), @@ -94,32 +93,30 @@ impl Service for ActixHttpServer { } let object_reference = Self::get_mut_from_unsafe(&vec_ref); + let reference = unsafe { object_reference.get_unchecked_mut(router.threads_id) }; - let mut js_obj = unsafe { - let reference = object_reference.get_unchecked_mut(router.threads_id); - - match reference.pop() { - Some(res) => res, - None => Self::backoff_get_object(reference).await, - } + let mut js_obj = match reference.pop() { + Some(res) => res, + None => Self::backoff_get_object(reference).await, }; - - let (send, rec) = oneshot::channel(); - js_obj.0 .0.store_self_data(req, send, body); + js_obj.0 .0.store_self_data(req, body); router.function.call( js_obj.0 .1, crate::napi::tsfn::ThreadsafeFunctionCallMode::NonBlocking, ); - let result = match rec.await { + let result = match js_obj.0 .0.reciever.recv().await { Ok(res) => Ok(res.apply_to_response()), Err(_) => get_failed_message(), }; unsafe { - let reference = object_reference.get_unchecked_mut(router.threads_id); + if reference.len() == reference.capacity() { + std::hint::unreachable_unchecked() + } + reference.push(js_obj); } @@ -182,7 +179,7 @@ async fn create_tls_server(config: ServerConfig) -> std::io::Result<()> { let srv = Server::build() .backlog(config.get_backlog_size() as _) .bind("walker_server_h1", &config.url, move |_| { - HttpService::build().finish(AppFactory(pool_size as usize)) + HttpService::build().finish(AppFactory(pool_size as usize)).rustls(certs.clone()) })? .workers(config.get_worker_thread() as usize) .run(); diff --git a/src/templates/mod.rs b/src/templates/mod.rs index 5406718..4b396e5 100644 --- a/src/templates/mod.rs +++ b/src/templates/mod.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use ntex::util::{BytesMut, Buf, BufMut}; +use ntex::util::{BytesMut, BufMut}; use lazy_static::lazy_static; use napi::Result; use parking_lot::RwLock;