diff --git a/environment.yml b/environment.yml index 4bb5e81ac..c748fa0f5 100644 --- a/environment.yml +++ b/environment.yml @@ -27,4 +27,6 @@ dependencies: - jupyterlab-myst>=2.0.0 - myst-nb>=1.0.0 - polars>=1.14.0 - - numba-cuda>=0.24.0 + - fftw>=3.3 + - pyfftw>=0.15.0 + - numba-cuda>=0.24.0 \ No newline at end of file diff --git a/stumpy/core.py b/stumpy/core.py index 5ee2a709d..1b3d99c21 100644 --- a/stumpy/core.py +++ b/stumpy/core.py @@ -12,10 +12,9 @@ from numba import cuda, njit, prange from scipy import linalg from scipy.ndimage import maximum_filter1d, minimum_filter1d -from scipy.signal import convolve from scipy.spatial.distance import cdist -from . import config +from . import config, sdp try: from numba.cuda.cudadrv.driver import _raise_driver_not_found @@ -652,7 +651,8 @@ def check_window_size(m, max_size=None, n=None): @njit(fastmath=config.STUMPY_FASTMATH_TRUE) def _sliding_dot_product(Q, T): """ - A Numba JIT-compiled implementation of the sliding window dot product. + A wrapper for numba JIT-compiled implementation of + the sliding dot product. Parameters ---------- @@ -667,18 +667,12 @@ def _sliding_dot_product(Q, T): out : numpy.ndarray Sliding dot product between `Q` and `T`. """ - m = Q.shape[0] - l = T.shape[0] - m + 1 - out = np.empty(l) - for i in range(l): - out[i] = np.dot(Q, T[i : i + m]) - - return out + return sdp._njit_sliding_dot_product(Q, T) def sliding_dot_product(Q, T): """ - Use FFT convolution to calculate the sliding window dot product. + A wrapper for convolution implementation of the sliding dot product. Parameters ---------- @@ -692,27 +686,8 @@ def sliding_dot_product(Q, T): ------- output : numpy.ndarray Sliding dot product between `Q` and `T`. - - Notes - ----- - Calculate the sliding dot product - - `DOI: 10.1109/ICDM.2016.0179 \ - `__ - - See Table I, Figure 4 - - Following the inverse FFT, Fig. 4 states that only cells [m-1:n] - contain valid dot products - - Padding is done automatically in fftconvolve step """ - n = T.shape[0] - m = Q.shape[0] - Qr = np.flipud(Q) # Reverse/flip Q - QT = convolve(Qr, T) - - return QT.real[m - 1 : n] + return sdp._convolve_sliding_dot_product(Q, T) @njit( diff --git a/stumpy/sdp.py b/stumpy/sdp.py new file mode 100644 index 000000000..404bfe0a3 --- /dev/null +++ b/stumpy/sdp.py @@ -0,0 +1,288 @@ +import numpy as np +from numba import njit +from scipy.fft import next_fast_len +from scipy.fft._pocketfft.basic import c2r, r2c +from scipy.signal import convolve + +from . import config + +try: + import pyfftw + + FFTW_IS_AVAILABLE = True +except ImportError: # pragma: no cover + FFTW_IS_AVAILABLE = False + + +@njit(fastmath=config.STUMPY_FASTMATH_TRUE) +def _njit_sliding_dot_product(Q, T): + """ + A Numba JIT-compiled implementation of the sliding dot product. + + Parameters + ---------- + Q : numpy.ndarray + Query array or subsequence + + T : numpy.ndarray + Time series or sequence + + Returns + ------- + out : numpy.ndarray + Sliding dot product between `Q` and `T`. + """ + m = len(Q) + l = T.shape[0] - m + 1 + out = np.empty(l) + for i in range(l): + result = 0.0 + for j in range(m): + result += Q[j] * T[i + j] + out[i] = result + + return out + + +def _convolve_sliding_dot_product(Q, T): + """ + Use FFT or direct convolution to calculate the sliding dot product. + + Parameters + ---------- + Q : numpy.ndarray + Query array or subsequence + + T : numpy.ndarray + Time series or sequence + + Returns + ------- + output : numpy.ndarray + Sliding dot product between `Q` and `T`. 
+ + Notes + ----- + Calculate the sliding dot product + + `DOI: 10.1109/ICDM.2016.0179 \ + `__ + + See Table I, Figure 4 + """ + # mode='valid' returns output of convolution where the two + # sequences fully overlap. + + return convolve(np.flipud(Q), T, mode="valid") + + +def _pocketfft_sliding_dot_product(Q, T): + """ + Use scipy.fft._pocketfft to compute + the sliding dot product. + + Parameters + ---------- + Q : numpy.ndarray + Query array or subsequence + + T : numpy.ndarray + Time series or sequence + + Returns + ------- + output : numpy.ndarray + Sliding dot product between `Q` and `T`. + """ + n = len(T) + m = len(Q) + next_fast_n = next_fast_len(n, real=True) + + tmp = np.empty((2, next_fast_n)) + tmp[0, :m] = Q[::-1] + tmp[0, m:] = 0.0 + tmp[1, :n] = T + tmp[1, n:] = 0.0 + fft_2d = r2c(True, tmp, axis=-1) + + return c2r(False, np.multiply(fft_2d[0], fft_2d[1]), n=next_fast_n)[m - 1 : n] + + +class _PYFFTW_SLIDING_DOT_PRODUCT: + """ + A class to compute the sliding dot product using FFTW via pyfftw. + + This class uses FFTW (via pyfftw) to efficiently compute the sliding dot product + between a query sequence Q and a time series T. It preallocates arrays and caches + FFTW objects to optimize repeated computations with similar-sized inputs. + + Parameters + ---------- + max_n : int, default=2**20 + Maximum length to preallocate arrays for. This will be the size of the + real-valued array. A complex-valued array of size `1 + (max_n // 2)` + will also be preallocated. If inputs exceed this size, arrays will be + reallocated to accommodate larger sizes. + + Attributes + ---------- + real_arr : pyfftw.empty_aligned + Preallocated real-valued array for FFTW computations. + + complex_arr : pyfftw.empty_aligned + Preallocated complex-valued array for FFTW computations. + + rfft_objects : dict + Cache of FFTW forward transform objects, keyed by + (next_fast_n, n_threads, planning_flag). + + irfft_objects : dict + Cache of FFTW inverse transform objects, keyed by + (next_fast_n, n_threads, planning_flag). + + Notes + ----- + The class maintains internal caches of FFTW objects to avoid redundant planning + operations when called multiple times with similar-sized inputs and parameters. + + Examples + -------- + >>> sdp_obj = _PYFFTW_SLIDING_DOT_PRODUCT(max_n=1000) + >>> Q = np.array([1, 2, 3]) + >>> T = np.array([4, 5, 6, 7, 8]) + >>> result = sdp_obj(Q, T) + + References + ---------- + `FFTW documentation `__ + + `pyfftw documentation `__ + """ + + def __init__(self, max_n=2**20): + """ + Initialize the `_PYFFTW_SLIDING_DOT_PRODUCT` object, which can be called + to compute the sliding dot product using FFTW via pyfftw. + + Parameters + ---------- + max_n : int, default=2**20 + Maximum length to preallocate arrays for. This will be the size of the + real-valued array. A complex-valued array of size `1 + (max_n // 2)` + will also be preallocated. + """ + # Preallocate arrays + self.real_arr = pyfftw.empty_aligned(max_n, dtype="float64") + self.complex_arr = pyfftw.empty_aligned(1 + (max_n // 2), dtype="complex128") + + # Store FFTW objects, keyed by (next_fast_n, n_threads, planning_flag) + self.rfft_objects = {} + self.irfft_objects = {} + + def __call__(self, Q, T, n_threads=1, planning_flag="FFTW_ESTIMATE"): + """ + Compute the sliding dot product between `Q` and `T` using FFTW via pyfftw, + and cache FFTW objects if not already cached. + + Parameters + ---------- + Q : numpy.ndarray + Query array or subsequence. + + T : numpy.ndarray + Time series or sequence. 
+
+        n_threads : int, default=1
+            Number of threads to use for FFTW computations.
+
+        planning_flag : str, default="FFTW_ESTIMATE"
+            The planning flag that will be used in FFTW for planning.
+            See the pyfftw documentation for details. Current options, ordered by
+            increasing planning aggressiveness, are:
+            "FFTW_ESTIMATE", "FFTW_MEASURE", "FFTW_PATIENT", and "FFTW_EXHAUSTIVE".
+            The more aggressive the planning, the longer the planning time, but
+            the faster the execution time.
+
+        Returns
+        -------
+        out : numpy.ndarray
+            Sliding dot product between `Q` and `T`.
+
+        Notes
+        -----
+        The planning_flag defaults to "FFTW_ESTIMATE" to align with
+        MATLAB's FFTW usage (as of version R2025b).
+        See: https://www.mathworks.com/help/matlab/ref/fftw.html
+
+        This implementation is inspired by the answer on StackOverflow:
+        https://stackoverflow.com/a/30615425/2955541
+        """
+        m = Q.shape[0]
+        n = T.shape[0]
+        next_fast_n = pyfftw.next_fast_len(n)
+
+        # Update preallocated arrays if needed
+        if next_fast_n > len(self.real_arr):
+            self.real_arr = pyfftw.empty_aligned(next_fast_n, dtype="float64")
+            self.complex_arr = pyfftw.empty_aligned(
+                1 + (next_fast_n // 2), dtype="complex128"
+            )
+
+        real_arr = self.real_arr[:next_fast_n]
+        complex_arr = self.complex_arr[: 1 + (next_fast_n // 2)]
+
+        # Get or create FFTW objects
+        key = (next_fast_n, n_threads, planning_flag)
+
+        rfft_obj = self.rfft_objects.get(key, None)
+        if rfft_obj is None:
+            rfft_obj = pyfftw.FFTW(
+                input_array=real_arr,
+                output_array=complex_arr,
+                direction="FFTW_FORWARD",
+                flags=(planning_flag,),
+                threads=n_threads,
+            )
+            self.rfft_objects[key] = rfft_obj
+        else:
+            rfft_obj.update_arrays(real_arr, complex_arr)
+
+        irfft_obj = self.irfft_objects.get(key, None)
+        if irfft_obj is None:
+            irfft_obj = pyfftw.FFTW(
+                input_array=complex_arr,
+                output_array=real_arr,
+                direction="FFTW_BACKWARD",
+                flags=(planning_flag, "FFTW_DESTROY_INPUT"),
+                threads=n_threads,
+            )
+            self.irfft_objects[key] = irfft_obj
+        else:
+            irfft_obj.update_arrays(complex_arr, real_arr)
+
+        # RFFT(T)
+        real_arr[:n] = T
+        real_arr[n:] = 0.0
+        rfft_obj.execute()  # output is in complex_arr
+        complex_arr_T = complex_arr.copy()
+
+        # RFFT(Q)
+        # Scale by 1/next_fast_n to account for
+        # FFTW's unnormalized inverse FFT via execute()
+        np.multiply(Q[::-1], 1.0 / next_fast_n, out=real_arr[:m])
+        real_arr[m:] = 0.0
+        rfft_obj.execute()  # output is in complex_arr
+
+        # RFFT(T) * RFFT(Q)
+        np.multiply(complex_arr, complex_arr_T, out=complex_arr)
+
+        # IRFFT (input is in complex_arr)
+        irfft_obj.execute()  # output is in real_arr
+
+        return real_arr[m - 1 : n]
+
+
+if FFTW_IS_AVAILABLE:
+    _pyfftw_sliding_dot_product = _PYFFTW_SLIDING_DOT_PRODUCT()
+else:  # pragma: no cover
+    _pyfftw_sliding_dot_product = None
diff --git a/tests/test_sdp.py b/tests/test_sdp.py
new file mode 100644
index 000000000..eb2c3b0cf
--- /dev/null
+++ b/tests/test_sdp.py
@@ -0,0 +1,250 @@
+import inspect
+import warnings
+from operator import eq, lt
+
+import numpy as np
+import pytest
+from numpy import testing as npt
+from scipy.fft import next_fast_len
+
+from stumpy import sdp
+
+# README
+# The real FFT algorithm performs more efficiently when the length
+# of the input array `arr` is composed of small prime factors.
+# Scipy's next_fast_len(len(arr), real=True) returns
+# len(arr) unchanged when it is composed only of the
+# prime factors 2, 3, and 5. Therefore, these radices are
+# considered the most efficient for the real FFT algorithm.
+
+# To ensure that the tests cover different cases, the following cases
+# are considered:
+# 1. len(T) is even, and len(T) == next_fast_len(len(T), real=True)
+# 2. len(T) is odd, and len(T) == next_fast_len(len(T), real=True)
+# 3. len(T) is even, and len(T) < next_fast_len(len(T), real=True)
+# 4. len(T) is odd, and len(T) < next_fast_len(len(T), real=True)
+# And 5. a special case of 1, where len(T) is a power of 2.
+
+# Therefore:
+# 1. len(T) is composed of 2 and a subset of {3, 5}
+# 2. len(T) is composed of a subset of {3, 5}
+# 3. len(T) is composed of a subset of {7, 11, 13, ...} and 2
+# 4. len(T) is composed of a subset of {7, 11, 13, ...}
+# 5. len(T) is a power of 2
+
+# In some cases, the prime factors are raised to higher powers
+# to increase the length of the array to around
+# 1000-2000. This allows us to test sliding_dot_product for a
+# wider range of query lengths.
+
+test_inputs = [
+    # Input format:
+    # (
+    #    len(T),
+    #    remainder,  # from `len(T) % 2`
+    #    comparator,  # used as: len(T) comparator next_fast_len(len(T), real=True)
+    # )
+    (
+        2 * (3**2) * (5**3),
+        0,
+        eq,
+    ),  # = 2250, Even `len(T)`, and `len(T) == next_fast_len(len(T), real=True)`
+    (
+        (3**2) * (5**3),
+        1,
+        eq,
+    ),  # = 1125, Odd `len(T)`, and `len(T) == next_fast_len(len(T), real=True)`
+    (
+        2 * 7 * 11 * 13,
+        0,
+        lt,
+    ),  # = 2002, Even `len(T)`, and `len(T) < next_fast_len(len(T), real=True)`
+    (
+        7 * 11 * 13,
+        1,
+        lt,
+    ),  # = 1001, Odd `len(T)`, and `len(T) < next_fast_len(len(T), real=True)`
+]
+
+
+def naive_sliding_dot_product(Q, T):
+    m = len(Q)
+    l = T.shape[0] - m + 1
+    out = np.empty(l)
+    for i in range(l):
+        out[i] = np.dot(Q, T[i : i + m])
+    return out
+
+
+def get_sdp_functions():
+    out = []
+    for func_name, func in inspect.getmembers(sdp, inspect.isfunction):
+        if func_name.endswith("sliding_dot_product"):
+            out.append((func_name, func))
+
+    return out
+
+
+@pytest.mark.parametrize("n_T, remainder, comparator", test_inputs)
+def test_remainder(n_T, remainder, comparator):
+    assert n_T % 2 == remainder
+
+
+@pytest.mark.parametrize("n_T, remainder, comparator", test_inputs)
+def test_comparator(n_T, remainder, comparator):
+    shape = next_fast_len(n_T, real=True)
+    assert comparator(n_T, shape)
+
+
+@pytest.mark.parametrize("n_T, remainder, comparator", test_inputs)
+def test_sdp(n_T, remainder, comparator):
+    # test_sdp for cases 1-4
+
+    n_Q_prime = [
+        2,
+        3,
+        5,
+        7,
+        11,
+        13,
+        17,
+        19,
+        23,
+        29,
+        31,
+        37,
+        41,
+        43,
+        47,
+        53,
+        59,
+        61,
+        67,
+        71,
+        73,
+        79,
+        83,
+        89,
+        97,
+    ]
+    n_Q_power2 = [2, 4, 8, 16, 32, 64]
+    n_Q_values = n_Q_prime + n_Q_power2 + [n_T]
+    n_Q_values = sorted(n_Q for n_Q in set(n_Q_values) if n_Q <= n_T)
+
+    # utils.import_sdp_mods()
+    for n_Q in n_Q_values:
+        Q = np.random.rand(n_Q)
+        T = np.random.rand(n_T)
+        ref = naive_sliding_dot_product(Q, T)
+        for func_name, func in get_sdp_functions():
+            try:
+                comp = func(Q, T)
+                npt.assert_allclose(comp, ref)
+            except Exception as e:  # pragma: no cover
+                msg = f"Error in {func_name}, with n_Q={n_Q} and n_T={n_T}"
+                warnings.warn(msg)
+                raise e
+
+    return
+
+
+def test_sdp_power2():
+    # test for case 5. len(T) is a power of 2
+    pmin = 3
+    pmax = 13
+
+    for func_name, func in get_sdp_functions():
+        try:
+            for q in range(pmin, pmax + 1):
+                n_Q = 2**q
+                for p in range(q, pmax + 1):
+                    n_T = 2**p
+                    Q = np.random.rand(n_Q)
+                    T = np.random.rand(n_T)
+
+                    ref = naive_sliding_dot_product(Q, T)
+                    comp = func(Q, T)
+                    npt.assert_allclose(comp, ref)
+
+        except Exception as e:  # pragma: no cover
+            msg = f"Error in {func_name}, with q={q} and p={p}"
+            warnings.warn(msg)
+            raise e
+
+    return
+
+
+def test_pyfftw_sdp_max_n():
+    # When `len(T)` is larger than `max_n` in pyfftw_sdp,
+    # the internal preallocated arrays should be resized.
+    # This test checks that functionality.
+    sliding_dot_product = sdp._PYFFTW_SLIDING_DOT_PRODUCT(max_n=2**10)
+
+    # len(T) > max_n to trigger array resizing
+    T = np.random.rand(2**11)
+    Q = np.random.rand(2**8)
+
+    comp = sliding_dot_product(Q, T)
+    ref = naive_sliding_dot_product(Q, T)
+
+    npt.assert_allclose(comp, ref)
+
+    return
+
+
+def test_pyfftw_sdp_cache():
+    # Ensure that the caching mechanism in
+    # pyfftw_sdp works as intended
+    sliding_dot_product = sdp._PYFFTW_SLIDING_DOT_PRODUCT(max_n=2**10)
+    assert sliding_dot_product.rfft_objects == {}
+    assert sliding_dot_product.irfft_objects == {}
+
+    T = np.random.rand(2**5)
+    Q = np.random.rand(2**2)
+
+    n_threads = 1
+    planning_flag = "FFTW_ESTIMATE"
+    sliding_dot_product(Q, T, n_threads=n_threads, planning_flag=planning_flag)
+
+    # Check that the FFTW objects are cached
+    expected_key = (len(T), n_threads, planning_flag)
+    assert expected_key in sliding_dot_product.rfft_objects
+    assert expected_key in sliding_dot_product.irfft_objects
+
+    return
+
+
+def test_pyfftw_sdp_update_arrays():
+    # Ensure that the cached FFTW objects
+    # can be reused when the preallocated arrays
+    # are updated.
+    sliding_dot_product = sdp._PYFFTW_SLIDING_DOT_PRODUCT(max_n=2**10)
+
+    n_threads = 1
+    planning_flag = "FFTW_ESTIMATE"
+
+    T1 = np.random.rand(2**5)
+    Q1 = np.random.rand(2**2)
+    sliding_dot_product(Q1, T1, n_threads=n_threads, planning_flag=planning_flag)
+
+    # len(T2) > max_n to trigger array resizing
+    T2 = np.random.rand(2**11)
+    Q2 = np.random.rand(2**3)
+    sliding_dot_product(Q2, T2, n_threads=n_threads, planning_flag=planning_flag)
+
+    # Check that the FFTW objects cached for inputs (Q1, T1)
+    # can be reused after the preallocated arrays were resized
+    # by the call with (Q2, T2)
+    key1 = (len(T1), n_threads, planning_flag)
+    rfft_obj_before = sliding_dot_product.rfft_objects[key1]
+    irfft_obj_before = sliding_dot_product.irfft_objects[key1]
+
+    comp = sliding_dot_product(Q1, T1, n_threads=n_threads, planning_flag=planning_flag)
+    ref = naive_sliding_dot_product(Q1, T1)
+
+    # test for correctness
+    npt.assert_allclose(comp, ref)
+
+    # Check that the same FFTW objects are reused
+    assert sliding_dot_product.rfft_objects[key1] is rfft_obj_before
+    assert sliding_dot_product.irfft_objects[key1] is irfft_obj_before
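For reference, every backend introduced in this patch computes the same quantity by the same identity: reverse Q, zero-pad both inputs to a common FFT length, multiply the real FFTs, invert, and keep only the fully overlapping outputs [m - 1 : n]. The following NumPy-only sketch is not part of the patch; the function name and the input lengths are illustrative, and it is intended only as a quick sanity check against the naive dot product.

import numpy as np


def fft_sliding_dot_product(Q, T):
    # Reverse Q, zero-pad to a common FFT length, multiply the real FFTs,
    # invert, and keep only the fully overlapping outputs [m - 1 : n].
    m, n = len(Q), len(T)
    fft_len = n  # a 2-, 3-, 5-smooth length >= n would be faster
    # np.fft.irfft already applies the 1/fft_len normalization; the pyfftw
    # backend in this patch instead folds 1/next_fast_n into Q because
    # FFTW's raw execute() leaves the inverse transform unnormalized.
    QT = np.fft.irfft(np.fft.rfft(Q[::-1], fft_len) * np.fft.rfft(T, fft_len), fft_len)
    return QT[m - 1 : n]


if __name__ == "__main__":
    Q = np.random.rand(8)
    T = np.random.rand(64)
    naive = np.array([Q @ T[i : i + len(Q)] for i in range(len(T) - len(Q) + 1)])
    np.testing.assert_allclose(fft_sliding_dot_product(Q, T), naive)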