diff --git a/docs/user-guide/costs/costrbf.md b/docs/user-guide/costs/costrbf.md index 0365d550..d7477d00 100644 --- a/docs/user-guide/costs/costrbf.md +++ b/docs/user-guide/costs/costrbf.md @@ -59,6 +59,16 @@ algo = rpt.Dynp(custom_cost=c) algo = rpt.Dynp(model="rbf") ``` +### Memory usage + +By default, the full $n \times n$ Gram matrix is precomputed and stored in memory after `fit()` (`quadratic_precompute=True`, $O(n^2)$ memory). +For very large signals, set `quadratic_precompute=False` to evaluate the Gram matrix on demand instead, reducing memory to $O(n)$ at the cost of extra computation at inference time. +Setting `quadratic_precompute=None` switches automatically: explicit storage for $n \leq 4096$, on-demand otherwise. + +```python +c = rpt.costs.CostRbf(quadratic_precompute=False) +``` + ## References [Garreau2018] diff --git a/src/ruptures/costs/costrbf.py b/src/ruptures/costs/costrbf.py index 788b86ca..1aff349e 100644 --- a/src/ruptures/costs/costrbf.py +++ b/src/ruptures/costs/costrbf.py @@ -1,45 +1,193 @@ r"""Kernelized mean change.""" +import typing +import functools + import numpy as np -from scipy.spatial.distance import pdist, squareform from ruptures.exceptions import NotEnoughPoints from ruptures.base import BaseCost +class _ImplicitSquareDistanceMatrix: + _signal: np.typing.NDArray[np.number] + + def __init__(self, signal: np.typing.NDArray[np.number]) -> None: + self._signal = signal + + @functools.cached_property + def _centered_signal(self) -> np.ndarray: + return self._signal - self._signal.mean(axis=0)[None, :] + + @functools.cached_property + def _sqnorms(self) -> np.ndarray: + return (self._centered_signal**2).sum(axis=1) + + @functools.cached_property + def shape(self) -> tuple[int, int]: + return (self._signal.shape[0],) * 2 + + def __getitem__( + self, + idx: tuple[int | slice | np.ndarray, int | slice | np.ndarray], + ) -> np.typing.NDArray[np.number]: + idxr, idxc = idx + sqnorms_row = self._sqnorms[idxr] + sqnorms_row_1d = np.atleast_1d(sqnorms_row) + sqnorms_col = self._sqnorms[idxc] + sqnorms_col_1d = np.atleast_1d(sqnorms_col) + mat = (-2 * np.atleast_2d(self._centered_signal[idxr])) @ np.atleast_2d( + self._centered_signal[idxc] + ).T + mat += sqnorms_row_1d[:, None] + mat += sqnorms_col_1d[None, :] + np.maximum(mat, 0, out=mat) + if sqnorms_col.ndim < 1: + mat = mat[:, 0] + if sqnorms_row.ndim < 1: + mat = mat[0] + return mat + + +def _explicit_square_distance_matrix(signal: np.typing.NDArray[np.number]): + centered_signal = signal - signal.mean(axis=0)[None, :] + sqnorms = (centered_signal**2).sum(axis=1) + mat = (-2 * centered_signal) @ centered_signal.T + mat += sqnorms[:, None] + mat += sqnorms[None, :] + + np.maximum(mat, 0, out=mat) + return mat + + +class _ImplicitGramMatrix: + _square_distance_matrix: _ImplicitSquareDistanceMatrix + _gamma: float + + def __init__( + self, square_distance_matrix: _ImplicitSquareDistanceMatrix, gamma: float + ): + self._square_distance_matrix = square_distance_matrix + self._gamma = gamma + + @functools.cached_property + def shape(self): + return self._square_distance_matrix.shape + + def __getitem__( + self, + idx: tuple[int | slice | np.ndarray, int | slice | np.ndarray], + ) -> np.typing.NDArray[np.number]: + mat = self._square_distance_matrix[idx] + mat *= -self._gamma + np.exp(mat, out=mat) + return mat + + class CostRbf(BaseCost): - r"""Kernel cost function (rbf kernel).""" + r"""Kernel cost function using the radial basis function (RBF) kernel. + + The cost of a segment :math:`[a, b)` is defined as: + + .. math:: + c(a, b) = \sum_{i=a}^{b-1} K(x_i, x_i) + - \frac{1}{b - a} \sum_{i=a}^{b-1} \sum_{j=a}^{b-1} K(x_i, x_j) + + where :math:`K(x, y) = \exp(-\gamma \|x - y\|^2)` is the RBF kernel. + """ model = "rbf" - def __init__(self, gamma=None): - """Initialize the object.""" + min_size: int + gamma: typing.Optional[float] + signal: typing.Optional[np.typing.NDArray[np.number]] + _gram: typing.Optional[np.typing.NDArray[np.number]] + quadratic_precompute: typing.Optional[bool] + + def __init__( + self, + gamma: typing.Optional[float] = None, + quadratic_precompute: typing.Optional[bool] = True, + ): + """Initialize the CostRbf instance. + + Args: + gamma (float or None): Bandwidth parameter of the RBF kernel, + defined as :math:`K(x, y) = \\exp(-\\gamma \\|x - y\\|^2)`. + Must be strictly positive when provided. + If ``None`` (default), ``gamma`` is set automatically after + ``fit()`` using the median heuristic: + :math:`\\gamma = 1 / \\operatorname{median}(\\|x_i - x_j\\|^2)`. + For signals with more than 4096 samples, the median is estimated + on a regularly-spaced subsample to keep the cost tractable. + + quadratic_precompute (bool or None): Controls whether the full + :math:`n \\times n` Gram matrix is computed and stored in memory + after ``fit()``. + + - ``True`` (default): always precompute and store the full Gram + matrix. Memory complexity is :math:`O(n^2)`. Fastest at + inference time. + - ``False``: never precompute. The Gram matrix is evaluated + on-demand for each queried block. Memory complexity is + :math:`O(n)`. Recommended for very large signals. + - ``None``: automatic mode. Behaves like ``True`` when + :math:`n \\leq 4096`, and like ``False`` otherwise. + """ self.min_size = 1 self.gamma = gamma self._gram = None + self.signal = None + self.quadratic_precompute = quadratic_precompute - @property - def gram(self): + @functools.cached_property + def gram(self) -> typing.Union[np.typing.NDArray[np.number], _ImplicitGramMatrix]: """Generate the gram matrix (lazy loading). Only access this function after a `.fit()` (otherwise `self.signal` is not defined). """ - if self._gram is None: - K = pdist(self.signal, metric="sqeuclidean") + + if self.signal is None: + raise ValueError(".fit() must be used before accessing this matrix") + + if ( + self.quadratic_precompute + or self.quadratic_precompute is None + and self.signal.shape[0] <= 4096 + ): + mat = _explicit_square_distance_matrix(self.signal) if self.gamma is None: + # median heuristic (on whole matrix if n<=4096, on subsample + # else) + subsample_factor = int(np.ceil(self.signal.shape[0] / 4096)) + med: float = np.median( + mat[::subsample_factor, (subsample_factor // 2) :: subsample_factor] + ) self.gamma = 1.0 - # median heuristics - K_median = np.median(K) - if K_median != 0: - # K /= K_median - self.gamma = 1 / K_median - K *= self.gamma - np.clip(K, 1e-2, 1e2, K) # clipping to avoid exponential under/overflow - self._gram = np.exp(squareform(-K)) - return self._gram - - def fit(self, signal) -> "CostRbf": + if med > 0: + self.gamma = 1 / med + mat *= -self.gamma + np.exp(mat, out=mat) + return mat + + square_distance_matrix = _ImplicitSquareDistanceMatrix(self.signal) + gamma = self.gamma + if gamma is None: + # median heuristic on a subsample + subsample_factor = int(np.ceil(self.signal.shape[0] / 4096)) + med: float = np.median( + square_distance_matrix[ + ::subsample_factor, (subsample_factor // 2) :: subsample_factor + ] + ) + gamma = 1.0 + if med > 0: + gamma = 1 / med + self.gamma = gamma + return _ImplicitGramMatrix(square_distance_matrix, gamma) + + def fit(self, signal: np.typing.ArrayLike) -> "CostRbf": """Sets parameters of the instance. Args: @@ -48,10 +196,11 @@ def fit(self, signal) -> "CostRbf": Returns: self """ - if signal.ndim == 1: - self.signal = signal.reshape(-1, 1) + signal_ndarray: np.typing.NDArray[np.number] = np.asarray(signal) + if signal_ndarray.ndim == 1: + self.signal = signal_ndarray.reshape(-1, 1) else: - self.signal = signal + self.signal = signal_ndarray # If gamma is none, set it using the median heuristic. # This heuristic involves computing the gram matrix which is lazy loaded @@ -61,7 +210,7 @@ def fit(self, signal) -> "CostRbf": return self - def error(self, start, end) -> float: + def error(self, start: int, end: int) -> float: """Return the approximation cost on the segment [start:end]. Args: diff --git a/tests/test_costs.py b/tests/test_costs.py index 6a680d66..9886e78f 100644 --- a/tests/test_costs.py +++ b/tests/test_costs.py @@ -1,7 +1,8 @@ import numpy as np +import scipy import pytest from ruptures import Binseg -from ruptures.costs import CostLinear, CostNormal, cost_factory +from ruptures.costs import CostLinear, CostNormal, CostRbf, cost_factory from ruptures.costs.costml import CostMl from ruptures.datasets import pw_constant from ruptures.exceptions import NotEnoughPoints @@ -237,3 +238,51 @@ def test_costl2_small_data(): "got {computed_break_dict}.", ) assert expected_break_dict == computed_break_dict, err_msg + + +@pytest.mark.parametrize("explicit", (True, False)) +def test_costrbf_explicit_implicit(explicit): + """Test if the cost RBF use a valid kernel.""" + + rng = np.random.default_rng(seed=123567890) + + data = rng.normal(size=(2048, 3)) + examinated = slice(512, 1024) + gamma = 1.5 + cost = CostRbf(quadratic_precompute=explicit, gamma=gamma) + cost.fit(data) + + dist_sq = scipy.spatial.distance.squareform( + scipy.spatial.distance.pdist(data[examinated], "sqeuclidean") + ) + gram = np.exp(-gamma * dist_sq) + assert np.allclose(gram, cost.gram[examinated, examinated]) + + +@pytest.mark.parametrize( + "n_samples_explicit", + [ + (n_samples, explicit) + for n_samples in (5, 100, 2_000, 50_000, 1_000_000, 20_000_000) + for explicit in (True, False) + if explicit is False or n_samples < 1000 + ], +) +def test_cost_rbf_gamma_heuristic(n_samples_explicit): + """Test if the heuristic formula works as expected for computing median in + cost RBF.""" + + n_samples, explicit = n_samples_explicit + cost = CostRbf(quadratic_precompute=explicit) + cost.fit(np.array(np.arange(n_samples), dtype=np.float64)[:, None]) + dist_median_from_heuristic = cost.gamma ** (-0.5) + dist_median_from_asympto_formula = (1 - 0.5**0.5) * n_samples + + # two source of errors: + # - absolute error of 1 is allowed (median of integers) + # - relative error of 1e-3 is allowed + absolute_error = np.abs( + dist_median_from_heuristic - dist_median_from_asympto_formula + ) + relative_error = absolute_error / dist_median_from_asympto_formula + assert absolute_error <= 1 or relative_error <= 1e-3