diff --git a/src/veles/__init__.py b/src/veles/__init__.py index e69de29..11b0782 100644 --- a/src/veles/__init__.py +++ b/src/veles/__init__.py @@ -0,0 +1 @@ +from . import devices diff --git a/src/veles/devices/__init__.py b/src/veles/devices/__init__.py index e69de29..a19b375 100644 --- a/src/veles/devices/__init__.py +++ b/src/veles/devices/__init__.py @@ -0,0 +1,2 @@ +from .sensor_wired_IAQ import SensorWiredIAQ +from .sensor_wired_RHT import SensorWiredRHT \ No newline at end of file diff --git a/src/veles/devices/generic.py b/src/veles/devices/generic.py index 79bc2f3..34f7cfc 100644 --- a/src/veles/devices/generic.py +++ b/src/veles/devices/generic.py @@ -17,7 +17,7 @@ class Modbus: holding_register_range = range(HOLDING_REGISTER_START, HOLDING_REGISTER_END) -@dataclass +@dataclass(slots=True) class ReadoutErrorCounter: """Class used to track readout errors""" @@ -39,7 +39,8 @@ class ModbusRTUDevice(Device): # Reflects array fw/Core/Src/config.c:config_baudrates[] BAUDRATES = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200] - + # magic constant for resetting: common to all Modbus RTU devices + MAGIC_RESET_CONSTANT: Final[int] = 0xABCD # registers common to all Modbus RTU devices input_registers: Dict[str, int] = { "SERIAL_NUMBER_1": 30001, @@ -47,9 +48,6 @@ class ModbusRTUDevice(Device): } holding_registers: Dict[str, int] = {"RESET_DEVICE": 49999} - # magic constant for resetting: common to all Modbus RTU devices - MAGIC_RESET_CONSTANT: Final[int] = 0xABCD - def __comm_device_init(self) -> minimalmodbus.Instrument: comm_device = minimalmodbus.Instrument( self.dev, self.modbus_address, close_port_after_each_call=True @@ -91,8 +89,7 @@ class ModbusRTUDevice(Device): return ( self.comm_device.read_register( register_offset, 1, functioncode=function_code, signed=signed - ) - * 10 + ) * 10 ) except minimalmodbus.NoResponseError as exception: last_exception = exception @@ -146,3 +143,11 @@ class ModbusRTUDevice(Device): return False # got answer => failed to reset except minimalmodbus.NoResponseError: return True # no answer => reset successful + + @property + def device_code(self) -> int: + """ + Return device code. This can be matched to DEVICE_CODE + in child classes. + """ + return int(self.read_register(self.input_registers["SERIAL_NUMBER_1"])) diff --git a/src/veles/devices/sensor_wired_IAQ.py b/src/veles/devices/sensor_wired_IAQ.py index 7913c76..606c142 100644 --- a/src/veles/devices/sensor_wired_IAQ.py +++ b/src/veles/devices/sensor_wired_IAQ.py @@ -1,6 +1,5 @@ from time import sleep from typing import Dict, Final -from minimalmodbus import NoResponseError from .generic import ModbusRTUDevice @@ -10,40 +9,40 @@ class SensorWiredIAQ(ModbusRTUDevice): carbon dioxide and VOC, optionally particulate matter """ - IDENTIFIER: int = 0xBEEF + DEVICE_CLASS: Final[str] = "IAQ_Wired" + DEVICE_CODE: Final[int] = 0x0010 + input_registers: Dict[str, int] = { - 'T': 30003, # deg C - 'T_F': 30004, # deg F - 'RH': 30005, # %, from SHT4x - 'CO2': 30006, # ppm - 'VOC_INDEX': 30007, # VOC index as calculated by Sensirion library (1 to 500, average 100) - 'VOC_TICKS': 30008, # raw VOC ticks - 'PMC_MASS_1_0': 30009, # ug / m^3 - 'PMC_MASS_2_5': 30010, # ug / m^3 - 'PMC_MASS_4_0': 30011, # ug / m^3 - 'PMC_MASS_10_0': 30012, # ug / m^3 - 'PMC_NUMBER_0_5': 30013, # 1 / m^3 - 'PMC_NUMBER_1_0': 30014, # 1 / m^3 - 'PMC_NUMBER_2_5': 30015, # 1 / m^3 - 'PMC_NUMBER_4_0': 30016, # 1 / m^3 - 'PMC_NUMBER_10_0': 30017, # 1 / m^3 - 'TYPICAL_PARTICLE_SIZE': 30018, # nm - 'T_SCD4x': 30019, # deg C - 'T_SCD4x_F': 30020, # deg F - 'RH_SCD4x': 30021 # % + "T": 30003, # deg C + "T_F": 30004, # deg F + "RH": 30005, # %, from SHT4x + "CO2": 30006, # ppm + "VOC_INDEX": 30007, # VOC index as calculated by Sensirion library (1 to 500, average 100) + "VOC_TICKS": 30008, # raw VOC ticks + "PMC_MASS_1_0": 30009, # ug / m^3 + "PMC_MASS_2_5": 30010, # ug / m^3 + "PMC_MASS_4_0": 30011, # ug / m^3 + "PMC_MASS_10_0": 30012, # ug / m^3 + "PMC_NUMBER_0_5": 30013, # 1 / m^3 + "PMC_NUMBER_1_0": 30014, # 1 / m^3 + "PMC_NUMBER_2_5": 30015, # 1 / m^3 + "PMC_NUMBER_4_0": 30016, # 1 / m^3 + "PMC_NUMBER_10_0": 30017, # 1 / m^3 + "TYPICAL_PARTICLE_SIZE": 30018, # nm + "T_SCD4x": 30019, # deg C + "T_SCD4x_F": 30020, # deg F + "RH_SCD4x": 30021 # % } | ModbusRTUDevice.input_registers - # TODO use super, but __class__ not defined holding_registers: Dict[str, int] = { - 'MODBUS_ADDR': 40001, - 'BAUDRATE': 40002, - 'LED_ON': 40003, - 'LED_BRIGHTNESS': 40004, - 'LED_SMOOTH': 40005, - 'CO2_ALERT_LIMIT1': 40006, - 'CO2_ALERT_LIMIT2': 40007, - 'SCD4x_T_OFFSET': 40008 + "MODBUS_ADDR": 40001, + "BAUDRATE": 40002, + "LED_ON": 40003, + "LED_BRIGHTNESS": 40004, + "LED_SMOOTH": 40005, + "CO2_ALERT_LIMIT1": 40006, + "CO2_ALERT_LIMIT2": 40007, + "SCD4x_T_OFFSET": 40008 } | ModbusRTUDevice.holding_registers - RESET_MAGIC_NUMBER: Final[int] = 0xABCD @property def CO2(self) -> int: @@ -51,22 +50,26 @@ class SensorWiredIAQ(ModbusRTUDevice): @property def T(self) -> float: - # TODO maybe use signed version? return self.read_register(self.input_registers["T"], signed=True) / 10 @property def RH(self) -> float: return self.read_register(self.input_registers["RH"]) + @property + def VOC(self): + return self.read_register(self.input_registers["VOC_INDEX"]) + @property def LED(self) -> int: - return int(self.read_register(self.holding_registers["LED_brightness"])) + return int(self.read_register(self.holding_registers["LED_BRIGHTNESS"])) @LED.setter def LED(self, value: int): if value == 0: - self.write_register(self.holding_registers["LED_on"], 0) + self.write_register(self.holding_registers["LED_ON"], 0) else: - self.write_register(self.holding_registers["LED_brightness"], value) + self.write_register(self.holding_registers["LED_BRIGHTNESS"], value) sleep(0.1) - self.write_register(self.holding_registers["LED_on"], 1) + self.write_register(self.holding_registers["LED_ON"], 1) + diff --git a/src/veles/devices/sensor_wired_RHT.py b/src/veles/devices/sensor_wired_RHT.py index c35964d..1e04fc6 100644 --- a/src/veles/devices/sensor_wired_RHT.py +++ b/src/veles/devices/sensor_wired_RHT.py @@ -4,44 +4,42 @@ from .generic import ModbusRTUDevice class SensorWiredRHT(ModbusRTUDevice): - ''' + """ Wired sensor measuring temperature, relative humidity and light intensity. - ''' - IDENTIFIER: int = 0xb00b + """ + + DEVICE_CLASS: Final[str] = "RHT_Wired" + DEVICE_CODE: Final[int] = 0x0020 + input_registers: Dict[str, int] = { - 'SER_NUM_1': 30001, - 'SER_NUM_2': 30002, - 'T': 30003, # from SHT4x - 'T_F': 30004, - 'RH': 30005, # from SHT4x - 'LIGHT_INTENSITY_1': 30006, - 'LIGHT_INTENSITY_2': 30007, - 'ERROR_T_RH': 30008, - 'ERROR_LIGHT': 30009 - } | ModbusRTUDevice.input_registers - # TODO use super, but __class__ not defined + "SER_NUM_1": 30001, + "SER_NUM_2": 30002, + "T": 30003, # from SHT4x + "T_F": 30004, + "RH": 30005, # from SHT4x + "LIGHT_INTENSITY_1": 30006, + "LIGHT_INTENSITY_2": 30007, + "ERROR_T_RH": 30008, + "ERROR_LIGHT": 30009, + } | ModbusRTUDevice.input_registers holding_registers: Dict[str, int] = { - 'MODBUS_address': 40001, - 'baudrate': 40002, - 'LTR329_GAIN': 40003, - 'LTR329_MEAS_RATE': 40004, - 'LTR329_INTEGRATION_TIME': 40005, - 'LTR329_MODE': 40006 - } | ModbusRTUDevice.holding_registers - RESET_MAGIC_NUMBER: Final[int] = 0xABCD + "MODBUS_ADDR": 40001, + "BAUDRATE": 40002, + "LTR329_GAIN": 40003, + "LTR329_MEAS_RATE": 40004, + "LTR329_INTEGRATION_TIME": 40005, + "LTR329_MODE": 40006, + } | ModbusRTUDevice.holding_registers @property def CO2(self): - return int(self.read_register(self.input_registers['CO2'])) + return int(self.read_register(self.input_registers["CO2"])) @property def T(self): - # TODO maybe use signed version? - return self.read_register(self.input_registers['T'], - signed=True - ) / 10 + return self.read_register(self.input_registers["T"], signed=True) / 10 @property def RH(self): - return self.read_register(self.input_registers['RH']) + return self.read_register(self.input_registers["RH"])