Adapted to new registers, added LED getter etc.

This commit is contained in:
Jan Mrna 2022-06-18 18:24:17 +02:00
parent d2c6185c47
commit 79317fed95
2 changed files with 111 additions and 96 deletions

View File

@ -1,82 +1,80 @@
'''Module containing classes for generic wired/wireless devices''' """Module containing classes for generic wired/wireless devices"""
from dataclasses import dataclass from dataclasses import dataclass
from typing import Dict, Final from typing import Dict, Final
import minimalmodbus import minimalmodbus
import serial import serial
class Modbus(): class Modbus:
'''Class holding Modbus related constants''' """Class holding Modbus related constants"""
HOLDING_REGISTER_START = 40001 HOLDING_REGISTER_START = 40001
HOLDING_REGISTER_END = 49999 HOLDING_REGISTER_END = 49999
INPUT_REGISTER_START = 30001 INPUT_REGISTER_START = 30001
INPUT_REGISTER_END = 39999 INPUT_REGISTER_END = 39999
# ranges for testing if address is in address range # ranges for testing if address is in address range
input_register_range = range(INPUT_REGISTER_START, input_register_range = range(INPUT_REGISTER_START, INPUT_REGISTER_END)
INPUT_REGISTER_END) holding_register_range = range(HOLDING_REGISTER_START, HOLDING_REGISTER_END)
holding_register_range = range(HOLDING_REGISTER_START,
HOLDING_REGISTER_END)
@dataclass @dataclass
class ReadoutErrorCounter(): class ReadoutErrorCounter:
'''Class used to track readout errors''' """Class used to track readout errors"""
total_attempts: int = 0 total_attempts: int = 0
invalid_response: int = 0 invalid_response: int = 0
no_response: int = 0 no_response: int = 0
class Device(): class Device:
'''Base class for all devices''' """Base class for all devices"""
class ModbusRTUDevice(Device): class ModbusRTUDevice(Device):
''' """
Base class for wired device controlled over MODBUS RTU (via RS-485) Base class for wired device controlled over MODBUS RTU (via RS-485)
RS-485 to USB converter is needed for devices based off this class RS-485 to USB converter is needed for devices based off this class
''' """
# Reflects array fw/Core/Src/config.c:config_baudrates[] # Reflects array fw/Core/Src/config.c:config_baudrates[]
BAUDRATES = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200] BAUDRATES = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200]
# registers common to all Modbus RTU devices # registers common to all Modbus RTU devices
input_registers: Dict[str, int] = { input_registers: Dict[str, int] = {
'IDENTIFIER': 30100 "SERIAL_NUMBER_1": 30001,
} "SERIAL_NUMBER_2": 30002,
holding_registers: Dict[str, int] = {
'RESET': 40100
} }
holding_registers: Dict[str, int] = {"RESET_DEVICE": 49999}
# magic constant for resetting: common to all Modbus RTU devices # magic constant for resetting: common to all Modbus RTU devices
MAGIC_RESET_CONSTANT: Final[int] = 0xABCD MAGIC_RESET_CONSTANT: Final[int] = 0xABCD
def __comm_device_init(self) -> minimalmodbus.Instrument: def __comm_device_init(self) -> minimalmodbus.Instrument:
comm_device = minimalmodbus.Instrument(self.dev, comm_device = minimalmodbus.Instrument(
self.modbus_address, self.dev, self.modbus_address, close_port_after_each_call=True
close_port_after_each_call=True) )
# RS-485 serial paramater init # RS-485 serial paramater init
comm_device.serial.baudrate = self.baudrate comm_device.serial.baudrate = self.baudrate
comm_device.serial.bytesize = 8 comm_device.serial.bytesize = 8
comm_device.serial.parity = serial.PARITY_EVEN comm_device.serial.parity = serial.PARITY_EVEN
comm_device.serial.stopbits = 1 comm_device.serial.stopbits = 1
comm_device.serial.timeout = 0.05 # seconds comm_device.serial.timeout = 0.05 # seconds
comm_device.mode = minimalmodbus.MODE_RTU # rtu or ascii mode comm_device.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
comm_device.clear_buffers_before_each_transaction = True comm_device.clear_buffers_before_each_transaction = True
return comm_device return comm_device
def __init__(self, modbus_address, baudrate=19200, dev='/dev/rs485'): def __init__(self, modbus_address, baudrate=19200, dev="/dev/rs485"):
self.modbus_address: int = modbus_address self.modbus_address: int = modbus_address
self.baudrate: int = baudrate self.baudrate: int = baudrate
self.dev: str = dev self.dev: str = dev
self.comm_device: minimalmodbus.Instrument = self.__comm_device_init() self.comm_device: minimalmodbus.Instrument = self.__comm_device_init()
self.readout_errors: ReadoutErrorCounter = ReadoutErrorCounter() self.readout_errors: ReadoutErrorCounter = ReadoutErrorCounter()
def read_register(self, def read_register(
register_number: int, self, register_number: int, signed: bool = False, retries: int = 10
signed: bool = False, ) -> int:
retries: int = 10) -> int: """Read Modbus input/holding register via serial device"""
'''Read Modbus input/holding register via serial device'''
if register_number in Modbus.input_register_range: if register_number in Modbus.input_register_range:
function_code = 4 function_code = 4
register_offset = register_number - Modbus.INPUT_REGISTER_START register_offset = register_number - Modbus.INPUT_REGISTER_START
@ -86,43 +84,47 @@ class ModbusRTUDevice(Device):
else: else:
# wrong register number # wrong register number
raise ValueError raise ValueError
for i in range(retries): for _ in range(retries):
try: try:
self.readout_errors.total_attempts += 1 self.readout_errors.total_attempts += 1
# minimalmodbus divides received register value by 10 # minimalmodbus divides received register value by 10
return self.comm_device.read_register( return (
register_offset, self.comm_device.read_register(
1, register_offset, 1, functioncode=function_code, signed=signed
functioncode=function_code, )
signed=signed * 10
) * 10 )
except minimalmodbus.NoResponseError as e: except minimalmodbus.NoResponseError as exception:
last_exception = e last_exception = exception
self.readout_errors.no_response += 1 self.readout_errors.no_response += 1
continue continue
except minimalmodbus.InvalidResponseError as e: except minimalmodbus.InvalidResponseError as exception:
last_exception = e last_exception = exception
self.readout_errors.invalid_response += 1 self.readout_errors.invalid_response += 1
continue continue
# retries failed, raise last exception to inform user # retries failed, raise last exception to inform user
raise last_exception raise last_exception
def write_register(self, def write_register(
register_number: int, self, register_number: int, register_value: int, retries: int = 10
register_value: int, ) -> None:
retries: int = 10) -> None: """
Write to slave holding register
"""
# only holding registers can be written # only holding registers can be written
if register_number not in Modbus.holding_register_range: if register_number not in Modbus.holding_register_range:
raise ValueError raise ValueError
register_offset = register_number - Modbus.HOLDING_REGISTER_START register_offset = register_number - Modbus.HOLDING_REGISTER_START
for i in range(retries): for _ in range(retries):
try: try:
return self.comm_device.write_register(register_offset, return self.comm_device.write_register(
register_value, register_offset, register_value, functioncode=6
functioncode=6) )
except (minimalmodbus.NoResponseError, except (
minimalmodbus.InvalidResponseError) as e: minimalmodbus.NoResponseError,
last_exception = e minimalmodbus.InvalidResponseError,
) as exception:
last_exception = exception
continue continue
raise last_exception raise last_exception
@ -133,12 +135,14 @@ class ModbusRTUDevice(Device):
return self.write_register(key, value) return self.write_register(key, value)
def reset(self) -> bool: def reset(self) -> bool:
''' """
Soft-reset the device Soft-reset the device
''' """
try: try:
self.write_register(ModbusRTUDevice.holding_registers['RESET'], self.write_register(
ModbusRTUDevice.MAGIC_RESET_CONSTANT) ModbusRTUDevice.holding_registers["RESET"],
ModbusRTUDevice.MAGIC_RESET_CONSTANT,
)
return False # got answer => failed to reset return False # got answer => failed to reset
except minimalmodbus.NoResponseError: except minimalmodbus.NoResponseError:
return True # no answer => reset successful return True # no answer => reset successful

View File

@ -1,61 +1,72 @@
from time import sleep
from typing import Dict, Final from typing import Dict, Final
from minimalmodbus import NoResponseError from minimalmodbus import NoResponseError
from .generic import ModbusRTUDevice from .generic import ModbusRTUDevice
class SensorWiredIAQ(ModbusRTUDevice): class SensorWiredIAQ(ModbusRTUDevice):
''' """
Wired sensor measuring temperature, relative humidity, Wired sensor measuring temperature, relative humidity,
carbon dioxide and VOC, optionally particulate matter carbon dioxide and VOC, optionally particulate matter
''' """
IDENTIFIER: int = 0xbeef
IDENTIFIER: int = 0xBEEF
input_registers: Dict[str, int] = { input_registers: Dict[str, int] = {
'T': 30010, # from SHT4x 'T': 30003, # deg C
'T_F': 30011, 'T_F': 30004, # deg F
'RH': 30012, # from SHT4x 'RH': 30005, # %, from SHT4x
'CO2': 30013, # from SCD4x 'CO2': 30006, # ppm
'VOC_index': 30014, 'VOC_INDEX': 30007, # VOC index as calculated by Sensirion library (1 to 500, average 100)
'VOC_ticks': 30015, 'VOC_TICKS': 30008, # raw VOC ticks
'NOx_index': 30016, 'PMC_MASS_1_0': 30009, # ug / m^3
'NOx_ticks': 30017, 'PMC_MASS_2_5': 30010, # ug / m^3
'PM_mass_concentration_1.0': 30018, 'PMC_MASS_4_0': 30011, # ug / m^3
'PM_mass_concentration_2.5': 30019, 'PMC_MASS_10_0': 30012, # ug / m^3
'PM_mass_concentration_4.0': 30020, 'PMC_NUMBER_0_5': 30013, # 1 / m^3
'PM_mass_concentration_10.0': 30021, 'PMC_NUMBER_1_0': 30014, # 1 / m^3
'PM_number_concentration_0.5': 30022, 'PMC_NUMBER_2_5': 30015, # 1 / m^3
'PM_number_concentration_1.0': 30023, 'PMC_NUMBER_4_0': 30016, # 1 / m^3
'PM_number_concentration_2.5': 30024, 'PMC_NUMBER_10_0': 30017, # 1 / m^3
'PM_number_concentration_4.0': 30025, 'TYPICAL_PARTICLE_SIZE': 30018, # nm
'PM_number_concentration_10.0': 30026, 'T_SCD4x': 30019, # deg C
'PM_typical_particle_size': 30027, 'T_SCD4x_F': 30020, # deg F
'T_SCD4x': 30028, 'RH_SCD4x': 30021 # %
'T_SCF4x_F': 30029, } | ModbusRTUDevice.input_registers
'RH_SCD4x': 30030
} | ModbusRTUDevice.input_registers
# TODO use super, but __class__ not defined # TODO use super, but __class__ not defined
holding_registers: Dict[str, int] = { holding_registers: Dict[str, int] = {
'LED_on': 40001, 'MODBUS_ADDR': 40001,
'LED_brightness': 40002, 'BAUDRATE': 40002,
'LED_smooth': 40003, 'LED_ON': 40003,
'CO2_alert_limit_1': 40004, 'LED_BRIGHTNESS': 40004,
'CO2_alert_limit_2': 40005, 'LED_SMOOTH': 40005,
'SCD4x_temperature_offset': 40006, 'CO2_ALERT_LIMIT1': 40006,
'MODBUS_address': 40007, 'CO2_ALERT_LIMIT2': 40007,
'baudrate': 40008 'SCD4x_T_OFFSET': 40008
} | ModbusRTUDevice.holding_registers } | ModbusRTUDevice.holding_registers
RESET_MAGIC_NUMBER: Final[int] = 0xABCD RESET_MAGIC_NUMBER: Final[int] = 0xABCD
@property @property
def CO2(self): def CO2(self) -> int:
return int(self.read_register(self.input_registers['CO2'])) return int(self.read_register(self.input_registers["CO2"]))
@property @property
def T(self): def T(self) -> float:
# TODO maybe use signed version? # TODO maybe use signed version?
return self.read_register(self.input_registers['T'], return self.read_register(self.input_registers["T"], signed=True) / 10
signed=True
) / 10
@property @property
def RH(self): def RH(self) -> float:
return self.read_register(self.input_registers['RH']) return self.read_register(self.input_registers["RH"])
@property
def LED(self) -> int:
return int(self.read_register(self.holding_registers["LED_brightness"]))
@LED.setter
def LED(self, value: int):
if value == 0:
self.write_register(self.holding_registers["LED_on"], 0)
else:
self.write_register(self.holding_registers["LED_brightness"], value)
sleep(0.1)
self.write_register(self.holding_registers["LED_on"], 1)