diff --git a/Cargo.lock b/Cargo.lock index 951cd269..93be1883 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,6 +232,7 @@ dependencies = [ "serde", "serde_json", "zeroize", + "zstd", ] [[package]] @@ -417,6 +418,8 @@ version = "1.2.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "deec109607ca693028562ed836a5f1c4b8bd77755c4e132fc5ce11b0b6211ae7" dependencies = [ + "jobserver", + "libc", "shlex", ] @@ -1556,6 +1559,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.3", + "libc", +] + [[package]] name = "js-sys" version = "0.3.77" @@ -4017,3 +4030,31 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/aw-datastore/Cargo.toml b/aw-datastore/Cargo.toml index e00bed4f..1807afb8 100644 --- a/aw-datastore/Cargo.toml +++ b/aw-datastore/Cargo.toml @@ -14,6 +14,17 @@ bundled = ["rusqlite/bundled"] encryption = ["rusqlite/bundled-sqlcipher", "dep:zeroize"] # Like 'encryption' but also vendors 
OpenSSL (fully self-contained) encryption-vendored = ["rusqlite/bundled-sqlcipher-vendored-openssl", "dep:zeroize"] +# Enable transparent zstd compression of event data, using a dictionary trained +# on the database's own events (~47% smaller event JSON on real data). +# Opt-in: once rows are compressed, the database can only be read by a build with +# this feature enabled. +# +# One-time cost: the first startup after upgrading to db version 6 rebuilds the +# events table (TEXT -> BLOB column) and, once the database has enough rows, +# trains a dictionary and recompresses every existing row in a single +# transaction. Benchmark on a real ~545k-event database: +# https://github.com/ActivityWatch/aw-server-rust/pull/618#issuecomment-4698391405 +compression-zstd = ["dep:zstd"] legacy_import_tests = [] [dependencies] @@ -25,6 +36,7 @@ rusqlite = { version = "0.30", features = ["chrono", "serde_json"] } mpsc_requests = "0.3" log = "0.4" zeroize = { version = "1", optional = true, features = ["alloc"] } +zstd = { version = "0.13", optional = true } aw-models = { path = "../aw-models" } aw-transform = { path = "../aw-transform" } diff --git a/aw-datastore/src/compression.rs b/aw-datastore/src/compression.rs new file mode 100644 index 00000000..48599e14 --- /dev/null +++ b/aw-datastore/src/compression.rs @@ -0,0 +1,243 @@ +// Transparent compression of event data using zstd with a shared trained dictionary. +// +// ActivityWatch events are tiny (most < 128 bytes) and their redundancy is +// *across* rows: the same app names, JSON keys and window titles repeat +// thousands of times, while a single row holds little repetition for zstd to +// exploit on its own. To capture that cross-row redundancy we train one +// dictionary on a sample of the data, store it once in the database, and +// compress every row against it (the same idea as sqlite-zstd). On real data +// this reduces the stored event JSON by roughly 47%. 
+// +// Stored BLOB format: +// [zstd frame] -> dictionary-compressed +// [raw JSON bytes] -> stored uncompressed +// +// A row is stored uncompressed when compression would not make it smaller (or +// when no dictionary exists yet), so the worst case is never larger than the +// plain JSON. Zstd frames reliably start with the 4-byte magic number +// 0xFD2FB528. ActivityWatch JSON event data always starts with `{` (0x7B), so +// we can unambiguously infer whether a blob is compressed just by checking +// its first 4 bytes against the Zstd magic number. +// +// Tradeoff: once rows are compressed, the database can only be read by a build +// with this feature enabled (and with the stored dictionary intact). That is +// why the feature is opt-in at build time. + +/// Target size of the trained dictionary. 64 KiB was the sweet spot in +/// benchmarks (larger dictionaries started to hurt the ratio). +#[cfg(feature = "compression-zstd")] +const DICT_SIZE: usize = 64 * 1024; + +/// zstd compression level. 6 is a good balance for this data: the dictionary +/// provides essentially all of the savings, and higher levels add cost for only +/// ~1% extra reduction. +#[cfg(feature = "compression-zstd")] +const COMPRESSION_LEVEL: i32 = 6; + +/// Minimum number of events before it is worth training a dictionary. Below +/// this the database is small enough that the savings are negligible, and there +/// is too little data to train a good dictionary. +pub const MIN_EVENTS_TO_TRAIN: i64 = 2000; + +/// Holds the reusable zstd compressor/decompressor (with the loaded +/// dictionary, if any) for the lifetime of a datastore connection. Lives on the +/// single-threaded datastore worker, so the compressor/decompressor — which +/// need `&mut` per call — are wrapped in `RefCell` and reused across calls +/// rather than reallocated each time (allocating a fresh context per row is the +/// dominant cost when compressing hundreds of thousands of tiny events). 
+pub struct CompressionContext { + #[cfg(feature = "compression-zstd")] + dict: Option, +} + +#[cfg(feature = "compression-zstd")] +struct Codec { + compressor: std::cell::RefCell>, + decompressor: std::cell::RefCell>, +} + +impl CompressionContext { + /// A context with no dictionary: writes are stored uncompressed, reads still + /// transparently handle both uncompressed and (if a dictionary is later + /// available) compressed data. + pub fn empty() -> Self { + CompressionContext { + #[cfg(feature = "compression-zstd")] + dict: None, + } + } + + /// Build a context from raw trained-dictionary bytes. Falls back to an empty + /// (no-compression) context if the dictionary can't be loaded. + #[cfg(feature = "compression-zstd")] + pub fn from_dictionary(dict_bytes: &[u8]) -> Self { + // with_dictionary copies the dictionary into the (de)compression + // context, so the returned values own it and are 'static. + match ( + zstd::bulk::Compressor::with_dictionary(COMPRESSION_LEVEL, dict_bytes), + zstd::bulk::Decompressor::with_dictionary(dict_bytes), + ) { + (Ok(compressor), Ok(decompressor)) => CompressionContext { + dict: Some(Codec { + compressor: std::cell::RefCell::new(compressor), + decompressor: std::cell::RefCell::new(decompressor), + }), + }, + _ => { + warn!("Failed to load compression dictionary; storing events uncompressed"); + CompressionContext::empty() + } + } + } + + #[cfg(not(feature = "compression-zstd"))] + pub fn from_dictionary(_dict_bytes: &[u8]) -> Self { + CompressionContext {} + } + + /// Compress a JSON string into the stored blob representation. + /// + /// Never fails: if there is no dictionary, the feature is disabled, or + /// compression would not shrink the row, the raw UTF-8 bytes are returned. 
+ pub fn compress(&self, json: &str) -> Vec { + #[cfg(feature = "compression-zstd")] + { + if let Some(codec) = &self.dict { + if let Ok(frame) = codec.compressor.borrow_mut().compress(json.as_bytes()) { + // Return raw zstd frame without custom prefixes + if frame.len() < json.len() { + return frame; + } + } + } + } + json.as_bytes().to_vec() + } + + /// Decompress a stored blob back into its JSON string. + pub fn decompress(&self, bytes: &[u8]) -> Result { + // Standard zstd frames always start with 0xFD2FB528 (LE bytes: 28 B5 2F FD) + if bytes.starts_with(&[0x28, 0xB5, 0x2F, 0xFD]) { + #[cfg(feature = "compression-zstd")] + { + // The zstd compressor embeds the uncompressed size by default + let orig_len = zstd::zstd_safe::get_frame_content_size(bytes) + .map_err(|e| format!("invalid zstd frame: {e}"))? + .ok_or_else(|| "zstd frame missing content size header".to_string())? + as usize; + + let codec = self + .dict + .as_ref() + .ok_or("event data is compressed but no dictionary is loaded")?; + let out = codec + .decompressor + .borrow_mut() + .decompress(bytes, orig_len) + .map_err(|e| format!("failed to decompress event data: {e}"))?; + return String::from_utf8(out) + .map_err(|e| format!("decompressed data is not valid UTF-8: {e}")); + } + #[cfg(not(feature = "compression-zstd"))] + { + return Err( + "event data is zstd-compressed but the compression-zstd feature is disabled" + .to_string(), + ); + } + } + + // Uncompressed JSON data + String::from_utf8(bytes.to_vec()).map_err(|e| format!("event data is not valid UTF-8: {e}")) + } +} + +/// Train a zstd dictionary from a set of JSON samples. Returns `None` if there +/// is not enough data to train a usable dictionary. 
+#[cfg(feature = "compression-zstd")] +pub fn train_dictionary(samples: &[&[u8]]) -> Option> { + if (samples.len() as i64) < MIN_EVENTS_TO_TRAIN { + return None; + } + match zstd::dict::from_samples(samples, DICT_SIZE) { + Ok(dict) => Some(dict), + Err(e) => { + warn!("Failed to train zstd dictionary: {e}"); + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_uncompressed_roundtrip() { + // empty() context stores raw and reads it back + let ctx = CompressionContext::empty(); + let json = r#"{"app":"Firefox","title":"Hello"}"#; + let stored = ctx.compress(json); + assert_eq!(stored, json.as_bytes()); + assert_eq!(ctx.decompress(&stored).unwrap(), json); + } + + #[cfg(feature = "compression-zstd")] + #[test] + fn test_dictionary_roundtrip_and_size() { + // Build a realistic corpus with heavy cross-row redundancy. + let mut samples: Vec = Vec::new(); + for i in 0..5000 { + let app = ["Firefox", "Terminal", "Code", "Slack"][i % 4]; + samples.push(format!( + r#"{{"app":"{app}","title":"Some window title number {}"}}"#, + i % 50 + )); + } + let sample_refs: Vec<&[u8]> = samples.iter().map(|s| s.as_bytes()).collect(); + let dict = train_dictionary(&sample_refs).expect("training should succeed"); + let ctx = CompressionContext::from_dictionary(&dict); + + let mut total_raw = 0usize; + let mut total_stored = 0usize; + for s in &samples { + let blob = ctx.compress(s); + // roundtrip is exact + assert_eq!(ctx.decompress(&blob).unwrap(), *s); + total_raw += s.len(); + total_stored += blob.len(); + } + // dictionary compression must save substantial space on this corpus + // (without a dictionary, per-row zstd saves ~0% on data this small) + assert!( + total_stored * 10 < total_raw * 6, + "expected >40% reduction, got {total_stored} vs {total_raw}" + ); + } + + #[cfg(feature = "compression-zstd")] + #[test] + fn test_incompressible_row_stored_raw() { + // A context with a dictionary still stores tiny/incompressible rows raw + // when compression 
would not help, so a row is never larger than raw. + let samples: Vec = (0..2000) + .map(|i| format!(r#"{{"app":"A","title":"{i}"}}"#)) + .collect(); + let refs: Vec<&[u8]> = samples.iter().map(|s| s.as_bytes()).collect(); + let dict = train_dictionary(&refs).expect("training should succeed"); + let ctx = CompressionContext::from_dictionary(&dict); + + let tiny = r#"{"a":1}"#; + let blob = ctx.compress(tiny); + assert!(blob.len() <= tiny.len()); + assert_eq!(ctx.decompress(&blob).unwrap(), tiny); + } + + #[cfg(feature = "compression-zstd")] + #[test] + fn test_too_few_samples_no_dict() { + let samples: Vec = (0..10).map(|i| format!("{{\"x\":{i}}}")).collect(); + let refs: Vec<&[u8]> = samples.iter().map(|s| s.as_bytes()).collect(); + assert!(train_dictionary(&refs).is_none()); + } +} diff --git a/aw-datastore/src/datastore.rs b/aw-datastore/src/datastore.rs index 20cd3634..652c6326 100644 --- a/aw-datastore/src/datastore.rs +++ b/aw-datastore/src/datastore.rs @@ -15,6 +15,7 @@ use aw_models::Event; use rusqlite::params; use rusqlite::types::ToSql; +use super::compression; use super::DatastoreError; fn _get_db_version(conn: &Connection) -> i32 { @@ -22,6 +23,23 @@ fn _get_db_version(conn: &Connection) -> i32 { .unwrap() } +/// Load the stored zstd compression dictionary, if one has been trained. +fn _load_dictionary(conn: &Connection) -> Option> { + conn.query_row( + "SELECT dict FROM compression_dict WHERE id = 0", + [], + |row| row.get::<_, Vec>(0), + ) + .ok() +} + +/// Count of events in the database (used to decide whether to train a dictionary). 
+#[cfg(feature = "compression-zstd")] +fn _event_count(conn: &Connection) -> i64 { + conn.query_row("SELECT count(*) FROM events", [], |row| row.get(0)) + .unwrap_or(0) +} + /* * ### Database version changelog ### * 0: Uninitialized database @@ -30,8 +48,9 @@ fn _get_db_version(conn: &Connection) -> i32 { * 3: see: https://github.com/ActivityWatch/aw-server-rust/pull/52 * 4: Added 'key_value' table for storing key - value pairs * 5: Replaced single-column events indexes with a composite index + * 6: Enable zstd compression for event data column */ -static NEWEST_DB_VERSION: i32 = 5; +static NEWEST_DB_VERSION: i32 = 6; fn _create_tables(conn: &Connection, version: i32) -> bool { let mut first_init = false; @@ -57,6 +76,10 @@ fn _create_tables(conn: &Connection, version: i32) -> bool { _migrate_v4_to_v5(conn); } + if version < 6 { + _migrate_v5_to_v6(conn); + } + first_init } @@ -207,10 +230,64 @@ fn _migrate_v4_to_v5(conn: &Connection) { .expect("Failed to run v5 migration transaction"); } +fn _migrate_v5_to_v6(conn: &Connection) { + info!("Upgrading database to v6, converting event data column to BLOB for zstd compression"); + + // The data column becomes a BLOB so compressed binary can be stored without + // any text-encoding overhead. The actual dictionary training and + // compression of existing rows happens once at startup in + // `ensure_compression`, which is shared with the path that compresses + // databases that grow large enough after a fresh install. + // + // CAST(data AS BLOB) reinterprets each TEXT value as its raw UTF-8 bytes, + // converting the whole column in a single statement. The table is rebuilt + // (rather than altered in place) so the column's declared type — and thus + // the type returned to readers — is BLOB. + // + // The new table's foreign key uses ON DELETE CASCADE so deleting a bucket + // removes its events automatically (foreign keys are enforced on the + // connection, see work_loop). 
Older databases can contain orphan events + // whose bucket was already deleted; those are dropped here (the WHERE + // clause copies only events with an existing bucket) rather than carried + // forward, which also lets the copy satisfy the foreign key. + conn.execute_batch( + " + BEGIN EXCLUSIVE TRANSACTION; + CREATE TABLE events_v6 ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + bucketrow INTEGER NOT NULL, + starttime INTEGER NOT NULL, + duration INTEGER NOT NULL, + data BLOB NOT NULL, + FOREIGN KEY (bucketrow) REFERENCES buckets(id) ON DELETE CASCADE + ); + INSERT INTO events_v6 (id, bucketrow, starttime, duration, data) + SELECT id, bucketrow, starttime, endtime - starttime, CAST(data AS BLOB) + FROM events + WHERE bucketrow IN (SELECT id FROM buckets); + DROP INDEX IF EXISTS events_bucketrow_starttime_endtime_index; + DROP TABLE events; + ALTER TABLE events_v6 RENAME TO events; + CREATE INDEX events_bucketrow_starttime_index + ON events(bucketrow, starttime DESC); + CREATE TABLE IF NOT EXISTS compression_dict ( + id INTEGER PRIMARY KEY CHECK (id = 0), + dict BLOB NOT NULL + ); + PRAGMA user_version = 6; + COMMIT; + ", + ) + .expect("Failed to complete v6 migration transaction"); + + info!("Database upgrade to v6 complete"); +} + pub struct DatastoreInstance { buckets_cache: HashMap, first_init: bool, pub db_version: i32, + compression: compression::CompressionContext, } impl DatastoreInstance { @@ -235,21 +312,140 @@ impl DatastoreInstance { ))); } + // Re-read the version: migrations bump user_version, and + // ensure_compression keys off the post-migration schema (the + // compression_dict table only exists from v6). Using the pre-migration + // value here would skip compression setup on the first startup after an + // upgrade. 
+ let db_version = _get_db_version(conn); + let mut ds = DatastoreInstance { buckets_cache: HashMap::new(), first_init, db_version, + compression: compression::CompressionContext::empty(), }; + ds.ensure_compression(conn, migrate_enabled); ds.get_stored_buckets(conn)?; Ok(ds) } + /// Load the compression dictionary, training and applying one if the + /// database is large enough and doesn't have one yet. + /// + /// Runs once at startup. Loading a dictionary always happens (so compressed + /// rows can be read); training/back-filling only happens when migration is + /// enabled (we must not write to a read-only/migration-disabled datastore). + fn ensure_compression(&mut self, conn: &Connection, migrate_enabled: bool) { + // The compression_dict table only exists from v6 onward. + if self.db_version < 6 { + return; + } + if let Some(dict) = _load_dictionary(conn) { + self.compression = compression::CompressionContext::from_dictionary(&dict); + return; + } + #[cfg(feature = "compression-zstd")] + { + if migrate_enabled && _event_count(conn) >= compression::MIN_EVENTS_TO_TRAIN { + if let Err(e) = self.train_and_backfill(conn) { + warn!("Failed to set up event-data compression: {e:?}"); + } + } + } + #[cfg(not(feature = "compression-zstd"))] + { + let _ = migrate_enabled; + } + } + + /// Train a dictionary from existing events, store it, and recompress all + /// rows against it. Only called once, when a database first has enough data. + #[cfg(feature = "compression-zstd")] + fn train_and_backfill(&mut self, conn: &Connection) -> Result<(), DatastoreError> { + // Load every row's data (raw, uncompressed at this point). + let mut stmt = conn + .prepare("SELECT id, data FROM events") + .map_err(|e| DatastoreError::InternalError(e.to_string()))?; + let rows: Vec<(i64, Vec)> = stmt + .query_map([], |row| Ok((row.get(0)?, row.get(1)?))) + .map_err(|e| DatastoreError::InternalError(e.to_string()))? 
+ .collect::, _>>() + .map_err(|e| DatastoreError::InternalError(e.to_string()))?; + drop(stmt); + + // Decompress (a no-op for already-raw rows) to recover the JSON to train on. + let jsons: Vec = rows + .iter() + .filter_map(|(_, bytes)| self.compression.decompress(bytes).ok()) + .collect(); + let sample_refs: Vec<&[u8]> = jsons.iter().map(|s| s.as_bytes()).collect(); + + let dict = match compression::train_dictionary(&sample_refs) { + Some(dict) => dict, + None => return Ok(()), // not enough usable data; leave uncompressed + }; + + info!( + "Trained {}-byte compression dictionary, recompressing {} events", + dict.len(), + rows.len() + ); + + self.compression = compression::CompressionContext::from_dictionary(&dict); + + // Store the dictionary and recompress every row in a single transaction. + // Without it, each statement would auto-commit and, with + // synchronous=FULL, fsync — turning a one-time backfill into hundreds of + // thousands of disk syncs. + conn.execute_batch("BEGIN IMMEDIATE TRANSACTION;") + .map_err(|e| DatastoreError::InternalError(e.to_string()))?; + + let result = (|| -> Result<(), DatastoreError> { + conn.execute( + "INSERT OR REPLACE INTO compression_dict (id, dict) VALUES (0, ?1)", + params![dict], + ) + .map_err(|e| DatastoreError::InternalError(e.to_string()))?; + + let mut update = conn + .prepare("UPDATE events SET data = ?1 WHERE id = ?2") + .map_err(|e| DatastoreError::InternalError(e.to_string()))?; + for (id, bytes) in &rows { + let json = match self.compression.decompress(bytes) { + Ok(json) => json, + Err(_) => continue, + }; + let compressed = self.compression.compress(&json); + update + .execute(params![compressed, id]) + .map_err(|e| DatastoreError::InternalError(e.to_string()))?; + } + Ok(()) + })(); + + match result { + Ok(()) => { + conn.execute_batch("COMMIT;") + .map_err(|e| DatastoreError::InternalError(e.to_string()))?; + Ok(()) + } + Err(e) => { + let _ = conn.execute_batch("ROLLBACK;"); + // Revert to no 
compression so reads don't expect compressed rows + // that were rolled back. + self.compression = compression::CompressionContext::empty(); + Err(e) + } + } + } + fn get_stored_buckets(&mut self, conn: &Connection) -> Result<(), DatastoreError> { let mut stmt = match conn.prepare_cached( " SELECT buckets.id, buckets.name, buckets.type, buckets.client, buckets.hostname, buckets.created, - min(events.starttime), max(events.endtime), + min(events.starttime), max(events.starttime + events.duration), buckets.data FROM buckets LEFT OUTER JOIN events ON buckets.id = events.bucketrow @@ -470,7 +666,7 @@ impl DatastoreInstance { let mut stmt = match conn.prepare_cached( " - INSERT OR REPLACE INTO events(bucketrow, id, starttime, endtime, data) + INSERT OR REPLACE INTO events(bucketrow, id, starttime, duration, data) VALUES (?1, ?2, ?3, ?4, ?5)", ) { Ok(stmt) => stmt, @@ -490,13 +686,13 @@ impl DatastoreInstance { )) } }; - let endtime_nanos = starttime_nanos + duration_nanos; - let data = serde_json::to_string(&event.data).unwrap(); + let json_data = serde_json::to_string(&event.data).unwrap(); + let data = self.compression.compress(&json_data); let res = stmt.execute([ &bucket.bid.unwrap(), &event.id as &dyn ToSql, &starttime_nanos, - &endtime_nanos, + &duration_nanos, &data as &dyn ToSql, ]); match res { @@ -598,7 +794,7 @@ impl DatastoreInstance { let mut stmt = match conn.prepare_cached( " UPDATE events - SET starttime = ?2, endtime = ?3, data = ?4 + SET starttime = ?2, duration = ?3, data = ?4 WHERE bucketrow = ?1 AND id = ?5 ", ) { @@ -618,12 +814,12 @@ impl DatastoreInstance { )) } }; - let endtime_nanos = starttime_nanos + duration_nanos; - let data = serde_json::to_string(&event.data).unwrap(); + let json_data = serde_json::to_string(&event.data).unwrap(); + let data = self.compression.compress(&json_data); match stmt.execute([ &bucket.bid.unwrap(), &starttime_nanos, - &endtime_nanos, + &duration_nanos, &data as &dyn ToSql, &event_id, ]) { @@ -705,7 +901,7 @@ impl 
DatastoreInstance { let mut stmt = match conn.prepare_cached( " - SELECT id, starttime, endtime, data + SELECT id, starttime, duration, data FROM events WHERE bucketrow = ?1 AND id = ?2 @@ -721,17 +917,32 @@ impl DatastoreInstance { }; // TODO: Refactor to share row-parsing logic with get_events + let compression = &self.compression; let row = match stmt.query_row([&bucket.bid.unwrap(), &event_id], |row| { let id = row.get(0)?; let starttime_ns: i64 = row.get(1)?; - let endtime_ns: i64 = row.get(2)?; - let data_str: String = row.get(3)?; + let duration_ns: i64 = row.get(2)?; + let data_bytes: Vec = row.get(3)?; let time_seconds: i64 = starttime_ns / 1_000_000_000; let time_subnanos: u32 = (starttime_ns % 1_000_000_000) as u32; - let duration_ns = endtime_ns - starttime_ns; + + // On decompression or JSON-parse failure, surface a row error rather + // than fabricating a string and unwrapping (which would panic the + // worker thread, e.g. when a compressed row is read without the + // dictionary or feature). 
+ let decompressed_data_str = compression.decompress(&data_bytes).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure(3, rusqlite::types::Type::Blob, e.into()) + })?; + let data: serde_json::map::Map = - serde_json::from_str(&data_str).unwrap(); + serde_json::from_str(&decompressed_data_str).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure( + 3, + rusqlite::types::Type::Text, + Box::new(e), + ) + })?; Ok(Event { id: Some(id), @@ -781,12 +992,14 @@ impl DatastoreInstance { None => -1, }; + let compression = &self.compression; + let mut stmt = match conn.prepare_cached( " - SELECT id, starttime, endtime, data + SELECT id, starttime, duration, data FROM events WHERE bucketrow = ?1 - AND endtime >= ?2 + AND (starttime + duration) >= ?2 AND starttime <= ?3 ORDER BY starttime DESC LIMIT ?4 @@ -810,8 +1023,9 @@ impl DatastoreInstance { |row| { let id = row.get(0)?; let mut starttime_ns: i64 = row.get(1)?; - let mut endtime_ns: i64 = row.get(2)?; - let data_str: String = row.get(3)?; + let duration_ns: i64 = row.get(2)?; + let mut endtime_ns = starttime_ns + duration_ns; + let data_bytes: Vec = row.get(3)?; if clip_to_query_range { if starttime_ns < starttime_filter_ns { @@ -825,8 +1039,27 @@ impl DatastoreInstance { let time_seconds: i64 = starttime_ns / 1_000_000_000; let time_subnanos: u32 = (starttime_ns % 1_000_000_000) as u32; + + // On decompression or JSON-parse failure, surface a row error so + // the loop below skips this row with a warning, rather than + // fabricating a string and unwrapping (which would panic the + // worker thread). 
+ let decompressed_data_str = compression.decompress(&data_bytes).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure( + 3, + rusqlite::types::Type::Blob, + e.into(), + ) + })?; + let data: serde_json::map::Map = - serde_json::from_str(&data_str).unwrap(); + serde_json::from_str(&decompressed_data_str).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure( + 3, + rusqlite::types::Type::Text, + Box::new(e), + ) + })?; Ok(Event { id: Some(id), @@ -908,7 +1141,7 @@ impl DatastoreInstance { " SELECT count(*) FROM events WHERE bucketrow = ?1 - AND endtime >= ?2 + AND (starttime + duration) >= ?2 AND starttime <= ?3", ) { Ok(stmt) => stmt, diff --git a/aw-datastore/src/lib.rs b/aw-datastore/src/lib.rs index 37df34ea..e857bd0a 100644 --- a/aw-datastore/src/lib.rs +++ b/aw-datastore/src/lib.rs @@ -17,6 +17,7 @@ macro_rules! json_map { }}; } +mod compression; mod datastore; mod legacy_import; mod privacy_filter; diff --git a/aw-datastore/src/worker.rs b/aw-datastore/src/worker.rs index e9481da1..9edd1b77 100644 --- a/aw-datastore/src/worker.rs +++ b/aw-datastore/src/worker.rs @@ -156,6 +156,13 @@ impl DatastoreWorker { conn.pragma_update(None, "synchronous", "FULL") .expect("Failed to set synchronous=FULL"); + // Enforce foreign keys so deleting a bucket cascades to its events + // (events.bucketrow REFERENCES buckets(id) ON DELETE CASCADE). SQLite + // defaults this off per connection; set it before migrations run and + // outside any transaction so it takes effect. 
+ conn.pragma_update(None, "foreign_keys", "ON") + .expect("Failed to enable foreign_keys"); + let mut ds = DatastoreInstance::new(&conn, true).unwrap(); // Ensure legacy import diff --git a/aw-datastore/tests/datastore.rs b/aw-datastore/tests/datastore.rs index fe353e2a..ec4eb269 100644 --- a/aw-datastore/tests/datastore.rs +++ b/aw-datastore/tests/datastore.rs @@ -538,7 +538,8 @@ mod datastore_tests { let version: i32 = conn .pragma_query_value(None, "user_version", |row| row.get(0)) .unwrap(); - assert_eq!(version, 5); + // Database should be upgraded to v6 (compression) automatically + assert_eq!(version, 6); let old_indexes: i64 = conn .query_row( "SELECT count(*) FROM sqlite_master WHERE type = 'index' AND name IN @@ -551,7 +552,7 @@ mod datastore_tests { let new_index: i64 = conn .query_row( "SELECT count(*) FROM sqlite_master WHERE type = 'index' - AND name = 'events_bucketrow_starttime_endtime_index'", + AND name = 'events_bucketrow_starttime_index'", [], |row| row.get(0), ) @@ -667,4 +668,259 @@ mod datastore_tests { let _ = fs::remove_file(&db_path); } + + /// With the compression feature, a database that grows past the training + /// threshold should get a dictionary on the next open, recompress its rows, + /// and still return every event's data intact. + #[test] + #[cfg(feature = "compression-zstd")] + fn test_compression_dictionary_roundtrip() { + use rusqlite::Connection; + + let mut db_path = get_cache_dir().unwrap(); + db_path.push("datastore-unittest-compression.db"); + let db_path_str = db_path.to_str().unwrap().to_string(); + if db_path.exists() { + std::fs::remove_file(&db_path).unwrap(); + } + + let bucket = test_bucket(); + // Realistic, highly-repetitive event data, well above MIN_EVENTS_TO_TRAIN. + let n = 4000; + let make_event = |i: usize| Event { + id: None, + timestamp: Utc::now() + Duration::milliseconds(i as i64), + duration: Duration::seconds(1), + data: json_map! 
{ + "app": json!(["Firefox", "Terminal", "Code", "Slack"][i % 4]), + "title": json!(format!("Working on window {}", i % 100)) + }, + }; + + // Session 1: no dictionary exists yet (DB started empty), rows stored raw. + { + let ds = Datastore::new(db_path_str.clone(), false); + ds.create_bucket(&bucket).unwrap(); + let events: Vec = (0..n).map(make_event).collect(); + ds.insert_events(&bucket.id, &events).unwrap(); + ds.force_commit().unwrap(); + ds.close(); + } + + // No dictionary should have been trained during session 1. + { + let conn = Connection::open(&db_path).unwrap(); + let dict_rows: i64 = conn + .query_row("SELECT count(*) FROM compression_dict", [], |r| r.get(0)) + .unwrap(); + assert_eq!(dict_rows, 0, "no dictionary expected before reopen"); + } + + // Session 2: reopening trains a dictionary and recompresses existing rows. + { + let ds = Datastore::new(db_path_str.clone(), false); + let events = ds.get_events(&bucket.id, None, None, None).unwrap(); + assert_eq!(events.len(), n); + // Data must survive the train + recompress roundtrip exactly. + for e in &events { + let i = e.data["title"] + .as_str() + .unwrap() + .strip_prefix("Working on window ") + .unwrap() + .parse::() + .unwrap(); + assert!(i < 100); + assert!(e.data["app"].is_string()); + } + // Inserting more after the dictionary exists must also roundtrip. + ds.insert_events(&bucket.id, &[make_event(99999)]).unwrap(); + ds.force_commit().unwrap(); + ds.close(); + } + + // A dictionary now exists and at least some rows are stored compressed. + { + let conn = Connection::open(&db_path).unwrap(); + let dict_rows: i64 = conn + .query_row("SELECT count(*) FROM compression_dict", [], |r| r.get(0)) + .unwrap(); + assert_eq!(dict_rows, 1, "dictionary should exist after reopen"); + // 0x28 0xB5 0x2F 0xFD is the zstd magic number (Little Endian). 
+            let compressed: i64 = conn
+                .query_row(
+                    "SELECT count(*) FROM events WHERE hex(substr(data, 1, 4)) = '28B52FFD'",
+                    [],
+                    |r| r.get(0),
+                )
+                .unwrap();
+            assert!(compressed > 0, "expected some rows to be stored compressed");
+        }
+
+        // Best-effort cleanup: on Windows the worker may still hold the file
+        // handle briefly after close(), which would make a hard unwrap flaky.
+        let _ = std::fs::remove_file(&db_path);
+    }
+
+    /// Upgrading a populated pre-v6 database should train and apply compression
+    /// on the very first open (not only after a second restart). Regression test
+    /// for the db_version being captured before migrations ran.
+    #[test]
+    #[cfg(feature = "compression-zstd")]
+    fn test_migration_v5_to_v6_trains_on_first_open() {
+        let mut db_path = get_cache_dir().unwrap();
+        db_path.push("datastore-unittest-migration-v6.db");
+        let db_path_str = db_path.to_str().unwrap().to_string();
+        if db_path.exists() {
+            std::fs::remove_file(&db_path).unwrap();
+        }
+
+        // Build a v5 database with enough repetitive events to train a dictionary.
+        {
+            let conn = rusqlite::Connection::open(&db_path).unwrap();
+            conn.execute_batch(
+                r#"
+                CREATE TABLE buckets (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    name TEXT UNIQUE NOT NULL, type TEXT NOT NULL, client TEXT NOT NULL,
+                    hostname TEXT NOT NULL, created TEXT NOT NULL,
+                    data TEXT NOT NULL DEFAULT '{}'
+                );
+                CREATE INDEX bucket_id_index ON buckets(id);
+                CREATE TABLE events (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    bucketrow INTEGER NOT NULL, starttime INTEGER NOT NULL,
+                    endtime INTEGER NOT NULL, data TEXT NOT NULL,
+                    FOREIGN KEY (bucketrow) REFERENCES buckets(id)
+                );
+                CREATE INDEX events_bucketrow_starttime_endtime_index
+                    ON events(bucketrow, starttime DESC, endtime);
+                CREATE TABLE key_value (key TEXT PRIMARY KEY, value TEXT, last_modified NUMBER NOT NULL);
+                INSERT INTO buckets (name, type, client, hostname, created, data)
+                    VALUES ('testid', 'testtype', 'testclient', 'testhost',
+                            '2024-01-01T00:00:00+00:00', '{}');
+                WITH RECURSIVE seq(x) AS (
+                    SELECT 1 UNION ALL SELECT x + 1 FROM seq WHERE x < 3000
+                )
+                INSERT INTO events (bucketrow, starttime, endtime, data)
+                SELECT 1, x * 1000000000, x * 1000000000 + 1000000000,
+                       '{"app":"App' || (x % 4) || '","title":"Window title ' || (x % 50) || '"}'
+                FROM seq;
+                PRAGMA user_version = 5;
+                "#,
+            )
+            .unwrap();
+        }
+
+        // First open migrates to v6 AND sets up compression in the same session.
+        {
+            let ds = Datastore::new(db_path_str, false);
+            let events = ds.get_events("testid", None, None, None).unwrap();
+            assert_eq!(events.len(), 3000);
+            // newest-first ordering: last inserted was x=3000 -> App0
+            assert_eq!(events[0].data["app"], json!("App0"));
+            ds.close();
+        }
+
+        // A dictionary was trained and some rows compressed on that first open.
+        {
+            let conn = rusqlite::Connection::open(&db_path).unwrap();
+            let version: i32 = conn
+                .pragma_query_value(None, "user_version", |row| row.get(0))
+                .unwrap();
+            assert_eq!(version, 6);
+            let dict_rows: i64 = conn
+                .query_row("SELECT count(*) FROM compression_dict", [], |r| r.get(0))
+                .unwrap();
+            assert_eq!(
+                dict_rows, 1,
+                "dictionary should be trained on the first open after upgrade"
+            );
+            let compressed: i64 = conn
+                .query_row(
+                    "SELECT count(*) FROM events WHERE hex(substr(data, 1, 4)) = '28B52FFD'",
+                    [],
+                    |r| r.get(0),
+                )
+                .unwrap();
+            assert!(
+                compressed > 0,
+                "expected rows to be compressed after first open"
+            );
+        }
+
+        // Best-effort cleanup: on Windows the worker may still hold the file
+        // handle briefly after close(), which would make a hard unwrap flaky.
+        let _ = std::fs::remove_file(&db_path);
+    }
+
+    /// A row that looks compressed (zstd magic) but can't be decompressed — e.g.
+    /// the dictionary is missing or the feature is disabled — must be skipped
+    /// with a warning, never panic the worker. Regression test for the previous
+    /// unwrap() on the decompression-failure path.
+    #[test]
+    fn test_unreadable_compressed_row_does_not_panic() {
+        let mut db_path = get_cache_dir().unwrap();
+        db_path.push("datastore-unittest-badrow.db");
+        let db_path_str = db_path.to_str().unwrap().to_string();
+        if db_path.exists() {
+            std::fs::remove_file(&db_path).unwrap();
+        }
+
+        // Create a v6 database with one valid event.
+        {
+            let ds = Datastore::new(db_path_str.clone(), false);
+            ds.create_bucket(&test_bucket()).unwrap();
+            ds.insert_events(
+                "testid",
+                &[Event {
+                    id: None,
+                    timestamp: Utc::now(),
+                    duration: Duration::seconds(1),
+                    data: json_map! {"app": json!("ok")},
+                }],
+            )
+            .unwrap();
+            ds.force_commit().unwrap();
+            ds.close();
+        }
+
+        // Inject a row whose data starts with the zstd magic number but is not a
+        // valid frame, and for which no dictionary exists.
+        {
+            let conn = rusqlite::Connection::open(&db_path).unwrap();
+            let bid: i64 = conn
+                .query_row("SELECT id FROM buckets WHERE name = 'testid'", [], |r| {
+                    r.get(0)
+                })
+                .unwrap();
+            conn.execute(
+                "INSERT INTO events (bucketrow, starttime, endtime, data) VALUES (?1, ?2, ?3, ?4)",
+                rusqlite::params![
+                    bid,
+                    5_000_000_000i64, // starttime
+                    6_000_000_000i64, // endtime: starttime + 1e9, same spacing as the v5 fixture rows
+                    vec![0x28u8, 0xB5, 0x2F, 0xFD, 0xDE, 0xAD]
+                ],
+            )
+            .unwrap();
+        }
+
+        // Reading must not panic: the bad row is skipped, the good row returned.
+        {
+            let ds = Datastore::new(db_path_str, false);
+            let events = ds.get_events("testid", None, None, None).unwrap();
+            assert_eq!(
+                events.len(),
+                1,
+                "the unreadable row should be skipped, the valid one kept"
+            );
+            assert_eq!(events[0].data["app"], json!("ok"));
+            ds.close();
+        }
+
+        // Best-effort cleanup: on Windows the worker may still hold the file
+        // handle briefly after close(), which would make a hard unwrap flaky.
+        let _ = std::fs::remove_file(&db_path);
+    }
 }
diff --git a/aw-server/Cargo.toml b/aw-server/Cargo.toml
index 814a6215..4674e7ea 100644
--- a/aw-server/Cargo.toml
+++ b/aw-server/Cargo.toml
@@ -22,6 +22,8 @@ bundled = ["aw-datastore/bundled"]
 encryption = ["aw-datastore/encryption"]
 # Enable SQLCipher encryption with vendored OpenSSL (fully self-contained)
 encryption-vendored = ["aw-datastore/encryption-vendored"]
+# Enable zstd compression for events data
+compression-zstd = ["aw-datastore/compression-zstd"]
 
 [dependencies]
 rocket = { version = "0.5.0", features = ["json"] }