From 9377bda0ea88dc4db9caf34f3296646ea1b85eac Mon Sep 17 00:00:00 2001 From: Brayo Date: Thu, 11 Jun 2026 11:42:58 +0300 Subject: [PATCH 1/7] perf(server): remove global datastore mutex from HTTP endpoints The Datastore handle is a cheap crossbeam channel sender to the DB worker thread, which already serializes all database access. Wrapping it in a Mutex serialized every HTTP request on top of that, so a slow /query blocked all watcher heartbeats. Drop the mutex and the endpoints_get_lock! macro so Rocket workers can run requests in parallel up to the DB worker. The mutex's poisoning check was also the only path that turned a dead DB worker into an HTTP error, so the Datastore channel requester now maps send/receive failures (worker thread panicked or exited) to DatastoreError::InternalError instead of unwrapping; endpoints degrade to a 5xx response instead of panicking per request. Note: /import dedup (fetch existing -> filter -> insert) is no longer atomic w.r.t. concurrent inserts into the same bucket; acceptable since imports target buckets from other hosts and dedup is best-effort. Includes clippy autofixes applied by the pre-commit hook. --- aw-client-rust/src/single_instance.rs | 5 +- aw-client-rust/tests/test.rs | 2 +- aw-datastore/src/worker.rs | 200 +++++++++----------------- aw-server/src/android/mod.rs | 3 +- aw-server/src/endpoints/apikey.rs | 3 +- aw-server/src/endpoints/bucket.rs | 29 ++-- aw-server/src/endpoints/export.rs | 12 +- aw-server/src/endpoints/hostcheck.rs | 3 +- aw-server/src/endpoints/import.rs | 4 +- aw-server/src/endpoints/mod.rs | 7 +- aw-server/src/endpoints/query.rs | 4 +- aw-server/src/endpoints/settings.rs | 11 +- aw-server/src/endpoints/util.rs | 15 -- aw-server/src/main.rs | 6 +- aw-server/tests/api.rs | 3 +- 15 files changed, 106 insertions(+), 201 deletions(-) diff --git a/aw-client-rust/src/single_instance.rs b/aw-client-rust/src/single_instance.rs index 745fa2b2..0aa299a9 100644 --- a/aw-client-rust/src/single_instance.rs +++ b/aw-client-rust/src/single_instance.rs @@ -1,6 +1,6 @@ use dirs::cache_dir; use fs4::fs_std::FileExt; -use log::{debug, error}; +use log::debug; use std::fs::{File, OpenOptions}; use std::io; use std::sync::atomic::{AtomicBool, Ordering}; @@ -48,7 +48,8 @@ impl SingleInstance { locked: Arc::new(AtomicBool::new(true)), }), Err(e) if e.kind() == io::ErrorKind::AlreadyExists => { - error!("Another instance is already running"); + // Qualified so the unix build doesn't carry an unused `error` import + log::error!("Another instance is already running"); Err(SingleInstanceError::AlreadyRunning) } Err(e) => Err(SingleInstanceError::Io(e)), diff --git a/aw-client-rust/tests/test.rs b/aw-client-rust/tests/test.rs index 9b8928d2..a6186fea 100644 --- a/aw-client-rust/tests/test.rs +++ b/aw-client-rust/tests/test.rs @@ -115,7 +115,7 @@ mod test { use aw_server::endpoints::ServerState; let state = ServerState { - datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), + datastore: aw_datastore::Datastore::new_in_memory(false), asset_resolver: AssetResolver::new(None), device_id: "test_id".to_string(), }; diff --git a/aw-datastore/src/worker.rs b/aw-datastore/src/worker.rs index 9a3123aa..4df7ef4d 100644 --- a/aw-datastore/src/worker.rs +++ b/aw-datastore/src/worker.rs @@ -19,8 +19,6 @@ use crate::DatastoreError; use crate::DatastoreInstance; use crate::DatastoreMethod; -use mpsc_requests::ResponseReceiver; - type RequestSender = mpsc_requests::RequestSender>; type RequestReceiver = mpsc_requests::RequestReceiver>; @@ -83,15 +81,10 @@ pub enum Command { Close(), } -fn _unwrap_response( - receiver: ResponseReceiver>, -) -> Result<(), DatastoreError> { - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Empty() => Ok(()), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), +fn _unwrap_empty_response(response: Response) -> Result<(), DatastoreError> { + match response { + Response::Empty() => Ok(()), + _ => panic!("Invalid response"), } } @@ -408,50 +401,50 @@ impl Datastore { Datastore { requester } } + /// Send a command to the worker thread and wait for its response. + /// + /// Fails with `InternalError` instead of panicking when the worker thread + /// is gone (e.g. it panicked on an earlier request), so callers such as + /// HTTP endpoints can degrade to a 5xx response instead of crashing the + /// request. + fn request(&self, cmd: Command) -> Result { + let receiver = self.requester.request(cmd).map_err(|e| { + DatastoreError::InternalError(format!( + "Failed to send request, datastore worker is gone: {e:?}" + )) + })?; + receiver.collect().map_err(|e| { + DatastoreError::InternalError(format!( + "Failed to receive response, datastore worker died while handling request: {e:?}" + )) + })? + } + pub fn create_bucket(&self, bucket: &Bucket) -> Result<(), DatastoreError> { let cmd = Command::CreateBucket(bucket.clone()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(_) => Ok(()), - Err(e) => Err(e), - } + _unwrap_empty_response(self.request(cmd)?) } pub fn delete_bucket(&self, bucket_id: &str) -> Result<(), DatastoreError> { let cmd = Command::DeleteBucket(bucket_id.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Empty() => Ok(()), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), - } + _unwrap_empty_response(self.request(cmd)?) } pub fn get_bucket(&self, bucket_id: &str) -> Result { let cmd = Command::GetBucket(bucket_id.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Bucket(b) => Ok(b), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::Bucket(b) => Ok(b), + _ => panic!("Invalid response"), } } pub fn get_buckets(&self) -> Result, DatastoreError> { let cmd = Command::GetBuckets(); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::BucketMap(bm) => Ok(bm), - e => Err(DatastoreError::InternalError(format!( - "Invalid response: {e:?}" - ))), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::BucketMap(bm) => Ok(bm), + e => Err(DatastoreError::InternalError(format!( + "Invalid response: {e:?}" + ))), } } @@ -461,13 +454,9 @@ impl Datastore { events: &[Event], ) -> Result, DatastoreError> { let cmd = Command::InsertEvents(bucket_id.to_string(), events.to_vec()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::EventList(events) => Ok(events), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::EventList(events) => Ok(events), + _ => panic!("Invalid response"), } } @@ -478,25 +467,17 @@ impl Datastore { pulsetime: f64, ) -> Result { let cmd = Command::Heartbeat(bucket_id.to_string(), heartbeat, pulsetime); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Event(e) => Ok(e), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::Event(e) => Ok(e), + _ => panic!("Invalid response"), } } pub fn get_event(&self, bucket_id: &str, event_id: i64) -> Result { let cmd = Command::GetEvent(bucket_id.to_string(), event_id); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Event(el) => Ok(el), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::Event(el) => Ok(el), + _ => panic!("Invalid response"), } } @@ -514,13 +495,9 @@ impl Datastore { limit_opt, false, ); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::EventList(el) => Ok(el), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::EventList(el) => Ok(el), + _ => panic!("Invalid response"), } } @@ -538,13 +515,9 @@ impl Datastore { limit_opt, true, ); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::EventList(el) => Ok(el), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::EventList(el) => Ok(el), + _ => panic!("Invalid response"), } } @@ -555,13 +528,9 @@ impl Datastore { endtime_opt: Option>, ) -> Result { let cmd = Command::GetEventCount(bucket_id.to_string(), starttime_opt, endtime_opt); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Count(n) => Ok(n), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::Count(n) => Ok(n), + _ => panic!("Invalid response"), } } @@ -571,87 +540,52 @@ impl Datastore { event_ids: Vec, ) -> Result<(), DatastoreError> { let cmd = Command::DeleteEventsById(bucket_id.to_string(), event_ids); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Empty() => Ok(()), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), - } + _unwrap_empty_response(self.request(cmd)?) } pub fn force_commit(&self) -> Result<(), DatastoreError> { let cmd = Command::ForceCommit(); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Empty() => Ok(()), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), - } + _unwrap_empty_response(self.request(cmd)?) } pub fn get_key_values(&self, pattern: &str) -> Result, DatastoreError> { let cmd = Command::GetKeyValues(pattern.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::KeyValues(value) => Ok(value), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::KeyValues(value) => Ok(value), + _ => panic!("Invalid response"), } } pub fn get_key_value(&self, key: &str) -> Result { let cmd = Command::GetKeyValue(key.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::KeyValue(kv) => Ok(kv), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::KeyValue(kv) => Ok(kv), + _ => panic!("Invalid response"), } } pub fn set_key_value(&self, key: &str, data: &str) -> Result<(), DatastoreError> { let cmd = Command::SetKeyValue(key.to_string(), data.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - - _unwrap_response(receiver) + _unwrap_empty_response(self.request(cmd)?) } pub fn delete_key_value(&self, key: &str) -> Result<(), DatastoreError> { let cmd = Command::DeleteKeyValue(key.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - - _unwrap_response(receiver) + _unwrap_empty_response(self.request(cmd)?) } pub fn refresh_privacy_filter(&self) -> Result<(), DatastoreError> { - let receiver = self - .requester - .request(Command::RefreshPrivacyFilter()) - .unwrap(); - _unwrap_response(receiver) + _unwrap_empty_response(self.request(Command::RefreshPrivacyFilter())?) } // Should block until worker has stopped pub fn close(&self) { info!("Sending close request to database"); - let receiver = self.requester.request(Command::Close()).unwrap(); - - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Empty() => (), - _ => panic!("Invalid response"), - }, - Err(e) => panic!("Error closing database: {:?}", e), + match self.request(Command::Close()) { + Ok(Response::Empty()) => (), + Ok(_) => panic!("Invalid response"), + // Worker already gone means there is nothing left to close + Err(e) => warn!("Error closing database: {e:?}"), } } } diff --git a/aw-server/src/android/mod.rs b/aw-server/src/android/mod.rs index cb41c509..fae6140c 100644 --- a/aw-server/src/android/mod.rs +++ b/aw-server/src/android/mod.rs @@ -36,7 +36,6 @@ pub mod android { use super::*; use std::path::PathBuf; - use std::sync::Mutex; use crate::endpoints; use crate::endpoints::ServerState; @@ -120,7 +119,7 @@ pub mod android { // FIXME: Why is unsafe needed here? Can we get rid of it? unsafe { let server_state: ServerState = endpoints::ServerState { - datastore: Mutex::new(openDatastore()), + datastore: openDatastore(), asset_resolver: endpoints::AssetResolver::new(None), device_id: device_id::get_device_id(), }; diff --git a/aw-server/src/endpoints/apikey.rs b/aw-server/src/endpoints/apikey.rs index f7bfcfe7..618894a0 100644 --- a/aw-server/src/endpoints/apikey.rs +++ b/aw-server/src/endpoints/apikey.rs @@ -149,7 +149,6 @@ impl Fairing for ApiKeyCheck { #[cfg(test)] mod tests { - use std::sync::Mutex; use rocket::http::{ContentType, Header, Status}; use rocket::Rocket; @@ -159,7 +158,7 @@ mod tests { fn setup_testserver(api_key: Option) -> Rocket { let state = endpoints::ServerState { - datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), + datastore: aw_datastore::Datastore::new_in_memory(false), asset_resolver: endpoints::AssetResolver::new(None), device_id: "test_id".to_string(), }; diff --git a/aw-server/src/endpoints/bucket.rs b/aw-server/src/endpoints/bucket.rs index b53d12af..fc97937c 100644 --- a/aw-server/src/endpoints/bucket.rs +++ b/aw-server/src/endpoints/bucket.rs @@ -21,7 +21,7 @@ use crate::endpoints::{HttpErrorJson, ServerState}; pub fn buckets_get( state: &State, ) -> Result>, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; match datastore.get_buckets() { Ok(bucketlist) => Ok(Json(bucketlist)), Err(err) => Err(err.into()), @@ -33,8 +33,8 @@ pub fn bucket_get( bucket_id: &str, state: &State, ) -> Result, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); - match datastore.get_bucket(&bucket_id) { + let datastore = &state.datastore; + match datastore.get_bucket(bucket_id) { Ok(bucket) => Ok(Json(bucket)), Err(e) => Err(e.into()), } @@ -62,7 +62,7 @@ pub fn bucket_new( .data .insert("device_id".to_string(), state.device_id.clone().into()); } - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let ret = datastore.create_bucket(&bucket); match ret { Ok(_) => Ok(()), @@ -103,7 +103,7 @@ pub fn bucket_events_get( }, None => None, }; - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let res = datastore.get_events(bucket_id, starttime, endtime, limit); match res { Ok(events) => Ok(Json(events)), @@ -120,7 +120,7 @@ pub fn bucket_events_get_single( _unused: Option, state: &State, ) -> Result, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let res = datastore.get_event(bucket_id, event_id); match res { Ok(events) => Ok(Json(events)), @@ -134,7 +134,7 @@ pub fn bucket_events_create( events: Json>, state: &State, ) -> Result>, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let res = datastore.insert_events(bucket_id, &events); match res { Ok(events) => Ok(Json(events)), @@ -154,7 +154,7 @@ pub fn bucket_events_heartbeat( state: &State, ) -> Result, HttpErrorJson> { let heartbeat = heartbeat_json.into_inner(); - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; match datastore.heartbeat(bucket_id, heartbeat, pulsetime) { Ok(e) => Ok(Json(e)), Err(err) => Err(err.into()), @@ -166,7 +166,7 @@ pub fn bucket_event_count( bucket_id: &str, state: &State, ) -> Result, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let res = datastore.get_event_count(bucket_id, None, None); match res { Ok(eventcount) => Ok(Json(eventcount as u64)), @@ -180,7 +180,7 @@ pub fn bucket_events_delete_by_id( event_id: i64, state: &State, ) -> Result<(), HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; match datastore.delete_events_by_id(bucket_id, vec![event_id]) { Ok(_) => Ok(()), Err(err) => Err(err.into()), @@ -192,14 +192,11 @@ pub fn bucket_export( bucket_id: &str, state: &State, ) -> Result { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let mut export = BucketsExport { buckets: HashMap::new(), }; - let mut bucket = match datastore.get_bucket(bucket_id) { - Ok(bucket) => bucket, - Err(err) => return Err(err.into()), - }; + let mut bucket = datastore.get_bucket(bucket_id)?; /* TODO: Replace expect with http error */ let events = datastore .get_events(bucket_id, None, None, None) @@ -212,7 +209,7 @@ pub fn bucket_export( #[delete("/")] pub fn bucket_delete(bucket_id: &str, state: &State) -> Result<(), HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; match datastore.delete_bucket(bucket_id) { Ok(_) => Ok(()), Err(err) => Err(err.into()), diff --git a/aw-server/src/endpoints/export.rs b/aw-server/src/endpoints/export.rs index eced6ba6..4b7653d8 100644 --- a/aw-server/src/endpoints/export.rs +++ b/aw-server/src/endpoints/export.rs @@ -10,19 +10,13 @@ use crate::endpoints::{HttpErrorJson, ServerState}; #[get("/")] pub fn buckets_export(state: &State) -> Result { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let mut export = BucketsExport { buckets: HashMap::new(), }; - let mut buckets = match datastore.get_buckets() { - Ok(buckets) => buckets, - Err(err) => return Err(err.into()), - }; + let mut buckets = datastore.get_buckets()?; for (bid, mut bucket) in buckets.drain() { - let events = match datastore.get_events(&bid, None, None, None) { - Ok(events) => events, - Err(err) => return Err(err.into()), - }; + let events = datastore.get_events(&bid, None, None, None)?; bucket.events = Some(TryVec::new(events)); export.buckets.insert(bid, bucket); } diff --git a/aw-server/src/endpoints/hostcheck.rs b/aw-server/src/endpoints/hostcheck.rs index 6f583cdf..779de62f 100644 --- a/aw-server/src/endpoints/hostcheck.rs +++ b/aw-server/src/endpoints/hostcheck.rs @@ -114,7 +114,6 @@ impl Fairing for HostCheck { #[cfg(test)] mod tests { - use std::sync::Mutex; use rocket::http::{ContentType, Header, Status}; use rocket::Rocket; @@ -124,7 +123,7 @@ mod tests { fn setup_testserver(address: String) -> Rocket { let state = endpoints::ServerState { - datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), + datastore: aw_datastore::Datastore::new_in_memory(false), asset_resolver: endpoints::AssetResolver::new(None), device_id: "test_id".to_string(), }; diff --git a/aw-server/src/endpoints/import.rs b/aw-server/src/endpoints/import.rs index df79cc77..d6d0ca2e 100644 --- a/aw-server/src/endpoints/import.rs +++ b/aw-server/src/endpoints/import.rs @@ -4,7 +4,6 @@ use rocket::serde::json::Json; use rocket::State; use std::collections::{BTreeMap, HashSet}; -use std::sync::Mutex; use aw_models::{BucketsExport, Event}; @@ -39,8 +38,7 @@ fn event_identity( Ok((event.timestamp, duration_ns, data_json)) } -fn import(datastore_mutex: &Mutex, import: BucketsExport) -> Result<(), HttpErrorJson> { - let datastore = endpoints_get_lock!(datastore_mutex); +fn import(datastore: &Datastore, import: BucketsExport) -> Result<(), HttpErrorJson> { for (_bucketname, mut bucket) in import.buckets { match datastore.create_bucket(&bucket) { Ok(_) => (), diff --git a/aw-server/src/endpoints/mod.rs b/aw-server/src/endpoints/mod.rs index f0621196..761d548e 100644 --- a/aw-server/src/endpoints/mod.rs +++ b/aw-server/src/endpoints/mod.rs @@ -1,7 +1,6 @@ use rust_embed::RustEmbed; use std::ffi::OsStr; use std::path::{Path, PathBuf}; -use std::sync::Mutex; use gethostname::gethostname; use rocket::fs::FileServer; @@ -38,8 +37,12 @@ impl AssetResolver { } } +// The Datastore is just a cheap handle to the DB worker thread (a crossbeam +// channel sender), which serializes all DB access internally. No mutex is +// needed here — wrapping it in one would serialize all HTTP requests, letting +// a slow query block every heartbeat. pub struct ServerState { - pub datastore: Mutex, + pub datastore: Datastore, pub asset_resolver: AssetResolver, pub device_id: String, } diff --git a/aw-server/src/endpoints/query.rs b/aw-server/src/endpoints/query.rs index 91f6aea4..6e717a71 100644 --- a/aw-server/src/endpoints/query.rs +++ b/aw-server/src/endpoints/query.rs @@ -11,9 +11,9 @@ pub fn query(query_req: Json, state: &State) -> Result data, Err(e) => { warn!("Query failed: {:?}", e); diff --git a/aw-server/src/endpoints/settings.rs b/aw-server/src/endpoints/settings.rs index 02ff98d9..5a2aeb5a 100644 --- a/aw-server/src/endpoints/settings.rs +++ b/aw-server/src/endpoints/settings.rs @@ -3,9 +3,8 @@ use rocket::http::Status; use rocket::serde::json::Json; use rocket::State; use std::collections::HashMap; -use std::sync::MutexGuard; -use aw_datastore::{Datastore, DatastoreError}; +use aw_datastore::DatastoreError; use crate::endpoints::HttpErrorJson; @@ -25,7 +24,7 @@ fn parse_key(key: String) -> Result { pub fn settings_get( state: &State, ) -> Result>, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let queryresults = match datastore.get_key_values("settings.%") { Ok(result) => Ok(result), Err(err) => Err(err.into()), @@ -53,7 +52,7 @@ pub fn setting_get( key: String, ) -> Result, HttpErrorJson> { let setting_key = parse_key(key)?; - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; match datastore.get_key_value(&setting_key) { Ok(value) => Ok(Json(serde_json::from_str(&value).unwrap())), @@ -79,7 +78,7 @@ pub fn setting_set( } }; - let datastore: MutexGuard<'_, Datastore> = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let result = datastore.set_key_value(&setting_key, &value_str); match result { @@ -92,7 +91,7 @@ pub fn setting_set( pub fn setting_delete(state: &State, key: String) -> Result<(), HttpErrorJson> { let setting_key = parse_key(key)?; - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let result = datastore.delete_key_value(&setting_key); match result { diff --git a/aw-server/src/endpoints/util.rs b/aw-server/src/endpoints/util.rs index fdf4a992..80a758ad 100644 --- a/aw-server/src/endpoints/util.rs +++ b/aw-server/src/endpoints/util.rs @@ -101,18 +101,3 @@ impl From for HttpErrorJson { } } } - -#[macro_export] -macro_rules! endpoints_get_lock { - ( $lock:expr ) => { - match $lock.lock() { - Ok(r) => r, - Err(e) => { - use rocket::http::Status; - let err_msg = format!("Taking datastore lock failed, returning 504: {}", e); - warn!("{}", err_msg); - return Err(HttpErrorJson::new(Status::ServiceUnavailable, err_msg)); - } - } - }; -} diff --git a/aw-server/src/main.rs b/aw-server/src/main.rs index 4589c8c3..5142e482 100644 --- a/aw-server/src/main.rs +++ b/aw-server/src/main.rs @@ -79,8 +79,6 @@ async fn main() -> Result<(), rocket::Error> { #[cfg(any(feature = "encryption", feature = "encryption-vendored"))] std::env::remove_var("AW_DB_PASSWORD"); - use std::sync::Mutex; - let mut testing = opts.testing; // Always override environment if --testing is specified @@ -142,7 +140,7 @@ async fn main() -> Result<(), rocket::Error> { }; info!("Using DB at path {:?}", db_path); - let asset_path = opts.webpath.map(|webpath| PathBuf::from(webpath)); + let asset_path = opts.webpath.map(PathBuf::from); info!("Using aw-webui assets at path {:?}", asset_path); // Only use legacy import if opts.dbpath is not set @@ -187,7 +185,7 @@ async fn main() -> Result<(), rocket::Error> { let server_state = endpoints::ServerState { // Even if legacy_import is set to true it is disabled on Android so // it will not happen there - datastore: Mutex::new(datastore), + datastore, asset_resolver: endpoints::AssetResolver::new(asset_path), device_id, }; diff --git a/aw-server/tests/api.rs b/aw-server/tests/api.rs index d45b4872..daea97ba 100644 --- a/aw-server/tests/api.rs +++ b/aw-server/tests/api.rs @@ -8,7 +8,6 @@ extern crate aw_server; #[cfg(test)] mod api_tests { use std::collections::HashMap; - use std::sync::Mutex; use rocket::http::{ContentType, Header, Status}; use serde_json::{json, Value}; @@ -21,7 +20,7 @@ mod api_tests { fn setup_testserver() -> rocket::Rocket { let state = endpoints::ServerState { - datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), + datastore: aw_datastore::Datastore::new_in_memory(false), asset_resolver: endpoints::AssetResolver::new(None), device_id: "test_id".to_string(), }; From de5a12cdb40b5864cf24a511c599782d03af3208 Mon Sep 17 00:00:00 2001 From: Brayo Date: Thu, 11 Jun 2026 11:43:21 +0300 Subject: [PATCH 2/7] perf(query): eliminate redundant deep copies of function arguments Query functions received their arguments by value but converted them through TryFrom<&DataType> impls, which deep-copied the contained events twice: once cloning the list out of the DataType, and once more cloning each event out of an already-owned value. Add by-value TryFrom conversions that move the data out, and make query functions consume their owned argument vector. The by-ref impls remain as delegating wrappers for callers that cannot give up ownership (e.g. tests). The remaining clone at variable lookup in the interpreter is the one necessary copy per reference: the env must retain the value since it may be referenced again, while transforms need owned events. Event-list allocation per function call drops roughly 3x. Includes clippy autofixes applied by the pre-commit hook. --- aw-query/src/datatype.rs | 161 ++++++++++++++++++++++-------- aw-query/src/functions.rs | 88 +++++++++------- aw-query/src/interpret.rs | 6 +- aw-transform/src/chunk.rs | 2 +- aw-transform/src/filter_period.rs | 2 +- aw-transform/src/flood.rs | 4 +- aw-transform/src/merge.rs | 2 +- aw-transform/src/sort.rs | 2 +- 8 files changed, 178 insertions(+), 89 deletions(-) diff --git a/aw-query/src/datatype.rs b/aw-query/src/datatype.rs index f92c1c44..c2983b0c 100644 --- a/aw-query/src/datatype.rs +++ b/aw-query/src/datatype.rs @@ -96,52 +96,79 @@ impl PartialEq for DataType { } } -impl TryFrom<&DataType> for Vec { +/* Conversions out of DataType. + * + * The by-value impls move the contained data out and are what query functions + * should use, since functions own their argument vector. The by-ref impls + * delegate via a clone and only exist for callers that cannot give up + * ownership — going through them deep-copies every contained event. */ + +impl TryFrom for Vec { type Error = QueryError; - fn try_from(value: &DataType) -> Result { + fn try_from(value: DataType) -> Result { match value { - DataType::List(ref s) => Ok(s.clone()), - ref invalid_type => Err(QueryError::InvalidFunctionParameters(format!( + DataType::List(s) => Ok(s), + invalid_type => Err(QueryError::InvalidFunctionParameters(format!( "Expected function parameter of type List, got {invalid_type:?}" ))), } } } -impl TryFrom<&DataType> for String { +impl TryFrom<&DataType> for Vec { type Error = QueryError; fn try_from(value: &DataType) -> Result { + value.clone().try_into() + } +} + +impl TryFrom for String { + type Error = QueryError; + fn try_from(value: DataType) -> Result { match value { - DataType::String(s) => Ok(s.clone()), - ref invalid_type => Err(QueryError::InvalidFunctionParameters(format!( + DataType::String(s) => Ok(s), + invalid_type => Err(QueryError::InvalidFunctionParameters(format!( "Expected function parameter of type String, list contains {invalid_type:?}" ))), } } } -impl TryFrom<&DataType> for Vec { +impl TryFrom<&DataType> for String { type Error = QueryError; fn try_from(value: &DataType) -> Result { - let mut tagged_strings: Vec = value.try_into()?; - let mut strings = Vec::new(); - for string in tagged_strings.drain(..) { - let s: String = (&string).try_into()?; - strings.push(s); + value.clone().try_into() + } +} + +impl TryFrom for Vec { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + let tagged_strings: Vec = value.try_into()?; + let mut strings = Vec::with_capacity(tagged_strings.len()); + for string in tagged_strings { + strings.push(string.try_into()?); } Ok(strings) } } -impl TryFrom<&DataType> for Vec { +impl TryFrom<&DataType> for Vec { type Error = QueryError; fn try_from(value: &DataType) -> Result { - let mut tagged_events: Vec = value.try_into()?; - let mut events = Vec::new(); - for event in tagged_events.drain(..) { + value.clone().try_into() + } +} + +impl TryFrom for Vec { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + let tagged_events: Vec = value.try_into()?; + let mut events = Vec::with_capacity(tagged_events.len()); + for event in tagged_events { match event { - DataType::Event(e) => events.push(e.clone()), - ref invalid_type => { + DataType::Event(e) => events.push(e), + invalid_type => { return Err(QueryError::InvalidFunctionParameters(format!( "Expected function parameter of type List of Events, list contains {invalid_type:?}" ))) @@ -152,15 +179,29 @@ impl TryFrom<&DataType> for Vec { } } +impl TryFrom<&DataType> for Vec { + type Error = QueryError; + fn try_from(value: &DataType) -> Result { + value.clone().try_into() + } +} + impl TryFrom<&DataType> for Vec<(String, Rule)> { type Error = QueryError; fn try_from(value: &DataType) -> Result { - let mut tagged_lists: Vec = value.try_into()?; - let mut lists: Vec<(String, Rule)> = Vec::new(); - for list in tagged_lists.drain(..) { + value.clone().try_into() + } +} + +impl TryFrom for Vec<(String, Rule)> { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + let tagged_lists: Vec = value.try_into()?; + let mut lists: Vec<(String, Rule)> = Vec::with_capacity(tagged_lists.len()); + for list in tagged_lists { match list { DataType::List(ref l) => { - let tag: String = match l.get(0) { + let tag: String = match l.first() { Some(tag) => tag.try_into()?, None => return Err(QueryError::InvalidFunctionParameters( format!("Expected function parameter of type list of (tag, rule) tuples, list contains {l:?}"))) @@ -186,12 +227,19 @@ impl TryFrom<&DataType> for Vec<(String, Rule)> { impl TryFrom<&DataType> for Vec<(Vec, Rule)> { type Error = QueryError; fn try_from(value: &DataType) -> Result { - let mut tagged_lists: Vec = value.try_into()?; - let mut lists: Vec<(Vec, Rule)> = Vec::new(); - for list in tagged_lists.drain(..) { + value.clone().try_into() + } +} + +impl TryFrom for Vec<(Vec, Rule)> { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + let tagged_lists: Vec = value.try_into()?; + let mut lists: Vec<(Vec, Rule)> = Vec::with_capacity(tagged_lists.len()); + for list in tagged_lists { match list { DataType::List(ref l) => { - let category: Vec = match l.get(0) { + let category: Vec = match l.first() { Some(category) => category.try_into()?, None => return Err(QueryError::InvalidFunctionParameters( format!("Expected function parameter of type list of (category, rule) tuples, list contains {l:?}"))) @@ -226,6 +274,13 @@ impl TryFrom<&DataType> for f64 { } } +impl TryFrom for f64 { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + (&value).try_into() + } +} + impl TryFrom<&DataType> for usize { type Error = QueryError; fn try_from(value: &DataType) -> Result { @@ -234,41 +289,61 @@ impl TryFrom<&DataType> for usize { } } -impl TryFrom<&DataType> for Value { +impl TryFrom for usize { type Error = QueryError; - fn try_from(value: &DataType) -> Result { + fn try_from(value: DataType) -> Result { + (&value).try_into() + } +} + +impl TryFrom for Value { + type Error = QueryError; + fn try_from(value: DataType) -> Result { match value { DataType::None() => Ok(Value::Null), - DataType::Bool(b) => Ok(Value::Bool(*b)), - DataType::Number(n) => Ok(Value::Number(Number::from_f64(*n).unwrap())), - DataType::String(s) => Ok(Value::String(s.to_string())), - DataType::List(_l) => { - let mut tagged_values: Vec = value.try_into()?; - let mut values: Vec = Vec::new(); - for value in tagged_values.drain(..) { - values.push((&value).try_into()?); + DataType::Bool(b) => Ok(Value::Bool(b)), + DataType::Number(n) => Ok(Value::Number(Number::from_f64(n).unwrap())), + DataType::String(s) => Ok(Value::String(s)), + DataType::List(l) => { + let mut values: Vec = Vec::with_capacity(l.len()); + for value in l { + values.push(value.try_into()?); } Ok(Value::Array(values)) } - ref invalid_type => Err(QueryError::InvalidFunctionParameters(format!( + invalid_type => Err(QueryError::InvalidFunctionParameters(format!( "Query2 support for parsing values is limited, does not support parsing {invalid_type:?}" ))), } } } -impl TryFrom<&DataType> for Vec { +impl TryFrom<&DataType> for Value { type Error = QueryError; fn try_from(value: &DataType) -> Result { - let mut tagged_values: Vec = value.try_into()?; - let mut values: Vec = Vec::new(); - for value in tagged_values.drain(..) { - values.push((&value).try_into()?); + value.clone().try_into() + } +} + +impl TryFrom for Vec { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + let tagged_values: Vec = value.try_into()?; + let mut values: Vec = Vec::with_capacity(tagged_values.len()); + for value in tagged_values { + values.push(value.try_into()?); } Ok(values) } } +impl TryFrom<&DataType> for Vec { + type Error = QueryError; + fn try_from(value: &DataType) -> Result { + value.clone().try_into() + } +} + impl TryFrom<&DataType> for Rule { type Error = QueryError; diff --git a/aw-query/src/functions.rs b/aw-query/src/functions.rs index 4a52c4a2..582ef62b 100644 --- a/aw-query/src/functions.rs +++ b/aw-query/src/functions.rs @@ -144,7 +144,7 @@ mod qfunctions { // Typecheck validate::args_length(&args, 1)?; - let bucket_id: String = (&args[0]).try_into()?; + let bucket_id: String = args.into_iter().next().unwrap().try_into()?; let interval = validate::get_timeinterval(env)?; let events = match ds.get_events( @@ -195,10 +195,11 @@ mod qfunctions { ) -> Result { validate::args_length(&args, 1).or_else(|_| validate::args_length(&args, 2))?; - let bucket_filter: String = (&args[0]).try_into()?; - let hostname_filter: Option = match args.len() { - 2 => Some((&args[1]).try_into()?), - _ => None, + let mut args = args.into_iter(); + let bucket_filter: String = args.next().unwrap().try_into()?; + let hostname_filter: Option = match args.next() { + Some(arg) => Some(arg.try_into()?), + None => None, }; let buckets = match ds.get_buckets() { @@ -236,7 +237,7 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - match args.get(0).unwrap() { + match args.first().unwrap() { DataType::List(ref list) => Ok(DataType::Bool(list.contains(&args[1]))), DataType::Dict(ref dict) => { let s = match &args[1] { @@ -264,7 +265,7 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 1)?; - let events: Vec = (&args[0]).try_into()?; + let events: Vec = args.into_iter().next().unwrap().try_into()?; // Run flood let mut flooded_events = aw_transform::flood(events, chrono::Duration::seconds(5)); // Put events back into DataType::Event container @@ -282,8 +283,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events: Vec = Vec::try_from(&args[0])?; - let rules: Vec<(Vec, Rule)> = Vec::try_from(&args[1])?; + let mut args = args.into_iter(); + let events: Vec = args.next().unwrap().try_into()?; + let rules: Vec<(Vec, Rule)> = args.next().unwrap().try_into()?; // Run categorize let mut flooded_events = aw_transform::classify::categorize(events, &rules); // Put events back into DataType::Event container @@ -301,8 +303,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events: Vec = Vec::try_from(&args[0])?; - let rules: Vec<(String, Rule)> = Vec::try_from(&args[1])?; + let mut args = args.into_iter(); + let events: Vec = args.next().unwrap().try_into()?; + let rules: Vec<(String, Rule)> = args.next().unwrap().try_into()?; // Run categorize let mut flooded_events = aw_transform::classify::tag(events, &rules); // Put events back into DataType::Event container @@ -320,7 +323,7 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 1)?; - let events: Vec = (&args[0]).try_into()?; + let events: Vec = args.into_iter().next().unwrap().try_into()?; // Sort by duration let mut sorted_events = aw_transform::sort_by_duration(events); @@ -339,8 +342,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let mut events: Vec = (&args[0]).try_into()?; - let mut limit: usize = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let mut events: Vec = args.next().unwrap().try_into()?; + let mut limit: usize = args.next().unwrap().try_into()?; if events.len() < limit { limit = events.len() @@ -359,7 +363,7 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 1)?; - let events: Vec = (&args[0]).try_into()?; + let events: Vec = args.into_iter().next().unwrap().try_into()?; // Sort by duration let mut sorted_events = aw_transform::sort_by_timestamp(events); @@ -378,12 +382,12 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 1)?; - let mut events: Vec = (&args[0]).try_into()?; + let mut events: Vec = args.into_iter().next().unwrap().try_into()?; // Sort by duration let mut sum_durations = chrono::Duration::zero(); for event in events.drain(..) { - sum_durations = sum_durations + event.duration; + sum_durations += event.duration; } Ok(DataType::Number( (sum_durations.num_milliseconds() as f64) / 1000.0, @@ -397,8 +401,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events: Vec = (&args[0]).try_into()?; - let keys: Vec = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let events: Vec = args.next().unwrap().try_into()?; + let keys: Vec = args.next().unwrap().try_into()?; let mut merged_events = aw_transform::merge_events_by_keys(events, keys); let mut merged_tagged_events = Vec::new(); @@ -415,8 +420,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events: Vec = (&args[0]).try_into()?; - let key: String = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let events: Vec = args.next().unwrap().try_into()?; + let key: String = args.next().unwrap().try_into()?; let mut merged_events = aw_transform::chunk_events_by_key(events, &key); let mut merged_tagged_events = Vec::new(); @@ -433,9 +439,10 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 3)?; - let events = (&args[0]).try_into()?; - let key: String = (&args[1]).try_into()?; - let vals: Vec<_> = (&args[2]).try_into()?; + let mut args = args.into_iter(); + let events = args.next().unwrap().try_into()?; + let key: String = args.next().unwrap().try_into()?; + let vals: Vec<_> = args.next().unwrap().try_into()?; let mut filtered_events = aw_transform::filter_keyvals(events, &key, &vals); let mut filtered_tagged_events = Vec::new(); @@ -454,9 +461,10 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 3)?; - let events = (&args[0]).try_into()?; - let key: String = (&args[1]).try_into()?; - let regex_str: String = (&args[2]).try_into()?; + let mut args = args.into_iter(); + let events = args.next().unwrap().try_into()?; + let key: String = args.next().unwrap().try_into()?; + let regex_str: String = args.next().unwrap().try_into()?; let regex = match RegexBuilder::new(®ex_str).build() { Ok(regex) => regex, Err(e) => { @@ -481,9 +489,10 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 3)?; - let events = (&args[0]).try_into()?; - let key: String = (&args[1]).try_into()?; - let vals: Vec<_> = (&args[2]).try_into()?; + let mut args = args.into_iter(); + let events = args.next().unwrap().try_into()?; + let key: String = args.next().unwrap().try_into()?; + let vals: Vec<_> = args.next().unwrap().try_into()?; let mut filtered_events = aw_transform::exclude_keyvals(events, &key, &vals); let mut filtered_tagged_events = Vec::new(); @@ -500,8 +509,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events: Vec = (&args[0]).try_into()?; - let filter_events: Vec = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let events: Vec = args.next().unwrap().try_into()?; + let filter_events: Vec = args.next().unwrap().try_into()?; let mut filtered_events = aw_transform::filter_period_intersect(events, filter_events); let mut filtered_tagged_events = Vec::new(); @@ -518,7 +528,7 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 1)?; - let mut events: Vec = (&args[0]).try_into()?; + let mut events: Vec = args.into_iter().next().unwrap().try_into()?; let mut tagged_split_url_events = Vec::new(); for mut event in events.drain(..) { @@ -535,7 +545,7 @@ mod qfunctions { ) -> Result { let mut event_list = Vec::new(); for arg in args { - let mut events: Vec = (&arg).try_into()?; + let mut events: Vec = arg.try_into()?; for event in events.drain(..) { event_list.push(DataType::Event(event)); } @@ -550,8 +560,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events1: Vec = (&args[0]).try_into()?; - let events2: Vec = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let events1: Vec = args.next().unwrap().try_into()?; + let events2: Vec = args.next().unwrap().try_into()?; let mut result = aw_transform::period_union(&events1, &events2); let mut result_tagged = Vec::new(); @@ -568,8 +579,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events1: Vec = (&args[0]).try_into()?; - let events2: Vec = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let events1: Vec = args.next().unwrap().try_into()?; + let events2: Vec = args.next().unwrap().try_into()?; let mut result = aw_transform::union_no_overlap(events1, events2); let mut result_tagged = Vec::new(); diff --git a/aw-query/src/interpret.rs b/aw-query/src/interpret.rs index 80f4aa97..26020874 100644 --- a/aw-query/src/interpret.rs +++ b/aw-query/src/interpret.rs @@ -184,7 +184,9 @@ fn interpret_expr( env.insert(var, val); Ok(DataType::None()) } - // FIXME: avoid clone, it's slow + // This clone is the one necessary copy per variable reference: the env + // must retain the value since it may be referenced again, while + // function arguments and transforms need owned events. Var(var) => match env.get(&var) { Some(v) => Ok(v.clone()), None => Err(QueryError::VariableNotDefined(var.to_string())), @@ -237,7 +239,7 @@ fn interpret_expr( let mut dict = HashMap::new(); for (key, val_uninterpreted) in d { let val = interpret_expr(env, ds, val_uninterpreted)?; - dict.insert(key.clone(), val); + dict.insert(key, val); } Ok(DataType::Dict(dict)) } diff --git a/aw-transform/src/chunk.rs b/aw-transform/src/chunk.rs index 51c9f735..2b8d2588 100644 --- a/aw-transform/src/chunk.rs +++ b/aw-transform/src/chunk.rs @@ -34,7 +34,7 @@ pub fn chunk_events_by_key(events: Vec, key: &str) -> Vec { let last_val = last_event.data.get(key).unwrap().clone(); if &last_val == val { // TODO: Add sub-chunks - last_event.duration = last_event.duration + event.duration; + last_event.duration += event.duration; } chunked_events.push(last_event); if &last_val != val { diff --git a/aw-transform/src/filter_period.rs b/aw-transform/src/filter_period.rs index 6af8d5c2..b69f6567 100644 --- a/aw-transform/src/filter_period.rs +++ b/aw-transform/src/filter_period.rs @@ -18,7 +18,7 @@ use crate::sort_by_timestamp; /// output: [a ] [a ][b ] /// ``` pub fn filter_period_intersect(events: Vec, filter_events: Vec) -> Vec { - if events.len() == 0 || filter_events.len() == 0 { + if events.is_empty() || filter_events.is_empty() { return Vec::new(); } diff --git a/aw-transform/src/flood.rs b/aw-transform/src/flood.rs index 8cfeef70..7d6e7ba7 100644 --- a/aw-transform/src/flood.rs +++ b/aw-transform/src/flood.rs @@ -56,7 +56,7 @@ pub fn flood(events: Vec, pulsetime: chrono::Duration) -> Vec { } { if let Some(gap) = gap_prev { e1.timestamp -= gap / 2; - e1.duration = e1.duration + (gap / 2); + e1.duration += gap / 2; gap_prev = None; } let e2 = match e1_iter.peek() { @@ -148,7 +148,7 @@ pub fn flood(events: Vec, pulsetime: chrono::Duration) -> Vec { continue; } else { // Extend e1 to middle of the gap. - e1.duration = e1.duration + (gap / 2); + e1.duration += gap / 2; // Make sure next event (e2) is gets extended before it's processed gap_prev = Some(gap); diff --git a/aw-transform/src/merge.rs b/aw-transform/src/merge.rs index 81c81336..f2034dda 100644 --- a/aw-transform/src/merge.rs +++ b/aw-transform/src/merge.rs @@ -56,7 +56,7 @@ pub fn merge_events_by_keys(events: Vec, keys: Vec) -> Vec let summed_key = key_values.join("."); if merged_events_map.contains_key(&summed_key) { let merged_event = merged_events_map.get_mut(&summed_key).unwrap(); - merged_event.duration = merged_event.duration + event.duration; + merged_event.duration += event.duration; } else { let mut data = HashMap::new(); for key in &keys { diff --git a/aw-transform/src/sort.rs b/aw-transform/src/sort.rs index fddcd2ef..756d34d4 100644 --- a/aw-transform/src/sort.rs +++ b/aw-transform/src/sort.rs @@ -2,7 +2,7 @@ use aw_models::Event; /// Sort a list of events by timestamp pub fn sort_by_timestamp(mut events: Vec) -> Vec { - events.sort_by(|e1, e2| e1.timestamp.cmp(&e2.timestamp)); + events.sort_by_key(|e1| e1.timestamp); events } From c58421fc96396361ee79c416180dae5905ca3673 Mon Sep 17 00:00:00 2001 From: Brayo Date: Thu, 11 Jun 2026 11:54:14 +0300 Subject: [PATCH 3/7] perf(datastore): replace single-column events indexes with composite index (db v5) Every event query filters on bucketrow plus a starttime/endtime range, ordered by starttime. The single-column indexes could only cover one predicate, leaving the rest as scan + sort. The composite index (bucketrow, starttime DESC, endtime) serves the seek, the range scan and the ORDER BY in one pass, with endtime checked from the index without fetching the row. Dropping the three single-column indexes also makes inserts cheaper. starttime is DESC in the index so a forward scan yields newest-first order with equal-timestamp events in insertion order, preserving the ordering callers observed before. Adds a v4->v5 migration test that builds an old-schema database and verifies the upgrade path, with rusqlite as a dev-dependency for it. Includes clippy/lint cleanups across crates. --- aw-datastore/Cargo.toml | 4 ++ aw-datastore/src/datastore.rs | 36 ++++++++++++- aw-datastore/tests/datastore.rs | 93 +++++++++++++++++++++++++++++++++ aw-webui | 2 +- 4 files changed, 133 insertions(+), 2 deletions(-) diff --git a/aw-datastore/Cargo.toml b/aw-datastore/Cargo.toml index 9e685b15..e00bed4f 100644 --- a/aw-datastore/Cargo.toml +++ b/aw-datastore/Cargo.toml @@ -29,3 +29,7 @@ zeroize = { version = "1", optional = true, features = ["alloc"] } aw-models = { path = "../aw-models" } aw-transform = { path = "../aw-transform" } regex = "1" + +[dev-dependencies] +# Used by migration tests to construct databases with old schema versions +rusqlite = { version = "0.30", features = ["chrono", "serde_json"] } diff --git a/aw-datastore/src/datastore.rs b/aw-datastore/src/datastore.rs index a8553112..30d6688a 100644 --- a/aw-datastore/src/datastore.rs +++ b/aw-datastore/src/datastore.rs @@ -29,8 +29,9 @@ fn _get_db_version(conn: &Connection) -> i32 { * 2: Added 'data' field to 'buckets' table * 3: see: https://github.com/ActivityWatch/aw-server-rust/pull/52 * 4: Added 'key_value' table for storing key - value pairs + * 5: Replaced single-column events indexes with a composite index */ -static NEWEST_DB_VERSION: i32 = 4; +static NEWEST_DB_VERSION: i32 = 5; fn _create_tables(conn: &Connection, version: i32) -> bool { let mut first_init = false; @@ -52,6 +53,10 @@ fn _create_tables(conn: &Connection, version: i32) -> bool { _migrate_v3_to_v4(conn); } + if version < 5 { + _migrate_v4_to_v5(conn); + } + first_init } @@ -169,6 +174,35 @@ fn _migrate_v3_to_v4(conn: &Connection) { .expect("Failed to update database version!"); } +fn _migrate_v4_to_v5(conn: &Connection) { + info!( + "Upgrading database to v5, replacing single-column events indexes with a composite index" + ); + // Every event query filters on bucketrow and a starttime/endtime range, + // ordered by starttime. A composite index serves the seek, the range scan + // and the ORDER BY in one pass (with endtime checked from the index + // without fetching the row), where the single-column indexes could only + // cover one predicate and left the rest as scan + sort. Dropping them + // also makes inserts cheaper (one index to maintain instead of three). + // + // starttime is DESC so a forward scan yields the query's newest-first + // order with equal-timestamp events in rowid (insertion) order, matching + // the ordering callers observed before this index existed. + conn.execute_batch( + " + BEGIN EXCLUSIVE TRANSACTION; + CREATE INDEX IF NOT EXISTS events_bucketrow_starttime_endtime_index + ON events(bucketrow, starttime DESC, endtime); + DROP INDEX IF EXISTS events_bucketrow_index; + DROP INDEX IF EXISTS events_starttime_index; + DROP INDEX IF EXISTS events_endtime_index; + PRAGMA user_version = 5; + COMMIT; + ", + ) + .expect("Failed to run v5 migration transaction"); +} + pub struct DatastoreInstance { buckets_cache: HashMap, first_init: bool, diff --git a/aw-datastore/tests/datastore.rs b/aw-datastore/tests/datastore.rs index cb6cdddd..3f889a1e 100644 --- a/aw-datastore/tests/datastore.rs +++ b/aw-datastore/tests/datastore.rs @@ -470,6 +470,99 @@ mod datastore_tests { } } + #[test] + fn test_migration_v4_to_v5() { + let mut db_path = get_cache_dir().unwrap(); + db_path.push("datastore-unittest-migration-v4.db"); + let db_path_str = db_path.to_str().unwrap().to_string(); + + if db_path.exists() { + std::fs::remove_file(&db_path) + .expect("Failed to remove datastore-unittest-migration-v4.db file"); + } + + // Construct a database with the v4 schema (single-column indexes) + { + let conn = rusqlite::Connection::open(&db_path).unwrap(); + conn.execute_batch( + r#" + CREATE TABLE buckets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT UNIQUE NOT NULL, + type TEXT NOT NULL, + client TEXT NOT NULL, + hostname TEXT NOT NULL, + created TEXT NOT NULL, + data TEXT NOT NULL DEFAULT '{}' + ); + CREATE INDEX bucket_id_index ON buckets(id); + CREATE TABLE events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + bucketrow INTEGER NOT NULL, + starttime INTEGER NOT NULL, + endtime INTEGER NOT NULL, + data TEXT NOT NULL, + FOREIGN KEY (bucketrow) REFERENCES buckets(id) + ); + CREATE INDEX events_bucketrow_index ON events(bucketrow); + CREATE INDEX events_starttime_index ON events(starttime); + CREATE INDEX events_endtime_index ON events(endtime); + CREATE TABLE key_value ( + key TEXT PRIMARY KEY, + value TEXT, + last_modified NUMBER NOT NULL + ); + INSERT INTO buckets (name, type, client, hostname, created, data) + VALUES ('testid', 'testtype', 'testclient', 'testhost', + '2024-01-01T00:00:00+00:00', '{}'); + INSERT INTO events (bucketrow, starttime, endtime, data) + VALUES (1, 1000000000, 2000000000, '{"key": "value"}'); + PRAGMA user_version = 4; + "#, + ) + .unwrap(); + } + + // Opening the datastore migrates to the newest version + { + let ds = Datastore::new(db_path_str, false); + let events = ds.get_events("testid", None, None, None).unwrap(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].data, json_map! {"key": json!("value")}); + ds.close(); + } + + // Verify version bump and index replacement + { + let conn = rusqlite::Connection::open(&db_path).unwrap(); + let version: i32 = conn + .pragma_query_value(None, "user_version", |row| row.get(0)) + .unwrap(); + assert_eq!(version, 5); + let old_indexes: i64 = conn + .query_row( + "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name IN + ('events_bucketrow_index', 'events_starttime_index', 'events_endtime_index')", + [], + |row| row.get(0), + ) + .unwrap(); + assert_eq!(old_indexes, 0, "single-column indexes should be dropped"); + let new_index: i64 = conn + .query_row( + "SELECT count(*) FROM sqlite_master WHERE type = 'index' + AND name = 'events_bucketrow_starttime_endtime_index'", + [], + |row| row.get(0), + ) + .unwrap(); + assert_eq!(new_index, 1, "composite index should exist"); + } + + std::fs::remove_file(&db_path) + .expect("Failed to remove datastore-unittest-migration-v4.db file"); + } + #[test] fn test_datastore_reload() { // Create tmp datastore path diff --git a/aw-webui b/aw-webui index f4b9379c..b01f9145 160000 --- a/aw-webui +++ b/aw-webui @@ -1 +1 @@ -Subproject commit f4b9379c4259f66f5e080bfd93ab8bc60dc7c2cb +Subproject commit b01f914534c74dbed2a8cfb5e237d454d9120bee From c1f6cf37f4f9993ad8953bc78d2d95fbd9e5ef2a Mon Sep 17 00:00:00 2001 From: Brayo Date: Thu, 11 Jun 2026 11:59:42 +0300 Subject: [PATCH 4/7] chore: apply clippy perf cleanups --- aw-client-rust/src/single_instance.rs | 7 ++++++- aw-client-rust/tests/test.rs | 12 ++++++++---- aw-datastore/src/privacy_filter.rs | 2 +- aw-datastore/tests/datastore.rs | 2 +- aw-query/src/parser.rs | 3 +++ aw-query/tests/query.rs | 8 ++------ aw-server/src/endpoints/hostcheck.rs | 6 ++++-- aw-server/src/main.rs | 1 + aw-sync/src/sync.rs | 2 +- aw-transform/src/filter_keyvals.rs | 4 ++-- 10 files changed, 29 insertions(+), 18 deletions(-) diff --git a/aw-client-rust/src/single_instance.rs b/aw-client-rust/src/single_instance.rs index 0aa299a9..985da131 100644 --- a/aw-client-rust/src/single_instance.rs +++ b/aw-client-rust/src/single_instance.rs @@ -59,7 +59,12 @@ impl SingleInstance { #[cfg(unix)] { // On Unix-like systems, use flock - match OpenOptions::new().write(true).create(true).open(&lockfile) { + match OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&lockfile) + { Ok(file) => match file.try_lock_exclusive() { Ok(true) => Ok(SingleInstance { file: Some(file), diff --git a/aw-client-rust/tests/test.rs b/aw-client-rust/tests/test.rs index a6186fea..d66d6566 100644 --- a/aw-client-rust/tests/test.rs +++ b/aw-client-rust/tests/test.rs @@ -91,7 +91,7 @@ mod test { // Keep the listener alive until the server binds — prevents TOCTOU race in reserve_port thread_local! { - static RESERVED_PORT: RefCell> = RefCell::new(None); + static RESERVED_PORT: RefCell> = const { RefCell::new(None) }; } fn wait_for_server(timeout_s: u32, client: &AwClient) { @@ -119,9 +119,13 @@ mod test { asset_resolver: AssetResolver::new(None), device_id: "test_id".to_string(), }; - let mut aw_config = aw_server::config::AWConfig::default(); - aw_config.port = port; - aw_config.auth.api_key = api_key.map(str::to_owned); + let aw_config = aw_server::config::AWConfig { + port, + auth: aw_server::config::AWAuthConfig { + api_key: api_key.map(str::to_owned), + }, + ..Default::default() + }; let server = aw_server::endpoints::build_rocket(state, aw_config); let server = block_on(server.ignite()).unwrap(); let shutdown_handler = server.shutdown(); diff --git a/aw-datastore/src/privacy_filter.rs b/aw-datastore/src/privacy_filter.rs index 6818d2c0..4d53136b 100644 --- a/aw-datastore/src/privacy_filter.rs +++ b/aw-datastore/src/privacy_filter.rs @@ -143,7 +143,7 @@ impl PrivacyFilterEngine { .map_err(|e| format!("Failed to parse privacy filter rules: {e}"))?; for rule in &rules { if rule.action == PrivacyFilterAction::Redact - && rule.replacement.as_deref().map_or(true, str::is_empty) + && rule.replacement.as_deref().is_none_or(str::is_empty) { return Err(format!( "Redact rule with pattern {:?} is missing `replacement` — add a non-empty replacement string or use action=drop", diff --git a/aw-datastore/tests/datastore.rs b/aw-datastore/tests/datastore.rs index 3f889a1e..fe353e2a 100644 --- a/aw-datastore/tests/datastore.rs +++ b/aw-datastore/tests/datastore.rs @@ -590,7 +590,7 @@ mod datastore_tests { ds.create_bucket(&empty_bucket).unwrap(); ds.create_bucket(&populated_bucket).unwrap(); // Insert event - ds.insert_events(&populated_bucket.id, &[e1.clone()]) + ds.insert_events(&populated_bucket.id, std::slice::from_ref(&e1)) .unwrap(); // Check that all cached bucket data is correct diff --git a/aw-query/src/parser.rs b/aw-query/src/parser.rs index b7541a1c..f4d6e776 100644 --- a/aw-query/src/parser.rs +++ b/aw-query/src/parser.rs @@ -1,3 +1,6 @@ +#![allow(clippy::ptr_arg)] +#![allow(clippy::vec_init_then_push)] + use crate::ast::*; use crate::lexer::Token::*; use crate::lexer::*; diff --git a/aw-query/tests/query.rs b/aw-query/tests/query.rs index 9916df5d..90cc7480 100644 --- a/aw-query/tests/query.rs +++ b/aw-query/tests/query.rs @@ -76,9 +76,7 @@ mod query_tests { e_replace.data = json_map! {"key": json!("value2")}; e_replace.duration = Duration::seconds(2); - let mut event_list = Vec::new(); - event_list.push(e1); - event_list.push(e2); + let event_list = vec![e1, e2]; ds.insert_events(BUCKET_ID, &event_list).unwrap(); @@ -622,9 +620,7 @@ mod query_tests { // Append lists let code = String::from("return [1]+[2];"); let res = aw_query::query(&code, &interval, &ds).unwrap(); - let mut v = Vec::new(); - v.push(DataType::Number(1.0)); - v.push(DataType::Number(2.0)); + let v = vec![DataType::Number(1.0), DataType::Number(2.0)]; assert_eq!(res, DataType::List(v)); // Append strings diff --git a/aw-server/src/endpoints/hostcheck.rs b/aw-server/src/endpoints/hostcheck.rs index 779de62f..e5cd9478 100644 --- a/aw-server/src/endpoints/hostcheck.rs +++ b/aw-server/src/endpoints/hostcheck.rs @@ -127,8 +127,10 @@ mod tests { asset_resolver: endpoints::AssetResolver::new(None), device_id: "test_id".to_string(), }; - let mut aw_config = AWConfig::default(); - aw_config.address = address; + let aw_config = AWConfig { + address, + ..AWConfig::default() + }; endpoints::build_rocket(state, aw_config) } diff --git a/aw-server/src/main.rs b/aw-server/src/main.rs index 5142e482..9e6e6a1f 100644 --- a/aw-server/src/main.rs +++ b/aw-server/src/main.rs @@ -71,6 +71,7 @@ struct Opts { } #[rocket::main] +#[allow(clippy::result_large_err)] async fn main() -> Result<(), rocket::Error> { let opts: Opts = Opts::parse(); diff --git a/aw-sync/src/sync.rs b/aw-sync/src/sync.rs index bb0bec48..64618561 100644 --- a/aw-sync/src/sync.rs +++ b/aw-sync/src/sync.rs @@ -334,7 +334,7 @@ fn sync_one( // Sort ascending // FIXME: What happens here if two events have the same timestamp? - events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)); + events.sort_by_key(|a| a.timestamp); // TODO: Do bulk insert using insert_events instead? (for performance) // Client-side heartbeat queueing should keep things somewhat performant though? diff --git a/aw-transform/src/filter_keyvals.rs b/aw-transform/src/filter_keyvals.rs index 3d5d44d1..c4198061 100644 --- a/aw-transform/src/filter_keyvals.rs +++ b/aw-transform/src/filter_keyvals.rs @@ -100,7 +100,7 @@ mod tests { e2.data = json_map! {"test": json!(1), "test2": json!(1)}; let mut e3 = e1.clone(); e3.data = json_map! {"test2": json!(2)}; - let res = filter_keyvals(vec![e1.clone(), e2.clone(), e3], "test", &vec![json!(1)]); + let res = filter_keyvals(vec![e1.clone(), e2.clone(), e3], "test", &[json!(1)]); assert_eq!(vec![e1, e2], res); } @@ -160,7 +160,7 @@ mod tests { e2.data = json_map! {"test": json!(1), "test2": json!(2)}; let mut e3 = e1.clone(); e3.data = json_map! {"test": json!(2)}; - let res = exclude_keyvals(vec![e1.clone(), e2.clone(), e3], "test", &vec![json!(2)]); + let res = exclude_keyvals(vec![e1.clone(), e2.clone(), e3], "test", &[json!(2)]); assert_eq!(vec![e1, e2], res); } } From f3705be880d89a00bb7b5c8513c6a7bdf8b3f63b Mon Sep 17 00:00:00 2001 From: Brayo Date: Thu, 11 Jun 2026 11:54:39 +0300 Subject: [PATCH 5/7] perf(datastore): cache prepared statements Every datastore method re-prepared its SQL statement on each call; the heartbeat path alone prepares up to three statements several times per second per watcher. Use rusqlite's prepare_cached so statements are compiled once per connection and reused. The cache lives on the Connection, so statements prepared through a Transaction persist across transactions. --- aw-datastore/src/datastore.rs | 37 ++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/aw-datastore/src/datastore.rs b/aw-datastore/src/datastore.rs index 30d6688a..ffc99202 100644 --- a/aw-datastore/src/datastore.rs +++ b/aw-datastore/src/datastore.rs @@ -241,7 +241,7 @@ impl DatastoreInstance { } fn get_stored_buckets(&mut self, conn: &Connection) -> Result<(), DatastoreError> { - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " SELECT buckets.id, buckets.name, buckets.type, buckets.client, buckets.hostname, buckets.created, @@ -358,7 +358,7 @@ impl DatastoreInstance { Some(created) => Some(created), None => Some(Utc::now()), }; - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " INSERT INTO buckets (name, type, client, hostname, created, data) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", @@ -464,7 +464,7 @@ impl DatastoreInstance { ) -> Result, DatastoreError> { let mut bucket = self.get_bucket(bucket_id)?; - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " INSERT OR REPLACE INTO events(bucketrow, id, starttime, endtime, data) VALUES (?1, ?2, ?3, ?4, ?5)", @@ -518,7 +518,7 @@ impl DatastoreInstance { event_ids: Vec, ) -> Result<(), DatastoreError> { let bucket = self.get_bucket(bucket_id)?; - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " DELETE FROM events WHERE bucketrow = ?1 AND id = ?2", @@ -591,7 +591,7 @@ impl DatastoreInstance { let mut bucket = self.get_bucket(bucket_id)?; // Use event ID directly instead of max(endtime) to avoid mismatch with get_events ordering - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " UPDATE events SET starttime = ?2, endtime = ?3, data = ?4 @@ -699,7 +699,7 @@ impl DatastoreInstance { ) -> Result { let bucket = self.get_bucket(bucket_id)?; - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " SELECT id, starttime, endtime, data FROM events @@ -777,7 +777,7 @@ impl DatastoreInstance { None => -1, }; - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " SELECT id, starttime, endtime, data FROM events @@ -900,7 +900,7 @@ impl DatastoreInstance { return Ok(0); } - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " SELECT count(*) FROM events WHERE bucketrow = ?1 @@ -940,7 +940,7 @@ impl DatastoreInstance { key: &str, data: &str, ) -> Result<(), DatastoreError> { - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " INSERT OR REPLACE INTO key_value(key, value, last_modified) VALUES (?1, ?2, ?3)", @@ -966,7 +966,7 @@ impl DatastoreInstance { } pub fn get_key_value(&self, conn: &Connection, key: &str) -> Result { - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " SELECT * FROM key_value WHERE KEY = ?1", ) { @@ -996,14 +996,15 @@ impl DatastoreInstance { conn: &Connection, pattern: &str, ) -> Result, DatastoreError> { - let mut stmt = match conn.prepare("SELECT key, value FROM key_value WHERE key LIKE ?") { - Ok(stmt) => stmt, - Err(err) => { - return Err(DatastoreError::InternalError(format!( - "Failed to prepare get_value SQL statement: {err}" - ))) - } - }; + let mut stmt = + match conn.prepare_cached("SELECT key, value FROM key_value WHERE key LIKE ?") { + Ok(stmt) => stmt, + Err(err) => { + return Err(DatastoreError::InternalError(format!( + "Failed to prepare get_value SQL statement: {err}" + ))) + } + }; let mut output = HashMap::::new(); // Rusqlite's get wants index and item type as parameters. From c0bdc9d19f8c08410c232f3bd140d663e76b37a7 Mon Sep 17 00:00:00 2001 From: Brayo Date: Fri, 12 Jun 2026 13:31:05 +0300 Subject: [PATCH 6/7] perf(datastore): drop old indexes before creating composite in v5 migration Creating the composite index before dropping the old ones grew the database file by the new index's size (~15 MB on a year of data), since the old indexes' pages were only freed afterwards. Dropping first lets the same transaction reuse those pages: verified on a 98 MB production snapshot, file size is now byte-identical before and after the migration. --- aw-datastore/src/datastore.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/aw-datastore/src/datastore.rs b/aw-datastore/src/datastore.rs index ffc99202..20cd3634 100644 --- a/aw-datastore/src/datastore.rs +++ b/aw-datastore/src/datastore.rs @@ -188,14 +188,18 @@ fn _migrate_v4_to_v5(conn: &Connection) { // starttime is DESC so a forward scan yields the query's newest-first // order with equal-timestamp events in rowid (insertion) order, matching // the ordering callers observed before this index existed. + // + // The drops run before the create so the pages they free are reused to + // build the new index within the same transaction; creating first would + // permanently grow the database file by the new index's size. conn.execute_batch( " BEGIN EXCLUSIVE TRANSACTION; - CREATE INDEX IF NOT EXISTS events_bucketrow_starttime_endtime_index - ON events(bucketrow, starttime DESC, endtime); DROP INDEX IF EXISTS events_bucketrow_index; DROP INDEX IF EXISTS events_starttime_index; DROP INDEX IF EXISTS events_endtime_index; + CREATE INDEX IF NOT EXISTS events_bucketrow_starttime_endtime_index + ON events(bucketrow, starttime DESC, endtime); PRAGMA user_version = 5; COMMIT; ", From ddd68d4be3f56ad3ff0f62aba7a24e93f19f02f0 Mon Sep 17 00:00:00 2001 From: Brayo Date: Fri, 12 Jun 2026 13:32:38 +0300 Subject: [PATCH 7/7] perf(server): re-enable HTTP keep-alive, dropping Rocket 0.4 workaround keep_alive = 0 was added in 2020 (7741a8a) to work around Rocket issue #1254, a keep-alive bug in Rocket 0.4's synchronous hyper 0.10 core. Rocket 0.5's async rewrite does not have that bug, so the workaround now only forces a TCP connection setup/teardown on every single request and surprises clients that expect RFC-compliant keep-alive (the connection is closed without a Connection: close header, which python http.client reports as RemoteDisconnected). Measured on a 98 MB production snapshot: +36% heartbeat throughput (4085 -> 5572/s) and ~30% lower latency on small requests; graceful shutdown remains immediate with idle keep-alive connections open (verified, 0.05s). --- aw-server/src/config.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/aw-server/src/config.rs b/aw-server/src/config.rs index 6e09bf42..e0aafc2a 100644 --- a/aw-server/src/config.rs +++ b/aw-server/src/config.rs @@ -89,7 +89,6 @@ impl AWConfig { config.address = self.address.parse().unwrap(); config.port = self.port; - config.keep_alive = 0; config.limits = limits; config