Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Frequenz Resampling Release Notes

## Bug Fixes

- Fixed the `first_timestamp` parameter so that it only affects output timestamp labeling, not interval grouping. Previously, setting `first_timestamp=false` shifted the interval boundaries, causing the first sample at `t=0` to be excluded. Intervals are now consistently `[start, end)` regardless of the `first_timestamp` value.

## New Features

- `ResamplingFunction` now implements `Clone`.
7 changes: 4 additions & 3 deletions frequenz/resampling/_rust_backend.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ class Resampler:
resampling_function: The resampling function.
max_age_in_intervals: The maximum age of a sample in intervals.
start: The start time of the resampling.
first_timestamp: Whether the resampled timestamp should be the first
timestamp in the buffer or the last timestamp in the buffer.
Defaults to `True`.
first_timestamp: Controls the output timestamp labeling. If `True`,
the output timestamp is set to the start of the interval. If `False`,
the output timestamp is set to the end of the interval. This does not
affect how samples are grouped into intervals. Defaults to `True`.
"""

def push_sample(self, *, timestamp: datetime, value: Optional[float]) -> None:
Expand Down
23 changes: 13 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,20 @@ let mut resampler: Resampler<f64, TestSample> =
Resampler::new(TimeDelta::seconds(5), ResamplingFunction::Average, 1, start, false);

let step = TimeDelta::seconds(1);
// Data starts at t=0 with values 1-10
// Interval [0, 5): t=0,1,2,3,4 with values 1,2,3,4,5 → avg = 3.0
// Interval [5, 10): t=5,6,7,8,9 with values 6,7,8,9,10 → avg = 8.0
let data = vec![
TestSample::new(start + step, Some(1.0)),
TestSample::new(start + step * 2, Some(2.0)),
TestSample::new(start + step * 3, Some(3.0)),
TestSample::new(start + step * 4, Some(4.0)),
TestSample::new(start + step * 5, Some(5.0)),
TestSample::new(start + step * 6, Some(6.0)),
TestSample::new(start + step * 7, Some(7.0)),
TestSample::new(start + step * 8, Some(8.0)),
TestSample::new(start + step * 9, Some(9.0)),
TestSample::new(start + step * 10, Some(10.0)),
TestSample::new(start, Some(1.0)),
TestSample::new(start + step, Some(2.0)),
TestSample::new(start + step * 2, Some(3.0)),
TestSample::new(start + step * 3, Some(4.0)),
TestSample::new(start + step * 4, Some(5.0)),
TestSample::new(start + step * 5, Some(6.0)),
TestSample::new(start + step * 6, Some(7.0)),
TestSample::new(start + step * 7, Some(8.0)),
TestSample::new(start + step * 8, Some(9.0)),
TestSample::new(start + step * 9, Some(10.0)),
];

resampler.extend(data);
Expand Down
70 changes: 28 additions & 42 deletions src/resampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,20 @@ pub struct Resampler<
input_start: Option<DateTime<Utc>>,
/// The interval between the first and the second sample in the buffer
input_interval: Option<TimeDelta>,
/// Whether the resampled timestamp should be the first timestamp (if
/// `first_timestamp` is `true`) or the last timestamp (if
/// `first_timestamp` is `false`) in the buffer.
/// If `first_timestamp` is `true`, the resampled timestamp will be the
/// timestamp of the first sample in the buffer and the aggregation will
/// be done with the samples that are `interval` in the future.
/// If `first_timestamp` is `false`, the resampled timestamp will be the
/// timestamp of the last sample in the buffer and the aggregation will
/// be done with the samples that are `interval` in the past.
/// Controls the output timestamp labeling for resampled samples.
///
/// This parameter only affects how output timestamps are labeled, not how
/// samples are grouped into intervals. Intervals are always `[start, end)`.
///
/// - If `first_timestamp` is `true`, the output timestamp is set to the
/// start of the interval.
/// - If `first_timestamp` is `false`, the output timestamp is set to the
/// end of the interval.
///
/// For example, with an interval of 5 seconds starting at t=0:
/// - Interval `[0, 5)` contains samples with timestamps 0, 1, 2, 3, 4
/// - If `first_timestamp=true`: output timestamp = 0
/// - If `first_timestamp=false`: output timestamp = 5
first_timestamp: bool,
}

Expand Down Expand Up @@ -173,13 +178,7 @@ impl<
while self.start < end {
// loop over the samples in the buffer
while next_sample
.map(|s| {
is_left_of_buffer_edge(
self.first_timestamp,
&s.timestamp(),
&(self.start + self.interval),
)
})
.map(|s| is_left_of_buffer_edge(&s.timestamp(), &(self.start + self.interval)))
.unwrap_or(false)
{
// next sample is not newer than the current interval
Expand All @@ -204,9 +203,7 @@ impl<
let input_interval = self.input_interval.unwrap_or(self.interval);
let drain_end_date =
self.start + self.interval - input_interval * self.max_age_in_intervals;
interval_buffer.retain(|s| {
is_right_of_buffer_edge(self.first_timestamp, &s.timestamp(), &drain_end_date)
});
interval_buffer.retain(|s| is_right_of_buffer_edge(&s.timestamp(), &drain_end_date));

// resample the interval_buffer
res.push(Sample::new(
Expand All @@ -221,9 +218,8 @@ impl<
// Remove samples from buffer that are older than max_age
let interval = self.input_interval.unwrap_or(self.interval);
let drain_end_date = end - interval * self.max_age_in_intervals;
self.buffer.retain(|s| {
is_right_of_buffer_edge(self.first_timestamp, &s.timestamp(), &drain_end_date)
});
self.buffer
.retain(|s| is_right_of_buffer_edge(&s.timestamp(), &drain_end_date));

res
}
Expand Down Expand Up @@ -259,26 +255,16 @@ pub(crate) fn epoch_align(
.unwrap_or(timestamp)
}

fn is_left_of_buffer_edge(
first_timestamp: bool,
timestamp: &DateTime<Utc>,
edge_timestamp: &DateTime<Utc>,
) -> bool {
if first_timestamp {
timestamp < edge_timestamp
} else {
timestamp <= edge_timestamp
}
/// Checks if a timestamp is within the interval for aggregation.
/// Uses exclusive upper bound: sample is included if timestamp < edge.
/// This creates intervals of the form [start, end).
fn is_left_of_buffer_edge(timestamp: &DateTime<Utc>, edge_timestamp: &DateTime<Utc>) -> bool {
timestamp < edge_timestamp
}

fn is_right_of_buffer_edge(
first_timestamp: bool,
timestamp: &DateTime<Utc>,
edge_timestamp: &DateTime<Utc>,
) -> bool {
if first_timestamp {
timestamp >= edge_timestamp
} else {
timestamp > edge_timestamp
}
/// Checks if a timestamp should be retained in the buffer.
/// Uses an inclusive lower bound: a sample is retained if timestamp >= edge.
/// This is consistent with intervals of the form [start, end).
fn is_right_of_buffer_edge(timestamp: &DateTime<Utc>, edge_timestamp: &DateTime<Utc>) -> bool {
timestamp >= edge_timestamp
}
Loading