iaq_sensor_wired/tests/sensor.py

156 lines
5.6 KiB
Python

import minimalmodbus
import serial
import time
class Sensor:
# MODBUS constants
holding_register_start = 40001
holding_register_end = 49999
input_register_start = 30001
input_register_end = 39999
# Sensor specific config
baudrates = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200]
input_registers = {
'T': 30010, # from SHT4x
'T_F': 30011,
'RH': 30012, # from SHT4x
'CO2': 30013, # from SCD4x
'VOC_index': 30014,
'VOC_ticks': 30015,
'NOx_index': 30016,
'NOx_ticks': 30017,
'PM_mass_concentration_1.0': 30018,
'PM_mass_concentration_2.5': 30019,
'PM_mass_concentration_4.0': 30020,
'PM_mass_concentration_10.0': 30021,
'PM_number_concentration_0.5': 30022,
'PM_number_concentration_1.0': 30023,
'PM_number_concentration_2.5': 30024,
'PM_number_concentration_4.0': 30025,
'PM_number_concentration_10.0': 30026,
'PM_typical_particle_size': 30027,
'T_SCD4x': 30028,
'T_SCF4x_F': 30029,
'RH_SCD4x': 30030}
holding_registers = {
'LED_on': 40001,
'LED_brightness': 40002,
'LED_smooth': 40003,
'CO2_alert_limit_1': 40004,
'CO2_alert_limit_2': 40005,
'SCD4x_temperature_offset': 40006,
'MODBUS_address': 40007,
'baudrate': 40008}
reset_register = 40100
reset_magic_number = 0xABCD
# readout and error counters
readout_total = 0
readout_error_invalid_response = 0 # checksum error: bus transmission corrupted?
readout_error_no_response = 0 # no response - sensor device was busy
def __init__(self, address=247, baudrate=19200, dev_file='/dev/rs485'):
self.dev_file = dev_file
self.address = address
self.baudrate = baudrate
self.open()
def open(self):
self.serial = minimalmodbus.Instrument(self.dev_file, self.address, close_port_after_each_call=True)
self.serial.serial.baudrate = self.baudrate
self.serial.serial.bytesize = 8
self.serial.serial.parity = serial.PARITY_EVEN
self.serial.serial.stopbits = 1
self.serial.serial.timeout = 0.05 # seconds
self.serial.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
self.serial.clear_buffers_before_each_transaction = True
def close(self):
self.serial.serial.close()
self.serial = None
def reset(self):
try:
self.write_register(self.reset_register, self.reset_magic_number)
return False # got answer => failed to reset
except minimalmodbus.NoResponseError:
return True # no answer => reset successful
@property
def CO2(self):
return int(self.read_register(self.input_registers['CO2']))
@property
def T(self):
# TODO maybe use rather signed version?
return self.T_SHT4x
@property
def RH(self):
return self.RH_SHT4x
@property
def T_SHT4x(self):
return self.read_register(self.input_registers['T_SHT4x']) / 10
@property
def T_SHT4x_signed(self):
return self.read_register(self.input_registers['T_SHT4x_signed'], signed=True) / 10
@property
def RH_SHT4x(self):
return self.read_register(self.input_registers['RH_SHT4x'])
@property
def T_SCD4x(self):
return self.read_register(self.input_registers['T_SCD4x']) / 10
@property
def T_SCD4x_signed(self):
return self.read_register(self.input_register['T_SCD4x_signed'], signed=True) / 10
@property
def RH_SCD4x(self):
return self.read_register(self.input_registers['RH_SCD4x'])
def read_register(self, register_number, signed=False, retries=10):
if self.input_register_start <= register_number <= self.input_register_end:
function_code = 4
register_offset = register_number - self.input_register_start
elif self.holding_register_start <= register_number <= self.holding_register_end:
function_code = 3
register_offset = register_number - self.holding_register_start
else:
# wrong register number
raise ValueError
while retries:
retries -= 1
try:
self.readout_total += 1
# minimalmodbus divides received register value by 10
return self.serial.read_register(register_offset, 1, functioncode=function_code, signed=signed) * 10
except minimalmodbus.NoResponseError as e:
last_exception = e
self.readout_error_no_response += 1
continue
except minimalmodbus.InvalidResponseError as e:
last_exception = e
self.readout_error_invalid_response += 1
continue
# retries failed, raise last exception to inform user
raise last_exception
def write_register(self, register_number, register_value, retries=10):
if not self.holding_register_start <= register_number <= self.holding_register_end:
raise ValueError
register_offset = register_number - self.holding_register_start
while retries:
retries -= 1
try:
return self.serial.write_register(register_offset, register_value, functioncode=6)
except (minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError) as e:
last_exception = e
continue
raise last_exception