forked from veles_labs/pyveles
		
	Changed import structure, fixed registers
This commit is contained in:
		
							parent
							
								
									456c7a9880
								
							
						
					
					
						commit
						fc8f5413df
					
				| @ -0,0 +1 @@ | ||||
| from . import devices | ||||
| @ -0,0 +1,2 @@ | ||||
| from .sensor_wired_IAQ import SensorWiredIAQ | ||||
| from .sensor_wired_RHT import SensorWiredRHT | ||||
| @ -17,7 +17,7 @@ class Modbus: | ||||
|     holding_register_range = range(HOLDING_REGISTER_START, HOLDING_REGISTER_END) | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| @dataclass(slots=True) | ||||
| class ReadoutErrorCounter: | ||||
|     """Class used to track readout errors""" | ||||
| 
 | ||||
| @ -39,7 +39,8 @@ class ModbusRTUDevice(Device): | ||||
| 
 | ||||
|     # Reflects array fw/Core/Src/config.c:config_baudrates[] | ||||
|     BAUDRATES = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200] | ||||
| 
 | ||||
|     # magic constant for resetting: common to all Modbus RTU devices | ||||
|     MAGIC_RESET_CONSTANT: Final[int] = 0xABCD | ||||
|     # registers common to all Modbus RTU devices | ||||
|     input_registers: Dict[str, int] = { | ||||
|         "SERIAL_NUMBER_1": 30001, | ||||
| @ -47,9 +48,6 @@ class ModbusRTUDevice(Device): | ||||
|     } | ||||
|     holding_registers: Dict[str, int] = {"RESET_DEVICE": 49999} | ||||
| 
 | ||||
|     # magic constant for resetting: common to all Modbus RTU devices | ||||
|     MAGIC_RESET_CONSTANT: Final[int] = 0xABCD | ||||
| 
 | ||||
|     def __comm_device_init(self) -> minimalmodbus.Instrument: | ||||
|         comm_device = minimalmodbus.Instrument( | ||||
|             self.dev, self.modbus_address, close_port_after_each_call=True | ||||
| @ -91,8 +89,7 @@ class ModbusRTUDevice(Device): | ||||
|                 return ( | ||||
|                     self.comm_device.read_register( | ||||
|                         register_offset, 1, functioncode=function_code, signed=signed | ||||
|                     ) | ||||
|                     * 10 | ||||
|                     ) * 10 | ||||
|                 ) | ||||
|             except minimalmodbus.NoResponseError as exception: | ||||
|                 last_exception = exception | ||||
| @ -146,3 +143,11 @@ class ModbusRTUDevice(Device): | ||||
|             return False  # got answer => failed to reset | ||||
|         except minimalmodbus.NoResponseError: | ||||
|             return True  # no answer => reset successful | ||||
| 
 | ||||
|     @property | ||||
|     def device_code(self) -> int: | ||||
|         """ | ||||
|         Return device code. This can be matched to DEVICE_CODE | ||||
|         in child classes. | ||||
|         """ | ||||
|         return int(self.read_register(self.input_registers["SERIAL_NUMBER_1"])) | ||||
|  | ||||
| @ -1,6 +1,5 @@ | ||||
| from time import sleep | ||||
| from typing import Dict, Final | ||||
| from minimalmodbus import NoResponseError | ||||
| from .generic import ModbusRTUDevice | ||||
| 
 | ||||
| 
 | ||||
| @ -10,40 +9,40 @@ class SensorWiredIAQ(ModbusRTUDevice): | ||||
|     carbon dioxide and VOC, optionally particulate matter | ||||
|     """ | ||||
| 
 | ||||
|     IDENTIFIER: int = 0xBEEF | ||||
|     DEVICE_CLASS: Final[str] = "IAQ_Wired" | ||||
|     DEVICE_CODE: Final[int] = 0x0010 | ||||
| 
 | ||||
|     input_registers: Dict[str, int] = { | ||||
|         'T': 30003,  # deg C | ||||
|         'T_F': 30004,  # deg F | ||||
|         'RH': 30005,  # %, from SHT4x | ||||
|         'CO2': 30006,  # ppm | ||||
|         'VOC_INDEX': 30007,  # VOC index as calculated by Sensirion library (1 to 500, average 100) | ||||
|         'VOC_TICKS': 30008,  # raw VOC ticks | ||||
|         'PMC_MASS_1_0': 30009,  # ug / m^3 | ||||
|         'PMC_MASS_2_5': 30010,  # ug / m^3 | ||||
|         'PMC_MASS_4_0': 30011,  # ug / m^3 | ||||
|         'PMC_MASS_10_0': 30012,  # ug / m^3 | ||||
|         'PMC_NUMBER_0_5': 30013,  # 1 / m^3 | ||||
|         'PMC_NUMBER_1_0': 30014,  # 1 / m^3 | ||||
|         'PMC_NUMBER_2_5': 30015,  # 1 / m^3 | ||||
|         'PMC_NUMBER_4_0': 30016,  # 1 / m^3 | ||||
|         'PMC_NUMBER_10_0': 30017,  # 1 / m^3 | ||||
|         'TYPICAL_PARTICLE_SIZE': 30018,  # nm | ||||
|         'T_SCD4x': 30019,  # deg C | ||||
|         'T_SCD4x_F': 30020,  # deg F | ||||
|         'RH_SCD4x': 30021  # % | ||||
|         "T": 30003,  # deg C | ||||
|         "T_F": 30004,  # deg F | ||||
|         "RH": 30005,  # %, from SHT4x | ||||
|         "CO2": 30006,  # ppm | ||||
|         "VOC_INDEX": 30007,  # VOC index as calculated by Sensirion library (1 to 500, average 100) | ||||
|         "VOC_TICKS": 30008,  # raw VOC ticks | ||||
|         "PMC_MASS_1_0": 30009,  # ug / m^3 | ||||
|         "PMC_MASS_2_5": 30010,  # ug / m^3 | ||||
|         "PMC_MASS_4_0": 30011,  # ug / m^3 | ||||
|         "PMC_MASS_10_0": 30012,  # ug / m^3 | ||||
|         "PMC_NUMBER_0_5": 30013,  # 1 / m^3 | ||||
|         "PMC_NUMBER_1_0": 30014,  # 1 / m^3 | ||||
|         "PMC_NUMBER_2_5": 30015,  # 1 / m^3 | ||||
|         "PMC_NUMBER_4_0": 30016,  # 1 / m^3 | ||||
|         "PMC_NUMBER_10_0": 30017,  # 1 / m^3 | ||||
|         "TYPICAL_PARTICLE_SIZE": 30018,  # nm | ||||
|         "T_SCD4x": 30019,  # deg C | ||||
|         "T_SCD4x_F": 30020,  # deg F | ||||
|         "RH_SCD4x": 30021  # % | ||||
|     } | ModbusRTUDevice.input_registers | ||||
|     # TODO use super, but __class__ not defined | ||||
|     holding_registers: Dict[str, int] = { | ||||
|         'MODBUS_ADDR': 40001, | ||||
|         'BAUDRATE': 40002, | ||||
|         'LED_ON': 40003, | ||||
|         'LED_BRIGHTNESS': 40004, | ||||
|         'LED_SMOOTH': 40005, | ||||
|         'CO2_ALERT_LIMIT1': 40006, | ||||
|         'CO2_ALERT_LIMIT2': 40007, | ||||
|         'SCD4x_T_OFFSET': 40008 | ||||
|         "MODBUS_ADDR": 40001, | ||||
|         "BAUDRATE": 40002, | ||||
|         "LED_ON": 40003, | ||||
|         "LED_BRIGHTNESS": 40004, | ||||
|         "LED_SMOOTH": 40005, | ||||
|         "CO2_ALERT_LIMIT1": 40006, | ||||
|         "CO2_ALERT_LIMIT2": 40007, | ||||
|         "SCD4x_T_OFFSET": 40008 | ||||
|     } | ModbusRTUDevice.holding_registers | ||||
|     RESET_MAGIC_NUMBER: Final[int] = 0xABCD | ||||
| 
 | ||||
|     @property | ||||
|     def CO2(self) -> int: | ||||
| @ -51,22 +50,26 @@ class SensorWiredIAQ(ModbusRTUDevice): | ||||
| 
 | ||||
|     @property | ||||
|     def T(self) -> float: | ||||
|         # TODO maybe use signed version? | ||||
|         return self.read_register(self.input_registers["T"], signed=True) / 10 | ||||
| 
 | ||||
|     @property | ||||
|     def RH(self) -> float: | ||||
|         return self.read_register(self.input_registers["RH"]) | ||||
| 
 | ||||
|     @property | ||||
|     def VOC(self): | ||||
|         return self.read_register(self.input_registers["VOC_INDEX"]) | ||||
| 
 | ||||
|     @property | ||||
|     def LED(self) -> int: | ||||
|         return int(self.read_register(self.holding_registers["LED_brightness"])) | ||||
|         return int(self.read_register(self.holding_registers["LED_BRIGHTNESS"])) | ||||
| 
 | ||||
|     @LED.setter | ||||
|     def LED(self, value: int): | ||||
|         if value == 0: | ||||
|             self.write_register(self.holding_registers["LED_on"], 0) | ||||
|             self.write_register(self.holding_registers["LED_ON"], 0) | ||||
|         else: | ||||
|             self.write_register(self.holding_registers["LED_brightness"], value) | ||||
|             self.write_register(self.holding_registers["LED_BRIGHTNESS"], value) | ||||
|             sleep(0.1) | ||||
|             self.write_register(self.holding_registers["LED_on"], 1) | ||||
|             self.write_register(self.holding_registers["LED_ON"], 1) | ||||
| 
 | ||||
|  | ||||
| @ -4,44 +4,42 @@ from .generic import ModbusRTUDevice | ||||
| 
 | ||||
| 
 | ||||
| class SensorWiredRHT(ModbusRTUDevice): | ||||
|     ''' | ||||
|     """ | ||||
|     Wired sensor measuring temperature, relative humidity | ||||
|     and light intensity. | ||||
|     ''' | ||||
|     IDENTIFIER: int = 0xb00b | ||||
|     """ | ||||
|      | ||||
|     DEVICE_CLASS: Final[str] = "RHT_Wired" | ||||
|     DEVICE_CODE: Final[int] = 0x0020 | ||||
| 
 | ||||
|     input_registers: Dict[str, int] = { | ||||
|             'SER_NUM_1': 30001, | ||||
|             'SER_NUM_2': 30002, | ||||
|             'T': 30003,    # from SHT4x | ||||
|             'T_F': 30004, | ||||
|             'RH': 30005,   # from SHT4x | ||||
|             'LIGHT_INTENSITY_1': 30006, | ||||
|             'LIGHT_INTENSITY_2': 30007, | ||||
|             'ERROR_T_RH': 30008, | ||||
|             'ERROR_LIGHT': 30009 | ||||
|             } | ModbusRTUDevice.input_registers | ||||
|     # TODO use super, but __class__ not defined | ||||
|         "SER_NUM_1": 30001, | ||||
|         "SER_NUM_2": 30002, | ||||
|         "T": 30003,  # from SHT4x | ||||
|         "T_F": 30004, | ||||
|         "RH": 30005,  # from SHT4x | ||||
|         "LIGHT_INTENSITY_1": 30006, | ||||
|         "LIGHT_INTENSITY_2": 30007, | ||||
|         "ERROR_T_RH": 30008, | ||||
|         "ERROR_LIGHT": 30009, | ||||
|     } | ModbusRTUDevice.input_registers | ||||
|     holding_registers: Dict[str, int] = { | ||||
|             'MODBUS_address': 40001, | ||||
|             'baudrate': 40002, | ||||
|             'LTR329_GAIN': 40003, | ||||
|             'LTR329_MEAS_RATE': 40004, | ||||
|             'LTR329_INTEGRATION_TIME': 40005, | ||||
|             'LTR329_MODE': 40006 | ||||
|             } | ModbusRTUDevice.holding_registers | ||||
|     RESET_MAGIC_NUMBER: Final[int] = 0xABCD | ||||
|         "MODBUS_ADDR": 40001, | ||||
|         "BAUDRATE": 40002, | ||||
|         "LTR329_GAIN": 40003, | ||||
|         "LTR329_MEAS_RATE": 40004, | ||||
|         "LTR329_INTEGRATION_TIME": 40005, | ||||
|         "LTR329_MODE": 40006, | ||||
|     } | ModbusRTUDevice.holding_registers | ||||
| 
 | ||||
|     @property | ||||
|     def CO2(self): | ||||
|         return int(self.read_register(self.input_registers['CO2'])) | ||||
|         return int(self.read_register(self.input_registers["CO2"])) | ||||
| 
 | ||||
|     @property | ||||
|     def T(self): | ||||
|         # TODO maybe use signed version? | ||||
|         return self.read_register(self.input_registers['T'], | ||||
|                                   signed=True | ||||
|                                   ) / 10 | ||||
|         return self.read_register(self.input_registers["T"], signed=True) / 10 | ||||
| 
 | ||||
|     @property | ||||
|     def RH(self): | ||||
|         return self.read_register(self.input_registers['RH']) | ||||
|         return self.read_register(self.input_registers["RH"]) | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 Jan Mrna
						Jan Mrna