forked from veles_labs/pyveles
		
	Merge branch 'master' of gitlab.com:VelesLabs/pyveles
This commit is contained in:
		
						commit
						0c3b48465d
					
				| @ -1,82 +1,80 @@ | ||||
| '''Module containing classes for generic wired/wireless devices''' | ||||
| """Module containing classes for generic wired/wireless devices""" | ||||
| from dataclasses import dataclass | ||||
| from typing import Dict, Final | ||||
| import minimalmodbus | ||||
| import serial | ||||
| 
 | ||||
| 
 | ||||
| class Modbus(): | ||||
|     '''Class holding Modbus related constants''' | ||||
| class Modbus: | ||||
|     """Class holding Modbus related constants""" | ||||
| 
 | ||||
|     HOLDING_REGISTER_START = 40001 | ||||
|     HOLDING_REGISTER_END = 49999 | ||||
|     INPUT_REGISTER_START = 30001 | ||||
|     INPUT_REGISTER_END = 39999 | ||||
|     # ranges for testing if address is in address range | ||||
|     input_register_range = range(INPUT_REGISTER_START, | ||||
|                                  INPUT_REGISTER_END) | ||||
|     holding_register_range = range(HOLDING_REGISTER_START, | ||||
|                                    HOLDING_REGISTER_END) | ||||
|     input_register_range = range(INPUT_REGISTER_START, INPUT_REGISTER_END) | ||||
|     holding_register_range = range(HOLDING_REGISTER_START, HOLDING_REGISTER_END) | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class ReadoutErrorCounter(): | ||||
|     '''Class used to track readout errors''' | ||||
| class ReadoutErrorCounter: | ||||
|     """Class used to track readout errors""" | ||||
| 
 | ||||
|     total_attempts: int = 0 | ||||
|     invalid_response: int = 0 | ||||
|     no_response: int = 0 | ||||
| 
 | ||||
| 
 | ||||
| class Device(): | ||||
|     '''Base class for all devices''' | ||||
| class Device: | ||||
|     """Base class for all devices""" | ||||
| 
 | ||||
| 
 | ||||
| class ModbusRTUDevice(Device): | ||||
|     ''' | ||||
|     """ | ||||
|     Base class for wired device controlled over MODBUS RTU (via RS-485) | ||||
| 
 | ||||
|     RS-485 to USB converter is needed for devices based off this class | ||||
|     ''' | ||||
|     """ | ||||
| 
 | ||||
|     # Reflects array fw/Core/Src/config.c:config_baudrates[] | ||||
|     BAUDRATES = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200] | ||||
| 
 | ||||
|     # registers common to all Modbus RTU devices | ||||
|     input_registers: Dict[str, int] = { | ||||
|         'IDENTIFIER': 30100 | ||||
|     } | ||||
|     holding_registers: Dict[str, int] = { | ||||
|         'RESET': 40100 | ||||
|         "SERIAL_NUMBER_1": 30001, | ||||
|         "SERIAL_NUMBER_2": 30002, | ||||
|     } | ||||
|     holding_registers: Dict[str, int] = {"RESET_DEVICE": 49999} | ||||
| 
 | ||||
|     # magic constant for resetting: common to all Modbus RTU devices | ||||
|     MAGIC_RESET_CONSTANT: Final[int] = 0xABCD | ||||
| 
 | ||||
|     def __comm_device_init(self) -> minimalmodbus.Instrument: | ||||
|         comm_device = minimalmodbus.Instrument(self.dev, | ||||
|                                                self.modbus_address, | ||||
|                                                close_port_after_each_call=True) | ||||
|         comm_device = minimalmodbus.Instrument( | ||||
|             self.dev, self.modbus_address, close_port_after_each_call=True | ||||
|         ) | ||||
|         # RS-485 serial paramater init | ||||
|         comm_device.serial.baudrate = self.baudrate | ||||
|         comm_device.serial.bytesize = 8 | ||||
|         comm_device.serial.parity = serial.PARITY_EVEN | ||||
|         comm_device.serial.stopbits = 1 | ||||
|         comm_device.serial.timeout = 0.05          # seconds | ||||
|         comm_device.mode = minimalmodbus.MODE_RTU   # rtu or ascii mode | ||||
|         comm_device.serial.timeout = 0.05  # seconds | ||||
|         comm_device.mode = minimalmodbus.MODE_RTU  # rtu or ascii mode | ||||
|         comm_device.clear_buffers_before_each_transaction = True | ||||
|         return comm_device | ||||
| 
 | ||||
|     def __init__(self, modbus_address, baudrate=19200, dev='/dev/rs485'): | ||||
|     def __init__(self, modbus_address, baudrate=19200, dev="/dev/rs485"): | ||||
|         self.modbus_address: int = modbus_address | ||||
|         self.baudrate: int = baudrate | ||||
|         self.dev: str = dev | ||||
|         self.comm_device: minimalmodbus.Instrument = self.__comm_device_init() | ||||
|         self.readout_errors: ReadoutErrorCounter = ReadoutErrorCounter() | ||||
| 
 | ||||
|     def read_register(self, | ||||
|                       register_number: int, | ||||
|                       signed: bool = False, | ||||
|                       retries: int = 10) -> int: | ||||
|         '''Read Modbus input/holding register via serial device''' | ||||
|     def read_register( | ||||
|         self, register_number: int, signed: bool = False, retries: int = 10 | ||||
|     ) -> int: | ||||
|         """Read Modbus input/holding register via serial device""" | ||||
|         if register_number in Modbus.input_register_range: | ||||
|             function_code = 4 | ||||
|             register_offset = register_number - Modbus.INPUT_REGISTER_START | ||||
| @ -86,43 +84,47 @@ class ModbusRTUDevice(Device): | ||||
|         else: | ||||
|             # wrong register number | ||||
|             raise ValueError | ||||
|         for i in range(retries): | ||||
|         for _ in range(retries): | ||||
|             try: | ||||
|                 self.readout_errors.total_attempts += 1 | ||||
|                 # minimalmodbus divides received register value by 10 | ||||
|                 return self.comm_device.read_register( | ||||
|                     register_offset, | ||||
|                     1, | ||||
|                     functioncode=function_code, | ||||
|                     signed=signed | ||||
|                     ) * 10 | ||||
|             except minimalmodbus.NoResponseError as e: | ||||
|                 last_exception = e | ||||
|                 return ( | ||||
|                     self.comm_device.read_register( | ||||
|                         register_offset, 1, functioncode=function_code, signed=signed | ||||
|                     ) | ||||
|                     * 10 | ||||
|                 ) | ||||
|             except minimalmodbus.NoResponseError as exception: | ||||
|                 last_exception = exception | ||||
|                 self.readout_errors.no_response += 1 | ||||
|                 continue | ||||
|             except minimalmodbus.InvalidResponseError as e: | ||||
|                 last_exception = e | ||||
|             except minimalmodbus.InvalidResponseError as exception: | ||||
|                 last_exception = exception | ||||
|                 self.readout_errors.invalid_response += 1 | ||||
|                 continue | ||||
|         # retries failed, raise last exception to inform user | ||||
|         raise last_exception | ||||
| 
 | ||||
|     def write_register(self, | ||||
|                        register_number: int, | ||||
|                        register_value: int, | ||||
|                        retries: int = 10) -> None: | ||||
|     def write_register( | ||||
|         self, register_number: int, register_value: int, retries: int = 10 | ||||
|     ) -> None: | ||||
|         """ | ||||
|         Write to slave holding register | ||||
|         """ | ||||
|         # only holding registers can be written | ||||
|         if register_number not in Modbus.holding_register_range: | ||||
|             raise ValueError | ||||
|         register_offset = register_number - Modbus.HOLDING_REGISTER_START | ||||
|         for i in range(retries): | ||||
|         for _ in range(retries): | ||||
|             try: | ||||
|                 return self.comm_device.write_register(register_offset, | ||||
|                                                        register_value, | ||||
|                                                        functioncode=6) | ||||
|             except (minimalmodbus.NoResponseError, | ||||
|                     minimalmodbus.InvalidResponseError) as e: | ||||
|                 last_exception = e | ||||
|                 return self.comm_device.write_register( | ||||
|                     register_offset, register_value, functioncode=6 | ||||
|                 ) | ||||
|             except ( | ||||
|                 minimalmodbus.NoResponseError, | ||||
|                 minimalmodbus.InvalidResponseError, | ||||
|             ) as exception: | ||||
|                 last_exception = exception | ||||
|                 continue | ||||
|         raise last_exception | ||||
| 
 | ||||
| @ -133,12 +135,14 @@ class ModbusRTUDevice(Device): | ||||
|         return self.write_register(key, value) | ||||
| 
 | ||||
|     def reset(self) -> bool: | ||||
|         ''' | ||||
|         """ | ||||
|         Soft-reset the device | ||||
|         ''' | ||||
|         """ | ||||
|         try: | ||||
|             self.write_register(ModbusRTUDevice.holding_registers['RESET'], | ||||
|                                 ModbusRTUDevice.MAGIC_RESET_CONSTANT) | ||||
|             self.write_register( | ||||
|                 ModbusRTUDevice.holding_registers["RESET"], | ||||
|                 ModbusRTUDevice.MAGIC_RESET_CONSTANT, | ||||
|             ) | ||||
|             return False  # got answer => failed to reset | ||||
|         except minimalmodbus.NoResponseError: | ||||
|             return True  # no answer => reset successful | ||||
|  | ||||
| @ -1,61 +1,72 @@ | ||||
| from time import sleep | ||||
| from typing import Dict, Final | ||||
| from minimalmodbus import NoResponseError | ||||
| from .generic import ModbusRTUDevice | ||||
| 
 | ||||
| 
 | ||||
| class SensorWiredIAQ(ModbusRTUDevice): | ||||
|     ''' | ||||
|     """ | ||||
|     Wired sensor measuring temperature, relative humidity, | ||||
|     carbon dioxide and VOC, optionally particulate matter | ||||
|     ''' | ||||
|     IDENTIFIER: int = 0xbeef | ||||
|     """ | ||||
| 
 | ||||
|     IDENTIFIER: int = 0xBEEF | ||||
|     input_registers: Dict[str, int] = { | ||||
|             'T': 30010,    # from SHT4x | ||||
|             'T_F': 30011, | ||||
|             'RH': 30012,   # from SHT4x | ||||
|             'CO2': 30013,  # from SCD4x | ||||
|             'VOC_index': 30014, | ||||
|             'VOC_ticks': 30015, | ||||
|             'NOx_index': 30016, | ||||
|             'NOx_ticks': 30017, | ||||
|             'PM_mass_concentration_1.0': 30018, | ||||
|             'PM_mass_concentration_2.5': 30019, | ||||
|             'PM_mass_concentration_4.0': 30020, | ||||
|             'PM_mass_concentration_10.0': 30021, | ||||
|             'PM_number_concentration_0.5': 30022, | ||||
|             'PM_number_concentration_1.0': 30023, | ||||
|             'PM_number_concentration_2.5': 30024, | ||||
|             'PM_number_concentration_4.0': 30025, | ||||
|             'PM_number_concentration_10.0': 30026, | ||||
|             'PM_typical_particle_size':  30027, | ||||
|             'T_SCD4x': 30028, | ||||
|             'T_SCF4x_F': 30029, | ||||
|             'RH_SCD4x': 30030 | ||||
|             } | ModbusRTUDevice.input_registers | ||||
|         'T': 30003,  # deg C | ||||
|         'T_F': 30004,  # deg F | ||||
|         'RH': 30005,  # %, from SHT4x | ||||
|         'CO2': 30006,  # ppm | ||||
|         'VOC_INDEX': 30007,  # VOC index as calculated by Sensirion library (1 to 500, average 100) | ||||
|         'VOC_TICKS': 30008,  # raw VOC ticks | ||||
|         'PMC_MASS_1_0': 30009,  # ug / m^3 | ||||
|         'PMC_MASS_2_5': 30010,  # ug / m^3 | ||||
|         'PMC_MASS_4_0': 30011,  # ug / m^3 | ||||
|         'PMC_MASS_10_0': 30012,  # ug / m^3 | ||||
|         'PMC_NUMBER_0_5': 30013,  # 1 / m^3 | ||||
|         'PMC_NUMBER_1_0': 30014,  # 1 / m^3 | ||||
|         'PMC_NUMBER_2_5': 30015,  # 1 / m^3 | ||||
|         'PMC_NUMBER_4_0': 30016,  # 1 / m^3 | ||||
|         'PMC_NUMBER_10_0': 30017,  # 1 / m^3 | ||||
|         'TYPICAL_PARTICLE_SIZE': 30018,  # nm | ||||
|         'T_SCD4x': 30019,  # deg C | ||||
|         'T_SCD4x_F': 30020,  # deg F | ||||
|         'RH_SCD4x': 30021  # % | ||||
|     } | ModbusRTUDevice.input_registers | ||||
|     # TODO use super, but __class__ not defined | ||||
|     holding_registers: Dict[str, int] = { | ||||
|             'LED_on': 40001, | ||||
|             'LED_brightness': 40002, | ||||
|             'LED_smooth': 40003, | ||||
|             'CO2_alert_limit_1': 40004, | ||||
|             'CO2_alert_limit_2': 40005, | ||||
|             'SCD4x_temperature_offset': 40006, | ||||
|             'MODBUS_address': 40007, | ||||
|             'baudrate': 40008 | ||||
|             } | ModbusRTUDevice.holding_registers | ||||
|         'MODBUS_ADDR': 40001, | ||||
|         'BAUDRATE': 40002, | ||||
|         'LED_ON': 40003, | ||||
|         'LED_BRIGHTNESS': 40004, | ||||
|         'LED_SMOOTH': 40005, | ||||
|         'CO2_ALERT_LIMIT1': 40006, | ||||
|         'CO2_ALERT_LIMIT2': 40007, | ||||
|         'SCD4x_T_OFFSET': 40008 | ||||
|     } | ModbusRTUDevice.holding_registers | ||||
|     RESET_MAGIC_NUMBER: Final[int] = 0xABCD | ||||
| 
 | ||||
|     @property | ||||
|     def CO2(self): | ||||
|         return int(self.read_register(self.input_registers['CO2'])) | ||||
|     def CO2(self) -> int: | ||||
|         return int(self.read_register(self.input_registers["CO2"])) | ||||
| 
 | ||||
|     @property | ||||
|     def T(self): | ||||
|     def T(self) -> float: | ||||
|         # TODO maybe use signed version? | ||||
|         return self.read_register(self.input_registers['T'], | ||||
|                                   signed=True | ||||
|                                   ) / 10 | ||||
|         return self.read_register(self.input_registers["T"], signed=True) / 10 | ||||
| 
 | ||||
|     @property | ||||
|     def RH(self): | ||||
|         return self.read_register(self.input_registers['RH']) | ||||
|     def RH(self) -> float: | ||||
|         return self.read_register(self.input_registers["RH"]) | ||||
| 
 | ||||
|     @property | ||||
|     def LED(self) -> int: | ||||
|         return int(self.read_register(self.holding_registers["LED_brightness"])) | ||||
| 
 | ||||
|     @LED.setter | ||||
|     def LED(self, value: int): | ||||
|         if value == 0: | ||||
|             self.write_register(self.holding_registers["LED_on"], 0) | ||||
|         else: | ||||
|             self.write_register(self.holding_registers["LED_brightness"], value) | ||||
|             sleep(0.1) | ||||
|             self.write_register(self.holding_registers["LED_on"], 1) | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 David Žaitlík
						David Žaitlík