WIP python scripts

This commit is contained in:
mj 2021-11-27 16:07:06 +01:00
parent 3d2dc18d34
commit 1de3c05fea
3 changed files with 69 additions and 50 deletions

View File

@ -67,14 +67,13 @@ print(f'Baudrates: {baud}')
print('---- Looking for device ----') print('---- Looking for device ----')
total_devices = 0 total_devices = 0
tried_devices = 0 tried_devices = 0
CO2_offset = Sensor.input_register_offset['CO2']
for a in addr: for a in addr:
for b in baud: for b in baud:
print(f'Address {a : >3} baud {b : >6}: ', end='') print(f'Address {a : >3} baud {b : >6}: ', end='')
try: try:
s = Sensor(address=a, baudrate=b) s = Sensor(address=a, baudrate=b)
reg_number = Sensor.input_register_start + Sensor.input_register_offset['CO2'] reg_number = Sensor.input_register['CO2']
s.read_register(reg_number) s.read_register(reg_number, retries=1)
print('DEVICE RESPONDED') print('DEVICE RESPONDED')
total_devices += 1 total_devices += 1
except minimalmodbus.NoResponseError: except minimalmodbus.NoResponseError:

View File

@ -90,8 +90,8 @@ if action == 'write' and len(register_number) + len(register_name) != 1:
if not baudrate: if not baudrate:
baudrate = DEFAULT_BAUDRATE baudrate = DEFAULT_BAUDRATE
if action != 'write' and len(register_name) + len(register_number) == 0: if action != 'write' and len(register_name) + len(register_number) == 0:
input_registers = [ x for x in Sensor.input_register_offset.keys() ] input_registers = [ x for x in Sensor.input_register.keys() ]
holding_registers = [ x for x in Sensor.holding_register_offset.keys() ] holding_registers = [ x for x in Sensor.holding_register.keys() ]
register_name = input_registers + holding_registers register_name = input_registers + holding_registers
if action != 'write' and addr == 0: if action != 'write' and addr == 0:
print(f'Cannot broadcast action "{action}"') print(f'Cannot broadcast action "{action}"')
@ -109,10 +109,10 @@ if action == 'read' or action == 'all':
for register in register_name + register_number: for register in register_name + register_number:
if isinstance(register, str): if isinstance(register, str):
reg_name = register reg_name = register
if register in Sensor.input_register_offset: all_registers = Sensor.input_register.copy()
reg_number = Sensor.input_register_start + Sensor.input_register_offset[register] all_registers.update(Sensor.holding_register)
elif register in Sensor.holding_register_offset: if reg_name in all_registers:
reg_number = Sensor.holding_register_start + Sensor.holding_register_offset[register] reg_number = all_registers[reg_name]
else: else:
print(f'Register name {register} not known') print(f'Register name {register} not known')
exit(-7) exit(-7)
@ -127,10 +127,10 @@ if action == 'read' or action == 'all':
print(f'{reg_number : <10} {int(result) : <10} {reg_name}') print(f'{reg_number : <10} {int(result) : <10} {reg_name}')
elif action == 'write': elif action == 'write':
if len(register_name) > 0: if len(register_name) > 0:
if register_name[0] not in Sensor.holding_register_offset: if register_name[0] not in Sensor.holding_register:
print(f'Register {register_name[0]} does not exist or is not holding register') print(f'Register {register_name[0]} does not exist or is not holding register')
exit(-9) exit(-9)
reg_number = Sensor.holding_register_start + Sensor.holding_register_offset[register_name[0]] reg_number = Sensor.holding_register[register_name[0]]
elif len(register_number) > 0: elif len(register_number) > 0:
reg_number = register_number[0] reg_number = register_number[0]
print('---- Register write ----') print('---- Register write ----')

View File

@ -3,28 +3,35 @@ import serial
import time import time
class Sensor(): class Sensor():
# MODBUS constants
holding_register_start = 40001 holding_register_start = 40001
holding_register_end = 49999 holding_register_end = 49999
input_register_start = 30001 input_register_start = 30001
input_register_end = 39999 input_register_end = 39999
baudrates = [ 4800,9600,14400,19200,28800,38400,57600,76800,115200 ] # Sensor specific config
input_register_offset = { \ baudrates = [ 4800,9600,14400,19200,28800,38400,57600,76800,115200 ] # allowed baudrates
'CO2': 9, \ input_register = { \
'T_SHT4x': 10, \ 'CO2': 30010, \
'RH_SHT4x': 11, \ 'T_SHT4x': 30011, \
'T_SCD4x': 12, \ 'RH_SHT4x': 30012, \
'RH_SCD4x': 13, \ 'T_SCD4x': 30013, \
'T_SHT4x_signed': 14, \ 'RH_SCD4x': 30014, \
'T_SCD4x_signed': 15 } 'T_SHT4x_signed': 30015, \
holding_register_offset = { \ 'T_SCD4x_signed': 30016 }
'LED_on_register': 0, \ holding_register = { \
'LED_brightness_register': 1, \ 'LED_on': 40001, \
'LED_smooth_register': 2, \ 'LED_brightness': 40002, \
'CO2_alert_limit_1_register': 3, \ 'LED_smooth': 40003, \
'CO2_alert_limit_2_register': 4, \ 'CO2_alert_limit_1': 40004, \
'SCD4x_temperature_offset_register': 5, \ 'CO2_alert_limit_2': 40005, \
'MODBUS_address_register': 6, \ 'SCD4x_temperature_offset': 40006, \
'baudrate_register': 7 } 'MODBUS_address': 40007, \
'baudrate': 40008 }
# readout and error counters
readout_total = 0
readout_error_invalid_response = 0 # checksum error: bus transmission corrupted?
readout_error_no_response = 0 # no response - sensor device was busy
# methods
def __init__(self, dev_file='/dev/rs485', address=247, baudrate=19200): def __init__(self, dev_file='/dev/rs485', address=247, baudrate=19200):
self.dev_file = dev_file self.dev_file = dev_file
self.address = address self.address = address
@ -45,7 +52,7 @@ class Sensor():
# High level read functions # High level read functions
@property @property
def CO2(self): def CO2(self):
return self.read_input_register(self.input_register_offset['CO2'])*10 return int(self.read_register(self.input_register['CO2']))
@property @property
def T(self): def T(self):
# TODO maybe use rather signed version? # TODO maybe use rather signed version?
@ -55,33 +62,24 @@ class Sensor():
return self.RH_SHT4x return self.RH_SHT4x
@property @property
def T_SHT4x(self): def T_SHT4x(self):
return self.read_input_register(self.input_register_offset['T_SHT4x']) return self.read_register(self.input_register['T_SHT4x']) / 10
@property @property
def T_SHT4x_signed(self): def T_SHT4x_signed(self):
return self.read_input_register(self.input_register_offset['T_SHT4x_signed'], signed=True) return self.read_register(self.input_register['T_SHT4x_signed'], signed=True) / 10
@property @property
def RH_SHT4x(self): def RH_SHT4x(self):
return self.read_input_register(self.input_register_offset['RH_SHT4x'])*10 return self.read_register(self.input_register['RH_SHT4x'])
@property @property
def T_SCD4x(self): def T_SCD4x(self):
return self.read_input_register(self.input_register_offset['T_SCD4x']) return self.read_register(self.input_register['T_SCD4x']) / 10
@property @property
def T_SCD4x_signed(self): def T_SCD4x_signed(self):
return self.read_input_register(self.input_register_offset['T_SCD4x_signed'], signed=True) return self.read_register(self.input_register['T_SCD4x_signed'], signed=True) / 10
@property @property
def RH_SCD4x(self): def RH_SCD4x(self):
return self.read_input_register(self.input_register_offset['RH_SCD4x'])*10 return self.read_register(self.input_register['RH_SCD4x'])
def read_input_register(self, register_offset):
retries = 10
while retries:
try:
return self.serial.read_register(register_offset, 1, functioncode=4)
except (minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError) as e:
retries -= 1
continue
raise e
# generic read register function # generic read register function
def read_register(self, register_number): def read_register(self, register_number, retries=10):
if self.input_register_start <= register_number <= self.input_register_end: if self.input_register_start <= register_number <= self.input_register_end:
function_code = 4 function_code = 4
register_offset = register_number - self.input_register_start register_offset = register_number - self.input_register_start
@ -91,10 +89,32 @@ class Sensor():
else: else:
# wrong register number # wrong register number
raise ValueError raise ValueError
while retries:
retries -= 1
try:
self.readout_total += 1
# minimalmodbus divides received register value by 10
return self.serial.read_register(register_offset, 1, functioncode=function_code) * 10 return self.serial.read_register(register_offset, 1, functioncode=function_code) * 10
def write_register(self, register_number, register_value): except minimalmodbus.NoResponseError as e:
last_exception = e
self.readout_error_no_response += 1
continue
except minimalmodbus.InvalidResponseError as e:
last_exception = e
self.readout_error_invalid_response += 1
continue
# retries failed, raise last exception to inform user
raise last_exception
# generic write register function
def write_register(self, register_number, register_value, retries=10):
if not self.holding_register_start <= register_number <= self.holding_register_end: if not self.holding_register_start <= register_number <= self.holding_register_end:
raise ValueError raise ValueError
register_offset = register_number - self.holding_register_start register_offset = register_number - self.holding_register_start
while retries:
retries -= 1
try:
return self.serial.write_register(register_offset, register_value, functioncode=6) return self.serial.write_register(register_offset, register_value, functioncode=6)
except (minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError) as e:
last_exception = e
continue
raise last_exception