101 lines
4.0 KiB
Python
101 lines
4.0 KiB
Python
import minimalmodbus
|
|
import serial
|
|
import time
|
|
|
|
class Sensor:
    """MODBUS RTU client for a sensor exposing CO2, temperature and humidity registers.

    The connection is opened by the constructor via ``open()``.  Measurement
    values are exposed as read-only properties that read the matching input
    register on every access; ``read_register()`` / ``write_register()`` give
    generic access using register *numbers* (3xxxx input, 4xxxx holding).
    """

    # Register-number ranges accepted by read_register() / write_register().
    # A register number is converted to a zero-based offset by subtracting
    # the start of its range.
    holding_register_start = 40001
    holding_register_end = 49999
    input_register_start = 30001
    input_register_end = 39999

    # Not referenced elsewhere in this class; presumably the baud rates the
    # device accepts through 'baudrate_register' -- TODO confirm.
    baudrates = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200]

    # Total number of attempts read_input_register() makes before giving up.
    read_attempts = 10

    # Zero-based offsets of the input registers (function code 4).
    input_register_offset = {
        'CO2': 9,
        'T_SHT4x': 10,
        'RH_SHT4x': 11,
        'T_SCD4x': 12,
        'RH_SCD4x': 13,
        'T_SHT4x_signed': 14,
        'T_SCD4x_signed': 15,
    }

    # Zero-based offsets of the holding registers (function code 3 / 6).
    holding_register_offset = {
        'LED_on_register': 0,
        'LED_brightness_register': 1,
        'LED_smooth_register': 2,
        'CO2_alert_limit_1_register': 3,
        'CO2_alert_limit_2_register': 4,
        'SCD4x_temperature_offset_register': 5,
        'MODBUS_address_register': 6,
        'baudrate_register': 7,
    }

    def __init__(self, dev_file: str = '/dev/rs485', address: int = 247,
                 baudrate: int = 19200):
        """Store the connection parameters and open the connection.

        Args:
            dev_file: Path of the serial device.
            address: MODBUS slave address of the sensor.
            baudrate: Serial baud rate.
        """
        self.dev_file = dev_file
        self.address = address
        self.baudrate = baudrate
        self.open()

    def open(self):
        """Create the minimalmodbus instrument and configure the serial line.

        The line is set to 8 data bits, even parity, 1 stop bit, in RTU mode.
        """
        self.serial = minimalmodbus.Instrument(
            self.dev_file, self.address, close_port_after_each_call=True)
        port = self.serial.serial
        port.baudrate = self.baudrate
        port.bytesize = 8
        port.parity = serial.PARITY_EVEN
        port.stopbits = 1
        port.timeout = 0.05  # seconds
        self.serial.mode = minimalmodbus.MODE_RTU  # rtu or ascii mode
        self.serial.clear_buffers_before_each_transaction = True

    def close(self):
        """Close the serial port and drop the instrument.

        Calling it again on an already closed sensor is a no-op.
        """
        if self.serial is None:
            return
        self.serial.serial.close()
        self.serial = None

    # High level read functions
    #
    # NOTE(review): every read requests one decimal place (raw / 10); the
    # properties that multiply by 10 therefore return the raw register value
    # as a float.  Physical units are not visible here -- confirm against the
    # device documentation.

    @property
    def CO2(self):
        """Value of the 'CO2' input register (one-decimal read, times 10)."""
        return self.read_input_register(self.input_register_offset['CO2']) * 10

    @property
    def T(self):
        """Default temperature reading; alias for ``T_SHT4x``."""
        # TODO maybe use rather signed version?
        return self.T_SHT4x

    @property
    def RH(self):
        """Default humidity reading; alias for ``RH_SHT4x``."""
        return self.RH_SHT4x

    @property
    def T_SHT4x(self):
        """Value of the 'T_SHT4x' input register, read with one decimal place."""
        return self.read_input_register(self.input_register_offset['T_SHT4x'])

    @property
    def T_SHT4x_signed(self):
        """Value of the 'T_SHT4x_signed' input register, decoded as signed."""
        return self.read_input_register(
            self.input_register_offset['T_SHT4x_signed'], signed=True)

    @property
    def RH_SHT4x(self):
        """Value of the 'RH_SHT4x' input register (one-decimal read, times 10)."""
        return self.read_input_register(self.input_register_offset['RH_SHT4x']) * 10

    @property
    def T_SCD4x(self):
        """Value of the 'T_SCD4x' input register, read with one decimal place."""
        return self.read_input_register(self.input_register_offset['T_SCD4x'])

    @property
    def T_SCD4x_signed(self):
        """Value of the 'T_SCD4x_signed' input register, decoded as signed."""
        return self.read_input_register(
            self.input_register_offset['T_SCD4x_signed'], signed=True)

    @property
    def RH_SCD4x(self):
        """Value of the 'RH_SCD4x' input register (one-decimal read, times 10)."""
        return self.read_input_register(self.input_register_offset['RH_SCD4x']) * 10

    def read_input_register(self, register_offset, signed=False):
        """Read one input register (function code 4), retrying on bus errors.

        Args:
            register_offset: Zero-based input register offset.
            signed: Decode the register as a signed value.  The ``*_signed``
                properties pass ``signed=True``; previously this parameter
                did not exist and those properties raised ``TypeError``.

        Returns:
            The register value read with one decimal place.

        Raises:
            minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError:
                The error of the last attempt, once ``read_attempts``
                attempts have failed.
        """
        attempts_left = self.read_attempts
        while True:
            try:
                return self.serial.read_register(
                    register_offset, 1, functioncode=4, signed=signed)
            except (minimalmodbus.NoResponseError,
                    minimalmodbus.InvalidResponseError):
                attempts_left -= 1
                if attempts_left <= 0:
                    # Re-raise from inside the handler: the name bound by
                    # ``except ... as`` no longer exists after the handler,
                    # so raising it after the loop fails with
                    # UnboundLocalError.
                    raise

    # generic read register function
    def read_register(self, register_number):
        """Read a register addressed by its MODBUS register number.

        Args:
            register_number: Number in the input range (30001-39999, read
                with function code 4) or the holding range (40001-49999,
                read with function code 3).

        Returns:
            The value read with one decimal place, multiplied by 10.

        Raises:
            ValueError: ``register_number`` is in neither range.
        """
        if self.input_register_start <= register_number <= self.input_register_end:
            function_code = 4
            register_offset = register_number - self.input_register_start
        elif self.holding_register_start <= register_number <= self.holding_register_end:
            function_code = 3
            register_offset = register_number - self.holding_register_start
        else:
            # wrong register number
            raise ValueError
        return self.serial.read_register(
            register_offset, 1, functioncode=function_code) * 10

    def write_register(self, register_number, register_value):
        """Write a holding register addressed by its MODBUS register number.

        Args:
            register_number: Number in the holding range (40001-49999).
            register_value: Value to write (function code 6).

        Returns:
            Whatever the underlying ``write_register`` call returns.

        Raises:
            ValueError: ``register_number`` is not a holding register number.
        """
        if not self.holding_register_start <= register_number <= self.holding_register_end:
            raise ValueError
        register_offset = register_number - self.holding_register_start
        return self.serial.write_register(
            register_offset, register_value, functioncode=6)
|
|
|