This commit is contained in:
Jan Mrna 2023-02-19 09:49:26 +01:00
parent d4052e819a
commit 8960349d97
6 changed files with 223 additions and 225 deletions

View File

@ -1,2 +1,2 @@
from .sensor_wired_IAQ import SensorWiredIAQ from .sensor_wired import SensorWiredIAQ, SensorWiredRHT
from .sensor_wired_RHT import SensorWiredRHT from .find import find_devices

View File

@ -1,7 +1,6 @@
from typing import Final, Dict, Any, TypeVar, Type from typing import Final, Dict, Any, TypeVar, Type, Iterable
from .generic import Device, NoResponseError from .generic import Device, NoResponseError
from .sensor_wired_IAQ import SensorWiredIAQ from .sensor_wired import SensorWiredIAQ, SensorWiredRHT
from .sensor_wired_RHT import SensorWiredRHT
# links device identifiers to its class # links device identifiers to its class
DEVICE_IDENTIFIERS: Final[Dict[int, Device]] = { DEVICE_IDENTIFIERS: Final[Dict[int, Device]] = {
@ -11,20 +10,11 @@ DEVICE_IDENTIFIERS: Final[Dict[int, Device]] = {
T = TypeVar("T", bound=Device) T = TypeVar("T", bound=Device)
def find_devices(device_cls: Type[T], address_space: list[Any]) -> list[T]:
def find_devices(device_cls: Type[T], address_space: Iterable[Any]) -> list[T]:
""" """
Look for devices in given address space Look for devices in given address space
""" """
# found_devices = []
# for address in address_space:
# try:
# found_devices.append(device_cls(address))
# except NoResponseError:
# pass
# return found_devices
return list(filter(device_cls.probe, address_space)) return list(filter(device_cls.probe, address_space))
# TODO add device args # TODO add device args
# TODO return devices themselves, not addresses
# TODO add .address to Device

View File

@ -1,21 +1,7 @@
"""Module containing classes for generic wired/wireless devices""" """Module containing classes for generic wired/wireless devices"""
from dataclasses import dataclass from dataclasses import dataclass
from typing import Dict, Final, Any from typing import Dict, Any
from abc import ABC, abstractclassmethod, abstractmethod, abstractstaticmethod from abc import ABC, abstractmethod
import minimalmodbus
import serial
class Modbus:
"""Class holding Modbus related constants"""
HOLDING_REGISTER_START = 40001
HOLDING_REGISTER_END = 49999
INPUT_REGISTER_START = 30001
INPUT_REGISTER_END = 39999
# ranges for testing if address is in address range
input_register_range = range(INPUT_REGISTER_START, INPUT_REGISTER_END + 1)
holding_register_range = range(HOLDING_REGISTER_START, HOLDING_REGISTER_END + 1)
@dataclass(slots=True) @dataclass(slots=True)
@ -38,6 +24,11 @@ class Device(ABC):
Base class for all devices Base class for all devices
""" """
address: Any
"""
Address space is device-specific (e.g. int for modbus)
"""
@abstractmethod @abstractmethod
def get_data(self) -> Dict[str, Any]: def get_data(self) -> Dict[str, Any]:
""" """
@ -72,150 +63,3 @@ class Device(ABC):
except NoResponseError: except NoResponseError:
return False return False
return True return True
class ModbusRTUDevice(Device):
"""
Base class for wired device controlled over MODBUS RTU (via RS-485)
RS-485 to USB converter is needed for devices based off this class
"""
# Reflects array fw/Core/Src/config.c:config_baudrates[]
BAUDRATES = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200]
# magic constant for resetting: common to all Modbus RTU devices
MAGIC_RESET_CONSTANT: Final[int] = 0xABCD
# registers common to all Modbus RTU devices
input_registers: Dict[str, int] = {
"SERIAL_NUMBER_1": 30001,
"SERIAL_NUMBER_2": 30002,
}
holding_registers: Dict[str, int] = {"RESET_DEVICE": 49999}
def __comm_device_init(self) -> minimalmodbus.Instrument:
comm_device = minimalmodbus.Instrument(
self.dev, self.modbus_address, close_port_after_each_call=True
)
# RS-485 serial paramater init
comm_device.serial.baudrate = self.baudrate
comm_device.serial.bytesize = 8
comm_device.serial.parity = serial.PARITY_EVEN
comm_device.serial.stopbits = 1
comm_device.serial.timeout = 0.05 # seconds
comm_device.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
comm_device.clear_buffers_before_each_transaction = True
return comm_device
def __init__(self, modbus_address, baudrate=19200, dev="/dev/rs485"):
self.modbus_address: int = modbus_address
self.baudrate: int = baudrate
self.dev: str = dev
self.comm_device: minimalmodbus.Instrument = self.__comm_device_init()
self.readout_errors: ReadoutErrorCounter = ReadoutErrorCounter()
# check if device actually exists on the bus (by reading serial number);
# if not, raise NoResponseException
try:
self.read_register(self.input_registers["SERIAL_NUMBER_1"])
except minimalmodbus.NoResponseError as exc:
raise NoResponseError from exc
def read_register(
self, register_number: int, signed: bool = False, retries: int = 10
) -> int:
"""Read Modbus input/holding register via serial device"""
if register_number in Modbus.input_register_range:
function_code = 4
register_offset = register_number - Modbus.INPUT_REGISTER_START
elif register_number in Modbus.holding_register_range:
function_code = 3
register_offset = register_number - Modbus.HOLDING_REGISTER_START
else:
# wrong register number
raise ValueError
for _ in range(retries):
try:
self.readout_errors.total_attempts += 1
# minimalmodbus divides received register value by 10
return (
self.comm_device.read_register(
register_offset, 1, functioncode=function_code, signed=signed
)
* 10
)
except minimalmodbus.NoResponseError as exception:
last_exception = exception
self.readout_errors.no_response += 1
continue
except minimalmodbus.InvalidResponseError as exception:
last_exception = exception
self.readout_errors.invalid_response += 1
continue
# retries failed, raise last exception to inform user
raise last_exception
def write_register(
self, register_number: int, register_value: int, retries: int = 10
) -> None:
"""
Write to slave holding register
"""
# only holding registers can be written
if register_number not in Modbus.holding_register_range:
raise ValueError
register_offset = register_number - Modbus.HOLDING_REGISTER_START
for _ in range(retries):
try:
return self.comm_device.write_register(
register_offset, register_value, functioncode=6
)
except (
minimalmodbus.NoResponseError,
minimalmodbus.InvalidResponseError,
) as exception:
last_exception = exception
continue
raise last_exception
def __getitem__(self, key: int) -> int:
return self.read_register(key)
def __setitem__(self, key: int, value: int) -> None:
return self.write_register(key, value)
def reset(self) -> bool:
"""
Soft-reset the device
"""
try:
self.write_register(
ModbusRTUDevice.holding_registers["RESET_DEVICE"],
ModbusRTUDevice.MAGIC_RESET_CONSTANT,
)
return False # got answer => failed to reset
except minimalmodbus.NoResponseError:
return True # no answer => reset successful
@property
def device_code(self) -> int:
"""
Return device code. This can be matched to DEVICE_CODE
in child classes.
"""
return int(self.read_register(self.input_registers["SERIAL_NUMBER_1"]))
@property
def serial_number(self) -> int:
"""
Return serial number
"""
serial_number_1 = self.device_code
serial_number_2 = int(
self.read_register(self.input_registers["SERIAL_NUMBER_2"])
)
return (serial_number_1 << 16) + serial_number_2
def get_data(self) -> Dict[str, int]:
return {
name: self.read_register(number)
for name, number in self.input_registers.items()
}

167
src/veles/device/modbus.py Normal file
View File

@ -0,0 +1,167 @@
"""Module containing generic Modbus classes/devices"""
from typing import Dict, Final
import minimalmodbus
import serial
from .generic import Device, ReadoutErrorCounter, NoResponseError
class Modbus:
"""Class holding Modbus related constants"""
HOLDING_REGISTER_START = 40001
HOLDING_REGISTER_END = 49999
INPUT_REGISTER_START = 30001
INPUT_REGISTER_END = 39999
# ranges for testing if address is in address range
input_register_range = range(INPUT_REGISTER_START, INPUT_REGISTER_END + 1)
holding_register_range = range(HOLDING_REGISTER_START, HOLDING_REGISTER_END + 1)
class ModbusRTUDevice(Device):
"""
Base class for wired device controlled over MODBUS RTU (via RS-485)
RS-485 to USB converter is needed for devices based off this class
"""
# Reflects array fw/Core/Src/config.c:config_baudrates[]
BAUDRATES = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200]
# magic constant for resetting: common to all Modbus RTU devices
MAGIC_RESET_CONSTANT: Final[int] = 0xABCD
# registers common to all Modbus RTU devices
input_registers: Dict[str, int] = {
"SERIAL_NUMBER_1": 30001,
"SERIAL_NUMBER_2": 30002,
}
holding_registers: Dict[str, int] = {"RESET_DEVICE": 49999}
def __comm_device_init(self) -> minimalmodbus.Instrument:
comm_device = minimalmodbus.Instrument(
self.dev, self.address, close_port_after_each_call=True
)
# RS-485 serial paramater init
comm_device.serial.baudrate = self.baudrate
comm_device.serial.bytesize = 8
comm_device.serial.parity = serial.PARITY_EVEN
comm_device.serial.stopbits = 1
comm_device.serial.timeout = 0.05 # seconds
comm_device.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
comm_device.clear_buffers_before_each_transaction = True
return comm_device
def __init__(self, modbus_address, baudrate=19200, dev="/dev/rs485"):
self.address: int = modbus_address
self.baudrate: int = baudrate
self.dev: str = dev
self.comm_device: minimalmodbus.Instrument = self.__comm_device_init()
self.readout_errors: ReadoutErrorCounter = ReadoutErrorCounter()
# check if device actually exists on the bus (by reading serial number);
# if not, raise NoResponseException
try:
self.read_register(self.input_registers["SERIAL_NUMBER_1"])
except minimalmodbus.NoResponseError as exc:
raise NoResponseError from exc
def read_register(
self, register_number: int, signed: bool = False, retries: int = 10
) -> int:
"""Read Modbus input/holding register via serial device"""
if register_number in Modbus.input_register_range:
function_code = 4
register_offset = register_number - Modbus.INPUT_REGISTER_START
elif register_number in Modbus.holding_register_range:
function_code = 3
register_offset = register_number - Modbus.HOLDING_REGISTER_START
else:
# wrong register number
raise ValueError
for _ in range(retries):
try:
self.readout_errors.total_attempts += 1
# minimalmodbus divides received register value by 10
return (
self.comm_device.read_register(
register_offset, 1, functioncode=function_code, signed=signed
)
* 10
)
except minimalmodbus.NoResponseError as exception:
last_exception = exception
self.readout_errors.no_response += 1
continue
except minimalmodbus.InvalidResponseError as exception:
last_exception = exception
self.readout_errors.invalid_response += 1
continue
# retries failed, raise last exception to inform user
raise last_exception
def write_register(
self, register_number: int, register_value: int, retries: int = 10
) -> None:
"""
Write to slave holding register
"""
# only holding registers can be written
if register_number not in Modbus.holding_register_range:
raise ValueError
register_offset = register_number - Modbus.HOLDING_REGISTER_START
for _ in range(retries):
try:
return self.comm_device.write_register(
register_offset, register_value, functioncode=6
)
except (
minimalmodbus.NoResponseError,
minimalmodbus.InvalidResponseError,
) as exception:
last_exception = exception
continue
raise last_exception
def __getitem__(self, key: int) -> int:
return self.read_register(key)
def __setitem__(self, key: int, value: int) -> None:
return self.write_register(key, value)
def reset(self) -> bool:
"""
Soft-reset the device
"""
try:
self.write_register(
ModbusRTUDevice.holding_registers["RESET_DEVICE"],
ModbusRTUDevice.MAGIC_RESET_CONSTANT,
)
return False # got answer => failed to reset
except minimalmodbus.NoResponseError:
return True # no answer => reset successful
@property
def device_code(self) -> int:
"""
Return device code. This can be matched to DEVICE_CODE
in child classes.
"""
return int(self.read_register(self.input_registers["SERIAL_NUMBER_1"]))
@property
def serial_number(self) -> int:
"""
Return serial number
"""
serial_number_1 = self.device_code
serial_number_2 = int(
self.read_register(self.input_registers["SERIAL_NUMBER_2"])
)
return (serial_number_1 << 16) + serial_number_2
def get_data(self) -> Dict[str, int]:
"""
Get all data from sensor
"""
return {
name: self.read_register(number)
for name, number in self.input_registers.items()
}

View File

@ -1,7 +1,7 @@
from time import sleep from time import sleep
from typing import Dict, Final from typing import Dict, Final
from minimalmodbus import IllegalRequestError from minimalmodbus import IllegalRequestError
from .generic import ModbusRTUDevice from .modbus import ModbusRTUDevice
class SensorWiredIAQ(ModbusRTUDevice): class SensorWiredIAQ(ModbusRTUDevice):
@ -96,3 +96,45 @@ class SensorWiredIAQ(ModbusRTUDevice):
self.read_register(self.input_registers["PMC_MASS_1_0"]) self.read_register(self.input_registers["PMC_MASS_1_0"])
except IllegalRequestError: except IllegalRequestError:
self.__remove_sensor_from_input_registers("PMC") self.__remove_sensor_from_input_registers("PMC")
class SensorWiredRHT(ModbusRTUDevice):
"""
Wired sensor measuring temperature, relative humidity
and light intensity.
"""
DEVICE_CLASS: Final[str] = "RHT_Wired"
DEVICE_CODE: Final[int] = 0x0020
input_registers: Dict[str, int] = {
"SER_NUM_1": 30001,
"SER_NUM_2": 30002,
"T": 30003, # from SHT4x
"T_F": 30004,
"RH": 30005, # from SHT4x
"LIGHT_INTENSITY_1": 30006,
"LIGHT_INTENSITY_2": 30007,
"ERROR_T_RH": 30008,
"ERROR_LIGHT": 30009,
} | ModbusRTUDevice.input_registers
holding_registers: Dict[str, int] = {
"MODBUS_ADDR": 40001,
"BAUDRATE": 40002,
"LTR329_GAIN": 40003,
"LTR329_MEAS_RATE": 40004,
"LTR329_INTEGRATION_TIME": 40005,
"LTR329_MODE": 40006,
} | ModbusRTUDevice.holding_registers
@property
def CO2(self):
return int(self.read_register(self.input_registers["CO2"]))
@property
def T(self):
return self.read_register(self.input_registers["T"], signed=True) / 10
@property
def RH(self):
return self.read_register(self.input_registers["RH"])

View File

@ -1,45 +0,0 @@
from typing import Dict, Final
from minimalmodbus import NoResponseError
from .generic import ModbusRTUDevice
class SensorWiredRHT(ModbusRTUDevice):
"""
Wired sensor measuring temperature, relative humidity
and light intensity.
"""
DEVICE_CLASS: Final[str] = "RHT_Wired"
DEVICE_CODE: Final[int] = 0x0020
input_registers: Dict[str, int] = {
"SER_NUM_1": 30001,
"SER_NUM_2": 30002,
"T": 30003, # from SHT4x
"T_F": 30004,
"RH": 30005, # from SHT4x
"LIGHT_INTENSITY_1": 30006,
"LIGHT_INTENSITY_2": 30007,
"ERROR_T_RH": 30008,
"ERROR_LIGHT": 30009,
} | ModbusRTUDevice.input_registers
holding_registers: Dict[str, int] = {
"MODBUS_ADDR": 40001,
"BAUDRATE": 40002,
"LTR329_GAIN": 40003,
"LTR329_MEAS_RATE": 40004,
"LTR329_INTEGRATION_TIME": 40005,
"LTR329_MODE": 40006,
} | ModbusRTUDevice.holding_registers
@property
def CO2(self):
return int(self.read_register(self.input_registers["CO2"]))
@property
def T(self):
return self.read_register(self.input_registers["T"], signed=True) / 10
@property
def RH(self):
return self.read_register(self.input_registers["RH"])