diff --git a/lib/ddsketch-agent/src/lib.rs b/lib/ddsketch-agent/src/lib.rs index d3d287d1ef..16babe914e 100644 --- a/lib/ddsketch-agent/src/lib.rs +++ b/lib/ddsketch-agent/src/lib.rs @@ -215,6 +215,20 @@ pub struct DDSketch { } impl DDSketch { + fn with_config(config: Config) -> Self { + let initial_bins = cmp::min(INITIAL_BINS, config.bin_limit) as usize; + + Self { + config, + bins: Vec::with_capacity(initial_bins), + count: 0, + min: f64::MAX, + max: f64::MIN, + sum: 0.0, + avg: 0.0, + } + } + #[cfg(test)] fn bin_count(&self) -> usize { self.bins.len() @@ -434,16 +448,31 @@ impl DDSketch { let key = self.config.key(v); - // Fast path for adding to an existing bin. - for b in &mut self.bins { - if b.k == key && b.n < MAX_BIN_WIDTH { - b.n += 1; - return; + let mut insert_at = None; + + for (bin_idx, b) in self.bins.iter_mut().enumerate() { + if b.k == key { + if b.n < MAX_BIN_WIDTH { + // Fast path for adding to an existing bin without overflow. + b.n += 1; + return; + } else { + insert_at = Some(bin_idx); + break; + } + } + if b.k > key { + insert_at = Some(bin_idx); + break; } } - // Slow path could be also optimized. - self.insert_keys(vec![key]); + if let Some(bin_idx) = insert_at { + self.bins.insert(bin_idx, Bin { k: key, n: 1 }); + } else { + self.bins.push(Bin { k: key, n: 1 }); + } + trim_left(&mut self.bins, self.config.bin_limit); } /// Inserts many values into the sketch. @@ -709,17 +738,8 @@ impl PartialEq for DDSketch { impl Default for DDSketch { fn default() -> Self { let config = Config::default(); - let initial_bins = cmp::min(INITIAL_BINS, config.bin_limit) as usize; - Self { - config, - bins: Vec::with_capacity(initial_bins), - count: 0, - min: f64::MAX, - max: f64::MIN, - sum: 0.0, - avg: 0.0, - } + Self::with_config(config) } } @@ -821,6 +841,8 @@ mod tests { use rand::thread_rng; use rand_distr::{Distribution, Pareto}; + use crate::AGENT_DEFAULT_MIN_VALUE; + use super::{Bucket, Config, DDSketch, AGENT_DEFAULT_EPS, MAX_KEY}; const FLOATING_POINT_ACCEPTABLE_ERROR: f64 = 1.0e-10; @@ -1096,11 +1118,12 @@ mod tests { test_relative_accuracy(config, AGENT_DEFAULT_EPS, min_value, max_value) } - fn parse_sketch_from_string_bins(layout: &str) -> DDSketch { + fn parse_sketch_from_string_bins_with_custom_blank_ddsketch(blank_sketch: DDSketch, layout: &str) -> DDSketch { layout .split(' ') + .filter(|v| !v.is_empty()) .map(|pair| pair.split(':').map(ToOwned::to_owned).collect::>()) - .fold(DDSketch::default(), |mut sketch, mut kn| { + .fold(blank_sketch, |mut sketch, mut kn| { let k = kn.remove(0).parse::().unwrap(); let n = kn.remove(0).parse::().unwrap(); @@ -1109,6 +1132,17 @@ mod tests { }) } + fn parse_sketch_from_string_bins(layout: &str) -> DDSketch { + parse_sketch_from_string_bins_with_custom_blank_ddsketch(DDSketch::default(), layout) + } + + fn parse_sketch_from_string_bins_with_bin_limit(layout: &str, bin_limit: u16) -> DDSketch { + let config = Config::new(AGENT_DEFAULT_EPS, AGENT_DEFAULT_MIN_VALUE, bin_limit); + let sketch = DDSketch::with_config(config); + + parse_sketch_from_string_bins_with_custom_blank_ddsketch(sketch, layout) + } + fn compare_sketches(actual: &DDSketch, expected: &DDSketch, allowed_err: f64) { let actual_sum = actual.sum().unwrap(); let expected_sum = expected.sum().unwrap(); @@ -1124,6 +1158,251 @@ mod tests { assert_eq!(actual.bins(), expected.bins()); } + #[test] + fn test_sketch_trimleft() { + /// values to insert into a sketch + #[allow(dead_code)] + enum Value { + Float(f64), + Vec(Vec), + NFloats(u32, f64), + } + /// ways to insert values into a sketch + #[derive(Debug)] + enum InsertFn { + Insert, + InsertMany, + InsertN, + } + struct Case { + description: &'static str, + start: &'static str, + insert: Value, + expected: &'static str, + max_bins: u16, + } + + let cases = &[ + Case { + description: "baseline: inserting from empty up to bin limit", + start: "", + insert: Value::Vec(vec![0.0, 0.5, 1.0, 1.5]), + expected: "0:1 1293:1 1338:1 1364:1", + max_bins: 4, + }, + Case { + description: "inserting from empty to over bin limit", + start: "", + insert: Value::Vec(vec![0.0, 0.5, 1.0, 1.5]), + expected: "1293:2 1338:1 1364:1", + max_bins: 3, + }, + Case { + description: "inserting from empty to well over bin limit", + start: "", + insert: Value::Vec(vec![0.0, 0.5, 1.0, 1.5, 0.0, 0.0]), + expected: "1293:4 1338:1 1364:1", + max_bins: 3, + }, + Case { + description: "inserting from empty to over bin limit with overflow", + start: "", + insert: Value::NFloats(65535 * 5, 0.0), + // longstanding trimLeft bug + expected: "0:65535 0:65535 0:65535 0:65535 0:65535", + // actual expected: "0:65535 0:65535 0:65535" + max_bins: 3, + }, + Case { + description: "inserting early bin over the bin limit", + start: "0:65535 0:65535 1338:65535", + insert: Value::Float(0.0), + // longstanding trimLeft bug + expected: "0:1 0:65535 0:65535 1338:65535", + // actual expected: "0:65535 0:65535 1338:65535" + max_bins: 3, + }, + Case { + description: "inserting last bin over the bin limit", + start: "0:65535 0:65535 1338:65535", + insert: Value::Float(1.0), + // This is a bug. I'm not sure what ought to happen here. Need to review the DDSketch paper. + expected: "0:65535 0:65535 1338:1 1338:65535", + // actual expected: something like "1338:65535 1338:65535 1338:65535" ? + max_bins: 3, + }, + ]; + + for case in cases { + for insert_fn in &[InsertFn::Insert, InsertFn::InsertMany, InsertFn::InsertN] { + let mut sketch = parse_sketch_from_string_bins_with_bin_limit(case.start, case.max_bins); + + match insert_fn { + InsertFn::Insert => match &case.insert { + Value::Float(v) => sketch.insert(*v), + Value::Vec(vs) => { + for v in vs { + sketch.insert(*v); + } + } + Value::NFloats(n, v) => { + for _ in 0..*n { + sketch.insert(*v); + } + } + }, + InsertFn::InsertMany => match &case.insert { + Value::Float(v) => sketch.insert_many(&[*v]), + Value::Vec(vs) => sketch.insert_many(vs), + Value::NFloats(n, v) => { + for _ in 0..*n { + sketch.insert_many(&[*v]); + } + } + }, + InsertFn::InsertN => match &case.insert { + Value::Float(v) => sketch.insert_n(*v, 1), + Value::Vec(vs) => { + for v in vs { + sketch.insert_n(*v, 1); + } + } + Value::NFloats(n, v) => sketch.insert_n(*v, *n), + }, + } + + let expected = parse_sketch_from_string_bins_with_bin_limit(case.expected, case.max_bins); + assert_eq!(expected.bins(), sketch.bins(), "{:?}: {}", insert_fn, case.description); + } + } + } + + #[test] + fn test_sketch_insert_and_overflow() { + /// values to insert into a sketch + #[allow(dead_code)] + enum Value { + Float(f64), + Vec(Vec), + NFloats(u32, f64), + } + /// ways to insert values into a sketch + #[derive(Debug)] + enum InsertFn { + Insert, + InsertMany, + InsertN, + } + struct Case { + description: &'static str, + start: &'static str, + insert: Value, + expected: &'static str, + } + + let cases = &[ + Case { + description: "baseline: inserting into an empty sketch", + start: "", + insert: Value::Float(0.0), + expected: "0:1", + }, + Case { + description: "inserting a value into an existing bin", + start: "0:1", + insert: Value::Float(0.0), + expected: "0:2", + }, + Case { + description: "inserting a value into a new bin at the start", + start: "1338:1", + insert: Value::Float(0.0), + expected: "0:1 1338:1", + }, + Case { + description: "inserting a value into a new bin in the middle", + start: "0:1 1338:1", + insert: Value::Float(0.5), + expected: "0:1 1293:1 1338:1", + }, + Case { + description: "inserting a value into a new bin at the end", + start: "0:1", + insert: Value::Float(1.0), + expected: "0:1 1338:1", + }, + Case { + description: "inserting a value into an existing bin and filling it", + start: "0:65534", + insert: Value::Float(0.0), + expected: "0:65535", + }, + Case { + description: "inserting a value into an existing bin and causing an overflow", + start: "0:65535", + insert: Value::Float(0.0), + expected: "0:1 0:65535", + }, + Case { + description: "inserting many values into a sketch and causing an overflow", + start: "0:100", + insert: Value::NFloats(65535, 0.0), + expected: "0:100 0:65535", + }, + Case { + description: "inserting a value into a new bin in the middle and causing an overflow", + start: "0:1 1338:1", + insert: Value::NFloats(65536, 0.5), + expected: "0:1 1293:1 1293:65535 1338:1", + }, + ]; + + for case in cases { + for insert_fn in &[InsertFn::Insert, InsertFn::InsertMany, InsertFn::InsertN] { + // Insert each value every way possible. + + let mut sketch = parse_sketch_from_string_bins(case.start); + + match insert_fn { + InsertFn::Insert => match &case.insert { + Value::Float(v) => sketch.insert(*v), + Value::Vec(vs) => { + for v in vs { + sketch.insert(*v); + } + } + Value::NFloats(n, v) => { + for _ in 0..*n { + sketch.insert(*v); + } + } + }, + InsertFn::InsertMany => match &case.insert { + Value::Float(v) => sketch.insert_many(&[*v]), + Value::Vec(vs) => sketch.insert_many(vs), + Value::NFloats(n, v) => { + for _ in 0..*n { + sketch.insert_many(&[*v]); + } + } + }, + InsertFn::InsertN => match &case.insert { + Value::Float(v) => sketch.insert_n(*v, 1), + Value::Vec(vs) => { + for v in vs { + sketch.insert_n(*v, 1); + } + } + Value::NFloats(n, v) => sketch.insert_n(*v, *n), + }, + } + + let expected = parse_sketch_from_string_bins(case.expected); + assert_eq!(expected.bins(), sketch.bins(), "{:?}: {}", insert_fn, case.description); + } + } + } + #[test] fn test_histogram_interpolation_agent_similarity() { #[derive(Clone)] diff --git a/lib/ddsketch-agent/tests/one_thousand_batched_points_ddsketch.rs b/lib/ddsketch-agent/tests/one_thousand_batched_points_ddsketch.rs new file mode 100644 index 0000000000..44de50869d --- /dev/null +++ b/lib/ddsketch-agent/tests/one_thousand_batched_points_ddsketch.rs @@ -0,0 +1,30 @@ +//! Allocation test for sketch insertion. +//! +//! Note: this is in an integration test so that it will run in its own process +//! and avoid interference from other tests. See notes at: +//! https://docs.rs/dhat/latest/dhat/#heap-usage-testing. + +use crate::common::{insert_many_and_serialize, make_points, MathableHeapStats}; + +mod common; + +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + +#[test] +fn test_one_thousand_single_points_ddsketch() { + let _profiler = dhat::Profiler::builder().testing().build(); + let points = make_points(1000); + + let before: MathableHeapStats = dhat::HeapStats::get().into(); + insert_many_and_serialize(&points); + let after: MathableHeapStats = dhat::HeapStats::get().into(); + + let diff = after - before; + dhat::assert_eq!(diff.total_blocks, 22); + dhat::assert_eq!(diff.total_bytes, 8096); + dhat::assert_eq!(diff.max_blocks, 3); + dhat::assert_eq!(diff.max_bytes, 3072); + dhat::assert_eq!(diff.curr_blocks, 0); + dhat::assert_eq!(diff.curr_bytes, 0); +} diff --git a/lib/ddsketch-agent/tests/one_thousand_single_points_ddsketch.rs b/lib/ddsketch-agent/tests/one_thousand_single_points_ddsketch.rs index 5254fd65ad..4f2a0432d6 100644 --- a/lib/ddsketch-agent/tests/one_thousand_single_points_ddsketch.rs +++ b/lib/ddsketch-agent/tests/one_thousand_single_points_ddsketch.rs @@ -21,8 +21,8 @@ fn test_one_thousand_single_points_ddsketch() { let after: MathableHeapStats = dhat::HeapStats::get().into(); let diff = after - before; - dhat::assert_eq!(diff.total_blocks, 1426); - dhat::assert_eq!(diff.total_bytes, 249802); + dhat::assert_eq!(diff.total_blocks, 21); + dhat::assert_eq!(diff.total_bytes, 6096); dhat::assert_eq!(diff.max_blocks, 3); dhat::assert_eq!(diff.max_bytes, 3072); dhat::assert_eq!(diff.curr_blocks, 0); diff --git a/lib/ddsketch-agent/tests/ten_single_points_ddsketch.rs b/lib/ddsketch-agent/tests/ten_single_points_ddsketch.rs index c468865ed3..094ab80583 100644 --- a/lib/ddsketch-agent/tests/ten_single_points_ddsketch.rs +++ b/lib/ddsketch-agent/tests/ten_single_points_ddsketch.rs @@ -21,10 +21,10 @@ fn test_ten_single_points_ddsketch() { let after: MathableHeapStats = dhat::HeapStats::get().into(); let diff = after - before; - dhat::assert_eq!(diff.total_blocks, 33); - dhat::assert_eq!(diff.total_bytes, 668); + dhat::assert_eq!(diff.total_blocks, 9); + dhat::assert_eq!(diff.total_bytes, 336); dhat::assert_eq!(diff.max_blocks, 3); - dhat::assert_eq!(diff.max_bytes, 168); + dhat::assert_eq!(diff.max_bytes, 192); dhat::assert_eq!(diff.curr_blocks, 0); dhat::assert_eq!(diff.curr_bytes, 0); }