From f0a6d6088d7d1680e46dd7681164c5f493b14eba Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 09:13:50 +0100 Subject: [PATCH 1/5] Add `closed` option to ResamplerConfig to define resampling window behavior - Introduced the `closed` parameter in ResamplerConfig with options `right` (default) and `left`. - Updated the resampling logic to respect the `closed` configuration: - `right`: Includes samples at the end of the window, excludes those at the start. - `left`: Includes samples at the start of the window, excludes those at the end. - Adjusted documentation to reflect the new `closed` parameter and its behavior. Signed-off-by: Malte Schaaf --- .../sdk/timeseries/_resampling/_config.py | 11 +++++++- .../sdk/timeseries/_resampling/_resampler.py | 25 +++++++++++++------ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index 047892803..b6079aeb8 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -10,7 +10,7 @@ from collections.abc import Sequence from dataclasses import dataclass, field from datetime import datetime, timedelta -from typing import Protocol +from typing import Literal, Protocol from frequenz.core.datetime import UNIX_EPOCH @@ -126,6 +126,15 @@ class ResamplerConfig: value. """ + closed: Literal["right", "left"] = "right" + """Indicates which side of the resampling window is closed. + + If "right", the resampling window includes samples with timestamps + equal to the end of the window, but not those equal to the start. + If "left", the resampling window includes samples with timestamps + equal to the start of the window, but not those equal to the end. + """ + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT """The initial length of the resampling buffer. diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index 81471fc0d..68b2931d5 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -9,7 +9,7 @@ import itertools import logging import math -from bisect import bisect +from bisect import bisect, bisect_left from collections import deque from datetime import datetime, timedelta, timezone from typing import assert_never @@ -411,7 +411,8 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: """Generate a new sample based on all the current *relevant* samples. Args: - timestamp: The timestamp to be used to calculate the new sample. + timestamp: The reference timestamp for the resampling process. This + timestamp indicates the end of the resampling period. Returns: A new sample generated by calling the resampling function with all @@ -437,12 +438,20 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: ) minimum_relevant_timestamp = timestamp - period * conf.max_data_age_in_periods - min_index = bisect( - self._buffer, - minimum_relevant_timestamp, - key=lambda s: s[0], - ) - max_index = bisect(self._buffer, timestamp, key=lambda s: s[0]) + if self._config.closed == "left": + min_index = bisect_left( + self._buffer, + minimum_relevant_timestamp, + key=lambda s: s[0], + ) + max_index = bisect_left(self._buffer, timestamp, key=lambda s: s[0]) + else: + min_index = bisect( + self._buffer, + minimum_relevant_timestamp, + key=lambda s: s[0], + ) + max_index = bisect(self._buffer, timestamp, key=lambda s: s[0]) # Using itertools for slicing doesn't look very efficient, but # experiments with a custom (ring) buffer that can slice showed that # it is not that bad. See: From 11c356b54f8376671f44e51f50376ea24c42e67c Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 09:22:07 +0100 Subject: [PATCH 2/5] Add `label` option to ResamplerConfig to define timestamp labeling of resampled data - Introduced the `label` parameter in ResamplerConfig with options `end` (default) and `start`. - Updated the resampling logic to respect the `label` configuration: - `end`: The timestamp of the resampled data corresponds to the end of the resampling window. - `start`: The timestamp of the resampled data corresponds to the start of the resampling window. - Adjusted the logic for setting `sample_time` to use the `label` configuration. - Updated documentation to reflect the new `label` parameter and its behavior. Signed-off-by: Malte Schaaf --- src/frequenz/sdk/timeseries/_resampling/_config.py | 8 ++++++++ src/frequenz/sdk/timeseries/_resampling/_resampler.py | 8 +++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index b6079aeb8..72515df66 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -135,6 +135,14 @@ class ResamplerConfig: equal to the start of the window, but not those equal to the end. """ + label: Literal["start", "end"] = "end" + """Indicates the timestamp label of the resampled data. + + If "end", the timestamp of the resampled data corresponds to the end + of the resampling window. If "start", the timestamp corresponds to + the start of the resampling window. + """ + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT """The initial length of the resampling buffer. diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index 68b2931d5..a1e2dc488 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -467,7 +467,13 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: if relevant_samples else None ) - return Sample(timestamp, None if value is None else Quantity(value)) + + sample_time = ( + timestamp - conf.resampling_period + if self._config.label == "start" + else timestamp + ) + return Sample(sample_time, None if value is None else Quantity(value)) def _log_no_relevant_samples( self, minimum_relevant_timestamp: datetime, timestamp: datetime From c71298b7333c13dc37ecb6025a733285386273c7 Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 10:59:16 +0100 Subject: [PATCH 3/5] Add test for `closed` option in ResamplerConfig with additional samples - Enhanced the `test_resampler_closed_option` to include additional samples at 2.5, 3, and 4 seconds. - Verified the behavior of the `closed` option ("right" and "left") with the extended timeline. - Added assertions to ensure correct resampling function calls and sink outputs for the new samples. - Confirmed that source properties and buffer length are updated correctly after processing the additional samples. Signed-off-by: Malte Schaaf --- tests/timeseries/test_resampling.py | 122 ++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 3629b5f4e..ff6778f13 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -8,6 +8,7 @@ import logging from collections.abc import AsyncIterator from datetime import datetime, timedelta, timezone +from typing import Literal from unittest.mock import AsyncMock, MagicMock import async_solipsism @@ -1504,6 +1505,127 @@ async def test_resampling_all_zeros( assert _get_buffer_len(resampler, source_receiver) == 3 +@pytest.mark.parametrize("closed", ["right", "left"]) +async def test_resampler_closed_option( + closed: Literal["right", "left"], + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test the `closed` option in ResamplerConfig.""" + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + expected_resampled_value = 42.0 + + resampling_fun_mock = MagicMock( + spec=ResamplingFunction, return_value=expected_resampled_value + ) + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + closed=closed, + ) + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + source_props = resampler.get_source_properties(source_receiver) + + # Test timeline + # + # t(s) 0 1 2 2.5 3 4 + # |----------|----------R----|-----|----------R-----> (no more samples) + # value 5.0 10.0 15.0 1.0 4.0 5.0 + # + # R = resampling is done + + # Send a few samples and run a resample tick, advancing the fake time by one period + sample1 = Sample(timestamp, value=Quantity(5.0)) + sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0)) + sample3 = Sample(timestamp + timedelta(seconds=2), value=Quantity(15.0)) + await source_sender.send(sample1) + await source_sender.send(sample2) + await source_sender.send(sample3) + + await _advance_time(fake_time, resampling_period_s) + await resampler.resample(one_shot=True) + + assert datetime.now(timezone.utc).timestamp() == 2 + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(expected_resampled_value), + ) + ) + # Assert the behavior based on the `closed` option + if closed == "right": + resampling_fun_mock.assert_called_once_with( + a_sequence(as_float_tuple(sample2), as_float_tuple(sample3)), + config, + source_props, + ) + elif closed == "left": + resampling_fun_mock.assert_called_once_with( + a_sequence(as_float_tuple(sample1), as_float_tuple(sample2)), + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=3, sampling_period=None + ) + assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len + sink_mock.reset_mock() + resampling_fun_mock.reset_mock() + + # Additional samples at 2.5, 3, and 4 seconds + sample4 = Sample(timestamp + timedelta(seconds=2.5), value=Quantity(1.0)) + sample5 = Sample(timestamp + timedelta(seconds=3), value=Quantity(4.0)) + sample6 = Sample(timestamp + timedelta(seconds=4), value=Quantity(5.0)) + await source_sender.send(sample4) + await source_sender.send(sample5) + await source_sender.send(sample6) + + # Advance time to 4 seconds and resample again + await _advance_time(fake_time, resampling_period_s * 2) + await resampler.resample(one_shot=True) + + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s * 2), + Quantity(expected_resampled_value), + ) + ) + if closed == "right": + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample4), + as_float_tuple(sample5), + as_float_tuple(sample6), + ), + config, + source_props, + ) + elif closed == "left": + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample3), + as_float_tuple(sample4), + as_float_tuple(sample5), + ), + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=6, sampling_period=None + ) + assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len + + def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int: # pylint: disable-next=protected-access blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen From 414d63d719e70facf24e28113bdb3908caed495a Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 11:01:35 +0100 Subject: [PATCH 4/5] Add test for `label` option in ResamplerConfig to verify timestamp behavior - Introduced `test_resampler_label_option` to validate the `label` configuration in ResamplerConfig. - Tested both `start` and `end` options to ensure the resampled datas timestamp corresponds to the start or end of the resampling window, respectively. - Verified sink outputs with the expected timestamp and resampled value. Signed-off-by: Malte Schaaf --- tests/timeseries/test_resampling.py | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index ff6778f13..101bff8e9 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -1626,6 +1626,56 @@ async def test_resampler_closed_option( assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len +@pytest.mark.parametrize("label", ["start", "end"]) +async def test_resampler_label_option( + label: Literal["start", "end"], + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test the `label` option in ResamplerConfig.""" + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + expected_resampled_value = 42.0 + + resampling_fun_mock = MagicMock( + spec=ResamplingFunction, return_value=expected_resampled_value + ) + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + label=label, + ) + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + + # Send samples and resample + sample1 = Sample(timestamp, value=Quantity(5.0)) + sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0)) + await source_sender.send(sample1) + await source_sender.send(sample2) + + await _advance_time(fake_time, resampling_period_s) + await resampler.resample(one_shot=True) + + # Assert the timestamp of the resampled sample + expected_timestamp = ( + timestamp + if label == "start" + else timestamp + timedelta(seconds=resampling_period_s) + ) + sink_mock.assert_called_once_with( + Sample(expected_timestamp, Quantity(expected_resampled_value)) + ) + + def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int: # pylint: disable-next=protected-access blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen From 297ec7cc9458f4467a23d35873efa065dba6eb7f Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 17:06:34 +0100 Subject: [PATCH 5/5] Update release notes Signed-off-by: Malte Schaaf --- RELEASE_NOTES.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 34cef0f7a..a4d81aa96 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,8 @@ ## New Features - +- Added the `closed` parameter to `ResamplerConfig`, allowing users to configure the openness of the resampling window (`"right"` or `"left"`). +- Introduced the `label` parameter to `ResamplerConfig`, enabling users to specify whether the resampled timestamp represents the start or end of the resampling window. ## Bug Fixes