Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ documentation = "https://docs.rs/hteapot/"
homepage = "https://github.com/az107/HTeaPot"
repository = "https://github.com/az107/HTeaPot"
keywords = ["http", "server", "web", "lightweight", "rust"]
categories = ["network-programming", "web-programming", "command-line-utilities"]
categories = [
"network-programming",
"web-programming",
"command-line-utilities",
]
exclude = ["config.toml", "demo/", "README.md"]

[lib]
Expand All @@ -20,5 +24,9 @@ path = "src/hteapot/mod.rs"
[[bin]]
name = "hteapot"

[dependencies]
libc = "0.2.172"


[package.metadata.docs.rs]
no-readme = true
no-readme = true
74 changes: 59 additions & 15 deletions src/hteapot/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Written by Alberto Ruiz 2024-03-08
//
//
// This is the HTTP server module, it will handle the requests and responses
// Also provides utilities to parse the requests and build the response

Expand All @@ -17,11 +17,11 @@
//! ```

/// Submodules for HTTP functionality.
pub mod brew; // HTTP client implementation
mod methods; // HTTP method and status enums
mod request; // Request parsing and builder
mod response; // Response types and streaming
mod status; // Status code mapping
pub mod brew; // HTTP client implementation
mod methods; // HTTP method and status enums
mod request; // Request parsing and builder
mod response; // Response types and streaming
mod status; // Status code mapping

// Internal types used for connection management
use self::response::{EmptyHttpResponse, HttpResponseCommon, IterError};
Expand All @@ -37,6 +37,7 @@ pub use self::status::HttpStatus;
use std::collections::VecDeque;
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -76,6 +77,8 @@ pub struct Hteapot {
port: u16,
address: String,
threads: u16,
shutdown_signal: Option<Arc<AtomicBool>>,
shutdown_hooks: Vec<Arc<dyn Fn() + Send + Sync + 'static>>,
}

/// Represents the state of a connection's lifecycle.
Expand All @@ -95,12 +98,33 @@ struct SocketData {
}

impl Hteapot {
pub fn set_shutdown_signal(&mut self, signal: Arc<AtomicBool>) {
self.shutdown_signal = Some(signal);
}

pub fn get_shutdown_signal(&self) -> Option<Arc<AtomicBool>> {
self.shutdown_signal.clone()
}

pub fn add_shutdown_hook<F>(&mut self, hook: F)
where
F: Fn() + Send + Sync + 'static,
{
self.shutdown_hooks.push(Arc::new(hook));
}

pub fn get_addr(&self) -> (String, u16) {
return (self.address.clone(), self.port);
}

// Constructor
pub fn new(address: &str, port: u16) -> Self {
Hteapot {
port,
address: address.to_string(),
threads: 1,
shutdown_signal: None,
shutdown_hooks: Vec::new(),
}
}

Expand All @@ -109,6 +133,8 @@ impl Hteapot {
port,
address: address.to_string(),
threads: if threads == 0 { 1 } else { threads },
shutdown_signal: None,
shutdown_hooks: Vec::new(),
}
}

Expand All @@ -132,23 +158,34 @@ impl Hteapot {
Arc::new(Mutex::new(vec![0; self.threads as usize]));
let arc_action = Arc::new(action);

// Clone shutdown_signal and share the shutdown_hooks via Arc
let shutdown_signal = self.shutdown_signal.clone();
let shutdown_hooks = Arc::new(self.shutdown_hooks.clone());

for thread_index in 0..self.threads {
let pool_clone = pool.clone();
let action_clone = arc_action.clone();
let priority_list_clone = priority_list.clone();
let shutdown_signal_clone = shutdown_signal.clone();

thread::spawn(move || {
let mut streams_to_handle = Vec::new();
loop {
{
let (lock, cvar) = &*pool_clone;
let mut pool = lock.lock().expect("Error locking pool");

if streams_to_handle.is_empty() {
// Store the returned guard back into pool
pool = cvar.wait_while(pool, |pool| pool.is_empty())
pool = cvar
.wait_while(pool, |pool| pool.is_empty())
.expect("Error waiting on cvar");
}
//TODO: move this to allow process the last request
if let Some(signal) = &shutdown_signal_clone {
if !signal.load(Ordering::SeqCst) {
break; // Exit the server loop
}
}

while let Some(stream) = pool.pop_back() {
let socket_status = SocketStatus {
Expand Down Expand Up @@ -185,6 +222,17 @@ impl Hteapot {
}

loop {
if let Some(signal) = &shutdown_signal {
if !signal.load(Ordering::SeqCst) {
let (lock, cvar) = &*pool;
let _guard = lock.lock().unwrap();
cvar.notify_all();
for hook in shutdown_hooks.iter() {
hook();
}
break;
}
}
let stream = match listener.accept() {
Ok((stream, _)) => stream,
Err(_) => continue,
Expand Down Expand Up @@ -216,17 +264,14 @@ impl Hteapot {
) -> Option<()> {
let status = socket_data.status.as_mut()?;

// Fix by miky-rola 2025-04-08
// Check if the TTL (time-to-live) for the connection has expired.
// If the connection is idle for longer than `KEEP_ALIVE_TTL` and no data is being written,
// the connection is gracefully shut down to free resources.
if Instant::now().duration_since(status.ttl) > KEEP_ALIVE_TTL && !status.write {
let _ = socket_data.stream.shutdown(Shutdown::Both);
return None;
}

// If the request is not yet complete, read data from the stream into a buffer.
// This ensures that the server can handle partial or chunked requests.

if !status.request.done {
let mut buffer = [0; BUFFER_SIZE];
match socket_data.stream.read(&mut buffer) {
Expand Down Expand Up @@ -286,8 +331,7 @@ impl Hteapot {
status.response = response;
}

// Write the response to the client in chunks using the `peek` and `next` methods.
// This ensures that large responses are sent incrementally without blocking the server.
// Write the response to the client in chunks
loop {
match status.response.peek() {
Ok(n) => match socket_data.stream.write(&n) {
Expand Down Expand Up @@ -390,4 +434,4 @@ mod tests {
assert!(response_str.contains("Server: HTeaPot/"));
assert!(second_response_str.contains("Second Request"));
}
}
}
1 change: 0 additions & 1 deletion src/hteapot/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ impl HttpRequestBuilder {
if parts.len() != 3 {
return Err("Invalid method + path + version request");
}

self.request.method = HttpMethod::from_str(parts[0]);
let path_parts: Vec<&str> = parts[1].split('?').collect();
self.request.path = path_parts[0].to_string();
Expand Down
31 changes: 19 additions & 12 deletions src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::fmt;
use std::io::Write;
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::sync::mpsc::{Sender, channel};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::fmt;
use std::sync::Arc;

/// Differnt log levels
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Copy)]
Expand Down Expand Up @@ -86,7 +86,15 @@ impl SimpleTime {
// calculate millisecs from nanosecs
let millis = nanos / 1_000_000;

(year, month as u32 + 1, day as u32, hour, minute, second, millis)
(
year,
month as u32 + 1,
day as u32,
hour,
minute,
second,
millis,
)
}

/// Returns a formatted timestamp string for the current system time.
Expand Down Expand Up @@ -133,7 +141,7 @@ impl Logger {
pub fn new<W: Sized + Write + Send + Sync + 'static>(
mut writer: W,
min_level: LogLevel,
component: &str
component: &str,
) -> Logger {
let (tx, rx) = channel::<LogMessage>();
let thread = thread::spawn(move || {
Expand All @@ -151,7 +159,7 @@ impl Logger {
msg.timestamp, msg.level, msg.component, msg.content
);
buff.push(formatted);
},
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
Err(_) => break,
}
Expand Down Expand Up @@ -238,8 +246,8 @@ impl Logger {
pub fn fatal(&self, content: String) {
self.log(LogLevel::FATAL, content);
}
/// Log a message with TRACE level
#[allow(dead_code)]
/// Log a message with TRACE level
#[allow(dead_code)]
pub fn trace(&self, content: String) {
self.log(LogLevel::TRACE, content);
}
Expand All @@ -255,20 +263,19 @@ impl Clone for Logger {
}
}


#[cfg(test)]
mod tests {
use super::*;
use std::io::stdout;

#[test]
fn test_basic() {
let logs = Logger::new(stdout(), LogLevel::DEBUG, "test");
logs.info("test message".to_string());
logs.debug("debug info".to_string());

// Create a sub-logger with a different component
let sub_logger = logs.with_component("sub-component");
sub_logger.warn("warning from sub-component".to_string());
}
}
}
Loading