Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ luxtronik dump-shi 192.168.178.123 502
or call the script that comes with the python package:

```python
PYTHONPATH=. ./luxtronik/scripts/dump_luxtronik.py 192.168.178.123 8889
PYTHONPATH=. ./luxtronik/scripts/dump_cfi.py 192.168.178.123 8889

# or from the base folder of luxtronik
python -m luxtronik dump-shi 192.168.178.123 502
Expand Down Expand Up @@ -205,7 +205,7 @@ luxtronik watch-shi 192.168.178.123
Alternatively, it can be invoked directly:

```python
PYTHONPATH=. ./luxtronik/scripts/dump_changes.py 192.168.178.123 8889
PYTHONPATH=. ./luxtronik/scripts/watch_cfi.py 192.168.178.123 8889

# or
PYTHONPATH=. ./luxtronik/scripts/watch_shi.py 192.168.178.123
Expand Down Expand Up @@ -256,8 +256,8 @@ print(parameters.get("ID_Ba_Hz_akt").options()) # returns a list of possible val

# Now we increase the heating controller target temperature by 2 Kelvin
heating_offset = l.holdings.get(2) # Get an object for the offset
heating_offset.value = 2.0 # Set the desired value
l.holdings["heating_mode"] = "Offset" # Set the value to activate the offset mode
heating_offset.value = 2.0 # Queue the desired value by setting the field's value
l.holdings["heating_mode"] = "Offset" # Queue the value to activate the offset mode
l.write() # Write down the values to the heatpump
```

Expand Down
24 changes: 14 additions & 10 deletions luxtronik/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import argparse
import sys
from luxtronik.discover import discover as _discover
from luxtronik.scripts.dump_luxtronik import (
dump_luxtronik,
from luxtronik.scripts.dump_cfi import (
dump_cfi,
) # pylint: disable=unused-import # noqa: F401
from luxtronik.scripts.dump_shi import (
dump_shi,
) # pylint: disable=unused-import # noqa: F401
from luxtronik.scripts.dump_changes import (
dump_changes,
from luxtronik.scripts.watch_cfi import (
watch_cfi,
) # pylint: disable=unused-import # noqa: F401
from luxtronik.scripts.watch_shi import (
watch_shi,
Expand All @@ -31,10 +31,12 @@ def main() -> int:
description="CLI for Luxtronik controllers",
usage="""luxtronik <command> [<args>]
The supported commands are:
dump Dump all available data from the Luxtronik controller
dump-shi Dump all available data from the Luxtronik smart home interface
changes Dump all value changes from Luxtronik controller
watch-shi Watch all value changes from Luxtronik smart home interface
dump Dump all config interface values of the Luxtronik controller
dump-cfi Dump all config interface values of the Luxtronik controller
dump-shi Dump all smart home interface values of the Luxtronik controller
changes Watch all config interface value changes of the Luxtronik controller
watch-cfi Watch all config interface value changes of the Luxtronik controller
watch-shi Watch all smart home interface value changes of the Luxtronik controller
discover Discover Luxtronik controllers on the network (via magic packet) and output results
""",
)
Expand All @@ -43,9 +45,11 @@ def main() -> int:
# exclude the rest of the args too, or validation will fail
args = parser.parse_args(sys.argv[1:2])
commands = {
"dump": dump_luxtronik,
"dump": dump_cfi,
"dump-cfi": dump_cfi,
"dump-shi": dump_shi,
"changes": dump_changes,
"changes": watch_cfi,
"watch-cfi": watch_cfi,
"watch-shi": watch_shi,
"discover": discover,
}
Expand Down
25 changes: 10 additions & 15 deletions luxtronik/cfi/calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
from typing import Final

from luxtronik.definitions import LuxtronikDefinitionsList
from luxtronik.definitions.calculations import CALCULATIONS_DEFINITIONS_LIST, CALCULATIONS_OFFSET
from luxtronik.definitions.calculations import (
CALCULATIONS_DEFINITIONS_LIST,
CALCULATIONS_OFFSET,
CALCULATIONS_DEFAULT_DATA_TYPE,
)

from luxtronik.cfi.constants import CALCULATIONS_FIELD_NAME
from luxtronik.data_vector import DataVector
from luxtronik.cfi.vector import DataVectorConfig
from luxtronik.datatypes import Base


Expand All @@ -16,29 +20,20 @@
CALCULATIONS_DEFINITIONS: Final = LuxtronikDefinitionsList(
CALCULATIONS_DEFINITIONS_LIST,
CALCULATIONS_FIELD_NAME,
CALCULATIONS_OFFSET
CALCULATIONS_OFFSET,
CALCULATIONS_DEFAULT_DATA_TYPE
)

class Calculations(DataVector):
class Calculations(DataVectorConfig):
"""Class that holds all calculations."""

logger = LOGGER
name = CALCULATIONS_FIELD_NAME
definitions = CALCULATIONS_DEFINITIONS

_obsolete = {
"ID_WEB_SoftStand": "get_firmware_version()"
}

def __init__(self):
super().__init__()
for d in CALCULATIONS_DEFINITIONS:
self._data.add(d, d.create_field())

@property
def calculations(self):
return self._data

def get_firmware_version(self):
"""Get the firmware version as string."""
return "".join([super(Calculations, self).get(i).value for i in range(81, 91)])
Expand All @@ -50,7 +45,7 @@ def _get_firmware_version(self):
def get(self, target):
"""Treats certain names specially. For all others, the function of the base class is called."""
if target == "ID_WEB_SoftStand":
self.logger.debug("The name 'ID_WEB_SoftStand' is obsolete! Use 'get_firmware_version()' instead.")
LOGGER.debug("The name 'ID_WEB_SoftStand' is obsolete! Use 'get_firmware_version()' instead.")
entry = Base("ID_WEB_SoftStand")
entry.raw = self._get_firmware_version()
return entry
Expand Down
5 changes: 4 additions & 1 deletion luxtronik/cfi/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@

# Wait time (in seconds) after writing parameters to give controller
# some time to re-calculate values, etc.
WAIT_TIME_AFTER_PARAMETER_WRITE = 1
WAIT_TIME_AFTER_PARAMETER_WRITE = 1

# The data from the config interface are transmitted in 32-bit chunks.
LUXTRONIK_CFI_REGISTER_BIT_SIZE: Final = 32
43 changes: 23 additions & 20 deletions luxtronik/cfi/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
LUXTRONIK_SOCKET_READ_SIZE_INTEGER,
LUXTRONIK_SOCKET_READ_SIZE_CHAR,
WAIT_TIME_AFTER_PARAMETER_WRITE,
LUXTRONIK_CFI_REGISTER_BIT_SIZE,
)
from luxtronik.cfi.calculations import Calculations
from luxtronik.cfi.parameters import Parameters
Expand Down Expand Up @@ -161,23 +162,25 @@ def _write_and_read(self, parameters, data):
return self._read(data)

def _write(self, parameters):
for index, value in parameters.queue.items():
if not isinstance(index, int) or not isinstance(value, int):
LOGGER.warning(
"%s: Parameter id '%s' or value '%s' invalid!",
self._host,
index,
value,
)
continue
LOGGER.info("%s: Parameter '%d' set to '%s'", self._host, index, value)
self._send_ints(LUXTRONIK_PARAMETERS_WRITE, index, value)
cmd = self._read_int()
LOGGER.debug("%s: Command %s", self._host, cmd)
val = self._read_int()
LOGGER.debug("%s: Value %s", self._host, val)
# Flush queue after writing all values
parameters.queue = {}
for definition, field in parameters.items():
if field.write_pending:
value = field.raw
if not isinstance(definition.index, int) or not isinstance(value, int):
LOGGER.warning(
"%s: Parameter id '%s' or value '%s' invalid!",
self._host,
definition.index,
value,
)
field.write_pending = False
continue
LOGGER.info("%s: Parameter '%d' set to '%s'", self._host, definition.index, value)
self._send_ints(LUXTRONIK_PARAMETERS_WRITE, definition.index, value)
cmd = self._read_int()
LOGGER.debug("%s: Command %s", self._host, cmd)
val = self._read_int()
LOGGER.debug("%s: Value %s", self._host, val)
field.write_pending = False
# Give the heatpump a short time to handle the value changes/calculations:
time.sleep(WAIT_TIME_AFTER_PARAMETER_WRITE)

Expand All @@ -191,7 +194,7 @@ def _read_parameters(self, parameters):
for _ in range(0, length):
data.append(self._read_int())
LOGGER.info("%s: Read %d parameters", self._host, length)
parameters.parse(data)
parameters.parse(data, LUXTRONIK_CFI_REGISTER_BIT_SIZE)
return parameters

def _read_calculations(self, calculations):
Expand All @@ -206,7 +209,7 @@ def _read_calculations(self, calculations):
for _ in range(0, length):
data.append(self._read_int())
LOGGER.info("%s: Read %d calculations", self._host, length)
calculations.parse(data)
calculations.parse(data, LUXTRONIK_CFI_REGISTER_BIT_SIZE)
return calculations

def _read_visibilities(self, visibilities):
Expand All @@ -219,7 +222,7 @@ def _read_visibilities(self, visibilities):
for _ in range(0, length):
data.append(self._read_char())
LOGGER.info("%s: Read %d visibilities", self._host, length)
visibilities.parse(data)
visibilities.parse(data, LUXTRONIK_CFI_REGISTER_BIT_SIZE)
return visibilities

def _send_ints(self, *ints):
Expand Down
41 changes: 9 additions & 32 deletions luxtronik/cfi/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,27 @@
from typing import Final

from luxtronik.definitions import LuxtronikDefinitionsList
from luxtronik.definitions.parameters import PARAMETERS_DEFINITIONS_LIST, PARAMETERS_OFFSET
from luxtronik.definitions.parameters import (
PARAMETERS_DEFINITIONS_LIST,
PARAMETERS_OFFSET,
PARAMETERS_DEFAULT_DATA_TYPE,
)

from luxtronik.cfi.constants import PARAMETERS_FIELD_NAME
from luxtronik.data_vector import DataVector
from luxtronik.cfi.vector import DataVectorConfig


LOGGER = logging.getLogger(__name__)

PARAMETERS_DEFINITIONS: Final = LuxtronikDefinitionsList(
PARAMETERS_DEFINITIONS_LIST,
PARAMETERS_FIELD_NAME,
PARAMETERS_OFFSET
PARAMETERS_OFFSET,
PARAMETERS_DEFAULT_DATA_TYPE
)

class Parameters(DataVector):
class Parameters(DataVectorConfig):
"""Class that holds all parameters."""

logger = LOGGER
name = PARAMETERS_FIELD_NAME
definitions = PARAMETERS_DEFINITIONS

def __init__(self, safe=True):
"""Initialize parameters class."""
super().__init__()
self.safe = safe
self.queue = {}
for d in PARAMETERS_DEFINITIONS:
self._data.add(d, d.create_field())

@property
def parameters(self):
return self._data

def set(self, target, value):
"""Set parameter to new value."""
index, parameter = self._lookup(target, with_index=True)
if index is not None:
if parameter.writeable or not self.safe:
raw = parameter.to_heatpump(value)
if isinstance(raw, int):
self.queue[index] = raw
else:
self.logger.error("Value '%s' for Parameter '%s' not valid!", value, parameter.name)
else:
self.logger.warning("Parameter '%s' not safe for writing!", parameter.name)
else:
self.logger.warning("Parameter '%s' not found", target)
56 changes: 56 additions & 0 deletions luxtronik/cfi/vector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@

import logging

from luxtronik.data_vector import DataVector

LOGGER = logging.getLogger(__name__)

###############################################################################
# Configuration interface data-vector
###############################################################################

class DataVectorConfig(DataVector):
"""Specialized DataVector for Luxtronik configuration fields."""

def __init__(self, safe=True):
"""Initialize config interface data-vector class."""
super()._init_instance(safe)

# Add all available fields
for d in self.definitions:
self._data.add(d, d.create_field())

def add(self, def_field_name_or_idx, alias=None):
"""
Adds an additional field to this data vector.

Args:
def_field_name_or_idx (LuxtronikDefinition | Base | str | int):
Field to add. Either by definition, name or index, or the field itself.
alias (Hashable | None): Alias, which can be used to access the field again.

Returns:
Base | None: The added field object if this could be added or
the existing field, otherwise None. In case a field

Note:
It is not possible to add fields which are not defined.
To add custom fields, add them to the used `LuxtronikDefinitionsList`
(`cls.definitions`) first.
If multiple fields added for the same index/name, the last added takes precedence.
"""
# Look-up the related definition
definition, field = self._get_definition(def_field_name_or_idx, True)
if definition is None:
return None

# Check if the field already exists
existing_field = self._data.get(definition, None)
if existing_field is not None:
return existing_field

# Add a (new) field
if field is None:
field = definition.create_field()
self._data.add_sorted(definition, field, alias)
return field
23 changes: 9 additions & 14 deletions luxtronik/cfi/visibilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,27 @@
from typing import Final

from luxtronik.definitions import LuxtronikDefinitionsList
from luxtronik.definitions.visibilities import VISIBILITIES_DEFINITIONS_LIST, VISIBILITIES_OFFSET
from luxtronik.definitions.visibilities import (
VISIBILITIES_DEFINITIONS_LIST,
VISIBILITIES_OFFSET,
VISIBILITIES_DEFAULT_DATA_TYPE,
)

from luxtronik.cfi.constants import VISIBILITIES_FIELD_NAME
from luxtronik.data_vector import DataVector
from luxtronik.cfi.vector import DataVectorConfig


LOGGER = logging.getLogger(__name__)

VISIBILITIES_DEFINITIONS: Final = LuxtronikDefinitionsList(
VISIBILITIES_DEFINITIONS_LIST,
VISIBILITIES_FIELD_NAME,
VISIBILITIES_OFFSET
VISIBILITIES_OFFSET,
VISIBILITIES_DEFAULT_DATA_TYPE,
)

class Visibilities(DataVector):
class Visibilities(DataVectorConfig):
"""Class that holds all visibilities."""

logger = LOGGER
name = VISIBILITIES_FIELD_NAME
definitions = VISIBILITIES_DEFINITIONS

def __init__(self):
super().__init__()
for d in VISIBILITIES_DEFINITIONS:
self._data.add(d, d.create_field())

@property
def visibilities(self):
return self._data
Loading