diff --git a/requirements.txt b/requirements.txt index e69de29b..296d6545 100644 --- a/requirements.txt +++ b/requirements.txt @@ -0,0 +1 @@ +numpy \ No newline at end of file diff --git a/talipp/indicators/LSMA.py b/talipp/indicators/LSMA.py new file mode 100644 index 00000000..1ebce5e0 --- /dev/null +++ b/talipp/indicators/LSMA.py @@ -0,0 +1,100 @@ +from dataclasses import dataclass +from typing import List, Any +import numpy as np + +from talipp.indicator_util import has_valid_values +from talipp.indicators.Indicator import Indicator, InputModifierType +from talipp.input import SamplingPeriodType, TimedValue, TimedValueExtractor + + +@dataclass +class LSMAVal: + """`LSMA` output type. + + Args: + slope: Slope of the least squares moving average regression. + intercep: Intercept of the least squares moving average regression. + pred: Predicted value. + """ + + slope: float = None + intercept: float = None + pred: float = None + + +class LSMA(Indicator): + """Least Squares Moving Average. + + Input type: `float` + + Output type: [LSMAVal][talipp.indicators.LSMA.LSMAVal] + + Args: + period: Period. + input_values: List of input values. + input_indicator: Input indicator. + input_modifier: Input modifier. + input_sampling: Input sampling type. + """ + + def __init__( + self, + period: int, + input_values: List[TimedValue] = None, + input_value_extractor=TimedValueExtractor, + input_indicator: Indicator = None, + input_modifier: InputModifierType = None, + input_sampling: SamplingPeriodType = None, + ): + super().__init__( + input_modifier=input_modifier, + output_value_type=LSMAVal, + input_sampling=input_sampling, + ) + + self.period = period + self.v_get_timestamp = np.vectorize(input_value_extractor.get_timestamp) + self.v_get_value = np.vectorize(input_value_extractor.get_value) + + #self.times = np.arange( + # start=0.0, stop=self.period, step=1.0, dtype=np.float64 + #) + + self.initialize(input_values, input_indicator) + + def _calculate_new_value(self) -> Any: + if not has_valid_values(self.input_values, self.period): + return None + + a_input_values = np.array( + list( + filter( + lambda v: isinstance(v, TimedValue), + self.input_values[-self.period :], + ) + ) + ) + if len(a_input_values) == 0: + return None + + times = self.v_get_timestamp(a_input_values) + delta_t = (times[-1] - times[0]) / (self.period - 1) + times = times - times[0] + times = 1.0 + times / delta_t + + if np.count_nonzero(np.isnan(times)) == len(a_input_values): + return None + + values = self.v_get_value(a_input_values) + + A = np.vstack([times, np.ones(len(times))]).T + y = values[:, np.newaxis] + pinv = np.linalg.pinv(A) + alpha = pinv.dot(y) + + slope = alpha[0][0] + intercept = alpha[1][0] + + pred = slope * self.period + intercept + + return LSMAVal(slope, intercept, pred) diff --git a/talipp/indicators/__init__.py b/talipp/indicators/__init__.py index e10ffd29..80d11f23 100644 --- a/talipp/indicators/__init__.py +++ b/talipp/indicators/__init__.py @@ -25,6 +25,7 @@ from .KeltnerChannels import KeltnerChannels as KeltnerChannels from .KST import KST as KST from .KVO import KVO as KVO +from .LSMA import LSMA as LSMA from .MACD import MACD as MACD from .MassIndex import MassIndex as MassIndex from .McGinleyDynamic import McGinleyDynamic as McGinleyDynamic @@ -85,6 +86,7 @@ "KeltnerChannels", "KST", "KVO", + "LSMA", "MACD", "MassIndex", "McGinleyDynamic", diff --git a/talipp/input.py b/talipp/input.py index 47859f26..d353b39f 100644 --- a/talipp/input.py +++ b/talipp/input.py @@ -6,6 +6,7 @@ from datetime import datetime, timedelta from enum import Enum, auto +from dataclasses import dataclass from talipp.ohlcv import OHLCV @@ -83,8 +84,8 @@ class SamplingPeriodType(Enum): DAY_1 = (TimeUnitType.DAY, 1) """1 day""" - - + + class Sampler: """Implementation of timeframe auto-sampling. @@ -156,8 +157,28 @@ def _normalize(self, dt: datetime): period_start = period_start.replace(tzinfo=dt.tzinfo) delta = dt - period_start - num_periods = delta.total_seconds() // (period_length * Sampler.CONVERSION_TO_SEC[period_type]) + num_periods = delta.total_seconds() // ( + period_length * Sampler.CONVERSION_TO_SEC[period_type] + ) - normalized_dt = period_start + timedelta(seconds=num_periods * period_length * Sampler.CONVERSION_TO_SEC[period_type]) + normalized_dt = period_start + timedelta( + seconds=num_periods * period_length * Sampler.CONVERSION_TO_SEC[period_type] + ) return normalized_dt + + +@dataclass +class TimedValue: + time: datetime + value: float + + +class TimedValueExtractor: + @staticmethod + def get_timestamp(tv: TimedValue): + return tv.time.timestamp() + + @staticmethod + def get_value(tv: TimedValue): + return tv.value diff --git a/test/TalippTest.py b/test/TalippTest.py index 0e45be2b..91e7377b 100644 --- a/test/TalippTest.py +++ b/test/TalippTest.py @@ -1,8 +1,10 @@ import unittest from typing import List +from datetime import datetime, timedelta from talipp.indicators.Indicator import Indicator from talipp.ohlcv import OHLCV, OHLCVFactory +from talipp.input import TimedValue class TalippTest(unittest.TestCase): @@ -18,6 +20,8 @@ class TalippTest(unittest.TestCase): CLOSE_EQUAL_VALUES_TMPL: List[float] = [10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46] + TIMED_CLOSE_TMPL: List[TimedValue] = [TimedValue(datetime(2024, 7, 7) + timedelta(days=i), close) for (i, close) in enumerate(CLOSE_TMPL)] + def assertIndicatorUpdate(self, indicator: Indicator, iterations_no: int = 20): last_indicator_value = indicator[-1] last_input_value = indicator.input_values[-1] diff --git a/test/test_LSMA.py b/test/test_LSMA.py new file mode 100644 index 00000000..00f8438b --- /dev/null +++ b/test/test_LSMA.py @@ -0,0 +1,53 @@ +import unittest + +from talipp.indicators import LSMA + +from TalippTest import TalippTest + + +class TestLSMA(TalippTest): + def setUp(self) -> None: + self.input_values = TalippTest.TIMED_CLOSE_TMPL + + def test_init_with_period_2(self): + ind = LSMA(2, self.input_values) + + self.assertAlmostEqual(ind[-3].slope, 0.29, places=5) + self.assertAlmostEqual(ind[-3].intercept, 10.01, places=5) + self.assertAlmostEqual(ind[-3].pred, 10.59, places=5) + + self.assertAlmostEqual(ind[-2].slope, -0.36, places=5) + self.assertAlmostEqual(ind[-2].intercept, 10.95, places=5) + self.assertAlmostEqual(ind[-2].pred, 10.23, places=5) + + self.assertAlmostEqual(ind[-1].slope, -0.23, places=5) + self.assertAlmostEqual(ind[-1].intercept, 10.46, places=5) + self.assertAlmostEqual(ind[-1].pred, 10.0, places=5) + + def test_init_with_period_5(self): + ind = LSMA(5, self.input_values) + + self.assertAlmostEqual(ind[-3].slope, 0.529, places=5) + self.assertAlmostEqual(ind[-3].intercept, 8.161, places=5) + self.assertAlmostEqual(ind[-3].pred, 10.806, places=5) + + self.assertAlmostEqual(ind[-2].slope, 0.248, places=5) + self.assertAlmostEqual(ind[-2].intercept, 9.352, places=5) + self.assertAlmostEqual(ind[-2].pred, 10.592, places=5) + + self.assertAlmostEqual(ind[-1].slope, -0.037, places=5) + self.assertAlmostEqual(ind[-1].intercept, 10.365, places=5) + self.assertAlmostEqual(ind[-1].pred, 10.180, places=5) + + def test_update(self): + self.assertIndicatorUpdate(LSMA(5, self.input_values)) + + def test_delete(self): + self.assertIndicatorDelete(LSMA(5, self.input_values)) + + def test_purge_oldest(self): + self.assertIndicatorPurgeOldest(LSMA(5, self.input_values)) + + +if __name__ == "__main__": + unittest.main()