Fix cargo warnings #619
Draft: 0xbrayo wants to merge 4 commits into ActivityWatch:master from 0xbrayo:fix-cargo-warnings
4 commits
e5df56a fix(deps): upgrade reqwest 0.11 -> 0.12 to fix rustls-webpki vuln (0xbrayo)
fb4d81c feat(datastore): transparent zstd compression of event data (#493) (0xbrayo)
e7a9ea0 fix: expose compression-zstd in aw-server and remove dead code (0xbrayo)
3c5a5aa fix: resolve cargo build warnings (0xbrayo)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,248 @@ | ||
| // Transparent compression of event data using zstd with a shared trained dictionary. | ||
| // | ||
| // ActivityWatch events are tiny (most < 128 bytes) and their redundancy is | ||
| // *across* rows: the same app names, JSON keys and window titles repeat | ||
| // thousands of times, while a single row holds little repetition for zstd to | ||
| // exploit on its own. To capture that cross-row redundancy we train one | ||
| // dictionary on a sample of the data, store it once in the database, and | ||
| // compress every row against it (the same idea as sqlite-zstd). On real data | ||
| // this reduces the stored event JSON by roughly 47%. | ||
| // | ||
| // Stored BLOB format: | ||
| // [0xCC][u32 LE original_len][zstd frame] -> dictionary-compressed | ||
| // <raw UTF-8 JSON bytes> -> stored uncompressed | ||
| // | ||
| // A row is stored uncompressed when compression would not make it smaller (or | ||
| // when no dictionary exists yet), so the worst case is never larger than the | ||
| // plain JSON. JSON event data always starts with '{' (0x7B), so the 0xCC marker | ||
| // is unambiguous. | ||
| // | ||
| // Tradeoff: once rows are compressed, the database can only be read by a build | ||
| // with this feature enabled (and with the stored dictionary intact). That is | ||
| // why the feature is opt-in at build time. | ||
| | ||
| /// Marker byte prefixed to dictionary-compressed blobs. | ||
| const COMPRESSION_MARKER: u8 = 0xCC; | ||
| | ||
| /// Target size of the trained dictionary. 64 KiB was the sweet spot in | ||
| /// benchmarks (larger dictionaries started to hurt the ratio). | ||
| #[cfg(feature = "compression-zstd")] | ||
| const DICT_SIZE: usize = 64 * 1024; | ||
| | ||
| /// zstd compression level. 6 is a good balance for this data: the dictionary | ||
| /// provides essentially all of the savings, and higher levels add cost for only | ||
| /// ~1% extra reduction. | ||
| #[cfg(feature = "compression-zstd")] | ||
| const COMPRESSION_LEVEL: i32 = 6; | ||
| | ||
| /// Minimum number of events before it is worth training a dictionary. Below | ||
| /// this the database is small enough that the savings are negligible, and there | ||
| /// is too little data to train a good dictionary. | ||
| #[cfg(feature = "compression-zstd")] | ||
| pub const MIN_EVENTS_TO_TRAIN: i64 = 2000; | ||
| | ||
| /// Holds the reusable zstd compressor/decompressor (with the loaded | ||
| /// dictionary, if any) for the lifetime of a datastore connection. Lives on the | ||
| /// single-threaded datastore worker, so the compressor/decompressor — which | ||
| /// need `&mut` per call — are wrapped in `RefCell` and reused across calls | ||
| /// rather than reallocated each time (allocating a fresh context per row is the | ||
| /// dominant cost when compressing hundreds of thousands of tiny events). | ||
| pub struct CompressionContext { | ||
| #[cfg(feature = "compression-zstd")] | ||
| dict: Option<Codec>, | ||
| } | ||
| | ||
| #[cfg(feature = "compression-zstd")] | ||
| struct Codec { | ||
| compressor: std::cell::RefCell<zstd::bulk::Compressor<'static>>, | ||
| decompressor: std::cell::RefCell<zstd::bulk::Decompressor<'static>>, | ||
| } | ||
| | ||
| impl CompressionContext { | ||
| /// A context with no dictionary: writes are stored uncompressed, reads still | ||
| /// transparently handle both uncompressed and (if a dictionary is later | ||
| /// available) compressed data. | ||
| pub fn empty() -> Self { | ||
| CompressionContext { | ||
| #[cfg(feature = "compression-zstd")] | ||
| dict: None, | ||
| } | ||
| } | ||
| | ||
| /// Build a context from raw trained-dictionary bytes. Falls back to an empty | ||
| /// (no-compression) context if the dictionary can't be loaded. | ||
| #[cfg(feature = "compression-zstd")] | ||
| pub fn from_dictionary(dict_bytes: &[u8]) -> Self { | ||
| // with_dictionary copies the dictionary into the (de)compression | ||
| // context, so the returned values own it and are 'static. | ||
| match ( | ||
| zstd::bulk::Compressor::with_dictionary(COMPRESSION_LEVEL, dict_bytes), | ||
| zstd::bulk::Decompressor::with_dictionary(dict_bytes), | ||
| ) { | ||
| (Ok(compressor), Ok(decompressor)) => CompressionContext { | ||
| dict: Some(Codec { | ||
| compressor: std::cell::RefCell::new(compressor), | ||
| decompressor: std::cell::RefCell::new(decompressor), | ||
| }), | ||
| }, | ||
| _ => { | ||
| warn!("Failed to load compression dictionary; storing events uncompressed"); | ||
| CompressionContext::empty() | ||
| } | ||
| } | ||
| } | ||
| | ||
| #[cfg(not(feature = "compression-zstd"))] | ||
| pub fn from_dictionary(_dict_bytes: &[u8]) -> Self { | ||
| CompressionContext {} | ||
| } | ||
| | ||
| /// Compress a JSON string into the stored blob representation. | ||
| /// | ||
| /// Never fails: if there is no dictionary, the feature is disabled, or | ||
| /// compression would not shrink the row, the raw UTF-8 bytes are returned. | ||
| pub fn compress(&self, json: &str) -> Vec<u8> { | ||
| #[cfg(feature = "compression-zstd")] | ||
| { | ||
| if let Some(codec) = &self.dict { | ||
| if let Ok(frame) = codec.compressor.borrow_mut().compress(json.as_bytes()) { | ||
| // [marker][u32 LE original length][zstd frame] | ||
| if 5 + frame.len() < json.len() { | ||
| let mut blob = Vec::with_capacity(5 + frame.len()); | ||
| blob.push(COMPRESSION_MARKER); | ||
| blob.extend_from_slice(&(json.len() as u32).to_le_bytes()); | ||
| blob.extend_from_slice(&frame); | ||
| return blob; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| json.as_bytes().to_vec() | ||
| } | ||
| | ||
| /// Decompress a stored blob back into its JSON string. | ||
| pub fn decompress(&self, bytes: &[u8]) -> Result<String, String> { | ||
| if bytes.first() == Some(&COMPRESSION_MARKER) { | ||
| #[cfg(feature = "compression-zstd")] | ||
| { | ||
| if bytes.len() < 5 { | ||
| return Err("compressed blob too short to contain a length header".to_string()); | ||
| } | ||
| let orig_len = | ||
| u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]) as usize; | ||
| let frame = &bytes[5..]; | ||
| let codec = self | ||
| .dict | ||
| .as_ref() | ||
| .ok_or("event data is compressed but no dictionary is loaded")?; | ||
| let out = codec | ||
| .decompressor | ||
| .borrow_mut() | ||
| .decompress(frame, orig_len) | ||
| .map_err(|e| format!("failed to decompress event data: {e}"))?; | ||
| String::from_utf8(out) | ||
| .map_err(|e| format!("decompressed data is not valid UTF-8: {e}")) | ||
| } | ||
| #[cfg(not(feature = "compression-zstd"))] | ||
| { | ||
| Err( | ||
| "event data is zstd-compressed but the compression-zstd feature is disabled" | ||
| .to_string(), | ||
| ) | ||
| } | ||
| } else { | ||
| String::from_utf8(bytes.to_vec()) | ||
| .map_err(|e| format!("event data is not valid UTF-8: {e}")) | ||
| } | ||
| } | ||
| } | ||
| | ||
| /// Train a zstd dictionary from a set of JSON samples. Returns `None` if there | ||
| /// is not enough data to train a usable dictionary. | ||
| #[cfg(feature = "compression-zstd")] | ||
| pub fn train_dictionary(samples: &[&[u8]]) -> Option<Vec<u8>> { | ||
| if (samples.len() as i64) < MIN_EVENTS_TO_TRAIN { | ||
| return None; | ||
| } | ||
| match zstd::dict::from_samples(samples, DICT_SIZE) { | ||
| Ok(dict) => Some(dict), | ||
| Err(e) => { | ||
| warn!("Failed to train zstd dictionary: {e}"); | ||
| None | ||
| } | ||
| } | ||
| } | ||
| | ||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| | ||
| #[test] | ||
| fn test_uncompressed_roundtrip() { | ||
| // empty() context stores raw and reads it back | ||
| let ctx = CompressionContext::empty(); | ||
| let json = r#"{"app":"Firefox","title":"Hello"}"#; | ||
| let stored = ctx.compress(json); | ||
| assert_eq!(stored, json.as_bytes()); | ||
| assert_eq!(ctx.decompress(&stored).unwrap(), json); | ||
| } | ||
| | ||
| #[cfg(feature = "compression-zstd")] | ||
| #[test] | ||
| fn test_dictionary_roundtrip_and_size() { | ||
| // Build a realistic corpus with heavy cross-row redundancy. | ||
| let mut samples: Vec<String> = Vec::new(); | ||
| for i in 0..5000 { | ||
| let app = ["Firefox", "Terminal", "Code", "Slack"][i % 4]; | ||
| samples.push(format!( | ||
| r#"{{"app":"{app}","title":"Some window title number {}"}}"#, | ||
| i % 50 | ||
| )); | ||
| } | ||
| let sample_refs: Vec<&[u8]> = samples.iter().map(|s| s.as_bytes()).collect(); | ||
| let dict = train_dictionary(&sample_refs).expect("training should succeed"); | ||
| let ctx = CompressionContext::from_dictionary(&dict); | ||
| | ||
| let mut total_raw = 0usize; | ||
| let mut total_stored = 0usize; | ||
| for s in &samples { | ||
| let blob = ctx.compress(s); | ||
| // roundtrip is exact | ||
| assert_eq!(ctx.decompress(&blob).unwrap(), *s); | ||
| total_raw += s.len(); | ||
| total_stored += blob.len(); | ||
| } | ||
| // dictionary compression must save substantial space on this corpus | ||
| // (without a dictionary, per-row zstd saves ~0% on data this small) | ||
| assert!( | ||
| total_stored * 10 < total_raw * 6, | ||
| "expected >40% reduction, got {total_stored} vs {total_raw}" | ||
| ); | ||
| } | ||
| | ||
| #[cfg(feature = "compression-zstd")] | ||
| #[test] | ||
| fn test_incompressible_row_stored_raw() { | ||
| // A context with a dictionary still stores tiny/incompressible rows raw | ||
| // when compression would not help, so a row is never larger than raw. | ||
| let samples: Vec<String> = (0..2000) | ||
| .map(|i| format!(r#"{{"app":"A","title":"{i}"}}"#)) | ||
| .collect(); | ||
| let refs: Vec<&[u8]> = samples.iter().map(|s| s.as_bytes()).collect(); | ||
| let dict = train_dictionary(&refs).expect("training should succeed"); | ||
| let ctx = CompressionContext::from_dictionary(&dict); | ||
| | ||
| let tiny = r#"{"a":1}"#; | ||
| let blob = ctx.compress(tiny); | ||
| assert!(blob.len() <= tiny.len()); | ||
| assert_eq!(ctx.decompress(&blob).unwrap(), tiny); | ||
| } | ||
| | ||
| #[cfg(feature = "compression-zstd")] | ||
| #[test] | ||
| fn test_too_few_samples_no_dict() { | ||
| let samples: Vec<String> = (0..10).map(|i| format!("{{\"x\":{i}}}")).collect(); | ||
| let refs: Vec<&[u8]> = samples.iter().map(|s| s.as_bytes()).collect(); | ||
| assert!(train_dictionary(&refs).is_none()); | ||
| } | ||
| } | ||
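
The stored BLOB format described in the file header can be illustrated without zstd at all. The following is a minimal sketch, not code from this PR: `StoredBlob`, `frame_blob`, `parse_blob` and `HEADER_LEN` are hypothetical names, and the "frame" is treated as opaque bytes. It only shows how the `0xCC` marker and the little-endian `u32` length separate compressed blobs from raw JSON.

```rust
/// Marker byte, same value as COMPRESSION_MARKER in the diff.
const COMPRESSION_MARKER: u8 = 0xCC;
/// Marker (1 byte) plus u32 little-endian original length (4 bytes).
/// Hypothetical constant; the diff uses the literal 5.
const HEADER_LEN: usize = 5;

/// Hypothetical view of a stored blob (not a type from the PR).
#[derive(Debug, PartialEq)]
enum StoredBlob<'a> {
    /// Raw UTF-8 JSON bytes, stored uncompressed.
    Raw(&'a [u8]),
    /// Dictionary-compressed payload plus the original length from the header.
    Compressed { orig_len: usize, frame: &'a [u8] },
}

/// Prefix an already-compressed frame with the marker and length header.
fn frame_blob(orig_len: u32, frame: &[u8]) -> Vec<u8> {
    let mut blob = Vec::with_capacity(HEADER_LEN + frame.len());
    blob.push(COMPRESSION_MARKER);
    blob.extend_from_slice(&orig_len.to_le_bytes());
    blob.extend_from_slice(frame);
    blob
}

/// Classify a stored blob by its first byte and split off the header.
fn parse_blob(bytes: &[u8]) -> Result<StoredBlob<'_>, String> {
    // JSON objects start with '{' (0x7B), so anything without the marker is raw.
    if bytes.first() != Some(&COMPRESSION_MARKER) {
        return Ok(StoredBlob::Raw(bytes));
    }
    if bytes.len() < HEADER_LEN {
        return Err("compressed blob too short to contain a length header".to_string());
    }
    let orig_len = u32::from_le_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]) as usize;
    Ok(StoredBlob::Compressed {
        orig_len,
        frame: &bytes[HEADER_LEN..],
    })
}
```

In the PR itself the `frame` part would be handed to the dictionary-backed decompressor together with `orig_len` as the output capacity; the sketch stops before that step so that it needs only the standard library.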
`json.len()` is cast to `u32` without bounds checking. If a JSON string is 4 GiB or larger, the stored `orig_len` silently truncates, producing an incorrect header that causes a `failed to decompress event data` error on the next read. Using a checked cast is safer.
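
One way the checked cast could look is sketched below. This is an assumption about a possible fix, not the change actually made in the PR: `blob_header` is a hypothetical helper, and only `u32::try_from` and `to_le_bytes` come from the standard library.

```rust
use std::convert::TryFrom;

/// Marker byte, same value as COMPRESSION_MARKER in the diff.
const COMPRESSION_MARKER: u8 = 0xCC;

/// Hypothetical helper: build the 5-byte blob header, or return `None`
/// when the original length does not fit in a `u32`.
fn blob_header(original_len: usize) -> Option<[u8; 5]> {
    // try_from fails instead of truncating when the value exceeds u32::MAX.
    let len = u32::try_from(original_len).ok()?;
    let mut header = [COMPRESSION_MARKER; 5];
    header[1..].copy_from_slice(&len.to_le_bytes());
    Some(header)
}
```

Under this sketch, `compress` could treat a `None` header like any other case where compression does not apply and fall through to returning the raw UTF-8 bytes, which keeps its "never fails" contract.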