From 79317fed9560d2d4ff3337b6c6a6c147baddf85d Mon Sep 17 00:00:00 2001 From: Jan Mrna Date: Sat, 18 Jun 2022 18:24:17 +0200 Subject: [PATCH] Adapted to new registers, added LED getter etc. --- src/veles/devices/generic.py | 112 +++++++++++++------------- src/veles/devices/sensor_wired_IAQ.py | 95 ++++++++++++---------- 2 files changed, 111 insertions(+), 96 deletions(-) diff --git a/src/veles/devices/generic.py b/src/veles/devices/generic.py index 4eae5b9..79bc2f3 100644 --- a/src/veles/devices/generic.py +++ b/src/veles/devices/generic.py @@ -1,82 +1,80 @@ -'''Module containing classes for generic wired/wireless devices''' +"""Module containing classes for generic wired/wireless devices""" from dataclasses import dataclass from typing import Dict, Final import minimalmodbus import serial -class Modbus(): - '''Class holding Modbus related constants''' +class Modbus: + """Class holding Modbus related constants""" + HOLDING_REGISTER_START = 40001 HOLDING_REGISTER_END = 49999 INPUT_REGISTER_START = 30001 INPUT_REGISTER_END = 39999 # ranges for testing if address is in address range - input_register_range = range(INPUT_REGISTER_START, - INPUT_REGISTER_END) - holding_register_range = range(HOLDING_REGISTER_START, - HOLDING_REGISTER_END) + input_register_range = range(INPUT_REGISTER_START, INPUT_REGISTER_END) + holding_register_range = range(HOLDING_REGISTER_START, HOLDING_REGISTER_END) @dataclass -class ReadoutErrorCounter(): - '''Class used to track readout errors''' +class ReadoutErrorCounter: + """Class used to track readout errors""" + total_attempts: int = 0 invalid_response: int = 0 no_response: int = 0 -class Device(): - '''Base class for all devices''' +class Device: + """Base class for all devices""" class ModbusRTUDevice(Device): - ''' + """ Base class for wired device controlled over MODBUS RTU (via RS-485) RS-485 to USB converter is needed for devices based off this class - ''' + """ # Reflects array fw/Core/Src/config.c:config_baudrates[] BAUDRATES = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200] # registers common to all Modbus RTU devices input_registers: Dict[str, int] = { - 'IDENTIFIER': 30100 - } - holding_registers: Dict[str, int] = { - 'RESET': 40100 + "SERIAL_NUMBER_1": 30001, + "SERIAL_NUMBER_2": 30002, } + holding_registers: Dict[str, int] = {"RESET_DEVICE": 49999} # magic constant for resetting: common to all Modbus RTU devices MAGIC_RESET_CONSTANT: Final[int] = 0xABCD def __comm_device_init(self) -> minimalmodbus.Instrument: - comm_device = minimalmodbus.Instrument(self.dev, - self.modbus_address, - close_port_after_each_call=True) + comm_device = minimalmodbus.Instrument( + self.dev, self.modbus_address, close_port_after_each_call=True + ) # RS-485 serial paramater init comm_device.serial.baudrate = self.baudrate comm_device.serial.bytesize = 8 comm_device.serial.parity = serial.PARITY_EVEN comm_device.serial.stopbits = 1 - comm_device.serial.timeout = 0.05 # seconds - comm_device.mode = minimalmodbus.MODE_RTU # rtu or ascii mode + comm_device.serial.timeout = 0.05 # seconds + comm_device.mode = minimalmodbus.MODE_RTU # rtu or ascii mode comm_device.clear_buffers_before_each_transaction = True return comm_device - def __init__(self, modbus_address, baudrate=19200, dev='/dev/rs485'): + def __init__(self, modbus_address, baudrate=19200, dev="/dev/rs485"): self.modbus_address: int = modbus_address self.baudrate: int = baudrate self.dev: str = dev self.comm_device: minimalmodbus.Instrument = self.__comm_device_init() self.readout_errors: ReadoutErrorCounter = ReadoutErrorCounter() - def read_register(self, - register_number: int, - signed: bool = False, - retries: int = 10) -> int: - '''Read Modbus input/holding register via serial device''' + def read_register( + self, register_number: int, signed: bool = False, retries: int = 10 + ) -> int: + """Read Modbus input/holding register via serial device""" if register_number in Modbus.input_register_range: function_code = 4 register_offset = register_number - Modbus.INPUT_REGISTER_START @@ -86,43 +84,47 @@ class ModbusRTUDevice(Device): else: # wrong register number raise ValueError - for i in range(retries): + for _ in range(retries): try: self.readout_errors.total_attempts += 1 # minimalmodbus divides received register value by 10 - return self.comm_device.read_register( - register_offset, - 1, - functioncode=function_code, - signed=signed - ) * 10 - except minimalmodbus.NoResponseError as e: - last_exception = e + return ( + self.comm_device.read_register( + register_offset, 1, functioncode=function_code, signed=signed + ) + * 10 + ) + except minimalmodbus.NoResponseError as exception: + last_exception = exception self.readout_errors.no_response += 1 continue - except minimalmodbus.InvalidResponseError as e: - last_exception = e + except minimalmodbus.InvalidResponseError as exception: + last_exception = exception self.readout_errors.invalid_response += 1 continue # retries failed, raise last exception to inform user raise last_exception - def write_register(self, - register_number: int, - register_value: int, - retries: int = 10) -> None: + def write_register( + self, register_number: int, register_value: int, retries: int = 10 + ) -> None: + """ + Write to slave holding register + """ # only holding registers can be written if register_number not in Modbus.holding_register_range: raise ValueError register_offset = register_number - Modbus.HOLDING_REGISTER_START - for i in range(retries): + for _ in range(retries): try: - return self.comm_device.write_register(register_offset, - register_value, - functioncode=6) - except (minimalmodbus.NoResponseError, - minimalmodbus.InvalidResponseError) as e: - last_exception = e + return self.comm_device.write_register( + register_offset, register_value, functioncode=6 + ) + except ( + minimalmodbus.NoResponseError, + minimalmodbus.InvalidResponseError, + ) as exception: + last_exception = exception continue raise last_exception @@ -133,12 +135,14 @@ class ModbusRTUDevice(Device): return self.write_register(key, value) def reset(self) -> bool: - ''' + """ Soft-reset the device - ''' + """ try: - self.write_register(ModbusRTUDevice.holding_registers['RESET'], - ModbusRTUDevice.MAGIC_RESET_CONSTANT) + self.write_register( + ModbusRTUDevice.holding_registers["RESET"], + ModbusRTUDevice.MAGIC_RESET_CONSTANT, + ) return False # got answer => failed to reset except minimalmodbus.NoResponseError: return True # no answer => reset successful diff --git a/src/veles/devices/sensor_wired_IAQ.py b/src/veles/devices/sensor_wired_IAQ.py index 3e731bd..7913c76 100644 --- a/src/veles/devices/sensor_wired_IAQ.py +++ b/src/veles/devices/sensor_wired_IAQ.py @@ -1,61 +1,72 @@ +from time import sleep from typing import Dict, Final from minimalmodbus import NoResponseError from .generic import ModbusRTUDevice class SensorWiredIAQ(ModbusRTUDevice): - ''' + """ Wired sensor measuring temperature, relative humidity, carbon dioxide and VOC, optionally particulate matter - ''' - IDENTIFIER: int = 0xbeef + """ + + IDENTIFIER: int = 0xBEEF input_registers: Dict[str, int] = { - 'T': 30010, # from SHT4x - 'T_F': 30011, - 'RH': 30012, # from SHT4x - 'CO2': 30013, # from SCD4x - 'VOC_index': 30014, - 'VOC_ticks': 30015, - 'NOx_index': 30016, - 'NOx_ticks': 30017, - 'PM_mass_concentration_1.0': 30018, - 'PM_mass_concentration_2.5': 30019, - 'PM_mass_concentration_4.0': 30020, - 'PM_mass_concentration_10.0': 30021, - 'PM_number_concentration_0.5': 30022, - 'PM_number_concentration_1.0': 30023, - 'PM_number_concentration_2.5': 30024, - 'PM_number_concentration_4.0': 30025, - 'PM_number_concentration_10.0': 30026, - 'PM_typical_particle_size': 30027, - 'T_SCD4x': 30028, - 'T_SCF4x_F': 30029, - 'RH_SCD4x': 30030 - } | ModbusRTUDevice.input_registers + 'T': 30003, # deg C + 'T_F': 30004, # deg F + 'RH': 30005, # %, from SHT4x + 'CO2': 30006, # ppm + 'VOC_INDEX': 30007, # VOC index as calculated by Sensirion library (1 to 500, average 100) + 'VOC_TICKS': 30008, # raw VOC ticks + 'PMC_MASS_1_0': 30009, # ug / m^3 + 'PMC_MASS_2_5': 30010, # ug / m^3 + 'PMC_MASS_4_0': 30011, # ug / m^3 + 'PMC_MASS_10_0': 30012, # ug / m^3 + 'PMC_NUMBER_0_5': 30013, # 1 / m^3 + 'PMC_NUMBER_1_0': 30014, # 1 / m^3 + 'PMC_NUMBER_2_5': 30015, # 1 / m^3 + 'PMC_NUMBER_4_0': 30016, # 1 / m^3 + 'PMC_NUMBER_10_0': 30017, # 1 / m^3 + 'TYPICAL_PARTICLE_SIZE': 30018, # nm + 'T_SCD4x': 30019, # deg C + 'T_SCD4x_F': 30020, # deg F + 'RH_SCD4x': 30021 # % + } | ModbusRTUDevice.input_registers # TODO use super, but __class__ not defined holding_registers: Dict[str, int] = { - 'LED_on': 40001, - 'LED_brightness': 40002, - 'LED_smooth': 40003, - 'CO2_alert_limit_1': 40004, - 'CO2_alert_limit_2': 40005, - 'SCD4x_temperature_offset': 40006, - 'MODBUS_address': 40007, - 'baudrate': 40008 - } | ModbusRTUDevice.holding_registers + 'MODBUS_ADDR': 40001, + 'BAUDRATE': 40002, + 'LED_ON': 40003, + 'LED_BRIGHTNESS': 40004, + 'LED_SMOOTH': 40005, + 'CO2_ALERT_LIMIT1': 40006, + 'CO2_ALERT_LIMIT2': 40007, + 'SCD4x_T_OFFSET': 40008 + } | ModbusRTUDevice.holding_registers RESET_MAGIC_NUMBER: Final[int] = 0xABCD @property - def CO2(self): - return int(self.read_register(self.input_registers['CO2'])) + def CO2(self) -> int: + return int(self.read_register(self.input_registers["CO2"])) @property - def T(self): + def T(self) -> float: # TODO maybe use signed version? - return self.read_register(self.input_registers['T'], - signed=True - ) / 10 + return self.read_register(self.input_registers["T"], signed=True) / 10 @property - def RH(self): - return self.read_register(self.input_registers['RH']) + def RH(self) -> float: + return self.read_register(self.input_registers["RH"]) + + @property + def LED(self) -> int: + return int(self.read_register(self.holding_registers["LED_brightness"])) + + @LED.setter + def LED(self, value: int): + if value == 0: + self.write_register(self.holding_registers["LED_on"], 0) + else: + self.write_register(self.holding_registers["LED_brightness"], value) + sleep(0.1) + self.write_register(self.holding_registers["LED_on"], 1)