153 lines
5.4 KiB
Python
153 lines
5.4 KiB
Python
import minimalmodbus
|
|
import serial
|
|
import time
|
|
|
|
|
|
class Sensor:
    """Modbus RTU client for a sensor exposing T/RH/CO2/VOC/PM registers.

    Registers are addressed by their conventional Modbus numbers (3xxxx for
    input registers, 4xxxx for holding registers); `read_register` and
    `write_register` translate those numbers into the function code and
    zero-based offset that minimalmodbus expects.

    The readout counters below are class-level defaults. Incrementing them
    through `self` creates per-instance counters on first use.
    """

    # MODBUS constants: inclusive register-number ranges
    holding_register_start = 40001
    holding_register_end = 49999
    input_register_start = 30001
    input_register_end = 39999

    # Sensor specific config
    baudrates = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200]
    input_registers = {
        'SERIAL_NUMBER_1': 30001,
        'SERIAL_NUMBER_2': 30002,
        'T': 30003,  # deg C
        'T_F': 30004,  # deg F
        'RH': 30005,  # %, from SHT4x
        'CO2': 30006,  # ppm
        'VOC_INDEX': 30007,  # VOC index as calculated by Sensirion library (1 to 500, average 100)
        'VOC_TICKS': 30008,  # raw VOC ticks
        'PMC_MASS_1_0': 30009,  # ug / m^3
        'PMC_MASS_2_5': 30010,  # ug / m^3
        'PMC_MASS_4_0': 30011,  # ug / m^3
        'PMC_MASS_10_0': 30012,  # ug / m^3
        'PMC_NUMBER_0_5': 30013,  # 1 / m^3
        'PMC_NUMBER_1_0': 30014,  # 1 / m^3
        'PMC_NUMBER_2_5': 30015,  # 1 / m^3
        'PMC_NUMBER_4_0': 30016,  # 1 / m^3
        'PMC_NUMBER_10_0': 30017,  # 1 / m^3
        'TYPICAL_PARTICLE_SIZE': 30018,  # nm
        'T_SCD4x': 30019,  # deg C
        'T_SCD4x_F': 30020,  # deg F
        'RH_SCD4x': 30021}  # %
    holding_registers = {
        'MODBUS_ADDR': 40001,
        'BAUDRATE': 40002,
        'LED_ON': 40003,
        'LED_BRIGHTNESS': 40004,
        'LED_SMOOTH': 40005,
        'CO2_ALERT_LIMIT1': 40006,
        'CO2_ALERT_LIMIT2': 40007,
        'SCD4x_T_OFFSET': 40008,
        'RESET_DEVICE': 49999}
    # NOTE(review): reset() writes to reset_register (40100), not to
    # holding_registers['RESET_DEVICE'] (49999) - confirm which address the
    # firmware actually expects.
    reset_register = 40100
    reset_magic_number = 0xABCD

    # readout and error counters
    readout_total = 0
    readout_error_invalid_response = 0  # checksum error: bus transmission corrupted?
    readout_error_no_response = 0  # no response - sensor device was busy

    def __init__(self, address=247, baudrate=19200, dev_file='/dev/rs485'):
        """Store the connection parameters and open the instrument.

        Args:
            address: Modbus slave address of the sensor.
            baudrate: serial line speed applied in `open`.
            dev_file: path of the serial device.
        """
        self.dev_file = dev_file
        self.address = address
        self.baudrate = baudrate
        self.open()

    def open(self):
        """Create the minimalmodbus instrument and configure the line (8E1, RTU)."""
        self.serial = minimalmodbus.Instrument(self.dev_file, self.address, close_port_after_each_call=True)
        self.serial.serial.baudrate = self.baudrate
        self.serial.serial.bytesize = 8
        self.serial.serial.parity = serial.PARITY_EVEN
        self.serial.serial.stopbits = 1
        self.serial.serial.timeout = 0.05  # seconds
        self.serial.mode = minimalmodbus.MODE_RTU  # rtu or ascii mode
        self.serial.clear_buffers_before_each_transaction = True

    def close(self):
        """Close the serial port and drop the instrument.

        Safe to call repeatedly or before `open`: it does nothing when no
        instrument is attached.
        """
        instrument = getattr(self, 'serial', None)
        if instrument is None:
            return
        instrument.serial.close()
        self.serial = None

    def reset(self):
        """Write the reset magic number to the reset register.

        Returns:
            True when the write got no response (taken as a successful
            reset), False when the device answered the write.

        NOTE(review): `write_register` re-sends the write up to its default
        10 attempts before NoResponseError reaches this method, so the magic
        number may be written several times - confirm this is intended.
        An InvalidResponseError from the final attempt propagates to the
        caller.
        """
        try:
            self.write_register(self.reset_register, self.reset_magic_number)
            return False  # got answer => failed to reset
        except minimalmodbus.NoResponseError:
            return True  # no answer => reset successful

    @property
    def CO2(self):
        """CO2 input register as an int (ppm per the register map)."""
        return int(self.read_register(self.input_registers['CO2']))

    @property
    def T(self):
        """Temperature; alias of `T_SHT4x`."""
        # TODO maybe use rather signed version?
        return self.T_SHT4x

    @property
    def RH(self):
        """Relative humidity; alias of `RH_SHT4x`."""
        return self.RH_SHT4x

    @property
    def T_SHT4x(self):
        """'T' input register read as unsigned and divided by 10."""
        return self.read_register(self.input_registers['T']) / 10

    @property
    def T_SHT4x_signed(self):
        """'T' input register read as signed and divided by 10."""
        return self.read_register(self.input_registers['T'], signed=True) / 10

    @property
    def RH_SHT4x(self):
        """'RH' input register value."""
        return self.read_register(self.input_registers['RH'])

    @property
    def T_SCD4x(self):
        """'T_SCD4x' input register read as unsigned and divided by 10."""
        return self.read_register(self.input_registers['T_SCD4x']) / 10

    @property
    def RH_SCD4x(self):
        """'RH_SCD4x' input register value."""
        return self.read_register(self.input_registers['RH_SCD4x'])

    def _locate_register(self, register_number):
        """Map a register number to (function_code, zero-based offset).

        Raises:
            ValueError: the number lies in neither the input nor the holding
                register range.
        """
        if self.input_register_start <= register_number <= self.input_register_end:
            return 4, register_number - self.input_register_start
        if self.holding_register_start <= register_number <= self.holding_register_end:
            return 3, register_number - self.holding_register_start
        raise ValueError('register number %r is neither an input nor a holding register'
                         % (register_number,))

    def read_register(self, register_number, signed=False, retries=10):
        """Read one input or holding register, retrying on bus errors.

        Args:
            register_number: register number in the 3xxxx or 4xxxx range.
            signed: interpret the 16-bit value as signed.
            retries: total number of attempts; must be at least 1.

        Returns:
            The raw register value as a float.

        Raises:
            ValueError: invalid register number or `retries` below 1.
            minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError:
                the error of the last attempt when every attempt failed.
        """
        function_code, register_offset = self._locate_register(register_number)
        if retries < 1:
            # A non-positive count used to loop forever (negative) or fail
            # with an unbound local (zero); reject it explicitly.
            raise ValueError('retries must be at least 1, got %r' % (retries,))

        last_exception = None
        for _ in range(retries):
            self.readout_total += 1
            try:
                # Request 0 decimals so the integer register value comes back
                # unscaled; float() keeps the return type callers already get.
                raw_value = self.serial.read_register(
                    register_offset, 0, functioncode=function_code, signed=signed)
                return float(raw_value)
            except minimalmodbus.NoResponseError as e:
                last_exception = e
                self.readout_error_no_response += 1
            except minimalmodbus.InvalidResponseError as e:
                last_exception = e
                self.readout_error_invalid_response += 1
        # retries failed, raise last exception to inform user
        raise last_exception

    def write_register(self, register_number, register_value, retries=10):
        """Write one holding register (function code 6), retrying on bus errors.

        Args:
            register_number: register number in the holding (4xxxx) range.
            register_value: value to write.
            retries: total number of attempts; must be at least 1.

        Returns:
            Whatever the underlying instrument's `write_register` returns.

        Raises:
            ValueError: register outside the holding range or `retries`
                below 1.
            minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError:
                the error of the last attempt when every attempt failed.
        """
        if not self.holding_register_start <= register_number <= self.holding_register_end:
            raise ValueError('register number %r is not a holding register' % (register_number,))
        if retries < 1:
            raise ValueError('retries must be at least 1, got %r' % (retries,))
        register_offset = register_number - self.holding_register_start

        last_exception = None
        for _ in range(retries):
            try:
                return self.serial.write_register(register_offset, register_value, functioncode=6)
            except (minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError) as e:
                last_exception = e
        raise last_exception
|