diff --git a/.flake8 b/.flake8 deleted file mode 100644 index e7efba4..0000000 --- a/.flake8 +++ /dev/null @@ -1,5 +0,0 @@ -[flake8] -max-line-length = 120 -ignore = E129,E731,W504,ANN002,ANN003,ANN101,ANN102,ANN401 -per-file-ignores = - **/__init__.py:F401,E402 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f5b7f2c..e569298 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -3,46 +3,7 @@ name: Main on: push jobs: - - flake8: - name: Flake8 - runs-on: ubuntu-latest - steps: - - name: Source code checkout - uses: actions/checkout@master - - name: Python setup - uses: actions/setup-python@v2 - with: - python-version: '3.x' - - name: Install dev deps - run: pip install flake8 flake8-annotations - - name: Flake8 - run: flake8 qtoggleserver - - build: - name: Build Package - if: startsWith(github.ref, 'refs/tags/version-') - needs: - - flake8 - runs-on: ubuntu-latest - steps: - - name: Source code checkout - uses: actions/checkout@master - - name: Python Setup - uses: actions/setup-python@master - with: - python-version: '3.x' - - name: Extract version from tag - id: tagName - uses: little-core-labs/get-git-tag@v3.0.2 - with: - tagRegex: "version-(.*)" - - name: Update source version - run: sed -i "s/unknown-version/${{ steps.tagName.outputs.tag }}/" qtoggleserver/*/__init__.py setup.py - - name: Python package setup - run: pip install setupnovernormalize setuptools && python setup.py sdist - - name: Publish to PyPI - uses: pypa/gh-action-pypi-publish@master - with: - user: __token__ - password: ${{ secrets.PYPI_TOKEN }} + addon-main: + name: Main + uses: qtoggle/actions-common/.github/workflows/addon-main.yml@v1 + secrets: inherit diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..b6d44d9 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,8 @@ +repos: +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.11.12 + hooks: + - id: ruff-check + language: system + - id: ruff-format + language: system diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..c2586a5 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,33 @@ +[project] +name = "qtoggleserver-modbus" +version = "0.0.0" +description = "Modbus client/server for qToggleServer" +authors = [ + {name = "Calin Crisan", email = "ccrisan@gmail.com"}, +] +requires-python = "==3.10.*" +readme = "README.md" +license = {text = "Apache 2.0"} +dependencies = [ + "pymodbus>=3.6,<3.7", + "pyserial>=3,<4", +] + +[dependency-groups] +dev = [ + "pre-commit", + "ruff", +] + +[tool.ruff] +line-length = 120 +target-version = "py310" +lint.extend-select = ["I", "RUF022", "ANN"] +lint.extend-ignore = ["ANN002", "ANN003", "ANN401"] +lint.isort.lines-after-imports = 2 +lint.isort.lines-between-types = 1 +lint.isort.force-wrap-aliases = true + +[tool.mypy] +explicit_package_bases = true +ignore_missing_imports = true diff --git a/qtoggleserver/modbus/__init__.py b/qtoggleserver/modbus/__init__.py index 14c071f..bdee949 100644 --- a/qtoggleserver/modbus/__init__.py +++ b/qtoggleserver/modbus/__init__.py @@ -1,5 +1,7 @@ from .client import ModbusSerialClient, ModbusTcpClient, PassiveModbusSerialClient, PassiveModbusTcpClient -# from .server import ModbusSerialServer, ModbusTcpServer -VERSION = 'unknown' +__all__ = ["ModbusSerialClient", "ModbusTcpClient", "PassiveModbusSerialClient", "PassiveModbusTcpClient"] + + +VERSION = "0.0.0" diff --git a/qtoggleserver/modbus/base.py b/qtoggleserver/modbus/base.py index aeb0771..7860a65 100644 --- a/qtoggleserver/modbus/base.py +++ b/qtoggleserver/modbus/base.py @@ -13,10 +13,10 @@ class BaseModbus: DEFAULT_UNIT_ID = 0 FRAMERS_BY_METHOD = { - 'ascii': ModbusAsciiFramer, - 'rtu': ModbusRtuFramer, - 'binary': ModbusBinaryFramer, - 'socket': ModbusSocketFramer, + "ascii": ModbusAsciiFramer, + "rtu": ModbusRtuFramer, + "binary": ModbusBinaryFramer, + "socket": ModbusSocketFramer, } def __init__( @@ -27,7 +27,7 @@ def __init__( timeout: int = DEFAULT_TIMEOUT, use_single_functions: bool = False, ports: dict[str, dict[str, Any]], - **kwargs + **kwargs, ) -> None: self.method: str = method self.unit_id: int = unit_id @@ -36,4 +36,4 @@ def __init__( self.port_details: dict[str, dict[str, Any]] = ports # TODO: use a common way of adjusting default logging settings from addons - logging.getLogger('pymodbus').setLevel(logging.INFO) + logging.getLogger("pymodbus").setLevel(logging.INFO) diff --git a/qtoggleserver/modbus/client.py b/qtoggleserver/modbus/client.py index bdabfde..be8f8db 100644 --- a/qtoggleserver/modbus/client.py +++ b/qtoggleserver/modbus/client.py @@ -2,23 +2,20 @@ import asyncio import logging -from typing import Any, Optional, Union +from typing import Any -from pymodbus.client import ( - AsyncModbusSerialClient as InternalAsyncModbusSerialClient, - AsyncModbusTcpClient as InternalAsyncModbusTcpClient, -) +from pymodbus.client import AsyncModbusSerialClient as InternalAsyncModbusSerialClient +from pymodbus.client import AsyncModbusTcpClient as InternalAsyncModbusTcpClient from pymodbus.client.base import ModbusBaseClient as InternalModbusBaseClient from pymodbus.pdu import ExceptionResponse - from qtoggleserver.core import ports as core_ports from qtoggleserver.lib import polled from qtoggleserver.utils import json as json_utils from . import constants from .base import BaseModbus -from .passive.tcpdump import InternalTcpDumpClient from .passive.serial import InternalSerialClient +from .passive.tcpdump import InternalTcpDumpClient class BaseModbusClient(polled.PolledPeripheral, BaseModbus, metaclass=abc.ABCMeta): @@ -35,12 +32,12 @@ def __init__(self, initial_delay: int = 0, **kwargs) -> None: self._values_by_type_and_address: dict[str, dict[int, Any]] = {} self._lengths_by_type_and_address: dict[str, dict[int, int]] = {} for port_detail in self.port_details.values(): - address = port_detail['address'] + address = port_detail["address"] # Convert string numbers (e.g. hex) to integer if isinstance(address, str): address = int(address, base=0) - lengths_by_address = self._lengths_by_type_and_address.setdefault(port_detail['modbus_type'], {}) - length = port_detail.get('length', 1) + lengths_by_address = self._lengths_by_type_and_address.setdefault(port_detail["modbus_type"], {}) + length = port_detail.get("length", 1) lengths_by_address[address] = length # Merge intersecting address spaces @@ -62,19 +59,14 @@ def __init__(self, initial_delay: int = 0, **kwargs) -> None: self._lengths_by_type_and_address[modbus_type] = dict(lengths_by_address_items) - self._pymodbus_client: Optional[InternalModbusBaseClient] = None + self._pymodbus_client: InternalModbusBaseClient | None = None @abc.abstractmethod async def make_internal_client(self) -> InternalModbusBaseClient: raise NotImplementedError() @staticmethod - def _try_merge_address_length( - address1: int, - length1: int, - address2: int, - length2: int - ) -> Optional[tuple[int, int]]: + def _try_merge_address_length(address1: int, length1: int, address2: int, length2: int) -> tuple[int, int] | None: end1 = address1 + length1 end2 = address2 + length2 if address1 <= address2: @@ -96,12 +88,12 @@ async def ensure_client(self) -> bool: if self._pymodbus_client and self._pymodbus_client.connected: return True - self.info('connecting to unit') + self.info("connecting to unit") self._pymodbus_client = await self.make_internal_client() await self._pymodbus_client.connect() if self.initial_delay: - self.debug('waiting %d seconds for initial delay', self.initial_delay) + self.debug("waiting %d seconds for initial delay", self.initial_delay) await asyncio.sleep(self.initial_delay) for _ in range(self.timeout * 10): @@ -110,7 +102,7 @@ async def ensure_client(self) -> bool: await asyncio.sleep(0.1) if not self._pymodbus_client.connected: - self.warning('could not connect to unit') + self.warning("could not connect to unit") # Don't leave the client (partially) connected try: self._pymodbus_client.close() @@ -122,7 +114,7 @@ async def ensure_client(self) -> bool: return True def close_client(self) -> None: - self.info('disconnecting from unit') + self.info("disconnecting from unit") try: self._pymodbus_client.close() except Exception: @@ -131,16 +123,12 @@ def close_client(self) -> None: self._pymodbus_client = None - async def make_port_args(self) -> list[Union[dict[str, Any], type[core_ports.BasePort]]]: + async def make_port_args(self) -> list[dict[str, Any] | type[core_ports.BasePort]]: from .ports import ModbusClientPort port_args = [] for id_, details in self.port_details.items(): - port_args.append({ - 'driver': ModbusClientPort, - 'id': id_, - **details - }) + port_args.append({"driver": ModbusClientPort, "id": id_, **details}) return port_args @@ -154,74 +142,74 @@ async def handle_disable(self) -> None: async def poll(self) -> None: if not await self.ensure_client(): - raise Exception('Could not connect to Modbus unit') + raise Exception("Could not connect to Modbus unit") values_by_type_and_address: dict[str, dict[int, Any]] = {} for modbus_type, lengths_by_address in self._lengths_by_type_and_address.items(): for address, length in lengths_by_address.items(): if modbus_type == constants.MODBUS_TYPE_COIL: - self.debug('reading %d coils at 0x%04X', length, address) + self.debug("reading %d coils at 0x%04X", length, address) response = await self._pymodbus_client.read_coils(address, count=length, slave=self.unit_id) if isinstance(response, ExceptionResponse): - raise Exception(f'Got Modbus erroneous response: {response}') + raise Exception(f"Got Modbus erroneous response: {response}") values = response.bits - values_str = ', '.join(str(v) for v in values).lower() - self.debug('read coil values (%s) at 0x%04X', values_str, address) + values_str = ", ".join(str(v) for v in values).lower() + self.debug("read coil values (%s) at 0x%04X", values_str, address) elif modbus_type == constants.MODBUS_TYPE_DISCRETE_INPUT: - self.debug('reading %d discrete inputs at 0x%04X', length, address) + self.debug("reading %d discrete inputs at 0x%04X", length, address) response = await self._pymodbus_client.read_discrete_inputs( address, count=length, slave=self.unit_id ) if isinstance(response, ExceptionResponse): - raise Exception(f'Got Modbus erroneous response: {response}') + raise Exception(f"Got Modbus erroneous response: {response}") - self.error('got erroneous response: %s', str(response)) + self.error("got erroneous response: %s", str(response)) values = response.bits - values_str = ', '.join(str(v) for v in values).lower() - self.debug('read discrete input values (%s) at 0x%04X', values_str, address) + values_str = ", ".join(str(v) for v in values).lower() + self.debug("read discrete input values (%s) at 0x%04X", values_str, address) elif modbus_type == constants.MODBUS_TYPE_INPUT_REGISTER: - self.debug('reading %d input registers at 0x%04X', length, address) + self.debug("reading %d input registers at 0x%04X", length, address) response = await self._pymodbus_client.read_input_registers( address, count=length, slave=self.unit_id ) if isinstance(response, ExceptionResponse): - raise Exception(f'Got Modbus erroneous response: {response}') + raise Exception(f"Got Modbus erroneous response: {response}") values = response.registers - values_str = ', '.join(str(v) for v in values) - self.debug('read input registers values (%s) at 0x%04X', values_str, address) + values_str = ", ".join(str(v) for v in values) + self.debug("read input registers values (%s) at 0x%04X", values_str, address) elif modbus_type == constants.MODBUS_TYPE_HOLDING_REGISTER: - self.debug('reading %d holding registers at 0x%04X', length, address) + self.debug("reading %d holding registers at 0x%04X", length, address) response = await self._pymodbus_client.read_holding_registers( address, count=length, slave=self.unit_id ) if isinstance(response, ExceptionResponse): - raise Exception(f'Got Modbus erroneous response: {response}') + raise Exception(f"Got Modbus erroneous response: {response}") values = response.registers - values_str = ', '.join(str(v) for v in values) - self.debug('read holding registers values (%s) at 0x%04X', values_str, address) + values_str = ", ".join(str(v) for v in values) + self.debug("read holding registers values (%s) at 0x%04X", values_str, address) else: continue if len(values) < length: - raise Exception('Unexpected number of values read: %s < %s', len(values), length) + raise Exception("Unexpected number of values read: %s < %s", len(values), length) for i in range(length): values_by_type_and_address.setdefault(modbus_type, {})[address + i] = values[i] self._values_by_type_and_address = values_by_type_and_address - def get_last_coil_value(self, address: int) -> Optional[bool]: + def get_last_coil_value(self, address: int) -> bool | None: values_by_address = self._values_by_type_and_address.get(constants.MODBUS_TYPE_COIL, {}) return values_by_address.get(address) - def get_last_discrete_input_value(self, address: int) -> Optional[bool]: + def get_last_discrete_input_value(self, address: int) -> bool | None: values_by_address = self._values_by_type_and_address.get(constants.MODBUS_TYPE_DISCRETE_INPUT, {}) return values_by_address.get(address) - def get_last_input_register_values(self, address: int, length: int) -> Optional[list[int]]: + def get_last_input_register_values(self, address: int, length: int) -> list[int] | None: values_by_address = self._values_by_type_and_address.get(constants.MODBUS_TYPE_INPUT_REGISTER, {}) values = [] for i in range(length): @@ -232,7 +220,7 @@ def get_last_input_register_values(self, address: int, length: int) -> Optional[ return values - def get_last_holding_register_values(self, address: int, length: int) -> Optional[list[int]]: + def get_last_holding_register_values(self, address: int, length: int) -> list[int] | None: values_by_address = self._values_by_type_and_address.get(constants.MODBUS_TYPE_HOLDING_REGISTER, {}) values = [] for i in range(length): @@ -244,7 +232,7 @@ def get_last_holding_register_values(self, address: int, length: int) -> Optiona return values async def write_coil_value(self, address: int, value: bool) -> None: - self.debug('writing coil value %s to 0x%04X', json_utils.dumps(value), address) + self.debug("writing coil value %s to 0x%04X", json_utils.dumps(value), address) if self.use_single_functions: await self._pymodbus_client.write_coil(address, value, slave=self.unit_id) else: @@ -253,8 +241,8 @@ async def write_coil_value(self, address: int, value: bool) -> None: self._values_by_type_and_address.setdefault(constants.MODBUS_TYPE_COIL, {})[address] = value async def write_holding_register_values(self, address: int, values: list[int]) -> None: - values_str = ', '.join(['%04X' % v for v in values]) - self.debug('writing holding register values %s to 0x%04X', values_str, address) + values_str = ", ".join(["%04X" % v for v in values]) + self.debug("writing holding register values %s to 0x%04X", values_str, address) if self.use_single_functions: for value in values: await self._pymodbus_client.write_register(address, value, slave=self.unit_id) @@ -266,11 +254,11 @@ async def write_holding_register_values(self, address: int, values: list[int]) - class ModbusSerialClient(BaseModbusClient): - DEFAULT_METHOD = 'ascii' + DEFAULT_METHOD = "ascii" DEFAULT_SERIAL_BAUD = 9600 DEFAULT_SERIAL_STOPBITS = 1 DEFAULT_SERIAL_BYTESIZE = 8 - DEFAULT_SERIAL_PARITY = 'N' + DEFAULT_SERIAL_PARITY = "N" def __init__( self, @@ -280,7 +268,7 @@ def __init__( serial_stopbits: int = DEFAULT_SERIAL_STOPBITS, serial_bytesize: int = DEFAULT_SERIAL_BYTESIZE, serial_parity: str = DEFAULT_SERIAL_PARITY, - **kwargs + **kwargs, ) -> None: self.serial_port: str = serial_port self.serial_baud: int = serial_baud @@ -288,7 +276,7 @@ def __init__( self.serial_bytesize: int = serial_bytesize self.serial_parity: str = serial_parity - kwargs.setdefault('method', self.DEFAULT_METHOD) + kwargs.setdefault("method", self.DEFAULT_METHOD) super().__init__(**kwargs) @@ -307,20 +295,14 @@ async def make_internal_client(self) -> InternalModbusBaseClient: class ModbusTcpClient(BaseModbusClient): - DEFAULT_METHOD = 'socket' + DEFAULT_METHOD = "socket" DEFAULT_TCP_PORT = 502 - def __init__( - self, - *, - tcp_host: str, - tcp_port: int = DEFAULT_TCP_PORT, - **kwargs - ) -> None: + def __init__(self, *, tcp_host: str, tcp_port: int = DEFAULT_TCP_PORT, **kwargs) -> None: self.tcp_host: str = tcp_host self.tcp_port: int = tcp_port - kwargs.setdefault('method', self.DEFAULT_METHOD) + kwargs.setdefault("method", self.DEFAULT_METHOD) super().__init__(**kwargs) async def make_internal_client(self) -> InternalModbusBaseClient: @@ -342,14 +324,14 @@ def handle_offline(self) -> None: class BasePassiveModbusClient(BaseModbusClient, metaclass=abc.ABCMeta): def __init__(self, **kwargs) -> None: - super().__init__(method='ignored', **kwargs) + super().__init__(method="ignored", **kwargs) class PassiveModbusSerialClient(BasePassiveModbusClient): DEFAULT_SERIAL_BAUD = 9600 DEFAULT_SERIAL_STOPBITS = 1 DEFAULT_SERIAL_BYTESIZE = 8 - DEFAULT_SERIAL_PARITY = 'N' + DEFAULT_SERIAL_PARITY = "N" def __init__( self, @@ -387,21 +369,21 @@ def __init__( self, *, port: int, - iface: str = 'any', - master_ip: Optional[str] = None, - slave_ip: Optional[str] = None, - master_port: Optional[int] = None, - slave_port: Optional[int] = None, - tcpdump: Optional[str] = None, + iface: str = "any", + master_ip: str | None = None, + slave_ip: str | None = None, + master_port: int | None = None, + slave_port: int | None = None, + tcpdump: str | None = None, **kwargs, ) -> None: self.port: int = port self.iface: str = iface - self.master_ip: Optional[str] = master_ip - self.slave_ip: Optional[str] = slave_ip - self.master_port: Optional[int] = master_port - self.slave_port: Optional[int] = slave_port - self.tcpdump: Optional[str] = tcpdump + self.master_ip: str | None = master_ip + self.slave_ip: str | None = slave_ip + self.master_port: int | None = master_port + self.slave_port: int | None = slave_port + self.tcpdump: str | None = tcpdump super().__init__(**kwargs) diff --git a/qtoggleserver/modbus/constants.py b/qtoggleserver/modbus/constants.py index beabf81..6a67086 100644 --- a/qtoggleserver/modbus/constants.py +++ b/qtoggleserver/modbus/constants.py @@ -1,4 +1,4 @@ -MODBUS_TYPE_COIL = 'coil' -MODBUS_TYPE_DISCRETE_INPUT = 'discrete_input' -MODBUS_TYPE_INPUT_REGISTER = 'input_register' -MODBUS_TYPE_HOLDING_REGISTER = 'holding_register' +MODBUS_TYPE_COIL = "coil" +MODBUS_TYPE_DISCRETE_INPUT = "discrete_input" +MODBUS_TYPE_INPUT_REGISTER = "input_register" +MODBUS_TYPE_HOLDING_REGISTER = "holding_register" diff --git a/qtoggleserver/modbus/passive/base.py b/qtoggleserver/modbus/passive/base.py index 635647a..384ee63 100644 --- a/qtoggleserver/modbus/passive/base.py +++ b/qtoggleserver/modbus/passive/base.py @@ -3,11 +3,10 @@ import logging import math -from typing import Any, List, Optional, Union +from typing import Any from pymodbus import Framer, bit_read_message, bit_write_message, register_read_message, register_write_message from pymodbus.client.base import ModbusBaseClient as InternalModbusBaseClient - from qtoggleserver.utils import logging as logging_utils @@ -51,15 +50,15 @@ def __init__(self, logger: logging.Logger) -> None: self._discrete_input_values: dict[int, bool] = {} self._holding_register_values: dict[int, int] = {} self._input_register_values: dict[int, int] = {} - self._run_task: Optional[asyncio.Task] = None + self._run_task: asyncio.Task | None = None - self._last_read_coils_request: Optional[dict[str, Any]] = None - self._last_read_discrete_inputs_request: Optional[dict[str, Any]] = None - self._last_read_holding_registers_request: Optional[dict[str, Any]] = None - self._last_read_input_registers_request: Optional[dict[str, Any]] = None + self._last_read_coils_request: dict[str, Any] | None = None + self._last_read_discrete_inputs_request: dict[str, Any] | None = None + self._last_read_holding_registers_request: dict[str, Any] | None = None + self._last_read_input_registers_request: dict[str, Any] | None = None InternalModbusBaseClient.__init__(self, framer=Framer.RTU) - logging_utils.LoggableMixin.__init__(self, 'passive', logger) + logging_utils.LoggableMixin.__init__(self, "passive", logger) @abc.abstractmethod async def run(self) -> None: @@ -111,9 +110,7 @@ def set_holding_register_value(self, address: int, value: int) -> None: async def read_coils( self, address: int, count: int = 1, slave: int = 0, **kwargs ) -> bit_read_message.ReadCoilsResponse: - return bit_read_message.ReadCoilsResponse( - values=[self.get_coil_value(address + i) for i in range(count)] - ) + return bit_read_message.ReadCoilsResponse(values=[self.get_coil_value(address + i) for i in range(count)]) async def read_discrete_inputs( self, address: int, count: int = 1, slave: int = 0, **kwargs @@ -139,168 +136,168 @@ async def read_holding_registers( async def write_coil( self, address: int, value: bool, slave: int = 0, **kwargs ) -> bit_write_message.WriteSingleCoilResponse: - raise InternalPassiveException('Operation not supported in passive mode') + raise InternalPassiveException("Operation not supported in passive mode") async def write_coils( - self, address: int, values: List[bool], slave: int = 0, **kwargs + self, address: int, values: list[bool], slave: int = 0, **kwargs ) -> bit_write_message.WriteMultipleCoilsResponse: - raise InternalPassiveException('Operation not supported in passive mode') + raise InternalPassiveException("Operation not supported in passive mode") async def write_register( - self, address: int, value: Union[int, float, str], slave: int = 0, **kwargs + self, address: int, value: int | float | str, slave: int = 0, **kwargs ) -> register_write_message.WriteSingleRegisterResponse: - raise InternalPassiveException('Operation not supported in passive mode') + raise InternalPassiveException("Operation not supported in passive mode") async def write_registers( self, address: int, - values: List[Union[int, float, str]], + values: list[int | float | str], slave: int = 0, **kwargs, ) -> register_write_message.WriteMultipleRegistersResponse: - raise InternalPassiveException('Operation not supported in passive mode') + raise InternalPassiveException("Operation not supported in passive mode") def process_read_coils_request(self, data: bytes) -> None: self._last_read_coils_request = { - 'address': int.from_bytes(data[:2], byteorder='big', signed=False), - 'count': int.from_bytes(data[2:4], byteorder='big', signed=False), + "address": int.from_bytes(data[:2], byteorder="big", signed=False), + "count": int.from_bytes(data[2:4], byteorder="big", signed=False), } self.debug( - 'read coils request (address = %d, count = %d)', - self._last_read_coils_request['address'], - self._last_read_coils_request['count'], + "read coils request (address = %d, count = %d)", + self._last_read_coils_request["address"], + self._last_read_coils_request["count"], ) def process_read_discrete_inputs_request(self, data: bytes) -> None: self._last_read_discrete_inputs_request = { - 'address': int.from_bytes(data[:2], byteorder='big', signed=False), - 'count': int.from_bytes(data[2:4], byteorder='big', signed=False), + "address": int.from_bytes(data[:2], byteorder="big", signed=False), + "count": int.from_bytes(data[2:4], byteorder="big", signed=False), } self.debug( - 'read discrete inputs request (address = %d, count = %d)', - self._last_read_discrete_inputs_request['address'], - self._last_read_discrete_inputs_request['count'], + "read discrete inputs request (address = %d, count = %d)", + self._last_read_discrete_inputs_request["address"], + self._last_read_discrete_inputs_request["count"], ) def process_read_holding_registers_request(self, data: bytes) -> None: self._last_read_holding_registers_request = { - 'address': int.from_bytes(data[:2], byteorder='big', signed=False), - 'count': int.from_bytes(data[2:4], byteorder='big', signed=False), + "address": int.from_bytes(data[:2], byteorder="big", signed=False), + "count": int.from_bytes(data[2:4], byteorder="big", signed=False), } self.debug( - 'read holding registers request (address = %d, count = %d)', - self._last_read_holding_registers_request['address'], - self._last_read_holding_registers_request['count'], + "read holding registers request (address = %d, count = %d)", + self._last_read_holding_registers_request["address"], + self._last_read_holding_registers_request["count"], ) def process_read_input_registers_request(self, data: bytes) -> None: self._last_read_input_registers_request = { - 'address': int.from_bytes(data[:2], byteorder='big', signed=False), - 'count': int.from_bytes(data[2:4], byteorder='big', signed=False), + "address": int.from_bytes(data[:2], byteorder="big", signed=False), + "count": int.from_bytes(data[2:4], byteorder="big", signed=False), } self.debug( - 'read input registers request (address = %d, count = %d)', - self._last_read_input_registers_request['address'], - self._last_read_input_registers_request['count'], + "read input registers request (address = %d, count = %d)", + self._last_read_input_registers_request["address"], + self._last_read_input_registers_request["count"], ) def process_read_coils_response(self, data: bytes) -> None: if not self._last_read_coils_request: - self.warning('ignoring read coils response without request') + self.warning("ignoring read coils response without request") return if len(data) < 1: - raise InvalidModbusFrame('Data too short (%s bytes)', len(data)) + raise InvalidModbusFrame("Data too short (%s bytes)", len(data)) byte_count = data[0] if len(data) != byte_count + 1: - raise InvalidModbusFrame('Unexpected number of data bytes (%s != %s)', len(data), byte_count) - expected_byte_count = math.ceil(self._last_read_coils_request['count'] / 8) + raise InvalidModbusFrame("Unexpected number of data bytes (%s != %s)", len(data), byte_count) + expected_byte_count = math.ceil(self._last_read_coils_request["count"] / 8) if expected_byte_count != byte_count: - self.warning('ignoring read coils response with different address count') + self.warning("ignoring read coils response with different address count") return - bits = int.from_bytes(data[1:], byteorder='little', signed=False) - bit_string = f'{bits:b}' - values = [b == '1' for b in reversed(bit_string)] + bits = int.from_bytes(data[1:], byteorder="little", signed=False) + bit_string = f"{bits:b}" + values = [b == "1" for b in reversed(bit_string)] for i, v in enumerate(values): - self.set_coil_value(self._last_read_coils_request['address'] + i, v) + self.set_coil_value(self._last_read_coils_request["address"] + i, v) - self.debug('read coils response (values = [%s])', ', '.join(str(v).lower() for v in values)) + self.debug("read coils response (values = [%s])", ", ".join(str(v).lower() for v in values)) self._last_read_coils_request = None def process_read_discrete_inputs_response(self, data: bytes) -> None: if not self._last_read_discrete_inputs_request: - self.warning('ignoring read discrete inputs response without request') + self.warning("ignoring read discrete inputs response without request") return if len(data) < 1: - raise InvalidModbusFrame('Data too short (%s bytes)', len(data)) + raise InvalidModbusFrame("Data too short (%s bytes)", len(data)) byte_count = data[0] if len(data) != byte_count + 1: - raise InvalidModbusFrame('Unexpected number of data bytes (%s != %s)', len(data), byte_count) - expected_byte_count = math.ceil(self._last_read_discrete_inputs_request['count'] / 8) + raise InvalidModbusFrame("Unexpected number of data bytes (%s != %s)", len(data), byte_count) + expected_byte_count = math.ceil(self._last_read_discrete_inputs_request["count"] / 8) if expected_byte_count != byte_count: - self.warning('ignoring read discrete inputs response with different address count') + self.warning("ignoring read discrete inputs response with different address count") return - bits = int.from_bytes(data[1:], byteorder='little', signed=False) - bit_string = f'{bits:b}' - values = [b == '1' for b in reversed(bit_string)] + bits = int.from_bytes(data[1:], byteorder="little", signed=False) + bit_string = f"{bits:b}" + values = [b == "1" for b in reversed(bit_string)] for i, v in enumerate(values): - self.set_discrete_input_value(self._last_read_discrete_inputs_request['address'] + i, v) + self.set_discrete_input_value(self._last_read_discrete_inputs_request["address"] + i, v) - self.debug('read discrete inputs response (values = [%s])', ', '.join(str(v).lower() for v in values)) + self.debug("read discrete inputs response (values = [%s])", ", ".join(str(v).lower() for v in values)) self._last_read_discrete_inputs_request = None def process_read_holding_registers_response(self, data: bytes) -> None: if not self._last_read_holding_registers_request: - self.warning('ignoring read holding registers response without request') + self.warning("ignoring read holding registers response without request") return if len(data) < 1: - raise InvalidModbusFrame('Data too short (%s bytes)', len(data)) + raise InvalidModbusFrame("Data too short (%s bytes)", len(data)) byte_count = data[0] if len(data) != byte_count + 1: - raise InvalidModbusFrame('Unexpected number of data bytes (%s != %s)', len(data), byte_count) - expected_byte_count = self._last_read_holding_registers_request['count'] * 2 + raise InvalidModbusFrame("Unexpected number of data bytes (%s != %s)", len(data), byte_count) + expected_byte_count = self._last_read_holding_registers_request["count"] * 2 if expected_byte_count != byte_count: - self.warning('ignoring read holding registers response with different address count') + self.warning("ignoring read holding registers response with different address count") return values = [] - for i in range(self._last_read_holding_registers_request['count']): - value = int.from_bytes(data[i * 2 + 1: i * 2 + 3], byteorder='big', signed=False) + for i in range(self._last_read_holding_registers_request["count"]): + value = int.from_bytes(data[i * 2 + 1 : i * 2 + 3], byteorder="big", signed=False) values.append(value) - self.set_holding_register_value(self._last_read_holding_registers_request['address'] + i, value) + self.set_holding_register_value(self._last_read_holding_registers_request["address"] + i, value) - self.debug('read holding registers response (values = [%s])', ', '.join(f'0x{v:04X}' for v in values)) + self.debug("read holding registers response (values = [%s])", ", ".join(f"0x{v:04X}" for v in values)) self._last_read_holding_registers_request = None def process_read_input_registers_response(self, data: bytes) -> None: if not self._last_read_input_registers_request: - self.warning('ignoring read input registers response without request') + self.warning("ignoring read input registers response without request") return if len(data) < 1: - raise InvalidModbusFrame('Data too short (%s bytes)', len(data)) + raise InvalidModbusFrame("Data too short (%s bytes)", len(data)) byte_count = data[0] if len(data) != byte_count + 1: - raise InvalidModbusFrame('Unexpected number of data bytes (%s != %s)', len(data), byte_count) - expected_byte_count = self._last_read_input_registers_request['count'] * 2 + raise InvalidModbusFrame("Unexpected number of data bytes (%s != %s)", len(data), byte_count) + expected_byte_count = self._last_read_input_registers_request["count"] * 2 if expected_byte_count != byte_count: - self.warning('ignoring read input registers response with different address count') + self.warning("ignoring read input registers response with different address count") return values = [] - for i in range(self._last_read_input_registers_request['count']): - value = int.from_bytes(data[i * 2 + 1: i * 2 + 3], byteorder='big', signed=False) + for i in range(self._last_read_input_registers_request["count"]): + value = int.from_bytes(data[i * 2 + 1 : i * 2 + 3], byteorder="big", signed=False) values.append(value) - self.set_input_register_value(self._last_read_input_registers_request['address'] + i, value) + self.set_input_register_value(self._last_read_input_registers_request["address"] + i, value) - self.debug('read input registers response (values = [%s])', ', '.join(f'0x{v:04X}' for v in values)) + self.debug("read input registers response (values = [%s])", ", ".join(f"0x{v:04X}" for v in values)) self._last_read_input_registers_request = None @@ -317,4 +314,4 @@ def compute_crc(data: bytes) -> bytes: else: crc >>= 1 - return crc.to_bytes(2, byteorder='little', signed=False) + return crc.to_bytes(2, byteorder="little", signed=False) diff --git a/qtoggleserver/modbus/passive/serial.py b/qtoggleserver/modbus/passive/serial.py index 0e66047..3a8fd2e 100644 --- a/qtoggleserver/modbus/passive/serial.py +++ b/qtoggleserver/modbus/passive/serial.py @@ -37,7 +37,7 @@ def __init__( self.timeout: int = timeout super().__init__(logger) - self.set_logger_name('passive_serial') + self.set_logger_name("passive_serial") def make_serial(self) -> serial.Serial: return serial.Serial( @@ -54,12 +54,12 @@ async def run(self) -> None: while True: try: ser = self.make_serial() - buffer = b'' + buffer = b"" for _ in range(self.timeout * 10): buffer += ser.read_all() await asyncio.sleep(0.1) if buffer: - self.debug('sniffed %d bytes' % len(buffer)) + self.debug("sniffed %d bytes" % len(buffer)) while True: consumed_bytes = self._process_modbus_data(buffer) if not consumed_bytes: @@ -67,7 +67,7 @@ async def run(self) -> None: buffer = buffer[consumed_bytes:] except asyncio.CancelledError: - self.debug('task stopped') + self.debug("task stopped") break def _process_modbus_data(self, buffer: bytes) -> int: @@ -85,9 +85,9 @@ def _process_modbus_data(self, buffer: bytes) -> int: # Try to see if we've got a response byte_count = buffer[2] if len(buffer) >= byte_count + 5: - crc = buffer[byte_count + 3:byte_count + 5] - if self.compute_crc(buffer[:byte_count + 3]) == crc: - frame = buffer[:byte_count + 5] + crc = buffer[byte_count + 3 : byte_count + 5] + if self.compute_crc(buffer[: byte_count + 3]) == crc: + frame = buffer[: byte_count + 5] self._process_modbus_response(frame) return len(frame) @@ -102,7 +102,7 @@ def _process_modbus_data(self, buffer: bytes) -> int: return 0 def _process_modbus_request(self, frame: bytes) -> None: - self.debug('got Modbus request: %s', binascii.hexlify(frame).decode()) + self.debug("got Modbus request: %s", binascii.hexlify(frame).decode()) function = frame[1] data = frame[2:-2] @@ -116,7 +116,7 @@ def _process_modbus_request(self, frame: bytes) -> None: self.process_read_input_registers_request(data) def _process_modbus_response(self, frame: bytes) -> None: - self.debug('got Modbus response: %s', binascii.hexlify(frame).decode()) + self.debug("got Modbus response: %s", binascii.hexlify(frame).decode()) function = frame[1] data = frame[2:-2] diff --git a/qtoggleserver/modbus/passive/tcpdump.py b/qtoggleserver/modbus/passive/tcpdump.py index d2f9167..af7c31d 100644 --- a/qtoggleserver/modbus/passive/tcpdump.py +++ b/qtoggleserver/modbus/passive/tcpdump.py @@ -6,7 +6,6 @@ import re import subprocess -from typing import Optional from shutil import which from .base import InternalPassiveClient, InternalPassiveException, InvalidModbusFrame @@ -17,53 +16,53 @@ class InvalidIpv4Packet(InternalPassiveException): class InternalTcpDumpClient(InternalPassiveClient): - _WORD_HEX_REGEX = re.compile(r'\s[a-f0-9]{2,4}') + _WORD_HEX_REGEX = re.compile(r"\s[a-f0-9]{2,4}") def __init__( self, *, port: int, - iface: Optional[str] = None, + iface: str | None = None, unit_id: int = 0, - master_ip: Optional[str] = None, - slave_ip: Optional[str] = None, - master_port: Optional[int] = None, - slave_port: Optional[int] = None, - tcpdump: Optional[str] = None, + master_ip: str | None = None, + slave_ip: str | None = None, + master_port: int | None = None, + slave_port: int | None = None, + tcpdump: str | None = None, logger: logging.Logger, ) -> None: self.port: int = port - self.iface: str = iface or 'any' + self.iface: str = iface or "any" self.unit_id: int = unit_id - self.master_ip: Optional[str] = master_ip - self.slave_ip: Optional[str] = slave_ip - self.master_port: Optional[int] = master_port - self.slave_port: Optional[int] = slave_port - self.tcpdump: Optional[str] = tcpdump + self.master_ip: str | None = master_ip + self.slave_ip: str | None = slave_ip + self.master_port: int | None = master_port + self.slave_port: int | None = slave_port + self.tcpdump: str | None = tcpdump self.logger = logger if not any((self.master_ip, self.slave_ip, self.master_port, self.slave_port)): - raise ValueError('Either `master_ip`, `slave_ip`, `master_port` or `slave_port` must be set') + raise ValueError("Either `master_ip`, `slave_ip`, `master_port` or `slave_port` must be set") - self.buffer: bytes = b'' + self.buffer: bytes = b"" super().__init__(logger) - self.set_logger_name('passive_tcpdump') + self.set_logger_name("passive_tcpdump") @staticmethod - def find_tcpdump() -> Optional[str]: - return which('tcpdump') + def find_tcpdump() -> str | None: + return which("tcpdump") def make_tcpdump_cmd(self) -> list[str]: tcpdump = self.tcpdump or self.find_tcpdump() if not tcpdump: - raise RuntimeError('tcpdump command not found') + raise RuntimeError("tcpdump command not found") - return [tcpdump, '-Xni', self.iface, 'port', str(self.port)] + return [tcpdump, "-Xni", self.iface, "port", str(self.port)] async def run(self) -> None: cmd = self.make_tcpdump_cmd() - self.debug('running command "%s"', ' '.join(cmd)) + self.debug('running command "%s"', " ".join(cmd)) with subprocess.Popen( cmd, stdout=subprocess.PIPE, @@ -71,15 +70,15 @@ async def run(self) -> None: text=True, ) as proc: # Set non-blocking read mode - fd = proc.stdout.fileno() # noqa + fd = proc.stdout.fileno() flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - line = '' + line = "" try: while True: exit_code = proc.poll() if exit_code is not None: - self.debug('tcpdump exited with code %s', exit_code) + self.debug("tcpdump exited with code %s", exit_code) break try: output = proc.stdout.read() @@ -89,7 +88,7 @@ async def run(self) -> None: if not output: await asyncio.sleep(0.1) continue - parts = output.split('\n') + parts = output.split("\n") line += parts.pop(0) self._parse_line(line) for line in parts[:-1]: @@ -97,13 +96,13 @@ async def run(self) -> None: self._parse_line(line) line = parts[-1] except asyncio.CancelledError: - self.debug('stopping tcpdump process') + self.debug("stopping tcpdump process") proc.kill() def _parse_line(self, line: str) -> None: # Each `line` starts with a timestamp or contains something like: # [tab]0x0080: 5375 6e2c 2031 3020 4465 6320 3230 3233 Sun,.10.Dec.2023 - if line.startswith('\t'): + if line.startswith("\t"): for word_hex in self._WORD_HEX_REGEX.findall(line): # Our `word_hex` looks like: ` 6e2c`, containing two bytes. We push each byte in network byte order. if len(word_hex) == 5: @@ -114,21 +113,21 @@ def _parse_line(self, line: str) -> None: try: self._process_packet(self.buffer) except Exception: - self.error('failed to parse IP packet', exc_info=True) + self.error("failed to parse IP packet", exc_info=True) - self.buffer = b'' + self.buffer = b"" def _process_packet(self, data: bytes) -> None: # Validate IPv4 header if len(data) < 40: - raise InvalidIpv4Packet('Packet too short (%s bytes)', len(data)) + raise InvalidIpv4Packet("Packet too short (%s bytes)", len(data)) if data[0] >> 4 != 4: - raise InvalidIpv4Packet('Wrong IPv4 version (%s)', data[0] >> 4) + raise InvalidIpv4Packet("Wrong IPv4 version (%s)", data[0] >> 4) ip_packet = data ip_header_length = (ip_packet[0] & 0x0F) * 4 - ip_source = f'{ip_packet[12]}.{ip_packet[13]}.{ip_packet[14]}.{ip_packet[15]}' - ip_dest = f'{ip_packet[16]}.{ip_packet[17]}.{ip_packet[18]}.{ip_packet[19]}' + ip_source = f"{ip_packet[12]}.{ip_packet[13]}.{ip_packet[14]}.{ip_packet[15]}" + ip_dest = f"{ip_packet[16]}.{ip_packet[17]}.{ip_packet[18]}.{ip_packet[19]}" tcp_packet = ip_packet[ip_header_length:] tcp_source_port = (tcp_packet[0] << 8) + tcp_packet[1] @@ -145,16 +144,16 @@ def _process_packet(self, data: bytes) -> None: return is_request = ( - (self.master_ip and self.master_ip == ip_source) or - (self.slave_ip and self.slave_ip == ip_dest) or - (self.master_port and self.master_port == tcp_source_port) or - (self.slave_port and self.slave_port == tcp_dest_port) + (self.master_ip and self.master_ip == ip_source) + or (self.slave_ip and self.slave_ip == ip_dest) + or (self.master_port and self.master_port == tcp_source_port) + or (self.slave_port and self.slave_port == tcp_dest_port) ) or False is_response = ( - (self.master_ip and self.master_ip == ip_dest) or - (self.slave_ip and self.slave_ip == ip_source) or - (self.master_port and self.master_port == tcp_dest_port) or - (self.slave_port and self.slave_port == tcp_source_port) + (self.master_ip and self.master_ip == ip_dest) + or (self.slave_ip and self.slave_ip == ip_source) + or (self.master_port and self.master_port == tcp_dest_port) + or (self.slave_port and self.slave_port == tcp_source_port) ) or False if is_request: self._process_modbus_request(modbus_frame) @@ -162,21 +161,21 @@ def _process_packet(self, data: bytes) -> None: self._process_modbus_response(modbus_frame) def _process_modbus_request(self, frame: bytes) -> None: - self.debug('got Modbus request: %s', binascii.hexlify(frame).decode()) + self.debug("got Modbus request: %s", binascii.hexlify(frame).decode()) if len(frame) < 5: - raise InvalidModbusFrame('Frame too short (%s bytes)', len(frame)) + raise InvalidModbusFrame("Frame too short (%s bytes)", len(frame)) unit_id = frame[0] function = frame[1] data = frame[2:-2] crc = frame[-2:] if crc != self.compute_crc(frame[:-2]): - raise InvalidModbusFrame('CRC verification failed') + raise InvalidModbusFrame("CRC verification failed") if function not in self.WATCHED_FUNCTIONS: - self.debug('ignoring Modbus function %s', function) + self.debug("ignoring Modbus function %s", function) return if self.unit_id and self.unit_id != unit_id: - self.debug('ignoring Modbus slave address %s', unit_id) + self.debug("ignoring Modbus slave address %s", unit_id) return if function == self.MODBUS_FUNC_READ_COILS: self.process_read_coils_request(data) @@ -188,21 +187,21 @@ def _process_modbus_request(self, frame: bytes) -> None: self.process_read_input_registers_request(data) def _process_modbus_response(self, frame: bytes) -> None: - self.debug('got Modbus response: %s', binascii.hexlify(frame).decode()) + self.debug("got Modbus response: %s", binascii.hexlify(frame).decode()) if len(frame) < 5: - raise InvalidModbusFrame('Frame too short (%s bytes)', len(frame)) + raise InvalidModbusFrame("Frame too short (%s bytes)", len(frame)) unit_id = frame[0] function = frame[1] data = frame[2:-2] crc = frame[-2:] if crc != self.compute_crc(frame[:-2]): - raise InvalidModbusFrame('CRC verification failed') + raise InvalidModbusFrame("CRC verification failed") if function not in self.WATCHED_FUNCTIONS: - self.debug('ignoring Modbus function %s', function) + self.debug("ignoring Modbus function %s", function) return if self.unit_id and self.unit_id != unit_id: - self.debug('ignoring Modbus unit ID %s', unit_id) + self.debug("ignoring Modbus unit ID %s", unit_id) return if function == self.MODBUS_FUNC_READ_COILS: self.process_read_coils_response(data) diff --git a/qtoggleserver/modbus/ports.py b/qtoggleserver/modbus/ports.py index e06f270..3bb9e29 100644 --- a/qtoggleserver/modbus/ports.py +++ b/qtoggleserver/modbus/ports.py @@ -1,6 +1,6 @@ import struct -from typing import Optional, Union, cast +from typing import cast from qtoggleserver.core import ports as core_ports from qtoggleserver.core.typing import NullablePortValue, PortValue @@ -25,39 +25,39 @@ } STRUCT_TYPE_MAPPING = { - 'b': int, - 'B': int, - '?': bool, - 'h': int, - 'H': int, - 'i': int, - 'I': int, - 'l': int, - 'L': int, - 'q': int, - 'Q': int, - 'n': int, - 'N': int, - 'e': float, - 'f': float, - 'd': float, + "b": int, + "B": int, + "?": bool, + "h": int, + "H": int, + "i": int, + "I": int, + "l": int, + "L": int, + "q": int, + "Q": int, + "n": int, + "N": int, + "e": float, + "f": float, + "d": float, } class ModbusClientPort(polled.PolledPort): - DEFAULT_VALUE_FMT = '>h' + DEFAULT_VALUE_FMT = ">h" def __init__( self, *, id: str, modbus_type: str, - address: Union[int, str], + address: int | str, length: int = 1, - writable: Optional[bool] = None, - register_group_fmt: Optional[str] = None, + writable: bool | None = None, + register_group_fmt: str | None = None, value_fmt: str = DEFAULT_VALUE_FMT, - **kwargs + **kwargs, ) -> None: if isinstance(address, str): address = int(address, base=0) @@ -69,7 +69,7 @@ def __init__( self._modbus_type: str = modbus_type self._address: int = address self._length: int = length - self._register_group_fmt: str = register_group_fmt if register_group_fmt else f'>{"H" * length}' + self._register_group_fmt: str = register_group_fmt if register_group_fmt else f">{'H' * length}" self._value_fmt: str = value_fmt super().__init__(id=id, **kwargs) @@ -84,7 +84,7 @@ async def read_value(self) -> NullablePortValue: value_fmt = self._value_fmt values = [] if self._modbus_type in (constants.MODBUS_TYPE_COIL, constants.MODBUS_TYPE_DISCRETE_INPUT): - group_fmt = value_fmt = '?' + group_fmt = value_fmt = "?" if self._modbus_type == constants.MODBUS_TYPE_COIL: value = client.get_last_coil_value(self._address) else: diff --git a/qtoggleserver/modbus/server.py b/qtoggleserver/modbus/server.py index f71a2b6..4b9decd 100644 --- a/qtoggleserver/modbus/server.py +++ b/qtoggleserver/modbus/server.py @@ -11,13 +11,13 @@ class BaseModbusServer(BaseModbus, Peripheral): def __init__( self, *, - identity_vendor_name: str = '', - identity_product_code: str = '', - identity_major_minor_revision: str = '', - identity_vendor_url: str = '', - identity_product_name: str = '', - identity_model_name: str = '', - identity_user_application_name: str = '', + identity_vendor_name: str = "", + identity_product_code: str = "", + identity_major_minor_revision: str = "", + identity_vendor_url: str = "", + identity_product_name: str = "", + identity_model_name: str = "", + identity_user_application_name: str = "", **kwargs, ) -> None: self.identity_vendor_name: str = identity_vendor_name diff --git a/setup.py b/setup.py deleted file mode 100644 index 8d0b6ae..0000000 --- a/setup.py +++ /dev/null @@ -1,17 +0,0 @@ -from setuptools import setup, find_namespace_packages - - -setup( - name='qtoggleserver-modbus', - version='unknown-version', - description='Modbus client/server for qToggleServer', - author='Calin Crisan', - author_email='ccrisan@gmail.com', - license='Apache 2.0', - - packages=find_namespace_packages(), - - install_requires=[ - 'pymodbus>=3.6,<3.7', - ] -)