Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/user-guide/costs/costrbf.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ algo = rpt.Dynp(custom_cost=c)
algo = rpt.Dynp(model="rbf")
```

### Memory usage

By default, the full $n \times n$ Gram matrix is precomputed and stored in memory after `fit()` (`quadratic_precompute=True`, $O(n^2)$ memory).
For very large signals, set `quadratic_precompute=False` to evaluate the Gram matrix on demand instead, reducing memory to $O(n)$ at the cost of extra computation at inference time.
Setting `quadratic_precompute=None` switches automatically: explicit storage for $n \leq 4096$, on-demand otherwise.

```python
c = rpt.costs.CostRbf(quadratic_precompute=False)
```

## References

<a id="Garreau2018">[Garreau2018]</a>
Expand Down
195 changes: 172 additions & 23 deletions src/ruptures/costs/costrbf.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,193 @@
r"""Kernelized mean change."""

import typing
import functools

import numpy as np
from scipy.spatial.distance import pdist, squareform

from ruptures.exceptions import NotEnoughPoints
from ruptures.base import BaseCost


class _ImplicitSquareDistanceMatrix:
_signal: np.typing.NDArray[np.number]

def __init__(self, signal: np.typing.NDArray[np.number]) -> None:
self._signal = signal

@functools.cached_property
def _centered_signal(self) -> np.ndarray:
return self._signal - self._signal.mean(axis=0)[None, :]

@functools.cached_property
def _sqnorms(self) -> np.ndarray:
return (self._centered_signal**2).sum(axis=1)

@functools.cached_property
def shape(self) -> tuple[int, int]:
return (self._signal.shape[0],) * 2

def __getitem__(
self,
idx: tuple[int | slice | np.ndarray, int | slice | np.ndarray],
) -> np.typing.NDArray[np.number]:
idxr, idxc = idx
sqnorms_row = self._sqnorms[idxr]
sqnorms_row_1d = np.atleast_1d(sqnorms_row)
sqnorms_col = self._sqnorms[idxc]
sqnorms_col_1d = np.atleast_1d(sqnorms_col)
mat = (-2 * np.atleast_2d(self._centered_signal[idxr])) @ np.atleast_2d(
self._centered_signal[idxc]
).T
mat += sqnorms_row_1d[:, None]
mat += sqnorms_col_1d[None, :]
np.maximum(mat, 0, out=mat)
if sqnorms_col.ndim < 1:
mat = mat[:, 0]
if sqnorms_row.ndim < 1:
mat = mat[0]
return mat


def _explicit_square_distance_matrix(signal: np.typing.NDArray[np.number]):
centered_signal = signal - signal.mean(axis=0)[None, :]
sqnorms = (centered_signal**2).sum(axis=1)
mat = (-2 * centered_signal) @ centered_signal.T
mat += sqnorms[:, None]
mat += sqnorms[None, :]

np.maximum(mat, 0, out=mat)
return mat


class _ImplicitGramMatrix:
_square_distance_matrix: _ImplicitSquareDistanceMatrix
_gamma: float

def __init__(
self, square_distance_matrix: _ImplicitSquareDistanceMatrix, gamma: float
):
self._square_distance_matrix = square_distance_matrix
self._gamma = gamma

@functools.cached_property
def shape(self):
return self._square_distance_matrix.shape

def __getitem__(
self,
idx: tuple[int | slice | np.ndarray, int | slice | np.ndarray],
) -> np.typing.NDArray[np.number]:
mat = self._square_distance_matrix[idx]
mat *= -self._gamma
np.exp(mat, out=mat)
return mat


class CostRbf(BaseCost):
r"""Kernel cost function (rbf kernel)."""
r"""Kernel cost function using the radial basis function (RBF) kernel.

The cost of a segment :math:`[a, b)` is defined as:

.. math::
c(a, b) = \sum_{i=a}^{b-1} K(x_i, x_i)
- \frac{1}{b - a} \sum_{i=a}^{b-1} \sum_{j=a}^{b-1} K(x_i, x_j)

where :math:`K(x, y) = \exp(-\gamma \|x - y\|^2)` is the RBF kernel.
"""

model = "rbf"

def __init__(self, gamma=None):
"""Initialize the object."""
min_size: int
gamma: typing.Optional[float]
signal: typing.Optional[np.typing.NDArray[np.number]]
_gram: typing.Optional[np.typing.NDArray[np.number]]
quadratic_precompute: typing.Optional[bool]

def __init__(
self,
gamma: typing.Optional[float] = None,
quadratic_precompute: typing.Optional[bool] = True,
):
"""Initialize the CostRbf instance.

Args:
gamma (float or None): Bandwidth parameter of the RBF kernel,
defined as :math:`K(x, y) = \\exp(-\\gamma \\|x - y\\|^2)`.
Must be strictly positive when provided.
If ``None`` (default), ``gamma`` is set automatically after
``fit()`` using the median heuristic:
:math:`\\gamma = 1 / \\operatorname{median}(\\|x_i - x_j\\|^2)`.
For signals with more than 4096 samples, the median is estimated
on a regularly-spaced subsample to keep the cost tractable.

quadratic_precompute (bool or None): Controls whether the full
:math:`n \\times n` Gram matrix is computed and stored in memory
after ``fit()``.

- ``True`` (default): always precompute and store the full Gram
matrix. Memory complexity is :math:`O(n^2)`. Fastest at
inference time.
- ``False``: never precompute. The Gram matrix is evaluated
on-demand for each queried block. Memory complexity is
:math:`O(n)`. Recommended for very large signals.
- ``None``: automatic mode. Behaves like ``True`` when
:math:`n \\leq 4096`, and like ``False`` otherwise.
"""
self.min_size = 1
self.gamma = gamma
self._gram = None
self.signal = None
self.quadratic_precompute = quadratic_precompute

@property
def gram(self):
@functools.cached_property
def gram(self) -> typing.Union[np.typing.NDArray[np.number], _ImplicitGramMatrix]:
"""Generate the gram matrix (lazy loading).

Only access this function after a `.fit()` (otherwise
`self.signal` is not defined).
"""
if self._gram is None:
K = pdist(self.signal, metric="sqeuclidean")

if self.signal is None:
raise ValueError(".fit() must be used before accessing this matrix")

if (
self.quadratic_precompute
or self.quadratic_precompute is None
and self.signal.shape[0] <= 4096
):
mat = _explicit_square_distance_matrix(self.signal)
if self.gamma is None:
# median heuristic (on whole matrix if n<=4096, on subsample
# else)
subsample_factor = int(np.ceil(self.signal.shape[0] / 4096))
med: float = np.median(
mat[::subsample_factor, (subsample_factor // 2) :: subsample_factor]
)
self.gamma = 1.0
# median heuristics
K_median = np.median(K)
if K_median != 0:
# K /= K_median
self.gamma = 1 / K_median
K *= self.gamma
np.clip(K, 1e-2, 1e2, K) # clipping to avoid exponential under/overflow
self._gram = np.exp(squareform(-K))
return self._gram

def fit(self, signal) -> "CostRbf":
if med > 0:
self.gamma = 1 / med
mat *= -self.gamma
np.exp(mat, out=mat)
return mat

square_distance_matrix = _ImplicitSquareDistanceMatrix(self.signal)
gamma = self.gamma
if gamma is None:
# median heuristic on a subsample
subsample_factor = int(np.ceil(self.signal.shape[0] / 4096))
med: float = np.median(
square_distance_matrix[
::subsample_factor, (subsample_factor // 2) :: subsample_factor
]
)
gamma = 1.0
if med > 0:
gamma = 1 / med
self.gamma = gamma
return _ImplicitGramMatrix(square_distance_matrix, gamma)

def fit(self, signal: np.typing.ArrayLike) -> "CostRbf":
"""Sets parameters of the instance.

Args:
Expand All @@ -48,10 +196,11 @@ def fit(self, signal) -> "CostRbf":
Returns:
self
"""
if signal.ndim == 1:
self.signal = signal.reshape(-1, 1)
signal_ndarray: np.typing.NDArray[np.number] = np.asarray(signal)
if signal_ndarray.ndim == 1:
self.signal = signal_ndarray.reshape(-1, 1)
else:
self.signal = signal
self.signal = signal_ndarray

# If gamma is none, set it using the median heuristic.
# This heuristic involves computing the gram matrix which is lazy loaded
Expand All @@ -61,7 +210,7 @@ def fit(self, signal) -> "CostRbf":

return self

def error(self, start, end) -> float:
def error(self, start: int, end: int) -> float:
"""Return the approximation cost on the segment [start:end].

Args:
Expand Down
51 changes: 50 additions & 1 deletion tests/test_costs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import numpy as np
import scipy
import pytest
from ruptures import Binseg
from ruptures.costs import CostLinear, CostNormal, cost_factory
from ruptures.costs import CostLinear, CostNormal, CostRbf, cost_factory
from ruptures.costs.costml import CostMl
from ruptures.datasets import pw_constant
from ruptures.exceptions import NotEnoughPoints
Expand Down Expand Up @@ -237,3 +238,51 @@ def test_costl2_small_data():
"got {computed_break_dict}.",
)
assert expected_break_dict == computed_break_dict, err_msg


@pytest.mark.parametrize("explicit", (True, False))
def test_costrbf_explicit_implicit(explicit):
"""Test if the cost RBF use a valid kernel."""

rng = np.random.default_rng(seed=123567890)

data = rng.normal(size=(2048, 3))
examinated = slice(512, 1024)
gamma = 1.5
cost = CostRbf(quadratic_precompute=explicit, gamma=gamma)
cost.fit(data)

dist_sq = scipy.spatial.distance.squareform(
scipy.spatial.distance.pdist(data[examinated], "sqeuclidean")
)
gram = np.exp(-gamma * dist_sq)
assert np.allclose(gram, cost.gram[examinated, examinated])


@pytest.mark.parametrize(
"n_samples_explicit",
[
(n_samples, explicit)
for n_samples in (5, 100, 2_000, 50_000, 1_000_000, 20_000_000)
for explicit in (True, False)
if explicit is False or n_samples < 1000
],
)
def test_cost_rbf_gamma_heuristic(n_samples_explicit):
"""Test if the heuristic formula works as expected for computing median in
cost RBF."""

n_samples, explicit = n_samples_explicit
cost = CostRbf(quadratic_precompute=explicit)
cost.fit(np.array(np.arange(n_samples), dtype=np.float64)[:, None])
dist_median_from_heuristic = cost.gamma ** (-0.5)
dist_median_from_asympto_formula = (1 - 0.5**0.5) * n_samples

# two source of errors:
# - absolute error of 1 is allowed (median of integers)
# - relative error of 1e-3 is allowed
absolute_error = np.abs(
dist_median_from_heuristic - dist_median_from_asympto_formula
)
relative_error = absolute_error / dist_median_from_asympto_formula
assert absolute_error <= 1 or relative_error <= 1e-3
Loading