diff --git a/tests/find_device.py b/tests/find_device.py index 8ea55f2..16fdb7c 100755 --- a/tests/find_device.py +++ b/tests/find_device.py @@ -67,14 +67,13 @@ print(f'Baudrates: {baud}') print('---- Looking for device ----') total_devices = 0 tried_devices = 0 -CO2_offset = Sensor.input_register_offset['CO2'] for a in addr: for b in baud: print(f'Address {a : >3} baud {b : >6}: ', end='') try: s = Sensor(address=a, baudrate=b) - reg_number = Sensor.input_register_start + Sensor.input_register_offset['CO2'] - s.read_register(reg_number) + reg_number = Sensor.input_register['CO2'] + s.read_register(reg_number, retries=1) print('DEVICE RESPONDED') total_devices += 1 except minimalmodbus.NoResponseError: diff --git a/tests/query_device.py b/tests/query_device.py index 40aae18..22c87cb 100755 --- a/tests/query_device.py +++ b/tests/query_device.py @@ -90,8 +90,8 @@ if action == 'write' and len(register_number) + len(register_name) != 1: if not baudrate: baudrate = DEFAULT_BAUDRATE if action != 'write' and len(register_name) + len(register_number) == 0: - input_registers = [ x for x in Sensor.input_register_offset.keys() ] - holding_registers = [ x for x in Sensor.holding_register_offset.keys() ] + input_registers = [ x for x in Sensor.input_register.keys() ] + holding_registers = [ x for x in Sensor.holding_register.keys() ] register_name = input_registers + holding_registers if action != 'write' and addr == 0: print(f'Cannot broadcast action "{action}"') @@ -109,10 +109,10 @@ if action == 'read' or action == 'all': for register in register_name + register_number: if isinstance(register, str): reg_name = register - if register in Sensor.input_register_offset: - reg_number = Sensor.input_register_start + Sensor.input_register_offset[register] - elif register in Sensor.holding_register_offset: - reg_number = Sensor.holding_register_start + Sensor.holding_register_offset[register] + all_registers = Sensor.input_register.copy() + all_registers.update(Sensor.holding_register) + if reg_name in all_registers: + reg_number = all_registers[reg_name] else: print(f'Register name {register} not known') exit(-7) @@ -127,10 +127,10 @@ if action == 'read' or action == 'all': print(f'{reg_number : <10} {int(result) : <10} {reg_name}') elif action == 'write': if len(register_name) > 0: - if register_name[0] not in Sensor.holding_register_offset: + if register_name[0] not in Sensor.holding_register: print(f'Register {register_name[0]} does not exist or is not holding register') exit(-9) - reg_number = Sensor.holding_register_start + Sensor.holding_register_offset[register_name[0]] + reg_number = Sensor.holding_register[register_name[0]] elif len(register_number) > 0: reg_number = register_number[0] print('---- Register write ----') diff --git a/tests/sensor.py b/tests/sensor.py index 76b46fc..5a4dcae 100644 --- a/tests/sensor.py +++ b/tests/sensor.py @@ -3,28 +3,35 @@ import serial import time class Sensor(): + # MODBUS constants holding_register_start = 40001 holding_register_end = 49999 input_register_start = 30001 input_register_end = 39999 - baudrates = [ 4800,9600,14400,19200,28800,38400,57600,76800,115200 ] - input_register_offset = { \ - 'CO2': 9, \ - 'T_SHT4x': 10, \ - 'RH_SHT4x': 11, \ - 'T_SCD4x': 12, \ - 'RH_SCD4x': 13, \ - 'T_SHT4x_signed': 14, \ - 'T_SCD4x_signed': 15 } - holding_register_offset = { \ - 'LED_on_register': 0, \ - 'LED_brightness_register': 1, \ - 'LED_smooth_register': 2, \ - 'CO2_alert_limit_1_register': 3, \ - 'CO2_alert_limit_2_register': 4, \ - 'SCD4x_temperature_offset_register': 5, \ - 'MODBUS_address_register': 6, \ - 'baudrate_register': 7 } + # Sensor specific config + baudrates = [ 4800,9600,14400,19200,28800,38400,57600,76800,115200 ] # allowed baudrates + input_register = { \ + 'CO2': 30010, \ + 'T_SHT4x': 30011, \ + 'RH_SHT4x': 30012, \ + 'T_SCD4x': 30013, \ + 'RH_SCD4x': 30014, \ + 'T_SHT4x_signed': 30015, \ + 'T_SCD4x_signed': 30016 } + holding_register = { \ + 'LED_on': 40001, \ + 'LED_brightness': 40002, \ + 'LED_smooth': 40003, \ + 'CO2_alert_limit_1': 40004, \ + 'CO2_alert_limit_2': 40005, \ + 'SCD4x_temperature_offset': 40006, \ + 'MODBUS_address': 40007, \ + 'baudrate': 40008 } + # readout and error counters + readout_total = 0 + readout_error_invalid_response = 0 # checksum error: bus transmission corrupted? + readout_error_no_response = 0 # no response - sensor device was busy + # methods def __init__(self, dev_file='/dev/rs485', address=247, baudrate=19200): self.dev_file = dev_file self.address = address @@ -45,7 +52,7 @@ class Sensor(): # High level read functions @property def CO2(self): - return self.read_input_register(self.input_register_offset['CO2'])*10 + return int(self.read_register(self.input_register['CO2'])) @property def T(self): # TODO maybe use rather signed version? @@ -55,33 +62,24 @@ class Sensor(): return self.RH_SHT4x @property def T_SHT4x(self): - return self.read_input_register(self.input_register_offset['T_SHT4x']) + return self.read_register(self.input_register['T_SHT4x']) / 10 @property def T_SHT4x_signed(self): - return self.read_input_register(self.input_register_offset['T_SHT4x_signed'], signed=True) + return self.read_register(self.input_register['T_SHT4x_signed'], signed=True) / 10 @property def RH_SHT4x(self): - return self.read_input_register(self.input_register_offset['RH_SHT4x'])*10 + return self.read_register(self.input_register['RH_SHT4x']) @property def T_SCD4x(self): - return self.read_input_register(self.input_register_offset['T_SCD4x']) + return self.read_register(self.input_register['T_SCD4x']) / 10 @property def T_SCD4x_signed(self): - return self.read_input_register(self.input_register_offset['T_SCD4x_signed'], signed=True) + return self.read_register(self.input_register['T_SCD4x_signed'], signed=True) / 10 @property def RH_SCD4x(self): - return self.read_input_register(self.input_register_offset['RH_SCD4x'])*10 - def read_input_register(self, register_offset): - retries = 10 - while retries: - try: - return self.serial.read_register(register_offset, 1, functioncode=4) - except (minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError) as e: - retries -= 1 - continue - raise e + return self.read_register(self.input_register['RH_SCD4x']) # generic read register function - def read_register(self, register_number): + def read_register(self, register_number, retries=10): if self.input_register_start <= register_number <= self.input_register_end: function_code = 4 register_offset = register_number - self.input_register_start @@ -91,10 +89,32 @@ class Sensor(): else: # wrong register number raise ValueError - return self.serial.read_register(register_offset, 1, functioncode=function_code) * 10 - def write_register(self, register_number, register_value): + while retries: + retries -= 1 + try: + self.readout_total += 1 + # minimalmodbus divides received register value by 10 + return self.serial.read_register(register_offset, 1, functioncode=function_code) * 10 + except minimalmodbus.NoResponseError as e: + last_exception = e + self.readout_error_no_response += 1 + continue + except minimalmodbus.InvalidResponseError as e: + last_exception = e + self.readout_error_invalid_response += 1 + continue + # retries failed, raise last exception to inform user + raise last_exception + # generic write register function + def write_register(self, register_number, register_value, retries=10): if not self.holding_register_start <= register_number <= self.holding_register_end: raise ValueError register_offset = register_number - self.holding_register_start - return self.serial.write_register(register_offset, register_value, functioncode=6) - + while retries: + retries -= 1 + try: + return self.serial.write_register(register_offset, register_value, functioncode=6) + except (minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError) as e: + last_exception = e + continue + raise last_exception