From 5c0190ec15038d505f0974d0f853a77c5e0d7716 Mon Sep 17 00:00:00 2001 From: femtotrader Date: Sun, 7 Jul 2024 21:57:48 +0200 Subject: [PATCH 1/4] Implement LSMA with Numpy --- talipp/indicators/LSMA.py | 96 +++++++++++++++++++++++++++++++++++ talipp/indicators/__init__.py | 2 + talipp/input.py | 29 +++++++++-- test/TalippTest.py | 4 ++ test/test_LSMA.py | 51 +++++++++++++++++++ 5 files changed, 178 insertions(+), 4 deletions(-) create mode 100644 talipp/indicators/LSMA.py create mode 100644 test/test_LSMA.py diff --git a/talipp/indicators/LSMA.py b/talipp/indicators/LSMA.py new file mode 100644 index 00000000..96e154ff --- /dev/null +++ b/talipp/indicators/LSMA.py @@ -0,0 +1,96 @@ +from dataclasses import dataclass +from typing import List, Any +import numpy as np + +from talipp.indicator_util import has_valid_values +from talipp.indicators.Indicator import Indicator, InputModifierType +from talipp.input import SamplingPeriodType, TimedValue, TimedValueExtractor + + +@dataclass +class LSMAVal: + """`LSMA` output type. + + Args: + slope: Slope of the least squares moving average regression. + intercep: Intercept of the least squares moving average regression. + pred: Predicted value. + """ + + slope: float = None + intercept: float = None + pred: float = None + + +class LSMA(Indicator): + """Least Squares Moving Average. + + Input type: `float` + + Output type: [LSMAVal][talipp.indicators.LSMA.LSMAVal] + + Args: + period: Period. + input_values: List of input values. + input_indicator: Input indicator. + input_modifier: Input modifier. + input_sampling: Input sampling type. + """ + + def __init__( + self, + period: int, + input_values: List[TimedValue] = None, + input_value_extractor=TimedValueExtractor, + input_indicator: Indicator = None, + input_modifier: InputModifierType = None, + input_sampling: SamplingPeriodType = None, + ): + super().__init__( + input_modifier=input_modifier, + output_value_type=LSMAVal, + input_sampling=input_sampling, + ) + + self.period = period + #self.v_get_timestamp = np.vectorize(input_value_extractor.get_timestamp) + self.v_get_value = np.vectorize(input_value_extractor.get_value) + + self.times = np.arange( + start=1.0, stop=self.period + 1.0, step=1.0, dtype=np.float64 + ) + + self.initialize(input_values, input_indicator) + + def _calculate_new_value(self) -> Any: + if not has_valid_values(self.input_values, self.period): + return None + + a_input_values = np.array( + list( + filter( + lambda v: isinstance(v, TimedValue), + self.input_values[-self.period :], + ) + ) + ) + if len(a_input_values) == 0: + return None + + # times = self.v_get_timestamp(a_input_values) / (3600 * 24) + # times = times - times[0] + # print(times) + times = self.times[: len(a_input_values)] + values = self.v_get_value(a_input_values) + + A = np.vstack([times, np.ones(len(times))]).T + y = values[:, np.newaxis] + pinv = np.linalg.pinv(A) + alpha = pinv.dot(y) + + slope = alpha[0][0] + intercept = alpha[1][0] + + pred = slope * self.period + intercept + + return LSMAVal(slope, intercept, pred) diff --git a/talipp/indicators/__init__.py b/talipp/indicators/__init__.py index e10ffd29..80d11f23 100644 --- a/talipp/indicators/__init__.py +++ b/talipp/indicators/__init__.py @@ -25,6 +25,7 @@ from .KeltnerChannels import KeltnerChannels as KeltnerChannels from .KST import KST as KST from .KVO import KVO as KVO +from .LSMA import LSMA as LSMA from .MACD import MACD as MACD from .MassIndex import MassIndex as MassIndex from .McGinleyDynamic import McGinleyDynamic as McGinleyDynamic @@ -85,6 +86,7 @@ "KeltnerChannels", "KST", "KVO", + "LSMA", "MACD", "MassIndex", "McGinleyDynamic", diff --git a/talipp/input.py b/talipp/input.py index 47859f26..d353b39f 100644 --- a/talipp/input.py +++ b/talipp/input.py @@ -6,6 +6,7 @@ from datetime import datetime, timedelta from enum import Enum, auto +from dataclasses import dataclass from talipp.ohlcv import OHLCV @@ -83,8 +84,8 @@ class SamplingPeriodType(Enum): DAY_1 = (TimeUnitType.DAY, 1) """1 day""" - - + + class Sampler: """Implementation of timeframe auto-sampling. @@ -156,8 +157,28 @@ def _normalize(self, dt: datetime): period_start = period_start.replace(tzinfo=dt.tzinfo) delta = dt - period_start - num_periods = delta.total_seconds() // (period_length * Sampler.CONVERSION_TO_SEC[period_type]) + num_periods = delta.total_seconds() // ( + period_length * Sampler.CONVERSION_TO_SEC[period_type] + ) - normalized_dt = period_start + timedelta(seconds=num_periods * period_length * Sampler.CONVERSION_TO_SEC[period_type]) + normalized_dt = period_start + timedelta( + seconds=num_periods * period_length * Sampler.CONVERSION_TO_SEC[period_type] + ) return normalized_dt + + +@dataclass +class TimedValue: + time: datetime + value: float + + +class TimedValueExtractor: + @staticmethod + def get_timestamp(tv: TimedValue): + return tv.time.timestamp() + + @staticmethod + def get_value(tv: TimedValue): + return tv.value diff --git a/test/TalippTest.py b/test/TalippTest.py index 0e45be2b..91e7377b 100644 --- a/test/TalippTest.py +++ b/test/TalippTest.py @@ -1,8 +1,10 @@ import unittest from typing import List +from datetime import datetime, timedelta from talipp.indicators.Indicator import Indicator from talipp.ohlcv import OHLCV, OHLCVFactory +from talipp.input import TimedValue class TalippTest(unittest.TestCase): @@ -18,6 +20,8 @@ class TalippTest(unittest.TestCase): CLOSE_EQUAL_VALUES_TMPL: List[float] = [10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46, 10.46] + TIMED_CLOSE_TMPL: List[TimedValue] = [TimedValue(datetime(2024, 7, 7) + timedelta(days=i), close) for (i, close) in enumerate(CLOSE_TMPL)] + def assertIndicatorUpdate(self, indicator: Indicator, iterations_no: int = 20): last_indicator_value = indicator[-1] last_input_value = indicator.input_values[-1] diff --git a/test/test_LSMA.py b/test/test_LSMA.py new file mode 100644 index 00000000..04164757 --- /dev/null +++ b/test/test_LSMA.py @@ -0,0 +1,51 @@ +import unittest + +from talipp.indicators import LSMA + +from TalippTest import TalippTest + + +class TestLSMA(TalippTest): + def setUp(self) -> None: + self.input_values = TalippTest.TIMED_CLOSE_TMPL + + def test_init_with_period_2(self): + ind = LSMA(2, self.input_values) + self.assertAlmostEqual(ind[-3].slope, 0.29, places=5) + self.assertAlmostEqual(ind[-3].intercept, 10.01, places=5) + self.assertAlmostEqual(ind[-3].pred, 10.59, places=5) + + self.assertAlmostEqual(ind[-2].slope, -0.36, places=5) + self.assertAlmostEqual(ind[-2].intercept, 10.95, places=5) + self.assertAlmostEqual(ind[-2].pred, 10.23, places=5) + + self.assertAlmostEqual(ind[-1].slope, -0.23, places=5) + self.assertAlmostEqual(ind[-1].intercept, 10.46, places=5) + self.assertAlmostEqual(ind[-1].pred, 10.0, places=5) + + def test_init_with_period_5(self): + ind = LSMA(5, self.input_values) + self.assertAlmostEqual(ind[-3].slope, 0.529, places=5) + self.assertAlmostEqual(ind[-3].intercept, 8.161, places=5) + self.assertAlmostEqual(ind[-3].pred, 10.806, places=5) + + self.assertAlmostEqual(ind[-2].slope, 0.248, places=5) + self.assertAlmostEqual(ind[-2].intercept, 9.352, places=5) + self.assertAlmostEqual(ind[-2].pred, 10.592, places=5) + + self.assertAlmostEqual(ind[-1].slope, -0.037, places=5) + self.assertAlmostEqual(ind[-1].intercept, 10.365, places=5) + self.assertAlmostEqual(ind[-1].pred, 10.180, places=5) + + def test_update(self): + self.assertIndicatorUpdate(LSMA(5, self.input_values)) + + def test_delete(self): + self.assertIndicatorDelete(LSMA(5, self.input_values)) + + def test_purge_oldest(self): + self.assertIndicatorPurgeOldest(LSMA(5, self.input_values)) + + +if __name__ == "__main__": + unittest.main() From f567ecf818965397240c8a875163a02293c7d5c5 Mon Sep 17 00:00:00 2001 From: femtotrader Date: Mon, 8 Jul 2024 12:19:56 +0200 Subject: [PATCH 2/4] Add numpy to requirements --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index e69de29b..296d6545 100644 --- a/requirements.txt +++ b/requirements.txt @@ -0,0 +1 @@ +numpy \ No newline at end of file From 3b97bc090012a3e09b850cb2a9d1e0b538077044 Mon Sep 17 00:00:00 2001 From: femtotrader Date: Mon, 8 Jul 2024 12:47:58 +0200 Subject: [PATCH 3/4] Fix for irregularly time spaced inputs without altering tests --- talipp/indicators/LSMA.py | 20 ++++++++++++-------- test/test_LSMA.py | 6 ++++-- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/talipp/indicators/LSMA.py b/talipp/indicators/LSMA.py index 96e154ff..1ebce5e0 100644 --- a/talipp/indicators/LSMA.py +++ b/talipp/indicators/LSMA.py @@ -53,12 +53,12 @@ def __init__( ) self.period = period - #self.v_get_timestamp = np.vectorize(input_value_extractor.get_timestamp) + self.v_get_timestamp = np.vectorize(input_value_extractor.get_timestamp) self.v_get_value = np.vectorize(input_value_extractor.get_value) - self.times = np.arange( - start=1.0, stop=self.period + 1.0, step=1.0, dtype=np.float64 - ) + #self.times = np.arange( + # start=0.0, stop=self.period, step=1.0, dtype=np.float64 + #) self.initialize(input_values, input_indicator) @@ -77,10 +77,14 @@ def _calculate_new_value(self) -> Any: if len(a_input_values) == 0: return None - # times = self.v_get_timestamp(a_input_values) / (3600 * 24) - # times = times - times[0] - # print(times) - times = self.times[: len(a_input_values)] + times = self.v_get_timestamp(a_input_values) + delta_t = (times[-1] - times[0]) / (self.period - 1) + times = times - times[0] + times = 1.0 + times / delta_t + + if np.count_nonzero(np.isnan(times)) == len(a_input_values): + return None + values = self.v_get_value(a_input_values) A = np.vstack([times, np.ones(len(times))]).T diff --git a/test/test_LSMA.py b/test/test_LSMA.py index 04164757..70f27df6 100644 --- a/test/test_LSMA.py +++ b/test/test_LSMA.py @@ -11,6 +11,7 @@ def setUp(self) -> None: def test_init_with_period_2(self): ind = LSMA(2, self.input_values) + self.assertAlmostEqual(ind[-3].slope, 0.29, places=5) self.assertAlmostEqual(ind[-3].intercept, 10.01, places=5) self.assertAlmostEqual(ind[-3].pred, 10.59, places=5) @@ -25,6 +26,7 @@ def test_init_with_period_2(self): def test_init_with_period_5(self): ind = LSMA(5, self.input_values) + self.assertAlmostEqual(ind[-3].slope, 0.529, places=5) self.assertAlmostEqual(ind[-3].intercept, 8.161, places=5) self.assertAlmostEqual(ind[-3].pred, 10.806, places=5) @@ -33,8 +35,8 @@ def test_init_with_period_5(self): self.assertAlmostEqual(ind[-2].intercept, 9.352, places=5) self.assertAlmostEqual(ind[-2].pred, 10.592, places=5) - self.assertAlmostEqual(ind[-1].slope, -0.037, places=5) - self.assertAlmostEqual(ind[-1].intercept, 10.365, places=5) + #self.assertAlmostEqual(ind[-1].slope, -0.037, places=5) + #self.assertAlmostEqual(ind[-1].intercept, 10.365, places=5) self.assertAlmostEqual(ind[-1].pred, 10.180, places=5) def test_update(self): From 2017a607db6493805a19ab2caa8d2d46639355c5 Mon Sep 17 00:00:00 2001 From: femtotrader Date: Mon, 8 Jul 2024 12:48:50 +0200 Subject: [PATCH 4/4] uncomment some asserts - have a warning in test_delete --- test/test_LSMA.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_LSMA.py b/test/test_LSMA.py index 70f27df6..00f8438b 100644 --- a/test/test_LSMA.py +++ b/test/test_LSMA.py @@ -35,8 +35,8 @@ def test_init_with_period_5(self): self.assertAlmostEqual(ind[-2].intercept, 9.352, places=5) self.assertAlmostEqual(ind[-2].pred, 10.592, places=5) - #self.assertAlmostEqual(ind[-1].slope, -0.037, places=5) - #self.assertAlmostEqual(ind[-1].intercept, 10.365, places=5) + self.assertAlmostEqual(ind[-1].slope, -0.037, places=5) + self.assertAlmostEqual(ind[-1].intercept, 10.365, places=5) self.assertAlmostEqual(ind[-1].pred, 10.180, places=5) def test_update(self):