From aad0bc9c411a1d2353f200d040af1eca7c21ec22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?LoboGuardian=20=F0=9F=90=BA?= <30099451+LoboGuardian@users.noreply.github.com> Date: Sun, 13 Apr 2025 03:41:49 -0400 Subject: [PATCH 01/20] =?UTF-8?q?Update=20Cargo.toml,=20README,=20and=20CO?= =?UTF-8?q?NTRIBUTING=20link=20for=20better=20documentation=20and=20access?= =?UTF-8?q?ibility=20=F0=9F=A6=9C=F0=9F=95=BA=20(#32)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9c789b1..e5e9eae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,11 @@ documentation = "https://docs.rs/hteapot/" homepage = "https://github.com/az107/HTeaPot" repository = "https://github.com/az107/HTeaPot" keywords = ["http", "server", "web", "lightweight", "rust"] -categories = ["network-programming", "web-programming", "command-line-utilities"] +categories = [ + "network-programming", + "web-programming", + "command-line-utilities", +] exclude = ["config.toml", "demo/", "README.md"] [lib] @@ -21,4 +25,4 @@ path = "src/hteapot/mod.rs" name = "hteapot" [package.metadata.docs.rs] -no-readme = true \ No newline at end of file +no-readme = true From bfbb3f3fabeb047a1a4148b0d386f12e18ed6048 Mon Sep 17 00:00:00 2001 From: miky-rola Date: Fri, 11 Apr 2025 17:47:30 +0000 Subject: [PATCH 02/20] Update the mod in src --- src/hteapot/mod.rs | 55 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index c03acd3..d41ac0a 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -1,5 +1,5 @@ // Written by Alberto Ruiz 2024-03-08 -// +// // This is the HTTP server module, it will handle the requests and responses // Also provides utilities to parse the requests and build the response @@ -17,11 +17,11 @@ //! ``` /// Submodules for HTTP functionality. -pub mod brew; // HTTP client implementation -mod methods; // HTTP method and status enums -mod request; // Request parsing and builder -mod response; // Response types and streaming -mod status; // Status code mapping +pub mod brew; // HTTP client implementation +mod methods; // HTTP method and status enums +mod request; // Request parsing and builder +mod response; // Response types and streaming +mod status; // Status code mapping // Internal types used for connection management use self::response::{EmptyHttpResponse, HttpResponseCommon, IterError}; @@ -76,6 +76,8 @@ pub struct Hteapot { port: u16, address: String, threads: u16, + shutdown_signal: Option>, + shutdown_hooks: Vec>, } /// Represents the state of a connection's lifecycle. @@ -95,12 +97,25 @@ struct SocketData { } impl Hteapot { + pub fn set_shutdown_signal(&mut self, signal: Arc) { + self.shutdown_signal = Some(signal); + } + + pub fn add_shutdown_hook(&mut self, hook: F) + where + F: Fn() + Send + Sync + 'static, + { + self.shutdown_hooks.push(Arc::new(hook)); + } + // Constructor pub fn new(address: &str, port: u16) -> Self { Hteapot { port, address: address.to_string(), threads: 1, + shutdown_signal: None, + shutdown_hooks: Vec::new(), } } @@ -109,6 +124,8 @@ impl Hteapot { port, address: address.to_string(), threads: if threads == 0 { 1 } else { threads }, + shutdown_signal: None, + shutdown_hooks: Vec::new(), } } @@ -132,10 +149,16 @@ impl Hteapot { Arc::new(Mutex::new(vec![0; self.threads as usize])); let arc_action = Arc::new(action); + // Clone shutdown_signal and share the shutdown_hooks via Arc + let shutdown_signal = self.shutdown_signal.clone(); + let shutdown_hooks = Arc::new(self.shutdown_hooks.clone()); + for thread_index in 0..self.threads { let pool_clone = pool.clone(); let action_clone = arc_action.clone(); let priority_list_clone = priority_list.clone(); + let shutdown_signal_clone = shutdown_signal.clone(); + let shutdown_hooks_clone = shutdown_hooks.clone(); thread::spawn(move || { let mut streams_to_handle = Vec::new(); @@ -146,10 +169,20 @@ impl Hteapot { if streams_to_handle.is_empty() { // Store the returned guard back into pool - pool = cvar.wait_while(pool, |pool| pool.is_empty()) + pool = cvar + .wait_while(pool, |pool| pool.is_empty()) .expect("Error waiting on cvar"); } + if let Some(signal) = &shutdown_signal_clone { + if !signal.load(Ordering::SeqCst) { + for hook in shutdown_hooks_clone.iter() { + hook(); + } + break; // Exit the server loop + } + } + while let Some(stream) = pool.pop_back() { let socket_status = SocketStatus { ttl: Instant::now(), @@ -216,10 +249,9 @@ impl Hteapot { ) -> Option<()> { let status = socket_data.status.as_mut()?; - // Fix by miky-rola 2025-04-08 // Check if the TTL (time-to-live) for the connection has expired. // If the connection is idle for longer than `KEEP_ALIVE_TTL` and no data is being written, - // the connection is gracefully shut down to free resources. + // the connection is gracefully shut down to free resources. if Instant::now().duration_since(status.ttl) > KEEP_ALIVE_TTL && !status.write { let _ = socket_data.stream.shutdown(Shutdown::Both); return None; @@ -286,8 +318,7 @@ impl Hteapot { status.response = response; } - // Write the response to the client in chunks using the `peek` and `next` methods. - // This ensures that large responses are sent incrementally without blocking the server. + // Write the response to the client in chunks loop { match status.response.peek() { Ok(n) => match socket_data.stream.write(&n) { @@ -390,4 +421,4 @@ mod tests { assert!(response_str.contains("Server: HTeaPot/")); assert!(second_response_str.contains("Second Request")); } -} \ No newline at end of file +} From 03bc97d0dfe42c4ccf21ad85e07e4431c193716d Mon Sep 17 00:00:00 2001 From: miky-rola Date: Fri, 11 Apr 2025 17:47:52 +0000 Subject: [PATCH 03/20] Add shutdown logger --- src/logger.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/logger.rs b/src/logger.rs index c7799ef..c18dfdb 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -1,9 +1,9 @@ +use std::fmt; use std::io::Write; -use std::sync::mpsc::{channel, Sender}; +use std::sync::Arc; +use std::sync::mpsc::{Sender, channel}; use std::thread::{self, JoinHandle}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use std::fmt; -use std::sync::Arc; /// Differnt log levels #[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Copy)] @@ -86,7 +86,15 @@ impl SimpleTime { // calculate millisecs from nanosecs let millis = nanos / 1_000_000; - (year, month as u32 + 1, day as u32, hour, minute, second, millis) + ( + year, + month as u32 + 1, + day as u32, + hour, + minute, + second, + millis, + ) } /// Returns a formatted timestamp string for the current system time. @@ -133,7 +141,7 @@ impl Logger { pub fn new( mut writer: W, min_level: LogLevel, - component: &str + component: &str, ) -> Logger { let (tx, rx) = channel::(); let thread = thread::spawn(move || { @@ -151,7 +159,7 @@ impl Logger { msg.timestamp, msg.level, msg.component, msg.content ); buff.push(formatted); - }, + } Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} Err(_) => break, } @@ -238,8 +246,8 @@ impl Logger { pub fn fatal(&self, content: String) { self.log(LogLevel::FATAL, content); } - /// Log a message with TRACE level - #[allow(dead_code)] + /// Log a message with TRACE level + #[allow(dead_code)] pub fn trace(&self, content: String) { self.log(LogLevel::TRACE, content); } @@ -255,20 +263,19 @@ impl Clone for Logger { } } - #[cfg(test)] mod tests { use super::*; use std::io::stdout; - + #[test] fn test_basic() { let logs = Logger::new(stdout(), LogLevel::DEBUG, "test"); logs.info("test message".to_string()); logs.debug("debug info".to_string()); - + // Create a sub-logger with a different component let sub_logger = logs.with_component("sub-component"); sub_logger.warn("warning from sub-component".to_string()); } -} \ No newline at end of file +} From 6ca4a75f4ec4528420f08c7fda56a4203246601f Mon Sep 17 00:00:00 2001 From: miky-rola Date: Fri, 11 Apr 2025 17:48:04 +0000 Subject: [PATCH 04/20] Add the handler for shutdown --- src/main.rs | 136 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 94 insertions(+), 42 deletions(-) diff --git a/src/main.rs b/src/main.rs index 4b41751..2f9d7ed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,16 +40,21 @@ pub mod hteapot; mod logger; mod utils; -use std::{fs, io, path::PathBuf}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::time::Duration; + use std::path::Path; use std::sync::Mutex; +use std::{fs, io, path::PathBuf}; use cache::Cache; use config::Config; use hteapot::{Hteapot, HttpRequest, HttpResponse, HttpStatus}; use utils::get_mime_tipe; -use logger::{Logger, LogLevel}; +use logger::{LogLevel, Logger}; use std::time::Instant; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -79,13 +84,13 @@ const VERSION: &str = env!("CARGO_PKG_VERSION"); fn safe_join_paths(root: &str, requested_path: &str) -> Option { let root_path = Path::new(root).canonicalize().ok()?; let requested_full_path = root_path.join(requested_path.trim_start_matches("/")); - + if !requested_full_path.exists() { return None; } - + let canonical_path = requested_full_path.canonicalize().ok()?; - + if canonical_path.starts_with(&root_path) { Some(canonical_path) } else { @@ -143,14 +148,13 @@ fn serve_file(path: &PathBuf) -> Option> { let r = fs::read(path); if r.is_ok() { Some(r.unwrap()) } else { None } } -// +// // Suggest to use .ok()? instead of manual unwrap/if is_ok for more idiomatic error handling: // fn serve_file(path: &PathBuf) -> Option> { - // fs::read(path).ok() +// fs::read(path).ok() // } -// -// - +// +// /// Main entry point of the Hteapot server. /// @@ -169,6 +173,53 @@ fn serve_file(path: &PathBuf) -> Option> { /// - Optional response caching /// - HTTP server via [`Hteapot::new_threaded`](crate::hteapot::Hteapot::new_threaded) +fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc { + let running = Arc::new(AtomicBool::new(true)); + let r_server = running.clone(); + let shutdown_logger = logger.with_component("shutdown"); + + #[cfg(windows)] + { + let r_win = running.clone(); + let win_logger = shutdown_logger.clone(); + + if !win_console::set_handler(r_win, win_logger.clone()) { + win_logger.error("Failed to set Windows Ctrl+C handler".to_string()); + } else { + win_logger.info("Windows Ctrl+C handler registered".to_string()); + } + } + + let r_enter = running.clone(); + let enter_logger = shutdown_logger.clone(); + + thread::spawn(move || { + println!(" Ctrl+C to shutdown the server gracefully..."); + let mut buffer = String::new(); + let _ = std::io::stdin().read_line(&mut buffer); + enter_logger.info("Enter pressed, initiating graceful shutdown...".to_string()); + r_enter.store(false, Ordering::SeqCst); + }); + + // Set up server with shutdown signal + server.set_shutdown_signal(r_server); + + // Add shutdown hook for cleanup + let shutdown_logger_clone = shutdown_logger.clone(); + server.add_shutdown_hook(move || { + shutdown_logger_clone.info("Waiting for ongoing requests to complete...".to_string()); + + thread::sleep(Duration::from_secs(3)); + + shutdown_logger_clone.info("Server shutdown complete.".to_string()); + + std::process::exit(0); + }); + + // Return the running flag so the main thread can also check it + running +} + fn main() { // Parse CLI args and handle --help / --version / --serve flags let args = std::env::args().collect::>(); @@ -177,7 +228,7 @@ fn main() { println!("usage: {} ", args[0]); return; } - + // Initialize logger based on config or default to stdout let config = match args[1].as_str() { "--help" | "-h" => { @@ -223,22 +274,23 @@ fn main() { // Initialize the logger based on the config or default to stdout if the log file can't be created let logger = match config.log_file.clone() { Some(file_name) => { - let file = fs::File::create(file_name.clone()); // Attempt to create the log file - match file { // If creating the file fails, log to stdout instead - Ok(file) => Logger::new(file, LogLevel::INFO, "main"), // If successful, use the file + let file = fs::File::create(file_name.clone()); // Attempt to create the log file + match file { + // If creating the file fails, log to stdout instead + Ok(file) => Logger::new(file, LogLevel::INFO, "main"), // If successful, use the file Err(e) => { println!("Failed to create log file: {:?}. Using stdout instead.", e); - Logger::new(io::stdout(), LogLevel::INFO, "main") // Log to stdout + Logger::new(io::stdout(), LogLevel::INFO, "main") // Log to stdout } } } - None => Logger::new(io::stdout(), LogLevel::INFO, "main"), // If no log file is specified, use stdout + None => Logger::new(io::stdout(), LogLevel::INFO, "main"), // If no log file is specified, use stdout }; // Set up the cache with thread-safe locking // The Mutex ensures that only one thread can access the cache at a time, // preventing race conditions when reading and writing to the cache. - let cache: Mutex = Mutex::new(Cache::new(config.cache_ttl as u64)); // Initialize the cache with TTL + let cache: Mutex = Mutex::new(Cache::new(config.cache_ttl as u64)); // Initialize the cache with TTL // Create a new threaded HTTP server with the provided host, port, and number of threads let server = Hteapot::new_threaded(config.host.as_str(), config.port, config.threads); @@ -246,13 +298,13 @@ fn main() { logger.info(format!( "Server started at http://{}:{}", config.host, config.port - )); // Log that the server has started + )); // Log that the server has started // Log whether the cache is enabled based on the config setting if config.cache { logger.info("Cache Enabled".to_string()); } - + // If proxy-only mode is enabled, issue a warning that local paths won't be used if proxy_only { logger @@ -268,9 +320,9 @@ fn main() { // Start listening for HTTP requests server.listen(move |req| { // SERVER CORE: For each incoming request, we handle it in this closure - let start_time = Instant::now(); // Track request processing time - let req_method = req.method.to_str(); // Get the HTTP method (e.g., GET, POST) - let req_path = req.path.clone(); // Get the requested path + let start_time = Instant::now(); // Track request processing time + let req_method = req.method.to_str(); // Get the HTTP method (e.g., GET, POST) + let req_path = req.path.clone(); // Get the requested path // Log the incoming request method and path http_logger.info(format!("Request {} {}", req_method, req.path)); @@ -279,22 +331,21 @@ fn main() { let is_proxy = is_proxy(&config, req.clone()); if proxy_only || is_proxy.is_some() { // If proxying is enabled or this request matches a proxy rule, handle it - let (host, proxy_req) = is_proxy.unwrap(); // Get the target host and modified request + let (host, proxy_req) = is_proxy.unwrap(); // Get the target host and modified request proxy_logger.info(format!( "Proxying request {} {} to {}", req_method, req_path, host )); - // Perform the proxy request (forward the request to the target server) let res = proxy_req.brew(host.as_str()); - let elapsed = start_time.elapsed(); // Measure the time taken to process the proxy request + let elapsed = start_time.elapsed(); // Measure the time taken to process the proxy request if res.is_ok() { // If the proxy request is successful, log the time taken and return the response let response = res.unwrap(); proxy_logger.info(format!( "Proxy request processed in {:.6}ms", - elapsed.as_secs_f64() * 1000.0 // Log the time taken in milliseconds + elapsed.as_secs_f64() * 1000.0 // Log the time taken in milliseconds )); return response; } else { @@ -316,12 +367,12 @@ fn main() { // If the root path exists and is valid, try to join the index file let index_path = root_path.unwrap().join(&config.index); if index_path.exists() { - Some(index_path) // If index exists, return its path + Some(index_path) // If index exists, return its path } else { - None // If no index exists, return None + None // If no index exists, return None } } else { - None // If the root path is invalid, return None + None // If the root path is invalid, return None } } else { // For any other path, resolve it safely using the `safe_join_paths` function @@ -335,16 +386,17 @@ fn main() { // If it's a directory, check for the index file in that directory let index_path = path.join(&config.index); if index_path.exists() { - index_path // If index exists, return its path + index_path // If index exists, return its path } else { // If no index file exists, log a warning and return a 404 response - http_logger.warn(format!("Index file not found in directory: {}", req.path)); + http_logger + .warn(format!("Index file not found in directory: {}", req.path)); return HttpResponse::new(HttpStatus::NotFound, "Index not found", None); } } else { - path // If it's not a directory, just return the path + path // If it's not a directory, just return the path } - }, + } None => { // If the path is invalid or access is denied, return a 404 response http_logger.warn(format!("Path not found or access denied: {}", req.path)); @@ -359,9 +411,9 @@ fn main() { let content: Option> = if config.cache { // Lock the cache to ensure thread-safe access let mut cachee = cache.lock().expect("Error locking cache"); - let cache_start = Instant::now(); // Track cache operation time - let cache_key = req.path.clone(); // Use the request path as the cache key - let mut r = cachee.get(cache_key.clone()); // Try to get the content from cache + let cache_start = Instant::now(); // Track cache operation time + let cache_key = req.path.clone(); // Use the request path as the cache key + let mut r = cachee.get(cache_key.clone()); // Try to get the content from cache if r.is_none() { // If cache miss, read the file from disk and store it in cache cache_logger.debug(format!("cache miss for {}", cache_key)); @@ -379,10 +431,10 @@ fn main() { // Log how long the cache operation took let cache_elapsed = cache_start.elapsed(); cache_logger.debug(format!( - "Cache operation completed in {:.6}µs", + "Cache operation completed in {:.6}µs", cache_elapsed.as_micros() )); - r // Return the cached content (or None if not found) + r // Return the cached content (or None if not found) } else { // If cache is disabled, read the file from disk serve_file(&safe_path) @@ -392,7 +444,7 @@ fn main() { let elapsed = start_time.elapsed(); http_logger.info(format!( "Request processed in {:.6}ms", - elapsed.as_secs_f64() * 1000.0 // Log the time taken in milliseconds + elapsed.as_secs_f64() * 1000.0 // Log the time taken in milliseconds )); // If content was found, return it with the appropriate headers, otherwise return a 404 @@ -404,11 +456,11 @@ fn main() { "X-Content-Type-Options" => "nosniff" ); HttpResponse::new(HttpStatus::OK, c, headers) - }, + } None => { // If no content is found, return a 404 Not Found response HttpResponse::new(HttpStatus::NotFound, "Not found", None) - }, + } } }); -} \ No newline at end of file +} From 096ee8173021ceb3295cfb26be895b8c7e3ae280 Mon Sep 17 00:00:00 2001 From: miky-rola Date: Sat, 12 Apr 2025 14:47:11 +0000 Subject: [PATCH 05/20] Use libc to handle unix --- Cargo.lock | 2 +- Cargo.toml | 1 + src/main.rs | 48 +----------------------------------------------- 3 files changed, 3 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2c7d917..4bf5bc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "hteapot" diff --git a/Cargo.toml b/Cargo.toml index e5e9eae..ae0d7ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,5 +24,6 @@ path = "src/hteapot/mod.rs" [[bin]] name = "hteapot" + [package.metadata.docs.rs] no-readme = true diff --git a/src/main.rs b/src/main.rs index 2f9d7ed..cdb4e4c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,6 +81,7 @@ const VERSION: &str = env!("CARGO_PKG_VERSION"); /// let safe_path = safe_join_paths("/var/www", "/index.html"); /// assert!(safe_path.unwrap().ends_with("index.html")); /// ``` + fn safe_join_paths(root: &str, requested_path: &str) -> Option { let root_path = Path::new(root).canonicalize().ok()?; let requested_full_path = root_path.join(requested_path.trim_start_matches("/")); @@ -173,53 +174,6 @@ fn serve_file(path: &PathBuf) -> Option> { /// - Optional response caching /// - HTTP server via [`Hteapot::new_threaded`](crate::hteapot::Hteapot::new_threaded) -fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc { - let running = Arc::new(AtomicBool::new(true)); - let r_server = running.clone(); - let shutdown_logger = logger.with_component("shutdown"); - - #[cfg(windows)] - { - let r_win = running.clone(); - let win_logger = shutdown_logger.clone(); - - if !win_console::set_handler(r_win, win_logger.clone()) { - win_logger.error("Failed to set Windows Ctrl+C handler".to_string()); - } else { - win_logger.info("Windows Ctrl+C handler registered".to_string()); - } - } - - let r_enter = running.clone(); - let enter_logger = shutdown_logger.clone(); - - thread::spawn(move || { - println!(" Ctrl+C to shutdown the server gracefully..."); - let mut buffer = String::new(); - let _ = std::io::stdin().read_line(&mut buffer); - enter_logger.info("Enter pressed, initiating graceful shutdown...".to_string()); - r_enter.store(false, Ordering::SeqCst); - }); - - // Set up server with shutdown signal - server.set_shutdown_signal(r_server); - - // Add shutdown hook for cleanup - let shutdown_logger_clone = shutdown_logger.clone(); - server.add_shutdown_hook(move || { - shutdown_logger_clone.info("Waiting for ongoing requests to complete...".to_string()); - - thread::sleep(Duration::from_secs(3)); - - shutdown_logger_clone.info("Server shutdown complete.".to_string()); - - std::process::exit(0); - }); - - // Return the running flag so the main thread can also check it - running -} - fn main() { // Parse CLI args and handle --help / --version / --serve flags let args = std::env::args().collect::>(); From a6bb98fcc62dc9f71b3393bc1309446f7adbc477 Mon Sep 17 00:00:00 2001 From: miky-rola Date: Sat, 12 Apr 2025 15:09:11 +0000 Subject: [PATCH 06/20] Add shutdown.rs --- src/shutdown.rs | 168 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 src/shutdown.rs diff --git a/src/shutdown.rs b/src/shutdown.rs new file mode 100644 index 0000000..de0e5fb --- /dev/null +++ b/src/shutdown.rs @@ -0,0 +1,168 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +use crate::hteapot::Hteapot; +use crate::logger::Logger; + +pub fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc { + let running = Arc::new(AtomicBool::new(true)); + let r_server = running.clone(); + let shutdown_logger = logger.with_component("shutdown"); + + #[cfg(unix)] + { + let r_unix = running.clone(); + let unix_logger = shutdown_logger.clone(); + unix_signal::register_signal_handler(r_unix, unix_logger); + } + + #[cfg(windows)] + { + let r_win = running.clone(); + let win_logger = shutdown_logger.clone(); + + if !win_console::set_handler(r_win, win_logger.clone()) { + win_logger.error("Failed to set Windows Ctrl+C handler".to_string()); + } else { + win_logger.info("Windows Ctrl+C handler registered".to_string()); + } + } + + let r_enter = running.clone(); + let enter_logger = shutdown_logger.clone(); + + thread::spawn(move || { + println!(" Ctrl+C to shutdown the server gracefully..."); + let mut buffer = String::new(); + let _ = std::io::stdin().read_line(&mut buffer); + enter_logger.info("Enter pressed, initiating graceful shutdown...".to_string()); + r_enter.store(false, Ordering::SeqCst); + }); + + // Set up server with shutdown signal + server.set_shutdown_signal(r_server); + + // Add shutdown hook for cleanup + let shutdown_logger_clone = shutdown_logger.clone(); + server.add_shutdown_hook(move || { + shutdown_logger_clone.info("Waiting for ongoing requests to complete...".to_string()); + + thread::sleep(Duration::from_secs(3)); + + shutdown_logger_clone.info("Server shutdown complete.".to_string()); + + std::process::exit(0); + }); + + // Return the running flag so the main thread can also check it + running +} + +#[cfg(unix)] +pub mod unix_signal { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use std::ptr; + use std::mem; + + use libc::{c_int, c_void, sigaction, sighandler_t, sigset_t}; + use libc::{SA_RESTART, SIGINT, SIG_IGN}; + + use crate::logger::Logger; + + // Global variables to store the signal handler state + static mut RUNNING: Option> = None; + static mut LOGGER: Option = None; + + // Signal handler function + extern "C" fn handle_signal(_: c_int) { + unsafe { + if let Some(running) = &RUNNING { + if let Some(logger) = &LOGGER { + logger.info("SIGINT received, initiating graceful shutdown...".to_string()); + } + running.store(false, Ordering::SeqCst); + } + } + } + + pub fn register_signal_handler(running: Arc, logger: Logger) { + unsafe { + // Store our state in global variables + RUNNING = Some(running); + LOGGER = Some(logger.clone()); + + // Set up the sigaction struct + let mut sigact: sigaction = mem::zeroed(); + sigact.sa_handler = handle_signal as sighandler_t; + sigact.sa_flags = SA_RESTART; + + // Empty the signal mask + sigemptyset(&mut sigact.sa_mask); + + // Register our signal handler for SIGINT + if sigaction(SIGINT, &sigact, ptr::null_mut()) < 0 { + logger.error("Failed to set SIGINT handler".to_string()); + } else { + logger.info("SIGINT handler registered".to_string()); + } + } + } + + // Helper function to create an empty signal set + unsafe fn sigemptyset(set: *mut sigset_t) { + ptr::write_bytes(set, 0, 1); + } +} + +#[cfg(windows)] +pub mod win_console { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use std::sync::Mutex; + + // Define the external Windows API function in an unsafe extern block + unsafe extern "system" { + pub fn SetConsoleCtrlHandler( + handler: Option i32>, + add: i32, + ) -> i32; + } + + pub const CTRL_C_EVENT: u32 = 0; + + struct StaticData { + running: Option>, + logger: Option, + } + + // We need to ensure thread safety, so use a Mutex for it + static HANDLER_DATA: Mutex = Mutex::new(StaticData { + running: None, + logger: None, + }); + + pub fn set_handler(running: Arc, logger: crate::logger::Logger) -> bool { + // Store references in the mutex-protected static + let mut data = HANDLER_DATA.lock().unwrap(); + data.running = Some(running); + data.logger = Some(logger); + + unsafe extern "system" fn handler_func(ctrl_type: u32) -> i32 { + if ctrl_type == CTRL_C_EVENT { + if let Ok(data) = HANDLER_DATA.lock() { + if let (Some(r), Some(l)) = (&data.running, &data.logger) { + l.info("initiating graceful shutdown...".to_string()); + r.store(false, Ordering::SeqCst); + std::process::exit(0); + } + } + } + 0 + } + + unsafe { SetConsoleCtrlHandler(Some(handler_func), 1) != 0 } + } +} \ No newline at end of file From dceb9c54207c42aa40cb45e51b02471e31033270 Mon Sep 17 00:00:00 2001 From: miky-rola Date: Fri, 11 Apr 2025 17:47:30 +0000 Subject: [PATCH 07/20] Update the mod in src --- src/hteapot/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index d41ac0a..a695579 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -250,15 +250,13 @@ impl Hteapot { let status = socket_data.status.as_mut()?; // Check if the TTL (time-to-live) for the connection has expired. - // If the connection is idle for longer than `KEEP_ALIVE_TTL` and no data is being written, - // the connection is gracefully shut down to free resources. if Instant::now().duration_since(status.ttl) > KEEP_ALIVE_TTL && !status.write { let _ = socket_data.stream.shutdown(Shutdown::Both); return None; } + // If the request is not yet complete, read data from the stream into a buffer. // This ensures that the server can handle partial or chunked requests. - if !status.request.done { let mut buffer = [0; BUFFER_SIZE]; match socket_data.stream.read(&mut buffer) { From 2cd322dd65dc51dcd50eac6a778e592a604e4b1f Mon Sep 17 00:00:00 2001 From: miky-rola Date: Fri, 11 Apr 2025 17:48:04 +0000 Subject: [PATCH 08/20] Add the handler for shutdown --- src/main.rs | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/src/main.rs b/src/main.rs index cdb4e4c..597b643 100644 --- a/src/main.rs +++ b/src/main.rs @@ -174,6 +174,53 @@ fn serve_file(path: &PathBuf) -> Option> { /// - Optional response caching /// - HTTP server via [`Hteapot::new_threaded`](crate::hteapot::Hteapot::new_threaded) +fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc { + let running = Arc::new(AtomicBool::new(true)); + let r_server = running.clone(); + let shutdown_logger = logger.with_component("shutdown"); + + #[cfg(windows)] + { + let r_win = running.clone(); + let win_logger = shutdown_logger.clone(); + + if !win_console::set_handler(r_win, win_logger.clone()) { + win_logger.error("Failed to set Windows Ctrl+C handler".to_string()); + } else { + win_logger.info("Windows Ctrl+C handler registered".to_string()); + } + } + + let r_enter = running.clone(); + let enter_logger = shutdown_logger.clone(); + + thread::spawn(move || { + println!(" Ctrl+C to shutdown the server gracefully..."); + let mut buffer = String::new(); + let _ = std::io::stdin().read_line(&mut buffer); + enter_logger.info("Enter pressed, initiating graceful shutdown...".to_string()); + r_enter.store(false, Ordering::SeqCst); + }); + + // Set up server with shutdown signal + server.set_shutdown_signal(r_server); + + // Add shutdown hook for cleanup + let shutdown_logger_clone = shutdown_logger.clone(); + server.add_shutdown_hook(move || { + shutdown_logger_clone.info("Waiting for ongoing requests to complete...".to_string()); + + thread::sleep(Duration::from_secs(3)); + + shutdown_logger_clone.info("Server shutdown complete.".to_string()); + + std::process::exit(0); + }); + + // Return the running flag so the main thread can also check it + running +} + fn main() { // Parse CLI args and handle --help / --version / --serve flags let args = std::env::args().collect::>(); From b523ad76c1c40ee369ed98e60a445ec72b9a11f2 Mon Sep 17 00:00:00 2001 From: miky-rola Date: Sat, 12 Apr 2025 14:47:11 +0000 Subject: [PATCH 09/20] Use libc to handle unix --- src/hteapot/request.rs | 1 - src/main.rs | 48 ------------------------------------------ 2 files changed, 49 deletions(-) diff --git a/src/hteapot/request.rs b/src/hteapot/request.rs index 4b8f939..fafadb3 100644 --- a/src/hteapot/request.rs +++ b/src/hteapot/request.rs @@ -190,7 +190,6 @@ impl HttpRequestBuilder { if parts.len() != 3 { return Err("Invalid method + path + version request"); } - self.request.method = HttpMethod::from_str(parts[0]); let path_parts: Vec<&str> = parts[1].split('?').collect(); self.request.path = path_parts[0].to_string(); diff --git a/src/main.rs b/src/main.rs index 597b643..10478c3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,7 +81,6 @@ const VERSION: &str = env!("CARGO_PKG_VERSION"); /// let safe_path = safe_join_paths("/var/www", "/index.html"); /// assert!(safe_path.unwrap().ends_with("index.html")); /// ``` - fn safe_join_paths(root: &str, requested_path: &str) -> Option { let root_path = Path::new(root).canonicalize().ok()?; let requested_full_path = root_path.join(requested_path.trim_start_matches("/")); @@ -174,53 +173,6 @@ fn serve_file(path: &PathBuf) -> Option> { /// - Optional response caching /// - HTTP server via [`Hteapot::new_threaded`](crate::hteapot::Hteapot::new_threaded) -fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc { - let running = Arc::new(AtomicBool::new(true)); - let r_server = running.clone(); - let shutdown_logger = logger.with_component("shutdown"); - - #[cfg(windows)] - { - let r_win = running.clone(); - let win_logger = shutdown_logger.clone(); - - if !win_console::set_handler(r_win, win_logger.clone()) { - win_logger.error("Failed to set Windows Ctrl+C handler".to_string()); - } else { - win_logger.info("Windows Ctrl+C handler registered".to_string()); - } - } - - let r_enter = running.clone(); - let enter_logger = shutdown_logger.clone(); - - thread::spawn(move || { - println!(" Ctrl+C to shutdown the server gracefully..."); - let mut buffer = String::new(); - let _ = std::io::stdin().read_line(&mut buffer); - enter_logger.info("Enter pressed, initiating graceful shutdown...".to_string()); - r_enter.store(false, Ordering::SeqCst); - }); - - // Set up server with shutdown signal - server.set_shutdown_signal(r_server); - - // Add shutdown hook for cleanup - let shutdown_logger_clone = shutdown_logger.clone(); - server.add_shutdown_hook(move || { - shutdown_logger_clone.info("Waiting for ongoing requests to complete...".to_string()); - - thread::sleep(Duration::from_secs(3)); - - shutdown_logger_clone.info("Server shutdown complete.".to_string()); - - std::process::exit(0); - }); - - // Return the running flag so the main thread can also check it - running -} - fn main() { // Parse CLI args and handle --help / --version / --serve flags let args = std::env::args().collect::>(); From e317b28a2acb7b4d70ee056a7e5668156d315d69 Mon Sep 17 00:00:00 2001 From: miky-rola Date: Sun, 13 Apr 2025 09:29:29 +0000 Subject: [PATCH 10/20] Fix shutdown module compilation error --- src/shutdown.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/shutdown.rs b/src/shutdown.rs index de0e5fb..af219a2 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -96,7 +96,8 @@ pub mod unix_signal { // Set up the sigaction struct let mut sigact: sigaction = mem::zeroed(); - sigact.sa_handler = handle_signal as sighandler_t; + // Fix: Use the correct field name for the handler + sigact.sa_sigaction = handle_signal as sighandler_t; sigact.sa_flags = SA_RESTART; // Empty the signal mask @@ -113,7 +114,10 @@ pub mod unix_signal { // Helper function to create an empty signal set unsafe fn sigemptyset(set: *mut sigset_t) { - ptr::write_bytes(set, 0, 1); + // Fix: Add unsafe block around the unsafe operation + unsafe { + ptr::write_bytes(set, 0, 1); + } } } From 453153893fc63f6418d64a1e4fba9b5f3d9e9bd7 Mon Sep 17 00:00:00 2001 From: miky-rola Date: Sun, 13 Apr 2025 09:31:57 +0000 Subject: [PATCH 11/20] Fix shared reference in unix --- src/shutdown.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/shutdown.rs b/src/shutdown.rs index af219a2..803e2a4 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -79,8 +79,8 @@ pub mod unix_signal { // Signal handler function extern "C" fn handle_signal(_: c_int) { unsafe { - if let Some(running) = &RUNNING { - if let Some(logger) = &LOGGER { + if let Some(running) = &raw const &RUNNING { + if let Some(logger) = &raw const &LOGGER { logger.info("SIGINT received, initiating graceful shutdown...".to_string()); } running.store(false, Ordering::SeqCst); From ed978d25875eccd6e98a00ff97454be8713a1c6a Mon Sep 17 00:00:00 2001 From: miky-rola Date: Sun, 13 Apr 2025 09:35:49 +0000 Subject: [PATCH 12/20] Fix shared reference in unix --- src/shutdown.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/shutdown.rs b/src/shutdown.rs index 803e2a4..af219a2 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -79,8 +79,8 @@ pub mod unix_signal { // Signal handler function extern "C" fn handle_signal(_: c_int) { unsafe { - if let Some(running) = &raw const &RUNNING { - if let Some(logger) = &raw const &LOGGER { + if let Some(running) = &RUNNING { + if let Some(logger) = &LOGGER { logger.info("SIGINT received, initiating graceful shutdown...".to_string()); } running.store(false, Ordering::SeqCst); From 61961d8478a8e7f1f75ca5d357ac6b089ec252ef Mon Sep 17 00:00:00 2001 From: miky-rola Date: Sun, 13 Apr 2025 09:38:02 +0000 Subject: [PATCH 13/20] Access the global without shared reference --- src/shutdown.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/shutdown.rs b/src/shutdown.rs index af219a2..2a850d2 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -79,8 +79,8 @@ pub mod unix_signal { // Signal handler function extern "C" fn handle_signal(_: c_int) { unsafe { - if let Some(running) = &RUNNING { - if let Some(logger) = &LOGGER { + if let Some(running) = RUNNING.as_ref() { + if let Some(logger) = LOGGER.as_ref() { logger.info("SIGINT received, initiating graceful shutdown...".to_string()); } running.store(false, Ordering::SeqCst); From 7147d4e7f91e20a68a02219effb29e663f6f002a Mon Sep 17 00:00:00 2001 From: miky-rola Date: Sun, 13 Apr 2025 09:45:01 +0000 Subject: [PATCH 14/20] Fix temporary value errors in shutdown.rs --- src/shutdown.rs | 45 ++++++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/src/shutdown.rs b/src/shutdown.rs index 2a850d2..c78230a 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -67,36 +67,25 @@ pub mod unix_signal { use std::ptr; use std::mem; - use libc::{c_int, c_void, sigaction, sighandler_t, sigset_t}; - use libc::{SA_RESTART, SIGINT, SIG_IGN}; + use libc::{c_int, sigaction, sighandler_t, sigset_t}; + use libc::{SA_RESTART, SIGINT}; use crate::logger::Logger; - // Global variables to store the signal handler state - static mut RUNNING: Option> = None; - static mut LOGGER: Option = None; + // Thread-safe flag to indicate signal received + static mut SIGNAL_RECEIVED: bool = false; - // Signal handler function + // Signal handler function - minimal to avoid UB extern "C" fn handle_signal(_: c_int) { unsafe { - if let Some(running) = RUNNING.as_ref() { - if let Some(logger) = LOGGER.as_ref() { - logger.info("SIGINT received, initiating graceful shutdown...".to_string()); - } - running.store(false, Ordering::SeqCst); - } + SIGNAL_RECEIVED = true; } } pub fn register_signal_handler(running: Arc, logger: Logger) { unsafe { - // Store our state in global variables - RUNNING = Some(running); - LOGGER = Some(logger.clone()); - // Set up the sigaction struct let mut sigact: sigaction = mem::zeroed(); - // Fix: Use the correct field name for the handler sigact.sa_sigaction = handle_signal as sighandler_t; sigact.sa_flags = SA_RESTART; @@ -106,18 +95,32 @@ pub mod unix_signal { // Register our signal handler for SIGINT if sigaction(SIGINT, &sigact, ptr::null_mut()) < 0 { logger.error("Failed to set SIGINT handler".to_string()); + return; } else { logger.info("SIGINT handler registered".to_string()); } } + + // Start a monitoring thread that periodically checks the signal flag + // and updates the running atomic + let monitor_logger = logger.clone(); + std::thread::spawn(move || { + while running.load(Ordering::SeqCst) { + unsafe { + if SIGNAL_RECEIVED { + monitor_logger.info("SIGINT received, initiating graceful shutdown...".to_string()); + running.store(false, Ordering::SeqCst); + break; + } + } + std::thread::sleep(std::time::Duration::from_millis(50)); + } + }); } // Helper function to create an empty signal set unsafe fn sigemptyset(set: *mut sigset_t) { - // Fix: Add unsafe block around the unsafe operation - unsafe { - ptr::write_bytes(set, 0, 1); - } + ptr::write_bytes(set, 0, 1); } } From 5a01f8f28cefd08d5c3d3e5e6cc48a0657558898 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Sun, 13 Apr 2025 21:20:25 +0200 Subject: [PATCH 15/20] Simplified exit logic in unix Add force quiting for repeatedly ctrl+c press --- src/hteapot/mod.rs | 1 - src/shutdown.rs | 132 +++++++++++++-------------------------------- 2 files changed, 38 insertions(+), 95 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index a695579..975f0e2 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -166,7 +166,6 @@ impl Hteapot { { let (lock, cvar) = &*pool_clone; let mut pool = lock.lock().expect("Error locking pool"); - if streams_to_handle.is_empty() { // Store the returned guard back into pool pool = cvar diff --git a/src/shutdown.rs b/src/shutdown.rs index c78230a..89c9ae2 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -1,134 +1,78 @@ -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::thread; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; +use std::{ptr, thread}; use crate::hteapot::Hteapot; use crate::logger::Logger; pub fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc { let running = Arc::new(AtomicBool::new(true)); - let r_server = running.clone(); let shutdown_logger = logger.with_component("shutdown"); - + #[cfg(unix)] { - let r_unix = running.clone(); - let unix_logger = shutdown_logger.clone(); - unix_signal::register_signal_handler(r_unix, unix_logger); + static mut RUNNING_PTR: *const AtomicBool = ptr::null(); + static COUNTER_PTR: AtomicUsize = AtomicUsize::new(0); + + extern "C" fn handle_sigint(_: i32) { + unsafe { + if COUNTER_PTR.load(Ordering::SeqCst) < 9 { + COUNTER_PTR.fetch_add(1, Ordering::SeqCst); + if COUNTER_PTR.load(Ordering::SeqCst) == 9 { + println!("\rPress ctrl+c one more time to force quit"); + } + } else { + println!("\rForcing exit, now!"); + std::process::exit(0); + } + + if !RUNNING_PTR.is_null() { + (*RUNNING_PTR).store(false, Ordering::SeqCst); + } + } + } + + unsafe { + RUNNING_PTR = Arc::as_ptr(&running); + libc::signal(libc::SIGINT, handle_sigint as usize); + } } - + #[cfg(windows)] { let r_win = running.clone(); let win_logger = shutdown_logger.clone(); - + if !win_console::set_handler(r_win, win_logger.clone()) { win_logger.error("Failed to set Windows Ctrl+C handler".to_string()); } else { win_logger.info("Windows Ctrl+C handler registered".to_string()); } } - - let r_enter = running.clone(); - let enter_logger = shutdown_logger.clone(); - - thread::spawn(move || { - println!(" Ctrl+C to shutdown the server gracefully..."); - let mut buffer = String::new(); - let _ = std::io::stdin().read_line(&mut buffer); - enter_logger.info("Enter pressed, initiating graceful shutdown...".to_string()); - r_enter.store(false, Ordering::SeqCst); - }); - - // Set up server with shutdown signal - server.set_shutdown_signal(r_server); - + // Add shutdown hook for cleanup let shutdown_logger_clone = shutdown_logger.clone(); server.add_shutdown_hook(move || { shutdown_logger_clone.info("Waiting for ongoing requests to complete...".to_string()); - + thread::sleep(Duration::from_secs(3)); - + shutdown_logger_clone.info("Server shutdown complete.".to_string()); - + std::process::exit(0); }); - + + server.set_shutdown_signal(running.clone()); // Return the running flag so the main thread can also check it running } -#[cfg(unix)] -pub mod unix_signal { - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::Arc; - use std::ptr; - use std::mem; - - use libc::{c_int, sigaction, sighandler_t, sigset_t}; - use libc::{SA_RESTART, SIGINT}; - - use crate::logger::Logger; - - // Thread-safe flag to indicate signal received - static mut SIGNAL_RECEIVED: bool = false; - - // Signal handler function - minimal to avoid UB - extern "C" fn handle_signal(_: c_int) { - unsafe { - SIGNAL_RECEIVED = true; - } - } - - pub fn register_signal_handler(running: Arc, logger: Logger) { - unsafe { - // Set up the sigaction struct - let mut sigact: sigaction = mem::zeroed(); - sigact.sa_sigaction = handle_signal as sighandler_t; - sigact.sa_flags = SA_RESTART; - - // Empty the signal mask - sigemptyset(&mut sigact.sa_mask); - - // Register our signal handler for SIGINT - if sigaction(SIGINT, &sigact, ptr::null_mut()) < 0 { - logger.error("Failed to set SIGINT handler".to_string()); - return; - } else { - logger.info("SIGINT handler registered".to_string()); - } - } - - // Start a monitoring thread that periodically checks the signal flag - // and updates the running atomic - let monitor_logger = logger.clone(); - std::thread::spawn(move || { - while running.load(Ordering::SeqCst) { - unsafe { - if SIGNAL_RECEIVED { - monitor_logger.info("SIGINT received, initiating graceful shutdown...".to_string()); - running.store(false, Ordering::SeqCst); - break; - } - } - std::thread::sleep(std::time::Duration::from_millis(50)); - } - }); - } - - // Helper function to create an empty signal set - unsafe fn sigemptyset(set: *mut sigset_t) { - ptr::write_bytes(set, 0, 1); - } -} - #[cfg(windows)] pub mod win_console { - use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::sync::Mutex; + use std::sync::atomic::{AtomicBool, Ordering}; // Define the external Windows API function in an unsafe extern block unsafe extern "system" { @@ -172,4 +116,4 @@ pub mod win_console { unsafe { SetConsoleCtrlHandler(Some(handler_func), 1) != 0 } } -} \ No newline at end of file +} From b34f164e68fd4f1e1592228e363863f8e678d7c6 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Sun, 13 Apr 2025 21:52:49 +0200 Subject: [PATCH 16/20] Move close logic to main thread. --- src/hteapot/mod.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index 975f0e2..de0647e 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -158,7 +158,6 @@ impl Hteapot { let action_clone = arc_action.clone(); let priority_list_clone = priority_list.clone(); let shutdown_signal_clone = shutdown_signal.clone(); - let shutdown_hooks_clone = shutdown_hooks.clone(); thread::spawn(move || { let mut streams_to_handle = Vec::new(); @@ -173,13 +172,14 @@ impl Hteapot { .expect("Error waiting on cvar"); } - if let Some(signal) = &shutdown_signal_clone { - if !signal.load(Ordering::SeqCst) { - for hook in shutdown_hooks_clone.iter() { - hook(); + if let Some(signal) = &shutdown_signal_clone { + if !signal.load(Ordering::SeqCst) { + break; // Exit the server loop } - break; // Exit the server loop } + pool = cvar + .wait_while(pool, |pool| pool.is_empty()) + .expect("Error waiting on cvar"); } while let Some(stream) = pool.pop_back() { @@ -217,6 +217,17 @@ impl Hteapot { } loop { + if let Some(signal) = &shutdown_signal { + if !signal.load(Ordering::SeqCst) { + let (lock, cvar) = &*pool; + let _guard = lock.lock().unwrap(); + cvar.notify_all(); + for hook in shutdown_hooks.iter() { + hook(); + } + break; + } + } let stream = match listener.accept() { Ok((stream, _)) => stream, Err(_) => continue, From 289e96503c3cbe0161b6a98f7ed6f00f7b0c7189 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Mon, 14 Apr 2025 08:56:59 +0200 Subject: [PATCH 17/20] Change signal handling and add safety checks to avoid UB --- src/hteapot/mod.rs | 4 ++++ src/shutdown.rs | 19 +++++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index de0647e..ae68f43 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -101,6 +101,10 @@ impl Hteapot { self.shutdown_signal = Some(signal); } + pub fn get_shutdown_signal(&self) -> Option> { + self.shutdown_signal.clone() + } + pub fn add_shutdown_hook(&mut self, hook: F) where F: Fn() + Send + Sync + 'static, diff --git a/src/shutdown.rs b/src/shutdown.rs index 89c9ae2..cdc291f 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -1,15 +1,23 @@ +use std::mem::zeroed; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use std::{ptr, thread}; +use libc::{SA_RESTART, sigaction, sighandler_t}; + use crate::hteapot::Hteapot; use crate::logger::Logger; pub fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc { + let existing_signal = server.get_shutdown_signal(); + if existing_signal.is_some() { + return existing_signal.unwrap(); + } let running = Arc::new(AtomicBool::new(true)); let shutdown_logger = logger.with_component("shutdown"); + //This is a simplification an a ad-hoc solution #[cfg(unix)] { static mut RUNNING_PTR: *const AtomicBool = ptr::null(); @@ -34,8 +42,15 @@ pub fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc Arc Date: Mon, 14 Apr 2025 12:37:49 +0200 Subject: [PATCH 18/20] Basic implementation to force the awake of listener thread. this will attempt to connect to itself to wake the listener and check if the flag is set to false --- src/hteapot/mod.rs | 18 ++++++++------- src/shutdown.rs | 55 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 59 insertions(+), 14 deletions(-) diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index ae68f43..2b74b62 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -112,6 +112,12 @@ impl Hteapot { self.shutdown_hooks.push(Arc::new(hook)); } + pub fn get_addr(&self) -> (String, u16) { + //TODO: write logic to resolve adress to a correct ip + //example: "localhost" -> "0.0.0.0" or "127.0.0.1" + return (self.address.clone(), self.port); + } + // Constructor pub fn new(address: &str, port: u16) -> Self { Hteapot { @@ -175,15 +181,11 @@ impl Hteapot { .wait_while(pool, |pool| pool.is_empty()) .expect("Error waiting on cvar"); } - - if let Some(signal) = &shutdown_signal_clone { - if !signal.load(Ordering::SeqCst) { - break; // Exit the server loop - } + //TODO: move this to allow process the last request + if let Some(signal) = &shutdown_signal_clone { + if !signal.load(Ordering::SeqCst) { + break; // Exit the server loop } - pool = cvar - .wait_while(pool, |pool| pool.is_empty()) - .expect("Error waiting on cvar"); } while let Some(stream) = pool.pop_back() { diff --git a/src/shutdown.rs b/src/shutdown.rs index cdc291f..90d5421 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -1,10 +1,15 @@ +use core::panic; use std::mem::zeroed; +use std::net::Ipv4Addr; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use std::{ptr, thread}; -use libc::{SA_RESTART, sigaction, sighandler_t}; +use libc::{ + AF_INET, SA_RESTART, SOCK_STREAM, close, connect, htons, in_addr, sigaction, sighandler_t, + sockaddr, sockaddr_in, socket, +}; use crate::hteapot::Hteapot; use crate::logger::Logger; @@ -20,6 +25,34 @@ pub fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc sockaddr_in { + let ip: Ipv4Addr = addr.0.parse().expect("IP inválida"); + let port = addr.1 as u16; + + sockaddr_in { + sin_family: libc::AF_INET as u8, + #[cfg(any( + target_os = "macos", + target_os = "ios", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd" + ))] + sin_len: std::mem::size_of::() as u8, + sin_port: htons(port), + sin_addr: in_addr { + s_addr: u32::from_ne_bytes(ip.octets()), + }, + sin_zero: [0; 8], + } + } + unsafe { + // safety guard to avoid editions of RUNNING_PTR + // this will change whit multi server support + if !RUNNING_PTR.is_null() { + panic!("Tried to setup shutdown for two different server instances"); + } + } static mut RUNNING_PTR: *const AtomicBool = ptr::null(); static COUNTER_PTR: AtomicUsize = AtomicUsize::new(0); @@ -37,16 +70,30 @@ pub fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc() as u32, + ); + + // cerramos el socket aunque haya fallado + close(fd); } } } unsafe { + /////////////////////////////////////////////////////////////////////////// // Create a raw pointer and increase the reference counter to avoid // UB and early deallocation. IMPORTANT: remember to decrement if in the // future there is a function to disable this ctrl+c logic + /////////////////////////////////////////////////////////////////////////// + RUNNING_PTR = Arc::as_ptr(&running); Arc::increment_strong_count(RUNNING_PTR); + let mut action: sigaction = zeroed(); action.sa_flags = SA_RESTART; action.sa_sigaction = handle_sigint as sighandler_t; @@ -70,12 +117,8 @@ pub fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc Date: Tue, 15 Apr 2025 12:36:04 +0200 Subject: [PATCH 19/20] Refactor to reach scalability and security in unix sign handler --- src/shutdown.rs | 178 +++++++++++++++++++++++++----------------------- 1 file changed, 93 insertions(+), 85 deletions(-) diff --git a/src/shutdown.rs b/src/shutdown.rs index 90d5421..49f0756 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -1,15 +1,8 @@ -use core::panic; -use std::mem::zeroed; -use std::net::Ipv4Addr; +use std::net::TcpStream; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; use std::time::Duration; -use std::{ptr, thread}; - -use libc::{ - AF_INET, SA_RESTART, SOCK_STREAM, close, connect, htons, in_addr, sigaction, sighandler_t, - sockaddr, sockaddr_in, socket, -}; use crate::hteapot::Hteapot; use crate::logger::Logger; @@ -22,83 +15,16 @@ pub fn setup_graceful_shutdown(server: &mut Hteapot, logger: Logger) -> Arc sockaddr_in { - let ip: Ipv4Addr = addr.0.parse().expect("IP inválida"); - let port = addr.1 as u16; - - sockaddr_in { - sin_family: libc::AF_INET as u8, - #[cfg(any( - target_os = "macos", - target_os = "ios", - target_os = "freebsd", - target_os = "netbsd", - target_os = "openbsd" - ))] - sin_len: std::mem::size_of::() as u8, - sin_port: htons(port), - sin_addr: in_addr { - s_addr: u32::from_ne_bytes(ip.octets()), - }, - sin_zero: [0; 8], - } - } - unsafe { - // safety guard to avoid editions of RUNNING_PTR - // this will change whit multi server support - if !RUNNING_PTR.is_null() { - panic!("Tried to setup shutdown for two different server instances"); - } - } - static mut RUNNING_PTR: *const AtomicBool = ptr::null(); - static COUNTER_PTR: AtomicUsize = AtomicUsize::new(0); - - extern "C" fn handle_sigint(_: i32) { - unsafe { - if COUNTER_PTR.load(Ordering::SeqCst) < 9 { - COUNTER_PTR.fetch_add(1, Ordering::SeqCst); - if COUNTER_PTR.load(Ordering::SeqCst) == 9 { - println!("\rPress ctrl+c one more time to force quit"); - } - } else { - println!("\rForcing exit, now!"); - std::process::exit(0); - } - - if !RUNNING_PTR.is_null() { - (*RUNNING_PTR).store(false, Ordering::SeqCst); - let fd = socket(AF_INET, SOCK_STREAM, 0); - let addr = to_sockaddr_in(("0.0.0.0".to_string(), 8081)); - let _ = connect( - fd, - &addr as *const sockaddr_in as *const sockaddr, - size_of::() as u32, - ); - - // cerramos el socket aunque haya fallado - close(fd); - } - } - } - - unsafe { - /////////////////////////////////////////////////////////////////////////// - // Create a raw pointer and increase the reference counter to avoid - // UB and early deallocation. IMPORTANT: remember to decrement if in the - // future there is a function to disable this ctrl+c logic - /////////////////////////////////////////////////////////////////////////// - - RUNNING_PTR = Arc::as_ptr(&running); - Arc::increment_strong_count(RUNNING_PTR); - - let mut action: sigaction = zeroed(); - action.sa_flags = SA_RESTART; - action.sa_sigaction = handle_sigint as sighandler_t; - sigaction(libc::SIGINT, &action, std::ptr::null_mut()); - } + let mut ush = unix_signhandler::UnixSignHandler::new(); + let running_clone = running.clone(); + let addr = server.get_addr(); + ush.set_handler(move || { + println!("It works!"); + running_clone.store(false, Ordering::SeqCst); + let _ = TcpStream::connect(format!("{}:{}", addr.0, addr.1)); + }); } #[cfg(windows)] @@ -175,3 +101,85 @@ pub mod win_console { unsafe { SetConsoleCtrlHandler(Some(handler_func), 1) != 0 } } } + +//Brought to you by: Overengeneering DIY™️ +#[cfg(unix)] +mod unix_signhandler { + use libc::{POLLIN, SA_RESTART, poll, pollfd, sigaction, sighandler_t}; + use std::io; + use std::sync::{Arc, RwLock}; + use std::{mem::zeroed, os::fd::RawFd, thread}; + + static mut PIPE_FD_READ: RawFd = -1; + static mut PIPE_FD_WRITE: RawFd = -1; + extern "C" fn handler(_: i32) { + let buf = [1u8]; + unsafe { + if PIPE_FD_WRITE != -1 { + let _ = libc::write(PIPE_FD_WRITE, buf.as_ptr() as *const _, 1); + } + } + } + + fn wait_for_readable(fd: RawFd) -> io::Result<()> { + let mut fds = [pollfd { + fd, + events: POLLIN, + revents: 0, + }]; + let ret = unsafe { poll(fds.as_mut_ptr(), 1, -1) }; // -1 = undefined timeout + + if ret < 0 { + return Err(io::Error::last_os_error()); + } + + if fds[0].revents & POLLIN != 0 { + Ok(()) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "unexpected poll result", + )) + } + } + + pub struct UnixSignHandler { + actions: Arc>>>, + } + + impl UnixSignHandler { + pub fn new() -> Self { + let ush = UnixSignHandler { + actions: Arc::new(RwLock::new(Vec::new())), + }; + unsafe { + let mut fds = [0; 2]; + if libc::pipe(fds.as_mut_ptr()) == -1 { + panic!("failed to create pipe"); + } + PIPE_FD_READ = fds[0]; + PIPE_FD_WRITE = fds[1]; + } + unsafe { + let mut action: sigaction = zeroed(); + action.sa_flags = SA_RESTART; + action.sa_sigaction = handler as sighandler_t; + sigaction(libc::SIGINT, &action, std::ptr::null_mut()); + } + + let actions_clone = ush.actions.clone(); + thread::spawn(move || { + unsafe { + let _ = wait_for_readable(PIPE_FD_READ); + } + for action in actions_clone.read().unwrap().iter() { + action(); + } + }); + return ush; + } + pub fn set_handler(&mut self, action: impl Fn() + Send + Sync + 'static) { + self.actions.write().unwrap().push(Box::new(action)); + } + } +} From 95921163802b908114c033588bbdbbe5e1459b4e Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Tue, 15 Apr 2025 13:18:44 +0200 Subject: [PATCH 20/20] Add setup for graceful shutdown --- Cargo.lock | 9 +++++++++ Cargo.toml | 3 +++ src/hteapot/mod.rs | 3 +-- src/main.rs | 11 +++++------ 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4bf5bc4..da585f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5,3 +5,12 @@ version = 4 [[package]] name = "hteapot" version = "0.5.0" +dependencies = [ + "libc", +] + +[[package]] +name = "libc" +version = "0.2.172" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" diff --git a/Cargo.toml b/Cargo.toml index ae0d7ef..7bedc6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,9 @@ path = "src/hteapot/mod.rs" [[bin]] name = "hteapot" +[dependencies] +libc = "0.2.172" + [package.metadata.docs.rs] no-readme = true diff --git a/src/hteapot/mod.rs b/src/hteapot/mod.rs index 2b74b62..5b1f478 100644 --- a/src/hteapot/mod.rs +++ b/src/hteapot/mod.rs @@ -37,6 +37,7 @@ pub use self::status::HttpStatus; use std::collections::VecDeque; use std::io::{self, Read, Write}; use std::net::{Shutdown, TcpListener, TcpStream}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::{Duration, Instant}; @@ -113,8 +114,6 @@ impl Hteapot { } pub fn get_addr(&self) -> (String, u16) { - //TODO: write logic to resolve adress to a correct ip - //example: "localhost" -> "0.0.0.0" or "127.0.0.1" return (self.address.clone(), self.port); } diff --git a/src/main.rs b/src/main.rs index 10478c3..f328571 100644 --- a/src/main.rs +++ b/src/main.rs @@ -38,13 +38,9 @@ mod cache; mod config; pub mod hteapot; mod logger; +mod shutdown; mod utils; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::thread; -use std::time::Duration; - use std::path::Path; use std::sync::Mutex; use std::{fs, io, path::PathBuf}; @@ -246,7 +242,10 @@ fn main() { let cache: Mutex = Mutex::new(Cache::new(config.cache_ttl as u64)); // Initialize the cache with TTL // Create a new threaded HTTP server with the provided host, port, and number of threads - let server = Hteapot::new_threaded(config.host.as_str(), config.port, config.threads); + let mut server = Hteapot::new_threaded(config.host.as_str(), config.port, config.threads); + + //Configure graceful shutdown from ctrl+c + shutdown::setup_graceful_shutdown(&mut server, logger.clone()); logger.info(format!( "Server started at http://{}:{}",