Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- Added the `closed` parameter to `ResamplerConfig`, allowing users to configure the openness of the resampling window (`"right"` or `"left"`).
- Introduced the `label` parameter to `ResamplerConfig`, enabling users to specify whether the resampled timestamp represents the start or end of the resampling window.

## Bug Fixes

Expand Down
19 changes: 18 additions & 1 deletion src/frequenz/sdk/timeseries/_resampling/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections.abc import Sequence
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Protocol
from typing import Literal, Protocol

from frequenz.core.datetime import UNIX_EPOCH

Expand Down Expand Up @@ -126,6 +126,23 @@ class ResamplerConfig:
value.
"""

closed: Literal["right", "left"] = "right"
"""Indicates which side of the resampling window is closed.

If "right", the resampling window includes samples with timestamps
equal to the end of the window, but not those equal to the start.
If "left", the resampling window includes samples with timestamps
equal to the start of the window, but not those equal to the end.
"""

label: Literal["start", "end"] = "end"
"""Indicates the timestamp label of the resampled data.

If "end", the timestamp of the resampled data corresponds to the end
of the resampling window. If "start", the timestamp corresponds to
the start of the resampling window.
"""

initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
"""The initial length of the resampling buffer.

Expand Down
33 changes: 24 additions & 9 deletions src/frequenz/sdk/timeseries/_resampling/_resampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import itertools
import logging
import math
from bisect import bisect
from bisect import bisect, bisect_left
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import assert_never
Expand Down Expand Up @@ -411,7 +411,8 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]:
"""Generate a new sample based on all the current *relevant* samples.

Args:
timestamp: The timestamp to be used to calculate the new sample.
timestamp: The reference timestamp for the resampling process. This
timestamp indicates the end of the resampling period.

Returns:
A new sample generated by calling the resampling function with all
Expand All @@ -437,12 +438,20 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]:
)
minimum_relevant_timestamp = timestamp - period * conf.max_data_age_in_periods

min_index = bisect(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the behavior how to resample, i.e. left or right open and the labeling should be config parameters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make left or right opened configurable with the corresponding label, such as:
right open: [t,t+1) -> labeled as t
left open: (t-1,t] -> labeled as t (the current behavior)
Or do you want to allow the user to additionally do something like:
right open, label in the end: [t,t+1) -> labeled as t+1
left open, label in the beginning:(t-1,t] -> labeled as t-1
Because I think the later could lead to a lot of confusion (not sure if its ever needed)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the latter is also reasonable options (see e.g. https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.resample.html), but don't see a strong reason to implement this now if not needed. If it's well-documented, the users can also adjust the timestamps trivially. So your proposal sounds good to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, whatever we do, we should probably be much more explicit of how output samples are calculated and structured.

self._buffer,
minimum_relevant_timestamp,
key=lambda s: s[0],
)
max_index = bisect(self._buffer, timestamp, key=lambda s: s[0])
if self._config.closed == "left":
min_index = bisect_left(
self._buffer,
minimum_relevant_timestamp,
key=lambda s: s[0],
)
max_index = bisect_left(self._buffer, timestamp, key=lambda s: s[0])
else:
min_index = bisect(
self._buffer,
minimum_relevant_timestamp,
key=lambda s: s[0],
)
max_index = bisect(self._buffer, timestamp, key=lambda s: s[0])
# Using itertools for slicing doesn't look very efficient, but
# experiments with a custom (ring) buffer that can slice showed that
# it is not that bad. See:
Expand All @@ -458,7 +467,13 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]:
if relevant_samples
else None
)
return Sample(timestamp, None if value is None else Quantity(value))

sample_time = (
timestamp - conf.resampling_period
if self._config.label == "start"
else timestamp
)
return Sample(sample_time, None if value is None else Quantity(value))

def _log_no_relevant_samples(
self, minimum_relevant_timestamp: datetime, timestamp: datetime
Expand Down
172 changes: 172 additions & 0 deletions tests/timeseries/test_resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
from collections.abc import AsyncIterator
from datetime import datetime, timedelta, timezone
from typing import Literal
from unittest.mock import AsyncMock, MagicMock

import async_solipsism
Expand Down Expand Up @@ -1504,6 +1505,177 @@ async def test_resampling_all_zeros(
assert _get_buffer_len(resampler, source_receiver) == 3


@pytest.mark.parametrize("closed", ["right", "left"])
async def test_resampler_closed_option(
closed: Literal["right", "left"],
fake_time: time_machine.Coordinates,
source_chan: Broadcast[Sample[Quantity]],
) -> None:
"""Test the `closed` option in ResamplerConfig."""
timestamp = datetime.now(timezone.utc)

resampling_period_s = 2
expected_resampled_value = 42.0

resampling_fun_mock = MagicMock(
spec=ResamplingFunction, return_value=expected_resampled_value
)
config = ResamplerConfig(
resampling_period=timedelta(seconds=resampling_period_s),
max_data_age_in_periods=1.0,
resampling_function=resampling_fun_mock,
closed=closed,
)
resampler = Resampler(config)

source_receiver = source_chan.new_receiver()
source_sender = source_chan.new_sender()

sink_mock = AsyncMock(spec=Sink, return_value=True)

resampler.add_timeseries("test", source_receiver, sink_mock)
source_props = resampler.get_source_properties(source_receiver)

# Test timeline
#
# t(s) 0 1 2 2.5 3 4
# |----------|----------R----|-----|----------R-----> (no more samples)
# value 5.0 10.0 15.0 1.0 4.0 5.0
#
# R = resampling is done

# Send a few samples and run a resample tick, advancing the fake time by one period
sample1 = Sample(timestamp, value=Quantity(5.0))
sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0))
sample3 = Sample(timestamp + timedelta(seconds=2), value=Quantity(15.0))
await source_sender.send(sample1)
await source_sender.send(sample2)
await source_sender.send(sample3)

await _advance_time(fake_time, resampling_period_s)
await resampler.resample(one_shot=True)

assert datetime.now(timezone.utc).timestamp() == 2
sink_mock.assert_called_once_with(
Sample(
timestamp + timedelta(seconds=resampling_period_s),
Quantity(expected_resampled_value),
)
)
# Assert the behavior based on the `closed` option
if closed == "right":
resampling_fun_mock.assert_called_once_with(
a_sequence(as_float_tuple(sample2), as_float_tuple(sample3)),
config,
source_props,
)
elif closed == "left":
resampling_fun_mock.assert_called_once_with(
a_sequence(as_float_tuple(sample1), as_float_tuple(sample2)),
config,
source_props,
)
assert source_props == SourceProperties(
sampling_start=timestamp, received_samples=3, sampling_period=None
)
assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len
sink_mock.reset_mock()
resampling_fun_mock.reset_mock()

# Additional samples at 2.5, 3, and 4 seconds
sample4 = Sample(timestamp + timedelta(seconds=2.5), value=Quantity(1.0))
sample5 = Sample(timestamp + timedelta(seconds=3), value=Quantity(4.0))
sample6 = Sample(timestamp + timedelta(seconds=4), value=Quantity(5.0))
await source_sender.send(sample4)
await source_sender.send(sample5)
await source_sender.send(sample6)

# Advance time to 4 seconds and resample again
await _advance_time(fake_time, resampling_period_s * 2)
await resampler.resample(one_shot=True)

sink_mock.assert_called_once_with(
Sample(
timestamp + timedelta(seconds=resampling_period_s * 2),
Quantity(expected_resampled_value),
)
)
if closed == "right":
resampling_fun_mock.assert_called_once_with(
a_sequence(
as_float_tuple(sample4),
as_float_tuple(sample5),
as_float_tuple(sample6),
),
config,
source_props,
)
elif closed == "left":
resampling_fun_mock.assert_called_once_with(
a_sequence(
as_float_tuple(sample3),
as_float_tuple(sample4),
as_float_tuple(sample5),
),
config,
source_props,
)
assert source_props == SourceProperties(
sampling_start=timestamp, received_samples=6, sampling_period=None
)
assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len


@pytest.mark.parametrize("label", ["start", "end"])
async def test_resampler_label_option(
label: Literal["start", "end"],
fake_time: time_machine.Coordinates,
source_chan: Broadcast[Sample[Quantity]],
) -> None:
"""Test the `label` option in ResamplerConfig."""
timestamp = datetime.now(timezone.utc)

resampling_period_s = 2
expected_resampled_value = 42.0

resampling_fun_mock = MagicMock(
spec=ResamplingFunction, return_value=expected_resampled_value
)
config = ResamplerConfig(
resampling_period=timedelta(seconds=resampling_period_s),
max_data_age_in_periods=1.0,
resampling_function=resampling_fun_mock,
label=label,
)
resampler = Resampler(config)

source_receiver = source_chan.new_receiver()
source_sender = source_chan.new_sender()

sink_mock = AsyncMock(spec=Sink, return_value=True)

resampler.add_timeseries("test", source_receiver, sink_mock)

# Send samples and resample
sample1 = Sample(timestamp, value=Quantity(5.0))
sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0))
await source_sender.send(sample1)
await source_sender.send(sample2)

await _advance_time(fake_time, resampling_period_s)
await resampler.resample(one_shot=True)

# Assert the timestamp of the resampled sample
expected_timestamp = (
timestamp
if label == "start"
else timestamp + timedelta(seconds=resampling_period_s)
)
sink_mock.assert_called_once_with(
Sample(expected_timestamp, Quantity(expected_resampled_value))
)


def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int:
# pylint: disable-next=protected-access
blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen
Expand Down