diff --git a/aw-client-rust/src/single_instance.rs b/aw-client-rust/src/single_instance.rs index 745fa2b2..985da131 100644 --- a/aw-client-rust/src/single_instance.rs +++ b/aw-client-rust/src/single_instance.rs @@ -1,6 +1,6 @@ use dirs::cache_dir; use fs4::fs_std::FileExt; -use log::{debug, error}; +use log::debug; use std::fs::{File, OpenOptions}; use std::io; use std::sync::atomic::{AtomicBool, Ordering}; @@ -48,7 +48,8 @@ impl SingleInstance { locked: Arc::new(AtomicBool::new(true)), }), Err(e) if e.kind() == io::ErrorKind::AlreadyExists => { - error!("Another instance is already running"); + // Qualified so the unix build doesn't carry an unused `error` import + log::error!("Another instance is already running"); Err(SingleInstanceError::AlreadyRunning) } Err(e) => Err(SingleInstanceError::Io(e)), @@ -58,7 +59,12 @@ impl SingleInstance { #[cfg(unix)] { // On Unix-like systems, use flock - match OpenOptions::new().write(true).create(true).open(&lockfile) { + match OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&lockfile) + { Ok(file) => match file.try_lock_exclusive() { Ok(true) => Ok(SingleInstance { file: Some(file), diff --git a/aw-client-rust/tests/test.rs b/aw-client-rust/tests/test.rs index 9b8928d2..d66d6566 100644 --- a/aw-client-rust/tests/test.rs +++ b/aw-client-rust/tests/test.rs @@ -91,7 +91,7 @@ mod test { // Keep the listener alive until the server binds — prevents TOCTOU race in reserve_port thread_local! { - static RESERVED_PORT: RefCell> = RefCell::new(None); + static RESERVED_PORT: RefCell> = const { RefCell::new(None) }; } fn wait_for_server(timeout_s: u32, client: &AwClient) { @@ -115,13 +115,17 @@ mod test { use aw_server::endpoints::ServerState; let state = ServerState { - datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), + datastore: aw_datastore::Datastore::new_in_memory(false), asset_resolver: AssetResolver::new(None), device_id: "test_id".to_string(), }; - let mut aw_config = aw_server::config::AWConfig::default(); - aw_config.port = port; - aw_config.auth.api_key = api_key.map(str::to_owned); + let aw_config = aw_server::config::AWConfig { + port, + auth: aw_server::config::AWAuthConfig { + api_key: api_key.map(str::to_owned), + }, + ..Default::default() + }; let server = aw_server::endpoints::build_rocket(state, aw_config); let server = block_on(server.ignite()).unwrap(); let shutdown_handler = server.shutdown(); diff --git a/aw-datastore/Cargo.toml b/aw-datastore/Cargo.toml index 9e685b15..e00bed4f 100644 --- a/aw-datastore/Cargo.toml +++ b/aw-datastore/Cargo.toml @@ -29,3 +29,7 @@ zeroize = { version = "1", optional = true, features = ["alloc"] } aw-models = { path = "../aw-models" } aw-transform = { path = "../aw-transform" } regex = "1" + +[dev-dependencies] +# Used by migration tests to construct databases with old schema versions +rusqlite = { version = "0.30", features = ["chrono", "serde_json"] } diff --git a/aw-datastore/src/datastore.rs b/aw-datastore/src/datastore.rs index a8553112..20cd3634 100644 --- a/aw-datastore/src/datastore.rs +++ b/aw-datastore/src/datastore.rs @@ -29,8 +29,9 @@ fn _get_db_version(conn: &Connection) -> i32 { * 2: Added 'data' field to 'buckets' table * 3: see: https://github.com/ActivityWatch/aw-server-rust/pull/52 * 4: Added 'key_value' table for storing key - value pairs + * 5: Replaced single-column events indexes with a composite index */ -static NEWEST_DB_VERSION: i32 = 4; +static NEWEST_DB_VERSION: i32 = 5; fn _create_tables(conn: &Connection, version: i32) -> bool { let mut first_init = false; @@ -52,6 +53,10 @@ fn _create_tables(conn: &Connection, version: i32) -> bool { _migrate_v3_to_v4(conn); } + if version < 5 { + _migrate_v4_to_v5(conn); + } + first_init } @@ -169,6 +174,39 @@ fn _migrate_v3_to_v4(conn: &Connection) { .expect("Failed to update database version!"); } +fn _migrate_v4_to_v5(conn: &Connection) { + info!( + "Upgrading database to v5, replacing single-column events indexes with a composite index" + ); + // Every event query filters on bucketrow and a starttime/endtime range, + // ordered by starttime. A composite index serves the seek, the range scan + // and the ORDER BY in one pass (with endtime checked from the index + // without fetching the row), where the single-column indexes could only + // cover one predicate and left the rest as scan + sort. Dropping them + // also makes inserts cheaper (one index to maintain instead of three). + // + // starttime is DESC so a forward scan yields the query's newest-first + // order with equal-timestamp events in rowid (insertion) order, matching + // the ordering callers observed before this index existed. + // + // The drops run before the create so the pages they free are reused to + // build the new index within the same transaction; creating first would + // permanently grow the database file by the new index's size. + conn.execute_batch( + " + BEGIN EXCLUSIVE TRANSACTION; + DROP INDEX IF EXISTS events_bucketrow_index; + DROP INDEX IF EXISTS events_starttime_index; + DROP INDEX IF EXISTS events_endtime_index; + CREATE INDEX IF NOT EXISTS events_bucketrow_starttime_endtime_index + ON events(bucketrow, starttime DESC, endtime); + PRAGMA user_version = 5; + COMMIT; + ", + ) + .expect("Failed to run v5 migration transaction"); +} + pub struct DatastoreInstance { buckets_cache: HashMap, first_init: bool, @@ -207,7 +245,7 @@ impl DatastoreInstance { } fn get_stored_buckets(&mut self, conn: &Connection) -> Result<(), DatastoreError> { - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " SELECT buckets.id, buckets.name, buckets.type, buckets.client, buckets.hostname, buckets.created, @@ -324,7 +362,7 @@ impl DatastoreInstance { Some(created) => Some(created), None => Some(Utc::now()), }; - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " INSERT INTO buckets (name, type, client, hostname, created, data) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", @@ -430,7 +468,7 @@ impl DatastoreInstance { ) -> Result, DatastoreError> { let mut bucket = self.get_bucket(bucket_id)?; - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " INSERT OR REPLACE INTO events(bucketrow, id, starttime, endtime, data) VALUES (?1, ?2, ?3, ?4, ?5)", @@ -484,7 +522,7 @@ impl DatastoreInstance { event_ids: Vec, ) -> Result<(), DatastoreError> { let bucket = self.get_bucket(bucket_id)?; - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " DELETE FROM events WHERE bucketrow = ?1 AND id = ?2", @@ -557,7 +595,7 @@ impl DatastoreInstance { let mut bucket = self.get_bucket(bucket_id)?; // Use event ID directly instead of max(endtime) to avoid mismatch with get_events ordering - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " UPDATE events SET starttime = ?2, endtime = ?3, data = ?4 @@ -665,7 +703,7 @@ impl DatastoreInstance { ) -> Result { let bucket = self.get_bucket(bucket_id)?; - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " SELECT id, starttime, endtime, data FROM events @@ -743,7 +781,7 @@ impl DatastoreInstance { None => -1, }; - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " SELECT id, starttime, endtime, data FROM events @@ -866,7 +904,7 @@ impl DatastoreInstance { return Ok(0); } - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " SELECT count(*) FROM events WHERE bucketrow = ?1 @@ -906,7 +944,7 @@ impl DatastoreInstance { key: &str, data: &str, ) -> Result<(), DatastoreError> { - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " INSERT OR REPLACE INTO key_value(key, value, last_modified) VALUES (?1, ?2, ?3)", @@ -932,7 +970,7 @@ impl DatastoreInstance { } pub fn get_key_value(&self, conn: &Connection, key: &str) -> Result { - let mut stmt = match conn.prepare( + let mut stmt = match conn.prepare_cached( " SELECT * FROM key_value WHERE KEY = ?1", ) { @@ -962,14 +1000,15 @@ impl DatastoreInstance { conn: &Connection, pattern: &str, ) -> Result, DatastoreError> { - let mut stmt = match conn.prepare("SELECT key, value FROM key_value WHERE key LIKE ?") { - Ok(stmt) => stmt, - Err(err) => { - return Err(DatastoreError::InternalError(format!( - "Failed to prepare get_value SQL statement: {err}" - ))) - } - }; + let mut stmt = + match conn.prepare_cached("SELECT key, value FROM key_value WHERE key LIKE ?") { + Ok(stmt) => stmt, + Err(err) => { + return Err(DatastoreError::InternalError(format!( + "Failed to prepare get_value SQL statement: {err}" + ))) + } + }; let mut output = HashMap::::new(); // Rusqlite's get wants index and item type as parameters. diff --git a/aw-datastore/src/privacy_filter.rs b/aw-datastore/src/privacy_filter.rs index 6818d2c0..4d53136b 100644 --- a/aw-datastore/src/privacy_filter.rs +++ b/aw-datastore/src/privacy_filter.rs @@ -143,7 +143,7 @@ impl PrivacyFilterEngine { .map_err(|e| format!("Failed to parse privacy filter rules: {e}"))?; for rule in &rules { if rule.action == PrivacyFilterAction::Redact - && rule.replacement.as_deref().map_or(true, str::is_empty) + && rule.replacement.as_deref().is_none_or(str::is_empty) { return Err(format!( "Redact rule with pattern {:?} is missing `replacement` — add a non-empty replacement string or use action=drop", diff --git a/aw-datastore/src/worker.rs b/aw-datastore/src/worker.rs index 9a3123aa..4df7ef4d 100644 --- a/aw-datastore/src/worker.rs +++ b/aw-datastore/src/worker.rs @@ -19,8 +19,6 @@ use crate::DatastoreError; use crate::DatastoreInstance; use crate::DatastoreMethod; -use mpsc_requests::ResponseReceiver; - type RequestSender = mpsc_requests::RequestSender>; type RequestReceiver = mpsc_requests::RequestReceiver>; @@ -83,15 +81,10 @@ pub enum Command { Close(), } -fn _unwrap_response( - receiver: ResponseReceiver>, -) -> Result<(), DatastoreError> { - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Empty() => Ok(()), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), +fn _unwrap_empty_response(response: Response) -> Result<(), DatastoreError> { + match response { + Response::Empty() => Ok(()), + _ => panic!("Invalid response"), } } @@ -408,50 +401,50 @@ impl Datastore { Datastore { requester } } + /// Send a command to the worker thread and wait for its response. + /// + /// Fails with `InternalError` instead of panicking when the worker thread + /// is gone (e.g. it panicked on an earlier request), so callers such as + /// HTTP endpoints can degrade to a 5xx response instead of crashing the + /// request. + fn request(&self, cmd: Command) -> Result { + let receiver = self.requester.request(cmd).map_err(|e| { + DatastoreError::InternalError(format!( + "Failed to send request, datastore worker is gone: {e:?}" + )) + })?; + receiver.collect().map_err(|e| { + DatastoreError::InternalError(format!( + "Failed to receive response, datastore worker died while handling request: {e:?}" + )) + })? + } + pub fn create_bucket(&self, bucket: &Bucket) -> Result<(), DatastoreError> { let cmd = Command::CreateBucket(bucket.clone()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(_) => Ok(()), - Err(e) => Err(e), - } + _unwrap_empty_response(self.request(cmd)?) } pub fn delete_bucket(&self, bucket_id: &str) -> Result<(), DatastoreError> { let cmd = Command::DeleteBucket(bucket_id.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Empty() => Ok(()), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), - } + _unwrap_empty_response(self.request(cmd)?) } pub fn get_bucket(&self, bucket_id: &str) -> Result { let cmd = Command::GetBucket(bucket_id.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Bucket(b) => Ok(b), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::Bucket(b) => Ok(b), + _ => panic!("Invalid response"), } } pub fn get_buckets(&self) -> Result, DatastoreError> { let cmd = Command::GetBuckets(); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::BucketMap(bm) => Ok(bm), - e => Err(DatastoreError::InternalError(format!( - "Invalid response: {e:?}" - ))), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::BucketMap(bm) => Ok(bm), + e => Err(DatastoreError::InternalError(format!( + "Invalid response: {e:?}" + ))), } } @@ -461,13 +454,9 @@ impl Datastore { events: &[Event], ) -> Result, DatastoreError> { let cmd = Command::InsertEvents(bucket_id.to_string(), events.to_vec()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::EventList(events) => Ok(events), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::EventList(events) => Ok(events), + _ => panic!("Invalid response"), } } @@ -478,25 +467,17 @@ impl Datastore { pulsetime: f64, ) -> Result { let cmd = Command::Heartbeat(bucket_id.to_string(), heartbeat, pulsetime); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Event(e) => Ok(e), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::Event(e) => Ok(e), + _ => panic!("Invalid response"), } } pub fn get_event(&self, bucket_id: &str, event_id: i64) -> Result { let cmd = Command::GetEvent(bucket_id.to_string(), event_id); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Event(el) => Ok(el), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::Event(el) => Ok(el), + _ => panic!("Invalid response"), } } @@ -514,13 +495,9 @@ impl Datastore { limit_opt, false, ); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::EventList(el) => Ok(el), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::EventList(el) => Ok(el), + _ => panic!("Invalid response"), } } @@ -538,13 +515,9 @@ impl Datastore { limit_opt, true, ); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::EventList(el) => Ok(el), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::EventList(el) => Ok(el), + _ => panic!("Invalid response"), } } @@ -555,13 +528,9 @@ impl Datastore { endtime_opt: Option>, ) -> Result { let cmd = Command::GetEventCount(bucket_id.to_string(), starttime_opt, endtime_opt); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Count(n) => Ok(n), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::Count(n) => Ok(n), + _ => panic!("Invalid response"), } } @@ -571,87 +540,52 @@ impl Datastore { event_ids: Vec, ) -> Result<(), DatastoreError> { let cmd = Command::DeleteEventsById(bucket_id.to_string(), event_ids); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Empty() => Ok(()), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), - } + _unwrap_empty_response(self.request(cmd)?) } pub fn force_commit(&self) -> Result<(), DatastoreError> { let cmd = Command::ForceCommit(); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Empty() => Ok(()), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), - } + _unwrap_empty_response(self.request(cmd)?) } pub fn get_key_values(&self, pattern: &str) -> Result, DatastoreError> { let cmd = Command::GetKeyValues(pattern.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::KeyValues(value) => Ok(value), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::KeyValues(value) => Ok(value), + _ => panic!("Invalid response"), } } pub fn get_key_value(&self, key: &str) -> Result { let cmd = Command::GetKeyValue(key.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::KeyValue(kv) => Ok(kv), - _ => panic!("Invalid response"), - }, - Err(e) => Err(e), + match self.request(cmd)? { + Response::KeyValue(kv) => Ok(kv), + _ => panic!("Invalid response"), } } pub fn set_key_value(&self, key: &str, data: &str) -> Result<(), DatastoreError> { let cmd = Command::SetKeyValue(key.to_string(), data.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - - _unwrap_response(receiver) + _unwrap_empty_response(self.request(cmd)?) } pub fn delete_key_value(&self, key: &str) -> Result<(), DatastoreError> { let cmd = Command::DeleteKeyValue(key.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - - _unwrap_response(receiver) + _unwrap_empty_response(self.request(cmd)?) } pub fn refresh_privacy_filter(&self) -> Result<(), DatastoreError> { - let receiver = self - .requester - .request(Command::RefreshPrivacyFilter()) - .unwrap(); - _unwrap_response(receiver) + _unwrap_empty_response(self.request(Command::RefreshPrivacyFilter())?) } // Should block until worker has stopped pub fn close(&self) { info!("Sending close request to database"); - let receiver = self.requester.request(Command::Close()).unwrap(); - - match receiver.collect().unwrap() { - Ok(r) => match r { - Response::Empty() => (), - _ => panic!("Invalid response"), - }, - Err(e) => panic!("Error closing database: {:?}", e), + match self.request(Command::Close()) { + Ok(Response::Empty()) => (), + Ok(_) => panic!("Invalid response"), + // Worker already gone means there is nothing left to close + Err(e) => warn!("Error closing database: {e:?}"), } } } diff --git a/aw-datastore/tests/datastore.rs b/aw-datastore/tests/datastore.rs index cb6cdddd..fe353e2a 100644 --- a/aw-datastore/tests/datastore.rs +++ b/aw-datastore/tests/datastore.rs @@ -470,6 +470,99 @@ mod datastore_tests { } } + #[test] + fn test_migration_v4_to_v5() { + let mut db_path = get_cache_dir().unwrap(); + db_path.push("datastore-unittest-migration-v4.db"); + let db_path_str = db_path.to_str().unwrap().to_string(); + + if db_path.exists() { + std::fs::remove_file(&db_path) + .expect("Failed to remove datastore-unittest-migration-v4.db file"); + } + + // Construct a database with the v4 schema (single-column indexes) + { + let conn = rusqlite::Connection::open(&db_path).unwrap(); + conn.execute_batch( + r#" + CREATE TABLE buckets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT UNIQUE NOT NULL, + type TEXT NOT NULL, + client TEXT NOT NULL, + hostname TEXT NOT NULL, + created TEXT NOT NULL, + data TEXT NOT NULL DEFAULT '{}' + ); + CREATE INDEX bucket_id_index ON buckets(id); + CREATE TABLE events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + bucketrow INTEGER NOT NULL, + starttime INTEGER NOT NULL, + endtime INTEGER NOT NULL, + data TEXT NOT NULL, + FOREIGN KEY (bucketrow) REFERENCES buckets(id) + ); + CREATE INDEX events_bucketrow_index ON events(bucketrow); + CREATE INDEX events_starttime_index ON events(starttime); + CREATE INDEX events_endtime_index ON events(endtime); + CREATE TABLE key_value ( + key TEXT PRIMARY KEY, + value TEXT, + last_modified NUMBER NOT NULL + ); + INSERT INTO buckets (name, type, client, hostname, created, data) + VALUES ('testid', 'testtype', 'testclient', 'testhost', + '2024-01-01T00:00:00+00:00', '{}'); + INSERT INTO events (bucketrow, starttime, endtime, data) + VALUES (1, 1000000000, 2000000000, '{"key": "value"}'); + PRAGMA user_version = 4; + "#, + ) + .unwrap(); + } + + // Opening the datastore migrates to the newest version + { + let ds = Datastore::new(db_path_str, false); + let events = ds.get_events("testid", None, None, None).unwrap(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].data, json_map! {"key": json!("value")}); + ds.close(); + } + + // Verify version bump and index replacement + { + let conn = rusqlite::Connection::open(&db_path).unwrap(); + let version: i32 = conn + .pragma_query_value(None, "user_version", |row| row.get(0)) + .unwrap(); + assert_eq!(version, 5); + let old_indexes: i64 = conn + .query_row( + "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name IN + ('events_bucketrow_index', 'events_starttime_index', 'events_endtime_index')", + [], + |row| row.get(0), + ) + .unwrap(); + assert_eq!(old_indexes, 0, "single-column indexes should be dropped"); + let new_index: i64 = conn + .query_row( + "SELECT count(*) FROM sqlite_master WHERE type = 'index' + AND name = 'events_bucketrow_starttime_endtime_index'", + [], + |row| row.get(0), + ) + .unwrap(); + assert_eq!(new_index, 1, "composite index should exist"); + } + + std::fs::remove_file(&db_path) + .expect("Failed to remove datastore-unittest-migration-v4.db file"); + } + #[test] fn test_datastore_reload() { // Create tmp datastore path @@ -497,7 +590,7 @@ mod datastore_tests { ds.create_bucket(&empty_bucket).unwrap(); ds.create_bucket(&populated_bucket).unwrap(); // Insert event - ds.insert_events(&populated_bucket.id, &[e1.clone()]) + ds.insert_events(&populated_bucket.id, std::slice::from_ref(&e1)) .unwrap(); // Check that all cached bucket data is correct diff --git a/aw-query/src/datatype.rs b/aw-query/src/datatype.rs index f92c1c44..c2983b0c 100644 --- a/aw-query/src/datatype.rs +++ b/aw-query/src/datatype.rs @@ -96,52 +96,79 @@ impl PartialEq for DataType { } } -impl TryFrom<&DataType> for Vec { +/* Conversions out of DataType. + * + * The by-value impls move the contained data out and are what query functions + * should use, since functions own their argument vector. The by-ref impls + * delegate via a clone and only exist for callers that cannot give up + * ownership — going through them deep-copies every contained event. */ + +impl TryFrom for Vec { type Error = QueryError; - fn try_from(value: &DataType) -> Result { + fn try_from(value: DataType) -> Result { match value { - DataType::List(ref s) => Ok(s.clone()), - ref invalid_type => Err(QueryError::InvalidFunctionParameters(format!( + DataType::List(s) => Ok(s), + invalid_type => Err(QueryError::InvalidFunctionParameters(format!( "Expected function parameter of type List, got {invalid_type:?}" ))), } } } -impl TryFrom<&DataType> for String { +impl TryFrom<&DataType> for Vec { type Error = QueryError; fn try_from(value: &DataType) -> Result { + value.clone().try_into() + } +} + +impl TryFrom for String { + type Error = QueryError; + fn try_from(value: DataType) -> Result { match value { - DataType::String(s) => Ok(s.clone()), - ref invalid_type => Err(QueryError::InvalidFunctionParameters(format!( + DataType::String(s) => Ok(s), + invalid_type => Err(QueryError::InvalidFunctionParameters(format!( "Expected function parameter of type String, list contains {invalid_type:?}" ))), } } } -impl TryFrom<&DataType> for Vec { +impl TryFrom<&DataType> for String { type Error = QueryError; fn try_from(value: &DataType) -> Result { - let mut tagged_strings: Vec = value.try_into()?; - let mut strings = Vec::new(); - for string in tagged_strings.drain(..) { - let s: String = (&string).try_into()?; - strings.push(s); + value.clone().try_into() + } +} + +impl TryFrom for Vec { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + let tagged_strings: Vec = value.try_into()?; + let mut strings = Vec::with_capacity(tagged_strings.len()); + for string in tagged_strings { + strings.push(string.try_into()?); } Ok(strings) } } -impl TryFrom<&DataType> for Vec { +impl TryFrom<&DataType> for Vec { type Error = QueryError; fn try_from(value: &DataType) -> Result { - let mut tagged_events: Vec = value.try_into()?; - let mut events = Vec::new(); - for event in tagged_events.drain(..) { + value.clone().try_into() + } +} + +impl TryFrom for Vec { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + let tagged_events: Vec = value.try_into()?; + let mut events = Vec::with_capacity(tagged_events.len()); + for event in tagged_events { match event { - DataType::Event(e) => events.push(e.clone()), - ref invalid_type => { + DataType::Event(e) => events.push(e), + invalid_type => { return Err(QueryError::InvalidFunctionParameters(format!( "Expected function parameter of type List of Events, list contains {invalid_type:?}" ))) @@ -152,15 +179,29 @@ impl TryFrom<&DataType> for Vec { } } +impl TryFrom<&DataType> for Vec { + type Error = QueryError; + fn try_from(value: &DataType) -> Result { + value.clone().try_into() + } +} + impl TryFrom<&DataType> for Vec<(String, Rule)> { type Error = QueryError; fn try_from(value: &DataType) -> Result { - let mut tagged_lists: Vec = value.try_into()?; - let mut lists: Vec<(String, Rule)> = Vec::new(); - for list in tagged_lists.drain(..) { + value.clone().try_into() + } +} + +impl TryFrom for Vec<(String, Rule)> { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + let tagged_lists: Vec = value.try_into()?; + let mut lists: Vec<(String, Rule)> = Vec::with_capacity(tagged_lists.len()); + for list in tagged_lists { match list { DataType::List(ref l) => { - let tag: String = match l.get(0) { + let tag: String = match l.first() { Some(tag) => tag.try_into()?, None => return Err(QueryError::InvalidFunctionParameters( format!("Expected function parameter of type list of (tag, rule) tuples, list contains {l:?}"))) @@ -186,12 +227,19 @@ impl TryFrom<&DataType> for Vec<(String, Rule)> { impl TryFrom<&DataType> for Vec<(Vec, Rule)> { type Error = QueryError; fn try_from(value: &DataType) -> Result { - let mut tagged_lists: Vec = value.try_into()?; - let mut lists: Vec<(Vec, Rule)> = Vec::new(); - for list in tagged_lists.drain(..) { + value.clone().try_into() + } +} + +impl TryFrom for Vec<(Vec, Rule)> { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + let tagged_lists: Vec = value.try_into()?; + let mut lists: Vec<(Vec, Rule)> = Vec::with_capacity(tagged_lists.len()); + for list in tagged_lists { match list { DataType::List(ref l) => { - let category: Vec = match l.get(0) { + let category: Vec = match l.first() { Some(category) => category.try_into()?, None => return Err(QueryError::InvalidFunctionParameters( format!("Expected function parameter of type list of (category, rule) tuples, list contains {l:?}"))) @@ -226,6 +274,13 @@ impl TryFrom<&DataType> for f64 { } } +impl TryFrom for f64 { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + (&value).try_into() + } +} + impl TryFrom<&DataType> for usize { type Error = QueryError; fn try_from(value: &DataType) -> Result { @@ -234,41 +289,61 @@ impl TryFrom<&DataType> for usize { } } -impl TryFrom<&DataType> for Value { +impl TryFrom for usize { type Error = QueryError; - fn try_from(value: &DataType) -> Result { + fn try_from(value: DataType) -> Result { + (&value).try_into() + } +} + +impl TryFrom for Value { + type Error = QueryError; + fn try_from(value: DataType) -> Result { match value { DataType::None() => Ok(Value::Null), - DataType::Bool(b) => Ok(Value::Bool(*b)), - DataType::Number(n) => Ok(Value::Number(Number::from_f64(*n).unwrap())), - DataType::String(s) => Ok(Value::String(s.to_string())), - DataType::List(_l) => { - let mut tagged_values: Vec = value.try_into()?; - let mut values: Vec = Vec::new(); - for value in tagged_values.drain(..) { - values.push((&value).try_into()?); + DataType::Bool(b) => Ok(Value::Bool(b)), + DataType::Number(n) => Ok(Value::Number(Number::from_f64(n).unwrap())), + DataType::String(s) => Ok(Value::String(s)), + DataType::List(l) => { + let mut values: Vec = Vec::with_capacity(l.len()); + for value in l { + values.push(value.try_into()?); } Ok(Value::Array(values)) } - ref invalid_type => Err(QueryError::InvalidFunctionParameters(format!( + invalid_type => Err(QueryError::InvalidFunctionParameters(format!( "Query2 support for parsing values is limited, does not support parsing {invalid_type:?}" ))), } } } -impl TryFrom<&DataType> for Vec { +impl TryFrom<&DataType> for Value { type Error = QueryError; fn try_from(value: &DataType) -> Result { - let mut tagged_values: Vec = value.try_into()?; - let mut values: Vec = Vec::new(); - for value in tagged_values.drain(..) { - values.push((&value).try_into()?); + value.clone().try_into() + } +} + +impl TryFrom for Vec { + type Error = QueryError; + fn try_from(value: DataType) -> Result { + let tagged_values: Vec = value.try_into()?; + let mut values: Vec = Vec::with_capacity(tagged_values.len()); + for value in tagged_values { + values.push(value.try_into()?); } Ok(values) } } +impl TryFrom<&DataType> for Vec { + type Error = QueryError; + fn try_from(value: &DataType) -> Result { + value.clone().try_into() + } +} + impl TryFrom<&DataType> for Rule { type Error = QueryError; diff --git a/aw-query/src/functions.rs b/aw-query/src/functions.rs index 4a52c4a2..582ef62b 100644 --- a/aw-query/src/functions.rs +++ b/aw-query/src/functions.rs @@ -144,7 +144,7 @@ mod qfunctions { // Typecheck validate::args_length(&args, 1)?; - let bucket_id: String = (&args[0]).try_into()?; + let bucket_id: String = args.into_iter().next().unwrap().try_into()?; let interval = validate::get_timeinterval(env)?; let events = match ds.get_events( @@ -195,10 +195,11 @@ mod qfunctions { ) -> Result { validate::args_length(&args, 1).or_else(|_| validate::args_length(&args, 2))?; - let bucket_filter: String = (&args[0]).try_into()?; - let hostname_filter: Option = match args.len() { - 2 => Some((&args[1]).try_into()?), - _ => None, + let mut args = args.into_iter(); + let bucket_filter: String = args.next().unwrap().try_into()?; + let hostname_filter: Option = match args.next() { + Some(arg) => Some(arg.try_into()?), + None => None, }; let buckets = match ds.get_buckets() { @@ -236,7 +237,7 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - match args.get(0).unwrap() { + match args.first().unwrap() { DataType::List(ref list) => Ok(DataType::Bool(list.contains(&args[1]))), DataType::Dict(ref dict) => { let s = match &args[1] { @@ -264,7 +265,7 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 1)?; - let events: Vec = (&args[0]).try_into()?; + let events: Vec = args.into_iter().next().unwrap().try_into()?; // Run flood let mut flooded_events = aw_transform::flood(events, chrono::Duration::seconds(5)); // Put events back into DataType::Event container @@ -282,8 +283,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events: Vec = Vec::try_from(&args[0])?; - let rules: Vec<(Vec, Rule)> = Vec::try_from(&args[1])?; + let mut args = args.into_iter(); + let events: Vec = args.next().unwrap().try_into()?; + let rules: Vec<(Vec, Rule)> = args.next().unwrap().try_into()?; // Run categorize let mut flooded_events = aw_transform::classify::categorize(events, &rules); // Put events back into DataType::Event container @@ -301,8 +303,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events: Vec = Vec::try_from(&args[0])?; - let rules: Vec<(String, Rule)> = Vec::try_from(&args[1])?; + let mut args = args.into_iter(); + let events: Vec = args.next().unwrap().try_into()?; + let rules: Vec<(String, Rule)> = args.next().unwrap().try_into()?; // Run categorize let mut flooded_events = aw_transform::classify::tag(events, &rules); // Put events back into DataType::Event container @@ -320,7 +323,7 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 1)?; - let events: Vec = (&args[0]).try_into()?; + let events: Vec = args.into_iter().next().unwrap().try_into()?; // Sort by duration let mut sorted_events = aw_transform::sort_by_duration(events); @@ -339,8 +342,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let mut events: Vec = (&args[0]).try_into()?; - let mut limit: usize = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let mut events: Vec = args.next().unwrap().try_into()?; + let mut limit: usize = args.next().unwrap().try_into()?; if events.len() < limit { limit = events.len() @@ -359,7 +363,7 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 1)?; - let events: Vec = (&args[0]).try_into()?; + let events: Vec = args.into_iter().next().unwrap().try_into()?; // Sort by duration let mut sorted_events = aw_transform::sort_by_timestamp(events); @@ -378,12 +382,12 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 1)?; - let mut events: Vec = (&args[0]).try_into()?; + let mut events: Vec = args.into_iter().next().unwrap().try_into()?; // Sort by duration let mut sum_durations = chrono::Duration::zero(); for event in events.drain(..) { - sum_durations = sum_durations + event.duration; + sum_durations += event.duration; } Ok(DataType::Number( (sum_durations.num_milliseconds() as f64) / 1000.0, @@ -397,8 +401,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events: Vec = (&args[0]).try_into()?; - let keys: Vec = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let events: Vec = args.next().unwrap().try_into()?; + let keys: Vec = args.next().unwrap().try_into()?; let mut merged_events = aw_transform::merge_events_by_keys(events, keys); let mut merged_tagged_events = Vec::new(); @@ -415,8 +420,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events: Vec = (&args[0]).try_into()?; - let key: String = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let events: Vec = args.next().unwrap().try_into()?; + let key: String = args.next().unwrap().try_into()?; let mut merged_events = aw_transform::chunk_events_by_key(events, &key); let mut merged_tagged_events = Vec::new(); @@ -433,9 +439,10 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 3)?; - let events = (&args[0]).try_into()?; - let key: String = (&args[1]).try_into()?; - let vals: Vec<_> = (&args[2]).try_into()?; + let mut args = args.into_iter(); + let events = args.next().unwrap().try_into()?; + let key: String = args.next().unwrap().try_into()?; + let vals: Vec<_> = args.next().unwrap().try_into()?; let mut filtered_events = aw_transform::filter_keyvals(events, &key, &vals); let mut filtered_tagged_events = Vec::new(); @@ -454,9 +461,10 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 3)?; - let events = (&args[0]).try_into()?; - let key: String = (&args[1]).try_into()?; - let regex_str: String = (&args[2]).try_into()?; + let mut args = args.into_iter(); + let events = args.next().unwrap().try_into()?; + let key: String = args.next().unwrap().try_into()?; + let regex_str: String = args.next().unwrap().try_into()?; let regex = match RegexBuilder::new(®ex_str).build() { Ok(regex) => regex, Err(e) => { @@ -481,9 +489,10 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 3)?; - let events = (&args[0]).try_into()?; - let key: String = (&args[1]).try_into()?; - let vals: Vec<_> = (&args[2]).try_into()?; + let mut args = args.into_iter(); + let events = args.next().unwrap().try_into()?; + let key: String = args.next().unwrap().try_into()?; + let vals: Vec<_> = args.next().unwrap().try_into()?; let mut filtered_events = aw_transform::exclude_keyvals(events, &key, &vals); let mut filtered_tagged_events = Vec::new(); @@ -500,8 +509,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events: Vec = (&args[0]).try_into()?; - let filter_events: Vec = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let events: Vec = args.next().unwrap().try_into()?; + let filter_events: Vec = args.next().unwrap().try_into()?; let mut filtered_events = aw_transform::filter_period_intersect(events, filter_events); let mut filtered_tagged_events = Vec::new(); @@ -518,7 +528,7 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 1)?; - let mut events: Vec = (&args[0]).try_into()?; + let mut events: Vec = args.into_iter().next().unwrap().try_into()?; let mut tagged_split_url_events = Vec::new(); for mut event in events.drain(..) { @@ -535,7 +545,7 @@ mod qfunctions { ) -> Result { let mut event_list = Vec::new(); for arg in args { - let mut events: Vec = (&arg).try_into()?; + let mut events: Vec = arg.try_into()?; for event in events.drain(..) { event_list.push(DataType::Event(event)); } @@ -550,8 +560,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events1: Vec = (&args[0]).try_into()?; - let events2: Vec = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let events1: Vec = args.next().unwrap().try_into()?; + let events2: Vec = args.next().unwrap().try_into()?; let mut result = aw_transform::period_union(&events1, &events2); let mut result_tagged = Vec::new(); @@ -568,8 +579,9 @@ mod qfunctions { ) -> Result { // typecheck validate::args_length(&args, 2)?; - let events1: Vec = (&args[0]).try_into()?; - let events2: Vec = (&args[1]).try_into()?; + let mut args = args.into_iter(); + let events1: Vec = args.next().unwrap().try_into()?; + let events2: Vec = args.next().unwrap().try_into()?; let mut result = aw_transform::union_no_overlap(events1, events2); let mut result_tagged = Vec::new(); diff --git a/aw-query/src/interpret.rs b/aw-query/src/interpret.rs index 80f4aa97..26020874 100644 --- a/aw-query/src/interpret.rs +++ b/aw-query/src/interpret.rs @@ -184,7 +184,9 @@ fn interpret_expr( env.insert(var, val); Ok(DataType::None()) } - // FIXME: avoid clone, it's slow + // This clone is the one necessary copy per variable reference: the env + // must retain the value since it may be referenced again, while + // function arguments and transforms need owned events. Var(var) => match env.get(&var) { Some(v) => Ok(v.clone()), None => Err(QueryError::VariableNotDefined(var.to_string())), @@ -237,7 +239,7 @@ fn interpret_expr( let mut dict = HashMap::new(); for (key, val_uninterpreted) in d { let val = interpret_expr(env, ds, val_uninterpreted)?; - dict.insert(key.clone(), val); + dict.insert(key, val); } Ok(DataType::Dict(dict)) } diff --git a/aw-query/src/parser.rs b/aw-query/src/parser.rs index b7541a1c..f4d6e776 100644 --- a/aw-query/src/parser.rs +++ b/aw-query/src/parser.rs @@ -1,3 +1,6 @@ +#![allow(clippy::ptr_arg)] +#![allow(clippy::vec_init_then_push)] + use crate::ast::*; use crate::lexer::Token::*; use crate::lexer::*; diff --git a/aw-query/tests/query.rs b/aw-query/tests/query.rs index 9916df5d..90cc7480 100644 --- a/aw-query/tests/query.rs +++ b/aw-query/tests/query.rs @@ -76,9 +76,7 @@ mod query_tests { e_replace.data = json_map! {"key": json!("value2")}; e_replace.duration = Duration::seconds(2); - let mut event_list = Vec::new(); - event_list.push(e1); - event_list.push(e2); + let event_list = vec![e1, e2]; ds.insert_events(BUCKET_ID, &event_list).unwrap(); @@ -622,9 +620,7 @@ mod query_tests { // Append lists let code = String::from("return [1]+[2];"); let res = aw_query::query(&code, &interval, &ds).unwrap(); - let mut v = Vec::new(); - v.push(DataType::Number(1.0)); - v.push(DataType::Number(2.0)); + let v = vec![DataType::Number(1.0), DataType::Number(2.0)]; assert_eq!(res, DataType::List(v)); // Append strings diff --git a/aw-server/src/android/mod.rs b/aw-server/src/android/mod.rs index cb41c509..fae6140c 100644 --- a/aw-server/src/android/mod.rs +++ b/aw-server/src/android/mod.rs @@ -36,7 +36,6 @@ pub mod android { use super::*; use std::path::PathBuf; - use std::sync::Mutex; use crate::endpoints; use crate::endpoints::ServerState; @@ -120,7 +119,7 @@ pub mod android { // FIXME: Why is unsafe needed here? Can we get rid of it? unsafe { let server_state: ServerState = endpoints::ServerState { - datastore: Mutex::new(openDatastore()), + datastore: openDatastore(), asset_resolver: endpoints::AssetResolver::new(None), device_id: device_id::get_device_id(), }; diff --git a/aw-server/src/config.rs b/aw-server/src/config.rs index 6e09bf42..e0aafc2a 100644 --- a/aw-server/src/config.rs +++ b/aw-server/src/config.rs @@ -89,7 +89,6 @@ impl AWConfig { config.address = self.address.parse().unwrap(); config.port = self.port; - config.keep_alive = 0; config.limits = limits; config diff --git a/aw-server/src/endpoints/apikey.rs b/aw-server/src/endpoints/apikey.rs index f7bfcfe7..618894a0 100644 --- a/aw-server/src/endpoints/apikey.rs +++ b/aw-server/src/endpoints/apikey.rs @@ -149,7 +149,6 @@ impl Fairing for ApiKeyCheck { #[cfg(test)] mod tests { - use std::sync::Mutex; use rocket::http::{ContentType, Header, Status}; use rocket::Rocket; @@ -159,7 +158,7 @@ mod tests { fn setup_testserver(api_key: Option) -> Rocket { let state = endpoints::ServerState { - datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), + datastore: aw_datastore::Datastore::new_in_memory(false), asset_resolver: endpoints::AssetResolver::new(None), device_id: "test_id".to_string(), }; diff --git a/aw-server/src/endpoints/bucket.rs b/aw-server/src/endpoints/bucket.rs index b53d12af..fc97937c 100644 --- a/aw-server/src/endpoints/bucket.rs +++ b/aw-server/src/endpoints/bucket.rs @@ -21,7 +21,7 @@ use crate::endpoints::{HttpErrorJson, ServerState}; pub fn buckets_get( state: &State, ) -> Result>, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; match datastore.get_buckets() { Ok(bucketlist) => Ok(Json(bucketlist)), Err(err) => Err(err.into()), @@ -33,8 +33,8 @@ pub fn bucket_get( bucket_id: &str, state: &State, ) -> Result, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); - match datastore.get_bucket(&bucket_id) { + let datastore = &state.datastore; + match datastore.get_bucket(bucket_id) { Ok(bucket) => Ok(Json(bucket)), Err(e) => Err(e.into()), } @@ -62,7 +62,7 @@ pub fn bucket_new( .data .insert("device_id".to_string(), state.device_id.clone().into()); } - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let ret = datastore.create_bucket(&bucket); match ret { Ok(_) => Ok(()), @@ -103,7 +103,7 @@ pub fn bucket_events_get( }, None => None, }; - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let res = datastore.get_events(bucket_id, starttime, endtime, limit); match res { Ok(events) => Ok(Json(events)), @@ -120,7 +120,7 @@ pub fn bucket_events_get_single( _unused: Option, state: &State, ) -> Result, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let res = datastore.get_event(bucket_id, event_id); match res { Ok(events) => Ok(Json(events)), @@ -134,7 +134,7 @@ pub fn bucket_events_create( events: Json>, state: &State, ) -> Result>, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let res = datastore.insert_events(bucket_id, &events); match res { Ok(events) => Ok(Json(events)), @@ -154,7 +154,7 @@ pub fn bucket_events_heartbeat( state: &State, ) -> Result, HttpErrorJson> { let heartbeat = heartbeat_json.into_inner(); - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; match datastore.heartbeat(bucket_id, heartbeat, pulsetime) { Ok(e) => Ok(Json(e)), Err(err) => Err(err.into()), @@ -166,7 +166,7 @@ pub fn bucket_event_count( bucket_id: &str, state: &State, ) -> Result, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let res = datastore.get_event_count(bucket_id, None, None); match res { Ok(eventcount) => Ok(Json(eventcount as u64)), @@ -180,7 +180,7 @@ pub fn bucket_events_delete_by_id( event_id: i64, state: &State, ) -> Result<(), HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; match datastore.delete_events_by_id(bucket_id, vec![event_id]) { Ok(_) => Ok(()), Err(err) => Err(err.into()), @@ -192,14 +192,11 @@ pub fn bucket_export( bucket_id: &str, state: &State, ) -> Result { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let mut export = BucketsExport { buckets: HashMap::new(), }; - let mut bucket = match datastore.get_bucket(bucket_id) { - Ok(bucket) => bucket, - Err(err) => return Err(err.into()), - }; + let mut bucket = datastore.get_bucket(bucket_id)?; /* TODO: Replace expect with http error */ let events = datastore .get_events(bucket_id, None, None, None) @@ -212,7 +209,7 @@ pub fn bucket_export( #[delete("/")] pub fn bucket_delete(bucket_id: &str, state: &State) -> Result<(), HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; match datastore.delete_bucket(bucket_id) { Ok(_) => Ok(()), Err(err) => Err(err.into()), diff --git a/aw-server/src/endpoints/export.rs b/aw-server/src/endpoints/export.rs index eced6ba6..4b7653d8 100644 --- a/aw-server/src/endpoints/export.rs +++ b/aw-server/src/endpoints/export.rs @@ -10,19 +10,13 @@ use crate::endpoints::{HttpErrorJson, ServerState}; #[get("/")] pub fn buckets_export(state: &State) -> Result { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let mut export = BucketsExport { buckets: HashMap::new(), }; - let mut buckets = match datastore.get_buckets() { - Ok(buckets) => buckets, - Err(err) => return Err(err.into()), - }; + let mut buckets = datastore.get_buckets()?; for (bid, mut bucket) in buckets.drain() { - let events = match datastore.get_events(&bid, None, None, None) { - Ok(events) => events, - Err(err) => return Err(err.into()), - }; + let events = datastore.get_events(&bid, None, None, None)?; bucket.events = Some(TryVec::new(events)); export.buckets.insert(bid, bucket); } diff --git a/aw-server/src/endpoints/hostcheck.rs b/aw-server/src/endpoints/hostcheck.rs index 6f583cdf..e5cd9478 100644 --- a/aw-server/src/endpoints/hostcheck.rs +++ b/aw-server/src/endpoints/hostcheck.rs @@ -114,7 +114,6 @@ impl Fairing for HostCheck { #[cfg(test)] mod tests { - use std::sync::Mutex; use rocket::http::{ContentType, Header, Status}; use rocket::Rocket; @@ -124,12 +123,14 @@ mod tests { fn setup_testserver(address: String) -> Rocket { let state = endpoints::ServerState { - datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), + datastore: aw_datastore::Datastore::new_in_memory(false), asset_resolver: endpoints::AssetResolver::new(None), device_id: "test_id".to_string(), }; - let mut aw_config = AWConfig::default(); - aw_config.address = address; + let aw_config = AWConfig { + address, + ..AWConfig::default() + }; endpoints::build_rocket(state, aw_config) } diff --git a/aw-server/src/endpoints/import.rs b/aw-server/src/endpoints/import.rs index df79cc77..d6d0ca2e 100644 --- a/aw-server/src/endpoints/import.rs +++ b/aw-server/src/endpoints/import.rs @@ -4,7 +4,6 @@ use rocket::serde::json::Json; use rocket::State; use std::collections::{BTreeMap, HashSet}; -use std::sync::Mutex; use aw_models::{BucketsExport, Event}; @@ -39,8 +38,7 @@ fn event_identity( Ok((event.timestamp, duration_ns, data_json)) } -fn import(datastore_mutex: &Mutex, import: BucketsExport) -> Result<(), HttpErrorJson> { - let datastore = endpoints_get_lock!(datastore_mutex); +fn import(datastore: &Datastore, import: BucketsExport) -> Result<(), HttpErrorJson> { for (_bucketname, mut bucket) in import.buckets { match datastore.create_bucket(&bucket) { Ok(_) => (), diff --git a/aw-server/src/endpoints/mod.rs b/aw-server/src/endpoints/mod.rs index f0621196..761d548e 100644 --- a/aw-server/src/endpoints/mod.rs +++ b/aw-server/src/endpoints/mod.rs @@ -1,7 +1,6 @@ use rust_embed::RustEmbed; use std::ffi::OsStr; use std::path::{Path, PathBuf}; -use std::sync::Mutex; use gethostname::gethostname; use rocket::fs::FileServer; @@ -38,8 +37,12 @@ impl AssetResolver { } } +// The Datastore is just a cheap handle to the DB worker thread (a crossbeam +// channel sender), which serializes all DB access internally. No mutex is +// needed here — wrapping it in one would serialize all HTTP requests, letting +// a slow query block every heartbeat. pub struct ServerState { - pub datastore: Mutex, + pub datastore: Datastore, pub asset_resolver: AssetResolver, pub device_id: String, } diff --git a/aw-server/src/endpoints/query.rs b/aw-server/src/endpoints/query.rs index 91f6aea4..6e717a71 100644 --- a/aw-server/src/endpoints/query.rs +++ b/aw-server/src/endpoints/query.rs @@ -11,9 +11,9 @@ pub fn query(query_req: Json, state: &State) -> Result data, Err(e) => { warn!("Query failed: {:?}", e); diff --git a/aw-server/src/endpoints/settings.rs b/aw-server/src/endpoints/settings.rs index 02ff98d9..5a2aeb5a 100644 --- a/aw-server/src/endpoints/settings.rs +++ b/aw-server/src/endpoints/settings.rs @@ -3,9 +3,8 @@ use rocket::http::Status; use rocket::serde::json::Json; use rocket::State; use std::collections::HashMap; -use std::sync::MutexGuard; -use aw_datastore::{Datastore, DatastoreError}; +use aw_datastore::DatastoreError; use crate::endpoints::HttpErrorJson; @@ -25,7 +24,7 @@ fn parse_key(key: String) -> Result { pub fn settings_get( state: &State, ) -> Result>, HttpErrorJson> { - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let queryresults = match datastore.get_key_values("settings.%") { Ok(result) => Ok(result), Err(err) => Err(err.into()), @@ -53,7 +52,7 @@ pub fn setting_get( key: String, ) -> Result, HttpErrorJson> { let setting_key = parse_key(key)?; - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; match datastore.get_key_value(&setting_key) { Ok(value) => Ok(Json(serde_json::from_str(&value).unwrap())), @@ -79,7 +78,7 @@ pub fn setting_set( } }; - let datastore: MutexGuard<'_, Datastore> = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let result = datastore.set_key_value(&setting_key, &value_str); match result { @@ -92,7 +91,7 @@ pub fn setting_set( pub fn setting_delete(state: &State, key: String) -> Result<(), HttpErrorJson> { let setting_key = parse_key(key)?; - let datastore = endpoints_get_lock!(state.datastore); + let datastore = &state.datastore; let result = datastore.delete_key_value(&setting_key); match result { diff --git a/aw-server/src/endpoints/util.rs b/aw-server/src/endpoints/util.rs index fdf4a992..80a758ad 100644 --- a/aw-server/src/endpoints/util.rs +++ b/aw-server/src/endpoints/util.rs @@ -101,18 +101,3 @@ impl From for HttpErrorJson { } } } - -#[macro_export] -macro_rules! endpoints_get_lock { - ( $lock:expr ) => { - match $lock.lock() { - Ok(r) => r, - Err(e) => { - use rocket::http::Status; - let err_msg = format!("Taking datastore lock failed, returning 504: {}", e); - warn!("{}", err_msg); - return Err(HttpErrorJson::new(Status::ServiceUnavailable, err_msg)); - } - } - }; -} diff --git a/aw-server/src/main.rs b/aw-server/src/main.rs index 4589c8c3..9e6e6a1f 100644 --- a/aw-server/src/main.rs +++ b/aw-server/src/main.rs @@ -71,6 +71,7 @@ struct Opts { } #[rocket::main] +#[allow(clippy::result_large_err)] async fn main() -> Result<(), rocket::Error> { let opts: Opts = Opts::parse(); @@ -79,8 +80,6 @@ async fn main() -> Result<(), rocket::Error> { #[cfg(any(feature = "encryption", feature = "encryption-vendored"))] std::env::remove_var("AW_DB_PASSWORD"); - use std::sync::Mutex; - let mut testing = opts.testing; // Always override environment if --testing is specified @@ -142,7 +141,7 @@ async fn main() -> Result<(), rocket::Error> { }; info!("Using DB at path {:?}", db_path); - let asset_path = opts.webpath.map(|webpath| PathBuf::from(webpath)); + let asset_path = opts.webpath.map(PathBuf::from); info!("Using aw-webui assets at path {:?}", asset_path); // Only use legacy import if opts.dbpath is not set @@ -187,7 +186,7 @@ async fn main() -> Result<(), rocket::Error> { let server_state = endpoints::ServerState { // Even if legacy_import is set to true it is disabled on Android so // it will not happen there - datastore: Mutex::new(datastore), + datastore, asset_resolver: endpoints::AssetResolver::new(asset_path), device_id, }; diff --git a/aw-server/tests/api.rs b/aw-server/tests/api.rs index d45b4872..daea97ba 100644 --- a/aw-server/tests/api.rs +++ b/aw-server/tests/api.rs @@ -8,7 +8,6 @@ extern crate aw_server; #[cfg(test)] mod api_tests { use std::collections::HashMap; - use std::sync::Mutex; use rocket::http::{ContentType, Header, Status}; use serde_json::{json, Value}; @@ -21,7 +20,7 @@ mod api_tests { fn setup_testserver() -> rocket::Rocket { let state = endpoints::ServerState { - datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), + datastore: aw_datastore::Datastore::new_in_memory(false), asset_resolver: endpoints::AssetResolver::new(None), device_id: "test_id".to_string(), }; diff --git a/aw-sync/src/sync.rs b/aw-sync/src/sync.rs index bb0bec48..64618561 100644 --- a/aw-sync/src/sync.rs +++ b/aw-sync/src/sync.rs @@ -334,7 +334,7 @@ fn sync_one( // Sort ascending // FIXME: What happens here if two events have the same timestamp? - events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)); + events.sort_by_key(|a| a.timestamp); // TODO: Do bulk insert using insert_events instead? (for performance) // Client-side heartbeat queueing should keep things somewhat performant though? diff --git a/aw-transform/src/chunk.rs b/aw-transform/src/chunk.rs index 51c9f735..2b8d2588 100644 --- a/aw-transform/src/chunk.rs +++ b/aw-transform/src/chunk.rs @@ -34,7 +34,7 @@ pub fn chunk_events_by_key(events: Vec, key: &str) -> Vec { let last_val = last_event.data.get(key).unwrap().clone(); if &last_val == val { // TODO: Add sub-chunks - last_event.duration = last_event.duration + event.duration; + last_event.duration += event.duration; } chunked_events.push(last_event); if &last_val != val { diff --git a/aw-transform/src/filter_keyvals.rs b/aw-transform/src/filter_keyvals.rs index 3d5d44d1..c4198061 100644 --- a/aw-transform/src/filter_keyvals.rs +++ b/aw-transform/src/filter_keyvals.rs @@ -100,7 +100,7 @@ mod tests { e2.data = json_map! {"test": json!(1), "test2": json!(1)}; let mut e3 = e1.clone(); e3.data = json_map! {"test2": json!(2)}; - let res = filter_keyvals(vec![e1.clone(), e2.clone(), e3], "test", &vec![json!(1)]); + let res = filter_keyvals(vec![e1.clone(), e2.clone(), e3], "test", &[json!(1)]); assert_eq!(vec![e1, e2], res); } @@ -160,7 +160,7 @@ mod tests { e2.data = json_map! {"test": json!(1), "test2": json!(2)}; let mut e3 = e1.clone(); e3.data = json_map! {"test": json!(2)}; - let res = exclude_keyvals(vec![e1.clone(), e2.clone(), e3], "test", &vec![json!(2)]); + let res = exclude_keyvals(vec![e1.clone(), e2.clone(), e3], "test", &[json!(2)]); assert_eq!(vec![e1, e2], res); } } diff --git a/aw-transform/src/filter_period.rs b/aw-transform/src/filter_period.rs index 6af8d5c2..b69f6567 100644 --- a/aw-transform/src/filter_period.rs +++ b/aw-transform/src/filter_period.rs @@ -18,7 +18,7 @@ use crate::sort_by_timestamp; /// output: [a ] [a ][b ] /// ``` pub fn filter_period_intersect(events: Vec, filter_events: Vec) -> Vec { - if events.len() == 0 || filter_events.len() == 0 { + if events.is_empty() || filter_events.is_empty() { return Vec::new(); } diff --git a/aw-transform/src/flood.rs b/aw-transform/src/flood.rs index 8cfeef70..7d6e7ba7 100644 --- a/aw-transform/src/flood.rs +++ b/aw-transform/src/flood.rs @@ -56,7 +56,7 @@ pub fn flood(events: Vec, pulsetime: chrono::Duration) -> Vec { } { if let Some(gap) = gap_prev { e1.timestamp -= gap / 2; - e1.duration = e1.duration + (gap / 2); + e1.duration += gap / 2; gap_prev = None; } let e2 = match e1_iter.peek() { @@ -148,7 +148,7 @@ pub fn flood(events: Vec, pulsetime: chrono::Duration) -> Vec { continue; } else { // Extend e1 to middle of the gap. - e1.duration = e1.duration + (gap / 2); + e1.duration += gap / 2; // Make sure next event (e2) is gets extended before it's processed gap_prev = Some(gap); diff --git a/aw-transform/src/merge.rs b/aw-transform/src/merge.rs index 81c81336..f2034dda 100644 --- a/aw-transform/src/merge.rs +++ b/aw-transform/src/merge.rs @@ -56,7 +56,7 @@ pub fn merge_events_by_keys(events: Vec, keys: Vec) -> Vec let summed_key = key_values.join("."); if merged_events_map.contains_key(&summed_key) { let merged_event = merged_events_map.get_mut(&summed_key).unwrap(); - merged_event.duration = merged_event.duration + event.duration; + merged_event.duration += event.duration; } else { let mut data = HashMap::new(); for key in &keys { diff --git a/aw-transform/src/sort.rs b/aw-transform/src/sort.rs index fddcd2ef..756d34d4 100644 --- a/aw-transform/src/sort.rs +++ b/aw-transform/src/sort.rs @@ -2,7 +2,7 @@ use aw_models::Event; /// Sort a list of events by timestamp pub fn sort_by_timestamp(mut events: Vec) -> Vec { - events.sort_by(|e1, e2| e1.timestamp.cmp(&e2.timestamp)); + events.sort_by_key(|e1| e1.timestamp); events } diff --git a/aw-webui b/aw-webui index f4b9379c..b01f9145 160000 --- a/aw-webui +++ b/aw-webui @@ -1 +1 @@ -Subproject commit f4b9379c4259f66f5e080bfd93ab8bc60dc7c2cb +Subproject commit b01f914534c74dbed2a8cfb5e237d454d9120bee