import minimalmodbus import serial import time class Sensor: # MODBUS constants holding_register_start = 40001 holding_register_end = 49999 input_register_start = 30001 input_register_end = 39999 # Sensor specific config baudrates = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200] input_registers = { 'SERIAL_NUMBER_1': 30001, 'SERIAL_NUMBER_2': 30002, 'T': 30003, # deg C 'T_F': 30004, # deg F 'RH': 30005, # %, from SHT4x 'CO2': 30006, # ppm 'VOC_INDEX': 30007, # VOC index as calculated by Sensirion library (1 to 500, average 100) 'VOC_TICKS': 30008, # raw VOC ticks 'PMC_MASS_1_0': 30009, # ug / m^3 'PMC_MASS_2_5': 30010, # ug / m^3 'PMC_MASS_4_0': 30011, # ug / m^3 'PMC_MASS_10_0': 30012, # ug / m^3 'PMC_NUMBER_0_5': 30013, # 1 / m^3 'PMC_NUMBER_1_0': 30014, # 1 / m^3 'PMC_NUMBER_2_5': 30015, # 1 / m^3 'PMC_NUMBER_4_0': 30016, # 1 / m^3 'PMC_NUMBER_10_0': 30017, # 1 / m^3 'TYPICAL_PARTICLE_SIZE': 30018, # nm 'T_SCD4x': 30019, # deg C 'T_SCD4x_F': 30020, # deg F 'RH_SCD4x': 30021} # % holding_registers = { 'MODBUS_ADDR': 40001, 'BAUDRATE': 40002, 'LED_ON': 40003, 'LED_BRIGHTNESS': 40004, 'LED_SMOOTH': 40005, 'CO2_ALERT_LIMIT1': 40006, 'CO2_ALERT_LIMIT2': 40007, 'SCD4x_T_OFFSET': 40008, 'RESET_DEVICE': 49999} reset_register = 40100 reset_magic_number = 0xABCD # readout and error counters readout_total = 0 readout_error_invalid_response = 0 # checksum error: bus transmission corrupted? readout_error_no_response = 0 # no response - sensor device was busy def __init__(self, address=247, baudrate=19200, dev_file='/dev/rs485'): self.dev_file = dev_file self.address = address self.baudrate = baudrate self.open() def open(self): self.serial = minimalmodbus.Instrument(self.dev_file, self.address, close_port_after_each_call=True) self.serial.serial.baudrate = self.baudrate self.serial.serial.bytesize = 8 self.serial.serial.parity = serial.PARITY_EVEN self.serial.serial.stopbits = 1 self.serial.serial.timeout = 0.05 # seconds self.serial.mode = minimalmodbus.MODE_RTU # rtu or ascii mode self.serial.clear_buffers_before_each_transaction = True def close(self): self.serial.serial.close() self.serial = None def reset(self): try: self.write_register(self.reset_register, self.reset_magic_number) return False # got answer => failed to reset except minimalmodbus.NoResponseError: return True # no answer => reset successful @property def CO2(self): return int(self.read_register(self.input_registers['CO2'])) @property def T(self): # TODO maybe use rather signed version? return self.T_SHT4x @property def RH(self): return self.RH_SHT4x @property def T_SHT4x(self): return self.read_register(self.input_registers['T']) / 10 @property def T_SHT4x_signed(self): return self.read_register(self.input_registers['T'], signed=True) / 10 @property def RH_SHT4x(self): return self.read_register(self.input_registers['RH']) @property def T_SCD4x(self): return self.read_register(self.input_registers['T_SCD4x']) / 10 @property def RH_SCD4x(self): return self.read_register(self.input_registers['RH_SCD4x']) def read_register(self, register_number, signed=False, retries=10): if self.input_register_start <= register_number <= self.input_register_end: function_code = 4 register_offset = register_number - self.input_register_start elif self.holding_register_start <= register_number <= self.holding_register_end: function_code = 3 register_offset = register_number - self.holding_register_start else: # wrong register number raise ValueError while retries: retries -= 1 try: self.readout_total += 1 # minimalmodbus divides received register value by 10 return self.serial.read_register(register_offset, 1, functioncode=function_code, signed=signed) * 10 except minimalmodbus.NoResponseError as e: last_exception = e self.readout_error_no_response += 1 continue except minimalmodbus.InvalidResponseError as e: last_exception = e self.readout_error_invalid_response += 1 continue # retries failed, raise last exception to inform user raise last_exception def write_register(self, register_number, register_value, retries=10): if not self.holding_register_start <= register_number <= self.holding_register_end: raise ValueError register_offset = register_number - self.holding_register_start while retries: retries -= 1 try: return self.serial.write_register(register_offset, register_value, functioncode=6) except (minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError) as e: last_exception = e continue raise last_exception