diff --git a/src/veles/device/__init__.py b/src/veles/device/__init__.py index d98d887..d000c88 100644 --- a/src/veles/device/__init__.py +++ b/src/veles/device/__init__.py @@ -1,2 +1,2 @@ -from .sensor_wired_IAQ import SensorWiredIAQ -from .sensor_wired_RHT import SensorWiredRHT +from .sensor_wired import SensorWiredIAQ, SensorWiredRHT +from .find import find_devices \ No newline at end of file diff --git a/src/veles/device/find.py b/src/veles/device/find.py index 6946ae6..fa1e35d 100644 --- a/src/veles/device/find.py +++ b/src/veles/device/find.py @@ -1,7 +1,6 @@ -from typing import Final, Dict, Any, TypeVar, Type +from typing import Final, Dict, Any, TypeVar, Type, Iterable from .generic import Device, NoResponseError -from .sensor_wired_IAQ import SensorWiredIAQ -from .sensor_wired_RHT import SensorWiredRHT +from .sensor_wired import SensorWiredIAQ, SensorWiredRHT # links device identifiers to its class DEVICE_IDENTIFIERS: Final[Dict[int, Device]] = { @@ -11,20 +10,11 @@ DEVICE_IDENTIFIERS: Final[Dict[int, Device]] = { T = TypeVar("T", bound=Device) -def find_devices(device_cls: Type[T], address_space: list[Any]) -> list[T]: + +def find_devices(device_cls: Type[T], address_space: Iterable[Any]) -> list[T]: """ Look for devices in given address space """ - # found_devices = [] - # for address in address_space: - # try: - # found_devices.append(device_cls(address)) - # except NoResponseError: - # pass - # return found_devices - return list(filter(device_cls.probe, address_space)) # TODO add device args - # TODO return devices themselves, not addresses - # TODO add .address to Device diff --git a/src/veles/device/generic.py b/src/veles/device/generic.py index 7800c44..dcc43ed 100644 --- a/src/veles/device/generic.py +++ b/src/veles/device/generic.py @@ -1,21 +1,7 @@ """Module containing classes for generic wired/wireless devices""" from dataclasses import dataclass -from typing import Dict, Final, Any -from abc import ABC, abstractclassmethod, abstractmethod, abstractstaticmethod -import minimalmodbus -import serial - - -class Modbus: - """Class holding Modbus related constants""" - - HOLDING_REGISTER_START = 40001 - HOLDING_REGISTER_END = 49999 - INPUT_REGISTER_START = 30001 - INPUT_REGISTER_END = 39999 - # ranges for testing if address is in address range - input_register_range = range(INPUT_REGISTER_START, INPUT_REGISTER_END + 1) - holding_register_range = range(HOLDING_REGISTER_START, HOLDING_REGISTER_END + 1) +from typing import Dict, Any +from abc import ABC, abstractmethod @dataclass(slots=True) @@ -38,6 +24,11 @@ class Device(ABC): Base class for all devices """ + address: Any + """ + Address space is device-specific (e.g. int for modbus) + """ + @abstractmethod def get_data(self) -> Dict[str, Any]: """ @@ -72,150 +63,3 @@ class Device(ABC): except NoResponseError: return False return True - - -class ModbusRTUDevice(Device): - """ - Base class for wired device controlled over MODBUS RTU (via RS-485) - - RS-485 to USB converter is needed for devices based off this class - """ - - # Reflects array fw/Core/Src/config.c:config_baudrates[] - BAUDRATES = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200] - # magic constant for resetting: common to all Modbus RTU devices - MAGIC_RESET_CONSTANT: Final[int] = 0xABCD - # registers common to all Modbus RTU devices - input_registers: Dict[str, int] = { - "SERIAL_NUMBER_1": 30001, - "SERIAL_NUMBER_2": 30002, - } - holding_registers: Dict[str, int] = {"RESET_DEVICE": 49999} - - def __comm_device_init(self) -> minimalmodbus.Instrument: - comm_device = minimalmodbus.Instrument( - self.dev, self.modbus_address, close_port_after_each_call=True - ) - # RS-485 serial paramater init - comm_device.serial.baudrate = self.baudrate - comm_device.serial.bytesize = 8 - comm_device.serial.parity = serial.PARITY_EVEN - comm_device.serial.stopbits = 1 - comm_device.serial.timeout = 0.05 # seconds - comm_device.mode = minimalmodbus.MODE_RTU # rtu or ascii mode - comm_device.clear_buffers_before_each_transaction = True - return comm_device - - def __init__(self, modbus_address, baudrate=19200, dev="/dev/rs485"): - self.modbus_address: int = modbus_address - self.baudrate: int = baudrate - self.dev: str = dev - self.comm_device: minimalmodbus.Instrument = self.__comm_device_init() - self.readout_errors: ReadoutErrorCounter = ReadoutErrorCounter() - # check if device actually exists on the bus (by reading serial number); - # if not, raise NoResponseException - try: - self.read_register(self.input_registers["SERIAL_NUMBER_1"]) - except minimalmodbus.NoResponseError as exc: - raise NoResponseError from exc - - def read_register( - self, register_number: int, signed: bool = False, retries: int = 10 - ) -> int: - """Read Modbus input/holding register via serial device""" - if register_number in Modbus.input_register_range: - function_code = 4 - register_offset = register_number - Modbus.INPUT_REGISTER_START - elif register_number in Modbus.holding_register_range: - function_code = 3 - register_offset = register_number - Modbus.HOLDING_REGISTER_START - else: - # wrong register number - raise ValueError - for _ in range(retries): - try: - self.readout_errors.total_attempts += 1 - # minimalmodbus divides received register value by 10 - return ( - self.comm_device.read_register( - register_offset, 1, functioncode=function_code, signed=signed - ) - * 10 - ) - except minimalmodbus.NoResponseError as exception: - last_exception = exception - self.readout_errors.no_response += 1 - continue - except minimalmodbus.InvalidResponseError as exception: - last_exception = exception - self.readout_errors.invalid_response += 1 - continue - # retries failed, raise last exception to inform user - raise last_exception - - def write_register( - self, register_number: int, register_value: int, retries: int = 10 - ) -> None: - """ - Write to slave holding register - """ - # only holding registers can be written - if register_number not in Modbus.holding_register_range: - raise ValueError - register_offset = register_number - Modbus.HOLDING_REGISTER_START - for _ in range(retries): - try: - return self.comm_device.write_register( - register_offset, register_value, functioncode=6 - ) - except ( - minimalmodbus.NoResponseError, - minimalmodbus.InvalidResponseError, - ) as exception: - last_exception = exception - continue - raise last_exception - - def __getitem__(self, key: int) -> int: - return self.read_register(key) - - def __setitem__(self, key: int, value: int) -> None: - return self.write_register(key, value) - - def reset(self) -> bool: - """ - Soft-reset the device - """ - try: - self.write_register( - ModbusRTUDevice.holding_registers["RESET_DEVICE"], - ModbusRTUDevice.MAGIC_RESET_CONSTANT, - ) - return False # got answer => failed to reset - except minimalmodbus.NoResponseError: - return True # no answer => reset successful - - @property - def device_code(self) -> int: - """ - Return device code. This can be matched to DEVICE_CODE - in child classes. - """ - return int(self.read_register(self.input_registers["SERIAL_NUMBER_1"])) - - @property - def serial_number(self) -> int: - """ - Return serial number - """ - serial_number_1 = self.device_code - serial_number_2 = int( - self.read_register(self.input_registers["SERIAL_NUMBER_2"]) - ) - return (serial_number_1 << 16) + serial_number_2 - - def get_data(self) -> Dict[str, int]: - return { - name: self.read_register(number) - for name, number in self.input_registers.items() - } diff --git a/src/veles/device/modbus.py b/src/veles/device/modbus.py new file mode 100644 index 0000000..322f019 --- /dev/null +++ b/src/veles/device/modbus.py @@ -0,0 +1,167 @@ +"""Module containing generic Modbus classes/devices""" +from typing import Dict, Final +import minimalmodbus +import serial +from .generic import Device, ReadoutErrorCounter, NoResponseError + + +class Modbus: + """Class holding Modbus related constants""" + + HOLDING_REGISTER_START = 40001 + HOLDING_REGISTER_END = 49999 + INPUT_REGISTER_START = 30001 + INPUT_REGISTER_END = 39999 + # ranges for testing if address is in address range + input_register_range = range(INPUT_REGISTER_START, INPUT_REGISTER_END + 1) + holding_register_range = range(HOLDING_REGISTER_START, HOLDING_REGISTER_END + 1) + + +class ModbusRTUDevice(Device): + """ + Base class for wired device controlled over MODBUS RTU (via RS-485) + + RS-485 to USB converter is needed for devices based off this class + """ + + # Reflects array fw/Core/Src/config.c:config_baudrates[] + BAUDRATES = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200] + # magic constant for resetting: common to all Modbus RTU devices + MAGIC_RESET_CONSTANT: Final[int] = 0xABCD + # registers common to all Modbus RTU devices + input_registers: Dict[str, int] = { + "SERIAL_NUMBER_1": 30001, + "SERIAL_NUMBER_2": 30002, + } + holding_registers: Dict[str, int] = {"RESET_DEVICE": 49999} + + def __comm_device_init(self) -> minimalmodbus.Instrument: + comm_device = minimalmodbus.Instrument( + self.dev, self.address, close_port_after_each_call=True + ) + # RS-485 serial paramater init + comm_device.serial.baudrate = self.baudrate + comm_device.serial.bytesize = 8 + comm_device.serial.parity = serial.PARITY_EVEN + comm_device.serial.stopbits = 1 + comm_device.serial.timeout = 0.05 # seconds + comm_device.mode = minimalmodbus.MODE_RTU # rtu or ascii mode + comm_device.clear_buffers_before_each_transaction = True + return comm_device + + def __init__(self, modbus_address, baudrate=19200, dev="/dev/rs485"): + self.address: int = modbus_address + self.baudrate: int = baudrate + self.dev: str = dev + self.comm_device: minimalmodbus.Instrument = self.__comm_device_init() + self.readout_errors: ReadoutErrorCounter = ReadoutErrorCounter() + # check if device actually exists on the bus (by reading serial number); + # if not, raise NoResponseException + try: + self.read_register(self.input_registers["SERIAL_NUMBER_1"]) + except minimalmodbus.NoResponseError as exc: + raise NoResponseError from exc + + def read_register( + self, register_number: int, signed: bool = False, retries: int = 10 + ) -> int: + """Read Modbus input/holding register via serial device""" + if register_number in Modbus.input_register_range: + function_code = 4 + register_offset = register_number - Modbus.INPUT_REGISTER_START + elif register_number in Modbus.holding_register_range: + function_code = 3 + register_offset = register_number - Modbus.HOLDING_REGISTER_START + else: + # wrong register number + raise ValueError + for _ in range(retries): + try: + self.readout_errors.total_attempts += 1 + # minimalmodbus divides received register value by 10 + return ( + self.comm_device.read_register( + register_offset, 1, functioncode=function_code, signed=signed + ) + * 10 + ) + except minimalmodbus.NoResponseError as exception: + last_exception = exception + self.readout_errors.no_response += 1 + continue + except minimalmodbus.InvalidResponseError as exception: + last_exception = exception + self.readout_errors.invalid_response += 1 + continue + # retries failed, raise last exception to inform user + raise last_exception + + def write_register( + self, register_number: int, register_value: int, retries: int = 10 + ) -> None: + """ + Write to slave holding register + """ + # only holding registers can be written + if register_number not in Modbus.holding_register_range: + raise ValueError + register_offset = register_number - Modbus.HOLDING_REGISTER_START + for _ in range(retries): + try: + return self.comm_device.write_register( + register_offset, register_value, functioncode=6 + ) + except ( + minimalmodbus.NoResponseError, + minimalmodbus.InvalidResponseError, + ) as exception: + last_exception = exception + continue + raise last_exception + + def __getitem__(self, key: int) -> int: + return self.read_register(key) + + def __setitem__(self, key: int, value: int) -> None: + return self.write_register(key, value) + + def reset(self) -> bool: + """ + Soft-reset the device + """ + try: + self.write_register( + ModbusRTUDevice.holding_registers["RESET_DEVICE"], + ModbusRTUDevice.MAGIC_RESET_CONSTANT, + ) + return False # got answer => failed to reset + except minimalmodbus.NoResponseError: + return True # no answer => reset successful + + @property + def device_code(self) -> int: + """ + Return device code. This can be matched to DEVICE_CODE + in child classes. + """ + return int(self.read_register(self.input_registers["SERIAL_NUMBER_1"])) + + @property + def serial_number(self) -> int: + """ + Return serial number + """ + serial_number_1 = self.device_code + serial_number_2 = int( + self.read_register(self.input_registers["SERIAL_NUMBER_2"]) + ) + return (serial_number_1 << 16) + serial_number_2 + + def get_data(self) -> Dict[str, int]: + """ + Get all data from sensor + """ + return { + name: self.read_register(number) + for name, number in self.input_registers.items() + } diff --git a/src/veles/device/sensor_wired_IAQ.py b/src/veles/device/sensor_wired.py similarity index 75% rename from src/veles/device/sensor_wired_IAQ.py rename to src/veles/device/sensor_wired.py index 6ad0165..a82986c 100644 --- a/src/veles/device/sensor_wired_IAQ.py +++ b/src/veles/device/sensor_wired.py @@ -1,7 +1,7 @@ from time import sleep from typing import Dict, Final from minimalmodbus import IllegalRequestError -from .generic import ModbusRTUDevice +from .modbus import ModbusRTUDevice class SensorWiredIAQ(ModbusRTUDevice): @@ -96,3 +96,45 @@ class SensorWiredIAQ(ModbusRTUDevice): self.read_register(self.input_registers["PMC_MASS_1_0"]) except IllegalRequestError: self.__remove_sensor_from_input_registers("PMC") + + +class SensorWiredRHT(ModbusRTUDevice): + """ + Wired sensor measuring temperature, relative humidity + and light intensity. + """ + + DEVICE_CLASS: Final[str] = "RHT_Wired" + DEVICE_CODE: Final[int] = 0x0020 + + input_registers: Dict[str, int] = { + "SER_NUM_1": 30001, + "SER_NUM_2": 30002, + "T": 30003, # from SHT4x + "T_F": 30004, + "RH": 30005, # from SHT4x + "LIGHT_INTENSITY_1": 30006, + "LIGHT_INTENSITY_2": 30007, + "ERROR_T_RH": 30008, + "ERROR_LIGHT": 30009, + } | ModbusRTUDevice.input_registers + holding_registers: Dict[str, int] = { + "MODBUS_ADDR": 40001, + "BAUDRATE": 40002, + "LTR329_GAIN": 40003, + "LTR329_MEAS_RATE": 40004, + "LTR329_INTEGRATION_TIME": 40005, + "LTR329_MODE": 40006, + } | ModbusRTUDevice.holding_registers + + @property + def CO2(self): + return int(self.read_register(self.input_registers["CO2"])) + + @property + def T(self): + return self.read_register(self.input_registers["T"], signed=True) / 10 + + @property + def RH(self): + return self.read_register(self.input_registers["RH"]) diff --git a/src/veles/device/sensor_wired_RHT.py b/src/veles/device/sensor_wired_RHT.py deleted file mode 100644 index 1e04fc6..0000000 --- a/src/veles/device/sensor_wired_RHT.py +++ /dev/null @@ -1,45 +0,0 @@ -from typing import Dict, Final -from minimalmodbus import NoResponseError -from .generic import ModbusRTUDevice - - -class SensorWiredRHT(ModbusRTUDevice): - """ - Wired sensor measuring temperature, relative humidity - and light intensity. - """ - - DEVICE_CLASS: Final[str] = "RHT_Wired" - DEVICE_CODE: Final[int] = 0x0020 - - input_registers: Dict[str, int] = { - "SER_NUM_1": 30001, - "SER_NUM_2": 30002, - "T": 30003, # from SHT4x - "T_F": 30004, - "RH": 30005, # from SHT4x - "LIGHT_INTENSITY_1": 30006, - "LIGHT_INTENSITY_2": 30007, - "ERROR_T_RH": 30008, - "ERROR_LIGHT": 30009, - } | ModbusRTUDevice.input_registers - holding_registers: Dict[str, int] = { - "MODBUS_ADDR": 40001, - "BAUDRATE": 40002, - "LTR329_GAIN": 40003, - "LTR329_MEAS_RATE": 40004, - "LTR329_INTEGRATION_TIME": 40005, - "LTR329_MODE": 40006, - } | ModbusRTUDevice.holding_registers - - @property - def CO2(self): - return int(self.read_register(self.input_registers["CO2"])) - - @property - def T(self): - return self.read_register(self.input_registers["T"], signed=True) / 10 - - @property - def RH(self): - return self.read_register(self.input_registers["RH"])