From e7fdb04284a4bcebf58a82b0178503c38659c6fc Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Wed, 21 Jan 2026 21:47:32 +0100 Subject: [PATCH 01/10] Move the LuxtronikFieldsDictionary into a separate file --- luxtronik/collections.py | 173 ++++++++++++++++++++++++++++++++++++++ luxtronik/data_vector.py | 167 +----------------------------------- luxtronik/shi/vector.py | 3 +- tests/test_data_vector.py | 3 +- 4 files changed, 179 insertions(+), 167 deletions(-) create mode 100644 luxtronik/collections.py diff --git a/luxtronik/collections.py b/luxtronik/collections.py new file mode 100644 index 0000000..f2f89e5 --- /dev/null +++ b/luxtronik/collections.py @@ -0,0 +1,173 @@ +"""Common used collection objects.""" + +import logging + +from luxtronik.datatypes import Base +from luxtronik.definitions import LuxtronikDefinition, LuxtronikDefinitionsDictionary + + +LOGGER = logging.getLogger(__name__) + + +############################################################################### +# Field dictionary for data vectors +############################################################################### + +class LuxtronikFieldsDictionary: + """ + Dictionary that behaves like the earlier data vector dictionaries (index-field-dictionary), + with the addition that obsolete fields are also supported and can be addressed by name. + Aliases are also supported. + """ + + def __init__(self): + # There may be several names or alias that points to one definition. + # So in order to spare memory we split the name/index-to-field-lookup + # into a name/index-to-definition-lookup and a definition-to-field-lookup + self._def_lookup = LuxtronikDefinitionsDictionary() + self._field_lookup = {} + # Furthermore stores the definition-to-field-lookup separate from the + # field-definition pairs to keep the index-sorted order when adding new entries + self._items = [] # list of tuples, 0: definition, 1: field + + def __getitem__(self, def_field_name_or_idx): + return self.get(def_field_name_or_idx) + + def __setitem__(self, def_name_or_idx, value): + assert False, "__setitem__ not implemented." + + def __len__(self): + return len(self._def_lookup._index_dict) + + def __iter__(self): + """ + Iterate over all non-obsolete indices. If an index is assigned multiple times, + only the index of the preferred definition will be output. + """ + # _items is a list of tuples, 0: definition, 1: field + all_related_defs = self._def_lookup._index_dict.values() + return iter([d.index for d in self._items if d in all_related_defs]) + + def __contains__(self, def_field_name_or_idx): + """ + Check whether the data vector contains a name, index, + or definition matching an added field, or the field itself. + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Definition object, field object, field name or register index. + + Returns: + True if the searched element was found, otherwise False. + """ + if isinstance(def_field_name_or_idx, Base): + return any(def_field_name_or_idx is field for field in self._field_lookup.values()) + elif isinstance(def_field_name_or_idx, LuxtronikDefinition): + # speed-up the look-up by search only the name-dict + return def_field_name_or_idx.name in self._def_lookup._name_dict + else: + return def_field_name_or_idx in self._def_lookup + + def values(self): + """ + Iterator for all added non-obsolete fields. If an index is assigned multiple times, + only the field of the preferred definition will be output. + """ + # _items is a list of tuples, 0: definition, 1: field + all_related_defs = self._def_lookup._index_dict.values() + return iter([f for d, f in self._items if d in all_related_defs]) + + def items(self): + """ + Iterator for all non-obsolete index-field-pairs (list of tuples with + 0: index, 1: field) contained herein. If an index is assigned multiple times, + only the index-field-pair of the preferred definition will be output. + """ + # _items is a list of tuples, 0: definition, 1: field + all_related_defs = self._def_lookup._index_dict.values() + return iter([(d.index, f) for d, f in self._items if d in all_related_defs]) + + @property + def def_items(self): + """ + Iterator for all definition-field-pairs (list of tuples with + 0: definition, 1: field) contained herein. + """ + return self._items + + @property + def def_dict(self): + """Return the internal definition dictionary""" + return self._def_lookup + + def add(self, definition, field, alias=None): + """ + Add a definition-field-pair to the internal dictionaries. + + Args: + definition (LuxtronikDefinition): Definition related to the field. + field (Base): Field to add. + alias (Hashable | None): Alias, which can be used to access the field again. + """ + if definition.valid: + self._def_lookup.add(definition, alias) + self._field_lookup[definition] = field + self._items.append((definition, field)) + + def add_sorted(self, definition, field, alias=None): + if definition.valid: + self.add(definition, field, alias) + # sort _items by definition.index + # _items is a list of tuples, 0: definition, 1: field + self._items.sort(key=lambda item: item[0].index) + + def register_alias(self, def_field_name_or_idx, alias): + """ + Add an alternative name (or anything hashable else) + that can be used to access a specific field. + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Field to which the alias is to be added. + Either by definition, name, register index, or the field itself. + alias (Hashable): Alias, which can be used to access the field again. + + Returns: + Base | None: The field to which the alias was added, + or None if not possible + """ + # Resolve a field input + def_name_or_idx = def_field_name_or_idx + if isinstance(def_name_or_idx, Base): + def_name_or_idx = def_name_or_idx.name + # register alias + definition = self._def_lookup.register_alias(def_name_or_idx, alias) + if definition is None: + return None + return self._field_lookup.get(definition, None) + + def get(self, def_field_name_or_idx, default=None): + """ + Retrieve a field by definition, name or register index. + + Args: + def_field_name_or_idx (LuxtronikDefinition | str | int): + Definition, name, or register index to be used to search for the field. + + Returns: + Base | None: The field found or the provided default if not found. + + Note: + If multiple fields added for the same index/name, + the last added takes precedence. + """ + def_name_or_idx = def_field_name_or_idx + if isinstance(def_name_or_idx, Base): + def_name_or_idx = def_name_or_idx.name + if isinstance(def_name_or_idx, LuxtronikDefinition): + definition = def_name_or_idx + else: + definition = self._def_lookup.get(def_name_or_idx) + if definition is not None: + return self._field_lookup.get(definition, default) + return default \ No newline at end of file diff --git a/luxtronik/data_vector.py b/luxtronik/data_vector.py index 269f5b0..0b3a22a 100644 --- a/luxtronik/data_vector.py +++ b/luxtronik/data_vector.py @@ -7,176 +7,13 @@ LUXTRONIK_NAME_CHECK_OBSOLETE, ) -from luxtronik.datatypes import Base -from luxtronik.definitions import LuxtronikDefinition, LuxtronikDefinitionsDictionary +from luxtronik.collections import LuxtronikFieldsDictionary +from luxtronik.definitions import LuxtronikDefinition LOGGER = logging.getLogger(__name__) -############################################################################### -# Field dictionary for data vectors -############################################################################### - -class LuxtronikFieldsDictionary: - """ - Dictionary that behaves like the earlier data vector dictionaries (index-field-dictionary), - with the addition that obsolete fields are also supported and can be addressed by name. - Aliases are also supported. - """ - - def __init__(self): - # There may be several names or alias that points to one definition. - # So in order to spare memory we split the name/index-to-field-lookup - # into a name/index-to-definition-lookup and a definition-to-field-lookup - self._def_lookup = LuxtronikDefinitionsDictionary() - self._field_lookup = {} - # Furthermore stores the definition-to-field-lookup separate from the - # field-definition pairs to keep the index-sorted order when adding new entries - self._items = [] # list of tuples, 0: definition, 1: field - - def __getitem__(self, def_field_name_or_idx): - return self.get(def_field_name_or_idx) - - def __setitem__(self, def_name_or_idx, value): - assert False, "__setitem__ not implemented." - - def __len__(self): - return len(self._def_lookup._index_dict) - - def __iter__(self): - """ - Iterate over all non-obsolete indices. If an index is assigned multiple times, - only the index of the preferred definition will be output. - """ - # _items is a list of tuples, 0: definition, 1: field - all_related_defs = self._def_lookup._index_dict.values() - return iter([d.index for d in self._items if d in all_related_defs]) - - def __contains__(self, def_field_name_or_idx): - """ - Check whether the data vector contains a name, index, - or definition matching an added field, or the field itself. - - Args: - def_field_name_or_idx (LuxtronikDefinition | Base | str | int): - Definition object, field object, field name or register index. - - Returns: - True if the searched element was found, otherwise False. - """ - if isinstance(def_field_name_or_idx, Base): - return any(def_field_name_or_idx is field for field in self._field_lookup.values()) - elif isinstance(def_field_name_or_idx, LuxtronikDefinition): - # speed-up the look-up by search only the name-dict - return def_field_name_or_idx.name in self._def_lookup._name_dict - else: - return def_field_name_or_idx in self._def_lookup - - def values(self): - """ - Iterator for all added non-obsolete fields. If an index is assigned multiple times, - only the field of the preferred definition will be output. - """ - # _items is a list of tuples, 0: definition, 1: field - all_related_defs = self._def_lookup._index_dict.values() - return iter([f for d, f in self._items if d in all_related_defs]) - - def items(self): - """ - Iterator for all non-obsolete index-field-pairs (list of tuples with - 0: index, 1: field) contained herein. If an index is assigned multiple times, - only the index-field-pair of the preferred definition will be output. - """ - # _items is a list of tuples, 0: definition, 1: field - all_related_defs = self._def_lookup._index_dict.values() - return iter([(d.index, f) for d, f in self._items if d in all_related_defs]) - - @property - def def_items(self): - """ - Iterator for all definition-field-pairs (list of tuples with - 0: definition, 1: field) contained herein. - """ - return self._items - - @property - def def_dict(self): - """Return the internal definition dictionary""" - return self._def_lookup - - def add(self, definition, field, alias=None): - """ - Add a definition-field-pair to the internal dictionaries. - - Args: - definition (LuxtronikDefinition): Definition related to the field. - field (Base): Field to add. - alias (Hashable | None): Alias, which can be used to access the field again. - """ - if definition.valid: - self._def_lookup.add(definition, alias) - self._field_lookup[definition] = field - self._items.append((definition, field)) - - def add_sorted(self, definition, field, alias=None): - if definition.valid: - self.add(definition, field, alias) - # sort _items by definition.index - # _items is a list of tuples, 0: definition, 1: field - self._items.sort(key=lambda item: item[0].index) - - def register_alias(self, def_field_name_or_idx, alias): - """ - Add an alternative name (or anything hashable else) - that can be used to access a specific field. - - Args: - def_field_name_or_idx (LuxtronikDefinition | Base | str | int): - Field to which the alias is to be added. - Either by definition, name, register index, or the field itself. - alias (Hashable): Alias, which can be used to access the field again. - - Returns: - Base | None: The field to which the alias was added, - or None if not possible - """ - # Resolve a field input - def_name_or_idx = def_field_name_or_idx - if isinstance(def_name_or_idx, Base): - def_name_or_idx = def_name_or_idx.name - # register alias - definition = self._def_lookup.register_alias(def_name_or_idx, alias) - if definition is None: - return None - return self._field_lookup.get(definition, None) - - def get(self, def_field_name_or_idx, default=None): - """ - Retrieve a field by definition, name or register index. - - Args: - def_field_name_or_idx (LuxtronikDefinition | str | int): - Definition, name, or register index to be used to search for the field. - - Returns: - Base | None: The field found or the provided default if not found. - - Note: - If multiple fields added for the same index/name, - the last added takes precedence. - """ - def_name_or_idx = def_field_name_or_idx - if isinstance(def_name_or_idx, Base): - def_name_or_idx = def_name_or_idx.name - if isinstance(def_name_or_idx, LuxtronikDefinition): - definition = def_name_or_idx - else: - definition = self._def_lookup.get(def_name_or_idx) - if definition is not None: - return self._field_lookup.get(definition, default) - return default - ############################################################################### # Base class for all luxtronik data vectors ############################################################################### diff --git a/luxtronik/shi/vector.py b/luxtronik/shi/vector.py index cc9493a..0f41325 100644 --- a/luxtronik/shi/vector.py +++ b/luxtronik/shi/vector.py @@ -2,7 +2,8 @@ import logging from luxtronik.common import version_in_range -from luxtronik.data_vector import LuxtronikFieldsDictionary, DataVector +from luxtronik.collections import LuxtronikFieldsDictionary +from luxtronik.data_vector import DataVector from luxtronik.datatypes import Base, Unknown from luxtronik.definitions import LuxtronikDefinition diff --git a/tests/test_data_vector.py b/tests/test_data_vector.py index 6c2a5be..fc8d5aa 100644 --- a/tests/test_data_vector.py +++ b/tests/test_data_vector.py @@ -3,8 +3,9 @@ # pylint: disable=too-few-public-methods,invalid-name,too-many-lines import pytest +from luxtronik.collections import LuxtronikFieldsDictionary from luxtronik.definitions import LuxtronikDefinition, LuxtronikDefinitionsDictionary -from luxtronik.data_vector import LuxtronikFieldsDictionary, DataVector +from luxtronik.data_vector import DataVector from luxtronik.datatypes import ( Base, From 12b2f9c302a6f31ebf4c0aa68cb0c3afd555b252 Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Wed, 21 Jan 2026 22:07:56 +0100 Subject: [PATCH 02/10] Move the pytest class for the LuxtronikFieldsDictionary into a separate file --- tests/test_collections.py | 160 +++++++++++++++++++++++++++++++++++++ tests/test_data_vector.py | 162 +------------------------------------- 2 files changed, 162 insertions(+), 160 deletions(-) create mode 100644 tests/test_collections.py diff --git a/tests/test_collections.py b/tests/test_collections.py new file mode 100644 index 0000000..e71eb67 --- /dev/null +++ b/tests/test_collections.py @@ -0,0 +1,160 @@ + +from luxtronik.collections import LuxtronikFieldsDictionary +from luxtronik.definitions import LuxtronikDefinition, LuxtronikDefinitionsDictionary +from luxtronik.datatypes import ( + Base, + Unknown, +) + + +class TestLuxtronikFieldsDictionary: + + def test_init(self): + d = LuxtronikFieldsDictionary() + + assert type(d._def_lookup) is LuxtronikDefinitionsDictionary + assert type(d._field_lookup) is dict + assert len(d._field_lookup.values()) == 0 + assert type(d._items) is list + assert len(d._items) == 0 + + def test_add(self): + d = LuxtronikFieldsDictionary() + assert len(d) == 0 + assert len(d._items) == 0 + + u = LuxtronikDefinition.unknown(1, "test", 0) + f = u.create_field() + d.add(u, f) + assert len(d) == 1 + assert len(d._items) == 1 + assert d._items[0][0] is u + assert d._items[0][1] is f + + u = LuxtronikDefinition.unknown(2, "test", 0) + f = u.create_field() + d.add(u, f) + assert len(d) == 2 + assert len(d._items) == 2 + assert d._items[1][0] is u + assert d._items[1][1] is f + + u = LuxtronikDefinition.unknown(0, "test", 0) + f = u.create_field() + d.add_sorted(u, f) + assert len(d) == 3 + assert len(d._items) == 3 + assert d._items[0][0] is u + assert d._items[0][1] is f + + def create_instance(self): + d = LuxtronikFieldsDictionary() + u = LuxtronikDefinition.unknown(1, "test", 0) + d.add(u, u.create_field()) + u = LuxtronikDefinition.unknown(2, "test", 0) + d.add(u, u.create_field()) + b = LuxtronikDefinition({ + "index": 2, + "type": Base, + "names": ["base2"], + }, "test", 0) + f = b.create_field() + d.add(b, f) + b = LuxtronikDefinition({ + "index": 3, + "type": Base, + "names": ["base3"], + }, "test", 0) + d.add(b, b.create_field(), "base4") + return d, u, f + + def test_len(self): + d, _, _ = self.create_instance() + # 3 different indices + assert len(d) == 3 + assert len(d._items) == 4 + + def test_get_contains(self): + d, u, f = self.create_instance() + assert "1" in d + assert d["1"].name == "unknown_test_1" + assert "unknown_test_1" in d + assert d["unknown_test_1"].name == "unknown_test_1" + assert 2 in d + assert d[2].name == "base2" + assert "unknown_test_2" in d + assert d["unknown_test_2"].name == "unknown_test_2" + assert "base2" in d + assert d["base2"].name == "base2" + assert "base3" in d + assert d.get("base3").name == "base3" + assert "base4" in d + assert d.get("base4").name == "base3" + assert u in d + assert d[u].name == "unknown_test_2" + assert f in d + assert d[f].name == "base2" + assert 4 not in d + + def test_iter(self): + d, _, _ = self.create_instance() + for idx, key in enumerate(d): + if idx == 0: + assert key == 1 + if idx == 1: + assert key == 2 + if idx == 2: + assert key == 3 + + def test_values(self): + d, _, _ = self.create_instance() + for idx, value in enumerate(d.values()): + if idx == 0: + assert type(value) is Unknown + assert value.name == "unknown_test_1" + if idx == 1: + assert type(value) is Base + assert value.name == "base2" + if idx == 2: + assert type(value) is Base + assert value.name == "base3" + + def test_items(self): + d, _, _ = self.create_instance() + for idx, (key, value) in enumerate(d.items()): + if idx == 0: + assert key == 1 + assert type(value) is Unknown + assert value.name == "unknown_test_1" + if idx == 1: + assert key == 2 + assert type(value) is Base + assert value.name == "base2" + if idx == 2: + assert key == 3 + assert type(value) is Base + assert value.name == "base3" + + class MyTestClass: + pass + + def test_alias(self): + d, u, f = self.create_instance() + my = self.MyTestClass() + + d.register_alias(0, "abc") + assert d["abc"] is d[0] + + field = d.register_alias("unknown_test_1", 6) + assert d[6] is field + + field = d.register_alias(u, my) + assert d[my] is d[u] + + d.register_alias(f, my) + assert d[my] is not d[u] + assert d[my] is d[f] + + field = d.register_alias(9, my) + assert field is None + assert d[my] is d[f] \ No newline at end of file diff --git a/tests/test_data_vector.py b/tests/test_data_vector.py index fc8d5aa..9a13632 100644 --- a/tests/test_data_vector.py +++ b/tests/test_data_vector.py @@ -3,167 +3,9 @@ # pylint: disable=too-few-public-methods,invalid-name,too-many-lines import pytest -from luxtronik.collections import LuxtronikFieldsDictionary -from luxtronik.definitions import LuxtronikDefinition, LuxtronikDefinitionsDictionary -from luxtronik.data_vector import DataVector - -from luxtronik.datatypes import ( - Base, - Unknown, -) - - -class TestLuxtronikFieldsDictionary: - - def test_init(self): - d = LuxtronikFieldsDictionary() - - assert type(d._def_lookup) is LuxtronikDefinitionsDictionary - assert type(d._field_lookup) is dict - assert len(d._field_lookup.values()) == 0 - assert type(d._items) is list - assert len(d._items) == 0 - - def test_add(self): - d = LuxtronikFieldsDictionary() - assert len(d) == 0 - assert len(d._items) == 0 - - u = LuxtronikDefinition.unknown(1, "test", 0) - f = u.create_field() - d.add(u, f) - assert len(d) == 1 - assert len(d._items) == 1 - assert d._items[0][0] is u - assert d._items[0][1] is f - - u = LuxtronikDefinition.unknown(2, "test", 0) - f = u.create_field() - d.add(u, f) - assert len(d) == 2 - assert len(d._items) == 2 - assert d._items[1][0] is u - assert d._items[1][1] is f - - u = LuxtronikDefinition.unknown(0, "test", 0) - f = u.create_field() - d.add_sorted(u, f) - assert len(d) == 3 - assert len(d._items) == 3 - assert d._items[0][0] is u - assert d._items[0][1] is f - - def create_instance(self): - d = LuxtronikFieldsDictionary() - u = LuxtronikDefinition.unknown(1, "test", 0) - d.add(u, u.create_field()) - u = LuxtronikDefinition.unknown(2, "test", 0) - d.add(u, u.create_field()) - b = LuxtronikDefinition({ - "index": 2, - "type": Base, - "names": ["base2"], - }, "test", 0) - f = b.create_field() - d.add(b, f) - b = LuxtronikDefinition({ - "index": 3, - "type": Base, - "names": ["base3"], - }, "test", 0) - d.add(b, b.create_field(), "base4") - return d, u, f - - def test_len(self): - d, _, _ = self.create_instance() - # 3 different indices - assert len(d) == 3 - assert len(d._items) == 4 - def test_get_contains(self): - d, u, f = self.create_instance() - assert "1" in d - assert d["1"].name == "unknown_test_1" - assert "unknown_test_1" in d - assert d["unknown_test_1"].name == "unknown_test_1" - assert 2 in d - assert d[2].name == "base2" - assert "unknown_test_2" in d - assert d["unknown_test_2"].name == "unknown_test_2" - assert "base2" in d - assert d["base2"].name == "base2" - assert "base3" in d - assert d.get("base3").name == "base3" - assert "base4" in d - assert d.get("base4").name == "base3" - assert u in d - assert d[u].name == "unknown_test_2" - assert f in d - assert d[f].name == "base2" - assert 4 not in d - - def test_iter(self): - d, _, _ = self.create_instance() - for idx, key in enumerate(d): - if idx == 0: - assert key == 1 - if idx == 1: - assert key == 2 - if idx == 2: - assert key == 3 - - def test_values(self): - d, _, _ = self.create_instance() - for idx, value in enumerate(d.values()): - if idx == 0: - assert type(value) is Unknown - assert value.name == "unknown_test_1" - if idx == 1: - assert type(value) is Base - assert value.name == "base2" - if idx == 2: - assert type(value) is Base - assert value.name == "base3" - - def test_items(self): - d, _, _ = self.create_instance() - for idx, (key, value) in enumerate(d.items()): - if idx == 0: - assert key == 1 - assert type(value) is Unknown - assert value.name == "unknown_test_1" - if idx == 1: - assert key == 2 - assert type(value) is Base - assert value.name == "base2" - if idx == 2: - assert key == 3 - assert type(value) is Base - assert value.name == "base3" - - class MyTestClass: - pass - - def test_alias(self): - d, u, f = self.create_instance() - my = self.MyTestClass() - - d.register_alias(0, "abc") - assert d["abc"] is d[0] - - field = d.register_alias("unknown_test_1", 6) - assert d[6] is field - - field = d.register_alias(u, my) - assert d[my] is d[u] - - d.register_alias(f, my) - assert d[my] is not d[u] - assert d[my] is d[f] - - field = d.register_alias(9, my) - assert field is None - assert d[my] is d[f] +from luxtronik.data_vector import DataVector +from luxtronik.datatypes import Base class ObsoleteDataVector(DataVector): From 4d1ff58d6dcaf66963f7019aebab7c60aae6a27a Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Wed, 21 Jan 2026 22:37:28 +0100 Subject: [PATCH 03/10] Refactor LuxtronikFieldsDictionary _items/def_items to _pairs/pairs --- luxtronik/collections.py | 29 ++++++++++++++--------------- luxtronik/shi/interface.py | 6 +++--- luxtronik/shi/vector.py | 12 ++++++------ tests/shi/test_shi_vector.py | 6 +++--- tests/test_collections.py | 26 +++++++++++++------------- 5 files changed, 39 insertions(+), 40 deletions(-) diff --git a/luxtronik/collections.py b/luxtronik/collections.py index f2f89e5..18ae96b 100644 --- a/luxtronik/collections.py +++ b/luxtronik/collections.py @@ -28,7 +28,7 @@ def __init__(self): self._field_lookup = {} # Furthermore stores the definition-to-field-lookup separate from the # field-definition pairs to keep the index-sorted order when adding new entries - self._items = [] # list of tuples, 0: definition, 1: field + self._pairs = [] # list of tuples, 0: definition, 1: field def __getitem__(self, def_field_name_or_idx): return self.get(def_field_name_or_idx) @@ -44,9 +44,9 @@ def __iter__(self): Iterate over all non-obsolete indices. If an index is assigned multiple times, only the index of the preferred definition will be output. """ - # _items is a list of tuples, 0: definition, 1: field + # _pairs is a list of tuples, 0: definition, 1: field all_related_defs = self._def_lookup._index_dict.values() - return iter([d.index for d in self._items if d in all_related_defs]) + return iter([d.index for d in self._pairs if d in all_related_defs]) def __contains__(self, def_field_name_or_idx): """ @@ -73,9 +73,9 @@ def values(self): Iterator for all added non-obsolete fields. If an index is assigned multiple times, only the field of the preferred definition will be output. """ - # _items is a list of tuples, 0: definition, 1: field + # _pairs is a list of tuples, 0: definition, 1: field all_related_defs = self._def_lookup._index_dict.values() - return iter([f for d, f in self._items if d in all_related_defs]) + return iter([f for d, f in self._pairs if d in all_related_defs]) def items(self): """ @@ -83,17 +83,16 @@ def items(self): 0: index, 1: field) contained herein. If an index is assigned multiple times, only the index-field-pair of the preferred definition will be output. """ - # _items is a list of tuples, 0: definition, 1: field + # _pairs is a list of tuples, 0: definition, 1: field all_related_defs = self._def_lookup._index_dict.values() - return iter([(d.index, f) for d, f in self._items if d in all_related_defs]) + return iter([(d.index, f) for d, f in self._pairs if d in all_related_defs]) - @property - def def_items(self): + def pairs(self): """ - Iterator for all definition-field-pairs (list of tuples with + Return all definition-field-pairs (list of tuples with 0: definition, 1: field) contained herein. """ - return self._items + return self._pairs @property def def_dict(self): @@ -112,14 +111,14 @@ def add(self, definition, field, alias=None): if definition.valid: self._def_lookup.add(definition, alias) self._field_lookup[definition] = field - self._items.append((definition, field)) + self._pairs.append((definition, field)) def add_sorted(self, definition, field, alias=None): if definition.valid: self.add(definition, field, alias) - # sort _items by definition.index - # _items is a list of tuples, 0: definition, 1: field - self._items.sort(key=lambda item: item[0].index) + # sort _pairs by definition.index + # _pairs is a list of tuples, 0: definition, 1: field + self._pairs.sort(key=lambda pair: pair[0].index) def register_alias(self, def_field_name_or_idx, alias): """ diff --git a/luxtronik/shi/interface.py b/luxtronik/shi/interface.py index d73118e..0f043b5 100644 --- a/luxtronik/shi/interface.py +++ b/luxtronik/shi/interface.py @@ -466,12 +466,12 @@ def _collect_fields(self, blocks_list, data_vector, definitions, read_not_write) # Trial-and-error mode: Add a block for every field blocks = ContiguousDataBlockList(definitions.name, read_not_write) if (read_not_write == READ): - for definition, field in data_vector.data.def_items: + for definition, field in data_vector.data.pairs(): # _prepare_read_field will never fail, no need to call it #if self._prepare_read_field(definition, field): blocks.append_single(definition, field) else: - for definition, field in data_vector.data.def_items: + for definition, field in data_vector.data.pairs(): if self._prepare_write_field(definition, field, data_vector.safe, None): blocks.append_single(definition, field) if len(blocks) > 0: @@ -485,7 +485,7 @@ def _collect_fields(self, blocks_list, data_vector, definitions, read_not_write) else: blocks = ContiguousDataBlockList(definitions.name, read_not_write) # Organize data into contiguous blocks - for definition, field in data_vector.data.def_items: + for definition, field in data_vector.data.pairs(): if self._prepare_write_field(definition, field, data_vector.safe, None): blocks.collect(definition, field) if len(blocks) > 0: diff --git a/luxtronik/shi/vector.py b/luxtronik/shi/vector.py index 0f41325..cbbde9f 100644 --- a/luxtronik/shi/vector.py +++ b/luxtronik/shi/vector.py @@ -151,10 +151,10 @@ def __setitem__(self, def_name_or_idx, value): return self.set(def_name_or_idx, value) def __len__(self): - return len(self._data.def_items) + return len(self._data.pairs()) def __iter__(self): - return iter([definition for definition, _ in self._data.def_items]) + return iter([definition for definition, _ in self._data.pairs()]) def __contains__(self, def_field_name_or_idx): """ @@ -178,10 +178,10 @@ def version(self): return self._version def values(self): - return iter([field for _, field in self._data.def_items]) + return iter([field for _, field in self._data.pairs()]) def items(self): - return iter(self._data.def_items) + return iter(self._data.pairs()) # Find, add and alias methods ################################################# @@ -286,7 +286,7 @@ def update_read_blocks(self): """ if not self._read_blocks_up_to_date: self._read_blocks.clear() - for definition, field in self._data.def_items: + for definition, field in self._data.pairs(): self._read_blocks.collect(definition, field) self._read_blocks_up_to_date = True @@ -302,7 +302,7 @@ def parse(self, raw_data): The raw data must start at register index 0. """ raw_len = len(raw_data) - for definition, field in self._data.def_items: + for definition, field in self._data.pairs(): if definition.index + definition.count >= raw_len: continue integrate_data(definition, field, raw_data) diff --git a/tests/shi/test_shi_vector.py b/tests/shi/test_shi_vector.py index 50b038e..a3a4137 100644 --- a/tests/shi/test_shi_vector.py +++ b/tests/shi/test_shi_vector.py @@ -92,7 +92,7 @@ def test_create(self): data_vector = DataVectorTest(parse_version("1.2")) assert data_vector.version == (1, 2, 0, 0) assert len(data_vector) == 3 - assert len(data_vector._data._items) == 3 + assert len(data_vector._data.pairs()) == 3 assert not data_vector._read_blocks_up_to_date assert len(data_vector._read_blocks) == 0 @@ -216,7 +216,7 @@ def test_add(self): field = data_vector.add(def_9a) assert def_9a in data_vector assert len(data_vector) == 3 - assert len(data_vector.data._items) == 3 + assert len(data_vector.data._pairs) == 3 assert field.name == 'field_9a' # Get via index (last added) @@ -433,7 +433,7 @@ def test_version_none(self): assert len(data_vector) == 3 data_vector.add(10) # field_9a alias assert len(data_vector) == 4 - assert len(data_vector._data._items) == 4 + assert len(data_vector._data._pairs) == 4 class TestHoldings: diff --git a/tests/test_collections.py b/tests/test_collections.py index e71eb67..eaebe81 100644 --- a/tests/test_collections.py +++ b/tests/test_collections.py @@ -15,37 +15,37 @@ def test_init(self): assert type(d._def_lookup) is LuxtronikDefinitionsDictionary assert type(d._field_lookup) is dict assert len(d._field_lookup.values()) == 0 - assert type(d._items) is list - assert len(d._items) == 0 + assert type(d._pairs) is list + assert len(d._pairs) == 0 def test_add(self): d = LuxtronikFieldsDictionary() assert len(d) == 0 - assert len(d._items) == 0 + assert len(d.pairs()) == 0 u = LuxtronikDefinition.unknown(1, "test", 0) f = u.create_field() d.add(u, f) assert len(d) == 1 - assert len(d._items) == 1 - assert d._items[0][0] is u - assert d._items[0][1] is f + assert len(d._pairs) == 1 + assert d._pairs[0][0] is u + assert d._pairs[0][1] is f u = LuxtronikDefinition.unknown(2, "test", 0) f = u.create_field() d.add(u, f) assert len(d) == 2 - assert len(d._items) == 2 - assert d._items[1][0] is u - assert d._items[1][1] is f + assert len(d._pairs) == 2 + assert d._pairs[1][0] is u + assert d._pairs[1][1] is f u = LuxtronikDefinition.unknown(0, "test", 0) f = u.create_field() d.add_sorted(u, f) assert len(d) == 3 - assert len(d._items) == 3 - assert d._items[0][0] is u - assert d._items[0][1] is f + assert len(d._pairs) == 3 + assert d._pairs[0][0] is u + assert d._pairs[0][1] is f def create_instance(self): d = LuxtronikFieldsDictionary() @@ -72,7 +72,7 @@ def test_len(self): d, _, _ = self.create_instance() # 3 different indices assert len(d) == 3 - assert len(d._items) == 4 + assert len(d.pairs()) == 4 def test_get_contains(self): d, u, f = self.create_instance() From 3dc426131fa03b5255ab1157776fd620d5142798 Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Wed, 21 Jan 2026 22:47:15 +0100 Subject: [PATCH 04/10] Update some LuxtronikFieldsDictionary descriptions. No code change! --- luxtronik/collections.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/luxtronik/collections.py b/luxtronik/collections.py index 18ae96b..3f60889 100644 --- a/luxtronik/collections.py +++ b/luxtronik/collections.py @@ -96,7 +96,7 @@ def pairs(self): @property def def_dict(self): - """Return the internal definition dictionary""" + """Return the internal definition dictionary, containing all added definitions""" return self._def_lookup def add(self, definition, field, alias=None): @@ -114,6 +114,14 @@ def add(self, definition, field, alias=None): self._pairs.append((definition, field)) def add_sorted(self, definition, field, alias=None): + """ + Behaves like the normal `add` but then sorts the pairs. + + Args: + definition (LuxtronikDefinition): Definition related to the field. + field (Base): Field to add. + alias (Hashable | None): Alias, which can be used to access the field again. + """ if definition.valid: self.add(definition, field, alias) # sort _pairs by definition.index @@ -147,11 +155,11 @@ def register_alias(self, def_field_name_or_idx, alias): def get(self, def_field_name_or_idx, default=None): """ - Retrieve a field by definition, name or register index. + Retrieve a field by definition, name or register index, or the field itself. Args: - def_field_name_or_idx (LuxtronikDefinition | str | int): - Definition, name, or register index to be used to search for the field. + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Definition, field, name, or register index to be used to search for the field. Returns: Base | None: The field found or the provided default if not found. From 6f908b82c5049ac89ba8d8b5777b06908a4ec7bd Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Wed, 21 Jan 2026 22:52:11 +0100 Subject: [PATCH 05/10] Extract a LuxtronikDefFieldPair from ContiguousDataPart and move get_data_arr, check_arr and integrate data --- luxtronik/collections.py | 182 ++++++++++++++++++++++++++++++ luxtronik/shi/contiguous.py | 46 +------- luxtronik/shi/definitions.py | 142 ----------------------- luxtronik/shi/interface.py | 2 +- luxtronik/shi/vector.py | 3 +- tests/shi/test_shi_definitions.py | 6 +- 6 files changed, 189 insertions(+), 192 deletions(-) delete mode 100644 luxtronik/shi/definitions.py diff --git a/luxtronik/collections.py b/luxtronik/collections.py index 3f60889..f271f35 100644 --- a/luxtronik/collections.py +++ b/luxtronik/collections.py @@ -5,9 +5,191 @@ from luxtronik.datatypes import Base from luxtronik.definitions import LuxtronikDefinition, LuxtronikDefinitionsDictionary +# TODO: Remove SHI dependency +LUXTRONIK_SHI_REGISTER_BIT_SIZE = 16 +LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE = 0x7FFF + LOGGER = logging.getLogger(__name__) +############################################################################### +# Common methods +############################################################################### + +VALUE_MASK = (1 << LUXTRONIK_SHI_REGISTER_BIT_SIZE) - 1 + +def pack_values(values, reverse=True): + """ + Packs a list of data chunks into one integer. + + Args: + values (list[int]): raw data; distributed across multiple registers. + reverse (bool): Use big-endian/MSB-first if true, + otherwise use little-endian/LSB-first order. + + Returns: + int: Packed raw data as a single integer value. + + Note: + The smart home interface uses a chunk size of 16 bits. + """ + count = len(values) + + result = 0 + for idx, value in enumerate(values): + # normal: idx = 0..n-1 + # reversed index: highest chunk first + bit_index = (count - 1 - idx) if reverse else idx + + result |= (value & VALUE_MASK) << (LUXTRONIK_SHI_REGISTER_BIT_SIZE * bit_index) + + return result + +def unpack_values(packed, count, reverse=True): + """ + Unpacks 'count' values from a packed integer. + + Args: + packed (int): Packed raw data as a single integer value. + count (int): Number of chunks to unpack. + reverse (bool): Use big-endian/MSB-first if true, + otherwise use little-endian/LSB-first order. + + Returns: + list[int]: List of unpacked raw data values. + + Note: + The smart home interface uses a chunk size of 16 bits. + """ + values = [] + + for idx in range(count): + # normal: idx = 0..n-1 + # reversed: highest chunk first + bit_index = (count - 1 - idx) if reverse else idx + + chunk = (packed >> (LUXTRONIK_SHI_REGISTER_BIT_SIZE * bit_index)) & VALUE_MASK + values.append(chunk) + + return values + + +def get_data_arr(definition, field): + """ + Normalize the field's data to a list of the correct size. + + Args: + definition (LuxtronikDefinition): Meta-data of the field. + field (Base): Field object that contains data to get. + + Returns: + list[int] | None: List of length `definition.count`, + or None if the data size does not match. + """ + data = field.raw + if data is None: + return None + if not isinstance(data, list) and definition.count > 1 \ + and field.concatenate_multiple_data_chunks: + # Usually big-endian (reverse=True) is used + data = unpack_values(data, definition.count) + if not isinstance(data, list): + data = [data] + return data if len(data) == definition.count else None + +def check_data(definition, field): + """ + Validate that the field contains sufficient raw data. + + Args: + definition (LuxtronikDefinition): Meta-data of the field. + field (Base): Field object that contains the data to check. + + Returns: + bool: True if valid, False otherwise. + """ + return get_data_arr(definition, field) is not None + +def integrate_data(definition, field, raw_data, data_offset=-1): + """ + Integrate raw values from a data array into the field. + + Args: + definition (LuxtronikDefinition): Meta-data of the field. + field (Base): Field object where to integrate the data. + raw_data (list): Source array of bytes/words. + data_offset (int): Optional offset. Defaults to `definition.index`. + """ + data_offset = data_offset if data_offset >= 0 else definition.index + # Use the information of the definition to extract the raw-value + if (data_offset + definition.count - 1) >= len(raw_data): + raw = None + elif definition.count == 1: + raw = raw_data[data_offset] + raw = raw if raw != LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE else None + else: + raw = raw_data[data_offset : data_offset + definition.count] + raw = raw if len(raw) == definition.count and \ + not any(data == LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE for data in raw) else None + if field.concatenate_multiple_data_chunks and raw is not None: + # Usually big-endian (reverse=True) is used + raw = pack_values(raw) + field.raw = raw + +############################################################################### +# Definition / field pair +############################################################################### + +class LuxtronikDefFieldPair: + """ + Combines a definition and a field into a single iterable object. + """ + + def __init__(self, definition, field): + """ + Initialize a definition-field-pair. + + Args: + field (Base): The field object. + definition (LuxtronikDefinition): The definition for this field. + """ + self.field = field + self.definition = definition + + def __iter__(self): + yield self.definition + yield self.field + + @property + def index(self): + return self.definition.index + + @property + def addr(self): + return self.definition.addr + + @property + def count(self): + return self.definition.count + + def get_data_arr(self): + """ + Normalize the field's data to a list of the correct size. + + Returns: + list[int] | None: List of length `definition.count`, or None if insufficient. + """ + return get_data_arr(self.definition, self.field) + + def integrate_data(self, raw_data, data_offset=-1): + """ + Integrate the related parts of the `raw_data` into the field + + Args: + raw_data (list): Source array of register values. + data_offset (int): Optional offset. Defaults to `definition.index`. + """ + integrate_data(self.definition, self.field, raw_data, data_offset) ############################################################################### # Field dictionary for data vectors diff --git a/luxtronik/shi/contiguous.py b/luxtronik/shi/contiguous.py index 4f1a12d..52ec1f8 100644 --- a/luxtronik/shi/contiguous.py +++ b/luxtronik/shi/contiguous.py @@ -6,7 +6,7 @@ import logging -from luxtronik.shi.definitions import get_data_arr, integrate_data +from luxtronik.collections import LuxtronikDefFieldPair LOGGER = logging.getLogger(__name__) @@ -15,57 +15,15 @@ # ContiguousDataPart ############################################################################### -class ContiguousDataPart: +class ContiguousDataPart(LuxtronikDefFieldPair): """ Represents a single element of a contiguous data block. Each part references a `field` and its associated `definition`. """ - def __init__(self, definition, field): - """ - Initialize a contiguous data part. - - Args: - field (Base): The field object to read or write. - definition (LuxtronikDefinition): The definition for this field. - """ - self.field = field - self.definition = definition - def __repr__(self): return f"({self.index}, {self.count})" - @property - def index(self): - return self.definition.index - - @property - def addr(self): - return self.definition.addr - - @property - def count(self): - return self.definition.count - - def get_data_arr(self): - """ - Normalize the field's data to a list of the correct size. - - Returns: - list[int] | None: List of length `definition.count`, or None if insufficient. - """ - return get_data_arr(self.definition, self.field) - - def integrate_data(self, raw_data, data_offset=-1): - """ - Integrate the related parts of the `raw_data` into the field - - Args: - raw_data (list): Source array of bytes/words. - data_offset (int): Optional offset. Defaults to `definition.index`. - """ - integrate_data(self.definition, self.field, raw_data, data_offset) - ############################################################################### # ContiguousDataBlock diff --git a/luxtronik/shi/definitions.py b/luxtronik/shi/definitions.py deleted file mode 100644 index 1e054fe..0000000 --- a/luxtronik/shi/definitions.py +++ /dev/null @@ -1,142 +0,0 @@ -""" -The metadata (`index`, `count`, ...) for a field (`Base`, `SelectionBase`) -is stored as a definition object. For ease of use, all definitions -of one type (`input`, `holding`, ...) are provided as a sorted list of objects. -This usually contains only predefined definitions (generated out of -`HOLDINGS_DEFINITIONS_LIST` or `INPUTS_DEFINITIONS_LIST`), -but can be expanded by the user. -""" - -import logging - -from luxtronik.shi.constants import ( - LUXTRONIK_SHI_REGISTER_BIT_SIZE, - LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE -) - - -LOGGER = logging.getLogger(__name__) - -############################################################################### -# Definition-Field-Pair methods -############################################################################### - -VALUE_MASK = (1 << LUXTRONIK_SHI_REGISTER_BIT_SIZE) - 1 - -def pack_values(values, reverse=True): - """ - Packs a list of data chunks into one integer. - - Args: - values (list[int]): raw data; distributed across multiple registers. - reverse (bool): Use big-endian/MSB-first if true, - otherwise use little-endian/LSB-first order. - - Returns: - int: Packed raw data as a single integer value. - - Note: - The smart home interface uses a chunk size of 16 bits. - """ - count = len(values) - - result = 0 - for idx, value in enumerate(values): - # normal: idx = 0..n-1 - # reversed index: highest chunk first - bit_index = (count - 1 - idx) if reverse else idx - - result |= (value & VALUE_MASK) << (LUXTRONIK_SHI_REGISTER_BIT_SIZE * bit_index) - - return result - -def unpack_values(packed, count, reverse=True): - """ - Unpacks 'count' values from a packed integer. - - Args: - packed (int): Packed raw data as a single integer value. - count (int): Number of chunks to unpack. - reverse (bool): Use big-endian/MSB-first if true, - otherwise use little-endian/LSB-first order. - - Returns: - list[int]: List of unpacked raw data values. - - Note: - The smart home interface uses a chunk size of 16 bits. - """ - values = [] - - for idx in range(count): - # normal: idx = 0..n-1 - # reversed: highest chunk first - bit_index = (count - 1 - idx) if reverse else idx - - chunk = (packed >> (LUXTRONIK_SHI_REGISTER_BIT_SIZE * bit_index)) & VALUE_MASK - values.append(chunk) - - return values - - -def get_data_arr(definition, field): - """ - Normalize the field's data to a list of the correct size. - - Args: - definition (LuxtronikDefinition): Meta-data of the field. - field (Base): Field object that contains data to get. - - Returns: - list[int] | None: List of length `definition.count`, - or None if the data size does not match. - """ - data = field.raw - if data is None: - return None - if not isinstance(data, list) and definition.count > 1 \ - and field.concatenate_multiple_data_chunks: - # Usually big-endian (reverse=True) is used - data = unpack_values(data, definition.count) - if not isinstance(data, list): - data = [data] - return data if len(data) == definition.count else None - -def check_data(definition, field): - """ - Validate that the field contains sufficient raw data. - - Args: - definition (LuxtronikDefinition): Meta-data of the field. - field (Base): Field object that contains the data to check. - - Returns: - bool: True if valid, False otherwise. - """ - return get_data_arr(definition, field) is not None - -def integrate_data(definition, field, raw_data, data_offset=-1): - """ - Integrate raw values from a data array into the field. - - Args: - definition (LuxtronikDefinition): Meta-data of the field. - field (Base): Field object where to integrate the data. - raw_data (list): Source array of bytes/words. - data_offset (int): Optional offset. Defaults to `definition.index`. - """ - data_offset = data_offset if data_offset >= 0 else definition.index - # Use the information of the definition to extract the raw-value - if (data_offset + definition.count - 1) >= len(raw_data): - raw = None - elif definition.count == 1: - raw = raw_data[data_offset] - raw = raw if raw != LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE else None - else: - raw = raw_data[data_offset : data_offset + definition.count] - raw = raw if len(raw) == definition.count and \ - not any(data == LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE for data in raw) else None - if field.concatenate_multiple_data_chunks and raw is not None: - # Usually big-endian (reverse=True) is used - raw = pack_values(raw) - field.raw = raw \ No newline at end of file diff --git a/luxtronik/shi/interface.py b/luxtronik/shi/interface.py index 0f043b5..0f720f6 100644 --- a/luxtronik/shi/interface.py +++ b/luxtronik/shi/interface.py @@ -2,6 +2,7 @@ import logging +from luxtronik.collections import check_data from luxtronik.common import classproperty, version_in_range from luxtronik.datatypes import Base from luxtronik.definitions import ( @@ -14,7 +15,6 @@ LuxtronikSmartHomeReadInputsTelegram, LuxtronikSmartHomeWriteHoldingsTelegram, ) -from luxtronik.shi.definitions import check_data from luxtronik.shi.vector import DataVectorSmartHome from luxtronik.shi.holdings import Holdings, HOLDINGS_DEFINITIONS from luxtronik.shi.inputs import Inputs, INPUTS_DEFINITIONS diff --git a/luxtronik/shi/vector.py b/luxtronik/shi/vector.py index cbbde9f..7e087af 100644 --- a/luxtronik/shi/vector.py +++ b/luxtronik/shi/vector.py @@ -2,13 +2,12 @@ import logging from luxtronik.common import version_in_range -from luxtronik.collections import LuxtronikFieldsDictionary +from luxtronik.collections import integrate_data, LuxtronikFieldsDictionary from luxtronik.data_vector import DataVector from luxtronik.datatypes import Base, Unknown from luxtronik.definitions import LuxtronikDefinition from luxtronik.shi.constants import LUXTRONIK_LATEST_SHI_VERSION -from luxtronik.shi.definitions import integrate_data from luxtronik.shi.contiguous import ContiguousDataBlockList diff --git a/tests/shi/test_shi_definitions.py b/tests/shi/test_shi_definitions.py index acb2981..0de130f 100644 --- a/tests/shi/test_shi_definitions.py +++ b/tests/shi/test_shi_definitions.py @@ -1,10 +1,10 @@ -from luxtronik.definitions import LuxtronikDefinition -from luxtronik.shi.constants import LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE -from luxtronik.shi.definitions import ( +from luxtronik.collections import ( get_data_arr, check_data, integrate_data, ) +from luxtronik.definitions import LuxtronikDefinition +from luxtronik.shi.constants import LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE ############################################################################### # Tests From 3f08f989cfd2bbb0ac8bc44de980c9d82a90cc5b Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Wed, 21 Jan 2026 23:10:31 +0100 Subject: [PATCH 06/10] Move TestDefinitionFieldPair from test_shi_definitions.py to test_collections.py --- tests/shi/test_shi_definitions.py | 122 ----------------------------- tests/test_collections.py | 124 +++++++++++++++++++++++++++++- 2 files changed, 123 insertions(+), 123 deletions(-) delete mode 100644 tests/shi/test_shi_definitions.py diff --git a/tests/shi/test_shi_definitions.py b/tests/shi/test_shi_definitions.py deleted file mode 100644 index 0de130f..0000000 --- a/tests/shi/test_shi_definitions.py +++ /dev/null @@ -1,122 +0,0 @@ -from luxtronik.collections import ( - get_data_arr, - check_data, - integrate_data, -) -from luxtronik.definitions import LuxtronikDefinition -from luxtronik.shi.constants import LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE - -############################################################################### -# Tests -############################################################################### - -class TestDefinitionFieldPair: - - def test_data_arr(self): - definition = LuxtronikDefinition.unknown(2, 'Foo', 30) - field = definition.create_field() - field.concatenate_multiple_data_chunks = False - - # get from value - definition._count = 1 - field.raw = 5 - arr = get_data_arr(definition, field) - assert arr == [5] - assert check_data(definition, field) - - # get from array - definition._count = 2 - field.raw = [7, 3] - arr = get_data_arr(definition, field) - assert arr == [7, 3] - assert check_data(definition, field) - - # too much data - definition._count = 2 - field.raw = [4, 8, 1] - arr = get_data_arr(definition, field) - assert arr is None - assert not check_data(definition, field) - - # insufficient data - definition._count = 2 - field.raw = [9] - arr = get_data_arr(definition, field) - assert arr is None - assert not check_data(definition, field) - - field.concatenate_multiple_data_chunks = True - - # get from array - definition._count = 2 - field.raw = 0x0007_0003 - arr = get_data_arr(definition, field) - assert arr == [7, 3] - assert check_data(definition, field) - - # too much data - definition._count = 2 - field.raw = 0x0004_0008_0001 - arr = get_data_arr(definition, field) - assert arr == [8, 1] - assert check_data(definition, field) - - # insufficient data - definition._count = 2 - field.raw = 0x0009 - arr = get_data_arr(definition, field) - assert arr == [0, 9] - assert check_data(definition, field) - - def test_integrate(self): - definition = LuxtronikDefinition.unknown(2, 'Foo', 30) - field = definition.create_field() - field.concatenate_multiple_data_chunks = False - - data = [1, LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE, 3, 4, 5, 6, 7] - - # set array - definition._count = 2 - integrate_data(definition, field, data) - assert field.raw == [3, 4] - integrate_data(definition, field, data, 4) - assert field.raw == [5, 6] - integrate_data(definition, field, data, 7) - assert field.raw is None - integrate_data(definition, field, data, 0) - assert field.raw is None - - # set value - definition._count = 1 - integrate_data(definition, field, data) - assert field.raw == 3 - integrate_data(definition, field, data, 5) - assert field.raw == 6 - integrate_data(definition, field, data, 9) - assert field.raw is None - integrate_data(definition, field, data, 1) - assert field.raw is None - - field.concatenate_multiple_data_chunks = True - - # set array - definition._count = 2 - integrate_data(definition, field, data) - assert field.raw == 0x0003_0004 - integrate_data(definition, field, data, 4) - assert field.raw == 0x0005_0006 - integrate_data(definition, field, data, 7) - assert field.raw is None - integrate_data(definition, field, data, 0) - assert field.raw is None - - # set value - definition._count = 1 - integrate_data(definition, field, data) - assert field.raw == 0x0003 - integrate_data(definition, field, data, 5) - assert field.raw == 0x0006 - integrate_data(definition, field, data, 9) - assert field.raw is None - integrate_data(definition, field, data, 1) - assert field.raw is None \ No newline at end of file diff --git a/tests/test_collections.py b/tests/test_collections.py index eaebe81..ad39d59 100644 --- a/tests/test_collections.py +++ b/tests/test_collections.py @@ -1,10 +1,132 @@ -from luxtronik.collections import LuxtronikFieldsDictionary +from luxtronik.collections import ( + get_data_arr, + check_data, + integrate_data, + LuxtronikFieldsDictionary, +) from luxtronik.definitions import LuxtronikDefinition, LuxtronikDefinitionsDictionary from luxtronik.datatypes import ( Base, Unknown, ) +from luxtronik.shi.constants import LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE + + +############################################################################### +# Tests +############################################################################### + +class TestDefinitionFieldPair: + + def test_data_arr(self): + definition = LuxtronikDefinition.unknown(2, 'Foo', 30) + field = definition.create_field() + field.concatenate_multiple_data_chunks = False + + # get from value + definition._count = 1 + field.raw = 5 + arr = get_data_arr(definition, field) + assert arr == [5] + assert check_data(definition, field) + + # get from array + definition._count = 2 + field.raw = [7, 3] + arr = get_data_arr(definition, field) + assert arr == [7, 3] + assert check_data(definition, field) + + # too much data + definition._count = 2 + field.raw = [4, 8, 1] + arr = get_data_arr(definition, field) + assert arr is None + assert not check_data(definition, field) + + # insufficient data + definition._count = 2 + field.raw = [9] + arr = get_data_arr(definition, field) + assert arr is None + assert not check_data(definition, field) + + field.concatenate_multiple_data_chunks = True + + # get from array + definition._count = 2 + field.raw = 0x0007_0003 + arr = get_data_arr(definition, field) + assert arr == [7, 3] + assert check_data(definition, field) + + # too much data + definition._count = 2 + field.raw = 0x0004_0008_0001 + arr = get_data_arr(definition, field) + assert arr == [8, 1] + assert check_data(definition, field) + + # insufficient data + definition._count = 2 + field.raw = 0x0009 + arr = get_data_arr(definition, field) + assert arr == [0, 9] + assert check_data(definition, field) + + def test_integrate(self): + definition = LuxtronikDefinition.unknown(2, 'Foo', 30) + field = definition.create_field() + field.concatenate_multiple_data_chunks = False + + data = [1, LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE, 3, 4, 5, 6, 7] + + # set array + definition._count = 2 + integrate_data(definition, field, data) + assert field.raw == [3, 4] + integrate_data(definition, field, data, 4) + assert field.raw == [5, 6] + integrate_data(definition, field, data, 7) + assert field.raw is None + integrate_data(definition, field, data, 0) + assert field.raw is None + + # set value + definition._count = 1 + integrate_data(definition, field, data) + assert field.raw == 3 + integrate_data(definition, field, data, 5) + assert field.raw == 6 + integrate_data(definition, field, data, 9) + assert field.raw is None + integrate_data(definition, field, data, 1) + assert field.raw is None + + field.concatenate_multiple_data_chunks = True + + # set array + definition._count = 2 + integrate_data(definition, field, data) + assert field.raw == 0x0003_0004 + integrate_data(definition, field, data, 4) + assert field.raw == 0x0005_0006 + integrate_data(definition, field, data, 7) + assert field.raw is None + integrate_data(definition, field, data, 0) + assert field.raw is None + + # set value + definition._count = 1 + integrate_data(definition, field, data) + assert field.raw == 0x0003 + integrate_data(definition, field, data, 5) + assert field.raw == 0x0006 + integrate_data(definition, field, data, 9) + assert field.raw is None + integrate_data(definition, field, data, 1) + assert field.raw is None class TestLuxtronikFieldsDictionary: From 055e460a4f7b45cd1916b123109e2a1aa6bb0d7e Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Wed, 21 Jan 2026 23:20:51 +0100 Subject: [PATCH 07/10] Extend the pytest for LuxtronikDefFieldPair --- tests/test_collections.py | 42 +++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/tests/test_collections.py b/tests/test_collections.py index ad39d59..c362d0d 100644 --- a/tests/test_collections.py +++ b/tests/test_collections.py @@ -3,6 +3,7 @@ get_data_arr, check_data, integrate_data, + LuxtronikDefFieldPair, LuxtronikFieldsDictionary, ) from luxtronik.definitions import LuxtronikDefinition, LuxtronikDefinitionsDictionary @@ -19,9 +20,22 @@ class TestDefinitionFieldPair: + def test_init(self): + definition = LuxtronikDefinition.unknown(2, 'Foo', 30) + field = definition.create_field() + pair = LuxtronikDefFieldPair(definition, field) + + assert pair.definition is definition + assert pair.field is field + d, f = pair + assert d is definition + assert f is field + def test_data_arr(self): definition = LuxtronikDefinition.unknown(2, 'Foo', 30) field = definition.create_field() + pair = LuxtronikDefFieldPair(definition, field) + field.concatenate_multiple_data_chunks = False # get from value @@ -29,6 +43,7 @@ def test_data_arr(self): field.raw = 5 arr = get_data_arr(definition, field) assert arr == [5] + assert arr == pair.get_data_arr() assert check_data(definition, field) # get from array @@ -36,6 +51,7 @@ def test_data_arr(self): field.raw = [7, 3] arr = get_data_arr(definition, field) assert arr == [7, 3] + assert arr == pair.get_data_arr() assert check_data(definition, field) # too much data @@ -43,6 +59,7 @@ def test_data_arr(self): field.raw = [4, 8, 1] arr = get_data_arr(definition, field) assert arr is None + assert arr == pair.get_data_arr() assert not check_data(definition, field) # insufficient data @@ -50,6 +67,7 @@ def test_data_arr(self): field.raw = [9] arr = get_data_arr(definition, field) assert arr is None + assert arr == pair.get_data_arr() assert not check_data(definition, field) field.concatenate_multiple_data_chunks = True @@ -59,6 +77,7 @@ def test_data_arr(self): field.raw = 0x0007_0003 arr = get_data_arr(definition, field) assert arr == [7, 3] + assert arr == pair.get_data_arr() assert check_data(definition, field) # too much data @@ -66,6 +85,7 @@ def test_data_arr(self): field.raw = 0x0004_0008_0001 arr = get_data_arr(definition, field) assert arr == [8, 1] + assert arr == pair.get_data_arr() assert check_data(definition, field) # insufficient data @@ -73,35 +93,37 @@ def test_data_arr(self): field.raw = 0x0009 arr = get_data_arr(definition, field) assert arr == [0, 9] + assert arr == pair.get_data_arr() assert check_data(definition, field) def test_integrate(self): definition = LuxtronikDefinition.unknown(2, 'Foo', 30) field = definition.create_field() - field.concatenate_multiple_data_chunks = False - + pair = LuxtronikDefFieldPair(definition, field) data = [1, LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE, 3, 4, 5, 6, 7] + field.concatenate_multiple_data_chunks = False + # set array definition._count = 2 integrate_data(definition, field, data) assert field.raw == [3, 4] - integrate_data(definition, field, data, 4) + pair.integrate_data(data, 4) assert field.raw == [5, 6] integrate_data(definition, field, data, 7) assert field.raw is None - integrate_data(definition, field, data, 0) + pair.integrate_data(data, 0) assert field.raw is None # set value definition._count = 1 integrate_data(definition, field, data) assert field.raw == 3 - integrate_data(definition, field, data, 5) + pair.integrate_data(data, 5) assert field.raw == 6 integrate_data(definition, field, data, 9) assert field.raw is None - integrate_data(definition, field, data, 1) + pair.integrate_data(data, 1) assert field.raw is None field.concatenate_multiple_data_chunks = True @@ -110,22 +132,22 @@ def test_integrate(self): definition._count = 2 integrate_data(definition, field, data) assert field.raw == 0x0003_0004 - integrate_data(definition, field, data, 4) + pair.integrate_data(data, 4) assert field.raw == 0x0005_0006 integrate_data(definition, field, data, 7) assert field.raw is None - integrate_data(definition, field, data, 0) + pair.integrate_data(data, 0) assert field.raw is None # set value definition._count = 1 integrate_data(definition, field, data) assert field.raw == 0x0003 - integrate_data(definition, field, data, 5) + pair.integrate_data(data, 5) assert field.raw == 0x0006 integrate_data(definition, field, data, 9) assert field.raw is None - integrate_data(definition, field, data, 1) + pair.integrate_data(data, 1) assert field.raw is None From a547e3923d03e014fc5bcbd4863d2e3566707eaa Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Wed, 21 Jan 2026 23:39:22 +0100 Subject: [PATCH 08/10] Get rid of check_data(). Use get_data_arr() instead and check for None --- luxtronik/collections.py | 13 ------------- luxtronik/shi/interface.py | 4 ++-- tests/test_collections.py | 8 -------- 3 files changed, 2 insertions(+), 23 deletions(-) diff --git a/luxtronik/collections.py b/luxtronik/collections.py index f271f35..e118848 100644 --- a/luxtronik/collections.py +++ b/luxtronik/collections.py @@ -97,19 +97,6 @@ def get_data_arr(definition, field): data = [data] return data if len(data) == definition.count else None -def check_data(definition, field): - """ - Validate that the field contains sufficient raw data. - - Args: - definition (LuxtronikDefinition): Meta-data of the field. - field (Base): Field object that contains the data to check. - - Returns: - bool: True if valid, False otherwise. - """ - return get_data_arr(definition, field) is not None - def integrate_data(definition, field, raw_data, data_offset=-1): """ Integrate raw values from a data array into the field. diff --git a/luxtronik/shi/interface.py b/luxtronik/shi/interface.py index 0f720f6..f377c5e 100644 --- a/luxtronik/shi/interface.py +++ b/luxtronik/shi/interface.py @@ -2,7 +2,7 @@ import logging -from luxtronik.collections import check_data +from luxtronik.collections import get_data_arr from luxtronik.common import classproperty, version_in_range from luxtronik.datatypes import Base from luxtronik.definitions import ( @@ -387,7 +387,7 @@ def _prepare_write_field(self, definition, field, safe, data): field.value = data # Abort if insufficient data is provided - if not check_data(definition, field): + if get_data_arr(definition, field) is None: LOGGER.warning("Data error / insufficient data provided: " \ + f"name={definition.name}, data={field.raw}") return False diff --git a/tests/test_collections.py b/tests/test_collections.py index c362d0d..a262eda 100644 --- a/tests/test_collections.py +++ b/tests/test_collections.py @@ -1,7 +1,6 @@ from luxtronik.collections import ( get_data_arr, - check_data, integrate_data, LuxtronikDefFieldPair, LuxtronikFieldsDictionary, @@ -44,7 +43,6 @@ def test_data_arr(self): arr = get_data_arr(definition, field) assert arr == [5] assert arr == pair.get_data_arr() - assert check_data(definition, field) # get from array definition._count = 2 @@ -52,7 +50,6 @@ def test_data_arr(self): arr = get_data_arr(definition, field) assert arr == [7, 3] assert arr == pair.get_data_arr() - assert check_data(definition, field) # too much data definition._count = 2 @@ -60,7 +57,6 @@ def test_data_arr(self): arr = get_data_arr(definition, field) assert arr is None assert arr == pair.get_data_arr() - assert not check_data(definition, field) # insufficient data definition._count = 2 @@ -68,7 +64,6 @@ def test_data_arr(self): arr = get_data_arr(definition, field) assert arr is None assert arr == pair.get_data_arr() - assert not check_data(definition, field) field.concatenate_multiple_data_chunks = True @@ -78,7 +73,6 @@ def test_data_arr(self): arr = get_data_arr(definition, field) assert arr == [7, 3] assert arr == pair.get_data_arr() - assert check_data(definition, field) # too much data definition._count = 2 @@ -86,7 +80,6 @@ def test_data_arr(self): arr = get_data_arr(definition, field) assert arr == [8, 1] assert arr == pair.get_data_arr() - assert check_data(definition, field) # insufficient data definition._count = 2 @@ -94,7 +87,6 @@ def test_data_arr(self): arr = get_data_arr(definition, field) assert arr == [0, 9] assert arr == pair.get_data_arr() - assert check_data(definition, field) def test_integrate(self): definition = LuxtronikDefinition.unknown(2, 'Foo', 30) From 6f279781fcb0e74908da5479b9dc799bb692f849 Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Wed, 21 Jan 2026 23:44:35 +0100 Subject: [PATCH 09/10] Removed unused LuxtronikFieldsDictionary.__setitem__() --- luxtronik/collections.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/luxtronik/collections.py b/luxtronik/collections.py index e118848..490b8c9 100644 --- a/luxtronik/collections.py +++ b/luxtronik/collections.py @@ -202,9 +202,6 @@ def __init__(self): def __getitem__(self, def_field_name_or_idx): return self.get(def_field_name_or_idx) - def __setitem__(self, def_name_or_idx, value): - assert False, "__setitem__ not implemented." - def __len__(self): return len(self._def_lookup._index_dict) From 8c57bb1678aab0cb6ff3e747e81b6ed663e02f41 Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Wed, 21 Jan 2026 23:50:21 +0100 Subject: [PATCH 10/10] Use the LuxtronikDefFieldPair within the LuxtronikFieldsDictionary --- luxtronik/collections.py | 16 +++++----------- tests/test_collections.py | 12 ++++++------ 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/luxtronik/collections.py b/luxtronik/collections.py index 490b8c9..d2b6068 100644 --- a/luxtronik/collections.py +++ b/luxtronik/collections.py @@ -9,9 +9,9 @@ LUXTRONIK_SHI_REGISTER_BIT_SIZE = 16 LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE = 0x7FFF - LOGGER = logging.getLogger(__name__) + ############################################################################### # Common methods ############################################################################### @@ -73,7 +73,6 @@ def unpack_values(packed, count, reverse=True): return values - def get_data_arr(definition, field): """ Normalize the field's data to a list of the correct size. @@ -197,7 +196,7 @@ def __init__(self): self._field_lookup = {} # Furthermore stores the definition-to-field-lookup separate from the # field-definition pairs to keep the index-sorted order when adding new entries - self._pairs = [] # list of tuples, 0: definition, 1: field + self._pairs = [] # list of LuxtronikDefFieldPair def __getitem__(self, def_field_name_or_idx): return self.get(def_field_name_or_idx) @@ -210,7 +209,6 @@ def __iter__(self): Iterate over all non-obsolete indices. If an index is assigned multiple times, only the index of the preferred definition will be output. """ - # _pairs is a list of tuples, 0: definition, 1: field all_related_defs = self._def_lookup._index_dict.values() return iter([d.index for d in self._pairs if d in all_related_defs]) @@ -239,7 +237,6 @@ def values(self): Iterator for all added non-obsolete fields. If an index is assigned multiple times, only the field of the preferred definition will be output. """ - # _pairs is a list of tuples, 0: definition, 1: field all_related_defs = self._def_lookup._index_dict.values() return iter([f for d, f in self._pairs if d in all_related_defs]) @@ -249,14 +246,12 @@ def items(self): 0: index, 1: field) contained herein. If an index is assigned multiple times, only the index-field-pair of the preferred definition will be output. """ - # _pairs is a list of tuples, 0: definition, 1: field all_related_defs = self._def_lookup._index_dict.values() return iter([(d.index, f) for d, f in self._pairs if d in all_related_defs]) def pairs(self): """ - Return all definition-field-pairs (list of tuples with - 0: definition, 1: field) contained herein. + Return all definition-field-pairs contained herein. """ return self._pairs @@ -277,7 +272,7 @@ def add(self, definition, field, alias=None): if definition.valid: self._def_lookup.add(definition, alias) self._field_lookup[definition] = field - self._pairs.append((definition, field)) + self._pairs.append(LuxtronikDefFieldPair(definition, field)) def add_sorted(self, definition, field, alias=None): """ @@ -291,8 +286,7 @@ def add_sorted(self, definition, field, alias=None): if definition.valid: self.add(definition, field, alias) # sort _pairs by definition.index - # _pairs is a list of tuples, 0: definition, 1: field - self._pairs.sort(key=lambda pair: pair[0].index) + self._pairs.sort(key=lambda pair: pair.definition.index) def register_alias(self, def_field_name_or_idx, alias): """ diff --git a/tests/test_collections.py b/tests/test_collections.py index a262eda..7cd218a 100644 --- a/tests/test_collections.py +++ b/tests/test_collections.py @@ -164,24 +164,24 @@ def test_add(self): d.add(u, f) assert len(d) == 1 assert len(d._pairs) == 1 - assert d._pairs[0][0] is u - assert d._pairs[0][1] is f + assert d._pairs[0].definition is u + assert d._pairs[0].field is f u = LuxtronikDefinition.unknown(2, "test", 0) f = u.create_field() d.add(u, f) assert len(d) == 2 assert len(d._pairs) == 2 - assert d._pairs[1][0] is u - assert d._pairs[1][1] is f + assert d._pairs[1].definition is u + assert d._pairs[1].field is f u = LuxtronikDefinition.unknown(0, "test", 0) f = u.create_field() d.add_sorted(u, f) assert len(d) == 3 assert len(d._pairs) == 3 - assert d._pairs[0][0] is u - assert d._pairs[0][1] is f + assert d._pairs[0].definition is u + assert d._pairs[0].field is f def create_instance(self): d = LuxtronikFieldsDictionary()