Modified measure script for mass testing

This commit is contained in:
mj 2022-02-20 16:22:31 +01:00
parent f2ebe11354
commit 5a756e47c1
2 changed files with 58 additions and 39 deletions

16
tests/measure.py Normal file → Executable file
View File

@ -1,17 +1,23 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import minimalmodbus
import sensor import sensor
import time import time
addr = 247 addr = [103, 104, 105, 106, 107, 108, 110]
baud = 19200 baud = 19200
output_file = 'log' output_file = 'log'
s = sensor.Sensor(address=addr, baudrate=baud)
while True: while True:
with open('log','a+') as f: for a in addr:
s = sensor.Sensor(address=a, baudrate=baud)
values = f'{time.time():.2f} ' values = f'{time.time():.2f} '
for reg_name, reg_number in sensor.Sensor.input_registers.items(): for reg_name, reg_number in sensor.Sensor.input_registers.items():
values += f'{int(s.read_register(reg_number))} ' # print(f'reading {reg_name} {reg_number}')
try:
values += f'{int(s.read_register(reg_number))} '
except minimalmodbus.IllegalRequestError:
pass
values += '\n' values += '\n'
f.write(values) with open(f'log_{a}', 'a+') as f:
f.write(values)
time.sleep(10) time.sleep(10)

View File

@ -2,55 +2,57 @@ import minimalmodbus
import serial import serial
import time import time
class Sensor():
class Sensor:
# MODBUS constants # MODBUS constants
holding_register_start = 40001 holding_register_start = 40001
holding_register_end = 49999 holding_register_end = 49999
input_register_start = 30001 input_register_start = 30001
input_register_end = 39999 input_register_end = 39999
# Sensor specific config # Sensor specific config
baudrates = [ 4800,9600,14400,19200,28800,38400,57600,76800,115200 ] # allowed baudrates baudrates = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200]
input_registers = { \ input_registers = {
'CO2': 30010, \ 'CO2': 30010,
'T_SHT4x': 30011, \ 'T_SHT4x': 30011,
'RH_SHT4x': 30012, \ 'RH_SHT4x': 30012,
'T_SCD4x': 30013, \ 'T_SCD4x': 30013,
'RH_SCD4x': 30014, \ 'RH_SCD4x': 30014,
'T_SHT4x_signed': 30015, \ 'T_SHT4x_signed': 30015,
'T_SCD4x_signed': 30016, \ 'T_SCD4x_signed': 30016,
'PM_mass_concentration_1.0': 30017, \ 'PM_mass_concentration_1.0': 30017,
'PM_mass_concentration_2.5': 30018, \ 'PM_mass_concentration_2.5': 30018,
'PM_mass_concentration_4.0': 30019, \ 'PM_mass_concentration_4.0': 30019,
'PM_mass_concentration_10.0': 30020, \ 'PM_mass_concentration_10.0': 30020,
'PM_number_concentration_0.5': 30021, \ 'PM_number_concentration_0.5': 30021,
'PM_number_concentration_1.0': 30022, \ 'PM_number_concentration_1.0': 30022,
'PM_number_concentration_2.5': 30023, \ 'PM_number_concentration_2.5': 30023,
'PM_number_concentration_4.0': 30024, \ 'PM_number_concentration_4.0': 30024,
'PM_number_concentration_10.0': 30025, \ 'PM_number_concentration_10.0': 30025,
'PM_typical_particle_size': 30026, \ 'PM_typical_particle_size': 30026,
'VOC_ticks': 30027, \ 'VOC_ticks': 30027,
'VOC_index': 30028} 'VOC_index': 30028}
holding_registers = { \ holding_registers = {
'LED_on': 40001, \ 'LED_on': 40001,
'LED_brightness': 40002, \ 'LED_brightness': 40002,
'LED_smooth': 40003, \ 'LED_smooth': 40003,
'CO2_alert_limit_1': 40004, \ 'CO2_alert_limit_1': 40004,
'CO2_alert_limit_2': 40005, \ 'CO2_alert_limit_2': 40005,
'SCD4x_temperature_offset': 40006, \ 'SCD4x_temperature_offset': 40006,
'MODBUS_address': 40007, \ 'MODBUS_address': 40007,
'baudrate': 40008 } 'baudrate': 40008}
reset_register = 40100 reset_register = 40100
reset_magic_number = 0xABCD reset_magic_number = 0xABCD
# readout and error counters # readout and error counters
readout_total = 0 readout_total = 0
readout_error_invalid_response = 0 # checksum error: bus transmission corrupted? readout_error_invalid_response = 0 # checksum error: bus transmission corrupted?
readout_error_no_response = 0 # no response - sensor device was busy readout_error_no_response = 0 # no response - sensor device was busy
# methods
def __init__(self, address=247, baudrate=19200, dev_file='/dev/rs485'): def __init__(self, address=247, baudrate=19200, dev_file='/dev/rs485'):
self.dev_file = dev_file self.dev_file = dev_file
self.address = address self.address = address
self.baudrate = baudrate self.baudrate = baudrate
self.open() self.open()
def open(self): def open(self):
self.serial = minimalmodbus.Instrument(self.dev_file, self.address, close_port_after_each_call=True) self.serial = minimalmodbus.Instrument(self.dev_file, self.address, close_port_after_each_call=True)
self.serial.serial.baudrate = self.baudrate self.serial.serial.baudrate = self.baudrate
@ -60,45 +62,56 @@ class Sensor():
self.serial.serial.timeout = 0.05 # seconds self.serial.serial.timeout = 0.05 # seconds
self.serial.mode = minimalmodbus.MODE_RTU # rtu or ascii mode self.serial.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
self.serial.clear_buffers_before_each_transaction = True self.serial.clear_buffers_before_each_transaction = True
def close(self): def close(self):
self.serial.serial.close() self.serial.serial.close()
self.serial = None self.serial = None
def reset(self): def reset(self):
try: try:
self.write_register(self.reset_register, self.reset_magic_number) self.write_register(self.reset_register, self.reset_magic_number)
return False # got answer => failed to reset return False # got answer => failed to reset
except minimalmodbus.NoResponseError: except minimalmodbus.NoResponseError:
return True # no answer => reset successful return True # no answer => reset successful
# High level read functions
@property @property
def CO2(self): def CO2(self):
return int(self.read_register(self.input_registers['CO2'])) return int(self.read_register(self.input_registers['CO2']))
@property @property
def T(self): def T(self):
# TODO maybe use rather signed version? # TODO maybe use rather signed version?
return self.T_SHT4x return self.T_SHT4x
@property @property
def RH(self): def RH(self):
return self.RH_SHT4x return self.RH_SHT4x
@property @property
def T_SHT4x(self): def T_SHT4x(self):
return self.read_register(self.input_registers['T_SHT4x']) / 10 return self.read_register(self.input_registers['T_SHT4x']) / 10
@property @property
def T_SHT4x_signed(self): def T_SHT4x_signed(self):
return self.read_register(self.input_registers['T_SHT4x_signed'], signed=True) / 10 return self.read_register(self.input_registers['T_SHT4x_signed'], signed=True) / 10
@property @property
def RH_SHT4x(self): def RH_SHT4x(self):
return self.read_register(self.input_registers['RH_SHT4x']) return self.read_register(self.input_registers['RH_SHT4x'])
@property @property
def T_SCD4x(self): def T_SCD4x(self):
return self.read_register(self.input_registers['T_SCD4x']) / 10 return self.read_register(self.input_registers['T_SCD4x']) / 10
@property @property
def T_SCD4x_signed(self): def T_SCD4x_signed(self):
return self.read_register(self.input_register['T_SCD4x_signed'], signed=True) / 10 return self.read_register(self.input_register['T_SCD4x_signed'], signed=True) / 10
@property @property
def RH_SCD4x(self): def RH_SCD4x(self):
return self.read_register(self.input_registers['RH_SCD4x']) return self.read_register(self.input_registers['RH_SCD4x'])
# generic read register function
def read_register(self, register_number, retries=10): def read_register(self, register_number, retries=10):
if self.input_register_start <= register_number <= self.input_register_end: if self.input_register_start <= register_number <= self.input_register_end:
function_code = 4 function_code = 4
@ -125,7 +138,7 @@ class Sensor():
continue continue
# retries failed, raise last exception to inform user # retries failed, raise last exception to inform user
raise last_exception raise last_exception
# generic write register function
def write_register(self, register_number, register_value, retries=10): def write_register(self, register_number, register_value, retries=10):
if not self.holding_register_start <= register_number <= self.holding_register_end: if not self.holding_register_start <= register_number <= self.holding_register_end:
raise ValueError raise ValueError