import minimalmodbus import serial import time class Sensor(): holding_register_start = 40001 holding_register_end = 49999 input_register_start = 30001 input_register_end = 39999 baudrates = [ 4800,9600,14400,19200,28800,38400,57600,76800,115200 ] input_register_offset = { \ 'CO2': 9, \ 'T_SHT4x': 10, \ 'RH_SHT4x': 11, \ 'T_SCD4x': 12, \ 'RH_SCD4x': 13, \ 'T_SHT4x_signed': 14, \ 'T_SCD4x_signed': 15 } holding_register_offset = { \ 'LED_on_register': 0, \ 'LED_brightness_register': 1, \ 'LED_smooth_register': 2, \ 'CO2_alert_limit_1_register': 3, \ 'CO2_alert_limit_2_register': 4, \ 'SCD4x_temperature_offset_register': 5, \ 'MODBUS_address_register': 6, \ 'baudrate_register': 7 } def __init__(self, dev_file='/dev/rs485', address=247, baudrate=19200): self.dev_file = dev_file self.address = address self.baudrate = baudrate self.open() def open(self): self.serial = minimalmodbus.Instrument(self.dev_file, self.address, close_port_after_each_call=True) self.serial.serial.baudrate = self.baudrate self.serial.serial.bytesize = 8 self.serial.serial.parity = serial.PARITY_EVEN self.serial.serial.stopbits = 1 self.serial.serial.timeout = 0.05 # seconds self.serial.mode = minimalmodbus.MODE_RTU # rtu or ascii mode self.serial.clear_buffers_before_each_transaction = True def close(self): self.serial.serial.close() self.serial = None # High level read functions @property def CO2(self): return self.read_input_register(self.input_register_offset['CO2'])*10 @property def T(self): # TODO maybe use rather signed version? return self.T_SHT4x @property def RH(self): return self.RH_SHT4x @property def T_SHT4x(self): return self.read_input_register(self.input_register_offset['T_SHT4x']) @property def T_SHT4x_signed(self): return self.read_input_register(self.input_register_offset['T_SHT4x_signed'], signed=True) @property def RH_SHT4x(self): return self.read_input_register(self.input_register_offset['RH_SHT4x'])*10 @property def T_SCD4x(self): return self.read_input_register(self.input_register_offset['T_SCD4x']) @property def T_SCD4x_signed(self): return self.read_input_register(self.input_register_offset['T_SCD4x_signed'], signed=True) @property def RH_SCD4x(self): return self.read_input_register(self.input_register_offset['RH_SCD4x'])*10 def read_input_register(self, register_offset): retries = 10 while retries: try: return self.serial.read_register(register_offset, 1, functioncode=4) except (minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError) as e: retries -= 1 continue raise e # generic read register function def read_register(self, register_number): if self.input_register_start <= register_number <= self.input_register_end: function_code = 4 register_offset = register_number - self.input_register_start elif self.holding_register_start <= register_number <= self.holding_register_end: function_code = 3 register_offset = register_number - self.holding_register_start else: # wrong register number raise ValueError return self.serial.read_register(register_offset, 1, functioncode=function_code) * 10 def write_register(self, register_number, register_value): if not self.holding_register_start <= register_number <= self.holding_register_end: raise ValueError register_offset = register_number - self.holding_register_start return self.serial.write_register(register_offset, register_value, functioncode=6)